diff --git a/examples/ActorThread/HelloWorld/GNUmakefile b/examples/ActorThread/HelloWorld/GNUmakefile index 4715050..87de792 100644 --- a/examples/ActorThread/HelloWorld/GNUmakefile +++ b/examples/ActorThread/HelloWorld/GNUmakefile @@ -1,9 +1,9 @@ ifeq ($(DEBUG), 1) BUILD_DIR := debug - CXXFLAGS += -O0 -g3 -Wno-misleading-indentation -Wno-unknown-warning-option + CXXFLAGS := -O0 -g3 $(CXXFLAGS) else BUILD_DIR := release - CXXFLAGS += -O2 + CXXFLAGS := -O2 $(CXXFLAGS) endif PATH_BIN := $(BUILD_DIR)/application diff --git a/examples/ActorThread/MyLib/GNUmakefile b/examples/ActorThread/MyLib/GNUmakefile index 71996c7..b5280a2 100644 --- a/examples/ActorThread/MyLib/GNUmakefile +++ b/examples/ActorThread/MyLib/GNUmakefile @@ -1,9 +1,9 @@ ifeq ($(DEBUG), 1) BUILD_DIR := debug - CXXFLAGS += -O0 -g3 -Wno-misleading-indentation -Wno-unknown-warning-option + CXXFLAGS := -O0 -g3 $(CXXFLAGS) else BUILD_DIR := release - CXXFLAGS += -O2 + CXXFLAGS := -O2 $(CXXFLAGS) endif OUT_LIB := MyLib.a diff --git a/examples/ActorThread/MyLibClient/GNUmakefile b/examples/ActorThread/MyLibClient/GNUmakefile index de6817b..07441e0 100644 --- a/examples/ActorThread/MyLibClient/GNUmakefile +++ b/examples/ActorThread/MyLibClient/GNUmakefile @@ -1,9 +1,9 @@ ifeq ($(DEBUG), 1) BUILD_DIR := debug - CXXFLAGS += -O0 -g3 -Wno-misleading-indentation -Wno-unknown-warning-option + CXXFLAGS := -O0 -g3 $(CXXFLAGS) else BUILD_DIR := release - CXXFLAGS += -O2 + CXXFLAGS := -O2 $(CXXFLAGS) endif PATH_BIN := $(BUILD_DIR)/application diff --git a/examples/ActorThread/Test/GNUmakefile b/examples/ActorThread/Test/GNUmakefile index f50b67f..87de792 100644 --- a/examples/ActorThread/Test/GNUmakefile +++ b/examples/ActorThread/Test/GNUmakefile @@ -1,9 +1,9 @@ ifeq ($(DEBUG), 1) BUILD_DIR := debug - CXXFLAGS += -O0 -g3 -Wno-misleading-indentation -Wno-unknown-warning-option + CXXFLAGS := -O0 -g3 $(CXXFLAGS) else BUILD_DIR := release - CXXFLAGS += -O2 -g3 + CXXFLAGS := -O2 $(CXXFLAGS) endif PATH_BIN := $(BUILD_DIR)/application diff --git a/examples/ActorThread/Test/src/Application.cpp b/examples/ActorThread/Test/src/Application.cpp index 64d99a4..ce1c15d 100644 --- a/examples/ActorThread/Test/src/Application.cpp +++ b/examples/ActorThread/Test/src/Application.cpp @@ -237,12 +237,13 @@ template <> void Application::onMessage(Mpsc& msg) repliesCount++; if (repliesCount == 2) // end of 0P1C phase? { + auto per_second_produced_2p1c = (count_mpsc1 + count_mpsc2) / mpsc_elapsed_lap; + auto per_second_consumed_2p1c = (count_mpsc1_lap + count_mpsc2_lap) / mpsc_elapsed_lap; auto sc1 = count_mpsc1 - count_mpsc1_lap; auto sc2 = count_mpsc2 - count_mpsc2_lap; auto elapsed_sc_avg = (mpsc_elapsed_sc1 + mpsc_elapsed_sc2) / 2; - double min_msgs = std::min(std::min(std::min(std::min(std::min( - count_mpsc1, count_mpsc2), count_mpsc1_lap), count_mpsc2_lap), sc1), sc2); + double min_msgs = std::min(std::min(std::min(count_mpsc1, count_mpsc2), count_mpsc1_lap), count_mpsc2_lap); double r_2p1c_p = 1.0 * std::max(count_mpsc1, count_mpsc2) / std::min(count_mpsc1, count_mpsc2); double r_2p1c_c = 1.0 * std::max(count_mpsc1_lap, count_mpsc2_lap) / std::min(count_mpsc1_lap, count_mpsc2_lap); @@ -251,12 +252,14 @@ template <> void Application::onMessage(Mpsc& msg) crazyScheduler = (min_msgs < 100) || (max_ratio > 50); - std::cout << (count_mpsc1 + count_mpsc2) / mpsc_elapsed_lap << " msg/sec produced (" - << count_mpsc1 / mpsc_elapsed_lap << " + " << count_mpsc2 / mpsc_elapsed_lap << ") 2P1C test in " - << mpsc_elapsed_lap << " seconds" << std::endl; - std::cout << (count_mpsc1_lap + count_mpsc2_lap) / mpsc_elapsed_lap << " msg/sec consumed (" - << count_mpsc1_lap / mpsc_elapsed_lap << " + " << count_mpsc2_lap / mpsc_elapsed_lap << ") 2P1C test " - << "[max ratio: " << max_ratio << "]" << std::endl; + std::cout << per_second_produced_2p1c << " msg/sec produced (" + << count_mpsc1 / mpsc_elapsed_lap << " + " << count_mpsc2 / mpsc_elapsed_lap + << ") 2P1C test in " << mpsc_elapsed_lap << " seconds" << std::endl; + std::cout << per_second_consumed_2p1c << " msg/sec consumed (" + << count_mpsc1_lap / mpsc_elapsed_lap << " + " << count_mpsc2_lap / mpsc_elapsed_lap + << ") 2P1C test in " << mpsc_elapsed_lap << " seconds" << std::endl; + std::cout << (per_second_produced_2p1c + per_second_consumed_2p1c) / 3 << " msg/sec throughput " + << "per thread 2P1C test (priority inversion hint: " << max_ratio << ")" << std::endl; std::cout << (sc1 + sc2) / elapsed_sc_avg << " msg/sec consumed (" << sc1 / mpsc_elapsed_sc1 << " + " << sc2 / mpsc_elapsed_sc2 << ") 0P1C test in " << elapsed_sc_avg << " seconds" << std::endl; diff --git a/include/sys++/ActorThread.hpp b/include/sys++/ActorThread.hpp index 34443e6..227f521 100644 --- a/include/sys++/ActorThread.hpp +++ b/include/sys++/ActorThread.hpp @@ -24,13 +24,12 @@ #include #include #include -#include #include #include #include +#include #include #include -#include #include #include @@ -72,7 +71,7 @@ template class ActorThread { // in std::function (and a shared_ptr auto aliveTarget = weakBind.lock(); // would prevent objects destruction) if (aliveTarget) - aliveTarget->template send(std::move(data)); + std::static_pointer_cast(aliveTarget)->template send(std::move(data)); }); } @@ -89,7 +88,6 @@ template class ActorThread std::size_t pendingMessages() const // amount of undispatched messages in the active object { - std::lock_guard ulock(mtx); return mboxNormPri.size() + mboxHighPri.size(); } @@ -108,7 +106,6 @@ template class ActorThread bool exiting() const // mostly to allow active objects running intensive jobs to poll for a shutdown request { - std::lock_guard ulock(mtx); return !dispatching; } @@ -140,6 +137,8 @@ template class ActorThread void onStart() {} void onStop() {} + std::thread::id threadID() const { return id; } + /* the active object may use this family of methods to perform the callbacks onto connected clients */ template inline static void publish(Any msg) @@ -242,8 +241,8 @@ template class ActorThread void handleActorEvents() // this must be invoked from the external dispatcher as specified above { auto status = eventsLoop(); - if (std::get<1>(status)) - static_cast(this)->onWaitingTimer(std::get<2>(status)); + if (status.first) + static_cast(this)->onWaitingTimer(status.second); else static_cast(this)->onWaitingTimerCancel(); } @@ -253,12 +252,76 @@ template class ActorThread ActorThread& operator=(const ActorThread&) = delete; ActorThread(const ActorThread&) = delete; - struct ActorParcel + template class ActorQueue // FIFO (based on the MPSC queue at https://github.com/mstump/queues) + { + template using Aligned = typename std::aligned_storage::value>::type; + + public: + + struct Linked { std::atomic next; }; + + ActorQueue() : head(reinterpret_cast(new Aligned)), // dummy transient placeholder + tail(head.load(std::memory_order_relaxed)), + count(0), + lastFront(nullptr), + prevFront(tail.load(std::memory_order_relaxed)) + { + prevFront->next.store(nullptr, std::memory_order_relaxed); + } + + void clear() { while (front()) pop_front(); } + + ~ActorQueue() + { + clear(); + ::operator delete(head.load(std::memory_order_relaxed)); // also suitable for a dummy instance + } + + template std::size_t push_back(Linkable* item) // e.g. any type derived from 'Linked' + { + item->next.store(nullptr, std::memory_order_relaxed); + Item* back = head.exchange(item, std::memory_order_acq_rel); + back->next.store(item, std::memory_order_release); + return count.fetch_add(1, std::memory_order_release); // amount of queued items minus 1 + } + + inline Item* front() // returns nullptr if empty (must be always invoked just before pop_front()) + { + return lastFront = prevFront->next.load(std::memory_order_acquire); + } + + void pop_front() // also destroys and ultimately deletes the item + { + count.fetch_sub(1, std::memory_order_release); + tail.store(lastFront, std::memory_order_release); + lastFront->~Item(); // (atomic destructor is trivial) memory deletion is actually deferred one step behind + ::operator delete(prevFront); // delete the *previous* item memory no longer needed + prevFront = lastFront; + } + + inline std::size_t size() const { return count.load(std::memory_order_acquire); } + + inline bool empty() const { return !size(); } + + private: + + std::atomic head; // the stored objects hold the linked list pointers (single memory allocation) + std::atomic tail; + std::atomic count; + Item* lastFront; + Item* prevFront; + }; + + protected: + + struct ActorParcel : public ActorQueue::Linked { virtual ~ActorParcel() {} virtual void deliverTo(Runnable* instance) = 0; }; + private: + template struct ActorMessage : public ActorParcel // wraps any type { ActorMessage(Any&& msg) : message(std::move(msg)) {} @@ -369,19 +432,22 @@ template class ActorThread } catch (...) { return false; } + protected: + template void post(Any&& msg) // runs on the calling thread { auto& mbox = HighPri? mboxHighPri : mboxNormPri; - std::lock_guard lock(mtx); if (!dispatching) return; // don't store anything in a frozen queue - bool isIdle = mbox.empty(); - mbox.emplace_back(new Parcelable(std::forward(msg))); + bool isIdle = mbox.push_back(new Parcelable(std::forward(msg))) == 0; if (HighPri) mboxPaused = false; - if (!isIdle) return; - messageWaiter.notify_one(); + if (!isIdle) return; // if the consumer has pending messages (e.g. under high load) this method returns here + std::lock_guard lock(mtx); // under high load is only acquired in eventsLoop() (no effective lock) + messageWaiter.notify_one(); // wakeup the consumer thread static_cast(this)->onWaitingEvents(); } + private: + int dispatcher() // runs on the wrapped thread { id = std::this_thread::get_id(); @@ -390,10 +456,9 @@ template class ActorThread for (;;) { burst = 0; - auto status = eventsLoop(); - if (!std::get<0>(status)) break; // dispatching == false + eventsLoop(); + if (!dispatching) break; runnable->onDispatching(); - std::lock_guard lock(mtx); externalDispatcher = false; } runnable->onStop(); @@ -404,51 +469,49 @@ template class ActorThread void retryMbox(const DispatchRetry&) { mboxPaused = false; } - std::tuple eventsLoop() + std::pair eventsLoop() { bool haveTimerLapse = false; TimerClock::duration timerLapse; Runnable* runnable = static_cast(this); - std::unique_lock ulock(mtx); - while (dispatching) + bool mustDispatch = true; + while (dispatching && mustDispatch) { bool hasHigh = !mboxHighPri.empty(); bool hasNorm = !mboxNormPri.empty(); - bool isPaused = mboxPaused; - if (!isPaused && (hasHigh || hasNorm)) // consume the messages queue + if (!mboxPaused && (hasHigh || hasNorm)) // consume the messages queue { auto& mbox = hasHigh? mboxHighPri : mboxNormPri; - try { - auto& msg = mbox.front(); // queue iterator valid through insertions (but thread-unsafe call) - ulock.unlock(); - msg->deliverTo(runnable); - msg.reset(); // delete the argument before getting the lock (prevent a self-lock - ulock.lock(); // if that object sends a message to this thread from its destructor) - mbox.pop_front(); + while (ActorParcel* msg = mbox.front()) + { + msg->deliverTo(runnable); + mbox.pop_front(); + if ((++burst % 64) == 0) + { + if (externalDispatcher) // do not monopolize the CPU on this dispatcher + { + runnable->onWaitingEvents(); // queue a resume request + mustDispatch = false; + } + break; // keep an eye on the timers + } + } } catch (const DispatchRetry& retry) { auto event = Channel([this](const DispatchRetry& dr) { retryMbox(dr); }); timerStart(retry, retry.retryInterval, std::move(event)); - ulock.lock(); mboxPaused = true; } - - if (externalDispatcher && ((++burst % 64) == 0)) // do not monopolize the CPU on this dispatcher - { - runnable->onWaitingEvents(); // queue a resume request - break; - } } - ulock.unlock(); auto firstTimer = timers.cbegin(); if (firstTimer == timers.cend()) { - ulock.lock(); + std::unique_lock ulock(mtx); // lock required *here* to overcome the sleeping barber problem if (mboxNormPri.empty() && mboxHighPri.empty() && dispatching) { idleWaiter.notify_all(); @@ -463,11 +526,10 @@ template class ActorThread { auto timerEvent = *firstTimer; // this shared_ptr keeps it alive when self-removed from the set timerEvent->deliverTo(runnable); // here it could be self-removed (timerStop) - ulock.lock(); } else // the other timers are scheduled even further { - ulock.lock(); + std::unique_lock ulock(mtx); // prevent sleeping barber problem if (dispatching && mboxHighPri.empty() && (mboxNormPri.empty() || mboxPaused)) { idleWaiter.notify_all(); @@ -482,7 +544,7 @@ template class ActorThread } } } - return std::make_tuple(dispatching, haveTimerLapse, timerLapse); // take advantage of the acquired lock + return std::make_pair(haveTimerLapse, timerLapse); } template struct ActorPointedKeyComparator @@ -493,9 +555,9 @@ template class ActorThread } }; - bool dispatching; - bool externalDispatcher; - bool detached; + std::atomic dispatching; + std::atomic externalDispatcher; + std::atomic detached; mutable std::weak_ptr weak_this; std::thread runner; std::thread::id id; @@ -503,9 +565,9 @@ template class ActorThread mutable std::mutex mtx; std::condition_variable messageWaiter; std::condition_variable idleWaiter; - std::deque> mboxNormPri; - std::deque> mboxHighPri; - bool mboxPaused; + ActorQueue mboxNormPri; + ActorQueue mboxHighPri; + std::atomic mboxPaused; uint16_t burst; std::set, ActorPointedKeyComparator> timers; // ordered by deadline }; diff --git a/include/sys++/String.hpp b/include/sys++/String.hpp index 06760e5..5499511 100644 --- a/include/sys++/String.hpp +++ b/include/sys++/String.hpp @@ -12,41 +12,46 @@ #include #include +#ifndef VA_STR #define VA_STR(x) static_cast(std::ostringstream().flush() << x).str() +#endif struct String // a "namespace" not requiring a cpp nor inlining to avoid "unused function" warnings { - static std::string& tolower(std::string& str) + static void tolower(std::string& str) { std::transform(str.begin(), str.end(), str.begin(), ::tolower); - return str; } - static std::string& toupper(std::string& str) + static void toupper(std::string& str) { std::transform(str.begin(), str.end(), str.begin(), ::toupper); - return str; } - static std::string& ltrim(std::string& str) + static void ltrim(std::string& str) { std::string::iterator i = str.begin(); while (i != str.end()) if (!std::isspace(*i)) break; else ++i; str.erase(str.begin(), i); - return str; } - static std::string& rtrim(std::string& str) + static void rtrim(std::string& str) { std::string::iterator i = str.end(); while (i != str.begin()) if (!std::isspace(*(--i))) { ++i; break; } str.erase(i, str.end()); - return str; } - static std::string& trim(std::string& str) + static void trim(std::string& str) { - return ltrim(rtrim(str)); + rtrim(str); + ltrim(str); + } + + static std::string trimmed(std::string str) + { + trim(str); + return str; } static std::string right(const std::string& str, std::string::size_type count) @@ -54,7 +59,7 @@ struct String // a "namespace" not requiring a cpp nor inlining to avoid "unused return str.substr(str.size() - std::min(count, str.size())); } - static std::string& replaceAll(std::string& str, const std::string& sWhat, const std::string& sWith) + static void replaceAll(std::string& str, const std::string& sWhat, const std::string& sWith) { std::string::size_type lookHere = 0; std::string::size_type foundHere; @@ -63,7 +68,6 @@ struct String // a "namespace" not requiring a cpp nor inlining to avoid "unused str.replace(foundHere, sWhat.size(), sWith); lookHere = foundHere + sWith.size(); } - return str; } template static void split(const std::string& str, const char delimiter, T& result, bool trimmed = true) diff --git a/posix.mk b/posix.mk index dbe4347..8bdf262 100644 --- a/posix.mk +++ b/posix.mk @@ -40,11 +40,14 @@ WARNFLAGS ?= -Wall -Wextra -pedantic -Wconversion -Wsign-conversion -Wsign-promo -Wpointer-arith -Wnon-virtual-dtor -Woverloaded-virtual -Wshadow -Wundef -Wmissing-include-dirs CXXFLAGS += $(ARCHFLAGS) $(WARNFLAGS) -INCLUDES += $(foreach dir,$(SUBPRJS),-I$(dir)/include) -LIBS += $(foreach dir,$(SUBPRJS),$(wildcard $(dir)/$(BUILD_DIR)/*.a)) + +CXXFLAGS := $(filter-out $(SKIPFLAGS), $(CXXFLAGS)) # allow skipping particular warnings + +INCLUDES := $(foreach dir,$(SUBPRJS),-I$(dir)/include) $(INCLUDES) +LIBS := $(foreach dir,$(SUBPRJS),$(wildcard $(dir)/$(BUILD_DIR)/*.a)) $(LIBS) ifdef OUT_LIB -INCLUDES += -Iinclude +INCLUDES := -Iinclude $(INCLUDES) endif ifndef MK_NOHL