Merge pull request #1621 from G10h4ck/jsonapi_async_crash_fix

Fix sporadic crash in JSON API async calls
This commit is contained in:
G10h4ck 2019-08-28 23:42:02 +02:00 committed by GitHub
commit 76dfa04bb5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 94 additions and 79 deletions

View file

@ -19,7 +19,7 @@
*******************************************************************************/ *******************************************************************************/
registerHandler("$%apiPath%$", registerHandler("$%apiPath%$",
[$%captureVars%$](const std::shared_ptr<rb::Session> session) [this, $%captureVars%$](const std::shared_ptr<rb::Session> session)
{ {
const std::multimap<std::string, std::string> headers const std::multimap<std::string, std::string> headers
{ {
@ -29,7 +29,7 @@ registerHandler("$%apiPath%$",
session->yield(rb::OK, headers); session->yield(rb::OK, headers);
size_t reqSize = session->get_request()->get_header("Content-Length", 0); size_t reqSize = session->get_request()->get_header("Content-Length", 0);
session->fetch( reqSize, [$%captureVars%$]( session->fetch( reqSize, [this, $%captureVars%$](
const std::shared_ptr<rb::Session> session, const std::shared_ptr<rb::Session> session,
const rb::Bytes& body ) const rb::Bytes& body )
{ {
@ -44,17 +44,24 @@ $%paramsDeclaration%$
$%inputParamsDeserialization%$ $%inputParamsDeserialization%$
const std::weak_ptr<rb::Session> weakSession(session); const std::weak_ptr<rb::Session> weakSession(session);
$%callbackName%$ = [weakSession]($%callbackParams%$) $%callbackName%$ = [this, weakSession]($%callbackParams%$)
{ {
auto session = weakSession.lock(); auto session = weakSession.lock();
if(!session || session->is_closed()) return; if(!session || session->is_closed()) return;
$%callbackParamsSerialization%$ $%callbackParamsSerialization%$
std::stringstream message; std::stringstream sStream;
message << "data: " << compactJSON << ctx.mJson << "\n\n"; sStream << "data: " << compactJSON << ctx.mJson << "\n\n";
session->yield(message.str()); const std::string message = sStream.str();
mService.schedule( [weakSession, message]()
{
auto session = weakSession.lock();
if(!session || session->is_closed()) return;
session->yield(message);
$%sessionEarlyClose%$ $%sessionEarlyClose%$
} );
}; };
$%functionCall%$ $%functionCall%$

View file

@ -99,8 +99,9 @@ void DiscPgpInfo::mergeFriendList(const std::set<PGPID> &friends)
p3discovery2::p3discovery2( p3discovery2::p3discovery2(
p3PeerMgr* peerMgr, p3LinkMgr* linkMgr, p3NetMgr* netMgr, p3PeerMgr* peerMgr, p3LinkMgr* linkMgr, p3NetMgr* netMgr,
p3ServiceControl* sc, RsGixs* gixs ) : p3ServiceControl* sc, RsGixs* gixs ) :
p3Service(), mPeerMgr(peerMgr), mLinkMgr(linkMgr), mNetMgr(netMgr), p3Service(), mRsEventsHandle(0), mPeerMgr(peerMgr), mLinkMgr(linkMgr),
mServiceCtrl(sc), mGixs(gixs), mDiscMtx("p3discovery2"), mLastPgpUpdate(0) mNetMgr(netMgr), mServiceCtrl(sc), mGixs(gixs), mDiscMtx("p3discovery2"),
mLastPgpUpdate(0)
{ {
Dbg3() << __PRETTY_FUNCTION__ << std::endl; Dbg3() << __PRETTY_FUNCTION__ << std::endl;
@ -110,8 +111,12 @@ p3discovery2::p3discovery2(
// Add self into PGP FriendList. // Add self into PGP FriendList.
mFriendList[AuthGPG::getAuthGPG()->getGPGOwnId()] = DiscPgpInfo(); mFriendList[AuthGPG::getAuthGPG()->getGPGOwnId()] = DiscPgpInfo();
mRsEventsHandle = 0 ; // avoids random behavior if not initialized if(rsEvents)
rsEvents->registerEventsHandler( [this](const RsEvent& event){ rsEventsHandler(event); }, mRsEventsHandle ); rsEvents->registerEventsHandler(
[this](std::shared_ptr<const RsEvent> event)
{
rsEventsHandler(*event);
}, mRsEventsHandle ); // mRsEventsHandle is zeroed in initializer list
} }
@ -1238,12 +1243,9 @@ void p3discovery2::recvInvite(
std::unique_ptr<RsGossipDiscoveryInviteItem> inviteItem ) std::unique_ptr<RsGossipDiscoveryInviteItem> inviteItem )
{ {
typedef RsGossipDiscoveryFriendInviteReceivedEvent Evt_t; typedef RsGossipDiscoveryFriendInviteReceivedEvent Evt_t;
if(rsEvents)
// Ensure rsEvents is not deleted while we use it rsEvents->postEvent(
std::shared_ptr<RsEvents> lockedRsEvents = rsEvents; std::shared_ptr<Evt_t>(new Evt_t(inviteItem->mInvite)) );
if(lockedRsEvents)
lockedRsEvents->postEvent(
std::unique_ptr<Evt_t>(new Evt_t(inviteItem->mInvite)) );
} }
void p3discovery2::rsEventsHandler(const RsEvent& event) void p3discovery2::rsEventsHandler(const RsEvent& event)

View file

@ -280,7 +280,7 @@ JsonApiServer::JsonApiServer(uint16_t port, const std::string& bindAddress,
}, true); }, true);
registerHandler("/rsEvents/registerEventsHandler", registerHandler("/rsEvents/registerEventsHandler",
[](const std::shared_ptr<rb::Session> session) [this](const std::shared_ptr<rb::Session> session)
{ {
const std::multimap<std::string, std::string> headers const std::multimap<std::string, std::string> headers
{ {
@ -291,7 +291,7 @@ JsonApiServer::JsonApiServer(uint16_t port, const std::string& bindAddress,
size_t reqSize = static_cast<size_t>( size_t reqSize = static_cast<size_t>(
session->get_request()->get_header("Content-Length", 0) ); session->get_request()->get_header("Content-Length", 0) );
session->fetch( reqSize, []( session->fetch( reqSize, [this](
const std::shared_ptr<rb::Session> session, const std::shared_ptr<rb::Session> session,
const rb::Bytes& body ) const rb::Bytes& body )
{ {
@ -303,8 +303,10 @@ JsonApiServer::JsonApiServer(uint16_t port, const std::string& bindAddress,
const std::weak_ptr<rb::Session> weakSession(session); const std::weak_ptr<rb::Session> weakSession(session);
RsEventsHandlerId_t hId = rsEvents->generateUniqueHandlerId(); RsEventsHandlerId_t hId = rsEvents->generateUniqueHandlerId();
std::function<void(const RsEvent&)> multiCallback = std::function<void(std::shared_ptr<const RsEvent>)> multiCallback =
[weakSession, hId](const RsEvent& event) [this, weakSession, hId](std::shared_ptr<const RsEvent> event)
{
mService.schedule( [weakSession, hId, event]()
{ {
auto session = weakSession.lock(); auto session = weakSession.lock();
if(!session || session->is_closed()) if(!session || session->is_closed())
@ -316,11 +318,13 @@ JsonApiServer::JsonApiServer(uint16_t port, const std::string& bindAddress,
RsGenericSerializer::SerializeContext ctx; RsGenericSerializer::SerializeContext ctx;
RsTypeSerializer::serial_process( RsTypeSerializer::serial_process(
RsGenericSerializer::TO_JSON, ctx, RsGenericSerializer::TO_JSON, ctx,
const_cast<RsEvent&>(event), "event" ); *const_cast<RsEvent*>(event.get()), "event" );
std::stringstream message; std::stringstream message;
message << "data: " << compactJSON << ctx.mJson << "\n\n"; message << "data: " << compactJSON << ctx.mJson << "\n\n";
session->yield(message.str()); session->yield(message.str());
} );
}; };
bool retval = rsEvents->registerEventsHandler(multiCallback, hId); bool retval = rsEvents->registerEventsHandler(multiCallback, hId);

View file

@ -37,7 +37,7 @@ class RsEvents;
* TODO: this should become std::weak_ptr once we have a reasonable services * TODO: this should become std::weak_ptr once we have a reasonable services
* management. * management.
*/ */
extern std::shared_ptr<RsEvents> rsEvents; extern RsEvents* rsEvents;
/** /**
* @brief Events types. * @brief Events types.
@ -113,21 +113,20 @@ public:
* @return False on error, true otherwise. * @return False on error, true otherwise.
*/ */
virtual bool postEvent( virtual bool postEvent(
std::unique_ptr<RsEvent> event, std::shared_ptr<const RsEvent> event,
std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string) std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string)
) = 0; ) = 0;
/** /**
* @brief Send event directly to handlers. Blocking API * @brief Send event directly to handlers. Blocking API
* The handlers get exectuded on the caller thread, ensuring the function * The handlers get exectuded on the caller thread.
* returns only after the event has been handled.
* @param[in] event * @param[in] event
* @param[out] errorMessage Optional storage for error messsage, meaningful * @param[out] errorMessage Optional storage for error messsage, meaningful
* only on failure. * only on failure.
* @return False on error, true otherwise. * @return False on error, true otherwise.
*/ */
virtual bool sendEvent( virtual bool sendEvent(
const RsEvent& event, std::shared_ptr<const RsEvent> event,
std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string) std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string)
) = 0; ) = 0;
@ -152,7 +151,7 @@ public:
* @return False on error, true otherwise. * @return False on error, true otherwise.
*/ */
virtual bool registerEventsHandler( virtual bool registerEventsHandler(
std::function<void(const RsEvent&)> multiCallback, std::function<void(std::shared_ptr<const RsEvent>)> multiCallback,
RsEventsHandlerId_t& hId = RS_DEFAULT_STORAGE_PARAM(RsEventsHandlerId_t, 0) RsEventsHandlerId_t& hId = RS_DEFAULT_STORAGE_PARAM(RsEventsHandlerId_t, 0)
) = 0; ) = 0;

View file

@ -35,8 +35,6 @@
#include "pqi/p3linkmgr.h" #include "pqi/p3linkmgr.h"
#include "pqi/p3netmgr.h" #include "pqi/p3netmgr.h"
int rsserverzone = 101;
#include "util/rsdebug.h" #include "util/rsdebug.h"
#include "retroshare/rsevents.h" #include "retroshare/rsevents.h"
@ -86,7 +84,7 @@ RsServer::RsServer() :
{ {
{ {
RsEventsService* tmpRsEvtPtr = new RsEventsService(); RsEventsService* tmpRsEvtPtr = new RsEventsService();
rsEvents.reset(tmpRsEvtPtr); rsEvents = tmpRsEvtPtr;
startServiceThread(tmpRsEvtPtr, "RsEventsService"); startServiceThread(tmpRsEvtPtr, "RsEventsService");
} }
@ -271,8 +269,6 @@ void RsServer::data_tick()
std::string out; std::string out;
rs_sprintf(out, "RsServer::run() WARNING Excessively Long Cycle Time: %g secs => Please DEBUG", cycleTime); rs_sprintf(out, "RsServer::run() WARNING Excessively Long Cycle Time: %g secs => Please DEBUG", cycleTime);
std::cerr << out << std::endl; std::cerr << out << std::endl;
rslog(RSL_ALERT, rsserverzone, out);
} }
#endif #endif
} }

View file

@ -171,12 +171,9 @@ void BroadcastDiscoveryService::data_tick()
else if(!isFriend) else if(!isFriend)
{ {
typedef RsBroadcastDiscoveryPeerFoundEvent Evt_t; typedef RsBroadcastDiscoveryPeerFoundEvent Evt_t;
if(rsEvents)
// Ensure rsEvents is not deleted while we use it rsEvents->postEvent(
std::shared_ptr<RsEvents> lockedRsEvents = rsEvents; std::shared_ptr<Evt_t>(new Evt_t(rbdr)) );
if(lockedRsEvents)
lockedRsEvents->postEvent(
std::unique_ptr<Evt_t>(new Evt_t(rbdr)) );
} }
} }
} }

View file

@ -25,36 +25,43 @@
#include "services/rseventsservice.h" #include "services/rseventsservice.h"
/*extern*/ std::shared_ptr<RsEvents> rsEvents(nullptr); /*extern*/ RsEvents* rsEvents = nullptr;
RsEvent::~RsEvent() {}; RsEvent::~RsEvent() {};
RsEvents::~RsEvents() {}; RsEvents::~RsEvents() {};
bool isEventValid(const RsEvent& event, std::string& errorMessage) bool isEventValid(
std::shared_ptr<const RsEvent> event, std::string& errorMessage )
{ {
if(event.mType <= RsEventType::NONE) if(!event)
{
errorMessage = "Event is null!";
return false;
}
if(event->mType <= RsEventType::NONE)
{ {
errorMessage = "Event has type NONE: " + errorMessage = "Event has type NONE: " +
std::to_string( std::to_string(
static_cast<std::underlying_type<RsEventType>::type >( static_cast<std::underlying_type<RsEventType>::type >(
event.mType ) ); event->mType ) );
return false; return false;
} }
if(event.mType >= RsEventType::MAX) if(event->mType >= RsEventType::MAX)
{ {
errorMessage = "Event has type >= RsEventType::MAX: " + errorMessage = "Event has type >= RsEventType::MAX: " +
std::to_string( std::to_string(
static_cast<std::underlying_type<RsEventType>::type >( static_cast<std::underlying_type<RsEventType>::type >(
event.mType ) ); event->mType ) );
} }
return true; return true;
} }
bool RsEventsService::postEvent( std::unique_ptr<RsEvent> event, bool RsEventsService::postEvent( std::shared_ptr<const RsEvent> event,
std::string& errorMessage ) std::string& errorMessage )
{ {
if(!isEventValid(*event, errorMessage)) if(!isEventValid(event, errorMessage))
{ {
std::cerr << __PRETTY_FUNCTION__ << " Error: "<< errorMessage std::cerr << __PRETTY_FUNCTION__ << " Error: "<< errorMessage
<< std::endl; << std::endl;
@ -62,17 +69,16 @@ bool RsEventsService::postEvent( std::unique_ptr<RsEvent> event,
} }
RS_STACK_MUTEX(mEventQueueMtx); RS_STACK_MUTEX(mEventQueueMtx);
mEventQueue.push_back(std::move(event)); mEventQueue.push_back(event);
return true; return true;
} }
bool RsEventsService::sendEvent( const RsEvent& event, bool RsEventsService::sendEvent( std::shared_ptr<const RsEvent> event,
std::string& errorMessage ) std::string& errorMessage )
{ {
if(!isEventValid(event, errorMessage)) if(!isEventValid(event, errorMessage))
{ {
std::cerr << __PRETTY_FUNCTION__ << " Error: "<< errorMessage RsErr() << __PRETTY_FUNCTION__ << " "<< errorMessage << std::endl;
<< std::endl;
return false; return false;
} }
@ -93,12 +99,12 @@ RsEventsHandlerId_t RsEventsService::generateUniqueHandlerId_unlocked()
} }
bool RsEventsService::registerEventsHandler( bool RsEventsService::registerEventsHandler(
std::function<void(const RsEvent&)> multiCallback, std::function<void(std::shared_ptr<const RsEvent>)> multiCallback,
RsEventsHandlerId_t& hId ) RsEventsHandlerId_t& hId )
{ {
RS_STACK_MUTEX(mHandlerMapMtx); RS_STACK_MUTEX(mHandlerMapMtx);
if(!hId) hId = generateUniqueHandlerId_unlocked(); if(!hId) hId = generateUniqueHandlerId_unlocked();
mHandlerMap[hId] = std::move(multiCallback); mHandlerMap[hId] = multiCallback;
return true; return true;
} }
@ -116,19 +122,19 @@ void RsEventsService::data_tick()
auto nextRunAt = std::chrono::system_clock::now() + auto nextRunAt = std::chrono::system_clock::now() +
std::chrono::milliseconds(1); std::chrono::milliseconds(1);
std::unique_ptr<RsEvent> eventPtr(nullptr); std::shared_ptr<const RsEvent> eventPtr(nullptr);
size_t futureEventsCounter = 0; size_t futureEventsCounter = 0;
dispatchEventFromQueueLock: dispatchEventFromQueueLock:
mEventQueueMtx.lock(); mEventQueueMtx.lock();
if(mEventQueue.size() > futureEventsCounter) if(mEventQueue.size() > futureEventsCounter)
{ {
eventPtr = std::move(mEventQueue.front()); eventPtr = mEventQueue.front();
mEventQueue.pop_front(); mEventQueue.pop_front();
if(eventPtr->mTimePoint >= nextRunAt) if(eventPtr->mTimePoint >= nextRunAt)
{ {
mEventQueue.push_back(std::move(eventPtr)); mEventQueue.push_back(eventPtr);
++futureEventsCounter; ++futureEventsCounter;
} }
} }
@ -137,17 +143,17 @@ dispatchEventFromQueueLock:
if(eventPtr) if(eventPtr)
{ {
/* It is relevant that this stays out of mEventQueueMtx */ /* It is relevant that this stays out of mEventQueueMtx */
handleEvent(*eventPtr); handleEvent(eventPtr);
eventPtr.reset(nullptr); // ensure memory is freed before sleep eventPtr = nullptr; // ensure refcounter is decremented before sleep
goto dispatchEventFromQueueLock; goto dispatchEventFromQueueLock;
} }
std::this_thread::sleep_until(nextRunAt); std::this_thread::sleep_until(nextRunAt);
} }
void RsEventsService::handleEvent(const RsEvent& event) void RsEventsService::handleEvent(std::shared_ptr<const RsEvent> event)
{ {
std::function<void(const RsEvent&)> mCallback; std::function<void(std::shared_ptr<const RsEvent>)> mCallback;
mHandlerMapMtx.lock(); mHandlerMapMtx.lock();
auto cbpt = mHandlerMap.begin(); auto cbpt = mHandlerMap.begin();
@ -165,7 +171,7 @@ getHandlerFromMapLock:
if(mCallback) if(mCallback)
{ {
mCallback(event); // It is relevant that this happens outside mutex mCallback(event); // It is relevant that this happens outside mutex
mCallback = std::function<void(const RsEvent&)>(nullptr); mCallback = std::function<void(std::shared_ptr<const RsEvent>)>(nullptr);
goto getHandlerFromMapLock; goto getHandlerFromMapLock;
} }
} }

View file

@ -27,6 +27,7 @@
#include "retroshare/rsevents.h" #include "retroshare/rsevents.h"
#include "util/rsthreads.h" #include "util/rsthreads.h"
#include "util/rsdebug.h"
class RsEventsService : class RsEventsService :
public RsEvents, public RsTickingThread public RsEvents, public RsTickingThread
@ -38,13 +39,13 @@ public:
/// @see RsEvents /// @see RsEvents
bool postEvent( bool postEvent(
std::unique_ptr<RsEvent> event, std::shared_ptr<const RsEvent> event,
std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string) std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string)
) override; ) override;
/// @see RsEvents /// @see RsEvents
bool sendEvent( bool sendEvent(
const RsEvent& event, std::shared_ptr<const RsEvent> event,
std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string) std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string)
) override; ) override;
@ -53,7 +54,7 @@ public:
/// @see RsEvents /// @see RsEvents
bool registerEventsHandler( bool registerEventsHandler(
std::function<void(const RsEvent&)> multiCallback, std::function<void(std::shared_ptr<const RsEvent>)> multiCallback,
RsEventsHandlerId_t& hId = RS_DEFAULT_STORAGE_PARAM(RsEventsHandlerId_t, 0) RsEventsHandlerId_t& hId = RS_DEFAULT_STORAGE_PARAM(RsEventsHandlerId_t, 0)
) override; ) override;
@ -63,15 +64,18 @@ public:
protected: protected:
RsMutex mHandlerMapMtx; RsMutex mHandlerMapMtx;
RsEventsHandlerId_t mLastHandlerId; RsEventsHandlerId_t mLastHandlerId;
std::map< RsEventsHandlerId_t, std::function<void(const RsEvent&)> > std::map<
mHandlerMap; RsEventsHandlerId_t,
std::function<void(std::shared_ptr<const RsEvent>)> > mHandlerMap;
RsMutex mEventQueueMtx; RsMutex mEventQueueMtx;
std::deque< std::unique_ptr<RsEvent> > mEventQueue; std::deque< std::shared_ptr<const RsEvent> > mEventQueue;
/// @see RsTickingThread /// @see RsTickingThread
void data_tick() override; void data_tick() override;
void handleEvent(const RsEvent& event); void handleEvent(std::shared_ptr<const RsEvent> event);
RsEventsHandlerId_t generateUniqueHandlerId_unlocked(); RsEventsHandlerId_t generateUniqueHandlerId_unlocked();
RS_SET_CONTEXT_DEBUG_LEVEL(3)
}; };