From 7dab487bde716cf5e82d3dfe00c059d99ed09ed0 Mon Sep 17 00:00:00 2001 From: Gioacchino Mazzurco Date: Mon, 15 Apr 2019 00:12:29 +0200 Subject: [PATCH] Implement a JSON API friendly notification system This should bit by bit substitute RsNotify which would be very difficult to support properly in JSON API. The new system is much simpler to use also from the C++ side of the moon. BroadcastDiscovery take advantage of the new system to notify about new non friend peer discovered, tested successfully also in JSON API. --- libretroshare/src/jsonapi/jsonapi.cpp | 59 ++++++ libretroshare/src/libretroshare.pro | 3 + .../src/retroshare/rsbroadcastdiscovery.h | 46 +++-- libretroshare/src/retroshare/rsevents.h | 147 +++++++++++++++ libretroshare/src/rsserver/p3face-server.cc | 9 + .../src/services/broadcastdiscoveryservice.cc | 57 +----- .../src/services/broadcastdiscoveryservice.h | 18 -- libretroshare/src/services/rseventsservice.cc | 171 ++++++++++++++++++ libretroshare/src/services/rseventsservice.h | 77 ++++++++ 9 files changed, 503 insertions(+), 84 deletions(-) create mode 100644 libretroshare/src/retroshare/rsevents.h create mode 100644 libretroshare/src/services/rseventsservice.cc create mode 100644 libretroshare/src/services/rseventsservice.h diff --git a/libretroshare/src/jsonapi/jsonapi.cpp b/libretroshare/src/jsonapi/jsonapi.cpp index 71d748dcc..eaceacf08 100644 --- a/libretroshare/src/jsonapi/jsonapi.cpp +++ b/libretroshare/src/jsonapi/jsonapi.cpp @@ -37,6 +37,7 @@ #include "retroshare/rsinit.h" #include "util/rsurl.h" #include "util/rstime.h" +#include "retroshare/rsevents.h" // Generated at compile time #include "jsonapi-includes.inl" @@ -278,6 +279,64 @@ JsonApiServer::JsonApiServer(uint16_t port, const std::string& bindAddress, } ); }, true); + registerHandler("/rsEvents/registerEventsHandler", + [this](const std::shared_ptr session) + { + const std::multimap headers + { + { "Connection", "keep-alive" }, + { "Content-Type", "text/event-stream" } + }; + session->yield(rb::OK, headers); + + size_t reqSize = session->get_request()->get_header("Content-Length", 0); + session->fetch( reqSize, [this]( + const std::shared_ptr session, + const rb::Bytes& body ) + { + INITIALIZE_API_CALL_JSON_CONTEXT; + + if( !checkRsServicePtrReady( + rsEvents, "rsEvents", cAns, session ) ) + return; + + const std::weak_ptr weakSession(session); + RsEventsHandlerId_t hId = rsEvents->generateUniqueHandlerId(); + std::function multiCallback = + [weakSession, hId](const RsEvent& event) + { + auto session = weakSession.lock(); + if(!session || session->is_closed()) + { + if(rsEvents) rsEvents->unregisterEventsHandler(hId); + return; + } + + RsGenericSerializer::SerializeContext ctx; + RsTypeSerializer::serial_process( + RsGenericSerializer::TO_JSON, ctx, + const_cast(event), "event" ); + + std::stringstream message; + message << "data: " << compactJSON << ctx.mJson << "\n\n"; + session->yield(message.str()); + }; + + bool retval = rsEvents->registerEventsHandler(multiCallback, hId); + + { + RsGenericSerializer::SerializeContext& ctx(cAns); + RsGenericSerializer::SerializeJob j(RsGenericSerializer::TO_JSON); + RS_SERIAL_PROCESS(retval); + } + + // return them to the API caller + std::stringstream message; + message << "data: " << compactJSON << cAns.mJson << "\n\n"; + session->yield(message.str()); + } ); + }, true); + // Generated at compile time #include "jsonapi-wrappers.inl" } diff --git a/libretroshare/src/libretroshare.pro b/libretroshare/src/libretroshare.pro index 339412bb4..dabe2e7af 100644 --- a/libretroshare/src/libretroshare.pro +++ b/libretroshare/src/libretroshare.pro @@ -134,6 +134,7 @@ SOURCES += tcponudp/udppeer.cc \ PUBLIC_HEADERS = retroshare/rsdisc.h \ + retroshare/rsevents.h \ retroshare/rsexpr.h \ retroshare/rsfiles.h \ retroshare/rshistory.h \ @@ -459,6 +460,7 @@ HEADERS += rsitems/rsitem.h \ rsitems/rsserviceinfoitems.h \ HEADERS += services/autoproxy/p3i2pbob.h \ + services/rseventsservice.h \ services/autoproxy/rsautoproxymonitor.h \ services/p3msgservice.h \ services/p3service.h \ @@ -609,6 +611,7 @@ SOURCES += serialiser/rsbaseserial.cc \ SOURCES += services/autoproxy/rsautoproxymonitor.cc \ + services/rseventsservice.cc \ services/autoproxy/p3i2pbob.cc \ services/p3msgservice.cc \ services/p3service.cc \ diff --git a/libretroshare/src/retroshare/rsbroadcastdiscovery.h b/libretroshare/src/retroshare/rsbroadcastdiscovery.h index 5136c7334..f0a2ee037 100644 --- a/libretroshare/src/retroshare/rsbroadcastdiscovery.h +++ b/libretroshare/src/retroshare/rsbroadcastdiscovery.h @@ -30,6 +30,7 @@ #include "util/rstime.h" #include "util/rsurl.h" #include "util/rsmemory.h" +#include "retroshare/rsevents.h" class RsBroadcastDiscovery; @@ -46,6 +47,7 @@ struct RsBroadcastDiscoveryResult : RsSerializable std::string mProfileName; RsUrl locator; + /// @see RsSerializable void serial_process( RsGenericSerializer::SerializeJob j, RsGenericSerializer::SerializeContext& ctx) override { @@ -56,11 +58,34 @@ struct RsBroadcastDiscoveryResult : RsSerializable } }; +struct RsBroadcastDiscoveryPeerFoundEvent : RsEvent +{ + RsBroadcastDiscoveryPeerFoundEvent( + const RsBroadcastDiscoveryResult& eventData ) : + RsEvent(RsEventType::BROADCAST_DISCOVERY_PEER_FOUND), mData(eventData) {} + + RsBroadcastDiscoveryResult mData; + + /// @see RsSerializable + void serial_process( RsGenericSerializer::SerializeJob j, + RsGenericSerializer::SerializeContext& ctx) override + { + RsEvent::serial_process(j, ctx); + RS_SERIAL_PROCESS(mData); + } + + ~RsBroadcastDiscoveryPeerFoundEvent(); +}; + +/** + * Announce own RetroShare instace and look friends and peers in own broadcast + * domain (aka LAN). + * Emit event @see RsBroadcastDiscoveryPeerFoundEvent when a new peer (not + * friend yet) is found. + */ class RsBroadcastDiscovery { public: - virtual ~RsBroadcastDiscovery(); - /** * @brief Get potential peers that have been discovered up until now * @jsonapi{development} @@ -68,20 +93,5 @@ public: */ virtual std::vector getDiscoveredPeers() = 0; - /** - * @brief registerPeersDiscoveredEventHandler - * @jsonapi{development} - * @param multiCallback function that will be called each time a potential - * peer is discovered - * @param[in] maxWait maximum wait time in seconds for discovery results, - * passing std::numeric_limits::max() means wait forever. - * @param[out] errorMessage Optional storage for error message, meaningful - * only on failure. - * @return false on error, true otherwise - */ - virtual bool registerPeersDiscoveredEventHandler( - const std::function& - multiCallback, - rstime_t maxWait = 300, - std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string) ) = 0; + virtual ~RsBroadcastDiscovery(); }; diff --git a/libretroshare/src/retroshare/rsevents.h b/libretroshare/src/retroshare/rsevents.h new file mode 100644 index 000000000..9d8a0392a --- /dev/null +++ b/libretroshare/src/retroshare/rsevents.h @@ -0,0 +1,147 @@ +/******************************************************************************* + * Retroshare events service * + * * + * libretroshare: retroshare core library * + * * + * Copyright (C) 2019 Gioacchino Mazzurco * + * * + * This program is free software: you can redistribute it and/or modify * + * it under the terms of the GNU Lesser General Public License as * + * published by the Free Software Foundation, either version 3 of the * + * License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU Lesser General Public License for more details. * + * * + * You should have received a copy of the GNU Lesser General Public License * + * along with this program. If not, see . * + * * + *******************************************************************************/ +#pragma once + +#include +#include + +#include "util/rsmemory.h" +#include "serialiser/rsserializable.h" +#include "serialiser/rstypeserializer.h" + +class RsEvents; + +/** + * Pointer to global instance of RsEvents service implementation + * @jsonapi{development} + */ +extern std::shared_ptr rsEvents; + +/** + * @brief Events types. + * When creating a new type of event, add a new type here and use that to + * initialize mType in the constructor of your derivative of @see RsEvent + */ +enum class RsEventType : uint32_t +{ + NONE = 0, /// Used to detect uninitialized event + + BROADCAST_DISCOVERY_PEER_FOUND = 1, /// @see RsBroadcastDiscovery + + MAX /// Used to detect invalid event type passed +}; + +/** + * This struct is not meant to be used directly, you should create events type + * deriving from it. + */ +struct RsEvent : RsSerializable +{ + RsEvent() = delete; + RsEvent(RsEventType type) : + mType(type), mTimePoint(std::chrono::system_clock::now()) {} + virtual ~RsEvent(); + + RsEventType mType; + std::chrono::system_clock::time_point mTimePoint; + + /** + * Derived types must call this method at beginning of their implementation + * of serial_process + * @see RsSerializable + */ + virtual void serial_process(RsGenericSerializer::SerializeJob j, + RsGenericSerializer::SerializeContext& ctx) + { + RS_SERIAL_PROCESS(mType); + + rstime_t mTime = std::chrono::system_clock::to_time_t(mTimePoint); + RS_SERIAL_PROCESS(mTime); + mTimePoint = std::chrono::system_clock::from_time_t(mTime); + } +}; + +typedef uint32_t RsEventsHandlerId_t; + +class RsEvents +{ +public: + /** + * @brief Post event to the event queue. + * @param[in] event + * @param[out] errorMessage Optional storage for error messsage, meaningful + * only on failure. + * @return False on error, true otherwise. + */ + virtual bool postEvent( + std::unique_ptr event, + std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string) + ) = 0; + + /** + * @brief Send event directly to handlers. Blocking API + * The handlers get exectuded on the caller thread, ensuring the function + * returns only after the event has been handled. + * @param[in] event + * @param[out] errorMessage Optional storage for error messsage, meaningful + * only on failure. + * @return False on error, true otherwise. + */ + virtual bool sendEvent( + const RsEvent& event, + std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string) + ) = 0; + + /** + * @brief Generate unique handler identifier + * @return generate Id + */ + virtual RsEventsHandlerId_t generateUniqueHandlerId() = 0; + + /** + * @brief Register events handler + * Every time an event is dispatced the registered events handlers will get + * their method handleEvent called with the event passed as paramether. + * @jsonapi{development,manualwrapper} + * @param multiCallback Function that will be called each time an event + * is dispatched. + * @param[inout] hId Optional storage for handler id, useful to + * eventually unregister the handler later. The + * value may be provided to the function call but + * must habe been generated with + * @see generateUniqueHandlerId() + * @return False on error, true otherwise. + */ + virtual bool registerEventsHandler( + std::function multiCallback, + RsEventsHandlerId_t& hId = RS_DEFAULT_STORAGE_PARAM(RsEventsHandlerId_t, 0) + ) = 0; + + /** + * @brief Unregister event handler + * @param[in] hId Id of the event handler to unregister + * @return True if the handler id has been found, false otherwise. + */ + virtual bool unregisterEventsHandler(RsEventsHandlerId_t hId) = 0; + + virtual ~RsEvents(); +}; diff --git a/libretroshare/src/rsserver/p3face-server.cc b/libretroshare/src/rsserver/p3face-server.cc index 40bc1c452..ed4981b6b 100644 --- a/libretroshare/src/rsserver/p3face-server.cc +++ b/libretroshare/src/rsserver/p3face-server.cc @@ -39,6 +39,9 @@ int rsserverzone = 101; #include "util/rsdebug.h" +#include "retroshare/rsevents.h" +#include "services/rseventsservice.h" + /**** #define DEBUG_TICK 1 @@ -81,6 +84,12 @@ RsServer::RsServer() : coreMutex("RsServer"), mShutdownCallback([](int){}), coreReady(false) { + { + RsEventsService* tmpRsEvtPtr = new RsEventsService(); + rsEvents.reset(tmpRsEvtPtr); + startServiceThread(tmpRsEvtPtr, "RsEventsService"); + } + // This is needed asap. // mNotify = new p3Notify() ; diff --git a/libretroshare/src/services/broadcastdiscoveryservice.cc b/libretroshare/src/services/broadcastdiscoveryservice.cc index f9031abf3..19f612414 100644 --- a/libretroshare/src/services/broadcastdiscoveryservice.cc +++ b/libretroshare/src/services/broadcastdiscoveryservice.cc @@ -29,9 +29,11 @@ #include "retroshare/rspeers.h" #include "serialiser/rsserializable.h" #include "serialiser/rsserializer.h" +#include "retroshare/rsevents.h" /*extern*/ std::shared_ptr rsBroadcastDiscovery(nullptr); RsBroadcastDiscovery::~RsBroadcastDiscovery() { /* Beware of Rs prefix! */ } +RsBroadcastDiscoveryPeerFoundEvent::~RsBroadcastDiscoveryPeerFoundEvent() {} struct BroadcastDiscoveryPack : RsSerializable { @@ -87,9 +89,7 @@ struct BroadcastDiscoveryPack : RsSerializable BroadcastDiscoveryService::BroadcastDiscoveryService( RsPeers& pRsPeers ) : mDiscoveredDataMutex("BroadcastDiscoveryService discovered data mutex"), - mRsPeers(pRsPeers), - mPeersDiscoveredEventHandlersListMutex( - "BroadcastDiscoveryService event handlers mutex" ) + mRsPeers(pRsPeers) { if(mRsPeers.isHiddenNode(mRsPeers.getOwnId())) return; @@ -151,10 +151,7 @@ void BroadcastDiscoveryService::data_tick() } mDiscoveredDataMutex.unlock(); - cleanTimedOutEventHandlers(); - - mPeersDiscoveredEventHandlersListMutex.lock(); - if(!mChangedData.empty() && !mPeersDiscoveredEventHandlersList.empty()) + if(!mChangedData.empty()) { for (auto&& pp : mChangedData) { @@ -173,12 +170,13 @@ void BroadcastDiscoveryService::data_tick() rbdr.locator.port() ); mRsPeers.connectAttempt(rbdr.mSslId); } - - for( const timedDiscHandlers_t& evtHandler : - mPeersDiscoveredEventHandlersList ) evtHandler.first(rbdr); + else if(rsEvents) + { + typedef RsBroadcastDiscoveryPeerFoundEvent Evt_t; + rsEvents->postEvent(std::unique_ptr(new Evt_t(rbdr))); + } } } - mPeersDiscoveredEventHandlersListMutex.unlock(); } /* Probably this would be better if done only on actual change */ @@ -205,40 +203,3 @@ RsBroadcastDiscoveryResult BroadcastDiscoveryService::createResult( return rbdr; } - -void BroadcastDiscoveryService::cleanTimedOutEventHandlers() -{ - auto now = std::chrono::system_clock::now(); - - RS_STACK_MUTEX(mPeersDiscoveredEventHandlersListMutex); - mPeersDiscoveredEventHandlersList.remove_if( - [&](timedDiscHandlers_t h) { return h.second <= now; } ); -} - -bool BroadcastDiscoveryService::registerPeersDiscoveredEventHandler( - const std::function& - multiCallback, rstime_t maxWait, std::string& errorMessage ) -{ - auto now = std::chrono::system_clock::now(); - auto timeout = std::chrono::system_clock::time_point::max(); - if(maxWait != std::numeric_limits::max()) - timeout = now + std::chrono::seconds(maxWait); - - if(timeout <= now) - { - errorMessage = " Invalid maxWait value: " + std::to_string(maxWait) + - " either too big or too little, use: " + - "std::numeric_limits::max() == " + - std::to_string(std::numeric_limits::max()) + - " if you meant \"wait forever\""; - - std::cerr << __PRETTY_FUNCTION__ << errorMessage << std::endl; - return false; - } - - RS_STACK_MUTEX(mPeersDiscoveredEventHandlersListMutex); - mPeersDiscoveredEventHandlersList.push_front( - std::make_pair(multiCallback,timeout) ); - - return true; -} diff --git a/libretroshare/src/services/broadcastdiscoveryservice.h b/libretroshare/src/services/broadcastdiscoveryservice.h index c2f70e0cd..ddee16fb3 100644 --- a/libretroshare/src/services/broadcastdiscoveryservice.h +++ b/libretroshare/src/services/broadcastdiscoveryservice.h @@ -43,14 +43,6 @@ public: /// @see RsBroadcastDiscovery std::vector getDiscoveredPeers() override; - /// @see RsBroadcastDiscovery - bool registerPeersDiscoveredEventHandler( - const std::function& - multiCallback, - rstime_t maxWait = 300, - std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string) - ) override; - /// @see RsTickingThread void data_tick() override; @@ -68,16 +60,6 @@ protected: RsPeers& mRsPeers; // TODO: std::shared_ptr mRsPeers; - typedef std::pair< - std::function, - std::chrono::system_clock::time_point > timedDiscHandlers_t; - /** Store peer discovered event handlers with timeout */ - std::forward_list mPeersDiscoveredEventHandlersList; - RsMutex mPeersDiscoveredEventHandlersListMutex; - - /// Cleanup mPeersDiscoveredEventHandlersList - void cleanTimedOutEventHandlers(); - RsBroadcastDiscoveryResult createResult( const UDC::IpPort& ipp, const std::string& uData ); }; diff --git a/libretroshare/src/services/rseventsservice.cc b/libretroshare/src/services/rseventsservice.cc new file mode 100644 index 000000000..57053f4ee --- /dev/null +++ b/libretroshare/src/services/rseventsservice.cc @@ -0,0 +1,171 @@ +/******************************************************************************* + * Retroshare events service * + * * + * libretroshare: retroshare core library * + * * + * Copyright (C) 2019 Gioacchino Mazzurco * + * * + * This program is free software: you can redistribute it and/or modify * + * it under the terms of the GNU Lesser General Public License as * + * published by the Free Software Foundation, either version 3 of the * + * License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU Lesser General Public License for more details. * + * * + * You should have received a copy of the GNU Lesser General Public License * + * along with this program. If not, see . * + * * + *******************************************************************************/ + +#include + +#include "services/rseventsservice.h" + + +/*extern*/ std::shared_ptr rsEvents(nullptr); +RsEvent::~RsEvent() {}; +RsEvents::~RsEvents() {}; + +bool isEventValid(const RsEvent& event, std::string& errorMessage) +{ + if(event.mType <= RsEventType::NONE) + { + errorMessage = "Event has type NONE: " + + std::to_string( + static_cast::type >( + event.mType ) ); + return false; + } + + if(event.mType >= RsEventType::MAX) + { + errorMessage = "Event has type >= RsEventType::MAX: " + + std::to_string( + static_cast::type >( + event.mType ) ); + } + + return true; +} + +bool RsEventsService::postEvent( std::unique_ptr event, + std::string& errorMessage ) +{ + if(!isEventValid(*event, errorMessage)) + { + std::cerr << __PRETTY_FUNCTION__ << " Error: "<< errorMessage + << std::endl; + return false; + } + + RS_STACK_MUTEX(mEventQueueMtx); + mEventQueue.push_back(std::move(event)); + return true; +} + +bool RsEventsService::sendEvent( const RsEvent& event, + std::string& errorMessage ) +{ + if(!isEventValid(event, errorMessage)) + { + std::cerr << __PRETTY_FUNCTION__ << " Error: "<< errorMessage + << std::endl; + return false; + } + + handleEvent(event); + return true; +} + +RsEventsHandlerId_t RsEventsService::generateUniqueHandlerId() +{ + RS_STACK_MUTEX(mHandlerMapMtx); + return generateUniqueHandlerId_unlocked(); +} + +RsEventsHandlerId_t RsEventsService::generateUniqueHandlerId_unlocked() +{ + if(++mLastHandlerId) return mLastHandlerId; // Avoid 0 after overflow + return 1; +} + +bool RsEventsService::registerEventsHandler( + std::function multiCallback, + RsEventsHandlerId_t& hId ) +{ + RS_STACK_MUTEX(mHandlerMapMtx); + if(!hId) hId = generateUniqueHandlerId_unlocked(); + mHandlerMap[hId] = std::move(multiCallback); + return true; +} + +bool RsEventsService::unregisterEventsHandler(RsEventsHandlerId_t hId) +{ + RS_STACK_MUTEX(mHandlerMapMtx); + auto it = mHandlerMap.find(hId); + if(it == mHandlerMap.end()) return false; + mHandlerMap.erase(it); + return true; +} + +void RsEventsService::data_tick() +{ + auto nextRunAt = std::chrono::system_clock::now() + + std::chrono::milliseconds(1); + + std::unique_ptr eventPtr(nullptr); + uint futureEventsCounter = 0; + +dispatchEventFromQueueLock: + mEventQueueMtx.lock(); + if(mEventQueue.size() > futureEventsCounter) + { + eventPtr = std::move(mEventQueue.front()); + mEventQueue.pop_front(); + + if(eventPtr->mTimePoint >= nextRunAt) + { + mEventQueue.push_back(std::move(eventPtr)); + ++futureEventsCounter; + } + } + mEventQueueMtx.unlock(); + + if(eventPtr) + { + /* It is relevant that this stays out of mEventQueueMtx */ + handleEvent(*eventPtr); + eventPtr.reset(nullptr); // ensure memory is freed before sleep + goto dispatchEventFromQueueLock; + } + + std::this_thread::sleep_until(nextRunAt); +} + +void RsEventsService::handleEvent(const RsEvent& event) +{ + std::function mCallback; + + mHandlerMapMtx.lock(); + auto cbpt = mHandlerMap.begin(); + mHandlerMapMtx.unlock(); + +getHandlerFromMapLock: + mHandlerMapMtx.lock(); + if(cbpt != mHandlerMap.end()) + { + mCallback = cbpt->second; + ++cbpt; + } + mHandlerMapMtx.unlock(); + + if(mCallback) + { + mCallback(event); // It is relevant that this happens outside mutex + mCallback = std::function(nullptr); + goto getHandlerFromMapLock; + } +} diff --git a/libretroshare/src/services/rseventsservice.h b/libretroshare/src/services/rseventsservice.h new file mode 100644 index 000000000..7d39c7a46 --- /dev/null +++ b/libretroshare/src/services/rseventsservice.h @@ -0,0 +1,77 @@ +/******************************************************************************* + * Retroshare events service * + * * + * libretroshare: retroshare core library * + * * + * Copyright (C) 2019 Gioacchino Mazzurco * + * * + * This program is free software: you can redistribute it and/or modify * + * it under the terms of the GNU Lesser General Public License as * + * published by the Free Software Foundation, either version 3 of the * + * License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU Lesser General Public License for more details. * + * * + * You should have received a copy of the GNU Lesser General Public License * + * along with this program. If not, see . * + * * + *******************************************************************************/ +#pragma once + +#include +#include +#include + +#include "retroshare/rsevents.h" +#include "util/rsthreads.h" + +class RsEventsService : + public RsEvents, public RsTickingThread +{ +public: + RsEventsService(): + mHandlerMapMtx("RsEventsService::mHandlerMapMtx"), mLastHandlerId(1), + mEventQueueMtx("RsEventsService::mEventQueueMtx") {} + + /// @see RsEvents + bool postEvent( + std::unique_ptr event, + std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string) + ) override; + + /// @see RsEvents + bool sendEvent( + const RsEvent& event, + std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string) + ) override; + + /// @see RsEvents + RsEventsHandlerId_t generateUniqueHandlerId() override; + + /// @see RsEvents + bool registerEventsHandler( + std::function multiCallback, + RsEventsHandlerId_t& hId = RS_DEFAULT_STORAGE_PARAM(RsEventsHandlerId_t, 0) + ) override; + + /// @see RsEvents + bool unregisterEventsHandler(RsEventsHandlerId_t hId) override; + +protected: + RsMutex mHandlerMapMtx; + RsEventsHandlerId_t mLastHandlerId; + std::map< RsEventsHandlerId_t, std::function > + mHandlerMap; + + RsMutex mEventQueueMtx; + std::deque< std::unique_ptr > mEventQueue; + + /// @see RsTickingThread + void data_tick() override; + + void handleEvent(const RsEvent& event); + RsEventsHandlerId_t generateUniqueHandlerId_unlocked(); +};