Add a polling loop in main to read from more than one socket at once. Add the O_NONBLOCK and

SO_KEEPALIVE flag to all sockets. Note that several loops which used to continue on a return value
of 0 (theoretical since 0 would never be returned without O_NONBLOCK) now break on 0 so that they
won't continue reading until after poll is called again.
This commit is contained in:
Joe Mason 2012-08-02 17:22:46 -04:00
parent 84490052d4
commit 674da8ae07

View File

@ -50,6 +50,9 @@
#include <netinet/tcp.h> /* for TCP_NODELAY */
#endif
#include <fcntl.h>
#include <poll.h>
#define ENABLE_CURLX_PRINTF
/* make the curlx header define all printf() functions to use the curlx_*
versions instead */
@ -113,6 +116,11 @@ struct httprequest {
int done_processing;
};
#define MAX_SOCKETS 1024
static struct pollfd all_sockets[MAX_SOCKETS];
static nfds_t num_sockets = 0;
static int ProcessRequest(struct httprequest *req);
static void storerequest(char *reqbuf, size_t totalsize);
@ -1708,7 +1716,12 @@ static int accept_connection(int sock)
{
curl_socket_t msgsock = CURL_SOCKET_BAD;
int error;
int flag;
int flag = 1;
if(MAX_SOCKETS == num_sockets) {
logmsg("Too many open sockets!");
return CURL_SOCKET_BAD;
}
msgsock = accept(sock, NULL, NULL);
@ -1729,23 +1742,45 @@ static int accept_connection(int sock)
return CURL_SOCKET_BAD;
}
if(0 != fcntl(msgsock, F_SETFL, O_NONBLOCK)) {
error = SOCKERRNO;
logmsg("fcntl(O_NONBLOCK) failed with error: (%d) %s",
error, strerror(error));
sclose(msgsock);
return CURL_SOCKET_BAD;
}
if(0 != setsockopt(msgsock, SOL_SOCKET, SO_KEEPALIVE,
(void *)&flag, sizeof(flag))) {
error = SOCKERRNO;
logmsg("setsockopt(SO_KEEPALIVE) failed with error: (%d) %s",
error, strerror(error));
sclose(msgsock);
return CURL_SOCKET_BAD;
}
/*
** As soon as this server acepts a connection from the test harness it
** As soon as this server accepts a connection from the test harness it
** must set the server logs advisor read lock to indicate that server
** logs should not be read until this lock is removed by this server.
*/
if(!serverlogslocked)
set_advisor_read_lock(SERVERLOGS_LOCK);
serverlogslocked = 1;
serverlogslocked += 1;
logmsg("====> Client connect");
all_sockets[num_sockets].fd = msgsock;
all_sockets[num_sockets].events = POLLIN;
all_sockets[num_sockets].revents = 0;
num_sockets += 1;
#ifdef TCP_NODELAY
/*
* Disable the Nagle algorithm to make it easier to send out a large
* response in many small segments to torture the clients more.
*/
flag = 1;
if(0 != setsockopt(msgsock, IPPROTO_TCP, TCP_NODELAY,
(void *)&flag, sizeof(flag)))
logmsg("====> TCP_NODELAY failed");
@ -1764,9 +1799,9 @@ static int service_connection(int msgsock, struct httprequest *req,
if(got_exit_signal)
return -1;
init_httprequest(req);
while(!req->done_processing) {
int rc = get_request(msgsock, req);
logmsg("get_request %d returned %d", msgsock, rc);
if (rc <= 0) {
/* Nothing further to read now (possibly because the socket was closed */
return rc;
@ -1827,7 +1862,6 @@ int main(int argc, char *argv[])
{
srvr_sockaddr_union_t me;
curl_socket_t sock = CURL_SOCKET_BAD;
curl_socket_t msgsock = CURL_SOCKET_BAD;
int wrotepidfile = 0;
int flag;
unsigned short port = DEFAULT_PORT;
@ -1838,6 +1872,7 @@ int main(int argc, char *argv[])
int arg=1;
long pid;
const char *hostport = "127.0.0.1";
nfds_t socket_idx;
memset(&req, 0, sizeof(req));
@ -1949,6 +1984,11 @@ int main(int argc, char *argv[])
sock = socket(AF_INET6, SOCK_STREAM, 0);
#endif
all_sockets[0].fd = sock;
all_sockets[0].events = POLLIN;
all_sockets[0].revents = 0;
num_sockets = 1;
if(CURL_SOCKET_BAD == sock) {
error = SOCKERRNO;
logmsg("Error creating socket: (%d) %s",
@ -1964,6 +2004,12 @@ int main(int argc, char *argv[])
error, strerror(error));
goto sws_cleanup;
}
if(0 != fcntl(sock, F_SETFL, O_NONBLOCK)) {
error = SOCKERRNO;
logmsg("fcntl(O_NONBLOCK) failed with error: (%d) %s",
error, strerror(error));
goto sws_cleanup;
}
#ifdef ENABLE_IPV6
if(!use_ipv6) {
@ -2011,21 +2057,61 @@ int main(int argc, char *argv[])
if(!wrotepidfile)
goto sws_cleanup;
for (;;) {
do {
msgsock = accept_connection(sock);
if (CURL_SOCKET_BAD == msgsock)
goto sws_cleanup;
} while (msgsock >= 0);
/* initialization of httprequest struct is done before get_request(), but
the pipelining struct field must be initialized previously to FALSE
every time a new connection arrives. */
req.pipelining = FALSE;
init_httprequest(&req);
for(;;) {
/* Clear out closed sockets */
for (socket_idx = num_sockets - 1; socket_idx >= 1; --socket_idx) {
if (CURL_SOCKET_BAD == all_sockets[socket_idx].fd) {
char* dst = (char *) all_sockets + socket_idx;
char* src = (char *) all_sockets + socket_idx + 1;
char* end = (char *) all_sockets + num_sockets;
memmove(dst, src, end - src);
num_sockets -= 1;
}
}
rc = poll(all_sockets, num_sockets, -1);
if (rc < 0) {
error = SOCKERRNO;
logmsg("poll() failed with error: (%d) %s",
error, strerror(error));
goto sws_cleanup;
}
/* Check if the listening socket is ready to accept */
if ((all_sockets[0].revents & POLLIN) == POLLIN) {
/* Service all queued connections */
curl_socket_t msgsock;
do {
rc = service_connection(msgsock, &req, sock, hostport);
msgsock = accept_connection(sock);
logmsg("accept_connection %d returned %d", sock, msgsock);
if (CURL_SOCKET_BAD == msgsock)
goto sws_cleanup;
} while (msgsock > 0);
}
else if (all_sockets[0].revents != 0) {
logmsg("unexpected poll event on listening socket: %d",
all_sockets[0].revents);
goto sws_cleanup;
}
/* Service all connections that are ready */
for (socket_idx = 1; socket_idx < num_sockets; ++socket_idx) {
if ((all_sockets[socket_idx].revents & POLLIN) == POLLIN) {
if(got_exit_signal)
goto sws_cleanup;
/* Service this connection until it has nothing available */
do {
rc = service_connection(all_sockets[socket_idx].fd, &req, sock, hostport);
logmsg("service_connection %d returned %d", all_sockets[socket_idx].fd, rc);
if(got_exit_signal)
goto sws_cleanup;
@ -2044,26 +2130,41 @@ int main(int argc, char *argv[])
a single byte of server-reply. */
wait_ms(50);
if(msgsock != CURL_SOCKET_BAD) {
sclose(msgsock);
msgsock = CURL_SOCKET_BAD;
if(all_sockets[socket_idx].fd != CURL_SOCKET_BAD) {
sclose(all_sockets[socket_idx].fd);
all_sockets[socket_idx].fd = CURL_SOCKET_BAD;
}
if(serverlogslocked) {
serverlogslocked = 0;
serverlogslocked -= 1;
if(!serverlogslocked)
clear_advisor_read_lock(SERVERLOGS_LOCK);
}
if (req.testno == DOCNUMBER_QUIT)
goto sws_cleanup;
}
} while (rc > = 0);
/* Reset the request, unless we're still in the middle of reading */
if (rc != 0)
init_httprequest(&req);
} while (rc > 0);
}
else if (all_sockets[socket_idx].revents != 0) {
logmsg("unexpected poll event on socket %d: %d",
socket_idx, all_sockets[socket_idx].revents);
goto sws_cleanup;
}
}
if(got_exit_signal)
goto sws_cleanup;
}
sws_cleanup:
if((msgsock != sock) && (msgsock != CURL_SOCKET_BAD))
sclose(msgsock);
for (socket_idx = 1; socket_idx < num_sockets; ++socket_idx)
if((all_sockets[socket_idx].fd != sock) &&
(all_sockets[socket_idx].fd != CURL_SOCKET_BAD))
sclose(all_sockets[socket_idx].fd);
if(sock != CURL_SOCKET_BAD)
sclose(sock);