fixed a few bugs in GXS dist sync tunneling

This commit is contained in:
csoler 2018-04-02 17:07:32 +02:00
parent aa59694d88
commit 8fe3eb711d
No known key found for this signature in database
GPG Key ID: 7BCA522266C0804C
5 changed files with 53 additions and 27 deletions

View File

@ -362,7 +362,8 @@ RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds,
RsNxsNetMgr *netMgr, RsNxsObserver *nxsObs, RsNxsNetMgr *netMgr, RsNxsObserver *nxsObs,
const RsServiceInfo serviceInfo, const RsServiceInfo serviceInfo,
RsGixsReputation* reputations, RsGcxs* circles, RsGixs *gixs, RsGixsReputation* reputations, RsGcxs* circles, RsGixs *gixs,
PgpAuxUtils *pgpUtils, bool grpAutoSync, bool msgAutoSync, bool distSync, uint32_t default_store_period, uint32_t default_sync_period) PgpAuxUtils *pgpUtils, RsGxsNetTunnelService *mGxsNT,
bool grpAutoSync, bool msgAutoSync, bool distSync, uint32_t default_store_period, uint32_t default_sync_period)
: p3ThreadedService(), p3Config(), mTransactionN(0), : p3ThreadedService(), p3Config(), mTransactionN(0),
mObserver(nxsObs), mDataStore(gds), mObserver(nxsObs), mDataStore(gds),
mServType(servType), mTransactionTimeOut(TRANSAC_TIMEOUT), mServType(servType), mTransactionTimeOut(TRANSAC_TIMEOUT),
@ -370,7 +371,7 @@ RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds,
mSyncTs(0), mLastKeyPublishTs(0), mSyncTs(0), mLastKeyPublishTs(0),
mLastCleanRejectedMessages(0), mSYNC_PERIOD(SYNC_PERIOD), mLastCleanRejectedMessages(0), mSYNC_PERIOD(SYNC_PERIOD),
mCircles(circles), mGixs(gixs), mCircles(circles), mGixs(gixs),
mReputations(reputations), mPgpUtils(pgpUtils), mReputations(reputations), mPgpUtils(pgpUtils),mGxsNetTunnel(mGxsNT),
mGrpAutoSync(grpAutoSync), mAllowMsgSync(msgAutoSync),mAllowDistSync(distSync), mGrpAutoSync(grpAutoSync), mAllowMsgSync(msgAutoSync),mAllowDistSync(distSync),
mServiceInfo(serviceInfo), mDefaultMsgStorePeriod(default_store_period), mServiceInfo(serviceInfo), mDefaultMsgStorePeriod(default_store_period),
mDefaultMsgSyncPeriod(default_sync_period) mDefaultMsgSyncPeriod(default_sync_period)

View File

@ -90,7 +90,7 @@ public:
RsNxsObserver *nxsObs, // used to be = NULL. RsNxsObserver *nxsObs, // used to be = NULL.
const RsServiceInfo serviceInfo, const RsServiceInfo serviceInfo,
RsGixsReputation* reputations = NULL, RsGcxs* circles = NULL, RsGixs *gixs=NULL, RsGixsReputation* reputations = NULL, RsGcxs* circles = NULL, RsGixs *gixs=NULL,
PgpAuxUtils *pgpUtils = NULL, PgpAuxUtils *pgpUtils = NULL, RsGxsNetTunnelService *mGxsNT = NULL,
bool grpAutoSync = true, bool msgAutoSync = true,bool distSync=false, bool grpAutoSync = true, bool msgAutoSync = true,bool distSync=false,
uint32_t default_store_period = RS_GXS_DEFAULT_MSG_STORE_PERIOD, uint32_t default_store_period = RS_GXS_DEFAULT_MSG_STORE_PERIOD,
uint32_t default_sync_period = RS_GXS_DEFAULT_MSG_REQ_PERIOD); uint32_t default_sync_period = RS_GXS_DEFAULT_MSG_REQ_PERIOD);
@ -543,6 +543,8 @@ private:
RsGixs *mGixs; RsGixs *mGixs;
RsGixsReputation* mReputations; RsGixsReputation* mReputations;
PgpAuxUtils *mPgpUtils; PgpAuxUtils *mPgpUtils;
RsGxsNetTunnelService *mGxsNetTunnel;
bool mGrpAutoSync; bool mGrpAutoSync;
bool mAllowMsgSync; bool mAllowMsgSync;
bool mAllowDistSync; bool mAllowDistSync;
@ -584,8 +586,6 @@ private:
uint32_t mDefaultMsgStorePeriod ; uint32_t mDefaultMsgStorePeriod ;
uint32_t mDefaultMsgSyncPeriod ; uint32_t mDefaultMsgSyncPeriod ;
RsGxsNetTunnelService *mGxsNetTunnel;
}; };
#endif // RSGXSNETSERVICE_H #endif // RSGXSNETSERVICE_H

View File

@ -24,6 +24,7 @@
*/ */
#include "util/rsdir.h" #include "util/rsdir.h"
#include "util/rstime.h"
#include "retroshare/rspeers.h" #include "retroshare/rspeers.h"
#include "serialiser/rstypeserializer.h" #include "serialiser/rstypeserializer.h"
#include "rsgxsnettunnel.h" #include "rsgxsnettunnel.h"
@ -209,6 +210,9 @@ bool RsGxsNetTunnelService::requestPeers(uint16_t service_id,const RsGxsGroupId&
RsGxsNetTunnelGroupInfo& ginfo( mGroups[group_id] ) ; RsGxsNetTunnelGroupInfo& ginfo( mGroups[group_id] ) ;
ginfo.group_policy = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_POLICY_ACTIVE; ginfo.group_policy = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_POLICY_ACTIVE;
ginfo.hash = calculateGroupHash(group_id) ;
mHandledHashes[ginfo.hash] = group_id ;
// we dont set the group policy here. It will only be set if no peers, or too few peers are available. // we dont set the group policy here. It will only be set if no peers, or too few peers are available.
#ifdef DEBUG_RSGXSNETTUNNEL #ifdef DEBUG_RSGXSNETTUNNEL
@ -227,6 +231,10 @@ bool RsGxsNetTunnelService::releasePeers(uint16_t service_id, const RsGxsGroupId
RsGxsNetTunnelGroupInfo& ginfo( mGroups[group_id] ) ; RsGxsNetTunnelGroupInfo& ginfo( mGroups[group_id] ) ;
ginfo.group_policy = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_POLICY_PASSIVE; ginfo.group_policy = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_POLICY_PASSIVE;
ginfo.hash = calculateGroupHash(group_id) ;
mHandledHashes[ginfo.hash] = group_id ; // yes, we do not remove, because we're supposed to answer tunnel requests from other peers.
mTurtle->stopMonitoringTunnels(ginfo.hash) ; mTurtle->stopMonitoringTunnels(ginfo.hash) ;
#ifdef DEBUG_RSGXSNETTUNNEL #ifdef DEBUG_RSGXSNETTUNNEL
@ -257,21 +265,21 @@ void RsGxsNetTunnelService::dump() const
RS_STACK_MUTEX(mGxsNetTunnelMtx); RS_STACK_MUTEX(mGxsNetTunnelMtx);
static std::string group_status_str[4] = { static std::string group_status_str[4] = {
std::string("[RS_GXS_NET_TUNNEL_GRP_STATUS_UNKNOWN ]"), std::string("[UNKNOWN ]"),
std::string("[RS_GXS_NET_TUNNEL_GRP_STATUS_IDLE ]"), std::string("[IDLE ]"),
std::string("[RS_GXS_NET_TUNNEL_GRP_STATUS_TUNNELS_REQUESTED]"), std::string("[TUNNELS_REQUESTED]"),
std::string("[RS_GXS_NET_TUNNEL_GRP_STATUS_VPIDS_AVAILABLE ]") std::string("[VPIDS_AVAILABLE ]")
}; };
static std::string group_policy_str[3] = { static std::string group_policy_str[3] = {
std::string("[RS_GXS_NET_TUNNEL_POLICY_UNKNOWN]"), std::string("[UNKNOWN]"),
std::string("[RS_GXS_NET_TUNNEL_POLICY_PASSIVE]"), std::string("[PASSIVE]"),
std::string("[RS_GXS_NET_TUNNEL_POLICY_ACTIVE ]"), std::string("[ACTIVE ]"),
}; };
static std::string vpid_status_str[3] = { static std::string vpid_status_str[3] = {
std::string("[RS_GXS_NET_TUNNEL_VP_STATUS_UNKNOWN ]"), std::string("[UNKNOWN ]"),
std::string("[RS_GXS_NET_TUNNEL_VP_STATUS_TUNNEL_OK ]"), std::string("[TUNNEL_OK]"),
std::string("[RS_GXS_NET_TUNNEL_VP_STATUS_ACTIVE ]") std::string("[ACTIVE ]")
}; };
std::cerr << "GxsNetTunnelService dump: " << std::endl; std::cerr << "GxsNetTunnelService dump: " << std::endl;
@ -279,7 +287,7 @@ void RsGxsNetTunnelService::dump() const
for(auto it(mGroups.begin());it!=mGroups.end();++it) for(auto it(mGroups.begin());it!=mGroups.end();++it)
{ {
std::cerr << " " << it->first << " hash: " << it->second.hash << " policy: " << group_policy_str[it->second.group_policy] << " status: " << group_status_str[it->second.group_status] << "] Last contact: " << time(NULL) - it->second.last_contact << " secs ago" << std::endl; std::cerr << " " << it->first << " hash: " << it->second.hash << " policy: " << group_policy_str[it->second.group_policy] << " status: " << group_status_str[it->second.group_status] << " Last contact: " << time(NULL) - it->second.last_contact << " secs ago" << std::endl;
std::cerr << " virtual peers:" << std::endl; std::cerr << " virtual peers:" << std::endl;
for(auto it2(it->second.virtual_peers.begin());it2!=it->second.virtual_peers.end();++it2) for(auto it2(it->second.virtual_peers.begin());it2!=it->second.virtual_peers.end();++it2)
std::cerr << " " << *it2 << std::endl; std::cerr << " " << *it2 << std::endl;
@ -291,7 +299,7 @@ void RsGxsNetTunnelService::dump() const
std::cerr << " GXS Peer:" << it->first << " Turtle:" << it->second.turtle_virtual_peer_id std::cerr << " GXS Peer:" << it->first << " Turtle:" << it->second.turtle_virtual_peer_id
<< " status: " << vpid_status_str[it->second.vpid_status] << " s: " << " status: " << vpid_status_str[it->second.vpid_status] << " s: "
<< (int)it->second.side << " last seen " << time(NULL)-it->second.last_contact << (int)it->second.side << " last seen " << time(NULL)-it->second.last_contact
<< " ekey: " << RsUtil::BinToHex(it->second.encryption_master_key,RS_GXS_TUNNEL_CONST_EKEY_SIZE) ; << " ekey: " << RsUtil::BinToHex(it->second.encryption_master_key,RS_GXS_TUNNEL_CONST_EKEY_SIZE) << std::endl;
for(auto it2(it->second.providing_set.begin());it2!=it->second.providing_set.end();++it2) for(auto it2(it->second.providing_set.begin());it2!=it->second.providing_set.end();++it2)
std::cerr << " service " << std::hex << it2->first << std::dec << " " << it2->second.provided_groups.size() << " groups, " << it2->second.incoming_data.size() << " data" << std::endl; std::cerr << " service " << std::hex << it2->first << std::dec << " " << it2->second.provided_groups.size() << " groups, " << it2->second.incoming_data.size() << " data" << std::endl;
@ -474,7 +482,7 @@ void RsGxsNetTunnelService::addVirtualPeer(const TurtleFileHash& hash, const Tur
RsTurtleGenericDataItem *encrypted_turtle_item = NULL ; RsTurtleGenericDataItem *encrypted_turtle_item = NULL ;
if(p3turtle::encryptData(tmpmem,len,encryption_master_key,encrypted_turtle_item)) if(p3turtle::encryptData(tmpmem,len,encryption_master_key,encrypted_turtle_item))
mTurtle->sendTurtleData(vpid,encrypted_turtle_item) ; mPendingTurtleItems.push_back(std::make_pair(vpid,encrypted_turtle_item)) ; // we cannot send directly because of turtle mutex locked before calling addVirtualPeer.
else else
GXS_NET_TUNNEL_ERROR() << "cannot encrypt. Something's wrong. Data is dropped." << std::endl; GXS_NET_TUNNEL_ERROR() << "cannot encrypt. Something's wrong. Data is dropped." << std::endl;
} }
@ -536,7 +544,15 @@ void RsGxsNetTunnelService::generateEncryptionKey(const RsGxsGroupId& group_id,c
void RsGxsNetTunnelService::data_tick() void RsGxsNetTunnelService::data_tick()
{ {
GXS_NET_TUNNEL_DEBUG() << std::endl; while(!mPendingTurtleItems.empty())
{
auto& it(mPendingTurtleItems.front());
mTurtle->sendTurtleData(it.first,it.second) ;
mPendingTurtleItems.pop_front();
}
rstime::rs_usleep(1*1000*1000); // 1 sec
time_t now = time(NULL); time_t now = time(NULL);
@ -544,7 +560,7 @@ void RsGxsNetTunnelService::data_tick()
static time_t last_autowash = time(NULL); static time_t last_autowash = time(NULL);
if(last_autowash + 5 > now) if(last_autowash + 5 < now)
{ {
autowash(); autowash();
last_autowash = now; last_autowash = now;
@ -552,7 +568,7 @@ void RsGxsNetTunnelService::data_tick()
static time_t last_dump = time(NULL); static time_t last_dump = time(NULL);
if(last_dump + 10 > now) if(last_dump + 10 < now)
{ {
last_dump = now; last_dump = now;
dump(); dump();
@ -577,7 +593,10 @@ void RsGxsNetTunnelService::autowash()
} }
if(ginfo.group_policy == RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_POLICY_PASSIVE) if(ginfo.group_policy == RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_POLICY_PASSIVE)
{
mTurtle->stopMonitoringTunnels(ginfo.hash); mTurtle->stopMonitoringTunnels(ginfo.hash);
ginfo.group_status = RsGxsNetTunnelGroupInfo::RS_GXS_NET_TUNNEL_GRP_STATUS_IDLE;
}
} }
} }

View File

@ -221,6 +221,7 @@ protected:
private: private:
void autowash() ; void autowash() ;
void handleIncoming(RsGxsNetTunnelItem *item) ; void handleIncoming(RsGxsNetTunnelItem *item) ;
void flush_pending_items();
static const uint32_t RS_GXS_TUNNEL_CONST_RANDOM_BIAS_SIZE = 20 ; static const uint32_t RS_GXS_TUNNEL_CONST_RANDOM_BIAS_SIZE = 20 ;
static const uint32_t RS_GXS_TUNNEL_CONST_EKEY_SIZE = 32 ; static const uint32_t RS_GXS_TUNNEL_CONST_EKEY_SIZE = 32 ;
@ -231,6 +232,8 @@ private:
std::map<RsFileHash, RsGxsGroupId> mHandledHashes ; // hashes asked to turtle. Used to answer tunnel requests std::map<RsFileHash, RsGxsGroupId> mHandledHashes ; // hashes asked to turtle. Used to answer tunnel requests
std::map<TurtleVirtualPeerId, RsGxsNetTunnelVirtualPeerId> mTurtle2GxsPeer ; // convertion table to find GXS peer id from turtle std::map<TurtleVirtualPeerId, RsGxsNetTunnelVirtualPeerId> mTurtle2GxsPeer ; // convertion table to find GXS peer id from turtle
std::list<std::pair<TurtleVirtualPeerId,RsTurtleGenericDataItem*> > mPendingTurtleItems ; // items that need to be sent off-turtle Mutex.
/*! /*!
* \brief Generates the hash to request tunnels for this group. This hash is only used by turtle, and is used to * \brief Generates the hash to request tunnels for this group. This hash is only used by turtle, and is used to
* hide the real group id. * hide the real group id.

View File

@ -1300,7 +1300,7 @@ int RsServer::StartupRetroShare()
RS_SERVICE_GXS_TYPE_GXSID, gxsid_ds, nxsMgr, RS_SERVICE_GXS_TYPE_GXSID, gxsid_ds, nxsMgr,
mGxsIdService, mGxsIdService->getServiceInfo(), mGxsIdService, mGxsIdService->getServiceInfo(),
mReputations, mGxsCircles,mGxsIdService, mReputations, mGxsCircles,mGxsIdService,
pgpAuxUtils, pgpAuxUtils,NULL,
false,false); // don't synchronise group automatic (need explicit group request) false,false); // don't synchronise group automatic (need explicit group request)
// don't sync messages at all. // don't sync messages at all.
@ -1319,9 +1319,7 @@ int RsServer::StartupRetroShare()
RS_SERVICE_GXS_TYPE_GXSCIRCLE, gxscircles_ds, nxsMgr, RS_SERVICE_GXS_TYPE_GXSCIRCLE, gxscircles_ds, nxsMgr,
mGxsCircles, mGxsCircles->getServiceInfo(), mGxsCircles, mGxsCircles->getServiceInfo(),
mReputations, mGxsCircles,mGxsIdService, mReputations, mGxsCircles,mGxsIdService,
pgpAuxUtils, pgpAuxUtils);
true, // synchronise group automatic
true); // sync messages automatic, since they contain subscription requests.
mGxsCircles->setNetworkExchangeService(gxscircles_ns) ; mGxsCircles->setNetworkExchangeService(gxscircles_ns) ;
@ -1379,6 +1377,8 @@ int RsServer::StartupRetroShare()
/**** Channel GXS service ****/ /**** Channel GXS service ****/
RsGxsNetTunnelService *mGxsNetTunnel = new RsGxsNetTunnelService ;
RsGeneralDataService* gxschannels_ds = new RsDataService(currGxsDir + "/", "gxschannels_db", RsGeneralDataService* gxschannels_ds = new RsDataService(currGxsDir + "/", "gxschannels_db",
RS_SERVICE_GXS_TYPE_CHANNELS, NULL, rsInitConfig->gxs_passwd); RS_SERVICE_GXS_TYPE_CHANNELS, NULL, rsInitConfig->gxs_passwd);
@ -1389,7 +1389,7 @@ int RsServer::StartupRetroShare()
RS_SERVICE_GXS_TYPE_CHANNELS, gxschannels_ds, nxsMgr, RS_SERVICE_GXS_TYPE_CHANNELS, gxschannels_ds, nxsMgr,
mGxsChannels, mGxsChannels->getServiceInfo(), mGxsChannels, mGxsChannels->getServiceInfo(),
mReputations, mGxsCircles,mGxsIdService, mReputations, mGxsCircles,mGxsIdService,
pgpAuxUtils,true,true,true); pgpAuxUtils,mGxsNetTunnel,true,true,true);
mGxsChannels->setNetworkExchangeService(gxschannels_ns) ; mGxsChannels->setNetworkExchangeService(gxschannels_ns) ;
@ -1444,7 +1444,7 @@ int RsServer::StartupRetroShare()
RsGxsNetService* gxstrans_ns = new RsGxsNetService( RsGxsNetService* gxstrans_ns = new RsGxsNetService(
RS_SERVICE_TYPE_GXS_TRANS, gxstrans_ds, nxsMgr, mGxsTrans, RS_SERVICE_TYPE_GXS_TRANS, gxstrans_ds, nxsMgr, mGxsTrans,
mGxsTrans->getServiceInfo(), mReputations, mGxsCircles, mGxsTrans->getServiceInfo(), mReputations, mGxsCircles,
mGxsIdService, pgpAuxUtils,true,true,p3GxsTrans::GXS_STORAGE_PERIOD,p3GxsTrans::GXS_SYNC_PERIOD); mGxsIdService, pgpAuxUtils,NULL,true,true,false,p3GxsTrans::GXS_STORAGE_PERIOD,p3GxsTrans::GXS_SYNC_PERIOD);
mGxsTrans->setNetworkExchangeService(gxstrans_ns); mGxsTrans->setNetworkExchangeService(gxstrans_ns);
pqih->addService(gxstrans_ns, true); pqih->addService(gxstrans_ns, true);
@ -1486,6 +1486,7 @@ int RsServer::StartupRetroShare()
// connect components to turtle router. // connect components to turtle router.
mGxsNetTunnel->connectToTurtleRouter(tr) ;
ftserver->connectToTurtleRouter(tr) ; ftserver->connectToTurtleRouter(tr) ;
ftserver->connectToFileDatabase(fdb) ; ftserver->connectToFileDatabase(fdb) ;
chatSrv->connectToGxsTunnelService(mGxsTunnels) ; chatSrv->connectToGxsTunnelService(mGxsTunnels) ;
@ -1824,6 +1825,8 @@ int RsServer::StartupRetroShare()
//rsWire = mWire; //rsWire = mWire;
/*** start up GXS core runner ***/ /*** start up GXS core runner ***/
startServiceThread(mGxsNetTunnel, "gxs net tunnel");
startServiceThread(mGxsIdService, "gxs id"); startServiceThread(mGxsIdService, "gxs id");
startServiceThread(mGxsCircles, "gxs circle"); startServiceThread(mGxsCircles, "gxs circle");
startServiceThread(mPosted, "gxs posted"); startServiceThread(mPosted, "gxs posted");