From 0d824e570216fa1328ad0cefc05749efac3c0df3 Mon Sep 17 00:00:00 2001 From: Alexander Lamaison Date: Tue, 7 Feb 2012 16:23:11 +0000 Subject: [PATCH] Cleaned up sftp_read and added more explanation. Replaced the gotos which were implementing the state machine with a switch statement which makes the states more explicit. --- src/sftp.c | 507 ++++++++++++++++++++++++++++------------------------- 1 file changed, 270 insertions(+), 237 deletions(-) diff --git a/src/sftp.c b/src/sftp.c index 0a1cf61..ecd1b46 100644 --- a/src/sftp.c +++ b/src/sftp.c @@ -1089,279 +1089,312 @@ static ssize_t sftp_read(LIBSSH2_SFTP_HANDLE * handle, char *buffer, struct _libssh2_sftp_handle_file_data *filep = &handle->u.file; + /* This function can be interrupted in three different places where it + might need to wait for data from the network. It returns EAGAIN to + allow non-blocking clients to do other work but these client are + expected to call this function again (possibly many times) to finish + the operation. + + The tricky part is that if we previously aborted a sftp_read due to + EAGAIN, we must continue at the same spot to continue the previously + interrupted operation. This is done using a state machine to record + what phase of execution we were at. The state is stored in + sftp->read_state. + + libssh2_NB_state_idle: The first phase is where we prepare multiple + FXP_READ packets to do optimistic read-ahead. We send off as many as + possible in the second phase without waiting for a response to each + one; this is the key to fast reads. But we may have to adjust the + channel window size to do this which may interrupt this function while + waiting. The state machine saves the phase as libssh2_NB_state_idle so + it returns here on the next call. + + libssh2_NB_state_sent: The second phase is where we send the FXP_READ + packets. Writing them to the channel can be interrupted with EAGAIN + but the state machine ensures we skip the first phase on the next call + and resume sending. + + libssh2_NB_state_sent2: In the third phase (indicated by ) we read the + data from the responses that have arrived so far. Reading can be + interrupted with EAGAIN but the state machine ensures we skip the first + and second phases on the next call and resume sending. + */ + /* Number of bytes asked for that haven't been acked yet */ size_t already = (filep->offset_sent - filep->offset); - if(filep->data_left) { - /* data left from previous call */ - size_t copy = MIN(buffer_size, filep->data_left); + switch (sftp->read_state) { + case libssh2_NB_state_idle: - memcpy(buffer, &filep->data[ filep->data_len - filep->data_left], - copy); + /* Some data may already have been read from the server in the + previous call but didn't fit in the buffer at the time. We can + start by adding that to the buffer. */ + if(filep->data_left) { + size_t copy = MIN(buffer_size, filep->data_left); - total_read += copy; - filep->data_left -= copy; - filep->offset += copy; + memcpy(buffer, &filep->data[ filep->data_len - filep->data_left], + copy); - if(filep->data_left) - return total_read; + total_read += copy; + filep->data_left -= copy; + filep->offset += copy; - LIBSSH2_FREE(session, filep->data); - filep->data = NULL; - } - - /* if we previously aborted a sftp_read due to EAGAIN, we must continue at - the same spot to continue the previously aborted operation */ - if(sftp->read_state == libssh2_NB_state_sent) - goto send_read_requests; - else if(sftp->read_state == libssh2_NB_state_sent2) - goto read_acks; - - /* We allow a number of bytes being requested at any given time without - having been acked - until we reach EOF. */ - if(!filep->eof) { - size_t max_read_ahead = buffer_size*4; - unsigned long recv_window; - - if(max_read_ahead > LIBSSH2_CHANNEL_WINDOW_DEFAULT*4) - max_read_ahead = LIBSSH2_CHANNEL_WINDOW_DEFAULT*4; - - /* if the buffer_size passed in now is smaller than what has already - been sent, we risk getting count become a very large number */ - if(max_read_ahead > already) - count = max_read_ahead - already; - - /* 'count' is how much more data to ask for, and 'already' is how much - data that already has been asked for but not yet returned. - Specificly, 'count' means how much data that have or will be asked - for by the nodes that are already added to the linked list. Some of - those read requests may not actually have been sent off - successfully yet. - - If 'already' is very large it should be perfectly fine to have - count set to 0 as then we don't have to ask for more data (right - now). - - buffer_size*4 is just picked more or less out of the air. The idea - is that when reading SFTP from a remote server, we send away - multiple read requests guessing that the client will read more than - only this 'buffer_size' amount of memory. So we ask for maximum - buffer_size*4 amount of data so that we can return them very fast - in subsequent calls. - */ - - recv_window = libssh2_channel_window_read_ex(sftp->channel, - NULL, NULL); - if(max_read_ahead > recv_window) { - /* more data will be asked for than what the window currently - allows, expand it! */ - - if(total_read) - /* since we risk getting EAGAIN below, we return here if - there is data available */ + if(filep->data_left) return total_read; - rc = _libssh2_channel_receive_window_adjust(sftp->channel, - max_read_ahead*8, - 0, NULL); - /* if this returns EAGAIN, we will get back to this function - at next call */ - if (rc) - return rc; + LIBSSH2_FREE(session, filep->data); + filep->data = NULL; } - } - while(count > 0) { - unsigned char *s; - uint32_t size = MIN(MAX_SFTP_READ_SIZE, count); + /* We allow a number of bytes being requested at any given time + without having been acked - until we reach EOF. */ + if(!filep->eof) { + size_t max_read_ahead = buffer_size*4; + unsigned long recv_window; - /* 25 = packet_len(4) + packet_type(1) + request_id(4) + - handle_len(4) + offset(8) + count(4) */ - uint32_t packet_len = (uint32_t)handle->handle_len + 25; - uint32_t request_id; + if(max_read_ahead > LIBSSH2_CHANNEL_WINDOW_DEFAULT*4) + max_read_ahead = LIBSSH2_CHANNEL_WINDOW_DEFAULT*4; - chunk = LIBSSH2_ALLOC(session, packet_len + - sizeof(struct sftp_pipeline_chunk)); - if (!chunk) - return _libssh2_error(session, LIBSSH2_ERROR_ALLOC, - "malloc fail for FXP_WRITE"); + /* if the buffer_size passed in now is smaller than what has + already been sent, we risk getting count become a very large + number */ + if(max_read_ahead > already) + count = max_read_ahead - already; - chunk->len = size; - chunk->lefttosend = packet_len; - chunk->sent = 0; + /* 'count' is how much more data to ask for, and 'already' is how + much data that already has been asked for but not yet returned. + Specificly, 'count' means how much data that have or will be + asked for by the nodes that are already added to the linked + list. Some of those read requests may not actually have been + sent off successfully yet. - s = chunk->packet; + If 'already' is very large it should be perfectly fine to have + count set to 0 as then we don't have to ask for more data + (right now). - _libssh2_store_u32(&s, packet_len - 4); - *s++ = SSH_FXP_READ; - request_id = sftp->request_id++; - chunk->request_id = request_id; - _libssh2_store_u32(&s, request_id); - _libssh2_store_str(&s, handle->handle, handle->handle_len); - _libssh2_store_u64(&s, filep->offset_sent); - filep->offset_sent += size; /* advance offset at once */ - _libssh2_store_u32(&s, size); + buffer_size*4 is just picked more or less out of the air. The + idea is that when reading SFTP from a remote server, we send + away multiple read requests guessing that the client will read + more than only this 'buffer_size' amount of memory. So we ask + for maximum buffer_size*4 amount of data so that we can return + them very fast in subsequent calls. + */ - /* add this new entry LAST in the list */ - _libssh2_list_add(&handle->packet_list, &chunk->node); - count -= size; /* deduct the size we used, as we might have - to create more packets */ - } + recv_window = libssh2_channel_window_read_ex(sftp->channel, + NULL, NULL); + if(max_read_ahead > recv_window) { + /* more data will be asked for than what the window currently + allows, expand it! */ - send_read_requests: + if(total_read) + /* since we risk getting EAGAIN below, we return here if + there is data available */ + return total_read; - /* move through the READ packets that haven't been sent and send as many - as possible - remember that we don't block */ - chunk = _libssh2_list_first(&handle->packet_list); + rc = _libssh2_channel_receive_window_adjust(sftp->channel, + max_read_ahead*8, + 0, NULL); + /* if this returns EAGAIN, we will get back to this function + at next call */ + assert(rc != LIBSSH2_ERROR_EAGAIN || !filep->data_left); + assert(rc != LIBSSH2_ERROR_EAGAIN || !filep->eof); + if (rc) + return rc; + } + } - sftp->read_state = libssh2_NB_state_idle; + while(count > 0) { + unsigned char *s; + uint32_t size = MIN(MAX_SFTP_READ_SIZE, count); + + /* 25 = packet_len(4) + packet_type(1) + request_id(4) + + handle_len(4) + offset(8) + count(4) */ + uint32_t packet_len = (uint32_t)handle->handle_len + 25; + uint32_t request_id; + + chunk = LIBSSH2_ALLOC(session, packet_len + + sizeof(struct sftp_pipeline_chunk)); + if (!chunk) + return _libssh2_error(session, LIBSSH2_ERROR_ALLOC, + "malloc fail for FXP_WRITE"); + + chunk->len = size; + chunk->lefttosend = packet_len; + chunk->sent = 0; + + s = chunk->packet; + + _libssh2_store_u32(&s, packet_len - 4); + *s++ = SSH_FXP_READ; + request_id = sftp->request_id++; + chunk->request_id = request_id; + _libssh2_store_u32(&s, request_id); + _libssh2_store_str(&s, handle->handle, handle->handle_len); + _libssh2_store_u64(&s, filep->offset_sent); + filep->offset_sent += size; /* advance offset at once */ + _libssh2_store_u32(&s, size); + + /* add this new entry LAST in the list */ + _libssh2_list_add(&handle->packet_list, &chunk->node); + count -= size; /* deduct the size we used, as we might have + to create more packets */ + } + + case libssh2_NB_state_sent: + + sftp->read_state = libssh2_NB_state_idle; + + /* move through the READ packets that haven't been sent and send as + many as possible - remember that we don't block */ + chunk = _libssh2_list_first(&handle->packet_list); + + while(chunk) { + if(chunk->lefttosend) { + if(total_read) + /* since we risk getting EAGAIN below, we return here if + there is data available */ + return total_read; + + rc = _libssh2_channel_write(channel, 0, + &chunk->packet[chunk->sent], + chunk->lefttosend); + if(rc < 0) { + sftp->read_state = libssh2_NB_state_sent; + return rc; + } + + /* remember where to continue sending the next time */ + chunk->lefttosend -= rc; + chunk->sent += rc; + + if(chunk->lefttosend) + /* data left to send, get out of loop */ + break; + } + + /* move on to the next chunk with data to send */ + chunk = _libssh2_list_next(&chunk->node); + } + + case libssh2_NB_state_sent2: + + sftp->read_state = libssh2_NB_state_idle; + + /* + * Count all ACKed packets and act on the contents of them. + */ + chunk = _libssh2_list_first(&handle->packet_list); + + while(chunk) { + unsigned char *data; + size_t data_len; + uint32_t rc32; + static const unsigned char read_responses[2] = { + SSH_FXP_DATA, SSH_FXP_STATUS + }; + + if(chunk->lefttosend) + /* if the chunk still has data left to send, we shouldn't wait + for an ACK for it just yet */ + break; - while(chunk) { - if(chunk->lefttosend) { if(total_read) /* since we risk getting EAGAIN below, we return here if there is data available */ return total_read; - rc = _libssh2_channel_write(channel, 0, - &chunk->packet[chunk->sent], - chunk->lefttosend); - if(rc < 0) { - sftp->read_state = libssh2_NB_state_sent; + rc = sftp_packet_requirev(sftp, 2, read_responses, + chunk->request_id, &data, &data_len); + if (rc < 0) { + sftp->read_state = libssh2_NB_state_sent2; return rc; } - /* remember where to continue sending the next time */ - chunk->lefttosend -= rc; - chunk->sent += rc; + /* + * We get DATA or STATUS back. STATUS can be error, or it is + * FX_EOF when we reach the end of the file. + */ - if(chunk->lefttosend) - /* data left to send, get out of loop */ - break; - } + switch (data[0]) { + case SSH_FXP_STATUS: + /* we must remove all outstanding READ requests, as either we + got an error or we're at end of file */ + sftp_packetlist_flush(handle); - /* move on to the next chunk with data to send */ - chunk = _libssh2_list_next(&chunk->node); - } - - read_acks: - - sftp->read_state = libssh2_NB_state_idle; - - /* - * Count all ACKed packets and act on the contents of them. - */ - chunk = _libssh2_list_first(&handle->packet_list); - - while(chunk) { - unsigned char *data; - size_t data_len; - uint32_t rc32; - static const unsigned char read_responses[2] = { - SSH_FXP_DATA, SSH_FXP_STATUS - }; - - if(chunk->lefttosend) - /* if the chunk still has data left to send, we shouldn't wait for - an ACK for it just yet */ - break; - - if(total_read) - /* since we risk getting EAGAIN below, we return here if there - is data available */ - return total_read; - - rc = sftp_packet_requirev(sftp, 2, read_responses, - chunk->request_id, &data, &data_len); - if (rc < 0) { - sftp->read_state = libssh2_NB_state_sent2; - return rc; - } - - /* - * We get DATA or STATUS back. STATUS can be error, or it is FX_EOF - * when we reach the end of the file. - */ - - switch (data[0]) { - case SSH_FXP_STATUS: - /* we must remove all outstanding READ requests, as either we got - an error or we're at end of file */ - sftp_packetlist_flush(handle); - - rc32 = _libssh2_ntohu32(data + 5); - LIBSSH2_FREE(session, data); - - if (rc32 == LIBSSH2_FX_EOF) { - filep->eof = TRUE; - return total_read; - } - else { - sftp->last_errno = rc32; - return _libssh2_error(session, LIBSSH2_ERROR_SFTP_PROTOCOL, - "SFTP READ error"); - } - break; - - case SSH_FXP_DATA: - rc32 = _libssh2_ntohu32(data + 5); - if (rc32 > (data_len - 9)) - return _libssh2_error(session, LIBSSH2_ERROR_SFTP_PROTOCOL, - "SFTP Protocol badness"); - - if(rc32 != chunk->len) { - /* a short read does not imply end of file, but we must adjust - the offset_sent since it was advanced with a full - chunk->len before */ - filep->offset_sent -= (chunk->len - rc32); - } - - if(total_read + rc32 > buffer_size) { - /* figure out the overlap amount */ - filep->data_left = (total_read + rc32) - buffer_size; - - /* getting the full packet would overflow the buffer, so - only get the correct amount and keep the remainder */ - rc32 = (uint32_t)(buffer_size - total_read); - - /* store data to keep for next call */ - filep->data = data; - filep->data_len = data_len; - } - else - filep->data_len = 0; - - /* copy the received data from the received FXP_DATA packet to the - buffer at the correct index */ - memcpy(buffer + total_read, data + 9, rc32); - filep->offset += rc32; - total_read += rc32; - - if(!filep->data_len) - /* free the allocated data if not stored to keep */ + rc32 = _libssh2_ntohu32(data + 5); LIBSSH2_FREE(session, data); - else { - /* force the loop to end since the receive buffer is full - already, but remove this chunk from the list first */ - _libssh2_list_remove(&chunk->node); /* remove from list */ - LIBSSH2_FREE(session, chunk); /* free memory */ - chunk = NULL; - continue; + if (rc32 == LIBSSH2_FX_EOF) { + filep->eof = TRUE; + return total_read; + } + else { + sftp->last_errno = rc32; + return _libssh2_error(session, LIBSSH2_ERROR_SFTP_PROTOCOL, + "SFTP READ error"); + } + break; + + case SSH_FXP_DATA: + rc32 = _libssh2_ntohu32(data + 5); + if (rc32 > (data_len - 9)) + return _libssh2_error(session, LIBSSH2_ERROR_SFTP_PROTOCOL, + "SFTP Protocol badness"); + + if(rc32 != chunk->len) { + /* a short read does not imply end of file, but we must + adjust the offset_sent since it was advanced with a + full chunk->len before */ + filep->offset_sent -= (chunk->len - rc32); + } + + if(total_read + rc32 > buffer_size) { + /* figure out the overlap amount */ + filep->data_left = (total_read + rc32) - buffer_size; + + /* getting the full packet would overflow the buffer, so + only get the correct amount and keep the remainder */ + rc32 = (uint32_t)(buffer_size - total_read); + + /* store data to keep for next call */ + filep->data = data; + filep->data_len = data_len; + } + else + filep->data_len = 0; + + /* copy the received data from the received FXP_DATA packet to + the buffer at the correct index */ + memcpy(buffer + total_read, data + 9, rc32); + filep->offset += rc32; + total_read += rc32; + + if(!filep->data_len) + /* free the allocated data if not stored to keep */ + LIBSSH2_FREE(session, data); + else { + /* force the loop to end since the receive buffer is full + already, but remove this chunk from the list first */ + _libssh2_list_remove(&chunk->node); /* remove from list */ + LIBSSH2_FREE(session, chunk); /* free memory */ + + chunk = NULL; + continue; + } + break; } - break; + + next = _libssh2_list_next(&chunk->node); + + _libssh2_list_remove(&chunk->node); /* remove from list */ + LIBSSH2_FREE(session, chunk); /* free memory */ + + chunk = next; } + break; - next = _libssh2_list_next(&chunk->node); - - _libssh2_list_remove(&chunk->node); /* remove from list */ - LIBSSH2_FREE(session, chunk); /* free memory */ - - chunk = next; - } - - if(! total_read) { - fprintf(stderr, "MOO\n"); + default: + assert(!"State machine error; unrecognised read state"); } return total_read;