reverted to single GxsTunnelService shared for all services

This commit is contained in:
csoler 2018-05-01 20:10:56 +02:00
parent ba0819f8d0
commit 4d6fed643a
No known key found for this signature in database
GPG Key ID: 7BCA522266C0804C
7 changed files with 157 additions and 94 deletions

View File

@ -361,7 +361,7 @@ RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds,
RsNxsNetMgr *netMgr, RsNxsObserver *nxsObs, RsNxsNetMgr *netMgr, RsNxsObserver *nxsObs,
const RsServiceInfo serviceInfo, const RsServiceInfo serviceInfo,
RsGixsReputation* reputations, RsGcxs* circles, RsGixs *gixs, RsGixsReputation* reputations, RsGcxs* circles, RsGixs *gixs,
PgpAuxUtils *pgpUtils, PgpAuxUtils *pgpUtils, RsGxsNetTunnelService *mGxsNT,
bool grpAutoSync, bool msgAutoSync, bool distSync, uint32_t default_store_period, uint32_t default_sync_period) bool grpAutoSync, bool msgAutoSync, bool distSync, uint32_t default_store_period, uint32_t default_sync_period)
: p3ThreadedService(), p3Config(), mTransactionN(0), : p3ThreadedService(), p3Config(), mTransactionN(0),
mObserver(nxsObs), mDataStore(gds), mObserver(nxsObs), mDataStore(gds),
@ -370,7 +370,7 @@ RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds,
mSyncTs(0), mLastKeyPublishTs(0), mSyncTs(0), mLastKeyPublishTs(0),
mLastCleanRejectedMessages(0), mSYNC_PERIOD(SYNC_PERIOD), mLastCleanRejectedMessages(0), mSYNC_PERIOD(SYNC_PERIOD),
mCircles(circles), mGixs(gixs), mCircles(circles), mGixs(gixs),
mReputations(reputations), mPgpUtils(pgpUtils), mReputations(reputations), mPgpUtils(pgpUtils), mGxsNetTunnel(mGxsNT),
mGrpAutoSync(grpAutoSync), mAllowMsgSync(msgAutoSync),mAllowDistSync(distSync), mGrpAutoSync(grpAutoSync), mAllowMsgSync(msgAutoSync),mAllowDistSync(distSync),
mServiceInfo(serviceInfo), mDefaultMsgStorePeriod(default_store_period), mServiceInfo(serviceInfo), mDefaultMsgStorePeriod(default_store_period),
mDefaultMsgSyncPeriod(default_sync_period) mDefaultMsgSyncPeriod(default_sync_period)
@ -569,12 +569,12 @@ void RsGxsNetService::syncWithPeers()
std::set<RsPeerId> peers; std::set<RsPeerId> peers;
mNetMgr->getOnlineList(mServiceInfo.mServiceType, peers); mNetMgr->getOnlineList(mServiceInfo.mServiceType, peers);
if(mAllowDistSync) if(mAllowDistSync && mGxsNetTunnel != NULL)
{ {
// Grab all online virtual peers of distant tunnels for the current service. // Grab all online virtual peers of distant tunnels for the current service.
std::list<RsGxsNetTunnelVirtualPeerId> vpids ; std::list<RsGxsNetTunnelVirtualPeerId> vpids ;
getVirtualPeers(vpids); mGxsNetTunnel->getVirtualPeers(vpids);
for(auto it(vpids.begin());it!=vpids.end();++it) for(auto it(vpids.begin());it!=vpids.end();++it)
peers.insert(RsPeerId(*it)) ; peers.insert(RsPeerId(*it)) ;
@ -742,7 +742,7 @@ void RsGxsNetService::generic_sendItem(RsNxsItem *si)
RsGxsGroupId tmp_grpId; RsGxsGroupId tmp_grpId;
if(mAllowDistSync && isDistantPeer( static_cast<RsGxsNetTunnelVirtualPeerId>(si->PeerId()),tmp_grpId)) if(mAllowDistSync && mGxsNetTunnel != NULL && mGxsNetTunnel->isDistantPeer( static_cast<RsGxsNetTunnelVirtualPeerId>(si->PeerId()),tmp_grpId))
{ {
RsNxsSerialiser ser(mServType); RsNxsSerialiser ser(mServType);
@ -758,7 +758,7 @@ void RsGxsNetService::generic_sendItem(RsNxsItem *si)
#endif #endif
ser.serialise(si,mem,&size) ; ser.serialise(si,mem,&size) ;
sendTunnelData(mem,size,static_cast<RsGxsNetTunnelVirtualPeerId>(si->PeerId())); mGxsNetTunnel->sendTunnelData(mServType,mem,size,static_cast<RsGxsNetTunnelVirtualPeerId>(si->PeerId()));
} }
else else
sendItem(si) ; sendItem(si) ;
@ -766,7 +766,7 @@ void RsGxsNetService::generic_sendItem(RsNxsItem *si)
void RsGxsNetService::checkDistantSyncState() void RsGxsNetService::checkDistantSyncState()
{ {
if(!mAllowDistSync) if(!mAllowDistSync || mGxsNetTunnel==NULL)
return ; return ;
RsGxsGrpMetaTemporaryMap grpMeta; RsGxsGrpMetaTemporaryMap grpMeta;
@ -806,14 +806,14 @@ void RsGxsNetService::checkDistantSyncState()
if(at_least_one_friend_is_supplier) if(at_least_one_friend_is_supplier)
{ {
releaseDistantPeers(grpId); mGxsNetTunnel->releaseDistantPeers(mServType,grpId);
#ifdef NXS_NET_DEBUG_8 #ifdef NXS_NET_DEBUG_8
GXSNETDEBUG___<< " Group " << grpId << ": suppliers among friends. Releasing peers." << std::endl; GXSNETDEBUG___<< " Group " << grpId << ": suppliers among friends. Releasing peers." << std::endl;
#endif #endif
} }
else else
{ {
requestDistantPeers(grpId); mGxsNetTunnel->requestDistantPeers(mServType,grpId);
#ifdef NXS_NET_DEBUG_8 #ifdef NXS_NET_DEBUG_8
GXSNETDEBUG___<< " Group " << grpId << ": no suppliers among friends. Requesting peers." << std::endl; GXSNETDEBUG___<< " Group " << grpId << ": no suppliers among friends. Requesting peers." << std::endl;
#endif #endif
@ -1522,8 +1522,8 @@ class StoreHere
{ {
public: public:
StoreHere(RsGxsNetService::ClientGrpMap& cgm, RsGxsNetService::ClientMsgMap& cmm, RsGxsNetService::ServerMsgMap& smm,RsGxsNetService::GrpConfigMap& gcm, RsGxsServerGrpUpdate& sgm,Bias20Bytes& mrb) StoreHere(RsGxsNetService::ClientGrpMap& cgm, RsGxsNetService::ClientMsgMap& cmm, RsGxsNetService::ServerMsgMap& smm,RsGxsNetService::GrpConfigMap& gcm, RsGxsServerGrpUpdate& sgm)
: mClientGrpMap(cgm), mClientMsgMap(cmm), mServerMsgMap(smm), mGrpConfigMap(gcm), mServerGrpUpdate(sgm), mRandomBias(mrb) : mClientGrpMap(cgm), mClientMsgMap(cmm), mServerMsgMap(smm), mGrpConfigMap(gcm), mServerGrpUpdate(sgm)
{} {}
template <typename ID_type,typename UpdateMap,class ItemClass> void check_store(ID_type id,UpdateMap& map,ItemClass& item) template <typename ID_type,typename UpdateMap,class ItemClass> void check_store(ID_type id,UpdateMap& map,ItemClass& item)
@ -1541,7 +1541,6 @@ public:
RsGxsServerGrpUpdateItem *gsui; RsGxsServerGrpUpdateItem *gsui;
RsGxsServerMsgUpdateItem *msui; RsGxsServerMsgUpdateItem *msui;
RsGxsGrpConfigItem *mgci; RsGxsGrpConfigItem *mgci;
RsGxsTunnelRandomBiasItem *rbsi;
if((mui = dynamic_cast<RsGxsMsgUpdateItem*>(item)) != NULL) if((mui = dynamic_cast<RsGxsMsgUpdateItem*>(item)) != NULL)
check_store(mui->peerID,mClientMsgMap,*mui); check_store(mui->peerID,mClientMsgMap,*mui);
@ -1553,8 +1552,6 @@ public:
check_store(msui->grpId,mServerMsgMap, *msui); check_store(msui->grpId,mServerMsgMap, *msui);
else if((gsui = dynamic_cast<RsGxsServerGrpUpdateItem*>(item)) != NULL) else if((gsui = dynamic_cast<RsGxsServerGrpUpdateItem*>(item)) != NULL)
mServerGrpUpdate = *gsui; mServerGrpUpdate = *gsui;
else if((rbsi = dynamic_cast<RsGxsTunnelRandomBiasItem*>(item))!=NULL)
mRandomBias = rbsi->mRandomBias;
else else
std::cerr << "Type not expected!" << std::endl; std::cerr << "Type not expected!" << std::endl;
@ -1569,7 +1566,6 @@ private:
RsGxsNetService::GrpConfigMap& mGrpConfigMap; RsGxsNetService::GrpConfigMap& mGrpConfigMap;
RsGxsServerGrpUpdate& mServerGrpUpdate; RsGxsServerGrpUpdate& mServerGrpUpdate;
Bias20Bytes& mRandomBias ;
}; };
bool RsGxsNetService::loadList(std::list<RsItem *> &load) bool RsGxsNetService::loadList(std::list<RsItem *> &load)
@ -1578,12 +1574,11 @@ bool RsGxsNetService::loadList(std::list<RsItem *> &load)
// The delete is done in StoreHere, if necessary // The delete is done in StoreHere, if necessary
std::for_each(load.begin(), load.end(), StoreHere(mClientGrpUpdateMap, mClientMsgUpdateMap, mServerMsgUpdateMap, mServerGrpConfigMap, mGrpServerUpdate,mRandomBias)); std::for_each(load.begin(), load.end(), StoreHere(mClientGrpUpdateMap, mClientMsgUpdateMap, mServerMsgUpdateMap, mServerGrpConfigMap, mGrpServerUpdate));
time_t now = time(NULL);
// We reset group statistics here. This is the best place since we know at this point which are all unsubscribed groups. // We reset group statistics here. This is the best place since we know at this point which are all unsubscribed groups.
time_t now = time(NULL);
for(GrpConfigMap::iterator it(mServerGrpConfigMap.begin());it!=mServerGrpConfigMap.end();++it) for(GrpConfigMap::iterator it(mServerGrpConfigMap.begin());it!=mServerGrpConfigMap.end();++it)
{ {
// At each reload, we reset the count of visible messages. It will be rapidely restored to its real value from friends. // At each reload, we reset the count of visible messages. It will be rapidely restored to its real value from friends.
@ -1637,6 +1632,7 @@ struct get_second : public std::unary_function<typename UpdateMap::value_type, R
typename UpdateMap::key_type ItemClass::*ID_member ; typename UpdateMap::key_type ItemClass::*ID_member ;
}; };
bool RsGxsNetService::saveList(bool& cleanup, std::list<RsItem*>& save) bool RsGxsNetService::saveList(bool& cleanup, std::list<RsItem*>& save)
{ {
RS_STACK_MUTEX(mNxsMutex) ; RS_STACK_MUTEX(mNxsMutex) ;
@ -1655,11 +1651,6 @@ bool RsGxsNetService::saveList(bool& cleanup, std::list<RsItem*>& save)
save.push_back(it); save.push_back(it);
RsGxsTunnelRandomBiasItem *it2 = new RsGxsTunnelRandomBiasItem(mServType) ;
it2->mRandomBias = mRandomBias;
save.push_back(it2) ;
cleanup = true; cleanup = true;
return true; return true;
} }
@ -1686,7 +1677,7 @@ RsItem *RsGxsNetService::generic_recvItem()
uint32_t size = 0 ; uint32_t size = 0 ;
RsGxsNetTunnelVirtualPeerId virtual_peer_id ; RsGxsNetTunnelVirtualPeerId virtual_peer_id ;
while(mAllowDistSync && receiveTunnelData(data,size,virtual_peer_id)) while(mAllowDistSync && mGxsNetTunnel!=NULL && mGxsNetTunnel->receiveTunnelData(mServType,data,size,virtual_peer_id))
{ {
RsNxsItem *item = dynamic_cast<RsNxsItem*>(RsNxsSerialiser(mServType).deserialise(data,&size)) ; RsNxsItem *item = dynamic_cast<RsNxsItem*>(RsNxsSerialiser(mServType).deserialise(data,&size)) ;
item->PeerId(virtual_peer_id) ; item->PeerId(virtual_peer_id) ;
@ -2005,11 +1996,6 @@ void RsGxsNetService::data_tick()
runVetting(); runVetting();
processExplicitGroupRequests(); processExplicitGroupRequests();
// also tick distant traffic
if(mAllowDistSync)
RsGxsNetTunnelService::data_tick();
} }
void RsGxsNetService::debugDump() void RsGxsNetService::debugDump()
@ -4117,8 +4103,9 @@ bool RsGxsNetService::canSendGrpId(const RsPeerId& sslId, const RsGxsGrpMetaData
// check if that peer is a virtual peer id, in which case we only send/recv data to/from it items for the group it's requested for // check if that peer is a virtual peer id, in which case we only send/recv data to/from it items for the group it's requested for
RsGxsGroupId peer_grp ; RsGxsGroupId peer_grp ;
if(isDistantPeer(RsGxsNetTunnelVirtualPeerId(sslId),peer_grp) && peer_grp != grpMeta.mGroupId) if(mAllowDistSync && mGxsNetTunnel != NULL && mGxsNetTunnel->isDistantPeer(RsGxsNetTunnelVirtualPeerId(sslId),peer_grp) && peer_grp != grpMeta.mGroupId)
{ {
#warning (cyril) make sure that this is not a problem for cross-service sending of items
#ifdef NXS_NET_DEBUG_4 #ifdef NXS_NET_DEBUG_4
GXSNETDEBUG_PG(sslId,grpMeta.mGroupId) << " Distant peer designed for group " << peer_grp << ": cannot request sync for different group." << std::endl; GXSNETDEBUG_PG(sslId,grpMeta.mGroupId) << " Distant peer designed for group " << peer_grp << ": cannot request sync for different group." << std::endl;
#endif #endif
@ -4181,7 +4168,7 @@ bool RsGxsNetService::checkCanRecvMsgFromPeer(const RsPeerId& sslId, const RsGxs
// check if that peer is a virtual peer id, in which case we only send/recv data to/from it items for the group it's requested for // check if that peer is a virtual peer id, in which case we only send/recv data to/from it items for the group it's requested for
RsGxsGroupId peer_grp ; RsGxsGroupId peer_grp ;
if(isDistantPeer(RsGxsNetTunnelVirtualPeerId(sslId),peer_grp) && peer_grp != grpMeta.mGroupId) if(mGxsNetTunnel->isDistantPeer(RsGxsNetTunnelVirtualPeerId(sslId),peer_grp) && peer_grp != grpMeta.mGroupId)
{ {
#ifdef NXS_NET_DEBUG_4 #ifdef NXS_NET_DEBUG_4
GXSNETDEBUG_PG(sslId,grpMeta.mGroupId) << " Distant peer designed for group " << peer_grp << ": cannot request sync for different group." << std::endl; GXSNETDEBUG_PG(sslId,grpMeta.mGroupId) << " Distant peer designed for group " << peer_grp << ": cannot request sync for different group." << std::endl;
@ -4535,7 +4522,7 @@ bool RsGxsNetService::canSendMsgIds(std::vector<RsGxsMsgMetaData*>& msgMetas, co
// check if that peer is a virtual peer id, in which case we only send/recv data to/from it items for the group it's requested for // check if that peer is a virtual peer id, in which case we only send/recv data to/from it items for the group it's requested for
RsGxsGroupId peer_grp ; RsGxsGroupId peer_grp ;
if(isDistantPeer(RsGxsNetTunnelVirtualPeerId(sslId),peer_grp) && peer_grp != grpMeta.mGroupId) if(mGxsNetTunnel->isDistantPeer(RsGxsNetTunnelVirtualPeerId(sslId),peer_grp) && peer_grp != grpMeta.mGroupId)
{ {
#ifdef NXS_NET_DEBUG_4 #ifdef NXS_NET_DEBUG_4
GXSNETDEBUG_PG(sslId,grpMeta.mGroupId) << " Distant peer designed for group " << peer_grp << ": cannot request sync for different group." << std::endl; GXSNETDEBUG_PG(sslId,grpMeta.mGroupId) << " Distant peer designed for group " << peer_grp << ": cannot request sync for different group." << std::endl;

View File

@ -72,7 +72,7 @@ class RsGroupNetworkStatsRecord
* Incoming transaction are in 3 different states * Incoming transaction are in 3 different states
* 1. START 2. RECEIVING 3. END * 1. START 2. RECEIVING 3. END
*/ */
class RsGxsNetService : public RsNetworkExchangeService, public RsGxsNetTunnelService, public p3ThreadedService, public p3Config class RsGxsNetService : public RsNetworkExchangeService, public p3ThreadedService, public p3Config
{ {
public: public:
@ -90,7 +90,7 @@ public:
RsNxsObserver *nxsObs, // used to be = NULL. RsNxsObserver *nxsObs, // used to be = NULL.
const RsServiceInfo serviceInfo, const RsServiceInfo serviceInfo,
RsGixsReputation* reputations = NULL, RsGcxs* circles = NULL, RsGixs *gixs=NULL, RsGixsReputation* reputations = NULL, RsGcxs* circles = NULL, RsGixs *gixs=NULL,
PgpAuxUtils *pgpUtils = NULL, PgpAuxUtils *pgpUtils = NULL, RsGxsNetTunnelService *mGxsNT = NULL,
bool grpAutoSync = true, bool msgAutoSync = true,bool distSync=false, bool grpAutoSync = true, bool msgAutoSync = true,bool distSync=false,
uint32_t default_store_period = RS_GXS_DEFAULT_MSG_STORE_PERIOD, uint32_t default_store_period = RS_GXS_DEFAULT_MSG_STORE_PERIOD,
uint32_t default_sync_period = RS_GXS_DEFAULT_MSG_REQ_PERIOD); uint32_t default_sync_period = RS_GXS_DEFAULT_MSG_REQ_PERIOD);
@ -548,6 +548,7 @@ private:
RsGixs *mGixs; RsGixs *mGixs;
RsGixsReputation* mReputations; RsGixsReputation* mReputations;
PgpAuxUtils *mPgpUtils; PgpAuxUtils *mPgpUtils;
RsGxsNetTunnelService *mGxsNetTunnel;
bool mGrpAutoSync; bool mGrpAutoSync;
bool mAllowMsgSync; bool mAllowMsgSync;

View File

@ -32,7 +32,7 @@
#define DEBUG_RSGXSNETTUNNEL 1 #define DEBUG_RSGXSNETTUNNEL 1
#define GXS_NET_TUNNEL_NOT_IMPLEMENTED() { std::cerr << __PRETTY_FUNCTION__ << ": not yet implemented." << std::endl; } #define GXS_NET_TUNNEL_NOT_IMPLEMENTED() { std::cerr << __PRETTY_FUNCTION__ << ": not yet implemented." << std::endl; }
#define GXS_NET_TUNNEL_DEBUG() std::cerr << time(NULL) << " : GXS_NET_TUNNEL(" << std::hex << serviceType() << std::dec << ") : " << __FUNCTION__ << " : " #define GXS_NET_TUNNEL_DEBUG() std::cerr << time(NULL) << " : GXS_NET_TUNNEL: " << __FUNCTION__ << " : "
#define GXS_NET_TUNNEL_ERROR() std::cerr << "(EE) GXS_NET_TUNNEL ERROR : " #define GXS_NET_TUNNEL_ERROR() std::cerr << "(EE) GXS_NET_TUNNEL ERROR : "
@ -54,6 +54,7 @@ const uint16_t RS_SERVICE_TYPE_GXS_NET_TUNNEL = 0x2233 ;
const uint8_t RS_PKT_SUBTYPE_GXS_NET_TUNNEL_VIRTUAL_PEER = 0x01 ; const uint8_t RS_PKT_SUBTYPE_GXS_NET_TUNNEL_VIRTUAL_PEER = 0x01 ;
const uint8_t RS_PKT_SUBTYPE_GXS_NET_TUNNEL_KEEP_ALIVE = 0x02 ; const uint8_t RS_PKT_SUBTYPE_GXS_NET_TUNNEL_KEEP_ALIVE = 0x02 ;
const uint8_t RS_PKT_SUBTYPE_GXS_NET_TUNNEL_RANDOM_BIAS = 0x03 ;
class RsGxsNetTunnelItem: public RsItem class RsGxsNetTunnelItem: public RsItem
{ {
@ -91,6 +92,21 @@ public:
virtual void serial_process(RsGenericSerializer::SerializeJob j,RsGenericSerializer::SerializeContext& ctx) {} virtual void serial_process(RsGenericSerializer::SerializeJob j,RsGenericSerializer::SerializeContext& ctx) {}
}; };
class RsGxsNetTunnelRandomBiasItem: public RsGxsNetTunnelItem
{
public:
explicit RsGxsNetTunnelRandomBiasItem() : RsGxsNetTunnelItem(RS_PKT_SUBTYPE_GXS_NET_TUNNEL_RANDOM_BIAS) { clear();}
virtual ~RsGxsNetTunnelRandomBiasItem() {}
virtual void clear() { mRandomBias.clear() ; }
virtual void serial_process(RsGenericSerializer::SerializeJob j,RsGenericSerializer::SerializeContext& ctx)
{
RsTypeSerializer::serial_process(j,ctx,mRandomBias,"random bias") ;
}
Bias20Bytes mRandomBias; // Cannot be a simple char[] because of serialization.
};
class RsGxsNetTunnelSerializer: public RsServiceSerializer class RsGxsNetTunnelSerializer: public RsServiceSerializer
{ {
public: public:
@ -108,6 +124,7 @@ public:
{ {
case RS_PKT_SUBTYPE_GXS_NET_TUNNEL_VIRTUAL_PEER: return new RsGxsNetTunnelVirtualPeerItem ; case RS_PKT_SUBTYPE_GXS_NET_TUNNEL_VIRTUAL_PEER: return new RsGxsNetTunnelVirtualPeerItem ;
case RS_PKT_SUBTYPE_GXS_NET_TUNNEL_KEEP_ALIVE : return new RsGxsNetTunnelKeepAliveItem ; case RS_PKT_SUBTYPE_GXS_NET_TUNNEL_KEEP_ALIVE : return new RsGxsNetTunnelKeepAliveItem ;
case RS_PKT_SUBTYPE_GXS_NET_TUNNEL_RANDOM_BIAS : return new RsGxsNetTunnelRandomBiasItem ;
default: default:
GXS_NET_TUNNEL_ERROR() << "type ID " << std::hex << item_subtype << std::dec << " is not handled!" << std::endl; GXS_NET_TUNNEL_ERROR() << "type ID " << std::hex << item_subtype << std::dec << " is not handled!" << std::endl;
return NULL ; return NULL ;
@ -139,7 +156,8 @@ RsGxsNetTunnelService::~RsGxsNetTunnelService()
mVirtualPeers.clear(); mVirtualPeers.clear();
for(auto it(mIncomingData.begin());it!=mIncomingData.end();++it) for(auto it(mIncomingData.begin());it!=mIncomingData.end();++it)
delete (*it).second; for(auto it2(it->second.begin());it2!=it->second.end();++it2)
delete it2->second;
mIncomingData.clear(); mIncomingData.clear();
} }
@ -159,11 +177,11 @@ bool RsGxsNetTunnelService::isDistantPeer(const RsGxsNetTunnelVirtualPeerId& vir
return false ; return false ;
} }
bool RsGxsNetTunnelService::receiveTunnelData(unsigned char *& data,uint32_t& data_len,RsGxsNetTunnelVirtualPeerId& virtual_peer) bool RsGxsNetTunnelService::receiveTunnelData(uint16_t service_id, unsigned char *& data, uint32_t& data_len, RsGxsNetTunnelVirtualPeerId& virtual_peer)
{ {
RS_STACK_MUTEX(mGxsNetTunnelMtx); RS_STACK_MUTEX(mGxsNetTunnelMtx);
std::list<std::pair<RsGxsNetTunnelVirtualPeerId,RsTlvBinaryData*> >& lst(mIncomingData); std::list<std::pair<RsGxsNetTunnelVirtualPeerId,RsTlvBinaryData*> >& lst(mIncomingData[service_id]);
if(lst.empty()) if(lst.empty())
{ {
@ -185,7 +203,7 @@ bool RsGxsNetTunnelService::receiveTunnelData(unsigned char *& data,uint32_t& da
return true; return true;
} }
bool RsGxsNetTunnelService::sendTunnelData(unsigned char *& data,uint32_t data_len,const RsGxsNetTunnelVirtualPeerId& virtual_peer) bool RsGxsNetTunnelService::sendTunnelData(uint16_t service_id,unsigned char *& data,uint32_t data_len,const RsGxsNetTunnelVirtualPeerId& virtual_peer)
{ {
RS_STACK_MUTEX(mGxsNetTunnelMtx); RS_STACK_MUTEX(mGxsNetTunnelMtx);
// The item is serialized and encrypted using chacha20+SHA256, using the generic turtle encryption, and then sent to the turtle router. // The item is serialized and encrypted using chacha20+SHA256, using the generic turtle encryption, and then sent to the turtle router.
@ -245,7 +263,7 @@ bool RsGxsNetTunnelService::getVirtualPeers(std::list<RsGxsNetTunnelVirtualPeerI
return true ; return true ;
} }
bool RsGxsNetTunnelService::requestDistantPeers(const RsGxsGroupId& group_id) bool RsGxsNetTunnelService::requestDistantPeers(uint16_t service_id, const RsGxsGroupId& group_id)
{ {
RS_STACK_MUTEX(mGxsNetTunnelMtx); RS_STACK_MUTEX(mGxsNetTunnelMtx);
@ -268,7 +286,7 @@ bool RsGxsNetTunnelService::requestDistantPeers(const RsGxsGroupId& group_id)
return true; return true;
} }
bool RsGxsNetTunnelService::releaseDistantPeers(const RsGxsGroupId& group_id) bool RsGxsNetTunnelService::releaseDistantPeers(uint16_t service_id,const RsGxsGroupId& group_id)
{ {
RS_STACK_MUTEX(mGxsNetTunnelMtx); RS_STACK_MUTEX(mGxsNetTunnelMtx);
@ -289,24 +307,37 @@ bool RsGxsNetTunnelService::releaseDistantPeers(const RsGxsGroupId& group_id)
return true; return true;
} }
RsGxsNetTunnelVirtualPeerId RsGxsNetTunnelService::locked_makeVirtualPeerId(const RsGxsGroupId& group_id) const const Bias20Bytes& RsGxsNetTunnelService::locked_randomBias()
{
if(mRandomBias.isNull())
{
#ifdef DEBUG_RSGXSNETTUNNEL
#warning /!\ this is for testing only! Remove this when done! Can not be done at initialization when rsPeer is not started.
RsPeerId ssl_id = rsPeers->getOwnId() ;
mRandomBias = Bias20Bytes(RsDirUtil::sha1sum(ssl_id.toByteArray(),ssl_id.SIZE_IN_BYTES)) ;
#else
mRandomBias = Bias20Bytes::random();
#endif
IndicateConfigChanged();
}
return mRandomBias ;
}
RsGxsNetTunnelVirtualPeerId RsGxsNetTunnelService::locked_makeVirtualPeerId(const RsGxsGroupId& group_id)
{ {
assert(RsPeerId::SIZE_IN_BYTES <= Sha1CheckSum::SIZE_IN_BYTES) ;// so that we can build the virtual PeerId from a SHA1 sum. assert(RsPeerId::SIZE_IN_BYTES <= Sha1CheckSum::SIZE_IN_BYTES) ;// so that we can build the virtual PeerId from a SHA1 sum.
// We compute sha1( SSL_id | mRandomBias ) and trunk it to 16 bytes in order to compute a RsPeerId // We compute sha1( SSL_id | mRandomBias ) and trunk it to 16 bytes in order to compute a RsPeerId
#ifdef DEBUG_RSGXSNETTUNNEL Bias20Bytes rb(locked_randomBias());
// /!\ this is for testing only! Remove this when done! Can not be done at initialization when rsPeer is not started.
RsPeerId ssl_id = rsPeers->getOwnId() ;
mRandomBias = Bias20Bytes(RsDirUtil::sha1sum(ssl_id.toByteArray(),ssl_id.SIZE_IN_BYTES)) ;
#endif
unsigned char mem[group_id.SIZE_IN_BYTES + mRandomBias.SIZE_IN_BYTES]; unsigned char mem[group_id.SIZE_IN_BYTES + rb.SIZE_IN_BYTES];
memcpy(mem ,group_id.toByteArray(),group_id.SIZE_IN_BYTES) ; memcpy(mem ,group_id.toByteArray(),group_id.SIZE_IN_BYTES) ;
memcpy(mem+group_id.SIZE_IN_BYTES,mRandomBias.toByteArray(),mRandomBias.SIZE_IN_BYTES) ; memcpy(mem+group_id.SIZE_IN_BYTES,rb.toByteArray(),rb.SIZE_IN_BYTES) ;
return RsGxsNetTunnelVirtualPeerId(RsDirUtil::sha1sum(mem,group_id.SIZE_IN_BYTES+mRandomBias.SIZE_IN_BYTES).toByteArray()); return RsGxsNetTunnelVirtualPeerId(RsDirUtil::sha1sum(mem,group_id.SIZE_IN_BYTES+rb.SIZE_IN_BYTES).toByteArray());
} }
void RsGxsNetTunnelService::dump() const void RsGxsNetTunnelService::dump() const
@ -332,7 +363,7 @@ void RsGxsNetTunnelService::dump() const
std::string("[ACTIVE ]") std::string("[ACTIVE ]")
}; };
std::cerr << "GxsNetTunnelService dump (this=" << (void*)this << ". serv=" << std::hex << serviceType() << std::dec <<") : " << std::endl; std::cerr << "GxsNetTunnelService dump (this=" << (void*)this << ": " << std::endl;
std::cerr << " Managed GXS groups: " << std::endl; std::cerr << " Managed GXS groups: " << std::endl;
for(auto it(mGroups.begin());it!=mGroups.end();++it) for(auto it(mGroups.begin());it!=mGroups.end();++it)
@ -364,7 +395,12 @@ void RsGxsNetTunnelService::dump() const
std::cerr << " Incoming data: " << std::endl; std::cerr << " Incoming data: " << std::endl;
for(auto it(mIncomingData.begin());it!=mIncomingData.end();++it) for(auto it(mIncomingData.begin());it!=mIncomingData.end();++it)
std::cerr << " peer id " << it->first << " " << (void*)it->second << std::endl; {
std::cerr << " service " << std::hex << it->first << std::dec << std::endl;
for(auto it2(it->second.begin());it2!=it->second.end();++it2)
std::cerr << " peer id " << it2->first << " " << (void*)it2->second << std::endl;
}
} }
//===========================================================================================================================================// //===========================================================================================================================================//
@ -639,6 +675,8 @@ void RsGxsNetTunnelService::data_tick()
mLastDump = now; mLastDump = now;
dump(); dump();
} }
rstime::rs_usleep(1*1000*1000) ; // 1 sec
} }
void RsGxsNetTunnelService::sendKeepAlivePackets() void RsGxsNetTunnelService::sendKeepAlivePackets()
@ -759,4 +797,51 @@ void RsGxsNetTunnelService::autowash()
} }
} }
bool RsGxsNetTunnelService::saveList(bool& cleanup, std::list<RsItem*>& save)
{
RsGxsNetTunnelRandomBiasItem *it2 = new RsGxsNetTunnelRandomBiasItem() ;
{
RS_STACK_MUTEX(mGxsNetTunnelMtx);
it2->mRandomBias = mRandomBias;
}
save.push_back(it2) ;
cleanup = true ;
return true;
}
bool RsGxsNetTunnelService::loadList(std::list<RsItem *> &load)
{
RsGxsNetTunnelRandomBiasItem *rbsi ;
for(auto it(load.begin());it!=load.end();++it)
{
if((rbsi = dynamic_cast<RsGxsNetTunnelRandomBiasItem*>(*it))!=NULL)
{
RS_STACK_MUTEX(mGxsNetTunnelMtx);
mRandomBias = rbsi->mRandomBias;
}
else
GXS_NET_TUNNEL_ERROR() << " unknown item in config file: type=" << std::hex << (*it)->PacketId() << std::dec << std::endl;
delete *it;
}
return true;
}
RsSerialiser *RsGxsNetTunnelService::setupSerialiser()
{
RS_STACK_MUTEX(mGxsNetTunnelMtx);
static RsSerialiser *ser = NULL ; // this is not so nice, but this method is only called from p3Config, so there's no really need of a data race
if(!ser)
{
ser = new RsSerialiser ;
ser->addSerialType(new RsGxsNetTunnelSerializer) ;
}
return ser ;
}

View File

@ -148,29 +148,23 @@ struct RsGxsNetTunnelGroupInfo
std::set<TurtleVirtualPeerId> virtual_peers ; // list of which virtual peers provide this group. Can me more than 1. std::set<TurtleVirtualPeerId> virtual_peers ; // list of which virtual peers provide this group. Can me more than 1.
}; };
class RsGxsNetTunnelService: public RsTurtleClientService class RsGxsNetTunnelService: public RsTurtleClientService, public RsTickingThread, public p3Config
{ {
public: public:
RsGxsNetTunnelService() ; RsGxsNetTunnelService() ;
virtual ~RsGxsNetTunnelService() ; virtual ~RsGxsNetTunnelService() ;
/*!
* \brief serviceType
* \return returns the service that is currently using this as a subclass.
*/
virtual uint16_t serviceType() const = 0 ;
/*! /*!
* \brief Manage tunnels for this group * \brief Manage tunnels for this group
* @param group_id group for which tunnels should be released * @param group_id group for which tunnels should be released
*/ */
bool requestDistantPeers(const RsGxsGroupId&group_id) ; bool requestDistantPeers(uint16_t service_id,const RsGxsGroupId&group_id) ;
/*! /*!
* \brief Stop managing tunnels for this group * \brief Stop managing tunnels for this group
* @param group_id group for which tunnels should be released * @param group_id group for which tunnels should be released
*/ */
bool releaseDistantPeers(const RsGxsGroupId&group_id) ; bool releaseDistantPeers(uint16_t service_id, const RsGxsGroupId&group_id) ;
/*! /*!
* \brief Get the list of active virtual peers for a given group. This implies that a tunnel is up and * \brief Get the list of active virtual peers for a given group. This implies that a tunnel is up and
@ -186,7 +180,7 @@ public:
* \return * \return
* true if succeeded. * true if succeeded.
*/ */
bool sendTunnelData(unsigned char *& data, uint32_t data_len, const RsGxsNetTunnelVirtualPeerId& virtual_peer) ; bool sendTunnelData(uint16_t service_id,unsigned char *& data, uint32_t data_len, const RsGxsNetTunnelVirtualPeerId& virtual_peer) ;
/*! /*!
* \brief receiveData * \brief receiveData
@ -197,7 +191,7 @@ public:
* \return * \return
* true if something is returned. If not, data is set to NULL, data_len to 0. * true if something is returned. If not, data is set to NULL, data_len to 0.
*/ */
bool receiveTunnelData(unsigned char *& data, uint32_t& data_len, RsGxsNetTunnelVirtualPeerId& virtual_peer) ; bool receiveTunnelData(uint16_t service_id, unsigned char *& data, uint32_t& data_len, RsGxsNetTunnelVirtualPeerId& virtual_peer) ;
/*! /*!
* \brief isDistantPeer * \brief isDistantPeer
@ -224,6 +218,12 @@ public:
void data_tick() ; void data_tick() ;
// Overloads p3Config
RsSerialiser *setupSerialiser();
bool saveList(bool& cleanup, std::list<RsItem*>& save);
bool loadList(std::list<RsItem *> &load);
protected: protected:
// interaction with turtle router // interaction with turtle router
@ -231,6 +231,7 @@ protected:
virtual void receiveTurtleData(RsTurtleGenericTunnelItem *item,const RsFileHash& hash,const RsPeerId& virtual_peer_id,RsTurtleGenericTunnelItem::Direction direction) ; virtual void receiveTurtleData(RsTurtleGenericTunnelItem *item,const RsFileHash& hash,const RsPeerId& virtual_peer_id,RsTurtleGenericTunnelItem::Direction direction) ;
void addVirtualPeer(const TurtleFileHash&, const TurtleVirtualPeerId&,RsTurtleGenericTunnelItem::Direction dir) ; void addVirtualPeer(const TurtleFileHash&, const TurtleVirtualPeerId&,RsTurtleGenericTunnelItem::Direction dir) ;
void removeVirtualPeer(const TurtleFileHash&, const TurtleVirtualPeerId&) ; void removeVirtualPeer(const TurtleFileHash&, const TurtleVirtualPeerId&) ;
const Bias20Bytes& locked_randomBias() ;
p3turtle *mTurtle ; p3turtle *mTurtle ;
@ -252,7 +253,7 @@ private:
std::list<std::pair<TurtleVirtualPeerId,RsTurtleGenericDataItem*> > mPendingTurtleItems ; // items that need to be sent off-turtle Mutex. std::list<std::pair<TurtleVirtualPeerId,RsTurtleGenericDataItem*> > mPendingTurtleItems ; // items that need to be sent off-turtle Mutex.
std::list<std::pair<RsGxsNetTunnelVirtualPeerId,RsTlvBinaryData *> > mIncomingData; // list of incoming data items std::map<uint16_t, std::list<std::pair<RsGxsNetTunnelVirtualPeerId,RsTlvBinaryData *> > > mIncomingData; // list of incoming data items
/*! /*!
* \brief Generates the hash to request tunnels for this group. This hash is only used by turtle, and is used to * \brief Generates the hash to request tunnels for this group. This hash is only used by turtle, and is used to
@ -266,7 +267,7 @@ private:
* tunnel ID and turtle virtual peer id. This allows RsGxsNetService to keep sync-ing the data consistently. * tunnel ID and turtle virtual peer id. This allows RsGxsNetService to keep sync-ing the data consistently.
*/ */
RsGxsNetTunnelVirtualPeerId locked_makeVirtualPeerId(const RsGxsGroupId& group_id) const ; RsGxsNetTunnelVirtualPeerId locked_makeVirtualPeerId(const RsGxsGroupId& group_id) ;
static void generateEncryptionKey(const RsGxsGroupId& group_id,const TurtleVirtualPeerId& vpid,unsigned char key[RS_GXS_TUNNEL_CONST_EKEY_SIZE]) ; static void generateEncryptionKey(const RsGxsGroupId& group_id,const TurtleVirtualPeerId& vpid,unsigned char key[RS_GXS_TUNNEL_CONST_EKEY_SIZE]) ;

View File

@ -44,7 +44,6 @@ RsItem* RsGxsUpdateSerialiser::create_item(uint16_t service,uint8_t item_subtype
case RS_PKT_SUBTYPE_GXS_SERVER_GRP_UPDATE: return new RsGxsServerGrpUpdateItem(SERVICE_TYPE); case RS_PKT_SUBTYPE_GXS_SERVER_GRP_UPDATE: return new RsGxsServerGrpUpdateItem(SERVICE_TYPE);
case RS_PKT_SUBTYPE_GXS_SERVER_MSG_UPDATE: return new RsGxsServerMsgUpdateItem(SERVICE_TYPE); case RS_PKT_SUBTYPE_GXS_SERVER_MSG_UPDATE: return new RsGxsServerMsgUpdateItem(SERVICE_TYPE);
case RS_PKT_SUBTYPE_GXS_GRP_CONFIG: return new RsGxsGrpConfigItem(SERVICE_TYPE); case RS_PKT_SUBTYPE_GXS_GRP_CONFIG: return new RsGxsGrpConfigItem(SERVICE_TYPE);
case RS_PKT_SUBTYPE_GXS_RANDOM_BIAS: return new RsGxsTunnelRandomBiasItem(SERVICE_TYPE);
default: default:
return NULL ; return NULL ;
} }
@ -77,11 +76,6 @@ void RsGxsServerGrpUpdateItem::clear()
grpUpdateTS = 0; grpUpdateTS = 0;
} }
void RsGxsTunnelRandomBiasItem::clear()
{
mRandomBias.clear() ;
}
/**********************************************************************************************/ /**********************************************************************************************/
/* SERIALISER */ /* SERIALISER */
/**********************************************************************************************/ /**********************************************************************************************/
@ -140,8 +134,4 @@ void RsGxsGrpConfigItem::serial_process(RsGenericSerializer::SerializeJob j,RsGe
RsTypeSerializer::serial_process<uint32_t>(j,ctx,msg_send_delay,"msg_send_delay") ; RsTypeSerializer::serial_process<uint32_t>(j,ctx,msg_send_delay,"msg_send_delay") ;
RsTypeSerializer::serial_process<uint32_t>(j,ctx,msg_req_delay,"msg_req_delay") ; RsTypeSerializer::serial_process<uint32_t>(j,ctx,msg_req_delay,"msg_req_delay") ;
} }
void RsGxsTunnelRandomBiasItem::serial_process(RsGenericSerializer::SerializeJob j,RsGenericSerializer::SerializeContext& ctx)
{
RsTypeSerializer::serial_process(j,ctx,mRandomBias,"random bias") ;
}

View File

@ -188,18 +188,6 @@ public:
RsGxsGroupId grpId; RsGxsGroupId grpId;
}; };
class RsGxsTunnelRandomBiasItem: public RsGxsNetServiceItem
{
public:
explicit RsGxsTunnelRandomBiasItem(uint16_t servType) : RsGxsNetServiceItem(servType, RS_PKT_SUBTYPE_GXS_RANDOM_BIAS) { clear();}
virtual ~RsGxsTunnelRandomBiasItem() {}
virtual void clear();
virtual void serial_process(RsGenericSerializer::SerializeJob j,RsGenericSerializer::SerializeContext& ctx);
Bias20Bytes mRandomBias; // Cannot be a simple char[] because of serialization.
};
class RsGxsUpdateSerialiser : public RsServiceSerializer class RsGxsUpdateSerialiser : public RsServiceSerializer
{ {
public: public:

View File

@ -1358,6 +1358,14 @@ int RsServer::StartupRetroShare()
mWiki->setNetworkExchangeService(wiki_ns) ; mWiki->setNetworkExchangeService(wiki_ns) ;
#endif #endif
/**** GXS Dist sync service ****/
#ifdef RS_USE_GXS_DISTANT_SYNC
RsGxsNetTunnelService *mGxsNetTunnel = new RsGxsNetTunnelService ;
#else
RsGxsNetTunnelService *mGxsNetTunnel = NULL ;
#endif
/**** Forum GXS service ****/ /**** Forum GXS service ****/
RsGeneralDataService* gxsforums_ds = new RsDataService(currGxsDir + "/", "gxsforums_db", RsGeneralDataService* gxsforums_ds = new RsDataService(currGxsDir + "/", "gxsforums_db",
@ -1371,7 +1379,7 @@ int RsServer::StartupRetroShare()
RS_SERVICE_GXS_TYPE_FORUMS, gxsforums_ds, nxsMgr, RS_SERVICE_GXS_TYPE_FORUMS, gxsforums_ds, nxsMgr,
mGxsForums, mGxsForums->getServiceInfo(), mGxsForums, mGxsForums->getServiceInfo(),
mReputations, mGxsCircles,mGxsIdService, mReputations, mGxsCircles,mGxsIdService,
pgpAuxUtils); pgpAuxUtils);//,mGxsNetTunnel,true,true,true);
mGxsForums->setNetworkExchangeService(gxsforums_ns) ; mGxsForums->setNetworkExchangeService(gxsforums_ns) ;
@ -1387,7 +1395,7 @@ int RsServer::StartupRetroShare()
RS_SERVICE_GXS_TYPE_CHANNELS, gxschannels_ds, nxsMgr, RS_SERVICE_GXS_TYPE_CHANNELS, gxschannels_ds, nxsMgr,
mGxsChannels, mGxsChannels->getServiceInfo(), mGxsChannels, mGxsChannels->getServiceInfo(),
mReputations, mGxsCircles,mGxsIdService, mReputations, mGxsCircles,mGxsIdService,
pgpAuxUtils,true,true,true); pgpAuxUtils,mGxsNetTunnel,true,true,true);
mGxsChannels->setNetworkExchangeService(gxschannels_ns) ; mGxsChannels->setNetworkExchangeService(gxschannels_ns) ;
@ -1442,7 +1450,7 @@ int RsServer::StartupRetroShare()
RsGxsNetService* gxstrans_ns = new RsGxsNetService( RsGxsNetService* gxstrans_ns = new RsGxsNetService(
RS_SERVICE_TYPE_GXS_TRANS, gxstrans_ds, nxsMgr, mGxsTrans, RS_SERVICE_TYPE_GXS_TRANS, gxstrans_ds, nxsMgr, mGxsTrans,
mGxsTrans->getServiceInfo(), mReputations, mGxsCircles, mGxsTrans->getServiceInfo(), mReputations, mGxsCircles,
mGxsIdService, pgpAuxUtils,true,true,false,p3GxsTrans::GXS_STORAGE_PERIOD,p3GxsTrans::GXS_SYNC_PERIOD); mGxsIdService, pgpAuxUtils,NULL,true,true,false,p3GxsTrans::GXS_STORAGE_PERIOD,p3GxsTrans::GXS_SYNC_PERIOD);
mGxsTrans->setNetworkExchangeService(gxstrans_ns); mGxsTrans->setNetworkExchangeService(gxstrans_ns);
pqih->addService(gxstrans_ns, true); pqih->addService(gxstrans_ns, true);
@ -1479,12 +1487,13 @@ int RsServer::StartupRetroShare()
mGxsTunnels->connectToTurtleRouter(tr) ; mGxsTunnels->connectToTurtleRouter(tr) ;
rsGxsTunnel = mGxsTunnels; rsGxsTunnel = mGxsTunnels;
mGxsNetTunnel->connectToTurtleRouter(tr) ;
rsDisc = mDisc; rsDisc = mDisc;
rsMsgs = new p3Msgs(msgSrv, chatSrv); rsMsgs = new p3Msgs(msgSrv, chatSrv);
// connect components to turtle router. // connect components to turtle router.
gxschannels_ns->connectToTurtleRouter(tr) ;
ftserver->connectToTurtleRouter(tr) ; ftserver->connectToTurtleRouter(tr) ;
ftserver->connectToFileDatabase(fdb) ; ftserver->connectToFileDatabase(fdb) ;
chatSrv->connectToGxsTunnelService(mGxsTunnels) ; chatSrv->connectToGxsTunnelService(mGxsTunnels) ;
@ -1615,6 +1624,7 @@ int RsServer::StartupRetroShare()
//mConfigMgr->addConfiguration("ftserver.cfg", ftserver); //mConfigMgr->addConfiguration("ftserver.cfg", ftserver);
// //
mConfigMgr->addConfiguration("gpg_prefs.cfg", AuthGPG::getAuthGPG()); mConfigMgr->addConfiguration("gpg_prefs.cfg", AuthGPG::getAuthGPG());
mConfigMgr->addConfiguration("gxsnettunnel.cfg", mGxsNetTunnel);
mConfigMgr->loadConfiguration(); mConfigMgr->loadConfiguration();
mConfigMgr->addConfiguration("peers.cfg", mPeerMgr); mConfigMgr->addConfiguration("peers.cfg", mPeerMgr);
@ -1824,6 +1834,7 @@ int RsServer::StartupRetroShare()
/*** start up GXS core runner ***/ /*** start up GXS core runner ***/
startServiceThread(mGxsNetTunnel, "gxs net tunnel");
startServiceThread(mGxsIdService, "gxs id"); startServiceThread(mGxsIdService, "gxs id");
startServiceThread(mGxsCircles, "gxs circle"); startServiceThread(mGxsCircles, "gxs circle");
startServiceThread(mPosted, "gxs posted"); startServiceThread(mPosted, "gxs posted");