sftp_read: pipeline reads

The SFTP read function now transfers data the same way the SFTP write
function was recently changed to: it creates a list of many outgoing
FXP_READ packets that each ask for a small data chunk. The code then
tries to keep sending read requests while collecting the acks for the
previous requests, and returns the received data.
This commit is contained in:
Daniel Stenberg 2010-12-14 09:30:25 +01:00
parent 92ea3bda60
commit 1b65c6ae26
2 changed files with 257 additions and 159 deletions

View File

@ -81,9 +81,6 @@
#define SSH_FXP_EXTENDED 200
#define SSH_FXP_EXTENDED_REPLY 201
#define LIBSSH2_SFTP_HANDLE_FILE 0
#define LIBSSH2_SFTP_HANDLE_DIR 1
/* S_IFREG */
#define LIBSSH2_SFTP_ATTR_PFILETYPE_FILE 0100000
/* S_IFDIR */
@ -1033,165 +1030,240 @@ libssh2_sftp_open_ex(LIBSSH2_SFTP *sftp, const char *filename,
return hnd;
}
/* sftp_read
/*
* sftp_read
*
* Read from an SFTP file handle
*
*/
static ssize_t sftp_read(LIBSSH2_SFTP_HANDLE * handle, char *buffer,
size_t buffer_maxlen)
size_t buffer_size)
{
LIBSSH2_SFTP *sftp = handle->sftp;
LIBSSH2_CHANNEL *channel = sftp->channel;
LIBSSH2_SESSION *session = channel->session;
size_t data_len, request_id = 0;
/* 25 = packet_len(4) + packet_type(1) + request_id(4) + handle_len(4) +
offset(8) + length(4) */
ssize_t packet_len = handle->handle_len + 25;
unsigned char *packet, *s, *data;
size_t bytes_read = 0;
size_t bytes_requested = 0;
size_t count;
struct sftp_read_chunk *chunk;
struct sftp_read_chunk *next;
int rc;
size_t eagain = 0;
size_t total_read = 0;
int retcode;
struct _libssh2_sftp_handle_file_data *filep =
&handle->u.file;
if (sftp->read_state == libssh2_NB_state_idle) {
_libssh2_debug(session, LIBSSH2_TRACE_SFTP,
"Reading %lu bytes from SFTP handle",
(unsigned long) buffer_maxlen);
packet = handle->request_packet;
sftp->read_state = libssh2_NB_state_allocated;
} else {
packet = sftp->read_packet;
request_id = sftp->read_request_id;
total_read = sftp->read_total_read;
/* Number of bytes asked for that haven't been acked yet */
size_t already = (filep->offset_sent - filep->offset);
if(filep->data_left) {
/* data left from previous call */
size_t copy = MIN(buffer_size, filep->data_left);
memcpy(buffer, &filep->data[ filep->data_len - filep->data_left],
copy);
total_read += copy;
filep->data_left -= copy;
if(filep->data_left)
return total_read;
LIBSSH2_FREE(session, filep->data);
filep->data = NULL;
}
while (total_read < buffer_maxlen) {
s = packet;
/* We allow a number of bytes being requested at any given time without
having been acked - until we reach EOF. */
count = filep->eof?0:(buffer_size*4) - already;
while(count > 0) {
unsigned char *s;
size_t size = MIN(MAX_SFTP_READ_SIZE, count);
/* 25 = packet_len(4) + packet_type(1) + request_id(4) +
handle_len(4) + offset(8) + count(4) */
size_t packet_len = handle->handle_len + 25;
uint32_t request_id;
chunk = LIBSSH2_ALLOC(session, packet_len +
sizeof(struct sftp_read_chunk));
if (!chunk)
return _libssh2_error(session, LIBSSH2_ERROR_ALLOC,
"malloc fail for FXP_WRITE");
chunk->askedfor = size;
chunk->lefttosend = packet_len;
chunk->sent = 0;
s = chunk->packet;
_libssh2_store_u32(&s, packet_len - 4);
*s++ = SSH_FXP_READ;
request_id = sftp->request_id++;
chunk->request_id = request_id;
_libssh2_store_u32(&s, request_id);
_libssh2_store_str(&s, handle->handle, handle->handle_len);
_libssh2_store_u64(&s, filep->offset_sent);
filep->offset_sent += size; /* advance offset at once */
_libssh2_store_u32(&s, size);
/* add this new entry LAST in the list */
_libssh2_list_add(&handle->packet_list, &chunk->node);
count -= size; /* deduct the size we used, as we might have
to create more packets */
}
/* move through the READ packets that haven't been sent and send as many
as possible - remember that we don't block */
chunk = _libssh2_list_first(&handle->packet_list);
while(chunk) {
if(chunk->lefttosend) {
rc = _libssh2_channel_write(channel, 0,
&chunk->packet[chunk->sent],
chunk->lefttosend);
if(rc < 0) {
if(rc != LIBSSH2_ERROR_EAGAIN)
/* error */
return rc;
eagain++;
break;
}
/* remember where to continue sending the next time */
chunk->lefttosend -= rc;
chunk->sent += rc;
if(chunk->lefttosend)
/* data left to send, get out of loop */
break;
}
/* move on to the next chunk with data to send */
chunk = _libssh2_list_next(&chunk->node);
}
/*
* Count all ACKed packets and act on the contents of them.
*/
chunk = _libssh2_list_first(&handle->packet_list);
while(chunk) {
unsigned char *data;
size_t data_len;
uint32_t rc32;
static const unsigned char read_responses[2] = {
SSH_FXP_DATA, SSH_FXP_STATUS
};
if(chunk->lefttosend)
/* if the chunk still has data left to send, we shouldn't wait for
an ACK for it just yet */
break;
rc = sftp_packet_requirev(sftp, 2, read_responses,
chunk->request_id, &data, &data_len);
if (rc == LIBSSH2_ERROR_EAGAIN) {
eagain++;
break;
}
else if (rc)
return _libssh2_error(session, rc,
"Error waiting for FXP_READ ACK");
/*
* If buffer_maxlen bytes will be requested, server may return all
* with one packet. But libssh2 have packet length limit.
* So we request data by pieces.
* We get DATA or STATUS back. STATUS can be error, or it is FX_EOF
* when we reach the end of the file.
*/
bytes_requested = buffer_maxlen - total_read;
/* 10 = packet_type(1) + request_id(4) + data_length(4) +
end_of_line_flag(1)
10 is changed to 13 below simple because it seems there's a
"GlobalScape" SFTP server that responds with a slightly too big
buffer at times and we can apparently compensate for that by doing
this trick.
Further details on this issue:
https://sourceforge.net/mailarchive/forum.php?thread_name=9c3275a90811261517v6c0b1da2u918cc1b8370abf83%40mail.gmail.com&forum_name=libssh2-devel
http://forums.globalscape.com/tm.aspx?m=15249
*/
if (bytes_requested > LIBSSH2_SFTP_PACKET_MAXLEN - 13) {
bytes_requested = LIBSSH2_SFTP_PACKET_MAXLEN - 13;
}
#ifdef LIBSSH2_DEBUG_SFTP
_libssh2_debug(session, LIBSSH2_TRACE_SFTP,
"Requesting %lu bytes from SFTP handle",
(unsigned long) bytes_requested);
#endif
if (sftp->read_state == libssh2_NB_state_allocated) {
_libssh2_store_u32(&s, packet_len - 4);
*(s++) = SSH_FXP_READ;
request_id = sftp->request_id++;
_libssh2_store_u32(&s, request_id);
_libssh2_store_str(&s, handle->handle, handle->handle_len);
_libssh2_store_u64(&s, handle->u.file.offset);
_libssh2_store_u32(&s, bytes_requested);
sftp->read_state = libssh2_NB_state_created;
}
if (sftp->read_state == libssh2_NB_state_created) {
retcode = _libssh2_channel_write(channel, 0, packet, packet_len);
if (retcode == LIBSSH2_ERROR_EAGAIN) {
sftp->read_packet = packet;
sftp->read_request_id = request_id;
sftp->read_total_read = total_read;
return retcode;
} else if (packet_len != retcode) {
/* TODO: a partial write is not a critical error when in
non-blocking mode! */
sftp->read_packet = NULL;
sftp->read_state = libssh2_NB_state_idle;
return _libssh2_error(session, LIBSSH2_ERROR_SOCKET_SEND,
"_libssh2_channel_write() failed");
}
sftp->read_packet = packet;
sftp->read_request_id = request_id;
sftp->read_total_read = total_read;
sftp->read_state = libssh2_NB_state_sent;
}
if (sftp->read_state == libssh2_NB_state_sent) {
static const unsigned char read_responses[2] =
{ SSH_FXP_DATA, SSH_FXP_STATUS };
retcode =
sftp_packet_requirev(sftp, 2, read_responses,
request_id, &data, &data_len);
if (retcode == LIBSSH2_ERROR_EAGAIN) {
return _libssh2_error(session, retcode,
"Would block waiting for status message");
} else if (retcode) {
sftp->read_packet = NULL;
sftp->read_state = libssh2_NB_state_idle;
return _libssh2_error(session, retcode,
"Timeout waiting for status message");
}
sftp->read_state = libssh2_NB_state_sent1;
}
else
/* internal error, 'data' is not assigned */
return -1;
switch (data[0]) {
case SSH_FXP_STATUS:
retcode = _libssh2_ntohu32(data + 5);
LIBSSH2_FREE(session, data);
sftp->read_packet = NULL;
sftp->read_state = libssh2_NB_state_idle;
/* we must remove all outstanding READ requests, as either we got
an error or we're at end of file */
if (retcode == LIBSSH2_FX_EOF) {
rc = chunk->request_id;
do {
next = _libssh2_list_next(&chunk->node);
_libssh2_list_remove(&chunk->node); /* remove from list */
LIBSSH2_FREE(session, chunk); /* free memory */
chunk = next;
} while(chunk);
rc32 = _libssh2_ntohu32(data + 5);
LIBSSH2_FREE(session, data);
if (rc32 == LIBSSH2_FX_EOF) {
filep->eof = TRUE;
return total_read;
} else {
sftp->last_errno = retcode;
return _libssh2_error(session, LIBSSH2_ERROR_SFTP_PROTOCOL,
"SFTP Protocol Error");
}
else {
sftp->last_errno = rc32;
return _libssh2_error(session, LIBSSH2_ERROR_SFTP_PROTOCOL,
"SFTP READ error");
}
break;
case SSH_FXP_DATA:
bytes_read = _libssh2_ntohu32(data + 5);
if (bytes_read > (data_len - 9)) {
sftp->read_packet = NULL;
sftp->read_state = libssh2_NB_state_idle;
return -1;
rc32 = _libssh2_ntohu32(data + 5);
if (rc32 > (data_len - 9))
return _libssh2_error(session, LIBSSH2_ERROR_SFTP_PROTOCOL,
"SFTP Protocol badness");
if(rc32 != chunk->askedfor)
/* a short read means this is the last read in the file */
filep->eof = TRUE;
if(total_read + rc32 > buffer_size) {
/* figure out the overlap amount */
filep->data_left = (total_read + rc32) - buffer_size;
/* getting the full packet would overflow the buffer, so
only get the correct amount and keep the remainder */
rc32 = buffer_size - total_read;
/* store data to keep for next call */
filep->data = data;
filep->data_len = data_len;
}
#ifdef LIBSSH2_DEBUG_SFTP
_libssh2_debug(session, LIBSSH2_TRACE_SFTP, "%lu bytes returned",
(unsigned long) bytes_read);
#endif
memcpy(buffer + total_read, data + 9, bytes_read);
handle->u.file.offset += bytes_read;
total_read += bytes_read;
LIBSSH2_FREE(session, data);
/*
* Set the state back to allocated, so a new one will be
* created to either request more data or get EOF
*/
sftp->read_state = libssh2_NB_state_allocated;
else
filep->data_len = 0;
/* copy the received data from the received FXP_DATA packet to the
buffer at the correct index */
memcpy(buffer + total_read, data + 9, rc32);
filep->offset += rc32;
total_read += rc32;
if(!filep->data_len)
/* free the allocated data if not stored to keep */
LIBSSH2_FREE(session, data);
else {
/* force the loop to end since the receive buffer is full
already, but remove this chunk from the list first */
_libssh2_list_remove(&chunk->node); /* remove from list */
LIBSSH2_FREE(session, chunk); /* free memory */
chunk = NULL;
continue;
}
break;
}
next = _libssh2_list_next(&chunk->node);
_libssh2_list_remove(&chunk->node); /* remove from list */
LIBSSH2_FREE(session, chunk); /* free memory */
chunk = next;
}
sftp->read_packet = NULL;
sftp->read_state = libssh2_NB_state_idle;
return total_read;
if(total_read)
return total_read;
else if(eagain)
return _libssh2_error(session, LIBSSH2_ERROR_EAGAIN,
"Would block sftp_read");
return 0;
}
/* libssh2_sftp_read
@ -1457,6 +1529,7 @@ static ssize_t sftp_write(LIBSSH2_SFTP_HANDLE *handle, const char *buffer,
/* TODO: Possibly this should have some logic to prevent a very very
small fraction to be left but lets ignore that for now */
size_t size = MIN(MAX_SFTP_OUTGOING_SIZE, count);
uint32_t request_id;
/* 25 = packet_len(4) + packet_type(1) + request_id(4) +
handle_len(4) + offset(8) + count(4) */
@ -1476,25 +1549,25 @@ static ssize_t sftp_write(LIBSSH2_SFTP_HANDLE *handle, const char *buffer,
_libssh2_store_u32(&s, packet_len - 4);
*(s++) = SSH_FXP_WRITE;
sftp->write_request_id = sftp->request_id++;
chunk->request_id = sftp->write_request_id;
_libssh2_store_u32(&s, sftp->write_request_id);
request_id = sftp->request_id++;
chunk->request_id = request_id;
_libssh2_store_u32(&s, request_id);
_libssh2_store_str(&s, handle->handle, handle->handle_len);
_libssh2_store_u64(&s, handle->u.file.offset_sent);
handle->u.file.offset_sent += size; /* advance offset at once */
_libssh2_store_str(&s, buffer, size);
/* add this new entry LAST in the list */
_libssh2_list_add(&handle->write_list, &chunk->node);
_libssh2_list_add(&handle->packet_list, &chunk->node);
buffer += size;
count -= size; /* deduct the size we used, as we might have
to create more packets */
}
/* move through the packets that haven't been sent and send as many
/* move through the WRITE packets that haven't been sent and send as many
as possible - remember that we don't block */
chunk = _libssh2_list_first(&handle->write_list);
chunk = _libssh2_list_first(&handle->packet_list);
while(chunk) {
if(chunk->lefttosend) {
@ -1525,7 +1598,7 @@ static ssize_t sftp_write(LIBSSH2_SFTP_HANDLE *handle, const char *buffer,
/*
* Count all ACKed packets
*/
chunk = _libssh2_list_first(&handle->write_list);
chunk = _libssh2_list_first(&handle->packet_list);
while(chunk) {
if(chunk->lefttosend)
@ -1856,10 +1929,13 @@ sftp_close_handle(LIBSSH2_SFTP_HANDLE *handle)
&& handle->u.dir.names_left) {
LIBSSH2_FREE(session, handle->u.dir.names_packet);
}
else {
if(handle->u.file.data)
LIBSSH2_FREE(session, handle->u.file.data);
}
/* remove pending write chunks */
do {
chunk = _libssh2_list_first(&handle->write_list);
chunk = _libssh2_list_first(&handle->packet_list);
if(chunk) {
struct sftp_write_chunk *next =
_libssh2_list_next(&chunk->node);

View File

@ -40,10 +40,16 @@
*/
/*
* MAX_SFTP_OUTGOING_SIZE MUST not be larger than 32500 or so
* MAX_SFTP_OUTGOING_SIZE MUST not be larger than 32500 or so. This is the
* amount of data sent in each FXP_WRITE packet
*/
#define MAX_SFTP_OUTGOING_SIZE 30000
/* MAX_SFTP_READ_SIZE is the maximum amount of data asked for in each
 * FXP_READ packet. sftp_read() splits the amount it wants to request into
 * pieces of at most this size, one FXP_READ packet per piece.
 */
#define MAX_SFTP_READ_SIZE 2000
struct sftp_write_chunk {
struct list_node node;
size_t org_buflen;
@ -53,10 +59,20 @@ struct sftp_write_chunk {
unsigned char packet[1]; /* data */
};
/*
 * One outstanding FXP_READ request created by sftp_read(). Each chunk is
 * allocated with room for the complete request packet following the struct
 * and is linked into the handle's packet list until its reply has been
 * handled (or the list is torn down).
 */
struct sftp_read_chunk {
struct list_node node;
size_t askedfor; /* number of bytes asked for in this request; a DATA reply
                    with a different length is treated as end of file */
size_t sent; /* how much of the packet that has been sent off; also the index
                in packet[] where sending continues */
ssize_t lefttosend; /* if 0, the entire packet has been sent off; starts out
                       as the full packet length */
uint32_t request_id; /* id stored in the packet, used to find the matching
                        reply */
unsigned char packet[1]; /* data: the serialized FXP_READ packet, the
                            allocation extends past this single byte */
};
/* Smaller of two values. Each argument may be evaluated twice, so do not
   pass expressions with side effects. Left alone if MIN already exists. */
#ifndef MIN
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
#endif
#define SFTP_HANDLE_MAXLEN 256 /* according to spec! */
struct _LIBSSH2_SFTP_HANDLE
@ -65,15 +81,13 @@ struct _LIBSSH2_SFTP_HANDLE
LIBSSH2_SFTP *sftp;
/* This is a pre-allocated buffer used for sending SFTP requests as the
whole thing might not get sent in one go. This buffer is used for read,
write, close and MUST thus be big enough to suit all these. */
unsigned char request_packet[SFTP_HANDLE_MAXLEN + 25];
char handle[SFTP_HANDLE_MAXLEN];
size_t handle_len;
char handle_type;
enum {
LIBSSH2_SFTP_HANDLE_FILE,
LIBSSH2_SFTP_HANDLE_DIR
} handle_type;
union _libssh2_sftp_handle_data
{
@ -82,7 +96,18 @@ struct _LIBSSH2_SFTP_HANDLE
libssh2_uint64_t offset;
libssh2_uint64_t offset_sent;
size_t acked; /* container for acked data that hasn't been
returned yet */
returned to caller yet, used for sftp_write */
/* 'data' is used by sftp_read() and is allocated data that has
been received already from the server but wasn't returned to
the caller yet. It is of size 'data_len' and 'data_left' is the
number of bytes not yet returned, counted from the end of the
buffer. */
unsigned char *data;
size_t data_len;
size_t data_left;
char eof; /* we have read to the end */
} file;
struct _libssh2_sftp_handle_dir_data
{
@ -97,8 +122,8 @@ struct _LIBSSH2_SFTP_HANDLE
unsigned long close_request_id;
unsigned char *close_packet;
/* list of chunks being written to server */
struct list_head write_list;
/* list of outstanding packets sent to server */
struct list_head packet_list;
};
@ -143,9 +168,6 @@ struct _LIBSSH2_SFTP
unsigned char *readdir_packet;
uint32_t readdir_request_id;
/* State variables used in libssh2_sftp_write() */
uint32_t write_request_id;
/* State variables used in libssh2_sftp_fstat_ex() */
libssh2_nonblocking_states fstat_state;
unsigned char *fstat_packet;