From 9b8d0afacb9f0bf513f0b24cb94d7f9ac161c240 Mon Sep 17 00:00:00 2001 From: Gioacchino Mazzurco Date: Tue, 27 Aug 2019 11:59:38 +0200 Subject: [PATCH] Fix sporadic crash in JSON API async calls In Restbed one is not supposed to call session->yield outside the threads controlled by Restbed. RetroShare JSON API async call were calling session->yield from threads controlled by RetroShare all the times, this caused crashes in some cases, like when the JSON API socket timed out concurrently with the session->yield call . To solve this problem session->yield from async calls are now wrapped insto mService->schedule to ensure they are executed on the right thread (aka one of the threads controlled by Restbed). While solving this issue I realized also that passing RsEvents as const references around was quite limiting in cases where the event need to be finally handled in another thread, in that case passing by const reference the RsEvent needed to be copied by value into the thread that process it, in this copy by value process the information of which was the original specific type is lost, and then only the data and methods from general RsEvents are available, unless the handler does tricky stuff with type coercion etc. To solve this limitation pass the events as std::shared_ptr seems the safer and more elegant solution. --- .../async-method-wrapper-template.cpp.tmpl | 21 +++++--- .../src/gossipdiscovery/p3gossipdiscovery.cc | 22 ++++---- libretroshare/src/jsonapi/jsonapi.cpp | 36 +++++++------ libretroshare/src/retroshare/rsevents.h | 11 ++-- libretroshare/src/rsserver/p3face-server.cc | 6 +-- .../src/services/broadcastdiscoveryservice.cc | 9 ++-- libretroshare/src/services/rseventsservice.cc | 50 +++++++++++-------- libretroshare/src/services/rseventsservice.h | 18 ++++--- 8 files changed, 94 insertions(+), 79 deletions(-) diff --git a/jsonapi-generator/src/async-method-wrapper-template.cpp.tmpl b/jsonapi-generator/src/async-method-wrapper-template.cpp.tmpl index 4724d2794..097ff8d8b 100644 --- a/jsonapi-generator/src/async-method-wrapper-template.cpp.tmpl +++ b/jsonapi-generator/src/async-method-wrapper-template.cpp.tmpl @@ -19,7 +19,7 @@ *******************************************************************************/ registerHandler("$%apiPath%$", - [$%captureVars%$](const std::shared_ptr session) + [this, $%captureVars%$](const std::shared_ptr session) { const std::multimap headers { @@ -29,7 +29,7 @@ registerHandler("$%apiPath%$", session->yield(rb::OK, headers); size_t reqSize = session->get_request()->get_header("Content-Length", 0); - session->fetch( reqSize, [$%captureVars%$]( + session->fetch( reqSize, [this, $%captureVars%$]( const std::shared_ptr session, const rb::Bytes& body ) { @@ -44,17 +44,24 @@ $%paramsDeclaration%$ $%inputParamsDeserialization%$ const std::weak_ptr weakSession(session); - $%callbackName%$ = [weakSession]($%callbackParams%$) + $%callbackName%$ = [this, weakSession]($%callbackParams%$) { auto session = weakSession.lock(); if(!session || session->is_closed()) return; $%callbackParamsSerialization%$ - std::stringstream message; - message << "data: " << compactJSON << ctx.mJson << "\n\n"; - session->yield(message.str()); - $%sessionEarlyClose%$ + std::stringstream sStream; + sStream << "data: " << compactJSON << ctx.mJson << "\n\n"; + const std::string message = sStream.str(); + + mService.schedule( [weakSession, message]() + { + auto session = weakSession.lock(); + if(!session || session->is_closed()) return; + session->yield(message); + $%sessionEarlyClose%$ + } ); }; $%functionCall%$ diff --git a/libretroshare/src/gossipdiscovery/p3gossipdiscovery.cc b/libretroshare/src/gossipdiscovery/p3gossipdiscovery.cc index d3a6955d5..f3a2cc460 100644 --- a/libretroshare/src/gossipdiscovery/p3gossipdiscovery.cc +++ b/libretroshare/src/gossipdiscovery/p3gossipdiscovery.cc @@ -99,8 +99,9 @@ void DiscPgpInfo::mergeFriendList(const std::set &friends) p3discovery2::p3discovery2( p3PeerMgr* peerMgr, p3LinkMgr* linkMgr, p3NetMgr* netMgr, p3ServiceControl* sc, RsGixs* gixs ) : - p3Service(), mPeerMgr(peerMgr), mLinkMgr(linkMgr), mNetMgr(netMgr), - mServiceCtrl(sc), mGixs(gixs), mDiscMtx("p3discovery2"), mLastPgpUpdate(0) + p3Service(), mRsEventsHandle(0), mPeerMgr(peerMgr), mLinkMgr(linkMgr), + mNetMgr(netMgr), mServiceCtrl(sc), mGixs(gixs), mDiscMtx("p3discovery2"), + mLastPgpUpdate(0) { Dbg3() << __PRETTY_FUNCTION__ << std::endl; @@ -110,8 +111,12 @@ p3discovery2::p3discovery2( // Add self into PGP FriendList. mFriendList[AuthGPG::getAuthGPG()->getGPGOwnId()] = DiscPgpInfo(); - mRsEventsHandle = 0 ; // avoids random behavior if not initialized - rsEvents->registerEventsHandler( [this](const RsEvent& event){ rsEventsHandler(event); }, mRsEventsHandle ); + if(rsEvents) + rsEvents->registerEventsHandler( + [this](std::shared_ptr event) + { + rsEventsHandler(*event); + }, mRsEventsHandle ); // mRsEventsHandle is zeroed in initializer list } @@ -1238,12 +1243,9 @@ void p3discovery2::recvInvite( std::unique_ptr inviteItem ) { typedef RsGossipDiscoveryFriendInviteReceivedEvent Evt_t; - - // Ensure rsEvents is not deleted while we use it - std::shared_ptr lockedRsEvents = rsEvents; - if(lockedRsEvents) - lockedRsEvents->postEvent( - std::unique_ptr(new Evt_t(inviteItem->mInvite)) ); + if(rsEvents) + rsEvents->postEvent( + std::shared_ptr(new Evt_t(inviteItem->mInvite)) ); } void p3discovery2::rsEventsHandler(const RsEvent& event) diff --git a/libretroshare/src/jsonapi/jsonapi.cpp b/libretroshare/src/jsonapi/jsonapi.cpp index eedac0766..be808b163 100644 --- a/libretroshare/src/jsonapi/jsonapi.cpp +++ b/libretroshare/src/jsonapi/jsonapi.cpp @@ -280,7 +280,7 @@ JsonApiServer::JsonApiServer(uint16_t port, const std::string& bindAddress, }, true); registerHandler("/rsEvents/registerEventsHandler", - [](const std::shared_ptr session) + [this](const std::shared_ptr session) { const std::multimap headers { @@ -291,7 +291,7 @@ JsonApiServer::JsonApiServer(uint16_t port, const std::string& bindAddress, size_t reqSize = static_cast( session->get_request()->get_header("Content-Length", 0) ); - session->fetch( reqSize, []( + session->fetch( reqSize, [this]( const std::shared_ptr session, const rb::Bytes& body ) { @@ -303,24 +303,28 @@ JsonApiServer::JsonApiServer(uint16_t port, const std::string& bindAddress, const std::weak_ptr weakSession(session); RsEventsHandlerId_t hId = rsEvents->generateUniqueHandlerId(); - std::function multiCallback = - [weakSession, hId](const RsEvent& event) + std::function)> multiCallback = + [this, weakSession, hId](std::shared_ptr event) { - auto session = weakSession.lock(); - if(!session || session->is_closed()) + mService.schedule( [weakSession, hId, event]() { - if(rsEvents) rsEvents->unregisterEventsHandler(hId); - return; - } + auto session = weakSession.lock(); + if(!session || session->is_closed()) + { + if(rsEvents) rsEvents->unregisterEventsHandler(hId); + return; + } - RsGenericSerializer::SerializeContext ctx; - RsTypeSerializer::serial_process( - RsGenericSerializer::TO_JSON, ctx, - const_cast(event), "event" ); + RsGenericSerializer::SerializeContext ctx; + RsTypeSerializer::serial_process( + RsGenericSerializer::TO_JSON, ctx, + *const_cast(event.get()), "event" ); - std::stringstream message; - message << "data: " << compactJSON << ctx.mJson << "\n\n"; - session->yield(message.str()); + std::stringstream message; + message << "data: " << compactJSON << ctx.mJson << "\n\n"; + + session->yield(message.str()); + } ); }; bool retval = rsEvents->registerEventsHandler(multiCallback, hId); diff --git a/libretroshare/src/retroshare/rsevents.h b/libretroshare/src/retroshare/rsevents.h index 94ae866f8..373788f97 100644 --- a/libretroshare/src/retroshare/rsevents.h +++ b/libretroshare/src/retroshare/rsevents.h @@ -37,7 +37,7 @@ class RsEvents; * TODO: this should become std::weak_ptr once we have a reasonable services * management. */ -extern std::shared_ptr rsEvents; +extern RsEvents* rsEvents; /** * @brief Events types. @@ -113,21 +113,20 @@ public: * @return False on error, true otherwise. */ virtual bool postEvent( - std::unique_ptr event, + std::shared_ptr event, std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string) ) = 0; /** * @brief Send event directly to handlers. Blocking API - * The handlers get exectuded on the caller thread, ensuring the function - * returns only after the event has been handled. + * The handlers get exectuded on the caller thread. * @param[in] event * @param[out] errorMessage Optional storage for error messsage, meaningful * only on failure. * @return False on error, true otherwise. */ virtual bool sendEvent( - const RsEvent& event, + std::shared_ptr event, std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string) ) = 0; @@ -152,7 +151,7 @@ public: * @return False on error, true otherwise. */ virtual bool registerEventsHandler( - std::function multiCallback, + std::function)> multiCallback, RsEventsHandlerId_t& hId = RS_DEFAULT_STORAGE_PARAM(RsEventsHandlerId_t, 0) ) = 0; diff --git a/libretroshare/src/rsserver/p3face-server.cc b/libretroshare/src/rsserver/p3face-server.cc index ed4981b6b..d8114cc12 100644 --- a/libretroshare/src/rsserver/p3face-server.cc +++ b/libretroshare/src/rsserver/p3face-server.cc @@ -35,8 +35,6 @@ #include "pqi/p3linkmgr.h" #include "pqi/p3netmgr.h" -int rsserverzone = 101; - #include "util/rsdebug.h" #include "retroshare/rsevents.h" @@ -86,7 +84,7 @@ RsServer::RsServer() : { { RsEventsService* tmpRsEvtPtr = new RsEventsService(); - rsEvents.reset(tmpRsEvtPtr); + rsEvents = tmpRsEvtPtr; startServiceThread(tmpRsEvtPtr, "RsEventsService"); } @@ -271,8 +269,6 @@ void RsServer::data_tick() std::string out; rs_sprintf(out, "RsServer::run() WARNING Excessively Long Cycle Time: %g secs => Please DEBUG", cycleTime); std::cerr << out << std::endl; - - rslog(RSL_ALERT, rsserverzone, out); } #endif } diff --git a/libretroshare/src/services/broadcastdiscoveryservice.cc b/libretroshare/src/services/broadcastdiscoveryservice.cc index d5e81c4b6..04d41785b 100644 --- a/libretroshare/src/services/broadcastdiscoveryservice.cc +++ b/libretroshare/src/services/broadcastdiscoveryservice.cc @@ -171,12 +171,9 @@ void BroadcastDiscoveryService::data_tick() else if(!isFriend) { typedef RsBroadcastDiscoveryPeerFoundEvent Evt_t; - - // Ensure rsEvents is not deleted while we use it - std::shared_ptr lockedRsEvents = rsEvents; - if(lockedRsEvents) - lockedRsEvents->postEvent( - std::unique_ptr(new Evt_t(rbdr)) ); + if(rsEvents) + rsEvents->postEvent( + std::shared_ptr(new Evt_t(rbdr)) ); } } } diff --git a/libretroshare/src/services/rseventsservice.cc b/libretroshare/src/services/rseventsservice.cc index add5b851e..b970e26e7 100644 --- a/libretroshare/src/services/rseventsservice.cc +++ b/libretroshare/src/services/rseventsservice.cc @@ -25,36 +25,43 @@ #include "services/rseventsservice.h" -/*extern*/ std::shared_ptr rsEvents(nullptr); +/*extern*/ RsEvents* rsEvents = nullptr; RsEvent::~RsEvent() {}; RsEvents::~RsEvents() {}; -bool isEventValid(const RsEvent& event, std::string& errorMessage) +bool isEventValid( + std::shared_ptr event, std::string& errorMessage ) { - if(event.mType <= RsEventType::NONE) + if(!event) + { + errorMessage = "Event is null!"; + return false; + } + + if(event->mType <= RsEventType::NONE) { errorMessage = "Event has type NONE: " + std::to_string( static_cast::type >( - event.mType ) ); + event->mType ) ); return false; } - if(event.mType >= RsEventType::MAX) + if(event->mType >= RsEventType::MAX) { errorMessage = "Event has type >= RsEventType::MAX: " + std::to_string( static_cast::type >( - event.mType ) ); + event->mType ) ); } return true; } -bool RsEventsService::postEvent( std::unique_ptr event, +bool RsEventsService::postEvent( std::shared_ptr event, std::string& errorMessage ) { - if(!isEventValid(*event, errorMessage)) + if(!isEventValid(event, errorMessage)) { std::cerr << __PRETTY_FUNCTION__ << " Error: "<< errorMessage << std::endl; @@ -62,17 +69,16 @@ bool RsEventsService::postEvent( std::unique_ptr event, } RS_STACK_MUTEX(mEventQueueMtx); - mEventQueue.push_back(std::move(event)); + mEventQueue.push_back(event); return true; } -bool RsEventsService::sendEvent( const RsEvent& event, +bool RsEventsService::sendEvent( std::shared_ptr event, std::string& errorMessage ) { if(!isEventValid(event, errorMessage)) { - std::cerr << __PRETTY_FUNCTION__ << " Error: "<< errorMessage - << std::endl; + RsErr() << __PRETTY_FUNCTION__ << " "<< errorMessage << std::endl; return false; } @@ -93,12 +99,12 @@ RsEventsHandlerId_t RsEventsService::generateUniqueHandlerId_unlocked() } bool RsEventsService::registerEventsHandler( - std::function multiCallback, + std::function)> multiCallback, RsEventsHandlerId_t& hId ) { RS_STACK_MUTEX(mHandlerMapMtx); if(!hId) hId = generateUniqueHandlerId_unlocked(); - mHandlerMap[hId] = std::move(multiCallback); + mHandlerMap[hId] = multiCallback; return true; } @@ -116,19 +122,19 @@ void RsEventsService::data_tick() auto nextRunAt = std::chrono::system_clock::now() + std::chrono::milliseconds(1); - std::unique_ptr eventPtr(nullptr); + std::shared_ptr eventPtr(nullptr); size_t futureEventsCounter = 0; dispatchEventFromQueueLock: mEventQueueMtx.lock(); if(mEventQueue.size() > futureEventsCounter) { - eventPtr = std::move(mEventQueue.front()); + eventPtr = mEventQueue.front(); mEventQueue.pop_front(); if(eventPtr->mTimePoint >= nextRunAt) { - mEventQueue.push_back(std::move(eventPtr)); + mEventQueue.push_back(eventPtr); ++futureEventsCounter; } } @@ -137,17 +143,17 @@ dispatchEventFromQueueLock: if(eventPtr) { /* It is relevant that this stays out of mEventQueueMtx */ - handleEvent(*eventPtr); - eventPtr.reset(nullptr); // ensure memory is freed before sleep + handleEvent(eventPtr); + eventPtr = nullptr; // ensure refcounter is decremented before sleep goto dispatchEventFromQueueLock; } std::this_thread::sleep_until(nextRunAt); } -void RsEventsService::handleEvent(const RsEvent& event) +void RsEventsService::handleEvent(std::shared_ptr event) { - std::function mCallback; + std::function)> mCallback; mHandlerMapMtx.lock(); auto cbpt = mHandlerMap.begin(); @@ -165,7 +171,7 @@ getHandlerFromMapLock: if(mCallback) { mCallback(event); // It is relevant that this happens outside mutex - mCallback = std::function(nullptr); + mCallback = std::function)>(nullptr); goto getHandlerFromMapLock; } } diff --git a/libretroshare/src/services/rseventsservice.h b/libretroshare/src/services/rseventsservice.h index 7d39c7a46..05fff772e 100644 --- a/libretroshare/src/services/rseventsservice.h +++ b/libretroshare/src/services/rseventsservice.h @@ -27,6 +27,7 @@ #include "retroshare/rsevents.h" #include "util/rsthreads.h" +#include "util/rsdebug.h" class RsEventsService : public RsEvents, public RsTickingThread @@ -38,13 +39,13 @@ public: /// @see RsEvents bool postEvent( - std::unique_ptr event, + std::shared_ptr event, std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string) ) override; /// @see RsEvents bool sendEvent( - const RsEvent& event, + std::shared_ptr event, std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string) ) override; @@ -53,7 +54,7 @@ public: /// @see RsEvents bool registerEventsHandler( - std::function multiCallback, + std::function)> multiCallback, RsEventsHandlerId_t& hId = RS_DEFAULT_STORAGE_PARAM(RsEventsHandlerId_t, 0) ) override; @@ -63,15 +64,18 @@ public: protected: RsMutex mHandlerMapMtx; RsEventsHandlerId_t mLastHandlerId; - std::map< RsEventsHandlerId_t, std::function > - mHandlerMap; + std::map< + RsEventsHandlerId_t, + std::function)> > mHandlerMap; RsMutex mEventQueueMtx; - std::deque< std::unique_ptr > mEventQueue; + std::deque< std::shared_ptr > mEventQueue; /// @see RsTickingThread void data_tick() override; - void handleEvent(const RsEvent& event); + void handleEvent(std::shared_ptr event); RsEventsHandlerId_t generateUniqueHandlerId_unlocked(); + + RS_SET_CONTEXT_DEBUG_LEVEL(3) };