Fix sporadic crash in JSON API async calls

In Restbed one is not supposed to call session->yield outside the
  threads controlled by Restbed. RetroShare JSON API async call were
  calling session->yield from threads controlled by RetroShare all the
  times, this caused crashes in some cases, like when the JSON API
  socket timed out concurrently with the session->yield call .
  To solve this problem session->yield from async
  calls are now wrapped insto mService->schedule to ensure they are
  executed on the right thread (aka one of the threads controlled by
  Restbed).
While solving this issue I realized also that passing RsEvents as const
  references around was quite limiting in cases where the event need to
  be finally handled in another thread, in that case passing by const
  reference the RsEvent needed to be copied by value into the thread
  that process it, in this copy by value process the information of
  which was the original specific type is lost, and then only the data
  and methods from general RsEvents are available, unless the handler
  does tricky stuff with type coercion etc. To solve this limitation
  pass the events as std::shared_ptr<const RsEvent> seems the safer and
  more elegant solution.
This commit is contained in:
Gioacchino Mazzurco 2019-08-27 11:59:38 +02:00
parent 202cee687e
commit 9b8d0afacb
No known key found for this signature in database
GPG Key ID: A1FBCA3872E87051
8 changed files with 94 additions and 79 deletions

View File

@ -19,7 +19,7 @@
*******************************************************************************/
registerHandler("$%apiPath%$",
[$%captureVars%$](const std::shared_ptr<rb::Session> session)
[this, $%captureVars%$](const std::shared_ptr<rb::Session> session)
{
const std::multimap<std::string, std::string> headers
{
@ -29,7 +29,7 @@ registerHandler("$%apiPath%$",
session->yield(rb::OK, headers);
size_t reqSize = session->get_request()->get_header("Content-Length", 0);
session->fetch( reqSize, [$%captureVars%$](
session->fetch( reqSize, [this, $%captureVars%$](
const std::shared_ptr<rb::Session> session,
const rb::Bytes& body )
{
@ -44,17 +44,24 @@ $%paramsDeclaration%$
$%inputParamsDeserialization%$
const std::weak_ptr<rb::Session> weakSession(session);
$%callbackName%$ = [weakSession]($%callbackParams%$)
$%callbackName%$ = [this, weakSession]($%callbackParams%$)
{
auto session = weakSession.lock();
if(!session || session->is_closed()) return;
$%callbackParamsSerialization%$
std::stringstream message;
message << "data: " << compactJSON << ctx.mJson << "\n\n";
session->yield(message.str());
std::stringstream sStream;
sStream << "data: " << compactJSON << ctx.mJson << "\n\n";
const std::string message = sStream.str();
mService.schedule( [weakSession, message]()
{
auto session = weakSession.lock();
if(!session || session->is_closed()) return;
session->yield(message);
$%sessionEarlyClose%$
} );
};
$%functionCall%$

View File

@ -99,8 +99,9 @@ void DiscPgpInfo::mergeFriendList(const std::set<PGPID> &friends)
p3discovery2::p3discovery2(
p3PeerMgr* peerMgr, p3LinkMgr* linkMgr, p3NetMgr* netMgr,
p3ServiceControl* sc, RsGixs* gixs ) :
p3Service(), mPeerMgr(peerMgr), mLinkMgr(linkMgr), mNetMgr(netMgr),
mServiceCtrl(sc), mGixs(gixs), mDiscMtx("p3discovery2"), mLastPgpUpdate(0)
p3Service(), mRsEventsHandle(0), mPeerMgr(peerMgr), mLinkMgr(linkMgr),
mNetMgr(netMgr), mServiceCtrl(sc), mGixs(gixs), mDiscMtx("p3discovery2"),
mLastPgpUpdate(0)
{
Dbg3() << __PRETTY_FUNCTION__ << std::endl;
@ -110,8 +111,12 @@ p3discovery2::p3discovery2(
// Add self into PGP FriendList.
mFriendList[AuthGPG::getAuthGPG()->getGPGOwnId()] = DiscPgpInfo();
mRsEventsHandle = 0 ; // avoids random behavior if not initialized
rsEvents->registerEventsHandler( [this](const RsEvent& event){ rsEventsHandler(event); }, mRsEventsHandle );
if(rsEvents)
rsEvents->registerEventsHandler(
[this](std::shared_ptr<const RsEvent> event)
{
rsEventsHandler(*event);
}, mRsEventsHandle ); // mRsEventsHandle is zeroed in initializer list
}
@ -1238,12 +1243,9 @@ void p3discovery2::recvInvite(
std::unique_ptr<RsGossipDiscoveryInviteItem> inviteItem )
{
typedef RsGossipDiscoveryFriendInviteReceivedEvent Evt_t;
// Ensure rsEvents is not deleted while we use it
std::shared_ptr<RsEvents> lockedRsEvents = rsEvents;
if(lockedRsEvents)
lockedRsEvents->postEvent(
std::unique_ptr<Evt_t>(new Evt_t(inviteItem->mInvite)) );
if(rsEvents)
rsEvents->postEvent(
std::shared_ptr<Evt_t>(new Evt_t(inviteItem->mInvite)) );
}
void p3discovery2::rsEventsHandler(const RsEvent& event)

View File

@ -280,7 +280,7 @@ JsonApiServer::JsonApiServer(uint16_t port, const std::string& bindAddress,
}, true);
registerHandler("/rsEvents/registerEventsHandler",
[](const std::shared_ptr<rb::Session> session)
[this](const std::shared_ptr<rb::Session> session)
{
const std::multimap<std::string, std::string> headers
{
@ -291,7 +291,7 @@ JsonApiServer::JsonApiServer(uint16_t port, const std::string& bindAddress,
size_t reqSize = static_cast<size_t>(
session->get_request()->get_header("Content-Length", 0) );
session->fetch( reqSize, [](
session->fetch( reqSize, [this](
const std::shared_ptr<rb::Session> session,
const rb::Bytes& body )
{
@ -303,8 +303,10 @@ JsonApiServer::JsonApiServer(uint16_t port, const std::string& bindAddress,
const std::weak_ptr<rb::Session> weakSession(session);
RsEventsHandlerId_t hId = rsEvents->generateUniqueHandlerId();
std::function<void(const RsEvent&)> multiCallback =
[weakSession, hId](const RsEvent& event)
std::function<void(std::shared_ptr<const RsEvent>)> multiCallback =
[this, weakSession, hId](std::shared_ptr<const RsEvent> event)
{
mService.schedule( [weakSession, hId, event]()
{
auto session = weakSession.lock();
if(!session || session->is_closed())
@ -316,11 +318,13 @@ JsonApiServer::JsonApiServer(uint16_t port, const std::string& bindAddress,
RsGenericSerializer::SerializeContext ctx;
RsTypeSerializer::serial_process(
RsGenericSerializer::TO_JSON, ctx,
const_cast<RsEvent&>(event), "event" );
*const_cast<RsEvent*>(event.get()), "event" );
std::stringstream message;
message << "data: " << compactJSON << ctx.mJson << "\n\n";
session->yield(message.str());
} );
};
bool retval = rsEvents->registerEventsHandler(multiCallback, hId);

View File

@ -37,7 +37,7 @@ class RsEvents;
* TODO: this should become std::weak_ptr once we have a reasonable services
* management.
*/
extern std::shared_ptr<RsEvents> rsEvents;
extern RsEvents* rsEvents;
/**
* @brief Events types.
@ -113,21 +113,20 @@ public:
* @return False on error, true otherwise.
*/
virtual bool postEvent(
std::unique_ptr<RsEvent> event,
std::shared_ptr<const RsEvent> event,
std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string)
) = 0;
/**
* @brief Send event directly to handlers. Blocking API
* The handlers get exectuded on the caller thread, ensuring the function
* returns only after the event has been handled.
* The handlers get exectuded on the caller thread.
* @param[in] event
* @param[out] errorMessage Optional storage for error messsage, meaningful
* only on failure.
* @return False on error, true otherwise.
*/
virtual bool sendEvent(
const RsEvent& event,
std::shared_ptr<const RsEvent> event,
std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string)
) = 0;
@ -152,7 +151,7 @@ public:
* @return False on error, true otherwise.
*/
virtual bool registerEventsHandler(
std::function<void(const RsEvent&)> multiCallback,
std::function<void(std::shared_ptr<const RsEvent>)> multiCallback,
RsEventsHandlerId_t& hId = RS_DEFAULT_STORAGE_PARAM(RsEventsHandlerId_t, 0)
) = 0;

View File

@ -35,8 +35,6 @@
#include "pqi/p3linkmgr.h"
#include "pqi/p3netmgr.h"
int rsserverzone = 101;
#include "util/rsdebug.h"
#include "retroshare/rsevents.h"
@ -86,7 +84,7 @@ RsServer::RsServer() :
{
{
RsEventsService* tmpRsEvtPtr = new RsEventsService();
rsEvents.reset(tmpRsEvtPtr);
rsEvents = tmpRsEvtPtr;
startServiceThread(tmpRsEvtPtr, "RsEventsService");
}
@ -271,8 +269,6 @@ void RsServer::data_tick()
std::string out;
rs_sprintf(out, "RsServer::run() WARNING Excessively Long Cycle Time: %g secs => Please DEBUG", cycleTime);
std::cerr << out << std::endl;
rslog(RSL_ALERT, rsserverzone, out);
}
#endif
}

View File

@ -171,12 +171,9 @@ void BroadcastDiscoveryService::data_tick()
else if(!isFriend)
{
typedef RsBroadcastDiscoveryPeerFoundEvent Evt_t;
// Ensure rsEvents is not deleted while we use it
std::shared_ptr<RsEvents> lockedRsEvents = rsEvents;
if(lockedRsEvents)
lockedRsEvents->postEvent(
std::unique_ptr<Evt_t>(new Evt_t(rbdr)) );
if(rsEvents)
rsEvents->postEvent(
std::shared_ptr<Evt_t>(new Evt_t(rbdr)) );
}
}
}

View File

@ -25,36 +25,43 @@
#include "services/rseventsservice.h"
/*extern*/ std::shared_ptr<RsEvents> rsEvents(nullptr);
/*extern*/ RsEvents* rsEvents = nullptr;
RsEvent::~RsEvent() {};
RsEvents::~RsEvents() {};
bool isEventValid(const RsEvent& event, std::string& errorMessage)
bool isEventValid(
std::shared_ptr<const RsEvent> event, std::string& errorMessage )
{
if(event.mType <= RsEventType::NONE)
if(!event)
{
errorMessage = "Event is null!";
return false;
}
if(event->mType <= RsEventType::NONE)
{
errorMessage = "Event has type NONE: " +
std::to_string(
static_cast<std::underlying_type<RsEventType>::type >(
event.mType ) );
event->mType ) );
return false;
}
if(event.mType >= RsEventType::MAX)
if(event->mType >= RsEventType::MAX)
{
errorMessage = "Event has type >= RsEventType::MAX: " +
std::to_string(
static_cast<std::underlying_type<RsEventType>::type >(
event.mType ) );
event->mType ) );
}
return true;
}
bool RsEventsService::postEvent( std::unique_ptr<RsEvent> event,
bool RsEventsService::postEvent( std::shared_ptr<const RsEvent> event,
std::string& errorMessage )
{
if(!isEventValid(*event, errorMessage))
if(!isEventValid(event, errorMessage))
{
std::cerr << __PRETTY_FUNCTION__ << " Error: "<< errorMessage
<< std::endl;
@ -62,17 +69,16 @@ bool RsEventsService::postEvent( std::unique_ptr<RsEvent> event,
}
RS_STACK_MUTEX(mEventQueueMtx);
mEventQueue.push_back(std::move(event));
mEventQueue.push_back(event);
return true;
}
bool RsEventsService::sendEvent( const RsEvent& event,
bool RsEventsService::sendEvent( std::shared_ptr<const RsEvent> event,
std::string& errorMessage )
{
if(!isEventValid(event, errorMessage))
{
std::cerr << __PRETTY_FUNCTION__ << " Error: "<< errorMessage
<< std::endl;
RsErr() << __PRETTY_FUNCTION__ << " "<< errorMessage << std::endl;
return false;
}
@ -93,12 +99,12 @@ RsEventsHandlerId_t RsEventsService::generateUniqueHandlerId_unlocked()
}
bool RsEventsService::registerEventsHandler(
std::function<void(const RsEvent&)> multiCallback,
std::function<void(std::shared_ptr<const RsEvent>)> multiCallback,
RsEventsHandlerId_t& hId )
{
RS_STACK_MUTEX(mHandlerMapMtx);
if(!hId) hId = generateUniqueHandlerId_unlocked();
mHandlerMap[hId] = std::move(multiCallback);
mHandlerMap[hId] = multiCallback;
return true;
}
@ -116,19 +122,19 @@ void RsEventsService::data_tick()
auto nextRunAt = std::chrono::system_clock::now() +
std::chrono::milliseconds(1);
std::unique_ptr<RsEvent> eventPtr(nullptr);
std::shared_ptr<const RsEvent> eventPtr(nullptr);
size_t futureEventsCounter = 0;
dispatchEventFromQueueLock:
mEventQueueMtx.lock();
if(mEventQueue.size() > futureEventsCounter)
{
eventPtr = std::move(mEventQueue.front());
eventPtr = mEventQueue.front();
mEventQueue.pop_front();
if(eventPtr->mTimePoint >= nextRunAt)
{
mEventQueue.push_back(std::move(eventPtr));
mEventQueue.push_back(eventPtr);
++futureEventsCounter;
}
}
@ -137,17 +143,17 @@ dispatchEventFromQueueLock:
if(eventPtr)
{
/* It is relevant that this stays out of mEventQueueMtx */
handleEvent(*eventPtr);
eventPtr.reset(nullptr); // ensure memory is freed before sleep
handleEvent(eventPtr);
eventPtr = nullptr; // ensure refcounter is decremented before sleep
goto dispatchEventFromQueueLock;
}
std::this_thread::sleep_until(nextRunAt);
}
void RsEventsService::handleEvent(const RsEvent& event)
void RsEventsService::handleEvent(std::shared_ptr<const RsEvent> event)
{
std::function<void(const RsEvent&)> mCallback;
std::function<void(std::shared_ptr<const RsEvent>)> mCallback;
mHandlerMapMtx.lock();
auto cbpt = mHandlerMap.begin();
@ -165,7 +171,7 @@ getHandlerFromMapLock:
if(mCallback)
{
mCallback(event); // It is relevant that this happens outside mutex
mCallback = std::function<void(const RsEvent&)>(nullptr);
mCallback = std::function<void(std::shared_ptr<const RsEvent>)>(nullptr);
goto getHandlerFromMapLock;
}
}

View File

@ -27,6 +27,7 @@
#include "retroshare/rsevents.h"
#include "util/rsthreads.h"
#include "util/rsdebug.h"
class RsEventsService :
public RsEvents, public RsTickingThread
@ -38,13 +39,13 @@ public:
/// @see RsEvents
bool postEvent(
std::unique_ptr<RsEvent> event,
std::shared_ptr<const RsEvent> event,
std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string)
) override;
/// @see RsEvents
bool sendEvent(
const RsEvent& event,
std::shared_ptr<const RsEvent> event,
std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string)
) override;
@ -53,7 +54,7 @@ public:
/// @see RsEvents
bool registerEventsHandler(
std::function<void(const RsEvent&)> multiCallback,
std::function<void(std::shared_ptr<const RsEvent>)> multiCallback,
RsEventsHandlerId_t& hId = RS_DEFAULT_STORAGE_PARAM(RsEventsHandlerId_t, 0)
) override;
@ -63,15 +64,18 @@ public:
protected:
RsMutex mHandlerMapMtx;
RsEventsHandlerId_t mLastHandlerId;
std::map< RsEventsHandlerId_t, std::function<void(const RsEvent&)> >
mHandlerMap;
std::map<
RsEventsHandlerId_t,
std::function<void(std::shared_ptr<const RsEvent>)> > mHandlerMap;
RsMutex mEventQueueMtx;
std::deque< std::unique_ptr<RsEvent> > mEventQueue;
std::deque< std::shared_ptr<const RsEvent> > mEventQueue;
/// @see RsTickingThread
void data_tick() override;
void handleEvent(const RsEvent& event);
void handleEvent(std::shared_ptr<const RsEvent> event);
RsEventsHandlerId_t generateUniqueHandlerId_unlocked();
RS_SET_CONTEXT_DEBUG_LEVEL(3)
};