diff --git a/libretroshare/src/gxs/rsgxsnetservice.cc b/libretroshare/src/gxs/rsgxsnetservice.cc index 2a17b54d9..e86d325de 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.cc +++ b/libretroshare/src/gxs/rsgxsnetservice.cc @@ -362,7 +362,8 @@ RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds, RsNxsNetMgr *netMgr, RsNxsObserver *nxsObs, const RsServiceInfo serviceInfo, RsGixsReputation* reputations, RsGcxs* circles, RsGixs *gixs, - PgpAuxUtils *pgpUtils, bool grpAutoSync, bool msgAutoSync, bool distSync, uint32_t default_store_period, uint32_t default_sync_period) + PgpAuxUtils *pgpUtils, RsGxsNetTunnelService *mGxsNT, + bool grpAutoSync, bool msgAutoSync, bool distSync, uint32_t default_store_period, uint32_t default_sync_period) : p3ThreadedService(), p3Config(), mTransactionN(0), mObserver(nxsObs), mDataStore(gds), mServType(servType), mTransactionTimeOut(TRANSAC_TIMEOUT), @@ -370,7 +371,7 @@ RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds, mSyncTs(0), mLastKeyPublishTs(0), mLastCleanRejectedMessages(0), mSYNC_PERIOD(SYNC_PERIOD), mCircles(circles), mGixs(gixs), - mReputations(reputations), mPgpUtils(pgpUtils), + mReputations(reputations), mPgpUtils(pgpUtils),mGxsNetTunnel(mGxsNT), mGrpAutoSync(grpAutoSync), mAllowMsgSync(msgAutoSync),mAllowDistSync(distSync), mServiceInfo(serviceInfo), mDefaultMsgStorePeriod(default_store_period), mDefaultMsgSyncPeriod(default_sync_period) diff --git a/libretroshare/src/gxs/rsgxsnetservice.h b/libretroshare/src/gxs/rsgxsnetservice.h index 8142f2e74..e798f8717 100644 --- a/libretroshare/src/gxs/rsgxsnetservice.h +++ b/libretroshare/src/gxs/rsgxsnetservice.h @@ -90,7 +90,7 @@ public: RsNxsObserver *nxsObs, // used to be = NULL. const RsServiceInfo serviceInfo, RsGixsReputation* reputations = NULL, RsGcxs* circles = NULL, RsGixs *gixs=NULL, - PgpAuxUtils *pgpUtils = NULL, + PgpAuxUtils *pgpUtils = NULL, RsGxsNetTunnelService *mGxsNT = NULL, bool grpAutoSync = true, bool msgAutoSync = true,bool distSync=false, uint32_t default_store_period = RS_GXS_DEFAULT_MSG_STORE_PERIOD, uint32_t default_sync_period = RS_GXS_DEFAULT_MSG_REQ_PERIOD); @@ -543,6 +543,8 @@ private: RsGixs *mGixs; RsGixsReputation* mReputations; PgpAuxUtils *mPgpUtils; + RsGxsNetTunnelService *mGxsNetTunnel; + bool mGrpAutoSync; bool mAllowMsgSync; bool mAllowDistSync; @@ -584,8 +586,6 @@ private: uint32_t mDefaultMsgStorePeriod ; uint32_t mDefaultMsgSyncPeriod ; - - RsGxsNetTunnelService *mGxsNetTunnel; }; #endif // RSGXSNETSERVICE_H diff --git a/libretroshare/src/gxs/rsgxsnettunnel.cc b/libretroshare/src/gxs/rsgxsnettunnel.cc index c502f2c6d..3d0f25a15 100644 --- a/libretroshare/src/gxs/rsgxsnettunnel.cc +++ b/libretroshare/src/gxs/rsgxsnettunnel.cc @@ -24,6 +24,7 @@ */ #include "util/rsdir.h" +#include "util/rstime.h" #include "retroshare/rspeers.h" #include "serialiser/rstypeserializer.h" #include "rsgxsnettunnel.h" @@ -209,6 +210,9 @@ bool RsGxsNetTunnelService::requestPeers(uint16_t service_id,const RsGxsGroupId& RsGxsNetTunnelGroupInfo& ginfo( mGroups[group_id] ) ; ginfo.group_policy = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_POLICY_ACTIVE; + ginfo.hash = calculateGroupHash(group_id) ; + + mHandledHashes[ginfo.hash] = group_id ; // we dont set the group policy here. It will only be set if no peers, or too few peers are available. #ifdef DEBUG_RSGXSNETTUNNEL @@ -227,6 +231,10 @@ bool RsGxsNetTunnelService::releasePeers(uint16_t service_id, const RsGxsGroupId RsGxsNetTunnelGroupInfo& ginfo( mGroups[group_id] ) ; ginfo.group_policy = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_POLICY_PASSIVE; + ginfo.hash = calculateGroupHash(group_id) ; + + mHandledHashes[ginfo.hash] = group_id ; // yes, we do not remove, because we're supposed to answer tunnel requests from other peers. + mTurtle->stopMonitoringTunnels(ginfo.hash) ; #ifdef DEBUG_RSGXSNETTUNNEL @@ -257,21 +265,21 @@ void RsGxsNetTunnelService::dump() const RS_STACK_MUTEX(mGxsNetTunnelMtx); static std::string group_status_str[4] = { - std::string("[RS_GXS_NET_TUNNEL_GRP_STATUS_UNKNOWN ]"), - std::string("[RS_GXS_NET_TUNNEL_GRP_STATUS_IDLE ]"), - std::string("[RS_GXS_NET_TUNNEL_GRP_STATUS_TUNNELS_REQUESTED]"), - std::string("[RS_GXS_NET_TUNNEL_GRP_STATUS_VPIDS_AVAILABLE ]") + std::string("[UNKNOWN ]"), + std::string("[IDLE ]"), + std::string("[TUNNELS_REQUESTED]"), + std::string("[VPIDS_AVAILABLE ]") }; static std::string group_policy_str[3] = { - std::string("[RS_GXS_NET_TUNNEL_POLICY_UNKNOWN]"), - std::string("[RS_GXS_NET_TUNNEL_POLICY_PASSIVE]"), - std::string("[RS_GXS_NET_TUNNEL_POLICY_ACTIVE ]"), + std::string("[UNKNOWN]"), + std::string("[PASSIVE]"), + std::string("[ACTIVE ]"), }; static std::string vpid_status_str[3] = { - std::string("[RS_GXS_NET_TUNNEL_VP_STATUS_UNKNOWN ]"), - std::string("[RS_GXS_NET_TUNNEL_VP_STATUS_TUNNEL_OK ]"), - std::string("[RS_GXS_NET_TUNNEL_VP_STATUS_ACTIVE ]") + std::string("[UNKNOWN ]"), + std::string("[TUNNEL_OK]"), + std::string("[ACTIVE ]") }; std::cerr << "GxsNetTunnelService dump: " << std::endl; @@ -279,7 +287,7 @@ void RsGxsNetTunnelService::dump() const for(auto it(mGroups.begin());it!=mGroups.end();++it) { - std::cerr << " " << it->first << " hash: " << it->second.hash << " policy: " << group_policy_str[it->second.group_policy] << " status: " << group_status_str[it->second.group_status] << "] Last contact: " << time(NULL) - it->second.last_contact << " secs ago" << std::endl; + std::cerr << " " << it->first << " hash: " << it->second.hash << " policy: " << group_policy_str[it->second.group_policy] << " status: " << group_status_str[it->second.group_status] << " Last contact: " << time(NULL) - it->second.last_contact << " secs ago" << std::endl; std::cerr << " virtual peers:" << std::endl; for(auto it2(it->second.virtual_peers.begin());it2!=it->second.virtual_peers.end();++it2) std::cerr << " " << *it2 << std::endl; @@ -291,7 +299,7 @@ void RsGxsNetTunnelService::dump() const std::cerr << " GXS Peer:" << it->first << " Turtle:" << it->second.turtle_virtual_peer_id << " status: " << vpid_status_str[it->second.vpid_status] << " s: " << (int)it->second.side << " last seen " << time(NULL)-it->second.last_contact - << " ekey: " << RsUtil::BinToHex(it->second.encryption_master_key,RS_GXS_TUNNEL_CONST_EKEY_SIZE) ; + << " ekey: " << RsUtil::BinToHex(it->second.encryption_master_key,RS_GXS_TUNNEL_CONST_EKEY_SIZE) << std::endl; for(auto it2(it->second.providing_set.begin());it2!=it->second.providing_set.end();++it2) std::cerr << " service " << std::hex << it2->first << std::dec << " " << it2->second.provided_groups.size() << " groups, " << it2->second.incoming_data.size() << " data" << std::endl; @@ -474,7 +482,7 @@ void RsGxsNetTunnelService::addVirtualPeer(const TurtleFileHash& hash, const Tur RsTurtleGenericDataItem *encrypted_turtle_item = NULL ; if(p3turtle::encryptData(tmpmem,len,encryption_master_key,encrypted_turtle_item)) - mTurtle->sendTurtleData(vpid,encrypted_turtle_item) ; + mPendingTurtleItems.push_back(std::make_pair(vpid,encrypted_turtle_item)) ; // we cannot send directly because of turtle mutex locked before calling addVirtualPeer. else GXS_NET_TUNNEL_ERROR() << "cannot encrypt. Something's wrong. Data is dropped." << std::endl; } @@ -536,7 +544,15 @@ void RsGxsNetTunnelService::generateEncryptionKey(const RsGxsGroupId& group_id,c void RsGxsNetTunnelService::data_tick() { - GXS_NET_TUNNEL_DEBUG() << std::endl; + while(!mPendingTurtleItems.empty()) + { + auto& it(mPendingTurtleItems.front()); + + mTurtle->sendTurtleData(it.first,it.second) ; + mPendingTurtleItems.pop_front(); + } + + rstime::rs_usleep(1*1000*1000); // 1 sec time_t now = time(NULL); @@ -544,7 +560,7 @@ void RsGxsNetTunnelService::data_tick() static time_t last_autowash = time(NULL); - if(last_autowash + 5 > now) + if(last_autowash + 5 < now) { autowash(); last_autowash = now; @@ -552,7 +568,7 @@ void RsGxsNetTunnelService::data_tick() static time_t last_dump = time(NULL); - if(last_dump + 10 > now) + if(last_dump + 10 < now) { last_dump = now; dump(); @@ -577,7 +593,10 @@ void RsGxsNetTunnelService::autowash() } if(ginfo.group_policy == RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_POLICY_PASSIVE) + { mTurtle->stopMonitoringTunnels(ginfo.hash); + ginfo.group_status = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_STATUS_IDLE; + } } } diff --git a/libretroshare/src/gxs/rsgxsnettunnel.h b/libretroshare/src/gxs/rsgxsnettunnel.h index 451f5fb9c..3d6910d14 100644 --- a/libretroshare/src/gxs/rsgxsnettunnel.h +++ b/libretroshare/src/gxs/rsgxsnettunnel.h @@ -221,6 +221,7 @@ protected: private: void autowash() ; void handleIncoming(RsGxsNetTunnelItem *item) ; + void flush_pending_items(); static const uint32_t RS_GXS_TUNNEL_CONST_RANDOM_BIAS_SIZE = 20 ; static const uint32_t RS_GXS_TUNNEL_CONST_EKEY_SIZE = 32 ; @@ -231,6 +232,8 @@ private: std::map mHandledHashes ; // hashes asked to turtle. Used to answer tunnel requests std::map mTurtle2GxsPeer ; // convertion table to find GXS peer id from turtle + std::list > mPendingTurtleItems ; // items that need to be sent off-turtle Mutex. + /*! * \brief Generates the hash to request tunnels for this group. This hash is only used by turtle, and is used to * hide the real group id. diff --git a/libretroshare/src/rsserver/rsinit.cc b/libretroshare/src/rsserver/rsinit.cc index 17314626e..c51abf01e 100644 --- a/libretroshare/src/rsserver/rsinit.cc +++ b/libretroshare/src/rsserver/rsinit.cc @@ -1300,7 +1300,7 @@ int RsServer::StartupRetroShare() RS_SERVICE_GXS_TYPE_GXSID, gxsid_ds, nxsMgr, mGxsIdService, mGxsIdService->getServiceInfo(), mReputations, mGxsCircles,mGxsIdService, - pgpAuxUtils, + pgpAuxUtils,NULL, false,false); // don't synchronise group automatic (need explicit group request) // don't sync messages at all. @@ -1319,9 +1319,7 @@ int RsServer::StartupRetroShare() RS_SERVICE_GXS_TYPE_GXSCIRCLE, gxscircles_ds, nxsMgr, mGxsCircles, mGxsCircles->getServiceInfo(), mReputations, mGxsCircles,mGxsIdService, - pgpAuxUtils, - true, // synchronise group automatic - true); // sync messages automatic, since they contain subscription requests. + pgpAuxUtils); mGxsCircles->setNetworkExchangeService(gxscircles_ns) ; @@ -1379,6 +1377,8 @@ int RsServer::StartupRetroShare() /**** Channel GXS service ****/ + RsGxsNetTunnelService *mGxsNetTunnel = new RsGxsNetTunnelService ; + RsGeneralDataService* gxschannels_ds = new RsDataService(currGxsDir + "/", "gxschannels_db", RS_SERVICE_GXS_TYPE_CHANNELS, NULL, rsInitConfig->gxs_passwd); @@ -1389,7 +1389,7 @@ int RsServer::StartupRetroShare() RS_SERVICE_GXS_TYPE_CHANNELS, gxschannels_ds, nxsMgr, mGxsChannels, mGxsChannels->getServiceInfo(), mReputations, mGxsCircles,mGxsIdService, - pgpAuxUtils,true,true,true); + pgpAuxUtils,mGxsNetTunnel,true,true,true); mGxsChannels->setNetworkExchangeService(gxschannels_ns) ; @@ -1444,7 +1444,7 @@ int RsServer::StartupRetroShare() RsGxsNetService* gxstrans_ns = new RsGxsNetService( RS_SERVICE_TYPE_GXS_TRANS, gxstrans_ds, nxsMgr, mGxsTrans, mGxsTrans->getServiceInfo(), mReputations, mGxsCircles, - mGxsIdService, pgpAuxUtils,true,true,p3GxsTrans::GXS_STORAGE_PERIOD,p3GxsTrans::GXS_SYNC_PERIOD); + mGxsIdService, pgpAuxUtils,NULL,true,true,false,p3GxsTrans::GXS_STORAGE_PERIOD,p3GxsTrans::GXS_SYNC_PERIOD); mGxsTrans->setNetworkExchangeService(gxstrans_ns); pqih->addService(gxstrans_ns, true); @@ -1486,6 +1486,7 @@ int RsServer::StartupRetroShare() // connect components to turtle router. + mGxsNetTunnel->connectToTurtleRouter(tr) ; ftserver->connectToTurtleRouter(tr) ; ftserver->connectToFileDatabase(fdb) ; chatSrv->connectToGxsTunnelService(mGxsTunnels) ; @@ -1824,6 +1825,8 @@ int RsServer::StartupRetroShare() //rsWire = mWire; /*** start up GXS core runner ***/ + startServiceThread(mGxsNetTunnel, "gxs net tunnel"); + startServiceThread(mGxsIdService, "gxs id"); startServiceThread(mGxsCircles, "gxs circle"); startServiceThread(mPosted, "gxs posted");