mirror of
https://github.com/zeromq/cppzmq.git
synced 2025-10-22 16:02:31 +02:00
Problem: extra abstraction layer type poller_t is in zmq.hpp
Solution: move to zmq_addon.hpp, rename to active_poller_t, and rename base_poller_t to poller_t
This commit is contained in:
@@ -426,6 +426,99 @@ inline std::ostream& operator<<(std::ostream& os, const multipart_t& msg)
|
||||
return os << msg.str();
|
||||
}
|
||||
|
||||
#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
|
||||
class active_poller_t
|
||||
{
|
||||
public:
|
||||
active_poller_t () = default;
|
||||
~active_poller_t () = default;
|
||||
|
||||
active_poller_t(const active_poller_t&) = delete;
|
||||
active_poller_t &operator=(const active_poller_t&) = delete;
|
||||
|
||||
active_poller_t(active_poller_t&& src) = default;
|
||||
active_poller_t &operator=(active_poller_t&& src) = default;
|
||||
|
||||
using handler_t = std::function<void(short)>;
|
||||
|
||||
void add (zmq::socket_t &socket, short events, handler_t handler)
|
||||
{
|
||||
auto it = decltype (handlers)::iterator {};
|
||||
auto inserted = bool {};
|
||||
std::tie(it, inserted) = handlers.emplace (static_cast<void*>(socket), std::make_shared<handler_t> (std::move (handler)));
|
||||
try
|
||||
{
|
||||
base_poller.add (socket, events, inserted && *(it->second) ? it->second.get() : nullptr);
|
||||
need_rebuild |= inserted;
|
||||
}
|
||||
catch (const zmq::error_t&)
|
||||
{
|
||||
// rollback
|
||||
if (inserted)
|
||||
{
|
||||
handlers.erase (static_cast<void*>(socket));
|
||||
}
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
void remove (zmq::socket_t &socket)
|
||||
{
|
||||
base_poller.remove (socket);
|
||||
handlers.erase (static_cast<void*>(socket));
|
||||
need_rebuild = true;
|
||||
}
|
||||
|
||||
void modify (zmq::socket_t &socket, short events)
|
||||
{
|
||||
base_poller.modify (socket, events);
|
||||
}
|
||||
|
||||
int wait (std::chrono::milliseconds timeout)
|
||||
{
|
||||
if (need_rebuild) {
|
||||
poller_events.clear ();
|
||||
poller_handlers.clear ();
|
||||
poller_events.reserve (handlers.size ());
|
||||
poller_handlers.reserve (handlers.size ());
|
||||
for (const auto &handler : handlers) {
|
||||
poller_events.emplace_back (zmq_poller_event_t {});
|
||||
poller_handlers.push_back (handler.second);
|
||||
}
|
||||
need_rebuild = false;
|
||||
}
|
||||
const int count = base_poller.wait_all (poller_events, timeout);
|
||||
if (count != 0) {
|
||||
std::for_each (poller_events.begin (), poller_events.begin () + count,
|
||||
[](zmq_poller_event_t& event) {
|
||||
if (event.user_data != NULL)
|
||||
(*reinterpret_cast<handler_t*> (event.user_data)) (event.events);
|
||||
});
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
||||
bool empty () const
|
||||
{
|
||||
return handlers.empty ();
|
||||
}
|
||||
|
||||
size_t size () const
|
||||
{
|
||||
return handlers.size ();
|
||||
}
|
||||
|
||||
private:
|
||||
bool need_rebuild {false};
|
||||
|
||||
poller_t<handler_t> base_poller {};
|
||||
std::unordered_map<void*, std::shared_ptr<handler_t>> handlers {};
|
||||
std::vector<zmq_poller_event_t> poller_events {};
|
||||
std::vector<std::shared_ptr<handler_t>> poller_handlers {};
|
||||
}; // class active_poller_t
|
||||
#endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
|
||||
|
||||
|
||||
} // namespace zmq
|
||||
|
||||
#endif // __ZMQ_ADDON_HPP_INCLUDED__
|
||||
|
||||
Reference in New Issue
Block a user