changed RsEvents so that it takes event types when registering handlers, which limits the number of handlers called for each event

This commit is contained in:
csoler 2019-12-29 13:50:54 +01:00
parent dc2f2f5eb4
commit c544b1da7c
No known key found for this signature in database
GPG key ID: 7BCA522266C0804C
19 changed files with 186 additions and 140 deletions

View file

@ -112,6 +112,7 @@ p3discovery2::p3discovery2(
if(rsEvents)
rsEvents->registerEventsHandler(
RsEventType::GOSSIP_DISCOVERY,
[this](std::shared_ptr<const RsEvent> event)
{
rsEventsHandler(*event);
@ -1348,8 +1349,3 @@ void p3discovery2::rsEventsHandler(const RsEvent& event)
//
// /* ignore other operations */
// }
// (cyril) do we still need this??
RsGossipDiscoveryFriendInviteReceivedEvent::RsGossipDiscoveryFriendInviteReceivedEvent(const std::string& invite) :
RsEvent(RsEventType::GOSSIP_DISCOVERY_INVITE_RECEIVED),
mInvite(invite) {}

View file

@ -71,23 +71,32 @@ struct RsBroadcastDiscoveryResult : RsSerializable
* @brief Event emitted when a non friend new peer is found in the local network
* @see RsEvents
*/
struct RsBroadcastDiscoveryPeerFoundEvent : RsEvent
{
RsBroadcastDiscoveryPeerFoundEvent(
const RsBroadcastDiscoveryResult& eventData ) :
RsEvent(RsEventType::BROADCAST_DISCOVERY_PEER_FOUND), mData(eventData) {}
enum class RsBroadcastDiscoveryEventType: uint32_t {
UNKNOWN = 0x00,
PEER_FOUND = 0x01
};
RsBroadcastDiscoveryResult mData;
struct RsBroadcastDiscoveryEvent : RsEvent
{
RsBroadcastDiscoveryEvent()
: RsEvent(RsEventType::BROADCAST_DISCOVERY),
mDiscoveryEventType(RsBroadcastDiscoveryEventType::UNKNOWN)
{}
virtual ~RsBroadcastDiscoveryEvent() override = default;
RsBroadcastDiscoveryEventType mDiscoveryEventType;
RsBroadcastDiscoveryResult mData;
/// @see RsSerializable
void serial_process( RsGenericSerializer::SerializeJob j,
RsGenericSerializer::SerializeContext& ctx) override
{
RsEvent::serial_process(j, ctx);
RS_SERIAL_PROCESS(mDiscoveryEventType);
RS_SERIAL_PROCESS(mData);
}
~RsBroadcastDiscoveryPeerFoundEvent() override;
};

View file

@ -50,10 +50,10 @@ enum class RsEventType : uint32_t
NONE = 0, /// Used to detect uninitialized event
/// @see RsBroadcastDiscovery
BROADCAST_DISCOVERY_PEER_FOUND = 1,
BROADCAST_DISCOVERY = 1,
/// @see RsDiscPendingPgpReceivedEvent
GOSSIP_DISCOVERY_INVITE_RECEIVED = 2,
GOSSIP_DISCOVERY = 2,
/// @see AuthSSL
AUTHSSL_CONNECTION_AUTENTICATION = 3,
@ -61,14 +61,14 @@ enum class RsEventType : uint32_t
/// @see pqissl
PEER_CONNECTION = 4,
/// @see RsGxsChanges // this one should probably be removed because it's not used anywhere
/// @see RsGxsChanges // this one is used in RsGxsBroadcast
GXS_CHANGES = 5,
/// Emitted when a peer state changes, @see RsPeers
PEER_STATE_CHANGED = 6,
/// @see RsMailStatusEvent
MAIL_STATUS_CHANGE = 7,
MAIL_STATUS = 7,
/// @see RsGxsCircleEvent
GXS_CIRCLES = 8,
@ -160,6 +160,7 @@ public:
* Every time an event is dispatced the registered events handlers will get
* their method handleEvent called with the event passed as paramether.
* @jsonapi{development,manualwrapper}
* @param eventType Type of event for which the callback is called
* @param multiCallback Function that will be called each time an event
* is dispatched.
* @param[inout] hId Optional storage for handler id, useful to
@ -170,6 +171,7 @@ public:
* @return False on error, true otherwise.
*/
virtual bool registerEventsHandler(
RsEventType eventType,
std::function<void(std::shared_ptr<const RsEvent>)> multiCallback,
RsEventsHandlerId_t& hId = RS_DEFAULT_STORAGE_PARAM(RsEventsHandlerId_t, 0)
) = 0;

View file

@ -45,11 +45,18 @@ extern std::shared_ptr<RsGossipDiscovery> rsGossipDiscovery;
/**
* @brief Emitted when a pending PGP certificate is received
*/
struct RsGossipDiscoveryFriendInviteReceivedEvent : RsEvent
{
RsGossipDiscoveryFriendInviteReceivedEvent(
const std::string& invite );
enum class RsGossipDiscoveryEventType: uint32_t {
UNKNOWN = 0x00,
PEER_INVITE_RECEIVED = 0x01
};
struct RsGossipDiscoveryEvent : RsEvent
{
RsGossipDiscoveryEvent(): RsEvent(RsEventType::GOSSIP_DISCOVERY) {}
virtual ~RsGossipDiscoveryEvent() override {}
RsGossipDiscoveryEventType mGossipDiscoveryEventType;
std::string mInvite;
/// @see RsSerializable
@ -57,6 +64,7 @@ struct RsGossipDiscoveryFriendInviteReceivedEvent : RsEvent
RsGenericSerializer::SerializeContext& ctx )
{
RsEvent::serial_process(j,ctx);
RS_SERIAL_PROCESS(mGossipDiscoveryEventType);
RS_SERIAL_PROCESS(mInvite);
}
};

View file

@ -296,19 +296,19 @@ struct MsgTagType : RsSerializable
} //namespace Rs
} //namespace Msgs
struct RsMailStatusEvent : RsEvent
{
RsMailStatusEvent() : RsEvent(RsEventType::MAIL_STATUS_CHANGE) {}
enum MailStatusEventCode: uint8_t {
enum class RsMailStatusEventType: uint8_t {
NEW_MESSAGE = 0x00,
MESSAGE_REMOVED = 0x01,
MESSAGE_SENT = 0x02,
MESSAGE_RECEIVED_ACK = 0x03, // means the peer received the message
FAILED_SIGNATURE = 0x04, // means the signature of the message cannot be verified
};
};
MailStatusEventCode mMailStatusEventCode;
struct RsMailStatusEvent : RsEvent
{
RsMailStatusEvent() : RsEvent(RsEventType::MAIL_STATUS) {}
RsMailStatusEventType mMailStatusEventCode;
std::set<RsMailMessageId> mChangedMsgIds;
/// @see RsEvent
@ -320,7 +320,7 @@ struct RsMailStatusEvent : RsEvent
RS_SERIAL_PROCESS(mMailStatusEventCode);
}
~RsMailStatusEvent() override;
~RsMailStatusEvent() override = default;
};
#define RS_CHAT_PUBLIC 0x0001
@ -329,7 +329,7 @@ struct RsMailStatusEvent : RsEvent
#define RS_DISTANT_CHAT_STATUS_UNKNOWN 0x0000
#define RS_DISTANT_CHAT_STATUS_TUNNEL_DN 0x0001
#define RS_DISTANT_CHAT_STATUS_CAN_TALK 0x0002
#define RS_DISTANT_CHAT_STATUS_CAN_TALK 0x0002
#define RS_DISTANT_CHAT_STATUS_REMOTELY_CLOSED 0x0003
#define RS_DISTANT_CHAT_ERROR_NO_ERROR 0x0000

View file

@ -33,8 +33,8 @@ void RsGxsPostedPostItem::serial_process(RsGenericSerializer::SerializeJob j,RsG
// and do not expect to deserialize mImage member if the data block has been consummed entirely (keeps compatibility
// of new RS with older posts.
if(j == RsGenericSerializer::DESERIALIZE && ctx.mOffset == ctx.mSize)
return ;
if(j == RsGenericSerializer::DESERIALIZE && ctx.mOffset == ctx.mSize)
return ;
if((j == RsGenericSerializer::SIZE_ESTIMATE || j == RsGenericSerializer::SERIALIZE) && mImage.empty())
return ;

View file

@ -538,8 +538,7 @@ bool p3Msgs::initiateDistantChatConnexion(
const RsGxsId& to_gxs_id, const RsGxsId& from_gxs_id,
DistantChatPeerId& pid, uint32_t& error_code, bool notify )
{
return mChatSrv->initiateDistantChatConnexion( to_gxs_id, from_gxs_id,
pid, error_code, notify );
return mChatSrv->initiateDistantChatConnexion( to_gxs_id, from_gxs_id, pid, error_code, notify );
}
bool p3Msgs::getDistantChatStatus(const DistantChatPeerId& pid,DistantChatPeerInfo& info)
{
@ -559,7 +558,6 @@ uint32_t p3Msgs::getDistantChatPermissionFlags()
}
RsMsgs::~RsMsgs() = default;
RsMailStatusEvent::~RsMailStatusEvent() = default;
Rs::Msgs::MessageInfo::~MessageInfo() = default;
MsgInfoSummary::~MsgInfoSummary() = default;
VisibleChatLobbyRecord::~VisibleChatLobbyRecord() = default;

View file

@ -182,10 +182,15 @@ void BroadcastDiscoveryService::data_tick()
}
else if(!isFriend)
{
typedef RsBroadcastDiscoveryPeerFoundEvent Evt_t;
if(rsEvents)
rsEvents->postEvent(
std::shared_ptr<Evt_t>(new Evt_t(rbdr)) );
{
auto ev = std::make_shared<RsBroadcastDiscoveryEvent>();
ev->mDiscoveryEventType = RsBroadcastDiscoveryEventType::PEER_FOUND;
ev->mData = rbdr;
rsEvents->postEvent(ev);
}
}
}
}
@ -307,5 +312,4 @@ bool BroadcastDiscoveryService::assertMulticastLockIsvalid()
RsBroadcastDiscovery::~RsBroadcastDiscovery() = default;
RsBroadcastDiscoveryResult::~RsBroadcastDiscoveryResult() = default;
RsBroadcastDiscoveryPeerFoundEvent::~RsBroadcastDiscoveryPeerFoundEvent() = default;
BroadcastDiscoveryPack::~BroadcastDiscoveryPack() = default;

View file

@ -178,7 +178,7 @@ void p3MsgService::processIncomingMsg(RsMsgItem *mi)
if (rsEvents)
{
auto ev = std::make_shared<RsMailStatusEvent>();
ev->mMailStatusEventCode = RsMailStatusEvent::NEW_MESSAGE;
ev->mMailStatusEventCode = RsMailStatusEventType::NEW_MESSAGE;
ev->mChangedMsgIds.insert(std::to_string(mi->msgId));
rsEvents->sendEvent(ev);
@ -338,7 +338,7 @@ int p3MsgService::checkOutgoingMessages()
std::list<RsMsgItem*> output_queue;
auto pEvent = std::make_shared<RsMailStatusEvent>();
pEvent->mMailStatusEventCode = RsMailStatusEvent::MESSAGE_SENT;
pEvent->mMailStatusEventCode = RsMailStatusEventType::MESSAGE_SENT;
{
RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/
@ -900,7 +900,7 @@ bool p3MsgService::removeMsgId(const std::string &mid)
auto pEvent = std::make_shared<RsMailStatusEvent>();
pEvent->mMailStatusEventCode = RsMailStatusEvent::MESSAGE_REMOVED;
pEvent->mMailStatusEventCode = RsMailStatusEventType::MESSAGE_REMOVED;
{
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
@ -1269,7 +1269,7 @@ uint32_t p3MsgService::sendMail(
uint32_t ret = 0;
auto pEvent = std::make_shared<RsMailStatusEvent>();
pEvent->mMailStatusEventCode = RsMailStatusEvent::MESSAGE_SENT;
pEvent->mMailStatusEventCode = RsMailStatusEventType::MESSAGE_SENT;
auto pSend = [&](const std::set<RsGxsId>& sDest)
{
@ -2085,7 +2085,7 @@ void p3MsgService::notifyDataStatus( const GRouterMsgPropagationId& id,
auto pEvent = std::make_shared<RsMailStatusEvent>();
pEvent->mMailStatusEventCode = RsMailStatusEvent::NEW_MESSAGE;
pEvent->mMailStatusEventCode = RsMailStatusEventType::NEW_MESSAGE;
pEvent->mChangedMsgIds.insert(std::to_string(msg_id));
if(rsEvents) rsEvents->postEvent(pEvent);
@ -2189,7 +2189,7 @@ bool p3MsgService::notifyGxsTransSendStatus( RsGxsTransId mailId,
if( status == GxsTransSendStatus::RECEIPT_RECEIVED )
{
pEvent->mMailStatusEventCode = RsMailStatusEvent::NEW_MESSAGE;
pEvent->mMailStatusEventCode = RsMailStatusEventType::NEW_MESSAGE;
uint32_t msg_id;
{
@ -2244,7 +2244,7 @@ bool p3MsgService::notifyGxsTransSendStatus( RsGxsTransId mailId,
else if( status >= GxsTransSendStatus::FAILED_RECEIPT_SIGNATURE )
{
uint32_t msg_id;
pEvent->mMailStatusEventCode = RsMailStatusEvent::FAILED_SIGNATURE;
pEvent->mMailStatusEventCode = RsMailStatusEventType::FAILED_SIGNATURE;
{
RS_STACK_MUTEX(gxsOngoingMutex);

View file

@ -99,22 +99,42 @@ RsEventsHandlerId_t RsEventsService::generateUniqueHandlerId_unlocked()
}
bool RsEventsService::registerEventsHandler(
RsEventType eventType,
std::function<void(std::shared_ptr<const RsEvent>)> multiCallback,
RsEventsHandlerId_t& hId )
{
RS_STACK_MUTEX(mHandlerMapMtx);
if(!hId) hId = generateUniqueHandlerId_unlocked();
mHandlerMap[hId] = multiCallback;
if( (int)eventType > mHandlerMaps.size() + 10)
{
RsErr() << "Cannot register an event handler for an event type larger than 10 plus the max pre-defined event (value passed was " << (int)eventType << " whereas max is " << (int)RsEventType::MAX << ")" << std::endl;
return false;
}
if( (int)eventType >= mHandlerMaps.size())
mHandlerMaps.resize( (int)eventType +1 );
if(!hId)
hId = generateUniqueHandlerId_unlocked();
mHandlerMaps[(int)eventType][hId] = multiCallback;
return true;
}
bool RsEventsService::unregisterEventsHandler(RsEventsHandlerId_t hId)
{
RS_STACK_MUTEX(mHandlerMapMtx);
auto it = mHandlerMap.find(hId);
if(it == mHandlerMap.end()) return false;
mHandlerMap.erase(it);
return true;
for(uint32_t i=0;i<mHandlerMaps.size();++i)
{
auto it = mHandlerMaps[i].find(hId);
if(it != mHandlerMaps[i].end())
{
mHandlerMaps[i].erase(it);
return true;
}
}
return false;
}
void RsEventsService::data_tick()
@ -155,13 +175,20 @@ void RsEventsService::handleEvent(std::shared_ptr<const RsEvent> event)
{
std::function<void(std::shared_ptr<const RsEvent>)> mCallback;
uint32_t event_type_index = (uint32_t)event->mType;
mHandlerMapMtx.lock();
auto cbpt = mHandlerMap.begin();
if(event_type_index >= mHandlerMaps.size() || event_type_index < 1)
{
RsErr() << "Cannot handle an event of type " << event_type_index << ": out of scope!" << std::endl;
return;
}
auto cbpt = mHandlerMaps[event_type_index].begin();
mHandlerMapMtx.unlock();
getHandlerFromMapLock:
mHandlerMapMtx.lock();
if(cbpt != mHandlerMap.end())
if(cbpt != mHandlerMaps[event_type_index].end())
{
mCallback = cbpt->second;
++cbpt;

View file

@ -35,6 +35,7 @@ class RsEventsService :
public:
RsEventsService():
mHandlerMapMtx("RsEventsService::mHandlerMapMtx"), mLastHandlerId(1),
mHandlerMaps(static_cast<int>(RsEventType::MAX)),
mEventQueueMtx("RsEventsService::mEventQueueMtx") {}
/// @see RsEvents
@ -54,6 +55,7 @@ public:
/// @see RsEvents
bool registerEventsHandler(
RsEventType eventType,
std::function<void(std::shared_ptr<const RsEvent>)> multiCallback,
RsEventsHandlerId_t& hId = RS_DEFAULT_STORAGE_PARAM(RsEventsHandlerId_t, 0)
) override;
@ -64,9 +66,11 @@ public:
protected:
RsMutex mHandlerMapMtx;
RsEventsHandlerId_t mLastHandlerId;
std::map<
RsEventsHandlerId_t,
std::function<void(std::shared_ptr<const RsEvent>)> > mHandlerMap;
std::vector<
std::map<
RsEventsHandlerId_t,
std::function<void(std::shared_ptr<const RsEvent>)> > > mHandlerMaps;
RsMutex mEventQueueMtx;
std::deque< std::shared_ptr<const RsEvent> > mEventQueue;