sws.c: improve proxy mode torture testing support

This commit is contained in:
Yang Tse 2012-01-13 05:13:48 +01:00
parent 54dede4166
commit d4bf87dc0e

View File

@ -1218,7 +1218,8 @@ static curl_socket_t connect_to(const char *ipaddr, unsigned short port)
#ifdef TCP_NODELAY
/* Disable the Nagle algorithm */
if(setsockopt(serverfd, level, TCP_NODELAY, (void *)&flag, sizeof(flag)) < 0)
if(setsockopt(serverfd, level, TCP_NODELAY,
(void *)&flag, sizeof(flag)) < 0)
logmsg("====> TCP_NODELAY for server conection failed");
#endif
@ -1284,24 +1285,40 @@ static curl_socket_t connect_to(const char *ipaddr, unsigned short port)
#define data_or_ctrl(x) ((x)?"DATA":"CTRL")
static void http_connect(curl_socket_t infd,
#define CTRL 0
#define DATA 1
static void http_connect(curl_socket_t *infdp,
curl_socket_t rootfd,
struct httprequest *req,
const char *ipaddr)
{
curl_socket_t serverfd[2];
curl_socket_t clientfd[2];
curl_socket_t datafd = CURL_SOCKET_BAD;
curl_socket_t serverfd[2] = {CURL_SOCKET_BAD, CURL_SOCKET_BAD};
curl_socket_t clientfd[2] = {CURL_SOCKET_BAD, CURL_SOCKET_BAD};
ssize_t toc[2] = {0, 0}; /* number of bytes to client */
ssize_t tos[2] = {0, 0}; /* number of bytes to server */
char readclient[2][256];
char readserver[2][256];
bool poll_client[2] = { TRUE, TRUE };
bool poll_server[2] = { TRUE, TRUE };
int control=0;
bool poll_client_rd[2] = { TRUE, TRUE };
bool poll_server_rd[2] = { TRUE, TRUE };
bool poll_client_wr[2] = { TRUE, TRUE };
bool poll_server_wr[2] = { TRUE, TRUE };
#ifdef TCP_NODELAY
curl_socklen_t flag = 1;
int level = IPPROTO_TCP;
#endif
bool primary = FALSE;
bool secondary = FALSE;
int max_tunnel_idx; /* CTRL or DATA */
#if 0
int quarters;
#endif
int i;
/* primary tunnel client endpoint already connected */
clientfd[CTRL] = *infdp;
#if 0
/* sleep here to make sure the client gets the CONNECT response
first and separate from the data that might follow here */
quarters = 4;
@ -1309,220 +1326,310 @@ static void http_connect(curl_socket_t infd,
quarters--;
wait_ms(250);
}
#endif
if(got_exit_signal)
return;
goto http_connect_cleanup;
clientfd[0] = infd;
clientfd[1] = CURL_SOCKET_BAD;
serverfd[CTRL] = connect_to(ipaddr, req->connect_port);
if(serverfd[CTRL] == CURL_SOCKET_BAD)
goto http_connect_cleanup;
serverfd[0] = connect_to(ipaddr, req->connect_port);
if(CURL_SOCKET_BAD == serverfd[0])
return;
serverfd[1] = CURL_SOCKET_BAD; /* nothing there (yet) */
/* Primary tunnel socket endpoints are now connected. Tunnel data back and
forth over the primary tunnel until client or server breaks the primary
tunnel, simultaneously allowing establishment, operation and teardown of
a secondary tunnel that may be used for passive FTP data connection. */
max_tunnel_idx = CTRL;
primary = TRUE;
while(!got_exit_signal) {
/* connected, now tunnel */
while(1) {
fd_set input;
fd_set output;
struct timeval timeout = {1,0};
struct timeval timeout = {0, 250000L}; /* 250 ms */
ssize_t rc;
curl_socket_t maxfd = (curl_socket_t)-1;
int used;
FD_ZERO(&input);
FD_ZERO(&output);
if(CURL_SOCKET_BAD != rootfd) {
FD_SET(rootfd, &input); /* monitor this for new connections */
if((clientfd[DATA] == CURL_SOCKET_BAD) &&
(serverfd[DATA] == CURL_SOCKET_BAD)) {
/* when secondary tunnel is not established the listener socket
is monitored to allow client to establish the secondary tunnel */
FD_SET(rootfd, &input);
maxfd = rootfd;
}
/* set sockets to wait for */
for(i=0; i<=control; i++) {
curl_socket_t mostfd = clientfd[i] > serverfd[i] ?
clientfd[i] : serverfd[i];
used = 0;
if(mostfd > maxfd)
maxfd = mostfd;
if(poll_client[i]) {
FD_SET(clientfd[i], &input);
used |= 1 << (i*4);
/* set tunnel sockets to wait for */
for(i = 0; i <= max_tunnel_idx; i++) {
/* client side socket monitoring */
if(clientfd[i] != CURL_SOCKET_BAD) {
if(poll_client_rd[i]) {
/* unless told not to do so, monitor readability */
FD_SET(clientfd[i], &input);
if(clientfd[i] > maxfd)
maxfd = clientfd[i];
}
if(poll_client_wr[i] && toc[i]) {
/* unless told not to do so, monitor writeability
if there is data ready to be sent to client */
FD_SET(clientfd[i], &output);
if(clientfd[i] > maxfd)
maxfd = clientfd[i];
}
}
if(poll_server[i]) {
FD_SET(serverfd[i], &input);
used |= 2 << (i*4);
}
if(toc[i]) { /* if there is data to client, wait until we can write */
FD_SET(clientfd[i], &output);
used |= 4 << (i*4);
}
if(tos[i]) { /* if there is data to server, wait until we can write */
FD_SET(serverfd[i], &output);
used |= 8 << (i*4);
/* server side socket monitoring */
if(serverfd[i] != CURL_SOCKET_BAD) {
if(poll_server_rd[i]) {
/* unless told not to do so, monitor readability */
FD_SET(serverfd[i], &input);
if(serverfd[i] > maxfd)
maxfd = serverfd[i];
}
if(poll_server_wr[i] && tos[i]) {
/* unless told not to do so, monitor writeability
if there is data ready to be sent to server */
FD_SET(serverfd[i], &output);
if(serverfd[i] > maxfd)
maxfd = serverfd[i];
}
}
}
if(got_exit_signal)
break;
rc = select((int)maxfd + 1, &input, &output, NULL, &timeout);
if(rc > 0) {
/* socket action */
size_t len;
int precontrol;
if((CURL_SOCKET_BAD != rootfd) &&
FD_ISSET(rootfd, &input)) {
/* a new connection! */
struct httprequest req2;
datafd = accept(rootfd, NULL, NULL);
if(CURL_SOCKET_BAD == datafd)
return;
logmsg("====> Client connect DATA");
req2.pipelining = FALSE;
if(get_request(datafd, &req2))
/* non-zero means error, break out of loop */
break;
send_doc(datafd, &req2);
if(DOCNUMBER_CONNECT != req2.testno) {
/* eeek, not a CONNECT */
sclose(datafd);
break;
}
/* deal with the new connection */
rootfd = CURL_SOCKET_BAD; /* prevent new connections */
clientfd[1] = datafd;
/* connect to the server */
serverfd[1] = connect_to(ipaddr, req2.connect_port);
if(serverfd[1] == CURL_SOCKET_BAD) {
/* BADNESS, bail out */
break;
}
control = 1; /* now we have two connections to work with */
}
/* store the value before the loop starts */
precontrol = control;
for(i=0; i<=control; i++) {
len = sizeof(readclient[i])-tos[i];
if(len && FD_ISSET(clientfd[i], &input)) {
/* read from client */
rc = sread(clientfd[i], &readclient[i][tos[i]], len);
if(rc <= 0) {
logmsg("[%s] got %d at %s:%d, STOP READING client", data_or_ctrl(i),
rc, __FILE__, __LINE__);
poll_client[i] = FALSE;
}
else {
logmsg("[%s] READ %d bytes from client", data_or_ctrl(i), rc);
logmsg("[%s] READ \"%s\"", data_or_ctrl(i),
data_to_hex(&readclient[i][tos[i]], rc));
tos[i] += rc;
}
}
len = sizeof(readserver[i])-toc[i];
if(len && FD_ISSET(serverfd[i], &input)) {
/* read from server */
rc = sread(serverfd[i], &readserver[i][toc[i]], len);
if(rc <= 0) {
logmsg("[%s] got %d at %s:%d, STOP READING server", data_or_ctrl(i),
rc, __FILE__, __LINE__);
poll_server[i] = FALSE;
}
else {
logmsg("[%s] READ %d bytes from server", data_or_ctrl(i), rc);
logmsg("[%s] READ \"%s\"", data_or_ctrl(i),
data_to_hex(&readserver[i][toc[i]], rc));
toc[i] += rc;
}
}
if(toc[i] && FD_ISSET(clientfd[i], &output)) {
/* write to client */
rc = swrite(clientfd[i], readserver[i], toc[i]);
if(rc <= 0) {
logmsg("[%s] got %d at %s:%d", data_or_ctrl(i),
rc, __FILE__, __LINE__);
control--;
break;
}
logmsg("[%s] SENT %d bytes to client", data_or_ctrl(i), rc);
logmsg("[%s] SENT \"%s\"", data_or_ctrl(i),
data_to_hex(readserver[i], rc));
if(toc[i] - rc)
memmove(&readserver[i][0], &readserver[i][rc], toc[i]-rc);
toc[i] -= rc;
}
if(tos[i] && FD_ISSET(serverfd[i], &output)) {
/* write to server */
rc = swrite(serverfd[i], readclient[i], tos[i]);
if(rc <= 0) {
logmsg("[%s] got %d at %s:%d", data_or_ctrl(i),
rc, __FILE__, __LINE__);
control--;
break;
}
logmsg("[%s] SENT %d bytes to server", data_or_ctrl(i), rc);
logmsg("[%s] SENT \"%s\"", data_or_ctrl(i),
data_to_hex(readclient[i], rc));
if(tos - rc)
memmove(&readclient[i][0], &readclient[i][rc], tos[i]-rc);
tos[i] -= rc;
}
if(!toc[i] && !poll_server[i]) {
/* nothing to send to the client is left, and server polling is
switched off, bail out */
logmsg("[%s] ENDING1", data_or_ctrl(i));
control--;
}
if(!tos[i] && !poll_client[i]) {
/* nothing to send to the server is left, and client polling is
switched off, bail out */
logmsg("[%s] ENDING2", data_or_ctrl(i));
control--;
}
}
if(precontrol > control) {
/* if the value was decremented we close the "lost" sockets */
if(serverfd[precontrol] != CURL_SOCKET_BAD)
shutdown(serverfd[precontrol], SHUT_RDWR);
if(clientfd[precontrol] != CURL_SOCKET_BAD)
shutdown(clientfd[precontrol], SHUT_RDWR);
quarters = 4;
while((quarters > 0) && !got_exit_signal) {
quarters--;
wait_ms(250);
}
if(serverfd[precontrol] != CURL_SOCKET_BAD)
sclose(serverfd[precontrol]);
if(clientfd[precontrol] != CURL_SOCKET_BAD)
sclose(clientfd[precontrol]);
}
if(control < 0)
if(got_exit_signal)
break;
/* ---------------------------------------------------------- */
/* passive mode FTP may establish a secondary tunnel */
if((clientfd[DATA] == CURL_SOCKET_BAD) &&
(serverfd[DATA] == CURL_SOCKET_BAD) && FD_ISSET(rootfd, &input)) {
/* a new connection on listener socket (most likely from client) */
curl_socket_t datafd = accept(rootfd, NULL, NULL);
if(datafd != CURL_SOCKET_BAD) {
struct httprequest req2;
int err;
logmsg("====> Client connect DATA");
#ifdef TCP_NODELAY
/* Disable the Nagle algorithm */
if(setsockopt(datafd, level, TCP_NODELAY,
(void *)&flag, sizeof(flag)) < 0)
logmsg("====> TCP_NODELAY for client conection failed");
#endif
req2.pipelining = FALSE;
err = get_request(datafd, &req2);
if(!err) {
err = send_doc(datafd, &req2);
if(!err && (req2.testno == DOCNUMBER_CONNECT)) {
/* connect to the server */
serverfd[DATA] = connect_to(ipaddr, req2.connect_port);
if(serverfd[DATA] != CURL_SOCKET_BAD) {
/* secondary tunnel established, now we have two connections */
poll_client_rd[DATA] = TRUE;
poll_client_wr[DATA] = TRUE;
poll_server_rd[DATA] = TRUE;
poll_server_wr[DATA] = TRUE;
max_tunnel_idx = DATA;
secondary = TRUE;
toc[DATA] = 0;
tos[DATA] = 0;
clientfd[DATA] = datafd;
datafd = CURL_SOCKET_BAD;
}
}
}
if(datafd != CURL_SOCKET_BAD) {
/* secondary tunnel not established */
shutdown(datafd, SHUT_RDWR);
sclose(datafd);
}
}
if(got_exit_signal)
break;
}
/* ---------------------------------------------------------- */
/* react to tunnel endpoint readable/writeable notifications */
for(i = 0; i <= max_tunnel_idx; i++) {
size_t len;
if(clientfd[i] != CURL_SOCKET_BAD) {
len = sizeof(readclient[i]) - tos[i];
if(len && FD_ISSET(clientfd[i], &input)) {
/* read from client */
rc = sread(clientfd[i], &readclient[i][tos[i]], len);
if(rc <= 0) {
logmsg("[%s] got %zd, STOP READING client", data_or_ctrl(i), rc);
shutdown(clientfd[i], SHUT_RD);
poll_client_rd[i] = FALSE;
}
else {
logmsg("[%s] READ %zd bytes from client", data_or_ctrl(i), rc);
logmsg("[%s] READ \"%s\"", data_or_ctrl(i),
data_to_hex(&readclient[i][tos[i]], rc));
tos[i] += rc;
}
}
}
if(serverfd[i] != CURL_SOCKET_BAD) {
len = sizeof(readserver[i])-toc[i];
if(len && FD_ISSET(serverfd[i], &input)) {
/* read from server */
rc = sread(serverfd[i], &readserver[i][toc[i]], len);
if(rc <= 0) {
logmsg("[%s] got %zd, STOP READING server", data_or_ctrl(i), rc);
shutdown(serverfd[i], SHUT_RD);
poll_server_rd[i] = FALSE;
}
else {
logmsg("[%s] READ %zd bytes from server", data_or_ctrl(i), rc);
logmsg("[%s] READ \"%s\"", data_or_ctrl(i),
data_to_hex(&readserver[i][toc[i]], rc));
toc[i] += rc;
}
}
}
if(clientfd[i] != CURL_SOCKET_BAD) {
if(toc[i] && FD_ISSET(clientfd[i], &output)) {
/* write to client */
rc = swrite(clientfd[i], readserver[i], toc[i]);
if(rc <= 0) {
logmsg("[%s] got %zd, STOP WRITING client", data_or_ctrl(i), rc);
shutdown(clientfd[i], SHUT_WR);
poll_client_wr[i] = FALSE;
}
else {
logmsg("[%s] SENT %zd bytes to client", data_or_ctrl(i), rc);
logmsg("[%s] SENT \"%s\"", data_or_ctrl(i),
data_to_hex(readserver[i], rc));
if(toc[i] - rc)
memmove(&readserver[i][0], &readserver[i][rc], toc[i]-rc);
toc[i] -= rc;
}
}
}
if(serverfd[i] != CURL_SOCKET_BAD) {
if(tos[i] && FD_ISSET(serverfd[i], &output)) {
/* write to server */
rc = swrite(serverfd[i], readclient[i], tos[i]);
if(rc <= 0) {
logmsg("[%s] got %zd, STOP WRITING server", data_or_ctrl(i), rc);
shutdown(serverfd[i], SHUT_WR);
poll_server_wr[i] = FALSE;
}
else {
logmsg("[%s] SENT %zd bytes to server", data_or_ctrl(i), rc);
logmsg("[%s] SENT \"%s\"", data_or_ctrl(i),
data_to_hex(readclient[i], rc));
if(tos[i] - rc)
memmove(&readclient[i][0], &readclient[i][rc], tos[i]-rc);
tos[i] -= rc;
}
}
}
}
if(got_exit_signal)
break;
/* ---------------------------------------------------------- */
/* endpoint read/write disabling, endpoint closing and tunnel teardown */
for(i = 0; i <= max_tunnel_idx; i++) {
int loop;
for(loop = 2; loop; loop--) {
/* loop twice to satisfy condition interdependencies without
having to await select timeout or another socket event */
if(clientfd[i] != CURL_SOCKET_BAD) {
if(poll_client_rd[i] && !poll_server_wr[i]) {
logmsg("[%s] DISABLED READING client", data_or_ctrl(i));
shutdown(clientfd[i], SHUT_RD);
poll_client_rd[i] = FALSE;
}
if(poll_client_wr[i] && !poll_server_rd[i] && !toc[i]) {
logmsg("[%s] DISABLED WRITING client", data_or_ctrl(i));
shutdown(clientfd[i], SHUT_WR);
poll_client_wr[i] = FALSE;
}
if(!poll_client_wr[i] && !poll_client_rd[i]) {
logmsg("[%s] CLOSING client socket", data_or_ctrl(i));
sclose(clientfd[i]);
clientfd[i] = CURL_SOCKET_BAD;
if(serverfd[i] == CURL_SOCKET_BAD) {
logmsg("[%s] ENDING", data_or_ctrl(i));
if(i == DATA)
secondary = FALSE;
else
primary = FALSE;
}
}
}
if(serverfd[i] != CURL_SOCKET_BAD) {
if(poll_server_rd[i] && !poll_client_wr[i]) {
logmsg("[%s] DISABLED READING server", data_or_ctrl(i));
shutdown(serverfd[i], SHUT_RD);
poll_server_rd[i] = FALSE;
}
if(poll_server_wr[i] && !poll_client_rd[i] && !tos[i]) {
logmsg("[%s] DISABLED WRITING server", data_or_ctrl(i));
shutdown(serverfd[i], SHUT_WR);
poll_server_wr[i] = FALSE;
}
if(!poll_server_wr[i] && !poll_server_rd[i]) {
logmsg("[%s] CLOSING server socket", data_or_ctrl(i));
sclose(serverfd[i]);
serverfd[i] = CURL_SOCKET_BAD;
if(clientfd[i] == CURL_SOCKET_BAD) {
logmsg("[%s] ENDING", data_or_ctrl(i));
if(i == DATA)
secondary = FALSE;
else
primary = FALSE;
}
}
}
}
}
/* ---------------------------------------------------------- */
max_tunnel_idx = secondary ? DATA : CTRL;
if(!primary)
/* exit loop upon primary tunnel teardown */
break;
} /* (rc > 0) */
}
http_connect_cleanup:
for(i = DATA; i >= CTRL; i--) {
if(serverfd[i] != CURL_SOCKET_BAD) {
logmsg("[%s] CLOSING server socket (cleanup)", data_or_ctrl(i));
shutdown(serverfd[i], SHUT_RDWR);
sclose(serverfd[i]);
}
if(clientfd[i] != CURL_SOCKET_BAD) {
logmsg("[%s] CLOSING client socket (cleanup)", data_or_ctrl(i));
shutdown(clientfd[i], SHUT_RDWR);
sclose(clientfd[i]);
}
if((serverfd[i] != CURL_SOCKET_BAD) ||
(clientfd[i] != CURL_SOCKET_BAD)) {
logmsg("[%s] ABORTING", data_or_ctrl(i));
}
}
#if 0
/* close all sockets we created */
for(i=0; i<2; i++) {
if(serverfd[i] != CURL_SOCKET_BAD)
sclose(serverfd[i]);
if(clientfd[i] != CURL_SOCKET_BAD)
sclose(clientfd[i]);
}
#endif
*infdp = CURL_SOCKET_BAD;
}
int main(int argc, char *argv[])
@ -1810,7 +1917,7 @@ int main(int argc, char *argv[])
if(DOCNUMBER_CONNECT == req.testno) {
/* a CONNECT request, setup and talk the tunnel */
http_connect(msgsock, sock, &req, hostport);
http_connect(&msgsock, sock, &req, hostport);
break;
}
@ -1845,8 +1952,10 @@ int main(int argc, char *argv[])
a single byte of server-reply. */
wait_ms(50);
sclose(msgsock);
msgsock = CURL_SOCKET_BAD;
if(msgsock != CURL_SOCKET_BAD) {
sclose(msgsock);
msgsock = CURL_SOCKET_BAD;
}
if(serverlogslocked) {
serverlogslocked = 0;