half-way through async messaging. Redesigned global router pipeline so as to merge traffic from turtle and friend/routing matrix. Only turtle traffic currently enabled. Should provide minimal functionality

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@8127 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2015-04-09 21:34:50 +00:00
parent e75b28b733
commit 2ecd582273
7 changed files with 844 additions and 515 deletions

View file

@ -44,7 +44,6 @@
// To be put in pqi/p3cfgmgr.h
//
static const uint32_t CONFIG_TYPE_GROUTER = 0x0016 ;
static const uint32_t RS_GROUTER_DATA_FLAGS_ENCRYPTED = 0x0001 ;
class p3LinkMgr ;
@ -52,9 +51,10 @@ class p3turtle ;
class RsGixs ;
class RsGRouterItem ;
class RsGRouterGenericDataItem ;
class RsGRouterTransactionChunkItem ;
class RsGRouterSignedReceiptItem ;
class RsGRouterAbstractMsgItem ;
class RsGRouterTransactionItem ;
class RsGRouterTransactionAcknItem ;
// This class is responsible for accepting data chunks and merging them into a final object. When the object is
// complete, it is de-serialised and returned as a RsGRouterGenericDataItem*.
@ -69,13 +69,28 @@ public:
void addVirtualPeer(const TurtleVirtualPeerId& vpid) ;
void removeVirtualPeer(const TurtleVirtualPeerId& vpid) ;
RsGRouterAbstractMsgItem *addDataChunk(const TurtleVirtualPeerId& vpid,RsGRouterTransactionChunkItem *chunk_item) ;
std::map<TurtleVirtualPeerId, RsGRouterTransactionChunkItem*> virtual_peers ;
std::set<TurtleVirtualPeerId> virtual_peers ;
time_t first_tunnel_ok_TS ; // timestamp when 1st tunnel was received.
time_t last_tunnel_ok_TS ; // timestamp when last tunnel was received.
};
class GRouterDataInfo
{
// ! This class does not have a copy constructor that duplicates the incoming data buffer. This is on purpose!
public:
GRouterDataInfo()
{
incoming_data_buffer = NULL ;
}
void clear() { delete incoming_data_buffer ; incoming_data_buffer = NULL ;}
// These two methods handle the memory management of buffers for each virtual peers.
RsGRouterAbstractMsgItem *addDataChunk(RsGRouterTransactionChunkItem *chunk_item) ;
RsGRouterTransactionChunkItem *incoming_data_buffer ;
};
class p3GRouter: public RsGRouter, public RsTurtleClientService, public p3Service, public p3Config
{
public:
@ -196,6 +211,15 @@ protected:
virtual void removeVirtualPeer(const TurtleFileHash& hash,const TurtleVirtualPeerId& virtual_peer_id) ;
private:
//===================================================//
// Low level item sorting //
//===================================================//
void handleLowLevelServiceItems() ;
void handleLowLevelServiceItem(RsGRouterTransactionItem*) ;
void handleLowLevelTransactionChunkItem(RsGRouterTransactionChunkItem *chunk_item);
void handleLowLevelTransactionAckItem(RsGRouterTransactionAcknItem*) ;
class nullstream: public std::ostream {};
std::ostream& grouter_debug() const
@ -207,10 +231,16 @@ private:
void routePendingObjects() ;
void handleTunnels() ;
void autoWash() ;
void handleIncoming(const TurtleFileHash &hash, RsGRouterAbstractMsgItem *) ;
void handleIncomingReceiptItem(const TurtleFileHash &hash, RsGRouterSignedReceiptItem *receipt_item) ;
void handleIncomingDataItem(const TurtleFileHash &hash, RsGRouterGenericDataItem *data_item) ;
//===================================================//
// High level item sorting //
//===================================================//
void handleIncoming() ;
void handleIncomingReceiptItem(RsGRouterSignedReceiptItem *receipt_item) ;
void handleIncomingDataItem(RsGRouterGenericDataItem *data_item) ;
bool locked_getClientAndServiceId(const TurtleFileHash& hash, const RsGxsId& destination_key, GRouterClientService *& client, GRouterServiceId& service_id);
@ -219,6 +249,7 @@ private:
//
static float computeMatrixContribution(float base,uint32_t time_shift,float probability) ;
static time_t computeNextTimeDelay(time_t duration) ;
static bool sliceDataItem(RsGRouterAbstractMsgItem *,std::list<RsGRouterTransactionChunkItem*>& chunks) ;
uint32_t computeRandomDistanceIncrement(const RsPeerId& pid,const GRouterKeyId& destination_id) ;
@ -231,7 +262,10 @@ private:
static Sha1CheckSum makeTunnelHash(const RsGxsId& destination,const GRouterServiceId& client);
static void makeGxsIdAndClientId(const TurtleFileHash &sum,RsGxsId& gxs_id,GRouterServiceId& client_id);
bool sendDataInTunnel(const TurtleVirtualPeerId& vpid,RsGRouterAbstractMsgItem *item);
bool locked_sendTransactionData(const RsPeerId& pid,const RsGRouterTransactionItem& item);
void locked_collectAvailableFriends(const GRouterKeyId &gxs_id,std::list<RsPeerId>& friend_peers, bool is_origin);
void locked_collectAvailableTunnels(const TurtleFileHash& hash,std::list<RsPeerId>& tunnel_peers);
//===================================================//
// p3Config methods //
@ -284,10 +318,16 @@ private:
//
std::map<GRouterMsgPropagationId, GRouterRoutingInfo> _pending_messages;// pending messages
std::map<TurtleFileHash,GRouterTunnelInfo> _virtual_peers ;
// Stores virtual peers that appear/disappear as the result of the turtle router client
//
std::map<TurtleFileHash,GRouterTunnelInfo> _tunnels ;
// Stores incoming data from any peers (virtual and real) into chunks that get aggregated until finished.
//
std::map<RsPeerId,GRouterDataInfo> _incoming_data_pipes ;
// Queue of incoming items. Might be receipts or data. Should always be empty (not a storage place)
std::list<RsGRouterItem*> _incoming_items ;
std::list<RsGRouterAbstractMsgItem *> _incoming_items ;
// Data handling methods
//
@ -299,6 +339,7 @@ private:
p3ServiceControl *mServiceControl ;
p3turtle *mTurtle ;
RsGixs *mGixs ;
p3LinkMgr *mLinkMgr ;
// Multi-thread protection mutex.
//