Cleaned up sftp_read and added more explanation.

Replaced the gotos which were implementing the state machine with
a switch statement which makes the states more explicit.
This commit is contained in:
Alexander Lamaison 2012-02-07 16:23:11 +00:00 committed by Daniel Stenberg
parent 9836b0889f
commit 0d824e5702

View File

@ -1089,279 +1089,312 @@ static ssize_t sftp_read(LIBSSH2_SFTP_HANDLE * handle, char *buffer,
struct _libssh2_sftp_handle_file_data *filep =
&handle->u.file;
/* This function can be interrupted in three different places where it
might need to wait for data from the network. It returns EAGAIN to
allow non-blocking clients to do other work but these client are
expected to call this function again (possibly many times) to finish
the operation.
The tricky part is that if we previously aborted a sftp_read due to
EAGAIN, we must continue at the same spot to continue the previously
interrupted operation. This is done using a state machine to record
what phase of execution we were at. The state is stored in
sftp->read_state.
libssh2_NB_state_idle: The first phase is where we prepare multiple
FXP_READ packets to do optimistic read-ahead. We send off as many as
possible in the second phase without waiting for a response to each
one; this is the key to fast reads. But we may have to adjust the
channel window size to do this which may interrupt this function while
waiting. The state machine saves the phase as libssh2_NB_state_idle so
it returns here on the next call.
libssh2_NB_state_sent: The second phase is where we send the FXP_READ
packets. Writing them to the channel can be interrupted with EAGAIN
but the state machine ensures we skip the first phase on the next call
and resume sending.
libssh2_NB_state_sent2: In the third phase (indicated by ) we read the
data from the responses that have arrived so far. Reading can be
interrupted with EAGAIN but the state machine ensures we skip the first
and second phases on the next call and resume sending.
*/
/* Number of bytes asked for that haven't been acked yet */
size_t already = (filep->offset_sent - filep->offset);
if(filep->data_left) {
/* data left from previous call */
size_t copy = MIN(buffer_size, filep->data_left);
switch (sftp->read_state) {
case libssh2_NB_state_idle:
memcpy(buffer, &filep->data[ filep->data_len - filep->data_left],
copy);
/* Some data may already have been read from the server in the
previous call but didn't fit in the buffer at the time. We can
start by adding that to the buffer. */
if(filep->data_left) {
size_t copy = MIN(buffer_size, filep->data_left);
total_read += copy;
filep->data_left -= copy;
filep->offset += copy;
memcpy(buffer, &filep->data[ filep->data_len - filep->data_left],
copy);
if(filep->data_left)
return total_read;
total_read += copy;
filep->data_left -= copy;
filep->offset += copy;
LIBSSH2_FREE(session, filep->data);
filep->data = NULL;
}
/* if we previously aborted a sftp_read due to EAGAIN, we must continue at
the same spot to continue the previously aborted operation */
if(sftp->read_state == libssh2_NB_state_sent)
goto send_read_requests;
else if(sftp->read_state == libssh2_NB_state_sent2)
goto read_acks;
/* We allow a number of bytes being requested at any given time without
having been acked - until we reach EOF. */
if(!filep->eof) {
size_t max_read_ahead = buffer_size*4;
unsigned long recv_window;
if(max_read_ahead > LIBSSH2_CHANNEL_WINDOW_DEFAULT*4)
max_read_ahead = LIBSSH2_CHANNEL_WINDOW_DEFAULT*4;
/* if the buffer_size passed in now is smaller than what has already
been sent, we risk getting count become a very large number */
if(max_read_ahead > already)
count = max_read_ahead - already;
/* 'count' is how much more data to ask for, and 'already' is how much
data that already has been asked for but not yet returned.
Specificly, 'count' means how much data that have or will be asked
for by the nodes that are already added to the linked list. Some of
those read requests may not actually have been sent off
successfully yet.
If 'already' is very large it should be perfectly fine to have
count set to 0 as then we don't have to ask for more data (right
now).
buffer_size*4 is just picked more or less out of the air. The idea
is that when reading SFTP from a remote server, we send away
multiple read requests guessing that the client will read more than
only this 'buffer_size' amount of memory. So we ask for maximum
buffer_size*4 amount of data so that we can return them very fast
in subsequent calls.
*/
recv_window = libssh2_channel_window_read_ex(sftp->channel,
NULL, NULL);
if(max_read_ahead > recv_window) {
/* more data will be asked for than what the window currently
allows, expand it! */
if(total_read)
/* since we risk getting EAGAIN below, we return here if
there is data available */
if(filep->data_left)
return total_read;
rc = _libssh2_channel_receive_window_adjust(sftp->channel,
max_read_ahead*8,
0, NULL);
/* if this returns EAGAIN, we will get back to this function
at next call */
if (rc)
return rc;
LIBSSH2_FREE(session, filep->data);
filep->data = NULL;
}
}
while(count > 0) {
unsigned char *s;
uint32_t size = MIN(MAX_SFTP_READ_SIZE, count);
/* We allow a number of bytes being requested at any given time
without having been acked - until we reach EOF. */
if(!filep->eof) {
size_t max_read_ahead = buffer_size*4;
unsigned long recv_window;
/* 25 = packet_len(4) + packet_type(1) + request_id(4) +
handle_len(4) + offset(8) + count(4) */
uint32_t packet_len = (uint32_t)handle->handle_len + 25;
uint32_t request_id;
if(max_read_ahead > LIBSSH2_CHANNEL_WINDOW_DEFAULT*4)
max_read_ahead = LIBSSH2_CHANNEL_WINDOW_DEFAULT*4;
chunk = LIBSSH2_ALLOC(session, packet_len +
sizeof(struct sftp_pipeline_chunk));
if (!chunk)
return _libssh2_error(session, LIBSSH2_ERROR_ALLOC,
"malloc fail for FXP_WRITE");
/* if the buffer_size passed in now is smaller than what has
already been sent, we risk getting count become a very large
number */
if(max_read_ahead > already)
count = max_read_ahead - already;
chunk->len = size;
chunk->lefttosend = packet_len;
chunk->sent = 0;
/* 'count' is how much more data to ask for, and 'already' is how
much data that already has been asked for but not yet returned.
Specificly, 'count' means how much data that have or will be
asked for by the nodes that are already added to the linked
list. Some of those read requests may not actually have been
sent off successfully yet.
s = chunk->packet;
If 'already' is very large it should be perfectly fine to have
count set to 0 as then we don't have to ask for more data
(right now).
_libssh2_store_u32(&s, packet_len - 4);
*s++ = SSH_FXP_READ;
request_id = sftp->request_id++;
chunk->request_id = request_id;
_libssh2_store_u32(&s, request_id);
_libssh2_store_str(&s, handle->handle, handle->handle_len);
_libssh2_store_u64(&s, filep->offset_sent);
filep->offset_sent += size; /* advance offset at once */
_libssh2_store_u32(&s, size);
buffer_size*4 is just picked more or less out of the air. The
idea is that when reading SFTP from a remote server, we send
away multiple read requests guessing that the client will read
more than only this 'buffer_size' amount of memory. So we ask
for maximum buffer_size*4 amount of data so that we can return
them very fast in subsequent calls.
*/
/* add this new entry LAST in the list */
_libssh2_list_add(&handle->packet_list, &chunk->node);
count -= size; /* deduct the size we used, as we might have
to create more packets */
}
recv_window = libssh2_channel_window_read_ex(sftp->channel,
NULL, NULL);
if(max_read_ahead > recv_window) {
/* more data will be asked for than what the window currently
allows, expand it! */
send_read_requests:
if(total_read)
/* since we risk getting EAGAIN below, we return here if
there is data available */
return total_read;
/* move through the READ packets that haven't been sent and send as many
as possible - remember that we don't block */
chunk = _libssh2_list_first(&handle->packet_list);
rc = _libssh2_channel_receive_window_adjust(sftp->channel,
max_read_ahead*8,
0, NULL);
/* if this returns EAGAIN, we will get back to this function
at next call */
assert(rc != LIBSSH2_ERROR_EAGAIN || !filep->data_left);
assert(rc != LIBSSH2_ERROR_EAGAIN || !filep->eof);
if (rc)
return rc;
}
}
sftp->read_state = libssh2_NB_state_idle;
while(count > 0) {
unsigned char *s;
uint32_t size = MIN(MAX_SFTP_READ_SIZE, count);
/* 25 = packet_len(4) + packet_type(1) + request_id(4) +
handle_len(4) + offset(8) + count(4) */
uint32_t packet_len = (uint32_t)handle->handle_len + 25;
uint32_t request_id;
chunk = LIBSSH2_ALLOC(session, packet_len +
sizeof(struct sftp_pipeline_chunk));
if (!chunk)
return _libssh2_error(session, LIBSSH2_ERROR_ALLOC,
"malloc fail for FXP_WRITE");
chunk->len = size;
chunk->lefttosend = packet_len;
chunk->sent = 0;
s = chunk->packet;
_libssh2_store_u32(&s, packet_len - 4);
*s++ = SSH_FXP_READ;
request_id = sftp->request_id++;
chunk->request_id = request_id;
_libssh2_store_u32(&s, request_id);
_libssh2_store_str(&s, handle->handle, handle->handle_len);
_libssh2_store_u64(&s, filep->offset_sent);
filep->offset_sent += size; /* advance offset at once */
_libssh2_store_u32(&s, size);
/* add this new entry LAST in the list */
_libssh2_list_add(&handle->packet_list, &chunk->node);
count -= size; /* deduct the size we used, as we might have
to create more packets */
}
case libssh2_NB_state_sent:
sftp->read_state = libssh2_NB_state_idle;
/* move through the READ packets that haven't been sent and send as
many as possible - remember that we don't block */
chunk = _libssh2_list_first(&handle->packet_list);
while(chunk) {
if(chunk->lefttosend) {
if(total_read)
/* since we risk getting EAGAIN below, we return here if
there is data available */
return total_read;
rc = _libssh2_channel_write(channel, 0,
&chunk->packet[chunk->sent],
chunk->lefttosend);
if(rc < 0) {
sftp->read_state = libssh2_NB_state_sent;
return rc;
}
/* remember where to continue sending the next time */
chunk->lefttosend -= rc;
chunk->sent += rc;
if(chunk->lefttosend)
/* data left to send, get out of loop */
break;
}
/* move on to the next chunk with data to send */
chunk = _libssh2_list_next(&chunk->node);
}
case libssh2_NB_state_sent2:
sftp->read_state = libssh2_NB_state_idle;
/*
* Count all ACKed packets and act on the contents of them.
*/
chunk = _libssh2_list_first(&handle->packet_list);
while(chunk) {
unsigned char *data;
size_t data_len;
uint32_t rc32;
static const unsigned char read_responses[2] = {
SSH_FXP_DATA, SSH_FXP_STATUS
};
if(chunk->lefttosend)
/* if the chunk still has data left to send, we shouldn't wait
for an ACK for it just yet */
break;
while(chunk) {
if(chunk->lefttosend) {
if(total_read)
/* since we risk getting EAGAIN below, we return here if there
is data available */
return total_read;
rc = _libssh2_channel_write(channel, 0,
&chunk->packet[chunk->sent],
chunk->lefttosend);
if(rc < 0) {
sftp->read_state = libssh2_NB_state_sent;
rc = sftp_packet_requirev(sftp, 2, read_responses,
chunk->request_id, &data, &data_len);
if (rc < 0) {
sftp->read_state = libssh2_NB_state_sent2;
return rc;
}
/* remember where to continue sending the next time */
chunk->lefttosend -= rc;
chunk->sent += rc;
/*
* We get DATA or STATUS back. STATUS can be error, or it is
* FX_EOF when we reach the end of the file.
*/
if(chunk->lefttosend)
/* data left to send, get out of loop */
break;
}
switch (data[0]) {
case SSH_FXP_STATUS:
/* we must remove all outstanding READ requests, as either we
got an error or we're at end of file */
sftp_packetlist_flush(handle);
/* move on to the next chunk with data to send */
chunk = _libssh2_list_next(&chunk->node);
}
read_acks:
sftp->read_state = libssh2_NB_state_idle;
/*
* Count all ACKed packets and act on the contents of them.
*/
chunk = _libssh2_list_first(&handle->packet_list);
while(chunk) {
unsigned char *data;
size_t data_len;
uint32_t rc32;
static const unsigned char read_responses[2] = {
SSH_FXP_DATA, SSH_FXP_STATUS
};
if(chunk->lefttosend)
/* if the chunk still has data left to send, we shouldn't wait for
an ACK for it just yet */
break;
if(total_read)
/* since we risk getting EAGAIN below, we return here if there
is data available */
return total_read;
rc = sftp_packet_requirev(sftp, 2, read_responses,
chunk->request_id, &data, &data_len);
if (rc < 0) {
sftp->read_state = libssh2_NB_state_sent2;
return rc;
}
/*
* We get DATA or STATUS back. STATUS can be error, or it is FX_EOF
* when we reach the end of the file.
*/
switch (data[0]) {
case SSH_FXP_STATUS:
/* we must remove all outstanding READ requests, as either we got
an error or we're at end of file */
sftp_packetlist_flush(handle);
rc32 = _libssh2_ntohu32(data + 5);
LIBSSH2_FREE(session, data);
if (rc32 == LIBSSH2_FX_EOF) {
filep->eof = TRUE;
return total_read;
}
else {
sftp->last_errno = rc32;
return _libssh2_error(session, LIBSSH2_ERROR_SFTP_PROTOCOL,
"SFTP READ error");
}
break;
case SSH_FXP_DATA:
rc32 = _libssh2_ntohu32(data + 5);
if (rc32 > (data_len - 9))
return _libssh2_error(session, LIBSSH2_ERROR_SFTP_PROTOCOL,
"SFTP Protocol badness");
if(rc32 != chunk->len) {
/* a short read does not imply end of file, but we must adjust
the offset_sent since it was advanced with a full
chunk->len before */
filep->offset_sent -= (chunk->len - rc32);
}
if(total_read + rc32 > buffer_size) {
/* figure out the overlap amount */
filep->data_left = (total_read + rc32) - buffer_size;
/* getting the full packet would overflow the buffer, so
only get the correct amount and keep the remainder */
rc32 = (uint32_t)(buffer_size - total_read);
/* store data to keep for next call */
filep->data = data;
filep->data_len = data_len;
}
else
filep->data_len = 0;
/* copy the received data from the received FXP_DATA packet to the
buffer at the correct index */
memcpy(buffer + total_read, data + 9, rc32);
filep->offset += rc32;
total_read += rc32;
if(!filep->data_len)
/* free the allocated data if not stored to keep */
rc32 = _libssh2_ntohu32(data + 5);
LIBSSH2_FREE(session, data);
else {
/* force the loop to end since the receive buffer is full
already, but remove this chunk from the list first */
_libssh2_list_remove(&chunk->node); /* remove from list */
LIBSSH2_FREE(session, chunk); /* free memory */
chunk = NULL;
continue;
if (rc32 == LIBSSH2_FX_EOF) {
filep->eof = TRUE;
return total_read;
}
else {
sftp->last_errno = rc32;
return _libssh2_error(session, LIBSSH2_ERROR_SFTP_PROTOCOL,
"SFTP READ error");
}
break;
case SSH_FXP_DATA:
rc32 = _libssh2_ntohu32(data + 5);
if (rc32 > (data_len - 9))
return _libssh2_error(session, LIBSSH2_ERROR_SFTP_PROTOCOL,
"SFTP Protocol badness");
if(rc32 != chunk->len) {
/* a short read does not imply end of file, but we must
adjust the offset_sent since it was advanced with a
full chunk->len before */
filep->offset_sent -= (chunk->len - rc32);
}
if(total_read + rc32 > buffer_size) {
/* figure out the overlap amount */
filep->data_left = (total_read + rc32) - buffer_size;
/* getting the full packet would overflow the buffer, so
only get the correct amount and keep the remainder */
rc32 = (uint32_t)(buffer_size - total_read);
/* store data to keep for next call */
filep->data = data;
filep->data_len = data_len;
}
else
filep->data_len = 0;
/* copy the received data from the received FXP_DATA packet to
the buffer at the correct index */
memcpy(buffer + total_read, data + 9, rc32);
filep->offset += rc32;
total_read += rc32;
if(!filep->data_len)
/* free the allocated data if not stored to keep */
LIBSSH2_FREE(session, data);
else {
/* force the loop to end since the receive buffer is full
already, but remove this chunk from the list first */
_libssh2_list_remove(&chunk->node); /* remove from list */
LIBSSH2_FREE(session, chunk); /* free memory */
chunk = NULL;
continue;
}
break;
}
break;
next = _libssh2_list_next(&chunk->node);
_libssh2_list_remove(&chunk->node); /* remove from list */
LIBSSH2_FREE(session, chunk); /* free memory */
chunk = next;
}
break;
next = _libssh2_list_next(&chunk->node);
_libssh2_list_remove(&chunk->node); /* remove from list */
LIBSSH2_FREE(session, chunk); /* free memory */
chunk = next;
}
if(! total_read) {
fprintf(stderr, "MOO\n");
default:
assert(!"State machine error; unrecognised read state");
}
return total_read;