985 lines
37 KiB
C++
985 lines
37 KiB
C++
|
// Copyright Nat Goodspeed 2015.
|
||
|
// Distributed under the Boost Software License, Version 1.0.
|
||
|
// (See accompanying file LICENSE_1_0.txt or copy at
|
||
|
// http://www.boost.org/LICENSE_1_0.txt)
|
||
|
//
|
||
|
#include <algorithm>
|
||
|
#include <cassert>
|
||
|
#include <chrono>
|
||
|
#include <iostream>
|
||
|
#include <memory>
|
||
|
#include <sstream>
|
||
|
#include <string>
|
||
|
#include <type_traits>
|
||
|
#include <utility>
|
||
|
#include <vector>
|
||
|
|
||
|
#include <boost/fiber/all.hpp>
|
||
|
#include <boost/variant/variant.hpp>
|
||
|
#include <boost/variant/get.hpp>
|
||
|
|
||
|
// These are wait_something() functions rather than when_something()
|
||
|
// functions. A big part of the point of the Fiber library is to model
|
||
|
// sequencing using the processor's instruction pointer rather than chains of
|
||
|
// callbacks. The future-oriented when_all() / when_any() functions are still
|
||
|
// based on chains of callbacks. With Fiber, we can do better.
|
||
|
|
||
|
/*****************************************************************************
|
||
|
* Verbose
|
||
|
*****************************************************************************/
|
||
|
class Verbose {
|
||
|
public:
|
||
|
Verbose( std::string const& d):
|
||
|
desc( d) {
|
||
|
std::cout << desc << " start" << std::endl;
|
||
|
}
|
||
|
|
||
|
~Verbose() {
|
||
|
std::cout << desc << " stop" << std::endl;
|
||
|
}
|
||
|
|
||
|
Verbose( Verbose const&) = delete;
|
||
|
Verbose & operator=( Verbose const&) = delete;
|
||
|
|
||
|
private:
|
||
|
const std::string desc;
|
||
|
};
|
||
|
|
||
|
/*****************************************************************************
|
||
|
* Runner and Example
|
||
|
*****************************************************************************/
|
||
|
// collect and ultimately run every Example
|
||
|
class Runner {
|
||
|
typedef std::vector< std::pair< std::string, std::function< void() > > > function_list;
|
||
|
|
||
|
public:
|
||
|
void add( std::string const& desc, std::function< void() > const& func) {
|
||
|
functions_.push_back( function_list::value_type( desc, func) );
|
||
|
}
|
||
|
|
||
|
void run() {
|
||
|
for ( function_list::value_type const& pair : functions_) {
|
||
|
Verbose v( pair.first);
|
||
|
pair.second();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
private:
|
||
|
function_list functions_;
|
||
|
};
|
||
|
|
||
|
Runner runner;
|
||
|
|
||
|
// Example allows us to embed Runner::add() calls at module scope
|
||
|
struct Example {
|
||
|
Example( Runner & runner, std::string const& desc, std::function< void() > const& func) {
|
||
|
runner.add( desc, func);
|
||
|
}
|
||
|
};
|
||
|
|
||
|
/*****************************************************************************
|
||
|
* example task functions
|
||
|
*****************************************************************************/
|
||
|
//[wait_sleeper
|
||
|
template< typename T >
|
||
|
T sleeper_impl( T item, int ms, bool thrw = false) {
|
||
|
std::ostringstream descb, funcb;
|
||
|
descb << item;
|
||
|
std::string desc( descb.str() );
|
||
|
funcb << " sleeper(" << item << ")";
|
||
|
Verbose v( funcb.str() );
|
||
|
|
||
|
boost::this_fiber::sleep_for( std::chrono::milliseconds( ms) );
|
||
|
if ( thrw) {
|
||
|
throw std::runtime_error( desc);
|
||
|
}
|
||
|
return item;
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
inline
|
||
|
std::string sleeper( std::string const& item, int ms, bool thrw = false) {
|
||
|
return sleeper_impl( item, ms, thrw);
|
||
|
}
|
||
|
|
||
|
inline
|
||
|
double sleeper( double item, int ms, bool thrw = false) {
|
||
|
return sleeper_impl( item, ms, thrw);
|
||
|
}
|
||
|
|
||
|
inline
|
||
|
int sleeper(int item, int ms, bool thrw = false) {
|
||
|
return sleeper_impl( item, ms, thrw);
|
||
|
}
|
||
|
|
||
|
/*****************************************************************************
|
||
|
* Done
|
||
|
*****************************************************************************/
|
||
|
//[wait_done
|
||
|
// Wrap canonical pattern for condition_variable + bool flag
|
||
|
struct Done {
|
||
|
private:
|
||
|
boost::fibers::condition_variable cond;
|
||
|
boost::fibers::mutex mutex;
|
||
|
bool ready = false;
|
||
|
|
||
|
public:
|
||
|
typedef std::shared_ptr< Done > ptr;
|
||
|
|
||
|
void wait() {
|
||
|
std::unique_lock< boost::fibers::mutex > lock( mutex);
|
||
|
cond.wait( lock, [this](){ return ready; });
|
||
|
}
|
||
|
|
||
|
void notify() {
|
||
|
{
|
||
|
std::unique_lock< boost::fibers::mutex > lock( mutex);
|
||
|
ready = true;
|
||
|
} // release mutex
|
||
|
cond.notify_one();
|
||
|
}
|
||
|
};
|
||
|
//]
|
||
|
|
||
|
/*****************************************************************************
|
||
|
* when_any, simple completion
|
||
|
*****************************************************************************/
|
||
|
//[wait_first_simple_impl
|
||
|
// Degenerate case: when there are no functions to wait for, return
|
||
|
// immediately.
|
||
|
void wait_first_simple_impl( Done::ptr) {
|
||
|
}
|
||
|
|
||
|
// When there's at least one function to wait for, launch it and recur to
|
||
|
// process the rest.
|
||
|
template< typename Fn, typename ... Fns >
|
||
|
void wait_first_simple_impl( Done::ptr done, Fn && function, Fns && ... functions) {
|
||
|
boost::fibers::fiber( [done, function](){
|
||
|
function();
|
||
|
done->notify();
|
||
|
}).detach();
|
||
|
wait_first_simple_impl( done, std::forward< Fns >( functions) ... );
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
// interface function: instantiate Done, launch tasks, wait for Done
|
||
|
//[wait_first_simple
|
||
|
template< typename ... Fns >
|
||
|
void wait_first_simple( Fns && ... functions) {
|
||
|
// Use shared_ptr because each function's fiber will bind it separately,
|
||
|
// and we're going to return before the last of them completes.
|
||
|
auto done( std::make_shared< Done >() );
|
||
|
wait_first_simple_impl( done, std::forward< Fns >( functions) ... );
|
||
|
done->wait();
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
// example usage
|
||
|
Example wfs( runner, "wait_first_simple()", [](){
|
||
|
//[wait_first_simple_ex
|
||
|
wait_first_simple(
|
||
|
[](){ sleeper("wfs_long", 150); },
|
||
|
[](){ sleeper("wfs_medium", 100); },
|
||
|
[](){ sleeper("wfs_short", 50); });
|
||
|
//]
|
||
|
});
|
||
|
|
||
|
/*****************************************************************************
|
||
|
* when_any, return value
|
||
|
*****************************************************************************/
|
||
|
// When there's only one function, call this overload
|
||
|
//[wait_first_value_impl
|
||
|
template< typename T, typename Fn >
|
||
|
void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
|
||
|
Fn && function) {
|
||
|
boost::fibers::fiber( [chan, function](){
|
||
|
// Ignore channel_op_status returned by push():
|
||
|
// might be closed; we simply don't care.
|
||
|
chan->push( function() );
|
||
|
}).detach();
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
// When there are two or more functions, call this overload
|
||
|
template< typename T, typename Fn0, typename Fn1, typename ... Fns >
|
||
|
void wait_first_value_impl( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
|
||
|
Fn0 && function0,
|
||
|
Fn1 && function1,
|
||
|
Fns && ... functions) {
|
||
|
// process the first function using the single-function overload
|
||
|
wait_first_value_impl< T >( chan,
|
||
|
std::forward< Fn0 >( function0) );
|
||
|
// then recur to process the rest
|
||
|
wait_first_value_impl< T >( chan,
|
||
|
std::forward< Fn1 >( function1),
|
||
|
std::forward< Fns >( functions) ... );
|
||
|
}
|
||
|
|
||
|
//[wait_first_value
|
||
|
// Assume that all passed functions have the same return type. The return type
|
||
|
// of wait_first_value() is the return type of the first passed function. It is
|
||
|
// simply invalid to pass NO functions.
|
||
|
template< typename Fn, typename ... Fns >
|
||
|
typename std::result_of< Fn() >::type
|
||
|
wait_first_value( Fn && function, Fns && ... functions) {
|
||
|
typedef typename std::result_of< Fn() >::type return_t;
|
||
|
typedef boost::fibers::buffered_channel< return_t > channel_t;
|
||
|
auto chanp( std::make_shared< channel_t >( 64) );
|
||
|
// launch all the relevant fibers
|
||
|
wait_first_value_impl< return_t >( chanp,
|
||
|
std::forward< Fn >( function),
|
||
|
std::forward< Fns >( functions) ... );
|
||
|
// retrieve the first value
|
||
|
return_t value( chanp->value_pop() );
|
||
|
// close the channel: no subsequent push() has to succeed
|
||
|
chanp->close();
|
||
|
return value;
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
// example usage
|
||
|
Example wfv( runner, "wait_first_value()", [](){
|
||
|
//[wait_first_value_ex
|
||
|
std::string result = wait_first_value(
|
||
|
[](){ return sleeper("wfv_third", 150); },
|
||
|
[](){ return sleeper("wfv_second", 100); },
|
||
|
[](){ return sleeper("wfv_first", 50); });
|
||
|
std::cout << "wait_first_value() => " << result << std::endl;
|
||
|
assert(result == "wfv_first");
|
||
|
//]
|
||
|
});
|
||
|
|
||
|
/*****************************************************************************
|
||
|
* when_any, produce first outcome, whether result or exception
|
||
|
*****************************************************************************/
|
||
|
// When there's only one function, call this overload.
|
||
|
//[wait_first_outcome_impl
|
||
|
template< typename T, typename CHANP, typename Fn >
|
||
|
void wait_first_outcome_impl( CHANP chan, Fn && function) {
|
||
|
boost::fibers::fiber(
|
||
|
// Use std::bind() here for C++11 compatibility. C++11 lambda capture
|
||
|
// can't move a move-only Fn type, but bind() can. Let bind() move the
|
||
|
// channel pointer and the function into the bound object, passing
|
||
|
// references into the lambda.
|
||
|
std::bind(
|
||
|
[]( CHANP & chan,
|
||
|
typename std::decay< Fn >::type & function) {
|
||
|
// Instantiate a packaged_task to capture any exception thrown by
|
||
|
// function.
|
||
|
boost::fibers::packaged_task< T() > task( function);
|
||
|
// Immediately run this packaged_task on same fiber. We want
|
||
|
// function() to have completed BEFORE we push the future.
|
||
|
task();
|
||
|
// Pass the corresponding future to consumer. Ignore
|
||
|
// channel_op_status returned by push(): might be closed; we
|
||
|
// simply don't care.
|
||
|
chan->push( task.get_future() );
|
||
|
},
|
||
|
chan,
|
||
|
std::forward< Fn >( function)
|
||
|
)).detach();
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
// When there are two or more functions, call this overload
|
||
|
template< typename T, typename CHANP, typename Fn0, typename Fn1, typename ... Fns >
|
||
|
void wait_first_outcome_impl( CHANP chan,
|
||
|
Fn0 && function0,
|
||
|
Fn1 && function1,
|
||
|
Fns && ... functions) {
|
||
|
// process the first function using the single-function overload
|
||
|
wait_first_outcome_impl< T >( chan,
|
||
|
std::forward< Fn0 >( function0) );
|
||
|
// then recur to process the rest
|
||
|
wait_first_outcome_impl< T >( chan,
|
||
|
std::forward< Fn1 >( function1),
|
||
|
std::forward< Fns >( functions) ... );
|
||
|
}
|
||
|
|
||
|
// Assume that all passed functions have the same return type. The return type
|
||
|
// of wait_first_outcome() is the return type of the first passed function. It is
|
||
|
// simply invalid to pass NO functions.
|
||
|
//[wait_first_outcome
|
||
|
template< typename Fn, typename ... Fns >
|
||
|
typename std::result_of< Fn() >::type
|
||
|
wait_first_outcome( Fn && function, Fns && ... functions) {
|
||
|
// In this case, the value we pass through the channel is actually a
|
||
|
// future -- which is already ready. future can carry either a value or an
|
||
|
// exception.
|
||
|
typedef typename std::result_of< Fn() >::type return_t;
|
||
|
typedef boost::fibers::future< return_t > future_t;
|
||
|
typedef boost::fibers::buffered_channel< future_t > channel_t;
|
||
|
auto chanp(std::make_shared< channel_t >( 64) );
|
||
|
// launch all the relevant fibers
|
||
|
wait_first_outcome_impl< return_t >( chanp,
|
||
|
std::forward< Fn >( function),
|
||
|
std::forward< Fns >( functions) ... );
|
||
|
// retrieve the first future
|
||
|
future_t future( chanp->value_pop() );
|
||
|
// close the channel: no subsequent push() has to succeed
|
||
|
chanp->close();
|
||
|
// either return value or throw exception
|
||
|
return future.get();
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
// example usage
|
||
|
Example wfo( runner, "wait_first_outcome()", [](){
|
||
|
//[wait_first_outcome_ex
|
||
|
std::string result = wait_first_outcome(
|
||
|
[](){ return sleeper("wfos_first", 50); },
|
||
|
[](){ return sleeper("wfos_second", 100); },
|
||
|
[](){ return sleeper("wfos_third", 150); });
|
||
|
std::cout << "wait_first_outcome(success) => " << result << std::endl;
|
||
|
assert(result == "wfos_first");
|
||
|
|
||
|
std::string thrown;
|
||
|
try {
|
||
|
result = wait_first_outcome(
|
||
|
[](){ return sleeper("wfof_first", 50, true); },
|
||
|
[](){ return sleeper("wfof_second", 100); },
|
||
|
[](){ return sleeper("wfof_third", 150); });
|
||
|
} catch ( std::exception const& e) {
|
||
|
thrown = e.what();
|
||
|
}
|
||
|
std::cout << "wait_first_outcome(fail) threw '" << thrown
|
||
|
<< "'" << std::endl;
|
||
|
assert(thrown == "wfof_first");
|
||
|
//]
|
||
|
});
|
||
|
|
||
|
/*****************************************************************************
|
||
|
* when_any, collect exceptions until success; throw exception_list if no
|
||
|
* success
|
||
|
*****************************************************************************/
|
||
|
// define an exception to aggregate exception_ptrs; prefer
|
||
|
// std::exception_list (N4407 et al.) once that becomes available
|
||
|
//[exception_list
|
||
|
class exception_list : public std::runtime_error {
|
||
|
public:
|
||
|
exception_list( std::string const& what) :
|
||
|
std::runtime_error( what) {
|
||
|
}
|
||
|
|
||
|
typedef std::vector< std::exception_ptr > bundle_t;
|
||
|
|
||
|
// N4407 proposed std::exception_list API
|
||
|
typedef bundle_t::const_iterator iterator;
|
||
|
|
||
|
std::size_t size() const noexcept {
|
||
|
return bundle_.size();
|
||
|
}
|
||
|
|
||
|
iterator begin() const noexcept {
|
||
|
return bundle_.begin();
|
||
|
}
|
||
|
|
||
|
iterator end() const noexcept {
|
||
|
return bundle_.end();
|
||
|
}
|
||
|
|
||
|
// extension to populate
|
||
|
void add( std::exception_ptr ep) {
|
||
|
bundle_.push_back( ep);
|
||
|
}
|
||
|
|
||
|
private:
|
||
|
bundle_t bundle_;
|
||
|
};
|
||
|
//]
|
||
|
|
||
|
// Assume that all passed functions have the same return type. The return type
|
||
|
// of wait_first_success() is the return type of the first passed function. It is
|
||
|
// simply invalid to pass NO functions.
|
||
|
//[wait_first_success
|
||
|
template< typename Fn, typename ... Fns >
|
||
|
typename std::result_of< Fn() >::type
|
||
|
wait_first_success( Fn && function, Fns && ... functions) {
|
||
|
std::size_t count( 1 + sizeof ... ( functions) );
|
||
|
// In this case, the value we pass through the channel is actually a
|
||
|
// future -- which is already ready. future can carry either a value or an
|
||
|
// exception.
|
||
|
typedef typename std::result_of< typename std::decay< Fn >::type() >::type return_t;
|
||
|
typedef boost::fibers::future< return_t > future_t;
|
||
|
typedef boost::fibers::buffered_channel< future_t > channel_t;
|
||
|
auto chanp( std::make_shared< channel_t >( 64) );
|
||
|
// launch all the relevant fibers
|
||
|
wait_first_outcome_impl< return_t >( chanp,
|
||
|
std::forward< Fn >( function),
|
||
|
std::forward< Fns >( functions) ... );
|
||
|
// instantiate exception_list, just in case
|
||
|
exception_list exceptions("wait_first_success() produced only errors");
|
||
|
// retrieve up to 'count' results -- but stop there!
|
||
|
for ( std::size_t i = 0; i < count; ++i) {
|
||
|
// retrieve the next future
|
||
|
future_t future( chanp->value_pop() );
|
||
|
// retrieve exception_ptr if any
|
||
|
std::exception_ptr error( future.get_exception_ptr() );
|
||
|
// if no error, then yay, return value
|
||
|
if ( ! error) {
|
||
|
// close the channel: no subsequent push() has to succeed
|
||
|
chanp->close();
|
||
|
// show caller the value we got
|
||
|
return future.get();
|
||
|
}
|
||
|
|
||
|
// error is non-null: collect
|
||
|
exceptions.add( error);
|
||
|
}
|
||
|
// We only arrive here when every passed function threw an exception.
|
||
|
// Throw our collection to inform caller.
|
||
|
throw exceptions;
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
// example usage
|
||
|
Example wfss( runner, "wait_first_success()", [](){
|
||
|
//[wait_first_success_ex
|
||
|
std::string result = wait_first_success(
|
||
|
[](){ return sleeper("wfss_first", 50, true); },
|
||
|
[](){ return sleeper("wfss_second", 100); },
|
||
|
[](){ return sleeper("wfss_third", 150); });
|
||
|
std::cout << "wait_first_success(success) => " << result << std::endl;
|
||
|
assert(result == "wfss_second");
|
||
|
//]
|
||
|
|
||
|
std::string thrown;
|
||
|
std::size_t count = 0;
|
||
|
try {
|
||
|
result = wait_first_success(
|
||
|
[](){ return sleeper("wfsf_first", 50, true); },
|
||
|
[](){ return sleeper("wfsf_second", 100, true); },
|
||
|
[](){ return sleeper("wfsf_third", 150, true); });
|
||
|
} catch ( exception_list const& e) {
|
||
|
thrown = e.what();
|
||
|
count = e.size();
|
||
|
} catch ( std::exception const& e) {
|
||
|
thrown = e.what();
|
||
|
}
|
||
|
std::cout << "wait_first_success(fail) threw '" << thrown << "': "
|
||
|
<< count << " errors" << std::endl;
|
||
|
assert(thrown == "wait_first_success() produced only errors");
|
||
|
assert(count == 3);
|
||
|
});
|
||
|
|
||
|
/*****************************************************************************
|
||
|
* when_any, heterogeneous
|
||
|
*****************************************************************************/
|
||
|
//[wait_first_value_het
|
||
|
// No need to break out the first Fn for interface function: let the compiler
|
||
|
// complain if empty.
|
||
|
// Our functions have different return types, and we might have to return any
|
||
|
// of them. Use a variant, expanding std::result_of<Fn()>::type for each Fn in
|
||
|
// parameter pack.
|
||
|
template< typename ... Fns >
|
||
|
boost::variant< typename std::result_of< Fns() >::type ... >
|
||
|
wait_first_value_het( Fns && ... functions) {
|
||
|
// Use buffered_channel<boost::variant<T1, T2, ...>>; see remarks above.
|
||
|
typedef boost::variant< typename std::result_of< Fns() >::type ... > return_t;
|
||
|
typedef boost::fibers::buffered_channel< return_t > channel_t;
|
||
|
auto chanp( std::make_shared< channel_t >( 64) );
|
||
|
// launch all the relevant fibers
|
||
|
wait_first_value_impl< return_t >( chanp,
|
||
|
std::forward< Fns >( functions) ... );
|
||
|
// retrieve the first value
|
||
|
return_t value( chanp->value_pop() );
|
||
|
// close the channel: no subsequent push() has to succeed
|
||
|
chanp->close();
|
||
|
return value;
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
// example usage
|
||
|
Example wfvh( runner, "wait_first_value_het()", [](){
|
||
|
//[wait_first_value_het_ex
|
||
|
boost::variant< std::string, double, int > result =
|
||
|
wait_first_value_het(
|
||
|
[](){ return sleeper("wfvh_third", 150); },
|
||
|
[](){ return sleeper(3.14, 100); },
|
||
|
[](){ return sleeper(17, 50); });
|
||
|
std::cout << "wait_first_value_het() => " << result << std::endl;
|
||
|
assert(boost::get< int >( result) == 17);
|
||
|
//]
|
||
|
});
|
||
|
|
||
|
/*****************************************************************************
|
||
|
* when_all, simple completion
|
||
|
*****************************************************************************/
|
||
|
// Degenerate case: when there are no functions to wait for, return
|
||
|
// immediately.
|
||
|
void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier >) {
|
||
|
}
|
||
|
|
||
|
// When there's at least one function to wait for, launch it and recur to
|
||
|
// process the rest.
|
||
|
//[wait_all_simple_impl
|
||
|
template< typename Fn, typename ... Fns >
|
||
|
void wait_all_simple_impl( std::shared_ptr< boost::fibers::barrier > barrier,
|
||
|
Fn && function, Fns && ... functions) {
|
||
|
boost::fibers::fiber(
|
||
|
std::bind(
|
||
|
[]( std::shared_ptr< boost::fibers::barrier > & barrier,
|
||
|
typename std::decay< Fn >::type & function) mutable {
|
||
|
function();
|
||
|
barrier->wait();
|
||
|
},
|
||
|
barrier,
|
||
|
std::forward< Fn >( function)
|
||
|
)).detach();
|
||
|
wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... );
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
// interface function: instantiate barrier, launch tasks, wait for barrier
|
||
|
//[wait_all_simple
|
||
|
template< typename ... Fns >
|
||
|
void wait_all_simple( Fns && ... functions) {
|
||
|
std::size_t count( sizeof ... ( functions) );
|
||
|
// Initialize a barrier(count+1) because we'll immediately wait on it. We
|
||
|
// don't want to wake up until 'count' more fibers wait on it. Even though
|
||
|
// we'll stick around until the last of them completes, use shared_ptr
|
||
|
// anyway because it's easier to be confident about lifespan issues.
|
||
|
auto barrier( std::make_shared< boost::fibers::barrier >( count + 1) );
|
||
|
wait_all_simple_impl( barrier, std::forward< Fns >( functions) ... );
|
||
|
barrier->wait();
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
// example usage
|
||
|
Example was( runner, "wait_all_simple()", [](){
|
||
|
//[wait_all_simple_ex
|
||
|
wait_all_simple(
|
||
|
[](){ sleeper("was_long", 150); },
|
||
|
[](){ sleeper("was_medium", 100); },
|
||
|
[](){ sleeper("was_short", 50); });
|
||
|
//]
|
||
|
});
|
||
|
|
||
|
/*****************************************************************************
|
||
|
* when_all, return values
|
||
|
*****************************************************************************/
|
||
|
//[wait_nchannel
|
||
|
// Introduce a channel facade that closes the channel once a specific number
|
||
|
// of items has been pushed. This allows an arbitrary consumer to read until
|
||
|
// 'closed' without itself having to count items.
|
||
|
template< typename T >
|
||
|
class nchannel {
|
||
|
public:
|
||
|
nchannel( std::shared_ptr< boost::fibers::buffered_channel< T > > chan,
|
||
|
std::size_t lm):
|
||
|
chan_( chan),
|
||
|
limit_( lm) {
|
||
|
assert(chan_);
|
||
|
if ( 0 == limit_) {
|
||
|
chan_->close();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
boost::fibers::channel_op_status push( T && va) {
|
||
|
boost::fibers::channel_op_status ok =
|
||
|
chan_->push( std::forward< T >( va) );
|
||
|
if ( ok == boost::fibers::channel_op_status::success &&
|
||
|
--limit_ == 0) {
|
||
|
// after the 'limit_'th successful push, close the channel
|
||
|
chan_->close();
|
||
|
}
|
||
|
return ok;
|
||
|
}
|
||
|
|
||
|
private:
|
||
|
std::shared_ptr< boost::fibers::buffered_channel< T > > chan_;
|
||
|
std::size_t limit_;
|
||
|
};
|
||
|
//]
|
||
|
|
||
|
// When there's only one function, call this overload
|
||
|
//[wait_all_values_impl
|
||
|
template< typename T, typename Fn >
|
||
|
void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan,
|
||
|
Fn && function) {
|
||
|
boost::fibers::fiber( [chan, function](){
|
||
|
chan->push(function());
|
||
|
}).detach();
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
// When there are two or more functions, call this overload
|
||
|
template< typename T, typename Fn0, typename Fn1, typename ... Fns >
|
||
|
void wait_all_values_impl( std::shared_ptr< nchannel< T > > chan,
|
||
|
Fn0 && function0,
|
||
|
Fn1 && function1,
|
||
|
Fns && ... functions) {
|
||
|
// process the first function using the single-function overload
|
||
|
wait_all_values_impl< T >( chan, std::forward< Fn0 >( function0) );
|
||
|
// then recur to process the rest
|
||
|
wait_all_values_impl< T >( chan,
|
||
|
std::forward< Fn1 >( function1),
|
||
|
std::forward< Fns >( functions) ... );
|
||
|
}
|
||
|
|
||
|
//[wait_all_values_source
|
||
|
// Return a shared_ptr<buffered_channel<T>> from which the caller can
|
||
|
// retrieve each new result as it arrives, until 'closed'.
|
||
|
template< typename Fn, typename ... Fns >
|
||
|
std::shared_ptr< boost::fibers::buffered_channel< typename std::result_of< Fn() >::type > >
|
||
|
wait_all_values_source( Fn && function, Fns && ... functions) {
|
||
|
std::size_t count( 1 + sizeof ... ( functions) );
|
||
|
typedef typename std::result_of< Fn() >::type return_t;
|
||
|
typedef boost::fibers::buffered_channel< return_t > channel_t;
|
||
|
// make the channel
|
||
|
auto chanp( std::make_shared< channel_t >( 64) );
|
||
|
// and make an nchannel facade to close it after 'count' items
|
||
|
auto ncp( std::make_shared< nchannel< return_t > >( chanp, count) );
|
||
|
// pass that nchannel facade to all the relevant fibers
|
||
|
wait_all_values_impl< return_t >( ncp,
|
||
|
std::forward< Fn >( function),
|
||
|
std::forward< Fns >( functions) ... );
|
||
|
// then return the channel for consumer
|
||
|
return chanp;
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
// When all passed functions have completed, return vector<T> containing
|
||
|
// collected results. Assume that all passed functions have the same return
|
||
|
// type. It is simply invalid to pass NO functions.
|
||
|
//[wait_all_values
|
||
|
template< typename Fn, typename ... Fns >
|
||
|
std::vector< typename std::result_of< Fn() >::type >
|
||
|
wait_all_values( Fn && function, Fns && ... functions) {
|
||
|
std::size_t count( 1 + sizeof ... ( functions) );
|
||
|
typedef typename std::result_of< Fn() >::type return_t;
|
||
|
typedef std::vector< return_t > vector_t;
|
||
|
vector_t results;
|
||
|
results.reserve( count);
|
||
|
|
||
|
// get channel
|
||
|
std::shared_ptr< boost::fibers::buffered_channel< return_t > > chan =
|
||
|
wait_all_values_source( std::forward< Fn >( function),
|
||
|
std::forward< Fns >( functions) ... );
|
||
|
// fill results vector
|
||
|
return_t value;
|
||
|
while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
|
||
|
results.push_back( value);
|
||
|
}
|
||
|
// return vector to caller
|
||
|
return results;
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
Example wav( runner, "wait_all_values()", [](){
|
||
|
//[wait_all_values_source_ex
|
||
|
std::shared_ptr< boost::fibers::buffered_channel< std::string > > chan =
|
||
|
wait_all_values_source(
|
||
|
[](){ return sleeper("wavs_third", 150); },
|
||
|
[](){ return sleeper("wavs_second", 100); },
|
||
|
[](){ return sleeper("wavs_first", 50); });
|
||
|
std::string value;
|
||
|
while ( boost::fibers::channel_op_status::success == chan->pop(value) ) {
|
||
|
std::cout << "wait_all_values_source() => '" << value
|
||
|
<< "'" << std::endl;
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
//[wait_all_values_ex
|
||
|
std::vector< std::string > values =
|
||
|
wait_all_values(
|
||
|
[](){ return sleeper("wav_late", 150); },
|
||
|
[](){ return sleeper("wav_middle", 100); },
|
||
|
[](){ return sleeper("wav_early", 50); });
|
||
|
//]
|
||
|
std::cout << "wait_all_values() =>";
|
||
|
for ( std::string const& v : values) {
|
||
|
std::cout << " '" << v << "'";
|
||
|
}
|
||
|
std::cout << std::endl;
|
||
|
});
|
||
|
|
||
|
/*****************************************************************************
|
||
|
* when_all, throw first exception
|
||
|
*****************************************************************************/
|
||
|
//[wait_all_until_error_source
|
||
|
// Return a shared_ptr<buffered_channel<future<T>>> from which the caller can
|
||
|
// get() each new result as it arrives, until 'closed'.
|
||
|
template< typename Fn, typename ... Fns >
|
||
|
std::shared_ptr<
|
||
|
boost::fibers::buffered_channel<
|
||
|
boost::fibers::future<
|
||
|
typename std::result_of< Fn() >::type > > >
|
||
|
wait_all_until_error_source( Fn && function, Fns && ... functions) {
|
||
|
std::size_t count( 1 + sizeof ... ( functions) );
|
||
|
typedef typename std::result_of< Fn() >::type return_t;
|
||
|
typedef boost::fibers::future< return_t > future_t;
|
||
|
typedef boost::fibers::buffered_channel< future_t > channel_t;
|
||
|
// make the channel
|
||
|
auto chanp( std::make_shared< channel_t >( 64) );
|
||
|
// and make an nchannel facade to close it after 'count' items
|
||
|
auto ncp( std::make_shared< nchannel< future_t > >( chanp, count) );
|
||
|
// pass that nchannel facade to all the relevant fibers
|
||
|
wait_first_outcome_impl< return_t >( ncp,
|
||
|
std::forward< Fn >( function),
|
||
|
std::forward< Fns >( functions) ... );
|
||
|
// then return the channel for consumer
|
||
|
return chanp;
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
// When all passed functions have completed, return vector<T> containing
|
||
|
// collected results, or throw the first exception thrown by any of the passed
|
||
|
// functions. Assume that all passed functions have the same return type. It
|
||
|
// is simply invalid to pass NO functions.
|
||
|
//[wait_all_until_error
|
||
|
template< typename Fn, typename ... Fns >
|
||
|
std::vector< typename std::result_of< Fn() >::type >
|
||
|
wait_all_until_error( Fn && function, Fns && ... functions) {
|
||
|
std::size_t count( 1 + sizeof ... ( functions) );
|
||
|
typedef typename std::result_of< Fn() >::type return_t;
|
||
|
typedef typename boost::fibers::future< return_t > future_t;
|
||
|
typedef std::vector< return_t > vector_t;
|
||
|
vector_t results;
|
||
|
results.reserve( count);
|
||
|
|
||
|
// get channel
|
||
|
std::shared_ptr<
|
||
|
boost::fibers::buffered_channel< future_t > > chan(
|
||
|
wait_all_until_error_source( std::forward< Fn >( function),
|
||
|
std::forward< Fns >( functions) ... ) );
|
||
|
// fill results vector
|
||
|
future_t future;
|
||
|
while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
|
||
|
results.push_back( future.get() );
|
||
|
}
|
||
|
// return vector to caller
|
||
|
return results;
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
Example waue( runner, "wait_all_until_error()", [](){
|
||
|
//[wait_all_until_error_source_ex
|
||
|
typedef boost::fibers::future< std::string > future_t;
|
||
|
std::shared_ptr< boost::fibers::buffered_channel< future_t > > chan =
|
||
|
wait_all_until_error_source(
|
||
|
[](){ return sleeper("wauess_third", 150); },
|
||
|
[](){ return sleeper("wauess_second", 100); },
|
||
|
[](){ return sleeper("wauess_first", 50); });
|
||
|
future_t future;
|
||
|
while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
|
||
|
std::string value( future.get() );
|
||
|
std::cout << "wait_all_until_error_source(success) => '" << value
|
||
|
<< "'" << std::endl;
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
chan = wait_all_until_error_source(
|
||
|
[](){ return sleeper("wauesf_third", 150); },
|
||
|
[](){ return sleeper("wauesf_second", 100, true); },
|
||
|
[](){ return sleeper("wauesf_first", 50); });
|
||
|
//[wait_all_until_error_ex
|
||
|
std::string thrown;
|
||
|
//<-
|
||
|
try {
|
||
|
while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
|
||
|
std::string value( future.get() );
|
||
|
std::cout << "wait_all_until_error_source(fail) => '" << value
|
||
|
<< "'" << std::endl;
|
||
|
}
|
||
|
} catch ( std::exception const& e) {
|
||
|
thrown = e.what();
|
||
|
}
|
||
|
std::cout << "wait_all_until_error_source(fail) threw '" << thrown
|
||
|
<< "'" << std::endl;
|
||
|
|
||
|
thrown.clear();
|
||
|
//->
|
||
|
try {
|
||
|
std::vector< std::string > values = wait_all_until_error(
|
||
|
[](){ return sleeper("waue_late", 150); },
|
||
|
[](){ return sleeper("waue_middle", 100, true); },
|
||
|
[](){ return sleeper("waue_early", 50); });
|
||
|
//<-
|
||
|
std::cout << "wait_all_until_error(fail) =>";
|
||
|
for ( std::string const& v : values) {
|
||
|
std::cout << " '" << v << "'";
|
||
|
}
|
||
|
std::cout << std::endl;
|
||
|
//->
|
||
|
} catch ( std::exception const& e) {
|
||
|
thrown = e.what();
|
||
|
}
|
||
|
std::cout << "wait_all_until_error(fail) threw '" << thrown
|
||
|
<< "'" << std::endl;
|
||
|
//]
|
||
|
});
|
||
|
|
||
|
/*****************************************************************************
|
||
|
* when_all, collect exceptions
|
||
|
*****************************************************************************/
|
||
|
// When all passed functions have succeeded, return vector<T> containing
|
||
|
// collected results, or throw exception_list containing all exceptions thrown
|
||
|
// by any of the passed functions. Assume that all passed functions have the
|
||
|
// same return type. It is simply invalid to pass NO functions.
|
||
|
//[wait_all_collect_errors
|
||
|
template< typename Fn, typename ... Fns >
|
||
|
std::vector< typename std::result_of< Fn() >::type >
|
||
|
wait_all_collect_errors( Fn && function, Fns && ... functions) {
|
||
|
std::size_t count( 1 + sizeof ... ( functions) );
|
||
|
typedef typename std::result_of< Fn() >::type return_t;
|
||
|
typedef typename boost::fibers::future< return_t > future_t;
|
||
|
typedef std::vector< return_t > vector_t;
|
||
|
vector_t results;
|
||
|
results.reserve( count);
|
||
|
exception_list exceptions("wait_all_collect_errors() exceptions");
|
||
|
|
||
|
// get channel
|
||
|
std::shared_ptr<
|
||
|
boost::fibers::buffered_channel< future_t > > chan(
|
||
|
wait_all_until_error_source( std::forward< Fn >( function),
|
||
|
std::forward< Fns >( functions) ... ) );
|
||
|
// fill results and/or exceptions vectors
|
||
|
future_t future;
|
||
|
while ( boost::fibers::channel_op_status::success == chan->pop( future) ) {
|
||
|
std::exception_ptr exp = future.get_exception_ptr();
|
||
|
if ( ! exp) {
|
||
|
results.push_back( future.get() );
|
||
|
} else {
|
||
|
exceptions.add( exp);
|
||
|
}
|
||
|
}
|
||
|
// if there were any exceptions, throw
|
||
|
if ( exceptions.size() ) {
|
||
|
throw exceptions;
|
||
|
}
|
||
|
// no exceptions: return vector to caller
|
||
|
return results;
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
Example wace( runner, "wait_all_collect_errors()", [](){
|
||
|
std::vector< std::string > values = wait_all_collect_errors(
|
||
|
[](){ return sleeper("waces_late", 150); },
|
||
|
[](){ return sleeper("waces_middle", 100); },
|
||
|
[](){ return sleeper("waces_early", 50); });
|
||
|
std::cout << "wait_all_collect_errors(success) =>";
|
||
|
for ( std::string const& v : values) {
|
||
|
std::cout << " '" << v << "'";
|
||
|
}
|
||
|
std::cout << std::endl;
|
||
|
|
||
|
std::string thrown;
|
||
|
std::size_t errors = 0;
|
||
|
try {
|
||
|
values = wait_all_collect_errors(
|
||
|
[](){ return sleeper("wacef_late", 150, true); },
|
||
|
[](){ return sleeper("wacef_middle", 100, true); },
|
||
|
[](){ return sleeper("wacef_early", 50); });
|
||
|
std::cout << "wait_all_collect_errors(fail) =>";
|
||
|
for ( std::string const& v : values) {
|
||
|
std::cout << " '" << v << "'";
|
||
|
}
|
||
|
std::cout << std::endl;
|
||
|
} catch ( exception_list const& e) {
|
||
|
thrown = e.what();
|
||
|
errors = e.size();
|
||
|
} catch ( std::exception const& e) {
|
||
|
thrown = e.what();
|
||
|
}
|
||
|
std::cout << "wait_all_collect_errors(fail) threw '" << thrown
|
||
|
<< "': " << errors << " errors" << std::endl;
|
||
|
});
|
||
|
|
||
|
/*****************************************************************************
|
||
|
* when_all, heterogeneous
|
||
|
*****************************************************************************/
|
||
|
//[wait_all_members_get
|
||
|
template< typename Result, typename ... Futures >
|
||
|
Result wait_all_members_get( Futures && ... futures) {
|
||
|
// Fetch the results from the passed futures into Result's initializer
|
||
|
// list. It's true that the get() calls here will block the implicit
|
||
|
// iteration over futures -- but that doesn't matter because we won't be
|
||
|
// done until the slowest of them finishes anyway. As results are
|
||
|
// processed in argument-list order rather than order of completion, the
|
||
|
// leftmost get() to throw an exception will cause that exception to
|
||
|
// propagate to the caller.
|
||
|
return Result{ futures.get() ... };
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
//[wait_all_members
|
||
|
// Explicitly pass Result. This can be any type capable of being initialized
|
||
|
// from the results of the passed functions, such as a struct.
|
||
|
template< typename Result, typename ... Fns >
|
||
|
Result wait_all_members( Fns && ... functions) {
|
||
|
// Run each of the passed functions on a separate fiber, passing all their
|
||
|
// futures to helper function for processing.
|
||
|
return wait_all_members_get< Result >(
|
||
|
boost::fibers::async( std::forward< Fns >( functions) ) ... );
|
||
|
}
|
||
|
//]
|
||
|
|
||
|
// used by following example
|
||
|
//[wait_Data
|
||
|
struct Data {
|
||
|
std::string str;
|
||
|
double inexact;
|
||
|
int exact;
|
||
|
|
||
|
friend std::ostream& operator<<( std::ostream& out, Data const& data)/*=;
|
||
|
...*/
|
||
|
//<-
|
||
|
{
|
||
|
return out << "Data{str='" << data.str << "', inexact=" << data.inexact
|
||
|
<< ", exact=" << data.exact << "}";
|
||
|
}
|
||
|
//->
|
||
|
};
|
||
|
//]
|
||
|
|
||
|
// example usage
|
||
|
Example wam( runner, "wait_all_members()", [](){
|
||
|
//[wait_all_members_data_ex
|
||
|
Data data = wait_all_members< Data >(
|
||
|
[](){ return sleeper("wams_left", 100); },
|
||
|
[](){ return sleeper(3.14, 150); },
|
||
|
[](){ return sleeper(17, 50); });
|
||
|
std::cout << "wait_all_members<Data>(success) => " << data << std::endl;
|
||
|
//]
|
||
|
|
||
|
std::string thrown;
|
||
|
try {
|
||
|
data = wait_all_members< Data >(
|
||
|
[](){ return sleeper("wamf_left", 100, true); },
|
||
|
[](){ return sleeper(3.14, 150); },
|
||
|
[](){ return sleeper(17, 50, true); });
|
||
|
std::cout << "wait_all_members<Data>(fail) => " << data << std::endl;
|
||
|
} catch ( std::exception const& e) {
|
||
|
thrown = e.what();
|
||
|
}
|
||
|
std::cout << "wait_all_members<Data>(fail) threw '" << thrown
|
||
|
<< '"' << std::endl;
|
||
|
|
||
|
//[wait_all_members_vector_ex
|
||
|
// If we don't care about obtaining results as soon as they arrive, and we
|
||
|
// prefer a result vector in passed argument order rather than completion
|
||
|
// order, wait_all_members() is another possible implementation of
|
||
|
// wait_all_until_error().
|
||
|
auto strings = wait_all_members< std::vector< std::string > >(
|
||
|
[](){ return sleeper("wamv_left", 150); },
|
||
|
[](){ return sleeper("wamv_middle", 100); },
|
||
|
[](){ return sleeper("wamv_right", 50); });
|
||
|
std::cout << "wait_all_members<vector>() =>";
|
||
|
for ( std::string const& str : strings) {
|
||
|
std::cout << " '" << str << "'";
|
||
|
}
|
||
|
std::cout << std::endl;
|
||
|
//]
|
||
|
});
|
||
|
|
||
|
|
||
|
/*****************************************************************************
|
||
|
* main()
|
||
|
*****************************************************************************/
|
||
|
int main( int argc, char *argv[]) {
|
||
|
runner.run();
|
||
|
std::cout << "done." << std::endl;
|
||
|
return EXIT_SUCCESS;
|
||
|
}
|