mirror of
https://github.com/RetroShare/RetroShare.git
synced 2024-10-01 02:35:48 -04:00
New message passing system. Still misses signed receipts.
git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.6-NewGRouterModel@7850 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
parent
ce7710d183
commit
eea680e78c
@ -45,12 +45,14 @@ RsItem *RsGRouterSerialiser::deserialise(void *data, uint32_t *pktsize)
|
||||
|
||||
switch(getRsItemSubType(rstype))
|
||||
{
|
||||
//case RS_PKT_SUBTYPE_GROUTER_PUBLISH_KEY: return deserialise_RsGRouterPublishKeyItem(data, *pktsize);
|
||||
case RS_PKT_SUBTYPE_GROUTER_DATA: return deserialise_RsGRouterGenericDataItem(data, *pktsize);
|
||||
case RS_PKT_SUBTYPE_GROUTER_RECEIPT: return deserialise_RsGRouterReceiptItem(data, *pktsize);
|
||||
case RS_PKT_SUBTYPE_GROUTER_MATRIX_CLUES: return deserialise_RsGRouterMatrixCluesItem(data, *pktsize);
|
||||
case RS_PKT_SUBTYPE_GROUTER_DATA: return deserialise_RsGRouterGenericDataItem (data, *pktsize);
|
||||
case RS_PKT_SUBTYPE_GROUTER_TRANSACTION_CHUNK: return deserialise_RsGRouterTransactionChunkItem(data, *pktsize);
|
||||
case RS_PKT_SUBTYPE_GROUTER_TRANSACTION_ACKN: return deserialise_RsGRouterTransactionAcknItem (data, *pktsize);
|
||||
case RS_PKT_SUBTYPE_GROUTER_SIGNED_RECEIPT: return deserialise_RsGRouterSignedReceiptItem (data, *pktsize);
|
||||
case RS_PKT_SUBTYPE_GROUTER_MATRIX_CLUES: return deserialise_RsGRouterMatrixCluesItem (data, *pktsize);
|
||||
case RS_PKT_SUBTYPE_GROUTER_FRIENDS_LIST: return deserialise_RsGRouterMatrixFriendListItem(data, *pktsize);
|
||||
case RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO: return deserialise_RsGRouterRoutingInfoItem(data, *pktsize);
|
||||
case RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO: return deserialise_RsGRouterRoutingInfoItem (data, *pktsize);
|
||||
|
||||
default:
|
||||
std::cerr << "RsGRouterSerialiser::deserialise(): Could not de-serialise item. SubPacket id = " << std::hex << getRsItemSubType(rstype) << " id = " << rstype << std::dec << std::endl;
|
||||
return NULL;
|
||||
@ -76,7 +78,7 @@ RsGRouterTransactionChunkItem *RsGRouterSerialiser::deserialise_RsGRouterTransac
|
||||
std::cerr << __PRETTY_FUNCTION__ << ": Cannot allocate memory for chunk " << item->chunk_size << std::endl;
|
||||
return NULL ;
|
||||
}
|
||||
if(item->chunk_size + offset >= rssize)
|
||||
if(item->chunk_size + offset > rssize)
|
||||
{
|
||||
std::cerr << __PRETTY_FUNCTION__ << ": Cannot read beyond item size. Serialisation error!" << std::endl;
|
||||
return NULL ;
|
||||
@ -93,6 +95,25 @@ RsGRouterTransactionChunkItem *RsGRouterSerialiser::deserialise_RsGRouterTransac
|
||||
|
||||
return item;
|
||||
}
|
||||
RsGRouterTransactionAcknItem *RsGRouterSerialiser::deserialise_RsGRouterTransactionAcknItem(void *data, uint32_t tlvsize) const
|
||||
{
|
||||
uint32_t offset = 8; // skip the header
|
||||
uint32_t rssize = getRsItemSize(data);
|
||||
bool ok = true ;
|
||||
|
||||
RsGRouterTransactionAcknItem *item = new RsGRouterTransactionAcknItem() ;
|
||||
|
||||
/* add mandatory parts first */
|
||||
ok &= getRawUInt64(data, tlvsize, &offset, &item->propagation_id);
|
||||
|
||||
if (offset != rssize || !ok)
|
||||
{
|
||||
std::cerr << __PRETTY_FUNCTION__ << ": error while deserialising! Item will be dropped." << std::endl;
|
||||
return NULL ;
|
||||
}
|
||||
|
||||
return item;
|
||||
}
|
||||
RsGRouterGenericDataItem *RsGRouterSerialiser::deserialise_RsGRouterGenericDataItem(void *data, uint32_t pktsize) const
|
||||
{
|
||||
uint32_t offset = 8; // skip the header
|
||||
@ -111,7 +132,7 @@ RsGRouterGenericDataItem *RsGRouterSerialiser::deserialise_RsGRouterGenericDataI
|
||||
return NULL ;
|
||||
}
|
||||
|
||||
if(item->data_size + offset >= rssize)
|
||||
if(item->data_size + offset > rssize)
|
||||
{
|
||||
std::cerr << __PRETTY_FUNCTION__ << ": Cannot read beyond item size. Serialisation error!" << std::endl;
|
||||
return NULL ;
|
||||
@ -134,16 +155,16 @@ RsGRouterGenericDataItem *RsGRouterSerialiser::deserialise_RsGRouterGenericDataI
|
||||
return item;
|
||||
}
|
||||
|
||||
RsGRouterReceiptItem *RsGRouterSerialiser::deserialise_RsGRouterReceiptItem(void *data, uint32_t pktsize) const
|
||||
RsGRouterSignedReceiptItem *RsGRouterSerialiser::deserialise_RsGRouterSignedReceiptItem(void *data, uint32_t pktsize) const
|
||||
{
|
||||
uint32_t offset = 8; // skip the header
|
||||
uint32_t rssize = getRsItemSize(data);
|
||||
bool ok = true ;
|
||||
|
||||
RsGRouterReceiptItem *item = new RsGRouterReceiptItem() ;
|
||||
RsGRouterSignedReceiptItem *item = new RsGRouterSignedReceiptItem() ;
|
||||
|
||||
ok &= getRawUInt64(data, pktsize, &offset, &item->mid);
|
||||
ok &= getRawUInt32(data, pktsize, &offset, &item->state);
|
||||
ok &= getRawUInt64(data, pktsize, &offset, &item->routing_id);
|
||||
ok &= getRawUInt32(data, pktsize, &offset, &item->return_flags);
|
||||
ok &= item->destination_key.deserialise(data, pktsize, offset);
|
||||
ok &= item->signature.GetTlv(data, pktsize, &offset); // signature
|
||||
|
||||
@ -288,7 +309,7 @@ uint32_t RsGRouterGenericDataItem::signed_data_size() const
|
||||
|
||||
return s ;
|
||||
}
|
||||
uint32_t RsGRouterReceiptItem::serial_size() const
|
||||
uint32_t RsGRouterSignedReceiptItem::serial_size() const
|
||||
{
|
||||
uint32_t s = 8 ; // header
|
||||
s += sizeof(GRouterMsgPropagationId) ; // routing id
|
||||
@ -309,6 +330,13 @@ uint32_t RsGRouterTransactionChunkItem::serial_size() const
|
||||
|
||||
return s;
|
||||
}
|
||||
uint32_t RsGRouterTransactionAcknItem::serial_size() const
|
||||
{
|
||||
uint32_t s = 8 ; // header
|
||||
s += sizeof(GRouterMsgPropagationId) ; // routing id
|
||||
|
||||
return s;
|
||||
}
|
||||
bool RsGRouterTransactionChunkItem::serialise(void *data,uint32_t& size) const
|
||||
{
|
||||
uint32_t tlvsize,offset=0;
|
||||
@ -363,6 +391,25 @@ bool RsGRouterGenericDataItem::serialise(void *data,uint32_t& size) const
|
||||
|
||||
return ok;
|
||||
}
|
||||
bool RsGRouterTransactionAcknItem::serialise(void *data,uint32_t& size) const
|
||||
{
|
||||
uint32_t tlvsize,offset=0;
|
||||
bool ok = true;
|
||||
|
||||
if(!serialise_header(data,size,tlvsize,offset))
|
||||
return false ;
|
||||
|
||||
/* add mandatory parts first */
|
||||
ok &= setRawUInt64(data, tlvsize, &offset, propagation_id);
|
||||
|
||||
if (offset != tlvsize)
|
||||
{
|
||||
ok = false;
|
||||
std::cerr << "RsGRouterGenericDataItem::serialisedata() size error! " << std::endl;
|
||||
}
|
||||
|
||||
return ok;
|
||||
}
|
||||
bool RsGRouterGenericDataItem::serialise_signed_data(void *data,uint32_t& size) const
|
||||
{
|
||||
bool ok = true;
|
||||
@ -386,7 +433,7 @@ bool RsGRouterGenericDataItem::serialise_signed_data(void *data,uint32_t& size)
|
||||
|
||||
return ok;
|
||||
}
|
||||
bool RsGRouterReceiptItem::serialise(void *data,uint32_t& size) const
|
||||
bool RsGRouterSignedReceiptItem::serialise(void *data,uint32_t& size) const
|
||||
{
|
||||
uint32_t tlvsize,offset=0;
|
||||
bool ok = true;
|
||||
@ -395,8 +442,8 @@ bool RsGRouterReceiptItem::serialise(void *data,uint32_t& size) const
|
||||
return false ;
|
||||
|
||||
/* add mandatory parts first */
|
||||
ok &= setRawUInt64(data, tlvsize, &offset, mid);
|
||||
ok &= setRawUInt32(data, tlvsize, &offset, state);
|
||||
ok &= setRawUInt64(data, tlvsize, &offset, routing_id);
|
||||
ok &= setRawUInt32(data, tlvsize, &offset, return_flags);
|
||||
ok &= destination_key.serialise(data,tlvsize,offset) ;
|
||||
ok &= signature.SetTlv(data,tlvsize,&offset) ;
|
||||
|
||||
@ -539,12 +586,12 @@ bool RsGRouterRoutingInfoItem::serialise(void *data,uint32_t& size) const
|
||||
// ------------------------------------- IO --------------------------------------- //
|
||||
// -----------------------------------------------------------------------------------//
|
||||
//
|
||||
std::ostream& RsGRouterReceiptItem::print(std::ostream& o, uint16_t)
|
||||
std::ostream& RsGRouterSignedReceiptItem::print(std::ostream& o, uint16_t)
|
||||
{
|
||||
o << "RsGRouterReceiptItem:" << std::endl ;
|
||||
o << " direct origin: \""<< PeerId() << "\"" << std::endl ;
|
||||
o << " Mid: " << mid << std::endl ;
|
||||
o << " State: " << state << std::endl ;
|
||||
o << " Mid: " << routing_id << std::endl ;
|
||||
o << " State: " << return_flags << std::endl ;
|
||||
o << " Dest: " << destination_key << std::endl ;
|
||||
o << " Sign: " << signature.keyId << std::endl ;
|
||||
|
||||
@ -600,6 +647,13 @@ std::ostream& RsGRouterTransactionChunkItem::print(std::ostream& o, uint16_t)
|
||||
|
||||
return o ;
|
||||
}
|
||||
std::ostream& RsGRouterTransactionAcknItem::print(std::ostream& o, uint16_t)
|
||||
{
|
||||
o << "RsGRouterTransactionAcknItem:" << std::endl ;
|
||||
o << " routing id: " << propagation_id << std::endl;
|
||||
|
||||
return o ;
|
||||
}
|
||||
std::ostream& RsGRouterMatrixFriendListItem::print(std::ostream& o, uint16_t)
|
||||
{
|
||||
o << "RsGRouterMatrixCluesItem:" << std::endl ;
|
||||
|
@ -34,17 +34,18 @@
|
||||
#include "p3grouter.h"
|
||||
|
||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_PUBLISH_KEY = 0x01 ; // used to publish a key
|
||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_ACK_deprecated = 0x03 ; // acknowledgement of data received
|
||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_RECEIPT = 0x04 ; // acknowledgement of data received
|
||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_DATA_deprecated = 0x05 ; // used to send data to a destination
|
||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_ACK_deprecated = 0x03 ; // dont use!
|
||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_SIGNED_RECEIPT = 0x04 ; // long-distance acknowledgement of data received
|
||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_DATA_deprecated = 0x05 ; // dont use!
|
||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_DATA = 0x06 ; // used to send data to a destination (Signed by source)
|
||||
|
||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_TRANSACTION_CHUNK = 0x10 ; // chunk of data. Used internally.
|
||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_TRANSACTION_ACKN = 0x11 ; // acknowledge for finished transaction. Not necessary, but increases fiability.
|
||||
|
||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_MATRIX_CLUES = 0x80 ; // item to save matrix clues
|
||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_FRIENDS_LIST = 0x82 ; // item to save friend lists
|
||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_MATRIX_CLUES = 0x80 ; // item to save matrix clues
|
||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_FRIENDS_LIST = 0x82 ; // item to save friend lists
|
||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO_deprecated = 0x87 ; // deprecated. Don't use.
|
||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO = 0x89 ; // item to save routing info
|
||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO = 0x89 ; // item to save routing info
|
||||
|
||||
const uint8_t QOS_PRIORITY_RS_GROUTER = 3 ; // irrelevant since all items travel through tunnels
|
||||
|
||||
@ -85,10 +86,19 @@ class RsGRouterNonCopyableObject
|
||||
/* Specific packets */
|
||||
/***********************************************************************************/
|
||||
|
||||
class RsGRouterGenericDataItem: public RsGRouterItem, public RsGRouterNonCopyableObject
|
||||
class RsGRouterAbstractMsgItem: public RsGRouterItem
|
||||
{
|
||||
public:
|
||||
RsGRouterGenericDataItem() : RsGRouterItem(RS_PKT_SUBTYPE_GROUTER_DATA) { setPriorityLevel(QOS_PRIORITY_RS_GROUTER) ; }
|
||||
RsGRouterAbstractMsgItem(uint8_t pkt_subtype) : RsGRouterItem(pkt_subtype) {}
|
||||
|
||||
GRouterMsgPropagationId routing_id ;
|
||||
GRouterKeyId destination_key ;
|
||||
};
|
||||
|
||||
class RsGRouterGenericDataItem: public RsGRouterAbstractMsgItem, public RsGRouterNonCopyableObject
|
||||
{
|
||||
public:
|
||||
RsGRouterGenericDataItem() : RsGRouterAbstractMsgItem(RS_PKT_SUBTYPE_GROUTER_DATA) { setPriorityLevel(QOS_PRIORITY_RS_GROUTER) ; }
|
||||
virtual ~RsGRouterGenericDataItem() { clear() ; }
|
||||
|
||||
virtual bool serialise(void *data,uint32_t& size) const ;
|
||||
@ -105,8 +115,6 @@ class RsGRouterGenericDataItem: public RsGRouterItem, public RsGRouterNonCopyabl
|
||||
|
||||
// packet data
|
||||
//
|
||||
GRouterMsgPropagationId routing_id ;
|
||||
GRouterKeyId destination_key ;
|
||||
uint32_t data_size ;
|
||||
uint8_t *data_bytes;
|
||||
|
||||
@ -120,10 +128,10 @@ class RsGRouterGenericDataItem: public RsGRouterItem, public RsGRouterNonCopyabl
|
||||
bool serialise_signed_data(void *data,uint32_t& size) const ;
|
||||
};
|
||||
|
||||
class RsGRouterReceiptItem: public RsGRouterItem
|
||||
class RsGRouterSignedReceiptItem: public RsGRouterAbstractMsgItem
|
||||
{
|
||||
public:
|
||||
RsGRouterReceiptItem() : RsGRouterItem(RS_PKT_SUBTYPE_GROUTER_RECEIPT) { setPriorityLevel(QOS_PRIORITY_RS_GROUTER) ; }
|
||||
RsGRouterSignedReceiptItem() : RsGRouterAbstractMsgItem(RS_PKT_SUBTYPE_GROUTER_SIGNED_RECEIPT) { setPriorityLevel(QOS_PRIORITY_RS_GROUTER) ; }
|
||||
|
||||
virtual bool serialise(void *data,uint32_t& size) const ;
|
||||
virtual uint32_t serial_size() const ;
|
||||
@ -133,16 +141,14 @@ class RsGRouterReceiptItem: public RsGRouterItem
|
||||
|
||||
// packet data
|
||||
//
|
||||
GRouterMsgPropagationId mid ; // message id to which this ack is a response
|
||||
GRouterKeyId destination_key ; // This is the key to the peer we're reponding to.
|
||||
uint32_t state ; // packet was delivered, not delivered, bounced, etc
|
||||
uint32_t return_flags ; // packet was delivered, not delivered, bounced, etc
|
||||
|
||||
RsTlvKeySignature signature ; // signs mid+destination_key+state
|
||||
};
|
||||
|
||||
// Low-level data items
|
||||
|
||||
class RsGRouterTransactionChunkItem: public RsGRouterItem
|
||||
class RsGRouterTransactionChunkItem: public RsGRouterItem, public RsGRouterNonCopyableObject
|
||||
{
|
||||
public:
|
||||
RsGRouterTransactionChunkItem() : RsGRouterItem(RS_PKT_SUBTYPE_GROUTER_TRANSACTION_CHUNK) { setPriorityLevel(QOS_PRIORITY_RS_GROUTER) ; }
|
||||
@ -159,6 +165,19 @@ class RsGRouterTransactionChunkItem: public RsGRouterItem
|
||||
uint32_t total_size ;
|
||||
uint8_t *chunk_data ;
|
||||
};
|
||||
class RsGRouterTransactionAcknItem: public RsGRouterItem
|
||||
{
|
||||
public:
|
||||
RsGRouterTransactionAcknItem() : RsGRouterItem(RS_PKT_SUBTYPE_GROUTER_TRANSACTION_ACKN) { setPriorityLevel(QOS_PRIORITY_RS_GROUTER) ; }
|
||||
|
||||
virtual bool serialise(void *data,uint32_t& size) const ;
|
||||
virtual uint32_t serial_size() const ;
|
||||
|
||||
virtual void clear() {}
|
||||
virtual std::ostream& print(std::ostream &out, uint16_t indent = 0) ;
|
||||
|
||||
GRouterMsgPropagationId propagation_id ;
|
||||
};
|
||||
|
||||
// Items for saving the routing matrix information.
|
||||
|
||||
@ -223,26 +242,27 @@ class RsGRouterRoutingInfoItem: public RsGRouterItem, public GRouterRoutingInfo,
|
||||
|
||||
class RsGRouterSerialiser: public RsSerialType
|
||||
{
|
||||
public:
|
||||
RsGRouterSerialiser() : RsSerialType(RS_PKT_VERSION_SERVICE, RS_SERVICE_TYPE_GROUTER) {}
|
||||
public:
|
||||
RsGRouterSerialiser() : RsSerialType(RS_PKT_VERSION_SERVICE, RS_SERVICE_TYPE_GROUTER) {}
|
||||
|
||||
virtual uint32_t size (RsItem *item)
|
||||
{
|
||||
return dynamic_cast<RsGRouterItem *>(item)->serial_size() ;
|
||||
}
|
||||
virtual bool serialise(RsItem *item, void *data, uint32_t *size)
|
||||
{
|
||||
return dynamic_cast<RsGRouterItem *>(item)->serialise(data,*size) ;
|
||||
}
|
||||
virtual RsItem *deserialise (void *data, uint32_t *size) ;
|
||||
virtual uint32_t size (RsItem *item)
|
||||
{
|
||||
return dynamic_cast<RsGRouterItem *>(item)->serial_size() ;
|
||||
}
|
||||
virtual bool serialise(RsItem *item, void *data, uint32_t *size)
|
||||
{
|
||||
return dynamic_cast<RsGRouterItem *>(item)->serialise(data,*size) ;
|
||||
}
|
||||
virtual RsItem *deserialise (void *data, uint32_t *size) ;
|
||||
|
||||
private:
|
||||
RsGRouterGenericDataItem *deserialise_RsGRouterGenericDataItem(void *data,uint32_t size) const ;
|
||||
RsGRouterTransactionChunkItem *deserialise_RsGRouterTransactionChunkItem(void *data,uint32_t size) const ;
|
||||
RsGRouterReceiptItem *deserialise_RsGRouterReceiptItem(void *data,uint32_t size) const ;
|
||||
RsGRouterMatrixCluesItem *deserialise_RsGRouterMatrixCluesItem(void *data,uint32_t size) const ;
|
||||
RsGRouterMatrixFriendListItem *deserialise_RsGRouterMatrixFriendListItem(void *data,uint32_t size) const ;
|
||||
RsGRouterRoutingInfoItem *deserialise_RsGRouterRoutingInfoItem(void *data,uint32_t size) const ;
|
||||
private:
|
||||
RsGRouterGenericDataItem *deserialise_RsGRouterGenericDataItem(void *data,uint32_t size) const ;
|
||||
RsGRouterTransactionChunkItem *deserialise_RsGRouterTransactionChunkItem(void *data,uint32_t size) const ;
|
||||
RsGRouterTransactionAcknItem *deserialise_RsGRouterTransactionAcknItem(void *data,uint32_t size) const ;
|
||||
RsGRouterSignedReceiptItem *deserialise_RsGRouterSignedReceiptItem(void *data,uint32_t size) const ;
|
||||
RsGRouterMatrixCluesItem *deserialise_RsGRouterMatrixCluesItem(void *data,uint32_t size) const ;
|
||||
RsGRouterMatrixFriendListItem *deserialise_RsGRouterMatrixFriendListItem(void *data,uint32_t size) const ;
|
||||
RsGRouterRoutingInfoItem *deserialise_RsGRouterRoutingInfoItem(void *data,uint32_t size) const ;
|
||||
};
|
||||
|
||||
|
||||
|
@ -33,6 +33,7 @@
|
||||
#include "retroshare/rsgrouter.h"
|
||||
|
||||
class RsGRouterGenericDataItem ;
|
||||
class RsGRouterSignedReceiptItem ;
|
||||
|
||||
static const uint16_t GROUTER_CLIENT_ID_MESSAGES = 0x1001 ;
|
||||
|
||||
@ -55,15 +56,15 @@ static const uint32_t GROUTER_ITEM_MAX_TRAVEL_DISTANCE = 6*256 ; // 6 dis
|
||||
static const uint32_t GROUTER_ITEM_MAX_CACHE_KEEP_TIME = 86400 ; // Cached items are kept for 24 hours at most.
|
||||
static const uint32_t GROUTER_ITEM_MAX_CACHE_KEEP_TIME_DEAD= 3600 ; // DEAD Items are kept in cache for only 1 hour to favor re-exploring dead routes.
|
||||
|
||||
static const uint32_t RS_GROUTER_DATA_STATUS_UNKNOWN = 0x0000 ; // unknown. Unused.
|
||||
static const uint32_t RS_GROUTER_DATA_STATUS_PENDING = 0x0001 ; // item is pending. Should be sent asap.
|
||||
static const uint32_t RS_GROUTER_DATA_STATUS_SENT = 0x0002 ; // item is sent. Waiting for answer
|
||||
static const uint32_t RS_GROUTER_DATA_STATUS_ACKOWLG = 0x0003 ; // item is at destination. The cache only holds it to avoid duplication.
|
||||
static const uint32_t RS_GROUTER_DATA_STATUS_UNKNOWN = 0x0000 ; // unknown. Unused.
|
||||
static const uint32_t RS_GROUTER_DATA_STATUS_PENDING = 0x0001 ; // item is pending. Should be sent asap.
|
||||
static const uint32_t RS_GROUTER_DATA_STATUS_SENT = 0x0002 ; // item is sent. Waiting for answer
|
||||
static const uint32_t RS_GROUTER_DATA_STATUS_RECEIPT_OK = 0x0003 ; // item is at destination.
|
||||
|
||||
static const uint32_t RS_GROUTER_TUNNEL_STATUS_UNMANAGED = 0x0000 ;// unknown destination key
|
||||
static const uint32_t RS_GROUTER_TUNNEL_STATUS_PENDING = 0x0001 ;// unknown destination key
|
||||
static const uint32_t RS_GROUTER_TUNNEL_STATUS_READY = 0x0002 ;// unknown destination key
|
||||
static const uint32_t RS_GROUTER_TUNNEL_STATUS_CAN_SEND = 0x0003 ;// unknown destination key
|
||||
static const uint32_t RS_GROUTER_TUNNEL_STATUS_UNMANAGED = 0x0000 ; // no tunnel requested atm
|
||||
static const uint32_t RS_GROUTER_TUNNEL_STATUS_PENDING = 0x0001 ; // tunnel requested to turtle
|
||||
static const uint32_t RS_GROUTER_TUNNEL_STATUS_READY = 0x0002 ; // tunnel is ready but we're still waiting for various confirmations
|
||||
static const uint32_t RS_GROUTER_TUNNEL_STATUS_CAN_SEND = 0x0003 ; // tunnel is ready and data can be sent
|
||||
|
||||
class FriendTrialRecord
|
||||
{
|
||||
@ -79,18 +80,19 @@ class FriendTrialRecord
|
||||
|
||||
class GRouterRoutingInfo
|
||||
{
|
||||
public:
|
||||
uint32_t data_status ; // pending, waiting, etc.
|
||||
uint32_t tunnel_status ; // status of tunnel handling.
|
||||
time_t received_time_TS ; // time at which the item was originally received
|
||||
time_t last_sent_TS ; // last time the item was sent to friends
|
||||
public:
|
||||
uint32_t data_status ; // pending, waiting, etc.
|
||||
uint32_t tunnel_status ; // status of tunnel handling.
|
||||
time_t received_time_TS ; // time at which the item was originally received
|
||||
time_t last_sent_TS ; // last time the item was sent to friends
|
||||
time_t last_tunnel_request_TS ; // last time tunnels have been asked for this item.
|
||||
uint32_t sending_attempts ; // number of times tunnels have been asked for this peer without success
|
||||
|
||||
RsGxsId destination_key ; // ultimate destination for this key
|
||||
GRouterServiceId client_id ; // service ID of the client. Only valid when origin==OwnId
|
||||
TurtleFileHash tunnel_hash ; // tunnel hash to be used for this item
|
||||
RsGxsId destination_key ; // ultimate destination for this key
|
||||
GRouterServiceId client_id ; // service ID of the client. Only valid when origin==OwnId
|
||||
TurtleFileHash tunnel_hash ; // tunnel hash to be used for this item
|
||||
|
||||
RsGRouterGenericDataItem *data_item ;
|
||||
RsGRouterSignedReceiptItem *receipt_item ;
|
||||
};
|
||||
|
||||
|
@ -202,7 +202,9 @@
|
||||
#define GROUTER_DEBUG
|
||||
#define NOT_IMPLEMENTED std::cerr << __PRETTY_FUNCTION__ << ": not implemented!" << std::endl;
|
||||
|
||||
static const uint32_t MAX_TUNNEL_WAIT_TIME = 60 ; // wait for 60 seconds at most for a tunnel response.
|
||||
static const uint32_t MAX_TUNNEL_WAIT_TIME = 60 ; // wait for 60 seconds at most for a tunnel response.
|
||||
static const uint32_t MAX_DELAY_BETWEEN_TWO_SEND = 120 ; // wait for 120 seconds before re-sending.
|
||||
static const uint32_t TUNNEL_OK_WAIT_TIME = 10 ; // wait for 10 seconds after last tunnel ok, so that we have a complete set of tunnels.
|
||||
|
||||
const std::string p3GRouter::SERVICE_INFO_APP_NAME = "Global Router" ;
|
||||
|
||||
@ -225,12 +227,12 @@ p3GRouter::p3GRouter(p3ServiceControl *sc, p3IdService *is)
|
||||
int p3GRouter::tick()
|
||||
{
|
||||
time_t now = time(NULL) ;
|
||||
routePendingObjects() ;
|
||||
|
||||
if(now > _last_autowash_time + RS_GROUTER_AUTOWASH_PERIOD)
|
||||
{
|
||||
// route pending objects
|
||||
//
|
||||
routePendingObjects() ;
|
||||
|
||||
_last_autowash_time = now ;
|
||||
autoWash() ;
|
||||
@ -240,10 +242,6 @@ int p3GRouter::tick()
|
||||
//
|
||||
handleTunnels() ;
|
||||
|
||||
// Handle incoming items. Data comes out of tunnels.
|
||||
//
|
||||
handleIncoming() ;
|
||||
|
||||
// Update routing matrix
|
||||
//
|
||||
if(now > _last_matrix_update_time + RS_GROUTER_MATRIX_UPDATE_PERIOD)
|
||||
@ -341,16 +339,16 @@ void p3GRouter::autoWash()
|
||||
|
||||
bool p3GRouter::registerKey(const RsGxsId& authentication_key,const GRouterServiceId& client_id,const std::string& description)
|
||||
{
|
||||
RsStackMutex mtx(grMtx) ;
|
||||
RS_STACK_MUTEX(grMtx) ;
|
||||
|
||||
if(_registered_services.find(client_id) == _registered_services.end())
|
||||
{
|
||||
if(_registered_services.find(client_id) == _registered_services.end())
|
||||
{
|
||||
std::cerr << __PRETTY_FUNCTION__ << ": unable to register key " << authentication_key << " for client id " << client_id << ": client id is not known." << std::endl;
|
||||
return false ;
|
||||
}
|
||||
return false ;
|
||||
}
|
||||
|
||||
GRouterPublishedKeyInfo info ;
|
||||
info.service_id = client_id ;
|
||||
GRouterPublishedKeyInfo info ;
|
||||
info.service_id = client_id ;
|
||||
info.authentication_key = authentication_key ;
|
||||
info.description_string = description.substr(0,20);
|
||||
|
||||
@ -358,17 +356,17 @@ bool p3GRouter::registerKey(const RsGxsId& authentication_key,const GRouterServi
|
||||
|
||||
_owned_key_ids[hash] = info ;
|
||||
#ifdef GROUTER_DEBUG
|
||||
grouter_debug() << "Registered the following key: " << std::endl;
|
||||
grouter_debug() << "Registered the following key: " << std::endl;
|
||||
grouter_debug() << " Auth GXS Id : " << authentication_key << std::endl;
|
||||
grouter_debug() << " Client id : " << std::hex << client_id << std::dec << std::endl;
|
||||
grouter_debug() << " Description : " << info.description_string << std::endl;
|
||||
grouter_debug() << " Client id : " << std::hex << client_id << std::dec << std::endl;
|
||||
grouter_debug() << " Description : " << info.description_string << std::endl;
|
||||
#endif
|
||||
|
||||
return true ;
|
||||
return true ;
|
||||
}
|
||||
bool p3GRouter::unregisterKey(const RsGxsId& key_id,const GRouterServiceId& sid)
|
||||
{
|
||||
RsStackMutex mtx(grMtx) ;
|
||||
RS_STACK_MUTEX(grMtx) ;
|
||||
|
||||
Sha1CheckSum hash = makeTunnelHash(key_id,sid) ;
|
||||
|
||||
@ -403,12 +401,13 @@ bool p3GRouter::handleTunnelRequest(const RsFileHash& hash,const RsPeerId& /*pee
|
||||
// - we know the destination and have a route (according to matrix) => accept with high probability
|
||||
// - we don't know the destination => accept with very low probability
|
||||
|
||||
std::cerr << "p3GRouter::handleTunnelRequest(). Got req for hash " << hash << ", responding OK" << std::endl;
|
||||
|
||||
if(_owned_key_ids.find(hash) == _owned_key_ids.end())
|
||||
return false ;
|
||||
|
||||
std::cerr << "p3GRouter::handleTunnelRequest(). Got req for hash " << hash << ", responding OK" << std::endl;
|
||||
|
||||
return false ;
|
||||
std::cerr << " responding ok." << std::endl;
|
||||
return true ;
|
||||
}
|
||||
void p3GRouter::receiveTurtleData(RsTurtleGenericTunnelItem *gitem,const RsFileHash& hash,const RsPeerId& virtual_peer_id,RsTurtleGenericTunnelItem::Direction direction)
|
||||
{
|
||||
@ -432,13 +431,12 @@ void p3GRouter::receiveTurtleData(RsTurtleGenericTunnelItem *gitem,const RsFileH
|
||||
std::cerr << " data size : " << item->data_size << std::endl;
|
||||
std::cerr << " data bytes : " << RsDirUtil::sha1sum((unsigned char*)item->data_bytes,item->data_size) << std::endl;
|
||||
|
||||
RsGRouterGenericDataItem *generic_item = NULL ;
|
||||
RsGRouterAbstractMsgItem *generic_item = NULL ;
|
||||
|
||||
// Items come out of the pipe in order. We need to recover all chunks before we de-serialise the content and have it handled by handleIncoming()
|
||||
|
||||
{
|
||||
RS_STACK_MUTEX(grMtx) ;
|
||||
|
||||
// Items come out of the pipe in order. We need to recover all chunks before we de-serialise the content and have it handled by handleIncoming()
|
||||
|
||||
std::map<TurtleFileHash,GRouterTunnelInfo>::iterator it = _virtual_peers.find(hash) ;
|
||||
|
||||
if(it == _virtual_peers.end())
|
||||
@ -447,20 +445,65 @@ void p3GRouter::receiveTurtleData(RsTurtleGenericTunnelItem *gitem,const RsFileH
|
||||
return ;
|
||||
}
|
||||
|
||||
RsItem *itm = RsGRouterSerialiser().deserialise(item->data_bytes,&item->data_size) ;
|
||||
RsGRouterTransactionChunkItem *chunk_item = dynamic_cast<RsGRouterTransactionChunkItem*>(itm) ;
|
||||
RsItem *itm = RsGRouterSerialiser().deserialise(item->data_bytes,&item->data_size) ;
|
||||
|
||||
if(chunk_item == NULL)
|
||||
{
|
||||
std::cerr << " ERROR: cannot deserialise turtle item into a GRouterTransactionChunk item." << std::endl;
|
||||
if(itm)
|
||||
delete itm ;
|
||||
return ;
|
||||
// At this point we can have either a transaction chunk, or a transaction ACK.
|
||||
// We handle them both here
|
||||
|
||||
RsGRouterTransactionChunkItem *chunk_item = dynamic_cast<RsGRouterTransactionChunkItem*>(itm) ;
|
||||
RsGRouterTransactionAcknItem *trans_ack_item = NULL;
|
||||
|
||||
if(chunk_item != NULL)
|
||||
{
|
||||
std::cerr << " item is a transaction item." << std::endl;
|
||||
generic_item = it->second.addDataChunk(virtual_peer_id,chunk_item) ; // addDataChunk takes ownership over chunk_item
|
||||
}
|
||||
else if(NULL != (trans_ack_item = dynamic_cast<RsGRouterTransactionAcknItem*>(itm)))
|
||||
{
|
||||
std::cerr << " item is a transaction ACK." << std::endl;
|
||||
|
||||
std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it=_pending_messages.find(trans_ack_item->propagation_id) ;
|
||||
|
||||
if(it != _pending_messages.end())
|
||||
{
|
||||
it->second.data_status = RS_GROUTER_DATA_STATUS_SENT;
|
||||
std::cerr << " setting new status as sent/awaiting receipt." << std::endl;
|
||||
}
|
||||
else
|
||||
std::cerr << " ERROR: no routing ID corresponds to this ACK item. Inconsistency!" << std::endl;
|
||||
}
|
||||
|
||||
else
|
||||
{
|
||||
std::cerr << " ERROR: cannot deserialise turtle item." << std::endl;
|
||||
if(itm)
|
||||
delete itm ;
|
||||
}
|
||||
}
|
||||
generic_item = it->second.addDataChunk(virtual_peer_id,chunk_item) ;
|
||||
// send to client off-mutex
|
||||
|
||||
if(generic_item != NULL)
|
||||
_incoming_items.push_back(generic_item) ;
|
||||
{
|
||||
std::cerr << " transaction is finished. Passing newly created item to client." << std::endl;
|
||||
std::cerr << " sending a ACK item" << std::endl;
|
||||
|
||||
RsGRouterTransactionAcknItem ackn_item ;
|
||||
ackn_item.propagation_id = generic_item->routing_id ;
|
||||
|
||||
RsTurtleGenericDataItem *turtle_data_item = new RsTurtleGenericDataItem ;
|
||||
|
||||
turtle_data_item->data_size = ackn_item.serial_size() ;
|
||||
turtle_data_item->data_bytes = (uint8_t*)malloc(turtle_data_item->data_size) ;
|
||||
|
||||
if(! ackn_item.serialise(turtle_data_item->data_bytes,turtle_data_item->data_size))
|
||||
{
|
||||
std::cerr << " ERROR: Cannot serialise ACKN item." << std::endl;
|
||||
return ;
|
||||
}
|
||||
|
||||
mTurtle->sendTurtleData(virtual_peer_id,turtle_data_item) ;
|
||||
|
||||
handleIncoming(hash,generic_item) ;
|
||||
}
|
||||
}
|
||||
|
||||
@ -497,10 +540,9 @@ void GRouterTunnelInfo::addVirtualPeer(const TurtleVirtualPeerId& vpid)
|
||||
time_t now = time(NULL) ;
|
||||
|
||||
if(first_tunnel_ok_TS == 0) first_tunnel_ok_TS = now ;
|
||||
if(last_tunnel_ok_TS < now) last_tunnel_ok_TS = now ;
|
||||
|
||||
last_tunnel_ok_TS = now ;
|
||||
}
|
||||
RsGRouterGenericDataItem *GRouterTunnelInfo::addDataChunk(const TurtleVirtualPeerId& vpid,RsGRouterTransactionChunkItem *chunk)
|
||||
RsGRouterAbstractMsgItem *GRouterTunnelInfo::addDataChunk(const TurtleVirtualPeerId& vpid,RsGRouterTransactionChunkItem *chunk)
|
||||
{
|
||||
// find the chunk
|
||||
std::map<TurtleVirtualPeerId,RsGRouterTransactionChunkItem*>::iterator it = virtual_peers.find(vpid) ;
|
||||
@ -540,14 +582,13 @@ RsGRouterGenericDataItem *GRouterTunnelInfo::addDataChunk(const TurtleVirtualPee
|
||||
|
||||
if(it->second->total_size == it->second->chunk_size)
|
||||
{
|
||||
RsGRouterGenericDataItem *data_item= new RsGRouterGenericDataItem ;
|
||||
data_item->data_bytes = it->second->chunk_data ;
|
||||
data_item->data_size = it->second->chunk_size ;
|
||||
RsItem *data_item = RsGRouterSerialiser().deserialise(it->second->chunk_data,&it->second->chunk_size) ;
|
||||
|
||||
it->second->chunk_data = NULL;
|
||||
delete it->second ;
|
||||
it->second= NULL ;
|
||||
|
||||
return data_item ;
|
||||
return dynamic_cast<RsGRouterAbstractMsgItem*>(data_item) ;
|
||||
}
|
||||
else
|
||||
return NULL ;
|
||||
@ -555,24 +596,44 @@ RsGRouterGenericDataItem *GRouterTunnelInfo::addDataChunk(const TurtleVirtualPee
|
||||
|
||||
void p3GRouter::addVirtualPeer(const TurtleFileHash& hash,const TurtleVirtualPeerId& virtual_peer_id,RsTurtleGenericTunnelItem::Direction dir)
|
||||
{
|
||||
RS_STACK_MUTEX(grMtx) ;
|
||||
RS_STACK_MUTEX(grMtx) ;
|
||||
|
||||
// Server side tunnels. This is incoming data. Nothing to do.
|
||||
|
||||
std::cerr << "p3GRouter::addVirtualPeer(). Received vpid " << virtual_peer_id << " for hash " << hash << ", direction=" << dir << std::endl;
|
||||
std::cerr << " direction = " << dir << std::endl;
|
||||
|
||||
// client side. We set the tunnel flags to READY.
|
||||
|
||||
if(dir == RsTurtleGenericTunnelItem::DIRECTION_SERVER)
|
||||
{
|
||||
bool found = false ;
|
||||
|
||||
// linear search. Bad, but not really a problem. New virtual peers come quite rarely.
|
||||
for(std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::iterator it(_pending_messages.begin());it!=_pending_messages.end();++it)
|
||||
if(it->second.tunnel_hash == hash)
|
||||
{
|
||||
std::cerr << " setting tunnel state to READY." << std::endl;
|
||||
it->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_READY ;
|
||||
found = true ;
|
||||
break ;
|
||||
}
|
||||
|
||||
if(!found)
|
||||
{
|
||||
std::cerr << " ERROR: cannot find corresponding pending message." << std::endl;
|
||||
return ;
|
||||
}
|
||||
}
|
||||
if(dir == RsTurtleGenericTunnelItem::DIRECTION_CLIENT)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
std::cerr << " adding VPID." << std::endl;
|
||||
|
||||
_virtual_peers[hash].addVirtualPeer(virtual_peer_id) ;
|
||||
|
||||
if(dir == RsTurtleGenericTunnelItem::DIRECTION_SERVER)
|
||||
{
|
||||
}
|
||||
|
||||
// Client side tunnels. This is outgoing data. Nothing to do.
|
||||
|
||||
if(dir == RsTurtleGenericTunnelItem::DIRECTION_CLIENT)
|
||||
{
|
||||
}
|
||||
}
|
||||
void p3GRouter::removeVirtualPeer(const TurtleFileHash& hash,const TurtleVirtualPeerId& virtual_peer_id)
|
||||
{
|
||||
@ -591,10 +652,17 @@ void p3GRouter::removeVirtualPeer(const TurtleFileHash& hash,const TurtleVirtual
|
||||
return ;
|
||||
}
|
||||
it->second.removeVirtualPeer(virtual_peer_id) ;
|
||||
|
||||
if(it->second.virtual_peers.empty())
|
||||
{
|
||||
std::cerr << " last virtual peer removed. Also deleting hash entry." << std::endl;
|
||||
_virtual_peers.erase(it) ;
|
||||
}
|
||||
}
|
||||
void p3GRouter::connectToTurtleRouter(p3turtle *pt)
|
||||
{
|
||||
mTurtle = pt ;
|
||||
pt->registerTunnelService(this) ;
|
||||
}
|
||||
|
||||
//===========================================================================================================================//
|
||||
@ -627,7 +695,6 @@ void p3GRouter::handleTunnels()
|
||||
|
||||
// possible pending message status:
|
||||
// - RS_GROUTER_PENDING_MSG_STATUS_TUNNEL_READY : tunnel is ready. Waiting a few seconds to be used (this is to allow multiple tunnels to come).
|
||||
// - RS_GROUTER_PENDING_MSG_STATUS_TUNNEL_SEND_OK : tunnel is ready to be used
|
||||
// - RS_GROUTER_PENDING_MSG_STATUS_TUNNEL_PENDING : tunnel was asked.
|
||||
// - RS_GROUTER_PENDING_MSG_STATUS_TUNNEL_UNMANAGED: not tunnel managed at the moment.
|
||||
|
||||
@ -671,15 +738,18 @@ if(!_pending_messages.empty())
|
||||
|
||||
it->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ;
|
||||
|
||||
grouter_debug() << " stopping tunnels for this message." ;
|
||||
grouter_debug() << " stopping tunnels for this message." << std::endl; ;
|
||||
}
|
||||
else if(it->second.tunnel_status == RS_GROUTER_TUNNEL_STATUS_READY)
|
||||
grouter_debug() << " tunnel is available. " << std::endl;
|
||||
else
|
||||
grouter_debug() << " doing nothing." << std::endl;
|
||||
|
||||
grouter_debug() << std::endl;
|
||||
}
|
||||
}
|
||||
grouter_debug() << " sorting..." << std::endl;
|
||||
if(!priority_list.empty())
|
||||
grouter_debug() << " sorting..." << std::endl;
|
||||
|
||||
std::sort(priority_list.begin(),priority_list.end()) ;
|
||||
|
||||
@ -687,7 +757,7 @@ grouter_debug() << " sorting..." << std::endl;
|
||||
|
||||
for(uint32_t i=0;i<priority_list.size();++i)
|
||||
{
|
||||
grouter_debug() << " askign tunnel management for msg=" << priority_list[i].first << " hash=" << priority_list[i].second->tunnel_hash << std::endl;
|
||||
grouter_debug() << " askign tunnel management for msg=" << priority_list[i].first << " hash=" << priority_list[i].second->tunnel_hash << std::endl;
|
||||
|
||||
mTurtle->monitorTunnels(priority_list[i].second->tunnel_hash,this) ;
|
||||
|
||||
@ -705,32 +775,47 @@ void p3GRouter::routePendingObjects()
|
||||
|
||||
time_t now = time(NULL) ;
|
||||
|
||||
std::cerr << "p3GRouter::routePendingObjects()" << std::endl;
|
||||
|
||||
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it=_pending_messages.begin();it!=_pending_messages.end();++it)
|
||||
if(it->second.data_status == RS_GROUTER_DATA_STATUS_PENDING && it->second.tunnel_status == RS_GROUTER_TUNNEL_STATUS_READY)
|
||||
if(it->second.data_status == RS_GROUTER_DATA_STATUS_PENDING && it->second.tunnel_status == RS_GROUTER_TUNNEL_STATUS_READY
|
||||
&& now > it->second.last_sent_TS + MAX_DELAY_BETWEEN_TWO_SEND)
|
||||
{
|
||||
std::cerr << " routing id: " << std::hex << it->first << std::dec ;
|
||||
|
||||
const TurtleFileHash& hash(it->second.tunnel_hash) ;
|
||||
std::map<TurtleFileHash,GRouterTunnelInfo>::const_iterator vpit ;
|
||||
|
||||
if( (vpit = _virtual_peers.find(hash)) != _virtual_peers.end())
|
||||
if( (vpit = _virtual_peers.find(hash)) == _virtual_peers.end())
|
||||
{
|
||||
// for now, just take one. But in the future, we will need some policy to temporarily store objects at proxy peers, etc.
|
||||
|
||||
std::cerr << " " << vpit->second.virtual_peers.size() << " virtual peers available. " << std::endl;
|
||||
|
||||
if(vpit->second.virtual_peers.empty())
|
||||
{
|
||||
std::cerr << " no peers available. Cannot send!!" << std::endl;
|
||||
continue ;
|
||||
}
|
||||
TurtleVirtualPeerId vpid = (vpit->second.virtual_peers.begin())->first ;
|
||||
|
||||
std::cerr << " sending to " << vpid << std::endl;
|
||||
|
||||
sendDataInTunnel(vpid,it->second.data_item) ;
|
||||
|
||||
it->second.data_status == RS_GROUTER_DATA_STATUS_SENT ;
|
||||
it->second.last_sent_TS = now ;
|
||||
std::cerr << ". No virtual peers. Skipping now." << std::endl;
|
||||
continue ;
|
||||
}
|
||||
|
||||
if(vpit->second.last_tunnel_ok_TS + TUNNEL_OK_WAIT_TIME > now)
|
||||
{
|
||||
std::cerr << ". Still waiting delay (stabilisation)." << std::endl;
|
||||
continue ;
|
||||
}
|
||||
|
||||
// for now, just take one. But in the future, we will need some policy to temporarily store objects at proxy peers, etc.
|
||||
|
||||
std::cerr << " " << vpit->second.virtual_peers.size() << " virtual peers available. " << std::endl;
|
||||
|
||||
if(vpit->second.virtual_peers.empty())
|
||||
{
|
||||
std::cerr << " no peers available. Cannot send!!" << std::endl;
|
||||
continue ;
|
||||
}
|
||||
TurtleVirtualPeerId vpid = (vpit->second.virtual_peers.begin())->first ;
|
||||
|
||||
std::cerr << " sending to " << vpid << std::endl;
|
||||
|
||||
sendDataInTunnel(vpid,it->second.data_item) ;
|
||||
|
||||
std::cerr << " setting last sent time to now" << std::endl;
|
||||
|
||||
it->second.last_sent_TS = now ;
|
||||
}
|
||||
|
||||
// Also route back some ACKs if necessary.
|
||||
@ -742,6 +827,8 @@ void p3GRouter::sendDataInTunnel(const TurtleVirtualPeerId& vpid,RsGRouterGeneri
|
||||
// split into chunks and send them all into the tunnel.
|
||||
|
||||
std::cerr << "p3GRouter::sendDataInTunnel()" << std::endl;
|
||||
std::cerr << "item dump before send:" << std::endl;
|
||||
item->print(std::cerr, 2) ;
|
||||
|
||||
uint32_t size = item->serial_size();
|
||||
uint8_t *data = (uint8_t*)malloc(size) ;
|
||||
@ -769,6 +856,7 @@ void p3GRouter::sendDataInTunnel(const TurtleVirtualPeerId& vpid,RsGRouterGeneri
|
||||
RsGRouterTransactionChunkItem *chunk_item = new RsGRouterTransactionChunkItem ;
|
||||
chunk_item->propagation_id = item->routing_id ;
|
||||
chunk_item->total_size = size;
|
||||
chunk_item->chunk_start= offset;
|
||||
chunk_item->chunk_size = chunk_size ;
|
||||
chunk_item->chunk_data = (uint8_t*)malloc(chunk_size) ;
|
||||
|
||||
@ -777,6 +865,7 @@ void p3GRouter::sendDataInTunnel(const TurtleVirtualPeerId& vpid,RsGRouterGeneri
|
||||
if(chunk_item->chunk_data == NULL)
|
||||
{
|
||||
std::cerr << " ERROR: Cannot allocate memory for size " << chunk_size << std::endl;
|
||||
return ;
|
||||
}
|
||||
memcpy(chunk_item->chunk_data,&data[offset],chunk_size) ;
|
||||
|
||||
@ -804,52 +893,144 @@ void p3GRouter::sendDataInTunnel(const TurtleVirtualPeerId& vpid,RsGRouterGeneri
|
||||
turtle_item->data_size = turtle_data_size ;
|
||||
turtle_item->data_bytes = turtle_data ;
|
||||
|
||||
std::cerr << " sending to vpid " << vpid << std::endl;
|
||||
mTurtle->sendTurtleData(vpid,turtle_item) ;
|
||||
}
|
||||
|
||||
free(data) ;
|
||||
}
|
||||
|
||||
void p3GRouter::handleIncoming()
|
||||
void p3GRouter::handleIncoming(const TurtleFileHash& hash,RsGRouterAbstractMsgItem *item)
|
||||
{
|
||||
// Store and sort incoming packets, that come from the tunnels.
|
||||
RsGRouterGenericDataItem *generic_data_item ;
|
||||
RsGRouterSignedReceiptItem *receipt_item ;
|
||||
|
||||
RS_STACK_MUTEX(grMtx) ;
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << "GRouter::handleIncoming()" << std::endl;
|
||||
#endif
|
||||
|
||||
while(!_incoming_items.empty())
|
||||
{
|
||||
RsGRouterItem *item = _incoming_items.front() ;
|
||||
_incoming_items.pop_front() ;
|
||||
|
||||
// we might receive these kinds of items: data, recept, ACK.
|
||||
|
||||
NOT_IMPLEMENTED;
|
||||
if(NULL != (generic_data_item = dynamic_cast<RsGRouterGenericDataItem*>(item)))
|
||||
handleIncomingDataItem(hash,generic_data_item) ;
|
||||
else if(NULL != (receipt_item = dynamic_cast<RsGRouterSignedReceiptItem*>(item)))
|
||||
handleIncomingReceiptItem(hash,receipt_item) ;
|
||||
else
|
||||
std::cerr << "Item has unknown type (not data nor signed receipt). Dropping!" << std::endl;
|
||||
|
||||
delete item ;
|
||||
}
|
||||
|
||||
void p3GRouter::handleIncomingReceiptItem(const TurtleFileHash& hash,RsGRouterSignedReceiptItem *receipt_item)
|
||||
{
|
||||
std::cerr << "Handling incoming signed receipt item. Passing to client." << std::endl;
|
||||
std::cerr << "Item content:" << std::endl;
|
||||
receipt_item->print(std::cerr,2) ;
|
||||
|
||||
GRouterClientService *client = NULL ;
|
||||
GRouterServiceId service_id = 0;
|
||||
|
||||
if(!getClientAndServiceId(hash,receipt_item->destination_key,client,service_id))
|
||||
{
|
||||
std::cerr << " ERROR: cannot find client service for this hash/key combination." << std::endl;
|
||||
return ;
|
||||
}
|
||||
|
||||
// Because we don't do proxy transmission yet, the client needs to be notified. Otherwise, we will need to
|
||||
// first check if we're a proxy or not. We also remove the message from the global router sending list.
|
||||
// in the proxy case, we should only store the receipt.
|
||||
|
||||
std::cerr << " notifying client that the msg was received." << std::endl;
|
||||
|
||||
client->acknowledgeDataReceived(receipt_item->routing_id) ;
|
||||
|
||||
std::cerr << " removing messsage from cache." << std::endl;
|
||||
{
|
||||
RS_STACK_MUTEX (grMtx) ;
|
||||
|
||||
std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it=_pending_messages.find(receipt_item->routing_id) ;
|
||||
if(it != _pending_messages.end())
|
||||
{
|
||||
delete it->second.data_item ;
|
||||
//delete it->second.receipt_item ;
|
||||
|
||||
_pending_messages.erase(it) ;
|
||||
|
||||
//it->second.data_status = RS_GROUTER_DATA_STATUS_RECEIPT_OK;
|
||||
//it->second.receipt_item = signed_receipt_item ;
|
||||
}
|
||||
else
|
||||
std::cerr << " ERROR: no routing ID corresponds to this message. Inconsistency!" << std::endl;
|
||||
}
|
||||
}
|
||||
|
||||
void p3GRouter::locked_notifyClientAcknowledged(const GRouterMsgPropagationId& msg_id,const GRouterServiceId& service_id)
|
||||
void p3GRouter::handleIncomingDataItem(const TurtleFileHash& hash,RsGRouterGenericDataItem *generic_item)
|
||||
{
|
||||
RS_STACK_MUTEX (grMtx) ;
|
||||
#ifdef GROUTER_DEBUG
|
||||
grouter_debug() << " Key is owned by us. Notifying service that item was ACKed. msg_id=" << msg_id << ", service_id = " << service_id << "." << std::endl;
|
||||
#endif
|
||||
// notify the client
|
||||
//
|
||||
std::map<GRouterServiceId,GRouterClientService*>::const_iterator its = _registered_services.find(service_id) ;
|
||||
std::cerr << "Handling incoming data item. Passing to client." << std::endl;
|
||||
std::cerr << "Item content:" << std::endl;
|
||||
generic_item->print(std::cerr,2) ;
|
||||
|
||||
if(its == _registered_services.end())
|
||||
{
|
||||
std::cerr << "(EE) message " << msg_id << " is attached to service " << service_id << ", which is unknown!! That is a bug." << std::endl;
|
||||
return ;
|
||||
}
|
||||
its->second->acknowledgeDataReceived(msg_id) ;
|
||||
}
|
||||
GRouterClientService *client = NULL ;
|
||||
GRouterServiceId service_id = 0;
|
||||
|
||||
if(!getClientAndServiceId(hash,generic_item->destination_key,client,service_id))
|
||||
{
|
||||
std::cerr << " ERROR: cannot find client service for this hash/key combination." << std::endl;
|
||||
return ;
|
||||
}
|
||||
|
||||
// We don't do proxy yet, so the item is necessarily for us.
|
||||
// The item's signature must be checked, and the item needs to be decrypted.
|
||||
|
||||
if(verifySignedDataItem(generic_item)) // we should get proper flags out of this
|
||||
std::cerr << " verifying item signature: CHECKED!" ;
|
||||
else
|
||||
std::cerr << " verifying item signature: FAILED!" ;
|
||||
|
||||
if(!decryptDataItem(generic_item))
|
||||
{
|
||||
std::cerr << " decrypting item : FAILED! Item will be dropped." << std::endl;
|
||||
return ;
|
||||
}
|
||||
else
|
||||
std::cerr << " decrypting item : OK!" << std::endl;
|
||||
|
||||
// make a copy of the data, since the item will be deleted.
|
||||
|
||||
uint8_t *data_copy = (uint8_t*)malloc(generic_item->data_size) ;
|
||||
memcpy(data_copy,generic_item->data_bytes,generic_item->data_size) ;
|
||||
|
||||
client->receiveGRouterData(generic_item->destination_key,generic_item->signature.keyId,service_id,data_copy,generic_item->data_size);
|
||||
}
|
||||
|
||||
bool p3GRouter::getClientAndServiceId(const TurtleFileHash& hash, const RsGxsId& destination_key, GRouterClientService *& client, GRouterServiceId& service_id)
|
||||
{
|
||||
client = NULL ;
|
||||
service_id = 0;
|
||||
|
||||
RS_STACK_MUTEX (grMtx) ;
|
||||
std::map<TurtleFileHash,GRouterPublishedKeyInfo>::iterator it = _owned_key_ids.find(hash) ;
|
||||
|
||||
if(it == _owned_key_ids.end())
|
||||
{
|
||||
std::cerr << " ERROR: no registered GXS key for this hash! This is a consistency error." << std::endl;
|
||||
return false;
|
||||
}
|
||||
// check that we have the correct key.
|
||||
if(it->second.authentication_key != destination_key)
|
||||
{
|
||||
std::cerr << " ERROR: hash is associated to key " << it->second.authentication_key << " whether item is for key " << destination_key << ": consistency error." << std::endl;
|
||||
return false;
|
||||
}
|
||||
// now find the client given its id.
|
||||
|
||||
std::map<GRouterServiceId,GRouterClientService*>::const_iterator its = _registered_services.find(it->second.service_id) ;
|
||||
|
||||
if(its == _registered_services.end())
|
||||
{
|
||||
std::cerr << " ERROR: client id " << it->second.service_id << " not registered. Consistency error." << std::endl;
|
||||
return false;
|
||||
}
|
||||
|
||||
client = its->second ;
|
||||
service_id = it->second.service_id ;
|
||||
|
||||
return true ;
|
||||
}
|
||||
|
||||
void p3GRouter::addRoutingClue(const GRouterKeyId& id,const RsPeerId& peer_id)
|
||||
{
|
||||
@ -860,13 +1041,6 @@ void p3GRouter::addRoutingClue(const GRouterKeyId& id,const RsPeerId& peer_id)
|
||||
_routing_matrix.addRoutingClue(id,peer_id,RS_GROUTER_BASE_WEIGHT_GXS_PACKET) ;
|
||||
}
|
||||
|
||||
void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item)
|
||||
{
|
||||
RS_STACK_MUTEX(grMtx) ;
|
||||
|
||||
NOT_IMPLEMENTED;
|
||||
}
|
||||
|
||||
bool p3GRouter::registerClientService(const GRouterServiceId& id,GRouterClientService *service)
|
||||
{
|
||||
RS_STACK_MUTEX(grMtx) ;
|
||||
@ -920,10 +1094,43 @@ return true ;
|
||||
}
|
||||
bool p3GRouter::decryptDataItem(RsGRouterGenericDataItem *item)
|
||||
{
|
||||
NOT_IMPLEMENTED ;
|
||||
return false ;
|
||||
}
|
||||
assert(item->flags & RS_GROUTER_DATA_FLAGS_ENCRYPTED) ;
|
||||
|
||||
#ifdef GROUTER_DEBUG
|
||||
std::cerr << " decrypting data for key " << item->destination_key << std::endl;
|
||||
std::cerr << " encrypted size = " << item->data_size << std::endl;
|
||||
#endif
|
||||
RsTlvSecurityKey encryption_key ;
|
||||
|
||||
// get the key, and let the cache find it.
|
||||
for(int i=0;i<4;++i)
|
||||
if(mIdService->getPrivateKey(item->destination_key,encryption_key))
|
||||
break ;
|
||||
else
|
||||
usleep(500*1000) ; // sleep half a sec.
|
||||
|
||||
if(encryption_key.keyId.isNull())
|
||||
{
|
||||
std::cerr << " (EE) Cannot get encryption key for id " << item->destination_key << std::endl;
|
||||
return false ;
|
||||
}
|
||||
|
||||
uint8_t *decrypted_data =NULL;
|
||||
int decrypted_size =0;
|
||||
|
||||
if(!GxsSecurity::decrypt(decrypted_data,decrypted_size,item->data_bytes,item->data_size,encryption_key))
|
||||
{
|
||||
std::cerr << " (EE) Decryption failed." << std::endl;
|
||||
return false ;
|
||||
}
|
||||
|
||||
delete[] item->data_bytes ;
|
||||
item->data_bytes = decrypted_data ;
|
||||
item->data_size = decrypted_size ;
|
||||
item->flags &= ~RS_GROUTER_DATA_FLAGS_ENCRYPTED ;
|
||||
|
||||
return true ;
|
||||
}
|
||||
|
||||
bool p3GRouter::signDataItem(RsGRouterGenericDataItem *item,const RsGxsId& signing_id)
|
||||
{
|
||||
@ -997,8 +1204,8 @@ bool p3GRouter::verifySignedDataItem(RsGRouterGenericDataItem *item)
|
||||
if(signature_key.keyData.bin_data == NULL)
|
||||
throw std::runtime_error("No key for checking signature from " + item->signature.keyId.toStdString());
|
||||
|
||||
std::cerr << "Validating signature for data hash: " << RsDirUtil::sha1sum(data,data_size) << " and key_id = " << item->signature.keyId << std::endl;
|
||||
std::cerr << "First bytes of encrypted data: " << RsUtil::BinToHex((const char *)data,std::min(data_size,50u)) << "..."<< std::endl;
|
||||
std::cerr << " Validating signature for data hash: " << RsDirUtil::sha1sum(data,data_size) << " and key_id = " << item->signature.keyId << std::endl;
|
||||
std::cerr << " First bytes of encrypted data: " << RsUtil::BinToHex((const char *)data,std::min(data_size,50u)) << "..."<< std::endl;
|
||||
|
||||
if(!GxsSecurity::validateSignature((char*)data,data_size,signature_key,item->signature))
|
||||
throw std::runtime_error("Signature was verified and it doesn't check! This is a security issue!") ;
|
||||
@ -1280,6 +1487,7 @@ void p3GRouter::debugDump()
|
||||
grouter_debug() << " Last sent : " << now - it->second.last_sent_TS << " secs ago.";
|
||||
grouter_debug() << " Data Status: " << statusString[it->second.data_status] << std::endl;
|
||||
grouter_debug() << " Tunl Status: " << statusString[it->second.tunnel_status] << std::endl;
|
||||
grouter_debug() << " Receipt ok : " << (it->second.receipt_item != NULL) << std::endl;
|
||||
}
|
||||
|
||||
grouter_debug() << " Tunnels: " << std::endl;
|
||||
@ -1294,8 +1502,8 @@ void p3GRouter::debugDump()
|
||||
|
||||
grouter_debug() << " Routing matrix: " << std::endl;
|
||||
|
||||
if(_debug_enabled)
|
||||
_routing_matrix.debugDump() ;
|
||||
// if(_debug_enabled)
|
||||
// _routing_matrix.debugDump() ;
|
||||
}
|
||||
|
||||
|
||||
|
@ -53,7 +53,8 @@ class p3IdService ;
|
||||
class RsGRouterItem ;
|
||||
class RsGRouterGenericDataItem ;
|
||||
class RsGRouterTransactionChunkItem ;
|
||||
class RsGRouterReceiptItem ;
|
||||
class RsGRouterSignedReceiptItem ;
|
||||
class RsGRouterAbstractMsgItem ;
|
||||
|
||||
// This class is responsible for accepting data chunks and merging them into a final object. When the object is
|
||||
// complete, it is de-serialised and returned as a RsGRouterGenericDataItem*.
|
||||
@ -68,7 +69,7 @@ public:
|
||||
void addVirtualPeer(const TurtleVirtualPeerId& vpid) ;
|
||||
void removeVirtualPeer(const TurtleVirtualPeerId& vpid) ;
|
||||
|
||||
RsGRouterGenericDataItem *addDataChunk(const TurtleVirtualPeerId& vpid,RsGRouterTransactionChunkItem *chunk_item) ;
|
||||
RsGRouterAbstractMsgItem *addDataChunk(const TurtleVirtualPeerId& vpid,RsGRouterTransactionChunkItem *chunk_item) ;
|
||||
|
||||
std::map<TurtleVirtualPeerId, RsGRouterTransactionChunkItem*> virtual_peers ;
|
||||
|
||||
@ -207,18 +208,20 @@ private:
|
||||
|
||||
void autoWash() ;
|
||||
void routePendingObjects() ;
|
||||
void handleIncoming() ;
|
||||
void handleTunnels() ;
|
||||
|
||||
void debugDump() ;
|
||||
void handleIncoming(const TurtleFileHash &hash, RsGRouterAbstractMsgItem *) ;
|
||||
void handleIncomingReceiptItem(const TurtleFileHash &hash, RsGRouterSignedReceiptItem *receipt_item) ;
|
||||
void handleIncomingDataItem(const TurtleFileHash &hash, RsGRouterGenericDataItem *data_item) ;
|
||||
|
||||
bool getClientAndServiceId(const TurtleFileHash& hash, const RsGxsId& destination_key, GRouterClientService *& client, GRouterServiceId& service_id);
|
||||
|
||||
|
||||
// utility functions
|
||||
//
|
||||
static float computeMatrixContribution(float base,uint32_t time_shift,float probability) ;
|
||||
static time_t computeNextTimeDelay(time_t duration) ;
|
||||
|
||||
void locked_notifyClientAcknowledged(const GRouterMsgPropagationId& msg_id,const GRouterServiceId& service_id) ;
|
||||
|
||||
uint32_t computeRandomDistanceIncrement(const RsPeerId& pid,const GRouterKeyId& destination_id) ;
|
||||
|
||||
// signs an item with the given key.
|
||||
@ -249,7 +252,7 @@ private:
|
||||
|
||||
// Prints the internal state of the router, for debug purpose.
|
||||
//
|
||||
void debug_dump() ;
|
||||
void debugDump() ;
|
||||
|
||||
//===================================================//
|
||||
// Internal queues/variables //
|
||||
@ -290,8 +293,8 @@ private:
|
||||
|
||||
// Data handling methods
|
||||
//
|
||||
void handleRecvDataItem(RsGRouterGenericDataItem *item);
|
||||
void handleRecvReceiptItem(RsGRouterReceiptItem *item);
|
||||
//void handleRecvDataItem(RsGRouterGenericDataItem *item);
|
||||
//void handleRecvReceiptItem(RsGRouterReceiptItem *item);
|
||||
|
||||
// Pointers to other RS objects
|
||||
//
|
||||
|
@ -2198,14 +2198,14 @@ void p3MsgService::receiveGRouterData(const RsGxsId& destination_key, const RsGx
|
||||
RsMsgItem *msg_item = dynamic_cast<RsMsgItem*>(item) ;
|
||||
|
||||
if(msg_item != NULL)
|
||||
{
|
||||
std::cerr << " Encrypted item correctly deserialised. Passing on to incoming list." << std::endl;
|
||||
{
|
||||
std::cerr << " Encrypted item correctly deserialised. Passing on to incoming list." << std::endl;
|
||||
|
||||
msg_item->PeerId(RsPeerId(signing_key)) ; // hack to pass on GXS id.
|
||||
handleIncomingItem(msg_item) ;
|
||||
}
|
||||
else
|
||||
std::cerr << " Item could not be deserialised. Format error??" << std::endl;
|
||||
}
|
||||
else
|
||||
std::cerr << " Item could not be deserialised. Format error??" << std::endl;
|
||||
}
|
||||
|
||||
void p3MsgService::sendDistantMsgItem(RsMsgItem *msgitem)
|
||||
|
Loading…
Reference in New Issue
Block a user