/* * libjingle * Copyright 2004--2005, Google Inc. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * 1. Redistributions of source code must retain the above copyright notice, * this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright notice, * this list of conditions and the following disclaimer in the documentation * and/or other materials provided with the distribution. * 3. The name of the author may not be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #ifndef TALK_BASE_MESSAGEQUEUE_H_ #define TALK_BASE_MESSAGEQUEUE_H_ #include #include #include #include #include #include "talk/base/basictypes.h" #include "talk/base/constructormagic.h" #include "talk/base/criticalsection.h" #include "talk/base/messagehandler.h" #include "talk/base/scoped_ptr.h" #include "talk/base/scoped_ref_ptr.h" #include "talk/base/sigslot.h" #include "talk/base/socketserver.h" #include "talk/base/timeutils.h" namespace talk_base { struct Message; class MessageQueue; // MessageQueueManager does cleanup of of message queues class MessageQueueManager { public: static MessageQueueManager* Instance(); void Add(MessageQueue *message_queue); void Remove(MessageQueue *message_queue); void Clear(MessageHandler *handler); private: MessageQueueManager(); ~MessageQueueManager(); static MessageQueueManager* instance_; // This list contains 'active' MessageQueues. std::vector message_queues_; CriticalSection crit_; }; // Derive from this for specialized data // App manages lifetime, except when messages are purged class MessageData { public: MessageData() {} virtual ~MessageData() {} }; template class TypedMessageData : public MessageData { public: explicit TypedMessageData(const T& data) : data_(data) { } const T& data() const { return data_; } T& data() { return data_; } private: T data_; }; // Like TypedMessageData, but for pointers that require a delete. template class ScopedMessageData : public MessageData { public: explicit ScopedMessageData(T* data) : data_(data) { } const scoped_ptr& data() const { return data_; } scoped_ptr& data() { return data_; } private: scoped_ptr data_; }; // Like ScopedMessageData, but for reference counted pointers. template class ScopedRefMessageData : public MessageData { public: explicit ScopedRefMessageData(T* data) : data_(data) { } const scoped_refptr& data() const { return data_; } scoped_refptr& data() { return data_; } private: scoped_refptr data_; }; template inline MessageData* WrapMessageData(const T& data) { return new TypedMessageData(data); } template inline const T& UseMessageData(MessageData* data) { return static_cast< TypedMessageData* >(data)->data(); } template class DisposeData : public MessageData { public: explicit DisposeData(T* data) : data_(data) { } virtual ~DisposeData() { delete data_; } private: T* data_; }; const uint32 MQID_ANY = static_cast(-1); const uint32 MQID_DISPOSE = static_cast(-2); // No destructor struct Message { Message() { memset(this, 0, sizeof(*this)); } inline bool Match(MessageHandler* handler, uint32 id) const { return (handler == NULL || handler == phandler) && (id == MQID_ANY || id == message_id); } MessageHandler *phandler; uint32 message_id; MessageData *pdata; uint32 ts_sensitive; }; typedef std::list MessageList; // DelayedMessage goes into a priority queue, sorted by trigger time. Messages // with the same trigger time are processed in num_ (FIFO) order. class DelayedMessage { public: DelayedMessage(int delay, uint32 trigger, uint32 num, const Message& msg) : cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) { } bool operator< (const DelayedMessage& dmsg) const { return (dmsg.msTrigger_ < msTrigger_) || ((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_)); } int cmsDelay_; // for debugging uint32 msTrigger_; uint32 num_; Message msg_; }; class MessageQueue { public: explicit MessageQueue(SocketServer* ss = NULL); virtual ~MessageQueue(); SocketServer* socketserver() { return ss_; } void set_socketserver(SocketServer* ss); // Note: The behavior of MessageQueue has changed. When a MQ is stopped, // futher Posts and Sends will fail. However, any pending Sends and *ready* // Posts (as opposed to unexpired delayed Posts) will be delivered before // Get (or Peek) returns false. By guaranteeing delivery of those messages, // we eliminate the race condition when an MessageHandler and MessageQueue // may be destroyed independently of each other. virtual void Quit(); virtual bool IsQuitting(); virtual void Restart(); // Get() will process I/O until: // 1) A message is available (returns true) // 2) cmsWait seconds have elapsed (returns false) // 3) Stop() is called (returns false) virtual bool Get(Message *pmsg, int cmsWait = kForever, bool process_io = true); virtual bool Peek(Message *pmsg, int cmsWait = 0); virtual void Post(MessageHandler *phandler, uint32 id = 0, MessageData *pdata = NULL, bool time_sensitive = false); virtual void PostDelayed(int cmsDelay, MessageHandler *phandler, uint32 id = 0, MessageData *pdata = NULL) { return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); } virtual void PostAt(uint32 tstamp, MessageHandler *phandler, uint32 id = 0, MessageData *pdata = NULL) { return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); } virtual void Clear(MessageHandler *phandler, uint32 id = MQID_ANY, MessageList* removed = NULL); virtual void Dispatch(Message *pmsg); virtual void ReceiveSends(); // Amount of time until the next message can be retrieved virtual int GetDelay(); bool empty() const { return size() == 0u; } size_t size() const { CritScope cs(&crit_); // msgq_.size() is not thread safe. return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u); } // Internally posts a message which causes the doomed object to be deleted template void Dispose(T* doomed) { if (doomed) { Post(NULL, MQID_DISPOSE, new DisposeData(doomed)); } } // When this signal is sent out, any references to this queue should // no longer be used. sigslot::signal0<> SignalQueueDestroyed; protected: class PriorityQueue : public std::priority_queue { public: container_type& container() { return c; } void reheap() { make_heap(c.begin(), c.end(), comp); } }; void EnsureActive(); void DoDelayPost(int cmsDelay, uint32 tstamp, MessageHandler *phandler, uint32 id, MessageData* pdata); // The SocketServer is not owned by MessageQueue. SocketServer* ss_; // If a server isn't supplied in the constructor, use this one. scoped_ptr default_ss_; bool fStop_; bool fPeekKeep_; Message msgPeek_; // A message queue is active if it has ever had a message posted to it. // This also corresponds to being in MessageQueueManager's global list. bool active_; MessageList msgq_; PriorityQueue dmsgq_; uint32 dmsgq_next_num_; mutable CriticalSection crit_; private: DISALLOW_COPY_AND_ASSIGN(MessageQueue); }; } // namespace talk_base #endif // TALK_BASE_MESSAGEQUEUE_H_