mirror of
https://github.com/RetroShare/RetroShare.git
synced 2025-08-23 13:15:51 -04:00
Implement a JSON API friendly notification system
This should bit by bit substitute RsNotify which would be very difficult to support properly in JSON API. The new system is much simpler to use also from the C++ side of the moon. BroadcastDiscovery take advantage of the new system to notify about new non friend peer discovered, tested successfully also in JSON API.
This commit is contained in:
parent
9c7a8d479f
commit
7dab487bde
9 changed files with 503 additions and 84 deletions
|
@ -29,9 +29,11 @@
|
|||
#include "retroshare/rspeers.h"
|
||||
#include "serialiser/rsserializable.h"
|
||||
#include "serialiser/rsserializer.h"
|
||||
#include "retroshare/rsevents.h"
|
||||
|
||||
/*extern*/ std::shared_ptr<RsBroadcastDiscovery> rsBroadcastDiscovery(nullptr);
|
||||
RsBroadcastDiscovery::~RsBroadcastDiscovery() { /* Beware of Rs prefix! */ }
|
||||
RsBroadcastDiscoveryPeerFoundEvent::~RsBroadcastDiscoveryPeerFoundEvent() {}
|
||||
|
||||
struct BroadcastDiscoveryPack : RsSerializable
|
||||
{
|
||||
|
@ -87,9 +89,7 @@ struct BroadcastDiscoveryPack : RsSerializable
|
|||
BroadcastDiscoveryService::BroadcastDiscoveryService(
|
||||
RsPeers& pRsPeers ) :
|
||||
mDiscoveredDataMutex("BroadcastDiscoveryService discovered data mutex"),
|
||||
mRsPeers(pRsPeers),
|
||||
mPeersDiscoveredEventHandlersListMutex(
|
||||
"BroadcastDiscoveryService event handlers mutex" )
|
||||
mRsPeers(pRsPeers)
|
||||
{
|
||||
if(mRsPeers.isHiddenNode(mRsPeers.getOwnId())) return;
|
||||
|
||||
|
@ -151,10 +151,7 @@ void BroadcastDiscoveryService::data_tick()
|
|||
}
|
||||
mDiscoveredDataMutex.unlock();
|
||||
|
||||
cleanTimedOutEventHandlers();
|
||||
|
||||
mPeersDiscoveredEventHandlersListMutex.lock();
|
||||
if(!mChangedData.empty() && !mPeersDiscoveredEventHandlersList.empty())
|
||||
if(!mChangedData.empty())
|
||||
{
|
||||
for (auto&& pp : mChangedData)
|
||||
{
|
||||
|
@ -173,12 +170,13 @@ void BroadcastDiscoveryService::data_tick()
|
|||
rbdr.locator.port() );
|
||||
mRsPeers.connectAttempt(rbdr.mSslId);
|
||||
}
|
||||
|
||||
for( const timedDiscHandlers_t& evtHandler :
|
||||
mPeersDiscoveredEventHandlersList ) evtHandler.first(rbdr);
|
||||
else if(rsEvents)
|
||||
{
|
||||
typedef RsBroadcastDiscoveryPeerFoundEvent Evt_t;
|
||||
rsEvents->postEvent(std::unique_ptr<Evt_t>(new Evt_t(rbdr)));
|
||||
}
|
||||
}
|
||||
}
|
||||
mPeersDiscoveredEventHandlersListMutex.unlock();
|
||||
}
|
||||
|
||||
/* Probably this would be better if done only on actual change */
|
||||
|
@ -205,40 +203,3 @@ RsBroadcastDiscoveryResult BroadcastDiscoveryService::createResult(
|
|||
|
||||
return rbdr;
|
||||
}
|
||||
|
||||
void BroadcastDiscoveryService::cleanTimedOutEventHandlers()
|
||||
{
|
||||
auto now = std::chrono::system_clock::now();
|
||||
|
||||
RS_STACK_MUTEX(mPeersDiscoveredEventHandlersListMutex);
|
||||
mPeersDiscoveredEventHandlersList.remove_if(
|
||||
[&](timedDiscHandlers_t h) { return h.second <= now; } );
|
||||
}
|
||||
|
||||
bool BroadcastDiscoveryService::registerPeersDiscoveredEventHandler(
|
||||
const std::function<void (const RsBroadcastDiscoveryResult&)>&
|
||||
multiCallback, rstime_t maxWait, std::string& errorMessage )
|
||||
{
|
||||
auto now = std::chrono::system_clock::now();
|
||||
auto timeout = std::chrono::system_clock::time_point::max();
|
||||
if(maxWait != std::numeric_limits<rstime_t>::max())
|
||||
timeout = now + std::chrono::seconds(maxWait);
|
||||
|
||||
if(timeout <= now)
|
||||
{
|
||||
errorMessage = " Invalid maxWait value: " + std::to_string(maxWait) +
|
||||
" either too big or too little, use: " +
|
||||
"std::numeric_limits<rstime_t>::max() == " +
|
||||
std::to_string(std::numeric_limits<rstime_t>::max()) +
|
||||
" if you meant \"wait forever\"";
|
||||
|
||||
std::cerr << __PRETTY_FUNCTION__ << errorMessage << std::endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
RS_STACK_MUTEX(mPeersDiscoveredEventHandlersListMutex);
|
||||
mPeersDiscoveredEventHandlersList.push_front(
|
||||
std::make_pair(multiCallback,timeout) );
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -43,14 +43,6 @@ public:
|
|||
/// @see RsBroadcastDiscovery
|
||||
std::vector<RsBroadcastDiscoveryResult> getDiscoveredPeers() override;
|
||||
|
||||
/// @see RsBroadcastDiscovery
|
||||
bool registerPeersDiscoveredEventHandler(
|
||||
const std::function<void (const RsBroadcastDiscoveryResult&)>&
|
||||
multiCallback,
|
||||
rstime_t maxWait = 300,
|
||||
std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string)
|
||||
) override;
|
||||
|
||||
/// @see RsTickingThread
|
||||
void data_tick() override;
|
||||
|
||||
|
@ -68,16 +60,6 @@ protected:
|
|||
|
||||
RsPeers& mRsPeers; // TODO: std::shared_ptr<RsPeers> mRsPeers;
|
||||
|
||||
typedef std::pair<
|
||||
std::function<void (const RsBroadcastDiscoveryResult&)>,
|
||||
std::chrono::system_clock::time_point > timedDiscHandlers_t;
|
||||
/** Store peer discovered event handlers with timeout */
|
||||
std::forward_list<timedDiscHandlers_t> mPeersDiscoveredEventHandlersList;
|
||||
RsMutex mPeersDiscoveredEventHandlersListMutex;
|
||||
|
||||
/// Cleanup mPeersDiscoveredEventHandlersList
|
||||
void cleanTimedOutEventHandlers();
|
||||
|
||||
RsBroadcastDiscoveryResult createResult(
|
||||
const UDC::IpPort& ipp, const std::string& uData );
|
||||
};
|
||||
|
|
171
libretroshare/src/services/rseventsservice.cc
Normal file
171
libretroshare/src/services/rseventsservice.cc
Normal file
|
@ -0,0 +1,171 @@
|
|||
/*******************************************************************************
|
||||
* Retroshare events service *
|
||||
* *
|
||||
* libretroshare: retroshare core library *
|
||||
* *
|
||||
* Copyright (C) 2019 Gioacchino Mazzurco <gio@eigenlab.org> *
|
||||
* *
|
||||
* This program is free software: you can redistribute it and/or modify *
|
||||
* it under the terms of the GNU Lesser General Public License as *
|
||||
* published by the Free Software Foundation, either version 3 of the *
|
||||
* License, or (at your option) any later version. *
|
||||
* *
|
||||
* This program is distributed in the hope that it will be useful, *
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
|
||||
* GNU Lesser General Public License for more details. *
|
||||
* *
|
||||
* You should have received a copy of the GNU Lesser General Public License *
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>. *
|
||||
* *
|
||||
*******************************************************************************/
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "services/rseventsservice.h"
|
||||
|
||||
|
||||
/*extern*/ std::shared_ptr<RsEvents> rsEvents(nullptr);
|
||||
RsEvent::~RsEvent() {};
|
||||
RsEvents::~RsEvents() {};
|
||||
|
||||
bool isEventValid(const RsEvent& event, std::string& errorMessage)
|
||||
{
|
||||
if(event.mType <= RsEventType::NONE)
|
||||
{
|
||||
errorMessage = "Event has type NONE: " +
|
||||
std::to_string(
|
||||
static_cast<std::underlying_type<RsEventType>::type >(
|
||||
event.mType ) );
|
||||
return false;
|
||||
}
|
||||
|
||||
if(event.mType >= RsEventType::MAX)
|
||||
{
|
||||
errorMessage = "Event has type >= RsEventType::MAX: " +
|
||||
std::to_string(
|
||||
static_cast<std::underlying_type<RsEventType>::type >(
|
||||
event.mType ) );
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool RsEventsService::postEvent( std::unique_ptr<RsEvent> event,
|
||||
std::string& errorMessage )
|
||||
{
|
||||
if(!isEventValid(*event, errorMessage))
|
||||
{
|
||||
std::cerr << __PRETTY_FUNCTION__ << " Error: "<< errorMessage
|
||||
<< std::endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
RS_STACK_MUTEX(mEventQueueMtx);
|
||||
mEventQueue.push_back(std::move(event));
|
||||
return true;
|
||||
}
|
||||
|
||||
bool RsEventsService::sendEvent( const RsEvent& event,
|
||||
std::string& errorMessage )
|
||||
{
|
||||
if(!isEventValid(event, errorMessage))
|
||||
{
|
||||
std::cerr << __PRETTY_FUNCTION__ << " Error: "<< errorMessage
|
||||
<< std::endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
handleEvent(event);
|
||||
return true;
|
||||
}
|
||||
|
||||
RsEventsHandlerId_t RsEventsService::generateUniqueHandlerId()
|
||||
{
|
||||
RS_STACK_MUTEX(mHandlerMapMtx);
|
||||
return generateUniqueHandlerId_unlocked();
|
||||
}
|
||||
|
||||
RsEventsHandlerId_t RsEventsService::generateUniqueHandlerId_unlocked()
|
||||
{
|
||||
if(++mLastHandlerId) return mLastHandlerId; // Avoid 0 after overflow
|
||||
return 1;
|
||||
}
|
||||
|
||||
bool RsEventsService::registerEventsHandler(
|
||||
std::function<void(const RsEvent&)> multiCallback,
|
||||
RsEventsHandlerId_t& hId )
|
||||
{
|
||||
RS_STACK_MUTEX(mHandlerMapMtx);
|
||||
if(!hId) hId = generateUniqueHandlerId_unlocked();
|
||||
mHandlerMap[hId] = std::move(multiCallback);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool RsEventsService::unregisterEventsHandler(RsEventsHandlerId_t hId)
|
||||
{
|
||||
RS_STACK_MUTEX(mHandlerMapMtx);
|
||||
auto it = mHandlerMap.find(hId);
|
||||
if(it == mHandlerMap.end()) return false;
|
||||
mHandlerMap.erase(it);
|
||||
return true;
|
||||
}
|
||||
|
||||
void RsEventsService::data_tick()
|
||||
{
|
||||
auto nextRunAt = std::chrono::system_clock::now() +
|
||||
std::chrono::milliseconds(1);
|
||||
|
||||
std::unique_ptr<RsEvent> eventPtr(nullptr);
|
||||
uint futureEventsCounter = 0;
|
||||
|
||||
dispatchEventFromQueueLock:
|
||||
mEventQueueMtx.lock();
|
||||
if(mEventQueue.size() > futureEventsCounter)
|
||||
{
|
||||
eventPtr = std::move(mEventQueue.front());
|
||||
mEventQueue.pop_front();
|
||||
|
||||
if(eventPtr->mTimePoint >= nextRunAt)
|
||||
{
|
||||
mEventQueue.push_back(std::move(eventPtr));
|
||||
++futureEventsCounter;
|
||||
}
|
||||
}
|
||||
mEventQueueMtx.unlock();
|
||||
|
||||
if(eventPtr)
|
||||
{
|
||||
/* It is relevant that this stays out of mEventQueueMtx */
|
||||
handleEvent(*eventPtr);
|
||||
eventPtr.reset(nullptr); // ensure memory is freed before sleep
|
||||
goto dispatchEventFromQueueLock;
|
||||
}
|
||||
|
||||
std::this_thread::sleep_until(nextRunAt);
|
||||
}
|
||||
|
||||
void RsEventsService::handleEvent(const RsEvent& event)
|
||||
{
|
||||
std::function<void(const RsEvent&)> mCallback;
|
||||
|
||||
mHandlerMapMtx.lock();
|
||||
auto cbpt = mHandlerMap.begin();
|
||||
mHandlerMapMtx.unlock();
|
||||
|
||||
getHandlerFromMapLock:
|
||||
mHandlerMapMtx.lock();
|
||||
if(cbpt != mHandlerMap.end())
|
||||
{
|
||||
mCallback = cbpt->second;
|
||||
++cbpt;
|
||||
}
|
||||
mHandlerMapMtx.unlock();
|
||||
|
||||
if(mCallback)
|
||||
{
|
||||
mCallback(event); // It is relevant that this happens outside mutex
|
||||
mCallback = std::function<void(const RsEvent&)>(nullptr);
|
||||
goto getHandlerFromMapLock;
|
||||
}
|
||||
}
|
77
libretroshare/src/services/rseventsservice.h
Normal file
77
libretroshare/src/services/rseventsservice.h
Normal file
|
@ -0,0 +1,77 @@
|
|||
/*******************************************************************************
|
||||
* Retroshare events service *
|
||||
* *
|
||||
* libretroshare: retroshare core library *
|
||||
* *
|
||||
* Copyright (C) 2019 Gioacchino Mazzurco <gio@eigenlab.org> *
|
||||
* *
|
||||
* This program is free software: you can redistribute it and/or modify *
|
||||
* it under the terms of the GNU Lesser General Public License as *
|
||||
* published by the Free Software Foundation, either version 3 of the *
|
||||
* License, or (at your option) any later version. *
|
||||
* *
|
||||
* This program is distributed in the hope that it will be useful, *
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
|
||||
* GNU Lesser General Public License for more details. *
|
||||
* *
|
||||
* You should have received a copy of the GNU Lesser General Public License *
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>. *
|
||||
* *
|
||||
*******************************************************************************/
|
||||
#pragma once
|
||||
|
||||
#include <memory>
|
||||
#include <cstdint>
|
||||
#include <deque>
|
||||
|
||||
#include "retroshare/rsevents.h"
|
||||
#include "util/rsthreads.h"
|
||||
|
||||
class RsEventsService :
|
||||
public RsEvents, public RsTickingThread
|
||||
{
|
||||
public:
|
||||
RsEventsService():
|
||||
mHandlerMapMtx("RsEventsService::mHandlerMapMtx"), mLastHandlerId(1),
|
||||
mEventQueueMtx("RsEventsService::mEventQueueMtx") {}
|
||||
|
||||
/// @see RsEvents
|
||||
bool postEvent(
|
||||
std::unique_ptr<RsEvent> event,
|
||||
std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string)
|
||||
) override;
|
||||
|
||||
/// @see RsEvents
|
||||
bool sendEvent(
|
||||
const RsEvent& event,
|
||||
std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string)
|
||||
) override;
|
||||
|
||||
/// @see RsEvents
|
||||
RsEventsHandlerId_t generateUniqueHandlerId() override;
|
||||
|
||||
/// @see RsEvents
|
||||
bool registerEventsHandler(
|
||||
std::function<void(const RsEvent&)> multiCallback,
|
||||
RsEventsHandlerId_t& hId = RS_DEFAULT_STORAGE_PARAM(RsEventsHandlerId_t, 0)
|
||||
) override;
|
||||
|
||||
/// @see RsEvents
|
||||
bool unregisterEventsHandler(RsEventsHandlerId_t hId) override;
|
||||
|
||||
protected:
|
||||
RsMutex mHandlerMapMtx;
|
||||
RsEventsHandlerId_t mLastHandlerId;
|
||||
std::map< RsEventsHandlerId_t, std::function<void(const RsEvent&)> >
|
||||
mHandlerMap;
|
||||
|
||||
RsMutex mEventQueueMtx;
|
||||
std::deque< std::unique_ptr<RsEvent> > mEventQueue;
|
||||
|
||||
/// @see RsTickingThread
|
||||
void data_tick() override;
|
||||
|
||||
void handleEvent(const RsEvent& event);
|
||||
RsEventsHandlerId_t generateUniqueHandlerId_unlocked();
|
||||
};
|
Loading…
Add table
Add a link
Reference in a new issue