sftp_write: cannot return acked data *and* EAGAIN

Whenever we have acked data and is about to call a function that *MAY*
return EAGAIN we must return the number now and wait to get called
again. Our API only allows data *or* EAGAIN and we must never try to get
both.
This commit is contained in:
Daniel Stenberg 2012-02-07 00:35:51 +01:00
parent 4774d500e7
commit e07342443f
2 changed files with 148 additions and 136 deletions

View File

@ -1643,157 +1643,168 @@ static ssize_t sftp_write(LIBSSH2_SFTP_HANDLE *handle, const char *buffer,
struct sftp_pipeline_chunk *next;
size_t acked = 0;
size_t org_count = count;
size_t eagain = 0;
size_t already;
/* Number of bytes sent off that haven't been acked and therefor we will
get passed in here again.
switch(sftp->write_state) {
default:
case libssh2_NB_state_idle:
Also, add up the number of bytes that actually already have been acked
but we haven't been able to return as such yet, so we will get that
data as well passed in here again.
*/
size_t already = (handle->u.file.offset_sent - handle->u.file.offset)+
handle->u.file.acked;
/* Number of bytes sent off that haven't been acked and therefor we
will get passed in here again.
if(count >= already) {
/* skip the part already made into packets */
buffer += already;
count -= already;
}
else
/* there is more data already fine than what we got in this call */
count = 0;
Also, add up the number of bytes that actually already have been
acked but we haven't been able to return as such yet, so we will
get that data as well passed in here again.
*/
already = (handle->u.file.offset_sent - handle->u.file.offset)+
handle->u.file.acked;
while(count) {
/* TODO: Possibly this should have some logic to prevent a very very
small fraction to be left but lets ignore that for now */
uint32_t size = MIN(MAX_SFTP_OUTGOING_SIZE, count);
uint32_t request_id;
if(count >= already) {
/* skip the part already made into packets */
buffer += already;
count -= already;
}
else
/* there is more data already fine than what we got in this call */
count = 0;
/* 25 = packet_len(4) + packet_type(1) + request_id(4) +
handle_len(4) + offset(8) + count(4) */
packet_len = handle->handle_len + size + 25;
sftp->write_state = libssh2_NB_state_idle;
while(count) {
/* TODO: Possibly this should have some logic to prevent a very
very small fraction to be left but lets ignore that for now */
uint32_t size = MIN(MAX_SFTP_OUTGOING_SIZE, count);
uint32_t request_id;
chunk = LIBSSH2_ALLOC(session, packet_len +
sizeof(struct sftp_pipeline_chunk));
if (!chunk)
return _libssh2_error(session, LIBSSH2_ERROR_ALLOC,
"malloc fail for FXP_WRITE");
/* 25 = packet_len(4) + packet_type(1) + request_id(4) +
handle_len(4) + offset(8) + count(4) */
packet_len = handle->handle_len + size + 25;
chunk->len = size;
chunk->sent = 0;
chunk->lefttosend = packet_len;
chunk = LIBSSH2_ALLOC(session, packet_len +
sizeof(struct sftp_pipeline_chunk));
if (!chunk)
return _libssh2_error(session, LIBSSH2_ERROR_ALLOC,
"malloc fail for FXP_WRITE");
s = chunk->packet;
_libssh2_store_u32(&s, packet_len - 4);
chunk->len = size;
chunk->sent = 0;
chunk->lefttosend = packet_len;
*(s++) = SSH_FXP_WRITE;
request_id = sftp->request_id++;
chunk->request_id = request_id;
_libssh2_store_u32(&s, request_id);
_libssh2_store_str(&s, handle->handle, handle->handle_len);
_libssh2_store_u64(&s, handle->u.file.offset_sent);
handle->u.file.offset_sent += size; /* advance offset at once */
_libssh2_store_str(&s, buffer, size);
s = chunk->packet;
_libssh2_store_u32(&s, packet_len - 4);
/* add this new entry LAST in the list */
_libssh2_list_add(&handle->packet_list, &chunk->node);
*(s++) = SSH_FXP_WRITE;
request_id = sftp->request_id++;
chunk->request_id = request_id;
_libssh2_store_u32(&s, request_id);
_libssh2_store_str(&s, handle->handle, handle->handle_len);
_libssh2_store_u64(&s, handle->u.file.offset_sent);
handle->u.file.offset_sent += size; /* advance offset at once */
_libssh2_store_str(&s, buffer, size);
buffer += size;
count -= size; /* deduct the size we used, as we might have
to create more packets */
}
/* add this new entry LAST in the list */
_libssh2_list_add(&handle->packet_list, &chunk->node);
/* move through the WRITE packets that haven't been sent and send as many
as possible - remember that we don't block */
chunk = _libssh2_list_first(&handle->packet_list);
buffer += size;
count -= size; /* deduct the size we used, as we might have
to create more packets */
}
while(chunk) {
if(chunk->lefttosend) {
rc = _libssh2_channel_write(channel, 0,
&chunk->packet[chunk->sent],
chunk->lefttosend);
if(rc < 0) {
if(rc != LIBSSH2_ERROR_EAGAIN)
/* error */
/* move through the WRITE packets that haven't been sent and send as many
as possible - remember that we don't block */
chunk = _libssh2_list_first(&handle->packet_list);
while(chunk) {
if(chunk->lefttosend) {
rc = _libssh2_channel_write(channel, 0,
&chunk->packet[chunk->sent],
chunk->lefttosend);
if(rc < 0)
/* remain in idle state */
return rc;
eagain++;
break;
/* remember where to continue sending the next time */
chunk->lefttosend -= rc;
chunk->sent += rc;
if(chunk->lefttosend)
/* data left to send, get out of loop */
break;
}
/* remember where to continue sending the next time */
chunk->lefttosend -= rc;
chunk->sent += rc;
/* move on to the next chunk with data to send */
chunk = _libssh2_list_next(&chunk->node);
}
/* fall-through */
case libssh2_NB_state_sent:
sftp->write_state = libssh2_NB_state_idle;
/*
* Count all ACKed packets
*/
chunk = _libssh2_list_first(&handle->packet_list);
while(chunk) {
if(chunk->lefttosend)
/* data left to send, get out of loop */
/* if the chunk still has data left to send, we shouldn't wait
for an ACK for it just yet */
break;
else if(acked)
/* if we have sent data that is acked, we must return that
info before we call a function that might return EAGAIN */
break;
/* we check the packets in order */
rc = sftp_packet_require(sftp, SSH_FXP_STATUS,
chunk->request_id, &data, &data_len);
if (rc < 0) {
if (rc == LIBSSH2_ERROR_EAGAIN)
sftp->write_state = libssh2_NB_state_sent;
return rc;
}
retcode = _libssh2_ntohu32(data + 5);
LIBSSH2_FREE(session, data);
sftp->last_errno = retcode;
if (retcode == LIBSSH2_FX_OK) {
acked += chunk->len; /* number of payload data that was acked
here */
/* we increase the offset value for all acks */
handle->u.file.offset += chunk->len;
next = _libssh2_list_next(&chunk->node);
_libssh2_list_remove(&chunk->node); /* remove from list */
LIBSSH2_FREE(session, chunk); /* free memory */
chunk = next;
}
else {
/* flush all pending packets from the outgoing list */
sftp_packetlist_flush(handle);
/* since we return error now, the applicaton will not get any
outstanding data acked, so we need to rewind the offset to
where the application knows it has reached with acked data */
handle->u.file.offset -= handle->u.file.acked;
/* then reset the offset_sent to be the same as the offset */
handle->u.file.offset_sent = handle->u.file.offset;
/* clear the acked counter since we can have no pending data to
ack after an error */
handle->u.file.acked = 0;
/* the server returned an error for that written chunk, propagate
this back to our parent function */
return _libssh2_error(session, LIBSSH2_ERROR_SFTP_PROTOCOL,
"FXP write failed");
}
}
/* move on to the next chunk with data to send */
chunk = _libssh2_list_next(&chunk->node);
}
/*
* Count all ACKed packets
*/
chunk = _libssh2_list_first(&handle->packet_list);
while(chunk) {
if(chunk->lefttosend)
/* if the chunk still has data left to send, we shouldn't wait for
an ACK for it just yet */
break;
/* we check the packets in order */
rc = sftp_packet_require(sftp, SSH_FXP_STATUS,
chunk->request_id, &data, &data_len);
if (rc == LIBSSH2_ERROR_EAGAIN) {
eagain++;
break;
}
else if (rc) {
return _libssh2_error(session, rc, "Waiting for SFTP status");
}
retcode = _libssh2_ntohu32(data + 5);
LIBSSH2_FREE(session, data);
sftp->last_errno = retcode;
if (retcode == LIBSSH2_FX_OK) {
acked += chunk->len; /* number of payload data that was acked
here */
/* we increase the offset value for all acks */
handle->u.file.offset += chunk->len;
next = _libssh2_list_next(&chunk->node);
_libssh2_list_remove(&chunk->node); /* remove from list */
LIBSSH2_FREE(session, chunk); /* free memory */
chunk = next;
}
else {
/* flush all pending packets from the outgoing list */
sftp_packetlist_flush(handle);
/* since we return error now, the applicaton will not get any
outstanding data acked, so we need to rewind the offset to
where the application knows it has reached with acked data */
handle->u.file.offset -= handle->u.file.acked;
/* then reset the offset_sent to be the same as the offset */
handle->u.file.offset_sent = handle->u.file.offset;
/* clear the acked counter since we can have no pending data to
ack after an error */
handle->u.file.acked = 0;
/* the server returned an error for that written chunk, propagate
this back to our parent function */
return _libssh2_error(session, LIBSSH2_ERROR_SFTP_PROTOCOL,
"FXP write failed");
}
break;
}
/* if there were acked data in a previous call that wasn't returned then,
@ -1813,9 +1824,7 @@ static ssize_t sftp_write(LIBSSH2_SFTP_HANDLE *handle, const char *buffer,
return ret;
}
else if(eagain)
return _libssh2_error(session, LIBSSH2_ERROR_EAGAIN,
"Would block sftp_write");
else
return 0; /* nothing was acked, and no EAGAIN was received! */
}

View File

@ -1,7 +1,7 @@
#ifndef _LIBSSH2_SFTP_H
#define _LIBSSH2_SFTP_H
/*
* Copyright (C) 2010, 2011 by Daniel Stenberg
* Copyright (C) 2010 - 2012 by Daniel Stenberg
* Author: Daniel Stenberg <daniel@haxx.se>
*
* Redistribution and use in source and binary forms,
@ -158,9 +158,12 @@ struct _LIBSSH2_SFTP
size_t open_packet_sent;
uint32_t open_request_id;
/* State variables used in libssh2_sftp_read() */
/* State variable used in sftp_read() */
libssh2_nonblocking_states read_state;
/* State variable used in sftp_write() */
libssh2_nonblocking_states write_state;
/* State variables used in libssh2_sftp_readdir() */
libssh2_nonblocking_states readdir_state;
unsigned char *readdir_packet;