[DEV] add v1.66.0

This commit is contained in:
2018-01-12 21:47:58 +01:00
parent 87059bb1af
commit a97e9ae7d4
49032 changed files with 7668950 additions and 0 deletions

View File

@@ -0,0 +1,261 @@
// Copyright 2003-2013 Christopher M. Kohlhoff
// Copyright Oliver Kowalke, Nat Goodspeed 2015.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#include <chrono>
#include <cstdlib>
#include <iomanip>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <thread>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/fiber/all.hpp>
#include "round_robin.hpp"
#include "yield.hpp"
using boost::asio::ip::tcp;
const int max_length = 1024;
typedef boost::shared_ptr< tcp::socket > socket_ptr;
const char* const alpha = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
/*****************************************************************************
* thread names
*****************************************************************************/
class ThreadNames {
private:
std::map<std::thread::id, std::string> names_{};
const char* next_{ alpha };
std::mutex mtx_{};
public:
ThreadNames() = default;
std::string lookup() {
std::unique_lock<std::mutex> lk( mtx_);
auto this_id( std::this_thread::get_id() );
auto found = names_.find( this_id );
if ( found != names_.end() ) {
return found->second;
}
BOOST_ASSERT( *next_);
std::string name(1, *next_++ );
names_[ this_id ] = name;
return name;
}
};
ThreadNames thread_names;
/*****************************************************************************
* fiber names
*****************************************************************************/
class FiberNames {
private:
std::map<boost::fibers::fiber::id, std::string> names_{};
unsigned next_{ 0 };
boost::fibers::mutex mtx_{};
public:
FiberNames() = default;
std::string lookup() {
std::unique_lock<boost::fibers::mutex> lk( mtx_);
auto this_id( boost::this_fiber::get_id() );
auto found = names_.find( this_id );
if ( found != names_.end() ) {
return found->second;
}
std::ostringstream out;
// Bake into the fiber's name the thread name on which we first
// lookup() its ID, to be able to spot when a fiber hops between
// threads.
out << thread_names.lookup() << next_++;
std::string name( out.str() );
names_[ this_id ] = name;
return name;
}
};
FiberNames fiber_names;
std::string tag() {
std::ostringstream out;
out << "Thread " << thread_names.lookup() << ": "
<< std::setw(4) << fiber_names.lookup() << std::setw(0);
return out.str();
}
/*****************************************************************************
* message printing
*****************************************************************************/
void print_( std::ostream& out) {
out << '\n';
}
template < typename T, typename... Ts >
void print_( std::ostream& out, T const& arg, Ts const&... args) {
out << arg;
print_(out, args...);
}
template < typename... T >
void print( T const&... args ) {
std::ostringstream buffer;
print_( buffer, args...);
std::cout << buffer.str() << std::flush;
}
/*****************************************************************************
* fiber function per server connection
*****************************************************************************/
void session( socket_ptr sock) {
try {
for (;;) {
char data[max_length];
boost::system::error_code ec;
std::size_t length = sock->async_read_some(
boost::asio::buffer( data),
boost::fibers::asio::yield[ec]);
if ( ec == boost::asio::error::eof) {
break; //connection closed cleanly by peer
} else if ( ec) {
throw boost::system::system_error( ec); //some other error
}
print( tag(), ": handled: ", std::string(data, length));
boost::asio::async_write(
* sock,
boost::asio::buffer( data, length),
boost::fibers::asio::yield[ec]);
if ( ec == boost::asio::error::eof) {
break; //connection closed cleanly by peer
} else if ( ec) {
throw boost::system::system_error( ec); //some other error
}
}
print( tag(), ": connection closed");
} catch ( std::exception const& ex) {
print( tag(), ": caught exception : ", ex.what());
}
}
/*****************************************************************************
* listening server
*****************************************************************************/
void server( std::shared_ptr< boost::asio::io_service > const& io_svc, tcp::acceptor & a) {
print( tag(), ": echo-server started");
try {
for (;;) {
socket_ptr socket( new tcp::socket( * io_svc) );
boost::system::error_code ec;
a.async_accept(
* socket,
boost::fibers::asio::yield[ec]);
if ( ec) {
throw boost::system::system_error( ec); //some other error
} else {
boost::fibers::fiber( session, socket).detach();
}
}
} catch ( std::exception const& ex) {
print( tag(), ": caught exception : ", ex.what());
}
io_svc->stop();
print( tag(), ": echo-server stopped");
}
/*****************************************************************************
* fiber function per client
*****************************************************************************/
void client( std::shared_ptr< boost::asio::io_service > const& io_svc, tcp::acceptor & a,
boost::fibers::barrier& barrier, unsigned iterations) {
print( tag(), ": echo-client started");
for (unsigned count = 0; count < iterations; ++count) {
tcp::resolver resolver( * io_svc);
tcp::resolver::query query( tcp::v4(), "127.0.0.1", "9999");
tcp::resolver::iterator iterator = resolver.resolve( query);
tcp::socket s( * io_svc);
boost::asio::connect( s, iterator);
for (unsigned msg = 0; msg < 1; ++msg) {
std::ostringstream msgbuf;
msgbuf << "from " << fiber_names.lookup() << " " << count << "." << msg;
std::string message(msgbuf.str());
print( tag(), ": Sending: ", message);
boost::system::error_code ec;
boost::asio::async_write(
s,
boost::asio::buffer( message),
boost::fibers::asio::yield[ec]);
if ( ec == boost::asio::error::eof) {
return; //connection closed cleanly by peer
} else if ( ec) {
throw boost::system::system_error( ec); //some other error
}
char reply[max_length];
size_t reply_length = s.async_read_some(
boost::asio::buffer( reply, max_length),
boost::fibers::asio::yield[ec]);
if ( ec == boost::asio::error::eof) {
return; //connection closed cleanly by peer
} else if ( ec) {
throw boost::system::system_error( ec); //some other error
}
print( tag(), ": Reply : ", std::string( reply, reply_length));
}
}
// done with all iterations, wait for rest of client fibers
if ( barrier.wait()) {
// exactly one barrier.wait() call returns true
// we're the lucky one
a.close();
print( tag(), ": acceptor stopped");
}
print( tag(), ": echo-client stopped");
}
/*****************************************************************************
* main
*****************************************************************************/
int main( int argc, char* argv[]) {
try {
//[asio_rr_setup
std::shared_ptr< boost::asio::io_service > io_svc = std::make_shared< boost::asio::io_service >();
boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_svc);
//]
print( "Thread ", thread_names.lookup(), ": started");
//[asio_rr_launch_fibers
// server
tcp::acceptor a( * io_svc, tcp::endpoint( tcp::v4(), 9999) );
boost::fibers::fiber( server, io_svc, std::ref( a) ).detach();
// client
const unsigned iterations = 2;
const unsigned clients = 3;
boost::fibers::barrier b( clients);
for ( unsigned i = 0; i < clients; ++i) {
boost::fibers::fiber(
client, io_svc, std::ref( a), std::ref( b), iterations).detach();
}
//]
//[asio_rr_run
io_svc->run();
//]
print( tag(), ": io_service returned");
print( "Thread ", thread_names.lookup(), ": stopping");
std::cout << "done." << std::endl;
return EXIT_SUCCESS;
} catch ( std::exception const& e) {
print("Exception: ", e.what(), "\n");
}
return EXIT_FAILURE;
}

View File

@@ -0,0 +1,328 @@
// Copyright Oliver Kowalke, Nat Goodspeed 2015.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#ifndef BOOST_FIBERS_ASIO_DETAIL_YIELD_HPP
#define BOOST_FIBERS_ASIO_DETAIL_YIELD_HPP
#include <boost/asio/async_result.hpp>
#include <boost/asio/detail/config.hpp>
#include <boost/asio/handler_type.hpp>
#include <boost/assert.hpp>
#include <boost/atomic.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/system/error_code.hpp>
#include <boost/system/system_error.hpp>
#include <boost/throw_exception.hpp>
#include <boost/fiber/all.hpp>
#include <mutex> // std::unique_lock
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
namespace boost {
namespace fibers {
namespace asio {
namespace detail {
//[fibers_asio_yield_completion
// Bundle a completion bool flag with a spinlock to protect it.
struct yield_completion {
enum state_t {
init,
waiting,
complete
};
typedef fibers::detail::spinlock mutex_t;
typedef std::unique_lock< mutex_t > lock_t;
typedef boost::intrusive_ptr< yield_completion > ptr_t;
std::atomic< std::size_t > use_count_{ 0 };
mutex_t mtx_{};
state_t state_{ init };
void wait() {
// yield_handler_base::operator()() will set state_ `complete` and
// attempt to wake a suspended fiber. It would be Bad if that call
// happened between our detecting (complete != state_) and suspending.
lock_t lk{ mtx_ };
// If state_ is already set, we're done here: don't suspend.
if ( complete != state_) {
state_ = waiting;
// suspend(unique_lock<spinlock>) unlocks the lock in the act of
// resuming another fiber
fibers::context::active()->suspend( lk);
}
}
friend void intrusive_ptr_add_ref( yield_completion * yc) noexcept {
BOOST_ASSERT( nullptr != yc);
yc->use_count_.fetch_add( 1, std::memory_order_relaxed);
}
friend void intrusive_ptr_release( yield_completion * yc) noexcept {
BOOST_ASSERT( nullptr != yc);
if ( 1 == yc->use_count_.fetch_sub( 1, std::memory_order_release) ) {
std::atomic_thread_fence( std::memory_order_acquire);
delete yc;
}
}
};
//]
//[fibers_asio_yield_handler_base
// This class encapsulates common elements between yield_handler<T> (capturing
// a value to return from asio async function) and yield_handler<void> (no
// such value). See yield_handler<T> and its <void> specialization below. Both
// yield_handler<T> and yield_handler<void> are passed by value through
// various layers of asio functions. In other words, they're potentially
// copied multiple times. So key data such as the yield_completion instance
// must be stored in our async_result<yield_handler<>> specialization, which
// should be instantiated only once.
class yield_handler_base {
public:
yield_handler_base( yield_t const& y) :
// capture the context* associated with the running fiber
ctx_{ boost::fibers::context::active() },
// capture the passed yield_t
yt_( y ) {
}
// completion callback passing only (error_code)
void operator()( boost::system::error_code const& ec) {
BOOST_ASSERT_MSG( ycomp_,
"Must inject yield_completion* "
"before calling yield_handler_base::operator()()");
BOOST_ASSERT_MSG( yt_.ec_,
"Must inject boost::system::error_code* "
"before calling yield_handler_base::operator()()");
// If originating fiber is busy testing state_ flag, wait until it
// has observed (completed != state_).
yield_completion::lock_t lk{ ycomp_->mtx_ };
yield_completion::state_t state = ycomp_->state_;
// Notify a subsequent yield_completion::wait() call that it need not
// suspend.
ycomp_->state_ = yield_completion::complete;
// set the error_code bound by yield_t
* yt_.ec_ = ec;
// unlock the lock that protects state_
lk.unlock();
// If ctx_ is still active, e.g. because the async operation
// immediately called its callback (this method!) before the asio
// async function called async_result_base::get(), we must not set it
// ready.
if ( yield_completion::waiting == state) {
// wake the fiber
fibers::context::active()->schedule( ctx_);
}
}
//private:
boost::fibers::context * ctx_;
yield_t yt_;
// We depend on this pointer to yield_completion, which will be injected
// by async_result.
yield_completion::ptr_t ycomp_{};
};
//]
//[fibers_asio_yield_handler_T
// asio uses handler_type<completion token type, signature>::type to decide
// what to instantiate as the actual handler. Below, we specialize
// handler_type< yield_t, ... > to indicate yield_handler<>. So when you pass
// an instance of yield_t as an asio completion token, asio selects
// yield_handler<> as the actual handler class.
template< typename T >
class yield_handler: public yield_handler_base {
public:
// asio passes the completion token to the handler constructor
explicit yield_handler( yield_t const& y) :
yield_handler_base{ y } {
}
// completion callback passing only value (T)
void operator()( T t) {
// just like callback passing success error_code
(*this)( boost::system::error_code(), std::move(t) );
}
// completion callback passing (error_code, T)
void operator()( boost::system::error_code const& ec, T t) {
BOOST_ASSERT_MSG( value_,
"Must inject value ptr "
"before caling yield_handler<T>::operator()()");
// move the value to async_result<> instance BEFORE waking up a
// suspended fiber
* value_ = std::move( t);
// forward the call to base-class completion handler
yield_handler_base::operator()( ec);
}
//private:
// pointer to destination for eventual value
// this must be injected by async_result before operator()() is called
T * value_{ nullptr };
};
//]
//[fibers_asio_yield_handler_void
// yield_handler<void> is like yield_handler<T> without value_. In fact it's
// just like yield_handler_base.
template<>
class yield_handler< void >: public yield_handler_base {
public:
explicit yield_handler( yield_t const& y) :
yield_handler_base{ y } {
}
// nullary completion callback
void operator()() {
( * this)( boost::system::error_code() );
}
// inherit operator()(error_code) overload from base class
using yield_handler_base::operator();
};
//]
// Specialize asio_handler_invoke hook to ensure that any exceptions thrown
// from the handler are propagated back to the caller
template< typename Fn, typename T >
void asio_handler_invoke( Fn fn, yield_handler< T > * h) {
fn();
}
//[fibers_asio_async_result_base
// Factor out commonality between async_result<yield_handler<T>> and
// async_result<yield_handler<void>>
class async_result_base {
public:
explicit async_result_base( yield_handler_base & h) :
ycomp_{ new yield_completion{} } {
// Inject ptr to our yield_completion instance into this
// yield_handler<>.
h.ycomp_ = this->ycomp_;
// if yield_t didn't bind an error_code, make yield_handler_base's
// error_code* point to an error_code local to this object so
// yield_handler_base::operator() can unconditionally store through
// its error_code*
if ( ! h.yt_.ec_) {
h.yt_.ec_ = & ec_;
}
}
void get() {
// Unless yield_handler_base::operator() has already been called,
// suspend the calling fiber until that call.
ycomp_->wait();
// The only way our own ec_ member could have a non-default value is
// if our yield_handler did not have a bound error_code AND the
// completion callback passed a non-default error_code.
if ( ec_) {
throw_exception( boost::system::system_error{ ec_ } );
}
}
private:
// If yield_t does not bind an error_code instance, store into here.
boost::system::error_code ec_{};
yield_completion::ptr_t ycomp_;
};
//]
}}}}
namespace boost {
namespace asio {
//[fibers_asio_async_result_T
// asio constructs an async_result<> instance from the yield_handler specified
// by handler_type<>::type. A particular asio async method constructs the
// yield_handler, constructs this async_result specialization from it, then
// returns the result of calling its get() method.
template< typename T >
class async_result< boost::fibers::asio::detail::yield_handler< T > > :
public boost::fibers::asio::detail::async_result_base {
public:
// type returned by get()
typedef T type;
explicit async_result( boost::fibers::asio::detail::yield_handler< T > & h) :
boost::fibers::asio::detail::async_result_base{ h } {
// Inject ptr to our value_ member into yield_handler<>: result will
// be stored here.
h.value_ = & value_;
}
// asio async method returns result of calling get()
type get() {
boost::fibers::asio::detail::async_result_base::get();
return std::move( value_);
}
private:
type value_{};
};
//]
//[fibers_asio_async_result_void
// Without the need to handle a passed value, our yield_handler<void>
// specialization is just like async_result_base.
template<>
class async_result< boost::fibers::asio::detail::yield_handler< void > > :
public boost::fibers::asio::detail::async_result_base {
public:
typedef void type;
explicit async_result( boost::fibers::asio::detail::yield_handler< void > & h):
boost::fibers::asio::detail::async_result_base{ h } {
}
};
//]
// Handler type specialisation for fibers::asio::yield.
// When 'yield' is passed as a completion handler which accepts no parameters,
// use yield_handler<void>.
template< typename ReturnType >
struct handler_type< fibers::asio::yield_t, ReturnType() >
{ typedef fibers::asio::detail::yield_handler< void > type; };
// Handler type specialisation for fibers::asio::yield.
// When 'yield' is passed as a completion handler which accepts a data
// parameter, use yield_handler<parameter type> to return that parameter to
// the caller.
template< typename ReturnType, typename Arg1 >
struct handler_type< fibers::asio::yield_t, ReturnType( Arg1) >
{ typedef fibers::asio::detail::yield_handler< Arg1 > type; };
//[asio_handler_type
// Handler type specialisation for fibers::asio::yield.
// When 'yield' is passed as a completion handler which accepts only
// error_code, use yield_handler<void>. yield_handler will take care of the
// error_code one way or another.
template< typename ReturnType >
struct handler_type< fibers::asio::yield_t, ReturnType( boost::system::error_code) >
{ typedef fibers::asio::detail::yield_handler< void > type; };
//]
// Handler type specialisation for fibers::asio::yield.
// When 'yield' is passed as a completion handler which accepts a data
// parameter and an error_code, use yield_handler<parameter type> to return
// just the parameter to the caller. yield_handler will take care of the
// error_code one way or another.
template< typename ReturnType, typename Arg2 >
struct handler_type< fibers::asio::yield_t, ReturnType( boost::system::error_code, Arg2) >
{ typedef fibers::asio::detail::yield_handler< Arg2 > type; };
}}
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif
#endif // BOOST_FIBERS_ASIO_DETAIL_YIELD_HPP

View File

@@ -0,0 +1,56 @@
// Copyright Arnaud Kapp, Oliver Kowalke 2016
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#include <iostream>
#include <memory>
#include <thread>
#include <boost/asio.hpp>
#include <boost/fiber/all.hpp>
#include "round_robin.hpp"
std::shared_ptr< boost::fibers::unbuffered_channel< int > > c;
void foo() {
auto io_ptr = std::make_shared< boost::asio::io_service >();
boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_ptr);
boost::fibers::fiber([io_ptr](){
for ( int i = 0; i < 10; ++i) {
std::cout << "push " << i << std::endl;
c->push( i);
}
c->close();
io_ptr->stop();
}).detach();
io_ptr->run();
}
void bar() {
auto io_ptr = std::make_shared< boost::asio::io_service >();
boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_ptr);
boost::fibers::fiber([io_ptr](){
try {
for (;;) {
int i = c->value_pop();
std::cout << "pop " << i << std::endl;
}
} catch ( std::exception const& e) {
std::cout << "exception: " << e.what() << std::endl;
}
io_ptr->stop();
}).detach();
io_ptr->run();
}
int main() {
c = std::make_shared< boost::fibers::unbuffered_channel< int > >();
std::thread t1( foo);
std::thread t2( bar);
t2.join();
t1.join();
std::cout << "done." << std::endl;
return 0;
}

View File

@@ -0,0 +1,52 @@
//
// blocking_tcp_echo_client.cpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <boost/asio.hpp>
using boost::asio::ip::tcp;
enum {
max_length = 1024
};
int main( int argc, char* argv[]) {
try {
if ( 3 != argc) {
std::cerr << "Usage: publisher <host> <queue>\n";
return EXIT_FAILURE;
}
boost::asio::io_service io_service;
tcp::resolver resolver( io_service);
tcp::resolver::query query( tcp::v4(), argv[1], "9997");
tcp::resolver::iterator iterator = resolver.resolve(query);
tcp::socket s( io_service);
boost::asio::connect( s, iterator);
char msg[max_length];
std::string queue( argv[2]);
std::memset( msg, '\0', max_length);
std::memcpy( msg, queue.c_str(), queue.size() );
boost::asio::write( s, boost::asio::buffer( msg, max_length) );
for (;;) {
std::cout << "publish: ";
char request[max_length];
std::cin.getline( request, max_length);
boost::asio::write( s, boost::asio::buffer( request, max_length) );
}
return EXIT_SUCCESS;
} catch ( std::exception const& e) {
std::cerr << "Exception: " << e.what() << "\n";
}
return EXIT_FAILURE;
}

View File

@@ -0,0 +1,381 @@
// Copyright Oliver Kowalke 2015.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#include <cstddef>
#include <cstdlib>
#include <map>
#include <memory>
#include <set>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/utility.hpp>
#include <boost/fiber/all.hpp>
#include "../round_robin.hpp"
#include "../yield.hpp"
using boost::asio::ip::tcp;
const std::size_t max_length = 1024;
class subscriber_session;
typedef std::shared_ptr< subscriber_session > subscriber_session_ptr;
// a queue has n subscribers (subscriptions)
// this class holds a list of subcribers for one queue
class subscriptions {
public:
~subscriptions();
// subscribe to this queue
void subscribe( subscriber_session_ptr const& s) {
subscribers_.insert( s);
}
// unsubscribe from this queue
void unsubscribe( subscriber_session_ptr const& s) {
subscribers_.erase(s);
}
// publish a message, e.g. push this message to all subscribers
void publish( std::string const& msg);
private:
// list of subscribers
std::set< subscriber_session_ptr > subscribers_;
};
// a class to register queues and to subsribe clients to this queues
class registry : private boost::noncopyable {
private:
typedef std::map< std::string, std::shared_ptr< subscriptions > > queues_cont;
typedef queues_cont::iterator queues_iter;
boost::fibers::mutex mtx_;
queues_cont queues_;
void register_queue_( std::string const& queue) {
if ( queues_.end() != queues_.find( queue) ) {
throw std::runtime_error("queue already exists");
}
queues_[queue] = std::make_shared< subscriptions >();
std::cout << "new queue '" << queue << "' registered" << std::endl;
}
void unregister_queue_( std::string const& queue) {
queues_.erase( queue);
std::cout << "queue '" << queue << "' unregistered" << std::endl;
}
void subscribe_( std::string const& queue, subscriber_session_ptr s) {
queues_iter iter = queues_.find( queue);
if ( queues_.end() == iter ) {
throw std::runtime_error("queue does not exist");
}
iter->second->subscribe( s);
std::cout << "new subscription to queue '" << queue << "'" << std::endl;
}
void unsubscribe_( std::string const& queue, subscriber_session_ptr s) {
queues_iter iter = queues_.find( queue);
if ( queues_.end() != iter ) {
iter->second->unsubscribe( s);
}
}
void publish_( std::string const& queue, std::string const& msg) {
queues_iter iter = queues_.find( queue);
if ( queues_.end() == iter ) {
throw std::runtime_error("queue does not exist");
}
iter->second->publish( msg);
std::cout << "message '" << msg << "' to publish on queue '" << queue << "'" << std::endl;
}
public:
// add a queue to registry
void register_queue( std::string const& queue) {
std::unique_lock< boost::fibers::mutex > lk( mtx_);
register_queue_( queue);
}
// remove a queue from registry
void unregister_queue( std::string const& queue) {
std::unique_lock< boost::fibers::mutex > lk( mtx_);
unregister_queue_( queue);
}
// subscribe to a queue
void subscribe( std::string const& queue, subscriber_session_ptr s) {
std::unique_lock< boost::fibers::mutex > lk( mtx_);
subscribe_( queue, s);
}
// unsubscribe from a queue
void unsubscribe( std::string const& queue, subscriber_session_ptr s) {
std::unique_lock< boost::fibers::mutex > lk( mtx_);
unsubscribe_( queue, s);
}
// publish a message to all subscribers registerd to the queue
void publish( std::string const& queue, std::string const& msg) {
std::unique_lock< boost::fibers::mutex > lk( mtx_);
publish_( queue, msg);
}
};
// a subscriber subscribes to a given queue in order to receive messages published on this queue
class subscriber_session : public std::enable_shared_from_this< subscriber_session > {
public:
explicit subscriber_session( std::shared_ptr< boost::asio::io_service > const& io_service, registry & reg) :
socket_( * io_service),
reg_( reg) {
}
tcp::socket& socket() {
return socket_;
}
// this function is executed inside the fiber
void run() {
std::string queue;
try {
boost::system::error_code ec;
// read first message == queue name
// async_ready() returns if the the complete message is read
// until this the fiber is suspended until the complete message
// is read int the given buffer 'data'
boost::asio::async_read(
socket_,
boost::asio::buffer( data_),
boost::fibers::asio::yield[ec]);
if ( ec) {
throw std::runtime_error("no queue from subscriber");
}
// first message ist equal to the queue name the publisher
// publishes to
queue = data_;
// subscribe to new queue
reg_.subscribe( queue, shared_from_this() );
// read published messages
for (;;) {
// wait for a conditon-variable for new messages
// the fiber will be suspended until the condtion
// becomes true and the fiber is resumed
// published message is stored in buffer 'data_'
std::unique_lock< boost::fibers::mutex > lk( mtx_);
cond_.wait( lk);
std::string data( data_);
lk.unlock();
// message '<fini>' terminates subscription
if ( "<fini>" == data) {
break;
}
// async. write message to socket connected with
// subscriber
// async_write() returns if the complete message was writen
// the fiber is suspended in the meanwhile
boost::asio::async_write(
socket_,
boost::asio::buffer( data, data.size() ),
boost::fibers::asio::yield[ec]);
if ( ec == boost::asio::error::eof) {
break; //connection closed cleanly by peer
} else if ( ec) {
throw boost::system::system_error( ec); //some other error
}
std::cout << "subscriber::run(): '" << data << "' written" << std::endl;
}
} catch ( std::exception const& e) {
std::cerr << "subscriber [" << queue << "] failed: " << e.what() << std::endl;
}
// close socket
socket_.close();
// unregister queue
reg_.unsubscribe( queue, shared_from_this() );
}
// called from publisher_session (running in other fiber)
void publish( std::string const& msg) {
std::unique_lock< boost::fibers::mutex > lk( mtx_);
std::memset( data_, '\0', sizeof( data_));
std::memcpy( data_, msg.c_str(), (std::min)(max_length, msg.size()));
cond_.notify_one();
}
private:
tcp::socket socket_;
registry & reg_;
boost::fibers::mutex mtx_;
boost::fibers::condition_variable cond_;
// fixed size message
char data_[max_length];
};
subscriptions::~subscriptions() {
for ( subscriber_session_ptr s : subscribers_) {
s->publish("<fini>");
}
}
void
subscriptions::publish( std::string const& msg) {
for ( subscriber_session_ptr s : subscribers_) {
s->publish( msg);
}
}
// a publisher publishes messages on its queue
// subscriber might register to this queue to get the published messages
class publisher_session : public std::enable_shared_from_this< publisher_session > {
public:
explicit publisher_session( std::shared_ptr< boost::asio::io_service > const& io_service, registry & reg) :
socket_( * io_service),
reg_( reg) {
}
tcp::socket& socket() {
return socket_;
}
// this function is executed inside the fiber
void run() {
std::string queue;
try {
boost::system::error_code ec;
// fixed size message
char data[max_length];
// read first message == queue name
// async_ready() returns if the the complete message is read
// until this the fiber is suspended until the complete message
// is read int the given buffer 'data'
boost::asio::async_read(
socket_,
boost::asio::buffer( data),
boost::fibers::asio::yield[ec]);
if ( ec) {
throw std::runtime_error("no queue from publisher");
}
// first message ist equal to the queue name the publisher
// publishes to
queue = data;
// register the new queue
reg_.register_queue( queue);
// start publishing messages
for (;;) {
// read message from publisher asyncronous
// async_read() suspends this fiber until the complete emssage is read
// and stored in the given buffer 'data'
boost::asio::async_read(
socket_,
boost::asio::buffer( data),
boost::fibers::asio::yield[ec]);
if ( ec == boost::asio::error::eof) {
break; //connection closed cleanly by peer
} else if ( ec) {
throw boost::system::system_error( ec); //some other error
}
// publish message to all subscribers
reg_.publish( queue, std::string( data) );
}
} catch ( std::exception const& e) {
std::cerr << "publisher [" << queue << "] failed: " << e.what() << std::endl;
}
// close socket
socket_.close();
// unregister queue
reg_.unregister_queue( queue);
}
private:
tcp::socket socket_;
registry & reg_;
};
typedef std::shared_ptr< publisher_session > publisher_session_ptr;
// function accepts connections requests from clients acting as a publisher
void accept_publisher( std::shared_ptr< boost::asio::io_service > const& io_service,
unsigned short port,
registry & reg) {
// create TCP-acceptor
tcp::acceptor acceptor( * io_service, tcp::endpoint( tcp::v4(), port) );
// loop for accepting connection requests
for (;;) {
boost::system::error_code ec;
// create new publisher-session
// this instance will be associated with one publisher
publisher_session_ptr new_publisher_session =
std::make_shared< publisher_session >( io_service, std::ref( reg) );
// async. accept of new connection request
// this function will suspend this execution context (fiber) until a
// connection was established, after returning from this function a new client (publisher)
// is connected
acceptor.async_accept(
new_publisher_session->socket(),
boost::fibers::asio::yield[ec]);
if ( ! ec) {
// run the new publisher in its own fiber (one fiber for one client)
boost::fibers::fiber(
std::bind( & publisher_session::run, new_publisher_session) ).detach();
}
}
}
// function accepts connections requests from clients acting as a subscriber
void accept_subscriber( std::shared_ptr< boost::asio::io_service > const& io_service,
unsigned short port,
registry & reg) {
// create TCP-acceptor
tcp::acceptor acceptor( * io_service, tcp::endpoint( tcp::v4(), port) );
// loop for accepting connection requests
for (;;) {
boost::system::error_code ec;
// create new subscriber-session
// this instance will be associated with one subscriber
subscriber_session_ptr new_subscriber_session =
std::make_shared< subscriber_session >( io_service, std::ref( reg) );
// async. accept of new connection request
// this function will suspend this execution context (fiber) until a
// connection was established, after returning from this function a new client (subscriber)
// is connected
acceptor.async_accept(
new_subscriber_session->socket(),
boost::fibers::asio::yield[ec]);
if ( ! ec) {
// run the new subscriber in its own fiber (one fiber for one client)
boost::fibers::fiber(
std::bind( & subscriber_session::run, new_subscriber_session) ).detach();
}
}
}
int main( int argc, char* argv[]) {
try {
// create io_service for async. I/O
std::shared_ptr< boost::asio::io_service > io_service = std::make_shared< boost::asio::io_service >();
// register asio scheduler
boost::fibers::use_scheduling_algorithm< boost::fibers::asio::round_robin >( io_service);
// registry for queues and its subscription
registry reg;
// create an acceptor for publishers, run it as fiber
boost::fibers::fiber(
accept_publisher, std::ref( io_service), 9997, std::ref( reg) ).detach();
// create an acceptor for subscribers, run it as fiber
boost::fibers::fiber(
accept_subscriber, std::ref( io_service), 9998, std::ref( reg) ).detach();
// dispatch
io_service->run();
return EXIT_SUCCESS;
} catch ( std::exception const& e) {
std::cerr << "Exception: " << e.what() << "\n";
}
return EXIT_FAILURE;
}

View File

@@ -0,0 +1,53 @@
//
// blocking_tcp_echo_client.cpp
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~
//
// Copyright (c) 2003-2013 Christopher M. Kohlhoff (chris at kohlhoff dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <boost/asio.hpp>
using boost::asio::ip::tcp;
enum {
max_length = 1024
};
int main( int argc, char* argv[]) {
try {
if ( 3 != argc) {
std::cerr << "Usage: subscriber <host> <queue>\n";
return EXIT_FAILURE;
}
boost::asio::io_service io_service;
tcp::resolver resolver( io_service);
tcp::resolver::query query( tcp::v4(), argv[1], "9998");
tcp::resolver::iterator iterator = resolver.resolve( query);
tcp::socket s( io_service);
boost::asio::connect( s, iterator);
char msg[max_length];
std::string queue( argv[2]);
std::memset( msg, '\0', max_length);
std::memcpy( msg, queue.c_str(), queue.size() );
boost::asio::write( s, boost::asio::buffer( msg, max_length) );
for (;;) {
char reply[max_length];
size_t reply_length = s.read_some( boost::asio::buffer( reply, max_length) );
std::cout << "published: ";
std::cout.write( reply, reply_length);
std::cout << std::endl;
}
return EXIT_SUCCESS;
} catch ( std::exception const& e) {
std::cerr << "Exception: " << e.what() << "\n";
}
return EXIT_FAILURE;
}

View File

@@ -0,0 +1,194 @@
// Copyright Oliver Kowalke 2013.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#ifndef BOOST_FIBERS_ASIO_ROUND_ROBIN_H
#define BOOST_FIBERS_ASIO_ROUND_ROBIN_H
#include <chrono>
#include <cstddef>
#include <memory>
#include <mutex>
#include <queue>
#include <boost/asio.hpp>
#include <boost/assert.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/config.hpp>
#include <boost/fiber/condition_variable.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/mutex.hpp>
#include <boost/fiber/operations.hpp>
#include <boost/fiber/scheduler.hpp>
#include "yield.hpp"
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
namespace boost {
namespace fibers {
namespace asio {
class round_robin : public algo::algorithm {
private:
//[asio_rr_suspend_timer
std::shared_ptr< boost::asio::io_service > io_svc_;
boost::asio::steady_timer suspend_timer_;
//]
boost::fibers::scheduler::ready_queue_type rqueue_{};
boost::fibers::mutex mtx_{};
boost::fibers::condition_variable cnd_{};
std::size_t counter_{ 0 };
public:
//[asio_rr_service_top
struct service : public boost::asio::io_service::service {
static boost::asio::io_service::id id;
std::unique_ptr< boost::asio::io_service::work > work_;
service( boost::asio::io_service & io_svc) :
boost::asio::io_service::service( io_svc),
work_{ new boost::asio::io_service::work( io_svc) } {
}
virtual ~service() {}
service( service const&) = delete;
service & operator=( service const&) = delete;
void shutdown_service() override final {
work_.reset();
}
};
//]
//[asio_rr_ctor
round_robin( std::shared_ptr< boost::asio::io_service > const& io_svc) :
io_svc_( io_svc),
suspend_timer_( * io_svc_) {
// We use add_service() very deliberately. This will throw
// service_already_exists if you pass the same io_service instance to
// more than one round_robin instance.
boost::asio::add_service( * io_svc_, new service( * io_svc_) );
io_svc_->post([this]() mutable {
//]
//[asio_rr_service_lambda
while ( ! io_svc_->stopped() ) {
if ( has_ready_fibers() ) {
// run all pending handlers in round_robin
while ( io_svc_->poll() );
// block this fiber till all pending (ready) fibers are processed
// == round_robin::suspend_until() has been called
std::unique_lock< boost::fibers::mutex > lk( mtx_);
cnd_.wait( lk);
} else {
// run one handler inside io_service
// if no handler available, block this thread
if ( ! io_svc_->run_one() ) {
break;
}
}
}
//]
});
}
void awakened( context * ctx) noexcept {
BOOST_ASSERT( nullptr != ctx);
BOOST_ASSERT( ! ctx->ready_is_linked() );
ctx->ready_link( rqueue_); /*< fiber, enqueue on ready queue >*/
if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) {
++counter_;
}
}
context * pick_next() noexcept {
context * ctx( nullptr);
if ( ! rqueue_.empty() ) { /*<
pop an item from the ready queue
>*/
ctx = & rqueue_.front();
rqueue_.pop_front();
BOOST_ASSERT( nullptr != ctx);
BOOST_ASSERT( context::active() != ctx);
if ( ! ctx->is_context( boost::fibers::type::dispatcher_context) ) {
--counter_;
}
}
return ctx;
}
bool has_ready_fibers() const noexcept {
return 0 < counter_;
}
//[asio_rr_suspend_until
void suspend_until( std::chrono::steady_clock::time_point const& abs_time) noexcept {
// Set a timer so at least one handler will eventually fire, causing
// run_one() to eventually return.
if ( (std::chrono::steady_clock::time_point::max)() != abs_time) {
// Each expires_at(time_point) call cancels any previous pending
// call. We could inadvertently spin like this:
// dispatcher calls suspend_until() with earliest wake time
// suspend_until() sets suspend_timer_
// lambda loop calls run_one()
// some other asio handler runs before timer expires
// run_one() returns to lambda loop
// lambda loop yields to dispatcher
// dispatcher finds no ready fibers
// dispatcher calls suspend_until() with SAME wake time
// suspend_until() sets suspend_timer_ to same time, canceling
// previous async_wait()
// lambda loop calls run_one()
// asio calls suspend_timer_ handler with operation_aborted
// run_one() returns to lambda loop... etc. etc.
// So only actually set the timer when we're passed a DIFFERENT
// abs_time value.
suspend_timer_.expires_at( abs_time);
suspend_timer_.async_wait([](boost::system::error_code const&){
this_fiber::yield();
});
}
cnd_.notify_one();
}
//]
//[asio_rr_notify
void notify() noexcept {
// Something has happened that should wake one or more fibers BEFORE
// suspend_timer_ expires. Reset the timer to cause it to fire
// immediately, causing the run_one() call to return. In theory we
// could use cancel() because we don't care whether suspend_timer_'s
// handler is called with operation_aborted or success. However --
// cancel() doesn't change the expiration time, and we use
// suspend_timer_'s expiration time to decide whether it's already
// set. If suspend_until() set some specific wake time, then notify()
// canceled it, then suspend_until() was called again with the same
// wake time, it would match suspend_timer_'s expiration time and we'd
// refrain from setting the timer. So instead of simply calling
// cancel(), reset the timer, which cancels the pending sleep AND sets
// a new expiration time. This will cause us to spin the loop twice --
// once for the operation_aborted handler, once for timer expiration
// -- but that shouldn't be a big problem.
suspend_timer_.async_wait([](boost::system::error_code const&){
this_fiber::yield();
});
suspend_timer_.expires_at( std::chrono::steady_clock::now() );
}
//]
};
boost::asio::io_service::id round_robin::service::id;
}}}
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif
#endif // BOOST_FIBERS_ASIO_ROUND_ROBIN_H

View File

@@ -0,0 +1,63 @@
// Copyright 2003-2013 Christopher M. Kohlhoff
// Copyright Oliver Kowalke, Nat Goodspeed 2015.
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
#ifndef BOOST_FIBERS_ASIO_YIELD_HPP
#define BOOST_FIBERS_ASIO_YIELD_HPP
#include <boost/config.hpp>
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_PREFIX
#endif
namespace boost {
namespace fibers {
namespace asio {
//[fibers_asio_yield_t
class yield_t {
public:
yield_t() = default;
/**
* @code
* static yield_t yield;
* boost::system::error_code myec;
* func(yield[myec]);
* @endcode
* @c yield[myec] returns an instance of @c yield_t whose @c ec_ points
* to @c myec. The expression @c yield[myec] "binds" @c myec to that
* (anonymous) @c yield_t instance, instructing @c func() to store any
* @c error_code it might produce into @c myec rather than throwing @c
* boost::system::system_error.
*/
yield_t operator[]( boost::system::error_code & ec) const {
yield_t tmp;
tmp.ec_ = & ec;
return tmp;
}
//private:
// ptr to bound error_code instance if any
boost::system::error_code * ec_{ nullptr };
};
//]
//[fibers_asio_yield
// canonical instance
thread_local yield_t yield{};
//]
}}}
#ifdef BOOST_HAS_ABI_HEADERS
# include BOOST_ABI_SUFFIX
#endif
#include "detail/yield.hpp"
#endif // BOOST_FIBERS_ASIO_YIELD_HPP