diff --git a/libretroshare/src/jsonapi/jsonapi.cpp b/libretroshare/src/jsonapi/jsonapi.cpp index 71d748dcc..eaceacf08 100644 --- a/libretroshare/src/jsonapi/jsonapi.cpp +++ b/libretroshare/src/jsonapi/jsonapi.cpp @@ -37,6 +37,7 @@ #include "retroshare/rsinit.h" #include "util/rsurl.h" #include "util/rstime.h" +#include "retroshare/rsevents.h" // Generated at compile time #include "jsonapi-includes.inl" @@ -278,6 +279,64 @@ JsonApiServer::JsonApiServer(uint16_t port, const std::string& bindAddress, } ); }, true); + registerHandler("/rsEvents/registerEventsHandler", + [this](const std::shared_ptr session) + { + const std::multimap headers + { + { "Connection", "keep-alive" }, + { "Content-Type", "text/event-stream" } + }; + session->yield(rb::OK, headers); + + size_t reqSize = session->get_request()->get_header("Content-Length", 0); + session->fetch( reqSize, [this]( + const std::shared_ptr session, + const rb::Bytes& body ) + { + INITIALIZE_API_CALL_JSON_CONTEXT; + + if( !checkRsServicePtrReady( + rsEvents, "rsEvents", cAns, session ) ) + return; + + const std::weak_ptr weakSession(session); + RsEventsHandlerId_t hId = rsEvents->generateUniqueHandlerId(); + std::function multiCallback = + [weakSession, hId](const RsEvent& event) + { + auto session = weakSession.lock(); + if(!session || session->is_closed()) + { + if(rsEvents) rsEvents->unregisterEventsHandler(hId); + return; + } + + RsGenericSerializer::SerializeContext ctx; + RsTypeSerializer::serial_process( + RsGenericSerializer::TO_JSON, ctx, + const_cast(event), "event" ); + + std::stringstream message; + message << "data: " << compactJSON << ctx.mJson << "\n\n"; + session->yield(message.str()); + }; + + bool retval = rsEvents->registerEventsHandler(multiCallback, hId); + + { + RsGenericSerializer::SerializeContext& ctx(cAns); + RsGenericSerializer::SerializeJob j(RsGenericSerializer::TO_JSON); + RS_SERIAL_PROCESS(retval); + } + + // return them to the API caller + std::stringstream message; + message << "data: " << compactJSON << cAns.mJson << "\n\n"; + session->yield(message.str()); + } ); + }, true); + // Generated at compile time #include "jsonapi-wrappers.inl" } diff --git a/libretroshare/src/libretroshare.pro b/libretroshare/src/libretroshare.pro index 339412bb4..dabe2e7af 100644 --- a/libretroshare/src/libretroshare.pro +++ b/libretroshare/src/libretroshare.pro @@ -134,6 +134,7 @@ SOURCES += tcponudp/udppeer.cc \ PUBLIC_HEADERS = retroshare/rsdisc.h \ + retroshare/rsevents.h \ retroshare/rsexpr.h \ retroshare/rsfiles.h \ retroshare/rshistory.h \ @@ -459,6 +460,7 @@ HEADERS += rsitems/rsitem.h \ rsitems/rsserviceinfoitems.h \ HEADERS += services/autoproxy/p3i2pbob.h \ + services/rseventsservice.h \ services/autoproxy/rsautoproxymonitor.h \ services/p3msgservice.h \ services/p3service.h \ @@ -609,6 +611,7 @@ SOURCES += serialiser/rsbaseserial.cc \ SOURCES += services/autoproxy/rsautoproxymonitor.cc \ + services/rseventsservice.cc \ services/autoproxy/p3i2pbob.cc \ services/p3msgservice.cc \ services/p3service.cc \ diff --git a/libretroshare/src/retroshare/rsbroadcastdiscovery.h b/libretroshare/src/retroshare/rsbroadcastdiscovery.h index 5136c7334..f0a2ee037 100644 --- a/libretroshare/src/retroshare/rsbroadcastdiscovery.h +++ b/libretroshare/src/retroshare/rsbroadcastdiscovery.h @@ -30,6 +30,7 @@ #include "util/rstime.h" #include "util/rsurl.h" #include "util/rsmemory.h" +#include "retroshare/rsevents.h" class RsBroadcastDiscovery; @@ -46,6 +47,7 @@ struct RsBroadcastDiscoveryResult : RsSerializable std::string mProfileName; RsUrl locator; + /// @see RsSerializable void serial_process( RsGenericSerializer::SerializeJob j, RsGenericSerializer::SerializeContext& ctx) override { @@ -56,11 +58,34 @@ struct RsBroadcastDiscoveryResult : RsSerializable } }; +struct RsBroadcastDiscoveryPeerFoundEvent : RsEvent +{ + RsBroadcastDiscoveryPeerFoundEvent( + const RsBroadcastDiscoveryResult& eventData ) : + RsEvent(RsEventType::BROADCAST_DISCOVERY_PEER_FOUND), mData(eventData) {} + + RsBroadcastDiscoveryResult mData; + + /// @see RsSerializable + void serial_process( RsGenericSerializer::SerializeJob j, + RsGenericSerializer::SerializeContext& ctx) override + { + RsEvent::serial_process(j, ctx); + RS_SERIAL_PROCESS(mData); + } + + ~RsBroadcastDiscoveryPeerFoundEvent(); +}; + +/** + * Announce own RetroShare instace and look friends and peers in own broadcast + * domain (aka LAN). + * Emit event @see RsBroadcastDiscoveryPeerFoundEvent when a new peer (not + * friend yet) is found. + */ class RsBroadcastDiscovery { public: - virtual ~RsBroadcastDiscovery(); - /** * @brief Get potential peers that have been discovered up until now * @jsonapi{development} @@ -68,20 +93,5 @@ public: */ virtual std::vector getDiscoveredPeers() = 0; - /** - * @brief registerPeersDiscoveredEventHandler - * @jsonapi{development} - * @param multiCallback function that will be called each time a potential - * peer is discovered - * @param[in] maxWait maximum wait time in seconds for discovery results, - * passing std::numeric_limits::max() means wait forever. - * @param[out] errorMessage Optional storage for error message, meaningful - * only on failure. - * @return false on error, true otherwise - */ - virtual bool registerPeersDiscoveredEventHandler( - const std::function& - multiCallback, - rstime_t maxWait = 300, - std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string) ) = 0; + virtual ~RsBroadcastDiscovery(); }; diff --git a/libretroshare/src/retroshare/rsevents.h b/libretroshare/src/retroshare/rsevents.h new file mode 100644 index 000000000..9d8a0392a --- /dev/null +++ b/libretroshare/src/retroshare/rsevents.h @@ -0,0 +1,147 @@ +/******************************************************************************* + * Retroshare events service * + * * + * libretroshare: retroshare core library * + * * + * Copyright (C) 2019 Gioacchino Mazzurco * + * * + * This program is free software: you can redistribute it and/or modify * + * it under the terms of the GNU Lesser General Public License as * + * published by the Free Software Foundation, either version 3 of the * + * License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU Lesser General Public License for more details. * + * * + * You should have received a copy of the GNU Lesser General Public License * + * along with this program. If not, see . * + * * + *******************************************************************************/ +#pragma once + +#include +#include + +#include "util/rsmemory.h" +#include "serialiser/rsserializable.h" +#include "serialiser/rstypeserializer.h" + +class RsEvents; + +/** + * Pointer to global instance of RsEvents service implementation + * @jsonapi{development} + */ +extern std::shared_ptr rsEvents; + +/** + * @brief Events types. + * When creating a new type of event, add a new type here and use that to + * initialize mType in the constructor of your derivative of @see RsEvent + */ +enum class RsEventType : uint32_t +{ + NONE = 0, /// Used to detect uninitialized event + + BROADCAST_DISCOVERY_PEER_FOUND = 1, /// @see RsBroadcastDiscovery + + MAX /// Used to detect invalid event type passed +}; + +/** + * This struct is not meant to be used directly, you should create events type + * deriving from it. + */ +struct RsEvent : RsSerializable +{ + RsEvent() = delete; + RsEvent(RsEventType type) : + mType(type), mTimePoint(std::chrono::system_clock::now()) {} + virtual ~RsEvent(); + + RsEventType mType; + std::chrono::system_clock::time_point mTimePoint; + + /** + * Derived types must call this method at beginning of their implementation + * of serial_process + * @see RsSerializable + */ + virtual void serial_process(RsGenericSerializer::SerializeJob j, + RsGenericSerializer::SerializeContext& ctx) + { + RS_SERIAL_PROCESS(mType); + + rstime_t mTime = std::chrono::system_clock::to_time_t(mTimePoint); + RS_SERIAL_PROCESS(mTime); + mTimePoint = std::chrono::system_clock::from_time_t(mTime); + } +}; + +typedef uint32_t RsEventsHandlerId_t; + +class RsEvents +{ +public: + /** + * @brief Post event to the event queue. + * @param[in] event + * @param[out] errorMessage Optional storage for error messsage, meaningful + * only on failure. + * @return False on error, true otherwise. + */ + virtual bool postEvent( + std::unique_ptr event, + std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string) + ) = 0; + + /** + * @brief Send event directly to handlers. Blocking API + * The handlers get exectuded on the caller thread, ensuring the function + * returns only after the event has been handled. + * @param[in] event + * @param[out] errorMessage Optional storage for error messsage, meaningful + * only on failure. + * @return False on error, true otherwise. + */ + virtual bool sendEvent( + const RsEvent& event, + std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string) + ) = 0; + + /** + * @brief Generate unique handler identifier + * @return generate Id + */ + virtual RsEventsHandlerId_t generateUniqueHandlerId() = 0; + + /** + * @brief Register events handler + * Every time an event is dispatced the registered events handlers will get + * their method handleEvent called with the event passed as paramether. + * @jsonapi{development,manualwrapper} + * @param multiCallback Function that will be called each time an event + * is dispatched. + * @param[inout] hId Optional storage for handler id, useful to + * eventually unregister the handler later. The + * value may be provided to the function call but + * must habe been generated with + * @see generateUniqueHandlerId() + * @return False on error, true otherwise. + */ + virtual bool registerEventsHandler( + std::function multiCallback, + RsEventsHandlerId_t& hId = RS_DEFAULT_STORAGE_PARAM(RsEventsHandlerId_t, 0) + ) = 0; + + /** + * @brief Unregister event handler + * @param[in] hId Id of the event handler to unregister + * @return True if the handler id has been found, false otherwise. + */ + virtual bool unregisterEventsHandler(RsEventsHandlerId_t hId) = 0; + + virtual ~RsEvents(); +}; diff --git a/libretroshare/src/rsserver/p3face-server.cc b/libretroshare/src/rsserver/p3face-server.cc index 40bc1c452..ed4981b6b 100644 --- a/libretroshare/src/rsserver/p3face-server.cc +++ b/libretroshare/src/rsserver/p3face-server.cc @@ -39,6 +39,9 @@ int rsserverzone = 101; #include "util/rsdebug.h" +#include "retroshare/rsevents.h" +#include "services/rseventsservice.h" + /**** #define DEBUG_TICK 1 @@ -81,6 +84,12 @@ RsServer::RsServer() : coreMutex("RsServer"), mShutdownCallback([](int){}), coreReady(false) { + { + RsEventsService* tmpRsEvtPtr = new RsEventsService(); + rsEvents.reset(tmpRsEvtPtr); + startServiceThread(tmpRsEvtPtr, "RsEventsService"); + } + // This is needed asap. // mNotify = new p3Notify() ; diff --git a/libretroshare/src/services/broadcastdiscoveryservice.cc b/libretroshare/src/services/broadcastdiscoveryservice.cc index f9031abf3..19f612414 100644 --- a/libretroshare/src/services/broadcastdiscoveryservice.cc +++ b/libretroshare/src/services/broadcastdiscoveryservice.cc @@ -29,9 +29,11 @@ #include "retroshare/rspeers.h" #include "serialiser/rsserializable.h" #include "serialiser/rsserializer.h" +#include "retroshare/rsevents.h" /*extern*/ std::shared_ptr rsBroadcastDiscovery(nullptr); RsBroadcastDiscovery::~RsBroadcastDiscovery() { /* Beware of Rs prefix! */ } +RsBroadcastDiscoveryPeerFoundEvent::~RsBroadcastDiscoveryPeerFoundEvent() {} struct BroadcastDiscoveryPack : RsSerializable { @@ -87,9 +89,7 @@ struct BroadcastDiscoveryPack : RsSerializable BroadcastDiscoveryService::BroadcastDiscoveryService( RsPeers& pRsPeers ) : mDiscoveredDataMutex("BroadcastDiscoveryService discovered data mutex"), - mRsPeers(pRsPeers), - mPeersDiscoveredEventHandlersListMutex( - "BroadcastDiscoveryService event handlers mutex" ) + mRsPeers(pRsPeers) { if(mRsPeers.isHiddenNode(mRsPeers.getOwnId())) return; @@ -151,10 +151,7 @@ void BroadcastDiscoveryService::data_tick() } mDiscoveredDataMutex.unlock(); - cleanTimedOutEventHandlers(); - - mPeersDiscoveredEventHandlersListMutex.lock(); - if(!mChangedData.empty() && !mPeersDiscoveredEventHandlersList.empty()) + if(!mChangedData.empty()) { for (auto&& pp : mChangedData) { @@ -173,12 +170,13 @@ void BroadcastDiscoveryService::data_tick() rbdr.locator.port() ); mRsPeers.connectAttempt(rbdr.mSslId); } - - for( const timedDiscHandlers_t& evtHandler : - mPeersDiscoveredEventHandlersList ) evtHandler.first(rbdr); + else if(rsEvents) + { + typedef RsBroadcastDiscoveryPeerFoundEvent Evt_t; + rsEvents->postEvent(std::unique_ptr(new Evt_t(rbdr))); + } } } - mPeersDiscoveredEventHandlersListMutex.unlock(); } /* Probably this would be better if done only on actual change */ @@ -205,40 +203,3 @@ RsBroadcastDiscoveryResult BroadcastDiscoveryService::createResult( return rbdr; } - -void BroadcastDiscoveryService::cleanTimedOutEventHandlers() -{ - auto now = std::chrono::system_clock::now(); - - RS_STACK_MUTEX(mPeersDiscoveredEventHandlersListMutex); - mPeersDiscoveredEventHandlersList.remove_if( - [&](timedDiscHandlers_t h) { return h.second <= now; } ); -} - -bool BroadcastDiscoveryService::registerPeersDiscoveredEventHandler( - const std::function& - multiCallback, rstime_t maxWait, std::string& errorMessage ) -{ - auto now = std::chrono::system_clock::now(); - auto timeout = std::chrono::system_clock::time_point::max(); - if(maxWait != std::numeric_limits::max()) - timeout = now + std::chrono::seconds(maxWait); - - if(timeout <= now) - { - errorMessage = " Invalid maxWait value: " + std::to_string(maxWait) + - " either too big or too little, use: " + - "std::numeric_limits::max() == " + - std::to_string(std::numeric_limits::max()) + - " if you meant \"wait forever\""; - - std::cerr << __PRETTY_FUNCTION__ << errorMessage << std::endl; - return false; - } - - RS_STACK_MUTEX(mPeersDiscoveredEventHandlersListMutex); - mPeersDiscoveredEventHandlersList.push_front( - std::make_pair(multiCallback,timeout) ); - - return true; -} diff --git a/libretroshare/src/services/broadcastdiscoveryservice.h b/libretroshare/src/services/broadcastdiscoveryservice.h index c2f70e0cd..ddee16fb3 100644 --- a/libretroshare/src/services/broadcastdiscoveryservice.h +++ b/libretroshare/src/services/broadcastdiscoveryservice.h @@ -43,14 +43,6 @@ public: /// @see RsBroadcastDiscovery std::vector getDiscoveredPeers() override; - /// @see RsBroadcastDiscovery - bool registerPeersDiscoveredEventHandler( - const std::function& - multiCallback, - rstime_t maxWait = 300, - std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string) - ) override; - /// @see RsTickingThread void data_tick() override; @@ -68,16 +60,6 @@ protected: RsPeers& mRsPeers; // TODO: std::shared_ptr mRsPeers; - typedef std::pair< - std::function, - std::chrono::system_clock::time_point > timedDiscHandlers_t; - /** Store peer discovered event handlers with timeout */ - std::forward_list mPeersDiscoveredEventHandlersList; - RsMutex mPeersDiscoveredEventHandlersListMutex; - - /// Cleanup mPeersDiscoveredEventHandlersList - void cleanTimedOutEventHandlers(); - RsBroadcastDiscoveryResult createResult( const UDC::IpPort& ipp, const std::string& uData ); }; diff --git a/libretroshare/src/services/rseventsservice.cc b/libretroshare/src/services/rseventsservice.cc new file mode 100644 index 000000000..57053f4ee --- /dev/null +++ b/libretroshare/src/services/rseventsservice.cc @@ -0,0 +1,171 @@ +/******************************************************************************* + * Retroshare events service * + * * + * libretroshare: retroshare core library * + * * + * Copyright (C) 2019 Gioacchino Mazzurco * + * * + * This program is free software: you can redistribute it and/or modify * + * it under the terms of the GNU Lesser General Public License as * + * published by the Free Software Foundation, either version 3 of the * + * License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU Lesser General Public License for more details. * + * * + * You should have received a copy of the GNU Lesser General Public License * + * along with this program. If not, see . * + * * + *******************************************************************************/ + +#include + +#include "services/rseventsservice.h" + + +/*extern*/ std::shared_ptr rsEvents(nullptr); +RsEvent::~RsEvent() {}; +RsEvents::~RsEvents() {}; + +bool isEventValid(const RsEvent& event, std::string& errorMessage) +{ + if(event.mType <= RsEventType::NONE) + { + errorMessage = "Event has type NONE: " + + std::to_string( + static_cast::type >( + event.mType ) ); + return false; + } + + if(event.mType >= RsEventType::MAX) + { + errorMessage = "Event has type >= RsEventType::MAX: " + + std::to_string( + static_cast::type >( + event.mType ) ); + } + + return true; +} + +bool RsEventsService::postEvent( std::unique_ptr event, + std::string& errorMessage ) +{ + if(!isEventValid(*event, errorMessage)) + { + std::cerr << __PRETTY_FUNCTION__ << " Error: "<< errorMessage + << std::endl; + return false; + } + + RS_STACK_MUTEX(mEventQueueMtx); + mEventQueue.push_back(std::move(event)); + return true; +} + +bool RsEventsService::sendEvent( const RsEvent& event, + std::string& errorMessage ) +{ + if(!isEventValid(event, errorMessage)) + { + std::cerr << __PRETTY_FUNCTION__ << " Error: "<< errorMessage + << std::endl; + return false; + } + + handleEvent(event); + return true; +} + +RsEventsHandlerId_t RsEventsService::generateUniqueHandlerId() +{ + RS_STACK_MUTEX(mHandlerMapMtx); + return generateUniqueHandlerId_unlocked(); +} + +RsEventsHandlerId_t RsEventsService::generateUniqueHandlerId_unlocked() +{ + if(++mLastHandlerId) return mLastHandlerId; // Avoid 0 after overflow + return 1; +} + +bool RsEventsService::registerEventsHandler( + std::function multiCallback, + RsEventsHandlerId_t& hId ) +{ + RS_STACK_MUTEX(mHandlerMapMtx); + if(!hId) hId = generateUniqueHandlerId_unlocked(); + mHandlerMap[hId] = std::move(multiCallback); + return true; +} + +bool RsEventsService::unregisterEventsHandler(RsEventsHandlerId_t hId) +{ + RS_STACK_MUTEX(mHandlerMapMtx); + auto it = mHandlerMap.find(hId); + if(it == mHandlerMap.end()) return false; + mHandlerMap.erase(it); + return true; +} + +void RsEventsService::data_tick() +{ + auto nextRunAt = std::chrono::system_clock::now() + + std::chrono::milliseconds(1); + + std::unique_ptr eventPtr(nullptr); + uint futureEventsCounter = 0; + +dispatchEventFromQueueLock: + mEventQueueMtx.lock(); + if(mEventQueue.size() > futureEventsCounter) + { + eventPtr = std::move(mEventQueue.front()); + mEventQueue.pop_front(); + + if(eventPtr->mTimePoint >= nextRunAt) + { + mEventQueue.push_back(std::move(eventPtr)); + ++futureEventsCounter; + } + } + mEventQueueMtx.unlock(); + + if(eventPtr) + { + /* It is relevant that this stays out of mEventQueueMtx */ + handleEvent(*eventPtr); + eventPtr.reset(nullptr); // ensure memory is freed before sleep + goto dispatchEventFromQueueLock; + } + + std::this_thread::sleep_until(nextRunAt); +} + +void RsEventsService::handleEvent(const RsEvent& event) +{ + std::function mCallback; + + mHandlerMapMtx.lock(); + auto cbpt = mHandlerMap.begin(); + mHandlerMapMtx.unlock(); + +getHandlerFromMapLock: + mHandlerMapMtx.lock(); + if(cbpt != mHandlerMap.end()) + { + mCallback = cbpt->second; + ++cbpt; + } + mHandlerMapMtx.unlock(); + + if(mCallback) + { + mCallback(event); // It is relevant that this happens outside mutex + mCallback = std::function(nullptr); + goto getHandlerFromMapLock; + } +} diff --git a/libretroshare/src/services/rseventsservice.h b/libretroshare/src/services/rseventsservice.h new file mode 100644 index 000000000..7d39c7a46 --- /dev/null +++ b/libretroshare/src/services/rseventsservice.h @@ -0,0 +1,77 @@ +/******************************************************************************* + * Retroshare events service * + * * + * libretroshare: retroshare core library * + * * + * Copyright (C) 2019 Gioacchino Mazzurco * + * * + * This program is free software: you can redistribute it and/or modify * + * it under the terms of the GNU Lesser General Public License as * + * published by the Free Software Foundation, either version 3 of the * + * License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU Lesser General Public License for more details. * + * * + * You should have received a copy of the GNU Lesser General Public License * + * along with this program. If not, see . * + * * + *******************************************************************************/ +#pragma once + +#include +#include +#include + +#include "retroshare/rsevents.h" +#include "util/rsthreads.h" + +class RsEventsService : + public RsEvents, public RsTickingThread +{ +public: + RsEventsService(): + mHandlerMapMtx("RsEventsService::mHandlerMapMtx"), mLastHandlerId(1), + mEventQueueMtx("RsEventsService::mEventQueueMtx") {} + + /// @see RsEvents + bool postEvent( + std::unique_ptr event, + std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string) + ) override; + + /// @see RsEvents + bool sendEvent( + const RsEvent& event, + std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string) + ) override; + + /// @see RsEvents + RsEventsHandlerId_t generateUniqueHandlerId() override; + + /// @see RsEvents + bool registerEventsHandler( + std::function multiCallback, + RsEventsHandlerId_t& hId = RS_DEFAULT_STORAGE_PARAM(RsEventsHandlerId_t, 0) + ) override; + + /// @see RsEvents + bool unregisterEventsHandler(RsEventsHandlerId_t hId) override; + +protected: + RsMutex mHandlerMapMtx; + RsEventsHandlerId_t mLastHandlerId; + std::map< RsEventsHandlerId_t, std::function > + mHandlerMap; + + RsMutex mEventQueueMtx; + std::deque< std::unique_ptr > mEventQueue; + + /// @see RsTickingThread + void data_tick() override; + + void handleEvent(const RsEvent& event); + RsEventsHandlerId_t generateUniqueHandlerId_unlocked(); +};