From e4f25a558d48629155486d3e8517740328823f8f Mon Sep 17 00:00:00 2001 From: Gioacchino Mazzurco Date: Sat, 13 Mar 2021 20:17:11 +0100 Subject: [PATCH 1/8] Implement pull request mechanism in RsGxsNetService This could be used to request the online peers to pull updates from us ASAP, as an exaple when a group is created a pull request can be emitted too so the online peers pull the groups from us ASAP instead of waiting for the usual 60 seconds. A mechanism like this is especially useful on mobile phones where the internet connection is usually turned on only in a few moments (as an example while the user is interacting with the app). Cleanup a few old corners in the code keeping retro-compatibility and make the code more welcoming to new developers. Put a bunch of dead code under #ifdef. --- libretroshare/src/gxs/rsgxsnetservice.cc | 115 +++++++++++++++------ libretroshare/src/gxs/rsgxsnetservice.h | 22 +++- libretroshare/src/libretroshare.pro | 3 +- libretroshare/src/rsitems/itempriorities.h | 8 +- libretroshare/src/rsitems/rsitem.h | 37 +++++-- libretroshare/src/rsitems/rsnxsitems.h | 55 ++++++---- libretroshare/src/serialiser/rsserial.cc | 33 +++--- libretroshare/src/util/cxx23retrocompat.h | 34 ++++++ 8 files changed, 233 insertions(+), 74 deletions(-) create mode 100644 libretroshare/src/util/cxx23retrocompat.h diff --git a/libretroshare/src/gxs/rsgxsnetservice.cc b/libretroshare/src/gxs/rsgxsnetservice.cc index e664b580c..adea14205 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.cc +++ b/libretroshare/src/gxs/rsgxsnetservice.cc @@ -35,7 +35,7 @@ // | // +----------- sharePublishKeys() // | -// +----------- syncWithPeers() +// +----------- pullFromPeers() // | | // | +--if AutoSync--- send global UpdateTS of each peer to itself => the peer knows the last // | | time current peer has received an updated from himself @@ -127,14 +127,14 @@ // (Set at server side to be mGrpServerUpdateItem->grpUpdateTS) // // Only updated in processCompletedIncomingTransaction() from Grp list transaction. -// Used in syncWithPeers() sending in RsNxsSyncGrp once to all peers: peer will send data if +// Used in pullFromPeers() sending in RsNxsSyncGrp once to all peers: peer will send data if // has something new. All time comparisons are in the friends' clock time. // // mClientMsgUpdateMap: map< RsPeerId, map > // // Last msg list modification time sent by that peer Id // Updated in processCompletedIncomingTransaction() from Grp list trans. -// Used in syncWithPeers() sending in RsNxsSyncGrp once to all peers. +// Used in pullFromPeers() sending in RsNxsSyncGrp once to all peers. // Set at server to be mServerMsgUpdateMap[grpId]->msgUpdateTS // // mGrpServerUpdateItem: TimeStamp Last group local modification timestamp over all groups @@ -150,7 +150,7 @@ // // tick() tick() // | | -// +---- SyncWithPeers +-- recvNxsItemQueue() +// +---- pullFromPeers +-- recvNxsItemQueue() // | | // +---------------- Send global UpdateTS of each peer to itself => the peer knows +---------> +------ handleRecvSyncGroup( RsNxsSyncGrp*) // | the last msg sent (stored in mClientGrpUpdateMap[peer_id]), | | - parse all subscribed groups. For each, send a RsNxsSyncGrpItem with publish TS @@ -457,7 +457,7 @@ int RsGxsNetService::tick() if((elapsed) < now) { - syncWithPeers(); + pullFromPeers(); syncGrpStatistics(); checkDistantSyncState(); @@ -570,39 +570,39 @@ RsGxsGroupId RsGxsNetService::hashGrpId(const RsGxsGroupId& gid,const RsPeerId& return RsGxsGroupId( RsDirUtil::sha1sum(tmpmem,SIZE).toByteArray() ); } -void RsGxsNetService::syncWithPeers() +void RsGxsNetService::pullFromPeers(std::set peers) { #ifdef NXS_NET_DEBUG_0 - GXSNETDEBUG___ << "RsGxsNetService::syncWithPeers() this=" << (void*)this << ". serviceInfo=" << mServiceInfo << std::endl; + RS_DBG("this=", (void*)this, ". serviceInfo=", mServiceInfo); #endif - static RsNxsSerialiser ser(mServType) ; // this is used to estimate bandwidth. - - RS_STACK_MUTEX(mNxsMutex) ; - - std::set peers; - mNetMgr->getOnlineList(mServiceInfo.mServiceType, peers); - - if(mAllowDistSync && mGxsNetTunnel != NULL) + /* If specific peers are passed as paramether ask only to them */ + if(peers.empty()) { - // Grab all online virtual peers of distant tunnels for the current service. + mNetMgr->getOnlineList(mServiceInfo.mServiceType, peers); - std::list vpids ; - mGxsNetTunnel->getVirtualPeers(vpids); + if(mAllowDistSync && mGxsNetTunnel != nullptr) + { + /* Grab all online virtual peers of distant tunnels for the current + * service. */ - for(auto it(vpids.begin());it!=vpids.end();++it) - peers.insert(RsPeerId(*it)) ; + std::list vpids ; + mGxsNetTunnel->getVirtualPeers(vpids); + + for(auto it(vpids.begin());it!=vpids.end();++it) + peers.insert(RsPeerId(*it)) ; + } } - if (peers.empty()) { - // nothing to do - return; - } + // Still empty? Then nothing to do + if (peers.empty()) return; + + + RS_STACK_MUTEX(mNxsMutex); // for now just grps for(auto sit = peers.begin(); sit != peers.end(); ++sit) { - const RsPeerId peerId = *sit; ClientGrpMap::const_iterator cit = mClientGrpUpdateMap.find(peerId); @@ -624,8 +624,7 @@ void RsGxsNetService::syncWithPeers() generic_sendItem(grp); } - if(!mAllowMsgSync) - return ; + if(!mAllowMsgSync) return; #ifndef GXS_DISABLE_SYNC_MSGS @@ -746,7 +745,7 @@ void RsGxsNetService::syncWithPeers() #endif } -void RsGxsNetService::generic_sendItem(RsNxsItem *si) +void RsGxsNetService::generic_sendItem(rs_owner_ptr si) { // check if the item is to be sent to a distant peer or not @@ -1718,13 +1717,25 @@ RsItem *RsGxsNetService::generic_recvItem() void RsGxsNetService::recvNxsItemQueue() { - RsItem *item ; + RsItem* item; - while(NULL != (item=generic_recvItem())) - { + while(nullptr != (item=generic_recvItem())) + { #ifdef NXS_NET_DEBUG_1 - GXSNETDEBUG_P_(item->PeerId()) << "Received RsGxsNetService Item:" << (void*)item << " type=" << std::hex << item->PacketId() << std::dec << std::endl ; + RS_DBG( "Received RsGxsNetService Item: ", (void*)item, " type=", + item->PacketId() ); #endif + /* Handle pull request and other new items here to not mess with all the + * old nested code and items hell */ + switch(static_cast(item->PacketSubType())) + { + case RsNxsSubtype::PULL_REQUEST: + std::unique_ptr pullItem( + static_cast(item) ); + handlePullRequest(std::move(pullItem)); + continue; + } + // RsNxsItem needs dynamic_cast, since they have derived siblings. // RsNxsItem *ni = dynamic_cast(item) ; @@ -5075,6 +5086,46 @@ void RsGxsNetService::handleRecvPublishKeys(RsNxsGroupPublishKeyItem *item) } } +std::error_condition RsGxsNetService::requestPull(std::set peers) +{ + /* If specific peers are passed as paramether ask only to them */ + if(peers.empty()) + { + mNetMgr->getOnlineList(mServiceInfo.mServiceType, peers); + + if(mAllowDistSync && mGxsNetTunnel != nullptr) + { + /* Grab all online virtual peers of distant tunnels for the current + * service. */ + + std::list vpids ; + mGxsNetTunnel->getVirtualPeers(vpids); + + for(auto it(vpids.begin());it!=vpids.end();++it) + peers.insert(RsPeerId(*it)) ; + } + } + + // Still empty? Reports there are no available peers + if (peers.empty()) return std::errc::network_down; + + for(auto& peerId: std::as_const(peers)) + { + auto item = new RsNxsPullRequestItem( + static_cast(mServType) ); + item->PeerId(peerId); + generic_sendItem(item); + } + + return std::error_condition(); +} + +void RsGxsNetService::handlePullRequest( + std::unique_ptr item ) +{ + pullFromPeers(std::set{item->PeerId()}); +} + bool RsGxsNetService::getGroupServerUpdateTS(const RsGxsGroupId& gid,rstime_t& group_server_update_TS, rstime_t& msg_server_update_TS) { RS_STACK_MUTEX(mNxsMutex) ; diff --git a/libretroshare/src/gxs/rsgxsnetservice.h b/libretroshare/src/gxs/rsgxsnetservice.h index a4b448003..406531852 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.h +++ b/libretroshare/src/gxs/rsgxsnetservice.h @@ -423,6 +423,8 @@ private: */ void handleRecvPublishKeys(RsNxsGroupPublishKeyItem*) ; + void handlePullRequest(std::unique_ptr item); + /** E: item handlers **/ @@ -459,7 +461,23 @@ private: void locked_pushMsgRespFromList(std::list& itemL, const RsPeerId& sslId, const RsGxsGroupId &grp_id, const uint32_t& transN); void checkDistantSyncState(); - void syncWithPeers(); + + /** + * @brief Pull new stuff from peers + * @param peers peers to pull from, if empty all available peers are pulled + */ + void pullFromPeers(std::set peers = std::set()); + + /** + * @brief request online peers to pull updates from our node ASAP + * @param peers peers to which request pull from, if empty all available + * peers are requested to pull + * @return success or error details + * TODO: should this be exposed via RsNetworkExchangeService? + */ + std::error_condition requestPull( + std::set peers = std::set() ); + void syncGrpStatistics(); void addGroupItemToList(NxsTransaction*& tr, const RsGxsGroupId& grpId, uint32_t& transN, @@ -559,7 +577,7 @@ private: void cleanRejectedMessages(); void processObserverNotifications(); - void generic_sendItem(RsNxsItem *si); + void generic_sendItem(rs_owner_ptr si); RsItem *generic_recvItem(); private: diff --git a/libretroshare/src/libretroshare.pro b/libretroshare/src/libretroshare.pro index d42f68328..9358d8147 100644 --- a/libretroshare/src/libretroshare.pro +++ b/libretroshare/src/libretroshare.pro @@ -507,7 +507,8 @@ HEADERS += util/folderiterator.h \ util/cxx11retrocompat.h \ util/cxx14retrocompat.h \ util/cxx17retrocompat.h \ - util/rsurl.h + util/cxx23retrocompat.h \ + util/rsurl.h SOURCES += ft/ftchunkmap.cc \ ft/ftcontroller.cc \ diff --git a/libretroshare/src/rsitems/itempriorities.h b/libretroshare/src/rsitems/itempriorities.h index a376a9d64..7c2b92cb6 100644 --- a/libretroshare/src/rsitems/itempriorities.h +++ b/libretroshare/src/rsitems/itempriorities.h @@ -3,7 +3,9 @@ * * * libretroshare: retroshare core library * * * - * Copyright 2011-2011 by Cyril Soler * + * Copyright (C) 2011-2018 Cyril Soler * + * Copyright (C) 2021 Gioacchino Mazzurco * + * Copyright (C) 2021 Asociación Civil Altermundi * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU Lesser General Public License as * @@ -21,7 +23,9 @@ *******************************************************************************/ #pragma once -#include +#include + +using RsItemPriority = uint8_t; // This file centralises QoS priorities for all transfer RsItems // diff --git a/libretroshare/src/rsitems/rsitem.h b/libretroshare/src/rsitems/rsitem.h index 3ef44f859..70844f840 100644 --- a/libretroshare/src/rsitems/rsitem.h +++ b/libretroshare/src/rsitems/rsitem.h @@ -3,7 +3,8 @@ * * * libretroshare: retroshare core library * * * - * Copyright (C) 2018 Gioacchino Mazzurco * + * Copyright (C) 2018-2021 Gioacchino Mazzurco * + * Copyright (C) 2021 Asociación Civil Altermundi * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU Lesser General Public License as * @@ -28,6 +29,9 @@ #include "serialiser/rsserializer.h" #include "serialiser/rsserializable.h" #include "util/stacktrace.h" +#include "rsitems/itempriorities.h" +#include "rsitems/rsserviceids.h" + #include @@ -42,8 +46,13 @@ struct RsItem : RsMemoryManagement::SmallObject, RsSerializable virtual ~RsItem(); - /// TODO: Do this make sense with the new serialization system? - virtual void clear() = 0; + /** TODO: Does the existence of this method make sense with the new + * serialization system? **/ + virtual void clear() + { + RS_ERR("Called without being overridden, report to developers"); + print_stacktrace(); + } /// @deprecated use << ostream operator instead RS_DEPRECATED_FOR("<< ostream operator") @@ -70,14 +79,21 @@ struct RsItem : RsMemoryManagement::SmallObject, RsSerializable uint8_t PacketType(); uint8_t PacketSubType() const; + /** For Service Packets, @deprecated use the costructor with priority + * paramether instead */ + RS_DEPRECATED RsItem(uint8_t ver, uint16_t service, uint8_t subtype); + /// For Service Packets - RsItem(uint8_t ver, uint16_t service, uint8_t subtype); + RsItem( uint8_t ver, RsServiceType service, uint8_t subtype, + RsItemPriority prio ); + uint16_t PacketService() const; /* combined Packet class/type (mid 16bits) */ void setPacketService(uint16_t service); inline uint8_t priority_level() const { return _priority_level ;} inline void setPriorityLevel(uint8_t l) { _priority_level = l ;} +#ifdef RS_DEAD_CODE /* * TODO: This default implementation should be removed and childs structs * implement ::serial_process(...) as soon as all the codebase is ported to @@ -90,11 +106,12 @@ struct RsItem : RsMemoryManagement::SmallObject, RsSerializable "overriding Class is: ", typeid(*this).name() ); print_stacktrace(); } +#endif //def RS_DEAD_CODE protected: uint32_t type; RsPeerId peerId; - uint8_t _priority_level; + RsItemPriority _priority_level; }; /// TODO: Do this make sense with the new serialization system? @@ -108,9 +125,17 @@ public: uint32_t getRawLength() { return len; } void * getRawData() { return data; } - virtual void clear() {} +// virtual void clear() override {} virtual std::ostream &print(std::ostream &out, uint16_t indent = 0); + virtual void serial_process(RsGenericSerializer::SerializeJob, + RsGenericSerializer::SerializeContext&) override + { + RS_ERR( "called by an item using new serialization system ", + typeid(*this).name() ); + print_stacktrace(); + } + private: void *data; uint32_t len; diff --git a/libretroshare/src/rsitems/rsnxsitems.h b/libretroshare/src/rsitems/rsnxsitems.h index 2b6731bfc..8be63f74c 100644 --- a/libretroshare/src/rsitems/rsnxsitems.h +++ b/libretroshare/src/rsitems/rsnxsitems.h @@ -3,7 +3,10 @@ * * * libretroshare: retroshare core library * * * - * Copyright 2012 Christopher Evi-Parker,Robert Fernie* + * Copyright (C) 2012 Christopher Evi-Parker * + * Copyright (C) 2012 Robert Fernie * + * Copyright (C) 2021 Gioacchino Mazzurco * + * Copyright (C) 2021 Asociación Civil Altermundi * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU Lesser General Public License as * @@ -19,8 +22,7 @@ * along with this program. If not, see . * * * *******************************************************************************/ -#ifndef RSNXSITEMS_H -#define RSNXSITEMS_H +#pragma once #include #include @@ -35,6 +37,7 @@ // These items have "flag type" numbers, but this is not used. +// TODO: refactor as C++11 enum class const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_GRP_REQ_ITEM = 0x01; const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_GRP_ITEM = 0x02; const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_GRP_STATS_ITEM = 0x03; @@ -47,14 +50,19 @@ const uint8_t RS_PKT_SUBTYPE_NXS_MSG_ITEM = 0x20; const uint8_t RS_PKT_SUBTYPE_NXS_TRANSAC_ITEM = 0x40; const uint8_t RS_PKT_SUBTYPE_NXS_GRP_PUBLISH_KEY_ITEM = 0x80; -// possibility create second service to deal with this functionality +enum class RsNxsSubtype : uint8_t +{ + PULL_REQUEST = 0x90 /// @see RsNxsPullRequestItem +}; +#ifdef RS_DEAD_CODE +// possibility create second service to deal with this functionality const uint8_t RS_PKT_SUBTYPE_EXT_SEARCH_GRP = 0x0001; const uint8_t RS_PKT_SUBTYPE_EXT_SEARCH_MSG = 0x0002; const uint8_t RS_PKT_SUBTYPE_EXT_DELETE_GRP = 0x0004; const uint8_t RS_PKT_SUBTYPE_EXT_DELETE_MSG = 0x0008; const uint8_t RS_PKT_SUBTYPE_EXT_SEARCH_REQ = 0x0010; - +#endif // def RS_DEAD_CODE /*! * Base class for Network exchange service @@ -65,17 +73,14 @@ const uint8_t RS_PKT_SUBTYPE_EXT_SEARCH_REQ = 0x0010; */ class RsNxsItem : public RsItem { - public: - RsNxsItem(uint16_t servtype, uint8_t subtype) : RsItem(RS_PKT_VERSION_SERVICE, servtype, subtype), transactionNumber(0) - { - setPriorityLevel(QOS_PRIORITY_RS_GXS_NET); - return; - } - virtual ~RsNxsItem(){} - virtual void clear() = 0; + RsNxsItem(uint16_t servtype, uint8_t subtype): + RsItem(RS_PKT_VERSION_SERVICE, servtype, subtype), transactionNumber(0) + { setPriorityLevel(QOS_PRIORITY_RS_GXS_NET); } - uint32_t transactionNumber; // set to zero if this is not a transaction item + virtual ~RsNxsItem() = default; + + uint32_t transactionNumber; // set to zero if this is not a transaction item }; @@ -362,6 +367,22 @@ public: }; +/*! + * Used to request to a peer pull updates from us ASAP without waiting GXS sync + * timer */ +struct RsNxsPullRequestItem: RsItem +{ + explicit RsNxsPullRequestItem(RsServiceType servtype): + RsItem( RS_PKT_VERSION_SERVICE, + servtype, + static_cast(RsNxsSubtype::PULL_REQUEST), + QOS_PRIORITY_RS_GXS_NET ) {} + + /// @see RsSerializable + void serial_process( RsGenericSerializer::SerializeJob, + RsGenericSerializer::SerializeContext& ) override {} +}; + /*! * Used to respond to a RsGrpMsgsReq @@ -401,6 +422,7 @@ struct RsNxsMsg : RsNxsItem RsGxsMsgMetaData* metaData; }; +#ifdef RS_DEAD_CODE /*! * Used to request a search of user data */ @@ -422,7 +444,7 @@ public: RsTlvBinaryData serviceSearchItem; // service aware of item class uint32_t expiration; // expiration date }; - +#endif //def RS_DEAD_CODE #ifdef UNUSED_CODE @@ -511,6 +533,3 @@ public: protected: const uint16_t SERVICE_TYPE; }; - - -#endif // RSNXSITEMS_H diff --git a/libretroshare/src/serialiser/rsserial.cc b/libretroshare/src/serialiser/rsserial.cc index 52123abf1..4d3cda811 100644 --- a/libretroshare/src/serialiser/rsserial.cc +++ b/libretroshare/src/serialiser/rsserial.cc @@ -3,7 +3,9 @@ * * * libretroshare: retroshare core library * * * - * Copyright 2007-2008 by Robert Fernie * + * Copyright (C) 2007-2008 Robert Fernie * + * Copyright (C) 2021 Gioacchino Mazzurco * + * Copyright (C) 2021 Asociación Civil Altermundi * * * * This program is free software: you can redistribute it and/or modify * * it under the terms of the GNU Lesser General Public License as * @@ -20,21 +22,20 @@ * * *******************************************************************************/ -#include "serialiser/rsbaseserial.h" - -#include "util/rsthreads.h" -#include "util/rsstring.h" -#include "util/rsprint.h" - -#include "rsitems/rsitem.h" -#include "rsitems/itempriorities.h" - -#include +#include #include #include #include #include +#include "serialiser/rsbaseserial.h" +#include "util/cxx23retrocompat.h" +#include "util/rsthreads.h" +#include "util/rsstring.h" +#include "util/rsprint.h" +#include "rsitems/rsitem.h" +#include "rsitems/itempriorities.h" + /*** * #define RSSERIAL_DEBUG 1 @@ -166,11 +167,17 @@ uint8_t RsItem::PacketSubType() const /* For Service Packets */ RsItem::RsItem(uint8_t ver, uint16_t service, uint8_t subtype) { - _priority_level = QOS_PRIORITY_UNKNOWN ; // This value triggers PQIInterface to complain about undefined priorities + // This value triggers PQIInterface to complain about undefined priorities + _priority_level = QOS_PRIORITY_UNKNOWN; type = (ver << 24) + (service << 8) + subtype; - return; } +RsItem::RsItem( uint8_t ver, RsServiceType service, uint8_t subtype, + RsItemPriority prio ): + type(static_cast( + (ver << 24) + (std::to_underlying(service) << 8) + subtype )), + _priority_level(prio) {} + uint16_t RsItem::PacketService() const { return (type >> 8) & 0xFFFF; diff --git a/libretroshare/src/util/cxx23retrocompat.h b/libretroshare/src/util/cxx23retrocompat.h new file mode 100644 index 000000000..cd96b7cbf --- /dev/null +++ b/libretroshare/src/util/cxx23retrocompat.h @@ -0,0 +1,34 @@ +/******************************************************************************* + * RetroShare C++23 backwards compatibility utilities * + * * + * libretroshare: retroshare core library * + * * + * Copyright (C) 2021 Gioacchino Mazzurco * + * Copyright (C) 2021 Asociación Civil Altermundi * + * * + * This program is free software: you can redistribute it and/or modify * + * it under the terms of the GNU Lesser General Public License as * + * published by the Free Software Foundation, either version 3 of the * + * License, or (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU Lesser General Public License for more details. * + * * + * You should have received a copy of the GNU Lesser General Public License * + * along with this program. If not, see . * + * * + *******************************************************************************/ +#pragma once + +#include + +#if ! defined(__cpp_lib_to_underlying) +namespace std +{ +template +constexpr underlying_type_t to_underlying(Enum e) noexcept +{ return static_cast>(e); } +} +#endif // ! defined(__cpp_lib_to_underlying) From a374f1dc6bc5a60407460f4d621a75c3c9f0340d Mon Sep 17 00:00:00 2001 From: Gioacchino Mazzurco Date: Sat, 13 Mar 2021 21:11:18 +0100 Subject: [PATCH 2/8] Implement RsNxsSerialiser::create_item for RsNxsPullRequestItem --- libretroshare/src/rsitems/rsnxsitems.cc | 6 ++++++ libretroshare/src/rsitems/rsnxsitems.h | 15 ++++++++------- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/libretroshare/src/rsitems/rsnxsitems.cc b/libretroshare/src/rsitems/rsnxsitems.cc index e8178e045..94409bc19 100644 --- a/libretroshare/src/rsitems/rsnxsitems.cc +++ b/libretroshare/src/rsitems/rsnxsitems.cc @@ -64,6 +64,12 @@ RsItem *RsNxsSerialiser::create_item(uint16_t service_id,uint8_t item_subtype) c if(service_id != SERVICE_TYPE) return NULL ; + switch(static_cast(item_subtype)) + { + case RsNxsSubtype::PULL_REQUEST: + return new RsNxsPullRequestItem(static_cast(service_id)); + } + switch(item_subtype) { case RS_PKT_SUBTYPE_NXS_SYNC_GRP_REQ_ITEM: return new RsNxsSyncGrpReqItem(SERVICE_TYPE) ; diff --git a/libretroshare/src/rsitems/rsnxsitems.h b/libretroshare/src/rsitems/rsnxsitems.h index 8be63f74c..c7b6c58e3 100644 --- a/libretroshare/src/rsitems/rsnxsitems.h +++ b/libretroshare/src/rsitems/rsnxsitems.h @@ -35,8 +35,12 @@ #include "serialiser/rstlvkeys.h" #include "gxs/rsgxsdata.h" -// These items have "flag type" numbers, but this is not used. +enum class RsNxsSubtype : uint8_t +{ + PULL_REQUEST = 0x90 /// @see RsNxsPullRequestItem +}; +// These items have "flag type" numbers, but this is not used. // TODO: refactor as C++11 enum class const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_GRP_REQ_ITEM = 0x01; const uint8_t RS_PKT_SUBTYPE_NXS_SYNC_GRP_ITEM = 0x02; @@ -50,10 +54,6 @@ const uint8_t RS_PKT_SUBTYPE_NXS_MSG_ITEM = 0x20; const uint8_t RS_PKT_SUBTYPE_NXS_TRANSAC_ITEM = 0x40; const uint8_t RS_PKT_SUBTYPE_NXS_GRP_PUBLISH_KEY_ITEM = 0x80; -enum class RsNxsSubtype : uint8_t -{ - PULL_REQUEST = 0x90 /// @see RsNxsPullRequestItem -}; #ifdef RS_DEAD_CODE // possibility create second service to deal with this functionality @@ -525,8 +525,9 @@ class RsNxsSerialiser : public RsServiceSerializer { public: - explicit RsNxsSerialiser(uint16_t servtype) : RsServiceSerializer(servtype), SERVICE_TYPE(servtype) {} - virtual ~RsNxsSerialiser() {} + explicit RsNxsSerialiser(uint16_t servtype): + RsServiceSerializer(servtype), SERVICE_TYPE(servtype) {} + virtual ~RsNxsSerialiser() = default; virtual RsItem *create_item(uint16_t service_id,uint8_t item_subtype) const ; From a7f1e94ceae97cae75db2466cb2a0618f23a1121 Mon Sep 17 00:00:00 2001 From: Gioacchino Mazzurco Date: Sat, 13 Mar 2021 22:39:59 +0100 Subject: [PATCH 3/8] Request pull from peers when GXS group is created --- libretroshare/src/gxs/rsgenexchange.cc | 17 +++++++++++++---- libretroshare/src/gxs/rsgxsnetservice.h | 24 ++++++++---------------- libretroshare/src/gxs/rsnxs.h | 16 ++++++++++++++++ 3 files changed, 37 insertions(+), 20 deletions(-) diff --git a/libretroshare/src/gxs/rsgenexchange.cc b/libretroshare/src/gxs/rsgenexchange.cc index da8973eb1..152198da2 100644 --- a/libretroshare/src/gxs/rsgenexchange.cc +++ b/libretroshare/src/gxs/rsgenexchange.cc @@ -41,6 +41,7 @@ #include "rsserver/p3face.h" #include "retroshare/rsevents.h" #include "util/radix64.h" +#include "util/cxx17retrocompat.h" #define PUB_GRP_MASK 0x000f #define RESTR_GRP_MASK 0x00f0 @@ -2723,7 +2724,8 @@ bool RsGenExchange::checkKeys(const RsTlvSecurityKeySet& keySet) void RsGenExchange::publishGrps() { - std::list groups_to_subscribe ; + bool atLeastOneGroupCreatedSuccessfully = false; + std::list groups_to_subscribe; { RS_STACK_MUTEX(mGenMtx) ; @@ -2954,6 +2956,8 @@ void RsGenExchange::publishGrps() // add to published to allow acknowledgement toNotify.insert(std::make_pair(token, GrpNote(true,ggps.mIsUpdate,grpId))); + + atLeastOneGroupCreatedSuccessfully = true; } } @@ -2972,9 +2976,14 @@ void RsGenExchange::publishGrps() // This is done off-mutex to avoid possible cross deadlocks with the net service. - if(mNetService!=NULL) - for(std::list::const_iterator it(groups_to_subscribe.begin());it!=groups_to_subscribe.end();++it) - mNetService->subscribeStatusChanged((*it),true) ; + if(mNetService != nullptr) + { + for(auto& grpId : std::as_const(groups_to_subscribe)) + mNetService->subscribeStatusChanged(grpId, true); + + if(atLeastOneGroupCreatedSuccessfully) + mNetService->requestPull(); + } } uint32_t RsGenExchange::generatePublicToken() diff --git a/libretroshare/src/gxs/rsgxsnetservice.h b/libretroshare/src/gxs/rsgxsnetservice.h index 406531852..308f9b067 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.h +++ b/libretroshare/src/gxs/rsgxsnetservice.h @@ -250,6 +250,14 @@ public: void threadTick() override; /// @see RsTickingThread + + /// @see RsNetworkExchangeService + void pullFromPeers(std::set peers = std::set()) override; + + /// @see RsNetworkExchangeService + std::error_condition requestPull( + std::set peers = std::set() ) override; + private: /*! @@ -462,22 +470,6 @@ private: void checkDistantSyncState(); - /** - * @brief Pull new stuff from peers - * @param peers peers to pull from, if empty all available peers are pulled - */ - void pullFromPeers(std::set peers = std::set()); - - /** - * @brief request online peers to pull updates from our node ASAP - * @param peers peers to which request pull from, if empty all available - * peers are requested to pull - * @return success or error details - * TODO: should this be exposed via RsNetworkExchangeService? - */ - std::error_condition requestPull( - std::set peers = std::set() ); - void syncGrpStatistics(); void addGroupItemToList(NxsTransaction*& tr, const RsGxsGroupId& grpId, uint32_t& transN, diff --git a/libretroshare/src/gxs/rsnxs.h b/libretroshare/src/gxs/rsnxs.h index 2f9bb25a6..d74254749 100644 --- a/libretroshare/src/gxs/rsnxs.h +++ b/libretroshare/src/gxs/rsnxs.h @@ -325,4 +325,20 @@ public: return RsReputationLevel::NEUTRAL; } } + + /** + * @brief Pull new stuff from peers + * @param peers peers to pull from, if empty all available peers are pulled + */ + virtual void pullFromPeers( + std::set peers = std::set() ) = 0; + + /** + * @brief request online peers to pull updates from our node ASAP + * @param peers peers to which request pull from, if empty all available + * peers are requested to pull + * @return success or error details + */ + virtual std::error_condition requestPull( + std::set peers = std::set() ) = 0; }; From b42323013eecf9ac73660ef2841f88af30d951c2 Mon Sep 17 00:00:00 2001 From: Gioacchino Mazzurco Date: Sun, 14 Mar 2021 09:28:50 +0100 Subject: [PATCH 4/8] Fix includes in rsnxs.h --- libretroshare/src/gxs/rsnxs.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libretroshare/src/gxs/rsnxs.h b/libretroshare/src/gxs/rsnxs.h index d74254749..5933d4572 100644 --- a/libretroshare/src/gxs/rsnxs.h +++ b/libretroshare/src/gxs/rsnxs.h @@ -26,7 +26,7 @@ #include #include -#include +#include #include #include From 6295e91304963a45094ebe9c8443120b86b28b1e Mon Sep 17 00:00:00 2001 From: Gioacchino Mazzurco Date: Sun, 14 Mar 2021 17:15:00 +0100 Subject: [PATCH 5/8] Request pull from peers when GXS message is created --- libretroshare/src/gxs/rsgenexchange.cc | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/libretroshare/src/gxs/rsgenexchange.cc b/libretroshare/src/gxs/rsgenexchange.cc index 152198da2..fb1b4c295 100644 --- a/libretroshare/src/gxs/rsgenexchange.cc +++ b/libretroshare/src/gxs/rsgenexchange.cc @@ -2286,8 +2286,9 @@ bool RsGenExchange::processGrpMask(const RsGxsGroupId& grpId, ContentValue &grpC void RsGenExchange::publishMsgs() { + bool atLeastOneMessageCreatedSuccessfully = false; - RS_STACK_MUTEX(mGenMtx) ; + RS_STACK_MUTEX(mGenMtx); rstime_t now = time(NULL); @@ -2464,6 +2465,8 @@ void RsGenExchange::publishMsgs() // add to published to allow acknowledgement mMsgNotify.insert(std::make_pair(mit->first, std::make_pair(grpId, msgId))); mDataAccess->updatePublicRequestStatus(mit->first, RsTokenService::COMPLETE); + + atLeastOneMessageCreatedSuccessfully = true; } else { @@ -2497,6 +2500,8 @@ void RsGenExchange::publishMsgs() mNotifications.push_back(ch); } + + if(atLeastOneMessageCreatedSuccessfully) mNetService->requestPull(); } RsGenExchange::ServiceCreate_Return RsGenExchange::service_CreateGroup(RsGxsGrpItem* /* grpItem */, From fc404bd5d87a46d93d5a816a42f66d8023a97f1b Mon Sep 17 00:00:00 2001 From: Gioacchino Mazzurco Date: Tue, 16 Mar 2021 17:51:59 +0100 Subject: [PATCH 6/8] RsGxsNetService trigger pulling on group subscribe --- libretroshare/src/gxs/rsgxsnetservice.cc | 32 +++++++++++------------ libretroshare/src/services/p3gxsforums.cc | 15 ++++++++--- 2 files changed, 28 insertions(+), 19 deletions(-) diff --git a/libretroshare/src/gxs/rsgxsnetservice.cc b/libretroshare/src/gxs/rsgxsnetservice.cc index adea14205..c8295f80b 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.cc +++ b/libretroshare/src/gxs/rsgxsnetservice.cc @@ -1021,32 +1021,32 @@ void RsGxsNetService::locked_resetClientTS(const RsGxsGroupId& grpId) it->second.msgUpdateInfos.erase(grpId) ; } -void RsGxsNetService::subscribeStatusChanged(const RsGxsGroupId& grpId,bool subscribed) +void RsGxsNetService::subscribeStatusChanged( + const RsGxsGroupId& grpId, bool subscribed ) { - RS_STACK_MUTEX(mNxsMutex) ; - - if(!subscribed) - return ; + if(!subscribed) return; // When we subscribe, we reset the time stamps, so that the entire group list // gets requested once again, for a proper update. + RS_STACK_MUTEX(mNxsMutex); + #ifdef NXS_NET_DEBUG_0 - GXSNETDEBUG__G(grpId) << "Changing subscribe status for grp " << grpId << " to " << subscribed << ": reseting all server msg time stamps for this group, and server global TS." << std::endl; - std::map::iterator it = mServerMsgUpdateMap.find(grpId) ; + RS_DBG( "Changing subscribe status for grp", grpId, " to ", subscribed, + ": reseting all server msg time stamps for this group, and " + "server global TS." ); #endif - RsGxsServerMsgUpdate& item(mServerMsgUpdateMap[grpId]) ; + RsGxsServerMsgUpdate& item(mServerMsgUpdateMap[grpId]); + item.msgUpdateTS = static_cast(time(nullptr)); - item.msgUpdateTS = time(NULL) ; + /* We also update mGrpServerUpdateItem so as to trigger a new grp list + * exchange with friends (friends will send their known ClientTS which + * will be lower than our own grpUpdateTS, triggering our sending of the + * new subscribed grp list. */ + mGrpServerUpdate.grpUpdateTS = static_cast(time(nullptr)); - // We also update mGrpServerUpdateItem so as to trigger a new grp list exchange with friends (friends will send their known ClientTS which - // will be lower than our own grpUpdateTS, triggering our sending of the new subscribed grp list. - - mGrpServerUpdate.grpUpdateTS = time(NULL) ; - - if(subscribed) - locked_resetClientTS(grpId) ; + locked_resetClientTS(grpId); } bool RsGxsNetService::fragmentMsg(RsNxsMsg& msg, MsgFragments& msgFragments) const diff --git a/libretroshare/src/services/p3gxsforums.cc b/libretroshare/src/services/p3gxsforums.cc index 6cb3ae5c6..c930046a9 100644 --- a/libretroshare/src/services/p3gxsforums.cc +++ b/libretroshare/src/services/p3gxsforums.cc @@ -900,10 +900,19 @@ bool p3GxsForums::markRead(const RsGxsGrpMsgIdPair& msgId, bool read) bool p3GxsForums::subscribeToForum(const RsGxsGroupId& groupId, bool subscribe ) { uint32_t token; - if( !RsGenExchange::subscribeToGroup(token, groupId, subscribe) || waitToken(token) != RsTokenService::COMPLETE ) return false; + if( !RsGenExchange::subscribeToGroup(token, groupId, subscribe) || + waitToken(token) != RsTokenService::COMPLETE ) return false; - RsGxsGroupId grp; - acknowledgeGrp(token,grp); + RsGxsGroupId grp; + acknowledgeGrp(token, grp); + + /* Since subscribe has been requested, the caller is most probably + * interested in getting the group messages ASAP so pull from peers without + * waiting GXS sync timer. + * Do it here as this is meaningful or not depending on the service. + * Do it only after the token has been completed otherwise the pull have no + * effect. */ + if(subscribe) RsGenExchange::netService()->pullFromPeers(); return true; } From ebbd8cf938ef4fd913bc2bc036eae2dd5ebe4bec Mon Sep 17 00:00:00 2001 From: Gioacchino Mazzurco Date: Mon, 22 Mar 2021 17:46:46 +0100 Subject: [PATCH 7/8] Forums expose API to request syncronization --- libretroshare/src/retroshare/rsgxsforums.h | 11 +++++++++++ libretroshare/src/services/p3gxsforums.cc | 6 ++++++ libretroshare/src/services/p3gxsforums.h | 2 ++ 3 files changed, 19 insertions(+) diff --git a/libretroshare/src/retroshare/rsgxsforums.h b/libretroshare/src/retroshare/rsgxsforums.h index 4967d46c5..ba13bd16e 100644 --- a/libretroshare/src/retroshare/rsgxsforums.h +++ b/libretroshare/src/retroshare/rsgxsforums.h @@ -450,6 +450,17 @@ public: const std::string& matchString, std::vector& searchResults ) = 0; + /** + * @brief Request Synchronization with available peers + * Usually syncronization already happen automatically so be carefull + * to call this method only if necessary. + * It has been thinked for use cases like mobile phone where internet + * connection is intermittent and calling this may be useful when a system + * event about connection being available or about to go offline is received + * @jsonapi{development} + * @return Success or error details + */ + virtual std::error_condition requestSynchronization() = 0; //////////////////////////////////////////////////////////////////////////// /* Following functions are deprecated and should not be considered a stable diff --git a/libretroshare/src/services/p3gxsforums.cc b/libretroshare/src/services/p3gxsforums.cc index c930046a9..81f03b6ec 100644 --- a/libretroshare/src/services/p3gxsforums.cc +++ b/libretroshare/src/services/p3gxsforums.cc @@ -1159,6 +1159,12 @@ std::error_condition p3GxsForums::setPostKeepForever( } } +std::error_condition p3GxsForums::requestSynchronization() +{ + RsGenExchange::netService()->pullFromPeers(); + return RsGenExchange::netService()->requestPull(); +} + /* so we need the same tick idea as wiki for generating dummy forums */ diff --git a/libretroshare/src/services/p3gxsforums.h b/libretroshare/src/services/p3gxsforums.h index b499e7b0d..9400de652 100644 --- a/libretroshare/src/services/p3gxsforums.h +++ b/libretroshare/src/services/p3gxsforums.h @@ -175,6 +175,8 @@ public: rs_owner_ptr& resultData, uint32_t& resultSize ) override; #endif + std::error_condition requestSynchronization() override; + /// implementation of rsGxsGorums /// bool getGroupData(const uint32_t &token, std::vector &groups) override; From cd5dad6a753406e61d869dd0385a3d6d184f01a7 Mon Sep 17 00:00:00 2001 From: Gioacchino Mazzurco Date: Fri, 12 Nov 2021 19:03:02 +0100 Subject: [PATCH 8/8] Rename GXS pullFromPeers to pullFromPeers According to what discussed with Cyril --- libretroshare/src/gxs/rsgxsnetservice.cc | 25 +++++++++++++---------- libretroshare/src/gxs/rsgxsnetservice.h | 3 ++- libretroshare/src/gxs/rsnxs.h | 6 +++--- libretroshare/src/services/p3gxsforums.cc | 9 ++++---- 4 files changed, 24 insertions(+), 19 deletions(-) diff --git a/libretroshare/src/gxs/rsgxsnetservice.cc b/libretroshare/src/gxs/rsgxsnetservice.cc index c8295f80b..c93d8e1e1 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.cc +++ b/libretroshare/src/gxs/rsgxsnetservice.cc @@ -35,7 +35,7 @@ // | // +----------- sharePublishKeys() // | -// +----------- pullFromPeers() +// +----------- checkUpdatesFromPeers() // | | // | +--if AutoSync--- send global UpdateTS of each peer to itself => the peer knows the last // | | time current peer has received an updated from himself @@ -127,14 +127,14 @@ // (Set at server side to be mGrpServerUpdateItem->grpUpdateTS) // // Only updated in processCompletedIncomingTransaction() from Grp list transaction. -// Used in pullFromPeers() sending in RsNxsSyncGrp once to all peers: peer will send data if +// Used in checkUpdatesFromPeers() sending in RsNxsSyncGrp once to all peers: peer will send data if // has something new. All time comparisons are in the friends' clock time. // // mClientMsgUpdateMap: map< RsPeerId, map > // // Last msg list modification time sent by that peer Id // Updated in processCompletedIncomingTransaction() from Grp list trans. -// Used in pullFromPeers() sending in RsNxsSyncGrp once to all peers. +// Used in checkUpdatesFromPeers() sending in RsNxsSyncGrp once to all peers. // Set at server to be mServerMsgUpdateMap[grpId]->msgUpdateTS // // mGrpServerUpdateItem: TimeStamp Last group local modification timestamp over all groups @@ -150,7 +150,7 @@ // // tick() tick() // | | -// +---- pullFromPeers +-- recvNxsItemQueue() +// +---- checkUpdatesFromPeers() +-- recvNxsItemQueue() // | | // +---------------- Send global UpdateTS of each peer to itself => the peer knows +---------> +------ handleRecvSyncGroup( RsNxsSyncGrp*) // | the last msg sent (stored in mClientGrpUpdateMap[peer_id]), | | - parse all subscribed groups. For each, send a RsNxsSyncGrpItem with publish TS @@ -457,7 +457,7 @@ int RsGxsNetService::tick() if((elapsed) < now) { - pullFromPeers(); + checkUpdatesFromPeers(); syncGrpStatistics(); checkDistantSyncState(); @@ -570,7 +570,8 @@ RsGxsGroupId RsGxsNetService::hashGrpId(const RsGxsGroupId& gid,const RsPeerId& return RsGxsGroupId( RsDirUtil::sha1sum(tmpmem,SIZE).toByteArray() ); } -void RsGxsNetService::pullFromPeers(std::set peers) +std::error_condition RsGxsNetService::checkUpdatesFromPeers( + std::set peers ) { #ifdef NXS_NET_DEBUG_0 RS_DBG("this=", (void*)this, ". serviceInfo=", mServiceInfo); @@ -594,8 +595,8 @@ void RsGxsNetService::pullFromPeers(std::set peers) } } - // Still empty? Then nothing to do - if (peers.empty()) return; + // Still empty? Reports there are no available peers + if (peers.empty()) return std::errc::network_down; RS_STACK_MUTEX(mNxsMutex); @@ -624,7 +625,7 @@ void RsGxsNetService::pullFromPeers(std::set peers) generic_sendItem(grp); } - if(!mAllowMsgSync) return; + if(!mAllowMsgSync) return std::error_condition(); #ifndef GXS_DISABLE_SYNC_MSGS @@ -742,7 +743,9 @@ void RsGxsNetService::pullFromPeers(std::set peers) } } -#endif +#endif // ndef GXS_DISABLE_SYNC_MSGS + + return std::error_condition(); } void RsGxsNetService::generic_sendItem(rs_owner_ptr si) @@ -5123,7 +5126,7 @@ std::error_condition RsGxsNetService::requestPull(std::set peers) void RsGxsNetService::handlePullRequest( std::unique_ptr item ) { - pullFromPeers(std::set{item->PeerId()}); + checkUpdatesFromPeers(std::set{item->PeerId()}); } bool RsGxsNetService::getGroupServerUpdateTS(const RsGxsGroupId& gid,rstime_t& group_server_update_TS, rstime_t& msg_server_update_TS) diff --git a/libretroshare/src/gxs/rsgxsnetservice.h b/libretroshare/src/gxs/rsgxsnetservice.h index 308f9b067..0baee2a2f 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.h +++ b/libretroshare/src/gxs/rsgxsnetservice.h @@ -252,7 +252,8 @@ public: /// @see RsNetworkExchangeService - void pullFromPeers(std::set peers = std::set()) override; + std::error_condition checkUpdatesFromPeers( + std::set peers = std::set() ) override; /// @see RsNetworkExchangeService std::error_condition requestPull( diff --git a/libretroshare/src/gxs/rsnxs.h b/libretroshare/src/gxs/rsnxs.h index 5933d4572..2197d0e7a 100644 --- a/libretroshare/src/gxs/rsnxs.h +++ b/libretroshare/src/gxs/rsnxs.h @@ -327,10 +327,10 @@ public: } /** - * @brief Pull new stuff from peers - * @param peers peers to pull from, if empty all available peers are pulled + * @brief Check if new stuff is available from peers + * @param peers peers to check, if empty all available peers are checked */ - virtual void pullFromPeers( + virtual std::error_condition checkUpdatesFromPeers( std::set peers = std::set() ) = 0; /** diff --git a/libretroshare/src/services/p3gxsforums.cc b/libretroshare/src/services/p3gxsforums.cc index 81f03b6ec..97d880880 100644 --- a/libretroshare/src/services/p3gxsforums.cc +++ b/libretroshare/src/services/p3gxsforums.cc @@ -907,12 +907,12 @@ bool p3GxsForums::subscribeToForum(const RsGxsGroupId& groupId, bool subscribe ) acknowledgeGrp(token, grp); /* Since subscribe has been requested, the caller is most probably - * interested in getting the group messages ASAP so pull from peers without - * waiting GXS sync timer. + * interested in getting the group messages ASAP so check updates from peers + * without waiting GXS sync timer. * Do it here as this is meaningful or not depending on the service. * Do it only after the token has been completed otherwise the pull have no * effect. */ - if(subscribe) RsGenExchange::netService()->pullFromPeers(); + if(subscribe) RsGenExchange::netService()->checkUpdatesFromPeers(); return true; } @@ -1161,7 +1161,8 @@ std::error_condition p3GxsForums::setPostKeepForever( std::error_condition p3GxsForums::requestSynchronization() { - RsGenExchange::netService()->pullFromPeers(); + auto errc = RsGenExchange::netService()->checkUpdatesFromPeers(); + if(errc) return errc; return RsGenExchange::netService()->requestPull(); }