[DEV] add v1.76.0

This commit is contained in:
2021-10-05 21:37:46 +02:00
parent a97e9ae7d4
commit d0115b733d
45133 changed files with 4744437 additions and 1026325 deletions

View File

@@ -0,0 +1,36 @@
#
# Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com)
#
# Distributed under the Boost Software License, Version 1.0. (See accompanying
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
#
lib socket ; # SOLARIS
lib nsl ; # SOLARIS
lib ws2_32 ; # NT
lib mswsock ; # NT
lib ipv6 ; # HPUX
lib network ; # HAIKU
project
: requirements
<library>/boost/system//boost_system
<library>/boost/chrono//boost_chrono
<define>BOOST_ALL_NO_LIB=1
<threading>multi
<target-os>solaris:<library>socket
<target-os>solaris:<library>nsl
<target-os>windows:<define>_WIN32_WINNT=0x0501
<target-os>windows,<toolset>gcc:<library>ws2_32
<target-os>windows,<toolset>gcc:<library>mswsock
<target-os>windows,<toolset>gcc-cygwin:<define>__USE_W32_SOCKETS
<target-os>hpux,<toolset>gcc:<define>_XOPEN_SOURCE_EXTENDED
<target-os>hpux:<library>ipv6
<target-os>haiku:<library>network
;
exe async_tcp_client : async_tcp_client.cpp ;
exe blocking_tcp_client : blocking_tcp_client.cpp ;
exe blocking_token_tcp_client : blocking_token_tcp_client.cpp ;
exe blocking_udp_client : blocking_udp_client.cpp ;
exe server : server.cpp ;

View File

@@ -0,0 +1,311 @@
//
// async_tcp_client.cpp
// ~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/asio/buffer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/write.hpp>
#include <functional>
#include <iostream>
#include <string>
using boost::asio::steady_timer;
using boost::asio::ip::tcp;
using std::placeholders::_1;
using std::placeholders::_2;
//
// This class manages socket timeouts by applying the concept of a deadline.
// Some asynchronous operations are given deadlines by which they must complete.
// Deadlines are enforced by an "actor" that persists for the lifetime of the
// client object:
//
// +----------------+
// | |
// | check_deadline |<---+
// | | |
// +----------------+ | async_wait()
// | |
// +---------+
//
// If the deadline actor determines that the deadline has expired, the socket
// is closed and any outstanding operations are consequently cancelled.
//
// Connection establishment involves trying each endpoint in turn until a
// connection is successful, or the available endpoints are exhausted. If the
// deadline actor closes the socket, the connect actor is woken up and moves to
// the next endpoint.
//
// +---------------+
// | |
// | start_connect |<---+
// | | |
// +---------------+ |
// | |
// async_- | +----------------+
// connect() | | |
// +--->| handle_connect |
// | |
// +----------------+
// :
// Once a connection is :
// made, the connect :
// actor forks in two - :
// :
// an actor for reading : and an actor for
// inbound messages: : sending heartbeats:
// :
// +------------+ : +-------------+
// | |<- - - - -+- - - - ->| |
// | start_read | | start_write |<---+
// | |<---+ | | |
// +------------+ | +-------------+ | async_wait()
// | | | |
// async_- | +-------------+ async_- | +--------------+
// read_- | | | write() | | |
// until() +--->| handle_read | +--->| handle_write |
// | | | |
// +-------------+ +--------------+
//
// The input actor reads messages from the socket, where messages are delimited
// by the newline character. The deadline for a complete message is 30 seconds.
//
// The heartbeat actor sends a heartbeat (a message that consists of a single
// newline character) every 10 seconds. In this example, no deadline is applied
// to message sending.
//
class client
{
public:
client(boost::asio::io_context& io_context)
: socket_(io_context),
deadline_(io_context),
heartbeat_timer_(io_context)
{
}
// Called by the user of the client class to initiate the connection process.
// The endpoints will have been obtained using a tcp::resolver.
void start(tcp::resolver::results_type endpoints)
{
// Start the connect actor.
endpoints_ = endpoints;
start_connect(endpoints_.begin());
// Start the deadline actor. You will note that we're not setting any
// particular deadline here. Instead, the connect and input actors will
// update the deadline prior to each asynchronous operation.
deadline_.async_wait(std::bind(&client::check_deadline, this));
}
// This function terminates all the actors to shut down the connection. It
// may be called by the user of the client class, or by the class itself in
// response to graceful termination or an unrecoverable error.
void stop()
{
stopped_ = true;
boost::system::error_code ignored_error;
socket_.close(ignored_error);
deadline_.cancel();
heartbeat_timer_.cancel();
}
private:
void start_connect(tcp::resolver::results_type::iterator endpoint_iter)
{
if (endpoint_iter != endpoints_.end())
{
std::cout << "Trying " << endpoint_iter->endpoint() << "...\n";
// Set a deadline for the connect operation.
deadline_.expires_after(std::chrono::seconds(60));
// Start the asynchronous connect operation.
socket_.async_connect(endpoint_iter->endpoint(),
std::bind(&client::handle_connect,
this, _1, endpoint_iter));
}
else
{
// There are no more endpoints to try. Shut down the client.
stop();
}
}
void handle_connect(const boost::system::error_code& error,
tcp::resolver::results_type::iterator endpoint_iter)
{
if (stopped_)
return;
// The async_connect() function automatically opens the socket at the start
// of the asynchronous operation. If the socket is closed at this time then
// the timeout handler must have run first.
if (!socket_.is_open())
{
std::cout << "Connect timed out\n";
// Try the next available endpoint.
start_connect(++endpoint_iter);
}
// Check if the connect operation failed before the deadline expired.
else if (error)
{
std::cout << "Connect error: " << error.message() << "\n";
// We need to close the socket used in the previous connection attempt
// before starting a new one.
socket_.close();
// Try the next available endpoint.
start_connect(++endpoint_iter);
}
// Otherwise we have successfully established a connection.
else
{
std::cout << "Connected to " << endpoint_iter->endpoint() << "\n";
// Start the input actor.
start_read();
// Start the heartbeat actor.
start_write();
}
}
void start_read()
{
// Set a deadline for the read operation.
deadline_.expires_after(std::chrono::seconds(30));
// Start an asynchronous operation to read a newline-delimited message.
boost::asio::async_read_until(socket_,
boost::asio::dynamic_buffer(input_buffer_), '\n',
std::bind(&client::handle_read, this, _1, _2));
}
void handle_read(const boost::system::error_code& error, std::size_t n)
{
if (stopped_)
return;
if (!error)
{
// Extract the newline-delimited message from the buffer.
std::string line(input_buffer_.substr(0, n - 1));
input_buffer_.erase(0, n);
// Empty messages are heartbeats and so ignored.
if (!line.empty())
{
std::cout << "Received: " << line << "\n";
}
start_read();
}
else
{
std::cout << "Error on receive: " << error.message() << "\n";
stop();
}
}
void start_write()
{
if (stopped_)
return;
// Start an asynchronous operation to send a heartbeat message.
boost::asio::async_write(socket_, boost::asio::buffer("\n", 1),
std::bind(&client::handle_write, this, _1));
}
void handle_write(const boost::system::error_code& error)
{
if (stopped_)
return;
if (!error)
{
// Wait 10 seconds before sending the next heartbeat.
heartbeat_timer_.expires_after(std::chrono::seconds(10));
heartbeat_timer_.async_wait(std::bind(&client::start_write, this));
}
else
{
std::cout << "Error on heartbeat: " << error.message() << "\n";
stop();
}
}
void check_deadline()
{
if (stopped_)
return;
// Check whether the deadline has passed. We compare the deadline against
// the current time since a new asynchronous operation may have moved the
// deadline before this actor had a chance to run.
if (deadline_.expiry() <= steady_timer::clock_type::now())
{
// The deadline has passed. The socket is closed so that any outstanding
// asynchronous operations are cancelled.
socket_.close();
// There is no longer an active deadline. The expiry is set to the
// maximum time point so that the actor takes no action until a new
// deadline is set.
deadline_.expires_at(steady_timer::time_point::max());
}
// Put the actor back to sleep.
deadline_.async_wait(std::bind(&client::check_deadline, this));
}
private:
bool stopped_ = false;
tcp::resolver::results_type endpoints_;
tcp::socket socket_;
std::string input_buffer_;
steady_timer deadline_;
steady_timer heartbeat_timer_;
};
int main(int argc, char* argv[])
{
try
{
if (argc != 3)
{
std::cerr << "Usage: client <host> <port>\n";
return 1;
}
boost::asio::io_context io_context;
tcp::resolver r(io_context);
client c(io_context);
c.start(r.resolve(argv[1], argv[2]));
io_context.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}

View File

@@ -0,0 +1,192 @@
//
// blocking_tcp_client.cpp
// ~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/asio/buffer.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/system/system_error.hpp>
#include <boost/asio/write.hpp>
#include <cstdlib>
#include <iostream>
#include <string>
using boost::asio::ip::tcp;
//----------------------------------------------------------------------
//
// This class manages socket timeouts by running the io_context using the timed
// io_context::run_for() member function. Each asynchronous operation is given
// a timeout within which it must complete. The socket operations themselves
// use lambdas as completion handlers. For a given socket operation, the client
// object runs the io_context to block thread execution until the operation
// completes or the timeout is reached. If the io_context::run_for() function
// times out, the socket is closed and the outstanding asynchronous operation
// is cancelled.
//
class client
{
public:
void connect(const std::string& host, const std::string& service,
std::chrono::steady_clock::duration timeout)
{
// Resolve the host name and service to a list of endpoints.
auto endpoints = tcp::resolver(io_context_).resolve(host, service);
// Start the asynchronous operation itself. The lambda that is used as a
// callback will update the error variable when the operation completes.
// The blocking_udp_client.cpp example shows how you can use std::bind
// rather than a lambda.
boost::system::error_code error;
boost::asio::async_connect(socket_, endpoints,
[&](const boost::system::error_code& result_error,
const tcp::endpoint& /*result_endpoint*/)
{
error = result_error;
});
// Run the operation until it completes, or until the timeout.
run(timeout);
// Determine whether a connection was successfully established.
if (error)
throw std::system_error(error);
}
std::string read_line(std::chrono::steady_clock::duration timeout)
{
// Start the asynchronous operation. The lambda that is used as a callback
// will update the error and n variables when the operation completes. The
// blocking_udp_client.cpp example shows how you can use std::bind rather
// than a lambda.
boost::system::error_code error;
std::size_t n = 0;
boost::asio::async_read_until(socket_,
boost::asio::dynamic_buffer(input_buffer_), '\n',
[&](const boost::system::error_code& result_error,
std::size_t result_n)
{
error = result_error;
n = result_n;
});
// Run the operation until it completes, or until the timeout.
run(timeout);
// Determine whether the read completed successfully.
if (error)
throw std::system_error(error);
std::string line(input_buffer_.substr(0, n - 1));
input_buffer_.erase(0, n);
return line;
}
void write_line(const std::string& line,
std::chrono::steady_clock::duration timeout)
{
std::string data = line + "\n";
// Start the asynchronous operation itself. The lambda that is used as a
// callback will update the error variable when the operation completes.
// The blocking_udp_client.cpp example shows how you can use std::bind
// rather than a lambda.
boost::system::error_code error;
boost::asio::async_write(socket_, boost::asio::buffer(data),
[&](const boost::system::error_code& result_error,
std::size_t /*result_n*/)
{
error = result_error;
});
// Run the operation until it completes, or until the timeout.
run(timeout);
// Determine whether the read completed successfully.
if (error)
throw std::system_error(error);
}
private:
void run(std::chrono::steady_clock::duration timeout)
{
// Restart the io_context, as it may have been left in the "stopped" state
// by a previous operation.
io_context_.restart();
// Block until the asynchronous operation has completed, or timed out. If
// the pending asynchronous operation is a composed operation, the deadline
// applies to the entire operation, rather than individual operations on
// the socket.
io_context_.run_for(timeout);
// If the asynchronous operation completed successfully then the io_context
// would have been stopped due to running out of work. If it was not
// stopped, then the io_context::run_for call must have timed out.
if (!io_context_.stopped())
{
// Close the socket to cancel the outstanding asynchronous operation.
socket_.close();
// Run the io_context again until the operation completes.
io_context_.run();
}
}
boost::asio::io_context io_context_;
tcp::socket socket_{io_context_};
std::string input_buffer_;
};
//----------------------------------------------------------------------
int main(int argc, char* argv[])
{
try
{
if (argc != 4)
{
std::cerr << "Usage: blocking_tcp_client <host> <port> <message>\n";
return 1;
}
client c;
c.connect(argv[1], argv[2], std::chrono::seconds(10));
auto time_sent = std::chrono::steady_clock::now();
c.write_line(argv[3], std::chrono::seconds(10));
for (;;)
{
std::string line = c.read_line(std::chrono::seconds(10));
// Keep going until we get back the line that was sent.
if (line == argv[3])
break;
}
auto time_received = std::chrono::steady_clock::now();
std::cout << "Round trip time: ";
std::cout << std::chrono::duration_cast<
std::chrono::microseconds>(
time_received - time_sent).count();
std::cout << " microseconds\n";
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}

View File

@@ -0,0 +1,200 @@
//
// blocking_token_tcp_client.cpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/asio/connect.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/system/system_error.hpp>
#include <boost/asio/write.hpp>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <string>
using boost::asio::ip::tcp;
// We will use our sockets only with an io_context.
using tcp_socket = boost::asio::basic_stream_socket<
tcp, boost::asio::io_context::executor_type>;
//----------------------------------------------------------------------
// A custom completion token that makes asynchronous operations behave as
// though they are blocking calls with a timeout.
struct close_after
{
close_after(std::chrono::steady_clock::duration t, tcp_socket& s)
: timeout_(t), socket_(s)
{
}
// The maximum time to wait for an asynchronous operation to complete.
std::chrono::steady_clock::duration timeout_;
// The socket to be closed if the operation does not complete in time.
tcp_socket& socket_;
};
namespace boost {
namespace asio {
// The async_result template is specialised to allow the close_after token to
// be used with asynchronous operations that have a completion signature of
// void(error_code, T). Generalising this for all completion signature forms is
// left as an exercise for the reader.
template <typename T>
class async_result<close_after, void(boost::system::error_code, T)>
{
public:
// An asynchronous operation's initiating function automatically creates an
// completion_handler_type object from the token. This function object is
// then called on completion of the asynchronous operation.
class completion_handler_type
{
public:
completion_handler_type(const close_after& token)
: token_(token)
{
}
void operator()(const boost::system::error_code& error, T t)
{
*error_ = error;
*t_ = t;
}
private:
friend class async_result;
close_after token_;
boost::system::error_code* error_;
T* t_;
};
// The async_result constructor associates the completion handler object with
// the result of the initiating function.
explicit async_result(completion_handler_type& h)
: timeout_(h.token_.timeout_),
socket_(h.token_.socket_)
{
h.error_ = &error_;
h.t_ = &t_;
}
// The return_type typedef determines the result type of the asynchronous
// operation's initiating function.
typedef T return_type;
// The get() function is used to obtain the result of the asynchronous
// operation's initiating function. For the close_after completion token, we
// use this function to run the io_context until the operation is complete.
return_type get()
{
boost::asio::io_context& io_context = boost::asio::query(
socket_.get_executor(), boost::asio::execution::context);
// Restart the io_context, as it may have been left in the "stopped" state
// by a previous operation.
io_context.restart();
// Block until the asynchronous operation has completed, or timed out. If
// the pending asynchronous operation is a composed operation, the deadline
// applies to the entire operation, rather than individual operations on
// the socket.
io_context.run_for(timeout_);
// If the asynchronous operation completed successfully then the io_context
// would have been stopped due to running out of work. If it was not
// stopped, then the io_context::run_for call must have timed out and the
// operation is still incomplete.
if (!io_context.stopped())
{
// Close the socket to cancel the outstanding asynchronous operation.
socket_.close();
// Run the io_context again until the operation completes.
io_context.run();
}
// If the operation failed, throw an exception. Otherwise return the result.
return error_ ? throw std::system_error(error_) : t_;
}
private:
std::chrono::steady_clock::duration timeout_;
tcp_socket& socket_;
boost::system::error_code error_;
T t_;
};
} // namespace asio
} // namespace boost
//----------------------------------------------------------------------
int main(int argc, char* argv[])
{
try
{
if (argc != 4)
{
std::cerr << "Usage: blocking_tcp_client <host> <port> <message>\n";
return 1;
}
boost::asio::io_context io_context;
// Resolve the host name and service to a list of endpoints.
auto endpoints = tcp::resolver(io_context).resolve(argv[1], argv[2]);
tcp_socket socket(io_context);
// Run an asynchronous connect operation with a timeout.
boost::asio::async_connect(socket, endpoints,
close_after(std::chrono::seconds(10), socket));
auto time_sent = std::chrono::steady_clock::now();
// Run an asynchronous write operation with a timeout.
std::string msg = argv[3] + std::string("\n");
boost::asio::async_write(socket, boost::asio::buffer(msg),
close_after(std::chrono::seconds(10), socket));
for (std::string input_buffer;;)
{
// Run an asynchronous read operation with a timeout.
std::size_t n = boost::asio::async_read_until(socket,
boost::asio::dynamic_buffer(input_buffer), '\n',
close_after(std::chrono::seconds(10), socket));
std::string line(input_buffer.substr(0, n - 1));
input_buffer.erase(0, n);
// Keep going until we get back the line that was sent.
if (line == argv[3])
break;
}
auto time_received = std::chrono::steady_clock::now();
std::cout << "Round trip time: ";
std::cout << std::chrono::duration_cast<
std::chrono::microseconds>(
time_received - time_sent).count();
std::cout << " microseconds\n";
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}

View File

@@ -0,0 +1,155 @@
//
// blocking_udp_client.cpp
// ~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <boost/asio/buffer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/udp.hpp>
#include <cstdlib>
#include <functional>
#include <iostream>
using boost::asio::ip::udp;
using std::placeholders::_1;
using std::placeholders::_2;
//----------------------------------------------------------------------
//
// This class manages socket timeouts by running the io_context using the timed
// io_context::run_for() member function. Each asynchronous operation is given
// a timeout within which it must complete. The socket operations themselves
// use std::bind to specify the completion handler:
//
// +---------------+
// | |
// | receive |
// | |
// +---------------+
// |
// async_- | +----------------+
// receive() | | |
// +--->| handle_receive |
// | |
// +----------------+
//
// For a given socket operation, the client object runs the io_context to block
// thread execution until the operation completes or the timeout is reached. If
// the io_context::run_for() function times out, the socket is closed and the
// outstanding asynchronous operation is cancelled.
//
class client
{
public:
client(const udp::endpoint& listen_endpoint)
: socket_(io_context_, listen_endpoint)
{
}
std::size_t receive(const boost::asio::mutable_buffer& buffer,
std::chrono::steady_clock::duration timeout,
boost::system::error_code& error)
{
// Start the asynchronous operation. The handle_receive function used as a
// callback will update the error and length variables.
std::size_t length = 0;
socket_.async_receive(boost::asio::buffer(buffer),
std::bind(&client::handle_receive, _1, _2, &error, &length));
// Run the operation until it completes, or until the timeout.
run(timeout);
return length;
}
private:
void run(std::chrono::steady_clock::duration timeout)
{
// Restart the io_context, as it may have been left in the "stopped" state
// by a previous operation.
io_context_.restart();
// Block until the asynchronous operation has completed, or timed out. If
// the pending asynchronous operation is a composed operation, the deadline
// applies to the entire operation, rather than individual operations on
// the socket.
io_context_.run_for(timeout);
// If the asynchronous operation completed successfully then the io_context
// would have been stopped due to running out of work. If it was not
// stopped, then the io_context::run_for call must have timed out.
if (!io_context_.stopped())
{
// Cancel the outstanding asynchronous operation.
socket_.cancel();
// Run the io_context again until the operation completes.
io_context_.run();
}
}
static void handle_receive(
const boost::system::error_code& error, std::size_t length,
boost::system::error_code* out_error, std::size_t* out_length)
{
*out_error = error;
*out_length = length;
}
private:
boost::asio::io_context io_context_;
udp::socket socket_;
};
//----------------------------------------------------------------------
int main(int argc, char* argv[])
{
try
{
using namespace std; // For atoi.
if (argc != 3)
{
std::cerr << "Usage: blocking_udp_client <listen_addr> <listen_port>\n";
return 1;
}
udp::endpoint listen_endpoint(
boost::asio::ip::make_address(argv[1]),
std::atoi(argv[2]));
client c(listen_endpoint);
for (;;)
{
char data[1024];
boost::system::error_code error;
std::size_t n = c.receive(boost::asio::buffer(data),
std::chrono::seconds(10), error);
if (error)
{
std::cout << "Receive error: " << error.message() << "\n";
}
else
{
std::cout << "Received: ";
std::cout.write(data, n);
std::cout << "\n";
}
}
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}

View File

@@ -0,0 +1,433 @@
//
// server.cpp
// ~~~~~~~~~~
//
// Copyright (c) 2003-2021 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <algorithm>
#include <cstdlib>
#include <deque>
#include <iostream>
#include <memory>
#include <set>
#include <string>
#include <boost/asio/buffer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/write.hpp>
using boost::asio::steady_timer;
using boost::asio::ip::tcp;
using boost::asio::ip::udp;
//----------------------------------------------------------------------
class subscriber
{
public:
virtual ~subscriber() = default;
virtual void deliver(const std::string& msg) = 0;
};
typedef std::shared_ptr<subscriber> subscriber_ptr;
//----------------------------------------------------------------------
class channel
{
public:
void join(subscriber_ptr subscriber)
{
subscribers_.insert(subscriber);
}
void leave(subscriber_ptr subscriber)
{
subscribers_.erase(subscriber);
}
void deliver(const std::string& msg)
{
for (const auto& s : subscribers_)
{
s->deliver(msg);
}
}
private:
std::set<subscriber_ptr> subscribers_;
};
//----------------------------------------------------------------------
//
// This class manages socket timeouts by applying the concept of a deadline.
// Some asynchronous operations are given deadlines by which they must complete.
// Deadlines are enforced by two "actors" that persist for the lifetime of the
// session object, one for input and one for output:
//
// +----------------+ +----------------+
// | | | |
// | check_deadline |<-------+ | check_deadline |<-------+
// | | | | | |
// +----------------+ | +----------------+ |
// | | | |
// async_wait() | +----------------+ async_wait() | +----------------+
// on input | | lambda | on output | | lambda |
// deadline +--->| in | deadline +--->| in |
// | check_deadline | | check_deadline |
// +----------------+ +----------------+
//
// If either deadline actor determines that the corresponding deadline has
// expired, the socket is closed and any outstanding operations are cancelled.
//
// The input actor reads messages from the socket, where messages are delimited
// by the newline character:
//
// +-------------+
// | |
// | read_line |<----+
// | | |
// +-------------+ |
// | |
// async_- | +-------------+
// read_- | | lambda |
// until() +--->| in |
// | read_line |
// +-------------+
//
// The deadline for receiving a complete message is 30 seconds. If a non-empty
// message is received, it is delivered to all subscribers. If a heartbeat (a
// message that consists of a single newline character) is received, a heartbeat
// is enqueued for the client, provided there are no other messages waiting to
// be sent.
//
// The output actor is responsible for sending messages to the client:
//
// +----------------+
// | |<---------------------+
// | await_output | |
// | |<-------+ |
// +----------------+ | |
// | | | |
// | async_- | +----------------+ |
// | wait() | | lambda | |
// | +->| in | |
// | | await_output | |
// | +----------------+ |
// V |
// +--------------+ +--------------+
// | | async_write() | lambda |
// | write_line |-------------->| in |
// | | | write_line |
// +--------------+ +--------------+
//
// The output actor first waits for an output message to be enqueued. It does
// this by using a steady_timer as an asynchronous condition variable. The
// steady_timer will be signalled whenever the output queue is non-empty.
//
// Once a message is available, it is sent to the client. The deadline for
// sending a complete message is 30 seconds. After the message is successfully
// sent, the output actor again waits for the output queue to become non-empty.
//
class tcp_session
: public subscriber,
public std::enable_shared_from_this<tcp_session>
{
public:
tcp_session(tcp::socket socket, channel& ch)
: channel_(ch),
socket_(std::move(socket))
{
input_deadline_.expires_at(steady_timer::time_point::max());
output_deadline_.expires_at(steady_timer::time_point::max());
// The non_empty_output_queue_ steady_timer is set to the maximum time
// point whenever the output queue is empty. This ensures that the output
// actor stays asleep until a message is put into the queue.
non_empty_output_queue_.expires_at(steady_timer::time_point::max());
}
// Called by the server object to initiate the four actors.
void start()
{
channel_.join(shared_from_this());
read_line();
check_deadline(input_deadline_);
await_output();
check_deadline(output_deadline_);
}
private:
void stop()
{
channel_.leave(shared_from_this());
boost::system::error_code ignored_error;
socket_.close(ignored_error);
input_deadline_.cancel();
non_empty_output_queue_.cancel();
output_deadline_.cancel();
}
bool stopped() const
{
return !socket_.is_open();
}
void deliver(const std::string& msg) override
{
output_queue_.push_back(msg + "\n");
// Signal that the output queue contains messages. Modifying the expiry
// will wake the output actor, if it is waiting on the timer.
non_empty_output_queue_.expires_at(steady_timer::time_point::min());
}
void read_line()
{
// Set a deadline for the read operation.
input_deadline_.expires_after(std::chrono::seconds(30));
// Start an asynchronous operation to read a newline-delimited message.
auto self(shared_from_this());
boost::asio::async_read_until(socket_,
boost::asio::dynamic_buffer(input_buffer_), '\n',
[this, self](const boost::system::error_code& error, std::size_t n)
{
// Check if the session was stopped while the operation was pending.
if (stopped())
return;
if (!error)
{
// Extract the newline-delimited message from the buffer.
std::string msg(input_buffer_.substr(0, n - 1));
input_buffer_.erase(0, n);
if (!msg.empty())
{
channel_.deliver(msg);
}
else
{
// We received a heartbeat message from the client. If there's
// nothing else being sent or ready to be sent, send a heartbeat
// right back.
if (output_queue_.empty())
{
output_queue_.push_back("\n");
// Signal that the output queue contains messages. Modifying
// the expiry will wake the output actor, if it is waiting on
// the timer.
non_empty_output_queue_.expires_at(
steady_timer::time_point::min());
}
}
read_line();
}
else
{
stop();
}
});
}
void await_output()
{
auto self(shared_from_this());
non_empty_output_queue_.async_wait(
[this, self](const boost::system::error_code& /*error*/)
{
// Check if the session was stopped while the operation was pending.
if (stopped())
return;
if (output_queue_.empty())
{
// There are no messages that are ready to be sent. The actor goes
// to sleep by waiting on the non_empty_output_queue_ timer. When a
// new message is added, the timer will be modified and the actor
// will wake.
non_empty_output_queue_.expires_at(steady_timer::time_point::max());
await_output();
}
else
{
write_line();
}
});
}
void write_line()
{
// Set a deadline for the write operation.
output_deadline_.expires_after(std::chrono::seconds(30));
// Start an asynchronous operation to send a message.
auto self(shared_from_this());
boost::asio::async_write(socket_,
boost::asio::buffer(output_queue_.front()),
[this, self](const boost::system::error_code& error, std::size_t /*n*/)
{
// Check if the session was stopped while the operation was pending.
if (stopped())
return;
if (!error)
{
output_queue_.pop_front();
await_output();
}
else
{
stop();
}
});
}
void check_deadline(steady_timer& deadline)
{
auto self(shared_from_this());
deadline.async_wait(
[this, self, &deadline](const boost::system::error_code& /*error*/)
{
// Check if the session was stopped while the operation was pending.
if (stopped())
return;
// Check whether the deadline has passed. We compare the deadline
// against the current time since a new asynchronous operation may
// have moved the deadline before this actor had a chance to run.
if (deadline.expiry() <= steady_timer::clock_type::now())
{
// The deadline has passed. Stop the session. The other actors will
// terminate as soon as possible.
stop();
}
else
{
// Put the actor back to sleep.
check_deadline(deadline);
}
});
}
channel& channel_;
tcp::socket socket_;
std::string input_buffer_;
steady_timer input_deadline_{socket_.get_executor()};
std::deque<std::string> output_queue_;
steady_timer non_empty_output_queue_{socket_.get_executor()};
steady_timer output_deadline_{socket_.get_executor()};
};
typedef std::shared_ptr<tcp_session> tcp_session_ptr;
//----------------------------------------------------------------------
class udp_broadcaster
: public subscriber
{
public:
udp_broadcaster(boost::asio::io_context& io_context,
const udp::endpoint& broadcast_endpoint)
: socket_(io_context)
{
socket_.connect(broadcast_endpoint);
socket_.set_option(udp::socket::broadcast(true));
}
private:
void deliver(const std::string& msg)
{
boost::system::error_code ignored_error;
socket_.send(boost::asio::buffer(msg), 0, ignored_error);
}
udp::socket socket_;
};
//----------------------------------------------------------------------
class server
{
public:
server(boost::asio::io_context& io_context,
const tcp::endpoint& listen_endpoint,
const udp::endpoint& broadcast_endpoint)
: io_context_(io_context),
acceptor_(io_context, listen_endpoint)
{
channel_.join(
std::make_shared<udp_broadcaster>(
io_context_, broadcast_endpoint));
accept();
}
private:
void accept()
{
acceptor_.async_accept(
[this](const boost::system::error_code& error, tcp::socket socket)
{
if (!error)
{
std::make_shared<tcp_session>(std::move(socket), channel_)->start();
}
accept();
});
}
boost::asio::io_context& io_context_;
tcp::acceptor acceptor_;
channel channel_;
};
//----------------------------------------------------------------------
int main(int argc, char* argv[])
{
try
{
using namespace std; // For atoi.
if (argc != 4)
{
std::cerr << "Usage: server <listen_port> <bcast_address> <bcast_port>\n";
return 1;
}
boost::asio::io_context io_context;
tcp::endpoint listen_endpoint(tcp::v4(), atoi(argv[1]));
udp::endpoint broadcast_endpoint(
boost::asio::ip::make_address(argv[2]), atoi(argv[3]));
server s(io_context, listen_endpoint, broadcast_endpoint);
io_context.run();
}
catch (std::exception& e)
{
std::cerr << "Exception: " << e.what() << "\n";
}
return 0;
}