poco/Foundation/include/Poco/MPSCQueue.h
Aleksandar Fabijanic 1850dc16aa Benchmark and FastLogger (#5081)
* fix(SharedLibrary): Missing DLLs not reported #5069

* fix(CMake): not producing proper binary names #5070

* fix(SharedLibrary): disable shared lib tests in static build #5069

* fix(misc): add pdjson links to gitignore, remove unused var in SharedLibrary, harden TaskManagerTest

* fix(ci): separate oracle and sqlserver odbc (out of disk space) (#5075)

* fix(ci): separate oracle and sqlserver odbc (out of disk space)

* use oracle odbc driver

* use oracle free

* add db user

* postpone adding user after build

* remove default tablespace (does not exist)

* reinstate all ci jobs

* add postgresql odb tests to ci

* remove spurious symlinks

* fix gitignore (pdjson)

* Remove VS projects #5076

* chore: revert leftover ODB IP address

* fix(CodeQL): float comparison alerts

* fix: compile errors

* chore: upgrade asan to macos-14 (tryout)

* fix: .gitignore symlinks; XML Makefile wrong pattern

* Optimize PatternFormatter and Timezone performance #5078

PatternFormatter:
- Cache node name (Environment::nodeName()) to avoid repeated syscalls
- Add extractBasename() for efficient %O format specifier
- Add string reserve(128) to reduce reallocations during formatting

Timezone:
- Cache UTC offset to avoid repeated syscalls (8x speedup for %L patterns)
- Auto-detect TZ environment variable changes to invalidate cache
- Add reloadCache() method for explicit cache refresh

Tests:
- Add TimezoneTest::testUtcOffsetCaching()
- Add PatternFormatterTest::testExtractBasename()

* fix: use Path::separator in extractBasename #5078

* Add Benchmark #5080

* enh(Logging): move constructors for Message and Logger #5078

* chore(AsyncNotificationCenter): eliminate MSVC warnings

* enh(build): c++20 support #5084

* feat(CppUnit): print class name
execute all named tests (not only the first one)
accept test name with class (eg. testrunner LoggerTest::testLogger) #5083

* feat(Benchmark): Add Logger/FastLogger comparison benchmarks and Windows support

- Add LoggerBench.cpp with AsyncChannel vs FastLogger benchmarks
- Add compare.sh (Linux/macOS) and compare.ps1 (Windows) scripts
- Add LOGGER_BENCHMARK.md with cross-platform benchmark results
- Update README.md with Windows build instructions (Ninja, CMAKE_PREFIX_PATH)
- Add error message when -- options are used on Windows (should use /)
- Update CMakeLists.txt and Makefile to include LoggerBench #5080

* feat(FastLogger): #5078
FastLogger provides a Poco-compatible wrapper around the Quill logging
library, offering significant performance improvements over AsyncChannel
through lock-free SPSC queues and backend thread processing.

Key features:
- Drop-in replacement for Poco::Logger with FastLogger::get()
- Support for all standard Poco channels (Console, File, Rotating, etc.)
- XML/properties configuration via FastLoggerConfigurator
- Thread affinity for backend worker on Linux and Windows
- Log file rotation with size and time-based policies

Performance (CPU time - calling thread latency):
- Linux: 31-70x faster than AsyncChannel
- Windows: 23-87x faster than AsyncChannel
- macOS: Limited improvement due to lack of thread affinity support

New files:
- Foundation/include/Poco/FastLogger.h
- Foundation/src/FastLogger.cpp
- Util/include/Poco/Util/FastLoggerConfigurator.h
- Util/src/FastLoggerConfigurator.cpp
- dependencies/quill/ (header-only Quill 7.5.0 library)

* fix(cmake): disable FastLogger on emscripten (not supported) #5078

* feat(FastLogger): add cpuAfinity config parameter #5087

* fix(FastLogger): Fix lock-order-inversion in FastLogger (TSAN) #5078

* fix(cmake): build not stripping release binaries #5085

* fix(PCRE): fails to compile with clang/c++20 #5131

* feat(AsyncChannel): add CPU affinity property #5087

* feat(SpinlockMutex): make it adaptive #5132

* feat(AsyncChannel): add CPU affinity property #5087

* chore: remove leftover file committed by mistake

* feat(build): allow FastLogger to be fully disabled at build time #5078

Build system changes:
- Add POCO_NO_FASTLOGGER compile definition in CMake when ENABLE_FASTLOGGER=OFF
  to prevent Config.h from auto-enabling FastLogger
- Add ifdef guards around FastLogger tests in LoggingTestSuite.cpp
- Exclude FastLoggerTest.cpp and FastLoggerChannelsTest.cpp from CMake build
  when FastLogger is disabled
- Add POCO_NO_FASTLOGGER support to Make build system for Foundation and Util
- Add CI jobs to verify builds work without FastLogger (CMake and Make)

Code changes:
- Add LoggingConfigurator::configure() convenience method for quick logging setup

* fix(ci): testrunner args

* chore(progen): remove leftover script #5076

* fix(test): give ANC a bit more time to process

* fix(ci): set env before test run

* chore(doc): quill license

* feat(Channel): add log(Message&&) #5133

* fix(ci): set env before test run

* fix(TestRunner): don't search children #5083

* feat: lock-free queues #5134

* feat(Benchmark): various comparisons

* chore: cleanup benchmark
2025-12-22 21:06:43 +01:00


//
// MPSCQueue.h
//
// Library: Foundation
// Package: Core
// Module: MPSCQueue
//
// Definition of the MPSCQueue class template.
//
// Copyright (c) 2004-2024, Applied Informatics Software Engineering GmbH.,
// Aleph ONE Software Engineering LLC
// and Contributors.
//
// SPDX-License-Identifier: BSL-1.0
//
#ifndef Foundation_MPSCQueue_INCLUDED
#define Foundation_MPSCQueue_INCLUDED


#include "Poco/Foundation.h"
#include <atomic>
#include <cstddef>
#include <cstdint> // std::intptr_t
#include <new>
#include <type_traits>
#include <utility>


namespace Poco {

template <typename T>
class MPSCQueue
	/// A lock-free multi-producer single-consumer (MPSC) bounded queue.
	///
	/// This queue is optimized for the case where multiple threads
	/// produce items and exactly one thread consumes them.
	///
	/// The queue uses a bounded ring buffer with atomic operations.
	/// Producers use compare-and-swap to reserve slots; the consumer
	/// reads committed entries in order.
	///
	/// The queue has a fixed capacity specified at construction time.
	/// When full, tryPush() returns false. When empty, tryPop() returns false.
	///
	/// Usage:
	///     MPSCQueue<Message> queue(1024);
	///
	///     // Producer threads (any number):
	///     if (queue.tryPush(msg)) { /* success */ }
	///
	///     // Consumer thread (exactly one):
	///     Message msg;
	///     if (queue.tryPop(msg)) { /* got message */ }
	///
	/// Thread safety:
	///   - Multiple threads may call tryPush/emplace concurrently (producers)
	///   - Exactly one thread may call tryPop (consumer)
	///   - size(), empty(), capacity() may be called from any thread
	///
	/// Note: This queue does not provide blocking operations. For blocking
	/// behavior, combine with a semaphore or condition variable.
{
public:
#ifdef __cpp_lib_hardware_interference_size
	static constexpr std::size_t CACHE_LINE_SIZE = std::hardware_destructive_interference_size;
#else
	static constexpr std::size_t CACHE_LINE_SIZE = 64;
#endif

	explicit MPSCQueue(std::size_t capacity):
		/// Creates the queue with the given capacity.
		/// The capacity is rounded up to the next power of two
		/// for efficient indexing, with a minimum of 2: with a
		/// single slot, the sequence number published by a push
		/// would equal the next head position, so an occupied
		/// slot would look free to the next producer.
		_capacity(nextPowerOfTwo(capacity < 2 ? 2 : capacity)),
		_mask(_capacity - 1),
		_slots(static_cast<Slot*>(::operator new(_capacity * sizeof(Slot))))
	{
		// Initialize sequence numbers: slot i is free for position i
		for (std::size_t i = 0; i < _capacity; ++i)
		{
			new (&_slots[i].sequence) std::atomic<std::size_t>(i);
		}
	}

	~MPSCQueue()
		/// Destroys the queue.
	{
		// Destroy any remaining items
		std::size_t tail = _tail.load(std::memory_order_relaxed);
		std::size_t head = _head.load(std::memory_order_relaxed);
		while (tail != head)
		{
			std::size_t idx = index(tail);
			_slots[idx].ptr()->~T();
			++tail;
		}

		// Destroy sequence atomics and free memory
		for (std::size_t i = 0; i < _capacity; ++i)
		{
			_slots[i].sequence.~atomic();
		}
		::operator delete(_slots);
	}

	MPSCQueue(const MPSCQueue&) = delete;
	MPSCQueue& operator=(const MPSCQueue&) = delete;
	MPSCQueue(MPSCQueue&&) = delete;
	MPSCQueue& operator=(MPSCQueue&&) = delete;

	bool tryPush(const T& item)
		/// Attempts to push a copy of item onto the queue.
		/// Returns true if successful, false if the queue is full.
		/// Thread-safe for multiple concurrent producers.
	{
		return emplace(item);
	}

	bool tryPush(T&& item)
		/// Attempts to push item onto the queue using move semantics.
		/// Returns true if successful, false if the queue is full.
		/// Thread-safe for multiple concurrent producers.
	{
		return emplace(std::move(item));
	}

	template <typename... Args>
	bool emplace(Args&&... args)
		/// Attempts to construct an item in-place at the back of the queue.
		/// Returns true if successful, false if the queue is full.
		/// Thread-safe for multiple concurrent producers.
		///
		/// The constructor of T must not throw: the slot is claimed
		/// before the item is constructed, so an exception would leave
		/// the slot unpublished and stall the consumer at that position.
	{
		std::size_t head = _head.load(std::memory_order_relaxed);
		for (;;)
		{
			Slot& slot = _slots[index(head)];
			std::size_t seq = slot.sequence.load(std::memory_order_acquire);
			std::intptr_t diff = static_cast<std::intptr_t>(seq) - static_cast<std::intptr_t>(head);
			if (diff == 0)
			{
				// Slot is available for writing
				if (_head.compare_exchange_weak(head, head + 1, std::memory_order_relaxed))
				{
					// We claimed this slot, construct the item
					new (slot.ptr()) T(std::forward<Args>(args)...);
					// Publish: allow consumer to read this slot
					slot.sequence.store(head + 1, std::memory_order_release);
					return true;
				}
				// CAS failed (another producer claimed the slot, or the weak
				// CAS failed spuriously); head now holds the current value, retry
			}
			else if (diff < 0)
			{
				// Queue is full (head has wrapped around and caught up with tail)
				return false;
			}
			else
			{
				// Our copy of head is stale: another producer has already
				// claimed this position. Reload head and retry
				head = _head.load(std::memory_order_relaxed);
			}
		}
	}

	bool tryPop(T& item)
		/// Attempts to pop an item from the queue.
		/// If successful, moves the item into the provided reference
		/// and returns true. Returns false if the queue is empty.
		/// Must be called from exactly one thread (the consumer).
	{
		std::size_t tail = _tail.load(std::memory_order_relaxed);
		Slot& slot = _slots[index(tail)];
		std::size_t seq = slot.sequence.load(std::memory_order_acquire);
		std::intptr_t diff = static_cast<std::intptr_t>(seq) - static_cast<std::intptr_t>(tail + 1);
		if (diff == 0)
		{
			// Slot has been written and is ready to read
			item = std::move(*slot.ptr());
			slot.ptr()->~T();
			// Mark slot as available for producers (sequence = tail + capacity)
			slot.sequence.store(tail + _capacity, std::memory_order_release);
			_tail.store(tail + 1, std::memory_order_relaxed);
			return true;
		}
		else if (diff < 0)
		{
			// Queue is empty or producer hasn't finished writing yet
			return false;
		}
		// This shouldn't happen with a single consumer
		return false;
	}
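
	std::size_t size() const
		/// Returns an approximate count of items in the queue.
		/// This is approximate because producers and the consumer
		/// may be modifying the queue concurrently. The result is
		/// clamped to the range [0, capacity()].
	{
		// Load tail before head: head only grows, so a head value read
		// afterwards is normally not behind this tail. Reading head first
		// could yield head < tail and an unsigned wrap-around.
		std::size_t tail = _tail.load(std::memory_order_acquire);
		std::size_t head = _head.load(std::memory_order_acquire);
		std::ptrdiff_t diff = static_cast<std::ptrdiff_t>(head - tail);
		if (diff <= 0) return 0;
		std::size_t n = static_cast<std::size_t>(diff);
		return n < _capacity ? n : _capacity;
	}

	bool empty() const
		/// Returns true if the queue appears to be empty.
		/// This is approximate for the same reason as size().
	{
		return size() == 0;
	}

	std::size_t capacity() const
		/// Returns the capacity of the queue.
	{
		return _capacity;
	}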

private:
	struct Slot
	{
		std::atomic<std::size_t> sequence;
		typename std::aligned_storage<sizeof(T), alignof(T)>::type storage;

		T* ptr() { return reinterpret_cast<T*>(&storage); }
		const T* ptr() const { return reinterpret_cast<const T*>(&storage); }
	};

	static std::size_t nextPowerOfTwo(std::size_t n)
	{
		if (n == 0) return 1;
		--n;
		n |= n >> 1;
		n |= n >> 2;
		n |= n >> 4;
		n |= n >> 8;
		n |= n >> 16;
		if constexpr (sizeof(std::size_t) > 4)
		{
			n |= n >> 32;
		}
		return n + 1;
	}

	std::size_t index(std::size_t pos) const
	{
		return pos & _mask;
	}

	const std::size_t _capacity;
	const std::size_t _mask;
	Slot* const _slots;

	// Align head and tail to separate cache lines to avoid false sharing
	alignas(CACHE_LINE_SIZE) std::atomic<std::size_t> _head{0}; // written by producers (CAS)
	alignas(CACHE_LINE_SIZE) std::atomic<std::size_t> _tail{0}; // written by consumer only
};


} // namespace Poco


#endif // Foundation_MPSCQueue_INCLUDED
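
A minimal, self-contained sketch of the usage pattern described in the class documentation: several producers retry tryPush() while the queue is full, and a single consumer polls tryPop(). The names MiniMPSCQueue and runDemo are hypothetical and exist only for this illustration. The stand-in class stores plain int values and assumes a power-of-two capacity of at least 2, so that the example builds without the Poco headers; with the real header, Poco::MPSCQueue<int> would presumably take its place.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <thread>
#include <vector>

// Hypothetical, simplified stand-in for Poco::MPSCQueue<int>, so that the
// sketch compiles without the Poco headers. Same sequence-number protocol.
class MiniMPSCQueue
{
public:
	// Assumption: capacity is a power of two and at least 2.
	explicit MiniMPSCQueue(std::size_t capacity):
		_capacity(capacity),
		_slots(new Slot[capacity])
	{
		for (std::size_t i = 0; i < capacity; ++i)
			_slots[i].sequence.store(i, std::memory_order_relaxed);
	}

	bool tryPush(int value) // any number of producer threads
	{
		std::size_t head = _head.load(std::memory_order_relaxed);
		for (;;)
		{
			Slot& slot = _slots[head & (_capacity - 1)];
			std::size_t seq = slot.sequence.load(std::memory_order_acquire);
			std::intptr_t diff = static_cast<std::intptr_t>(seq) - static_cast<std::intptr_t>(head);
			if (diff == 0)
			{
				// Slot is free for this position: try to claim it.
				if (_head.compare_exchange_weak(head, head + 1, std::memory_order_relaxed))
				{
					slot.value = value;
					slot.sequence.store(head + 1, std::memory_order_release); // publish
					return true;
				}
			}
			else if (diff < 0) return false; // full
			else head = _head.load(std::memory_order_relaxed); // stale head
		}
	}

	bool tryPop(int& value) // exactly one consumer thread
	{
		Slot& slot = _slots[_tail & (_capacity - 1)];
		if (slot.sequence.load(std::memory_order_acquire) != _tail + 1)
			return false; // empty, or the producer has not published yet
		value = slot.value;
		slot.sequence.store(_tail + _capacity, std::memory_order_release); // recycle
		++_tail;
		return true;
	}

private:
	struct Slot
	{
		std::atomic<std::size_t> sequence{0};
		int value = 0;
	};

	const std::size_t _capacity;
	std::unique_ptr<Slot[]> _slots;
	std::atomic<std::size_t> _head{0};
	std::size_t _tail = 0; // touched by the consumer only
};

// Starts `producers` threads that each push 1..perProducer, drains the queue
// on the calling thread, and returns the sum of everything received.
long long runDemo(int producers, int perProducer)
{
	MiniMPSCQueue queue(8); // small on purpose, so producers hit the full case
	std::vector<std::thread> threads;
	for (int p = 0; p < producers; ++p)
	{
		threads.emplace_back([&queue, perProducer]
		{
			for (int i = 1; i <= perProducer; ++i)
				while (!queue.tryPush(i)) std::this_thread::yield();
		});
	}

	long long sum = 0;
	int value = 0;
	for (int received = 0; received < producers * perProducer; )
	{
		if (queue.tryPop(value)) { sum += value; ++received; }
		else std::this_thread::yield();
	}
	for (auto& t: threads) t.join();
	return sum;
}
```

Calling runDemo(4, 1000) returns 2002000, that is 4 times the sum 1 + 2 + ... + 1000, provided every pushed value is delivered exactly once. Spinning with yield() keeps the sketch short; as the class note says, a semaphore or condition variable would be the usual way to avoid busy-waiting.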