mirror of
https://github.com/RetroShare/RetroShare.git
synced 2024-12-28 17:09:34 -05:00
implemented data transmission code (not fully working yet)
git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.6-NewGRouterModel@7848 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
parent
ac4f51623b
commit
ce7710d183
@ -57,7 +57,42 @@ RsItem *RsGRouterSerialiser::deserialise(void *data, uint32_t *pktsize)
|
|||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
RsGRouterTransactionChunkItem *RsGRouterSerialiser::deserialise_RsGRouterTransactionChunkItem(void *data, uint32_t tlvsize) const
|
||||||
|
{
|
||||||
|
uint32_t offset = 8; // skip the header
|
||||||
|
uint32_t rssize = getRsItemSize(data);
|
||||||
|
bool ok = true ;
|
||||||
|
|
||||||
|
RsGRouterTransactionChunkItem *item = new RsGRouterTransactionChunkItem() ;
|
||||||
|
|
||||||
|
/* add mandatory parts first */
|
||||||
|
ok &= getRawUInt64(data, tlvsize, &offset, &item->propagation_id);
|
||||||
|
ok &= getRawUInt32(data, tlvsize, &offset, &item->chunk_start);
|
||||||
|
ok &= getRawUInt32(data, tlvsize, &offset, &item->chunk_size);
|
||||||
|
ok &= getRawUInt32(data, tlvsize, &offset, &item->total_size);
|
||||||
|
|
||||||
|
if( NULL == (item->chunk_data = (uint8_t*)malloc(item->chunk_size)))
|
||||||
|
{
|
||||||
|
std::cerr << __PRETTY_FUNCTION__ << ": Cannot allocate memory for chunk " << item->chunk_size << std::endl;
|
||||||
|
return NULL ;
|
||||||
|
}
|
||||||
|
if(item->chunk_size + offset >= rssize)
|
||||||
|
{
|
||||||
|
std::cerr << __PRETTY_FUNCTION__ << ": Cannot read beyond item size. Serialisation error!" << std::endl;
|
||||||
|
return NULL ;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(item->chunk_data,&((uint8_t*)data)[offset],item->chunk_size) ;
|
||||||
|
offset += item->chunk_size ;
|
||||||
|
|
||||||
|
if (offset != rssize || !ok)
|
||||||
|
{
|
||||||
|
std::cerr << __PRETTY_FUNCTION__ << ": error while deserialising! Item will be dropped." << std::endl;
|
||||||
|
return NULL ;
|
||||||
|
}
|
||||||
|
|
||||||
|
return item;
|
||||||
|
}
|
||||||
RsGRouterGenericDataItem *RsGRouterSerialiser::deserialise_RsGRouterGenericDataItem(void *data, uint32_t pktsize) const
|
RsGRouterGenericDataItem *RsGRouterSerialiser::deserialise_RsGRouterGenericDataItem(void *data, uint32_t pktsize) const
|
||||||
{
|
{
|
||||||
uint32_t offset = 8; // skip the header
|
uint32_t offset = 8; // skip the header
|
||||||
@ -76,6 +111,12 @@ RsGRouterGenericDataItem *RsGRouterSerialiser::deserialise_RsGRouterGenericDataI
|
|||||||
return NULL ;
|
return NULL ;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(item->data_size + offset >= rssize)
|
||||||
|
{
|
||||||
|
std::cerr << __PRETTY_FUNCTION__ << ": Cannot read beyond item size. Serialisation error!" << std::endl;
|
||||||
|
return NULL ;
|
||||||
|
}
|
||||||
|
|
||||||
memcpy(item->data_bytes,&((uint8_t*)data)[offset],item->data_size) ;
|
memcpy(item->data_bytes,&((uint8_t*)data)[offset],item->data_size) ;
|
||||||
offset += item->data_size ;
|
offset += item->data_size ;
|
||||||
|
|
||||||
@ -257,6 +298,42 @@ s += destination_key.serial_size() ; // destination_key
|
|||||||
|
|
||||||
return s ;
|
return s ;
|
||||||
}
|
}
|
||||||
|
uint32_t RsGRouterTransactionChunkItem::serial_size() const
|
||||||
|
{
|
||||||
|
uint32_t s = 8 ; // header
|
||||||
|
s += sizeof(GRouterMsgPropagationId) ; // routing id
|
||||||
|
s += 4 ; // chunk_start
|
||||||
|
s += 4 ; // chunk_size
|
||||||
|
s += 4 ; // total_size
|
||||||
|
s += chunk_size ; // data
|
||||||
|
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
bool RsGRouterTransactionChunkItem::serialise(void *data,uint32_t& size) const
|
||||||
|
{
|
||||||
|
uint32_t tlvsize,offset=0;
|
||||||
|
bool ok = true;
|
||||||
|
|
||||||
|
if(!serialise_header(data,size,tlvsize,offset))
|
||||||
|
return false ;
|
||||||
|
|
||||||
|
/* add mandatory parts first */
|
||||||
|
ok &= setRawUInt64(data, tlvsize, &offset, propagation_id);
|
||||||
|
ok &= setRawUInt32(data, tlvsize, &offset, chunk_start);
|
||||||
|
ok &= setRawUInt32(data, tlvsize, &offset, chunk_size);
|
||||||
|
ok &= setRawUInt32(data, tlvsize, &offset, total_size);
|
||||||
|
|
||||||
|
memcpy(&((uint8_t*)data)[offset],chunk_data,chunk_size) ;
|
||||||
|
offset += chunk_size ;
|
||||||
|
|
||||||
|
if (offset != tlvsize)
|
||||||
|
{
|
||||||
|
ok = false;
|
||||||
|
std::cerr << "RsGRouterGenericDataItem::serialisedata() size error! " << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
return ok;
|
||||||
|
}
|
||||||
bool RsGRouterGenericDataItem::serialise(void *data,uint32_t& size) const
|
bool RsGRouterGenericDataItem::serialise(void *data,uint32_t& size) const
|
||||||
{
|
{
|
||||||
uint32_t tlvsize,offset=0;
|
uint32_t tlvsize,offset=0;
|
||||||
@ -514,6 +591,15 @@ std::ostream& RsGRouterMatrixCluesItem::print(std::ostream& o, uint16_t)
|
|||||||
|
|
||||||
return o ;
|
return o ;
|
||||||
}
|
}
|
||||||
|
std::ostream& RsGRouterTransactionChunkItem::print(std::ostream& o, uint16_t)
|
||||||
|
{
|
||||||
|
o << "RsGRouterTransactionChunkItem:" << std::endl ;
|
||||||
|
o << " total_size: " << total_size << std::endl;
|
||||||
|
o << " chunk_size: " << chunk_size << std::endl;
|
||||||
|
o << " chunk_start: " << chunk_start << std::endl;
|
||||||
|
|
||||||
|
return o ;
|
||||||
|
}
|
||||||
std::ostream& RsGRouterMatrixFriendListItem::print(std::ostream& o, uint16_t)
|
std::ostream& RsGRouterMatrixFriendListItem::print(std::ostream& o, uint16_t)
|
||||||
{
|
{
|
||||||
o << "RsGRouterMatrixCluesItem:" << std::endl ;
|
o << "RsGRouterMatrixCluesItem:" << std::endl ;
|
||||||
|
@ -39,6 +39,8 @@ const uint8_t RS_PKT_SUBTYPE_GROUTER_RECEIPT = 0x04 ; // acknowledgement
|
|||||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_DATA_deprecated = 0x05 ; // used to send data to a destination
|
const uint8_t RS_PKT_SUBTYPE_GROUTER_DATA_deprecated = 0x05 ; // used to send data to a destination
|
||||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_DATA = 0x06 ; // used to send data to a destination (Signed by source)
|
const uint8_t RS_PKT_SUBTYPE_GROUTER_DATA = 0x06 ; // used to send data to a destination (Signed by source)
|
||||||
|
|
||||||
|
const uint8_t RS_PKT_SUBTYPE_GROUTER_TRANSACTION_CHUNK = 0x10 ; // chunk of data. Used internally.
|
||||||
|
|
||||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_MATRIX_CLUES = 0x80 ; // item to save matrix clues
|
const uint8_t RS_PKT_SUBTYPE_GROUTER_MATRIX_CLUES = 0x80 ; // item to save matrix clues
|
||||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_FRIENDS_LIST = 0x82 ; // item to save friend lists
|
const uint8_t RS_PKT_SUBTYPE_GROUTER_FRIENDS_LIST = 0x82 ; // item to save friend lists
|
||||||
const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO_deprecated = 0x87 ; // deprecated. Don't use.
|
const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO_deprecated = 0x87 ; // deprecated. Don't use.
|
||||||
@ -138,6 +140,26 @@ class RsGRouterReceiptItem: public RsGRouterItem
|
|||||||
RsTlvKeySignature signature ; // signs mid+destination_key+state
|
RsTlvKeySignature signature ; // signs mid+destination_key+state
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Low-level data items
|
||||||
|
|
||||||
|
class RsGRouterTransactionChunkItem: public RsGRouterItem
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
RsGRouterTransactionChunkItem() : RsGRouterItem(RS_PKT_SUBTYPE_GROUTER_TRANSACTION_CHUNK) { setPriorityLevel(QOS_PRIORITY_RS_GROUTER) ; }
|
||||||
|
|
||||||
|
virtual bool serialise(void *data,uint32_t& size) const ;
|
||||||
|
virtual uint32_t serial_size() const ;
|
||||||
|
|
||||||
|
virtual void clear() {}
|
||||||
|
virtual std::ostream& print(std::ostream &out, uint16_t indent = 0) ;
|
||||||
|
|
||||||
|
GRouterMsgPropagationId propagation_id ;
|
||||||
|
uint32_t chunk_start ;
|
||||||
|
uint32_t chunk_size ;
|
||||||
|
uint32_t total_size ;
|
||||||
|
uint8_t *chunk_data ;
|
||||||
|
};
|
||||||
|
|
||||||
// Items for saving the routing matrix information.
|
// Items for saving the routing matrix information.
|
||||||
|
|
||||||
class RsGRouterMatrixCluesItem: public RsGRouterItem
|
class RsGRouterMatrixCluesItem: public RsGRouterItem
|
||||||
@ -216,6 +238,7 @@ class RsGRouterSerialiser: public RsSerialType
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
RsGRouterGenericDataItem *deserialise_RsGRouterGenericDataItem(void *data,uint32_t size) const ;
|
RsGRouterGenericDataItem *deserialise_RsGRouterGenericDataItem(void *data,uint32_t size) const ;
|
||||||
|
RsGRouterTransactionChunkItem *deserialise_RsGRouterTransactionChunkItem(void *data,uint32_t size) const ;
|
||||||
RsGRouterReceiptItem *deserialise_RsGRouterReceiptItem(void *data,uint32_t size) const ;
|
RsGRouterReceiptItem *deserialise_RsGRouterReceiptItem(void *data,uint32_t size) const ;
|
||||||
RsGRouterMatrixCluesItem *deserialise_RsGRouterMatrixCluesItem(void *data,uint32_t size) const ;
|
RsGRouterMatrixCluesItem *deserialise_RsGRouterMatrixCluesItem(void *data,uint32_t size) const ;
|
||||||
RsGRouterMatrixFriendListItem *deserialise_RsGRouterMatrixFriendListItem(void *data,uint32_t size) const ;
|
RsGRouterMatrixFriendListItem *deserialise_RsGRouterMatrixFriendListItem(void *data,uint32_t size) const ;
|
||||||
|
@ -395,22 +395,23 @@ bool p3GRouter::unregisterKey(const RsGxsId& key_id,const GRouterServiceId& sid)
|
|||||||
// Turtle management //
|
// Turtle management //
|
||||||
//===========================================================================================================================//
|
//===========================================================================================================================//
|
||||||
|
|
||||||
bool p3GRouter::handleTunnelRequest(const RsFileHash& /*hash*/,const RsPeerId& /*peer_id*/)
|
bool p3GRouter::handleTunnelRequest(const RsFileHash& hash,const RsPeerId& /*peer_id*/)
|
||||||
{
|
{
|
||||||
NOT_IMPLEMENTED;
|
|
||||||
|
|
||||||
// tunnel request is answered according to the following rules:
|
// tunnel request is answered according to the following rules:
|
||||||
// - we are the destination => always accept
|
// - we are the destination => always accept
|
||||||
// - we know the destination and have RCPT items to send back => always accept
|
// - we know the destination and have RCPT items to send back => always accept
|
||||||
// - we know the destination and have a route (according to matrix) => accept with high probability
|
// - we know the destination and have a route (according to matrix) => accept with high probability
|
||||||
// - we don't know the destination => accept with very low probability
|
// - we don't know the destination => accept with very low probability
|
||||||
|
|
||||||
|
if(_owned_key_ids.find(hash) == _owned_key_ids.end())
|
||||||
|
return false ;
|
||||||
|
|
||||||
|
std::cerr << "p3GRouter::handleTunnelRequest(). Got req for hash " << hash << ", responding OK" << std::endl;
|
||||||
|
|
||||||
return false ;
|
return false ;
|
||||||
}
|
}
|
||||||
void p3GRouter::receiveTurtleData(RsTurtleGenericTunnelItem */*item*/,const RsFileHash& hash,const RsPeerId& virtual_peer_id,RsTurtleGenericTunnelItem::Direction direction)
|
void p3GRouter::receiveTurtleData(RsTurtleGenericTunnelItem *gitem,const RsFileHash& hash,const RsPeerId& virtual_peer_id,RsTurtleGenericTunnelItem::Direction direction)
|
||||||
{
|
{
|
||||||
NOT_IMPLEMENTED;
|
|
||||||
|
|
||||||
std::cerr << "p3GRouter::receiveTurtleData() " << std::endl;
|
std::cerr << "p3GRouter::receiveTurtleData() " << std::endl;
|
||||||
std::cerr << " Received data for hash : " << hash << std::endl;
|
std::cerr << " Received data for hash : " << hash << std::endl;
|
||||||
std::cerr << " Virtual peer id : " << virtual_peer_id << std::endl;
|
std::cerr << " Virtual peer id : " << virtual_peer_id << std::endl;
|
||||||
@ -418,16 +419,148 @@ void p3GRouter::receiveTurtleData(RsTurtleGenericTunnelItem */*item*/,const RsFi
|
|||||||
|
|
||||||
// turtle data is received.
|
// turtle data is received.
|
||||||
// This function
|
// This function
|
||||||
|
// - possibly packs multi-item blocks back together
|
||||||
// - converts it into a grouter generic item (by deserialising it)
|
// - converts it into a grouter generic item (by deserialising it)
|
||||||
// -
|
|
||||||
|
RsTurtleGenericDataItem *item = dynamic_cast<RsTurtleGenericDataItem*>(gitem) ;
|
||||||
|
|
||||||
|
if(item == NULL)
|
||||||
|
{
|
||||||
|
std::cerr << " ERROR: item is not a data item. That is an error." << std::endl;
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
std::cerr << " data size : " << item->data_size << std::endl;
|
||||||
|
std::cerr << " data bytes : " << RsDirUtil::sha1sum((unsigned char*)item->data_bytes,item->data_size) << std::endl;
|
||||||
|
|
||||||
|
RsGRouterGenericDataItem *generic_item = NULL ;
|
||||||
|
|
||||||
|
{
|
||||||
|
RS_STACK_MUTEX(grMtx) ;
|
||||||
|
|
||||||
|
// Items come out of the pipe in order. We need to recover all chunks before we de-serialise the content and have it handled by handleIncoming()
|
||||||
|
|
||||||
|
std::map<TurtleFileHash,GRouterTunnelInfo>::iterator it = _virtual_peers.find(hash) ;
|
||||||
|
|
||||||
|
if(it == _virtual_peers.end())
|
||||||
|
{
|
||||||
|
std::cerr << " ERROR: hash is not known. Cannot receive. Data is dropped." << std::endl;
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
|
||||||
|
RsItem *itm = RsGRouterSerialiser().deserialise(item->data_bytes,&item->data_size) ;
|
||||||
|
RsGRouterTransactionChunkItem *chunk_item = dynamic_cast<RsGRouterTransactionChunkItem*>(itm) ;
|
||||||
|
|
||||||
|
if(chunk_item == NULL)
|
||||||
|
{
|
||||||
|
std::cerr << " ERROR: cannot deserialise turtle item into a GRouterTransactionChunk item." << std::endl;
|
||||||
|
if(itm)
|
||||||
|
delete itm ;
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
generic_item = it->second.addDataChunk(virtual_peer_id,chunk_item) ;
|
||||||
|
|
||||||
|
if(generic_item != NULL)
|
||||||
|
_incoming_items.push_back(generic_item) ;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void GRouterTunnelInfo::removeVirtualPeer(const TurtleVirtualPeerId& vpid)
|
||||||
|
{
|
||||||
|
std::map<TurtleVirtualPeerId,RsGRouterTransactionChunkItem*>::iterator it = virtual_peers.find(vpid) ;
|
||||||
|
|
||||||
|
if(it == virtual_peers.end())
|
||||||
|
{
|
||||||
|
std::cerr << " ERROR: removing a virtual peer that does not exist. This is an error!" << std::endl;
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(it->second != NULL)
|
||||||
|
{
|
||||||
|
std::cerr << " WARNING: removing a virtual peer that still holds data. The data will be lost." << std::endl;
|
||||||
|
delete it->second ;
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual_peers.erase(it) ;
|
||||||
|
}
|
||||||
|
void GRouterTunnelInfo::addVirtualPeer(const TurtleVirtualPeerId& vpid)
|
||||||
|
{
|
||||||
|
std::map<TurtleVirtualPeerId,RsGRouterTransactionChunkItem*>::iterator it = virtual_peers.find(vpid) ;
|
||||||
|
|
||||||
|
if(it != virtual_peers.end())
|
||||||
|
{
|
||||||
|
std::cerr << " ERROR: adding a virtual peer that already exist. This is an error!" << std::endl;
|
||||||
|
delete it->second ;
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual_peers[vpid] = NULL ;
|
||||||
|
|
||||||
|
time_t now = time(NULL) ;
|
||||||
|
|
||||||
|
if(first_tunnel_ok_TS == 0) first_tunnel_ok_TS = now ;
|
||||||
|
if(last_tunnel_ok_TS < now) last_tunnel_ok_TS = now ;
|
||||||
|
|
||||||
|
}
|
||||||
|
RsGRouterGenericDataItem *GRouterTunnelInfo::addDataChunk(const TurtleVirtualPeerId& vpid,RsGRouterTransactionChunkItem *chunk)
|
||||||
|
{
|
||||||
|
// find the chunk
|
||||||
|
std::map<TurtleVirtualPeerId,RsGRouterTransactionChunkItem*>::iterator it = virtual_peers.find(vpid) ;
|
||||||
|
|
||||||
|
if(it == virtual_peers.end())
|
||||||
|
{
|
||||||
|
std::cerr << " ERROR: no virtual peer " << vpid << " for chunk received. Dropping." << std::endl;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(it->second == NULL)
|
||||||
|
{
|
||||||
|
if(chunk->chunk_start != 0)
|
||||||
|
{
|
||||||
|
std::cerr << " ERROR: chunk numbering is wrong. First chunk is not starting at 0. Dropping." << std::endl;
|
||||||
|
delete chunk;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
it->second = chunk ;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if(it->second->chunk_size != chunk->chunk_start || it->second->total_size != chunk->total_size)
|
||||||
|
{
|
||||||
|
std::cerr << " ERROR: chunk numbering is wrong. Dropping." << std::endl;
|
||||||
|
delete chunk ;
|
||||||
|
delete it->second ;
|
||||||
|
}
|
||||||
|
it->second->chunk_data = (uint8_t*)realloc((uint8_t*)it->second->chunk_data,it->second->chunk_size + chunk->chunk_size) ;
|
||||||
|
memcpy(&it->second->chunk_data[it->second->chunk_size],chunk->chunk_data,chunk->chunk_size) ;
|
||||||
|
it->second->chunk_size += chunk->chunk_size ;
|
||||||
|
|
||||||
|
delete chunk ;
|
||||||
|
}
|
||||||
|
|
||||||
|
// if finished, return it.
|
||||||
|
|
||||||
|
if(it->second->total_size == it->second->chunk_size)
|
||||||
|
{
|
||||||
|
RsGRouterGenericDataItem *data_item= new RsGRouterGenericDataItem ;
|
||||||
|
data_item->data_bytes = it->second->chunk_data ;
|
||||||
|
data_item->data_size = it->second->chunk_size ;
|
||||||
|
it->second->chunk_data = NULL;
|
||||||
|
delete it->second ;
|
||||||
|
it->second= NULL ;
|
||||||
|
|
||||||
|
return data_item ;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
return NULL ;
|
||||||
|
}
|
||||||
|
|
||||||
void p3GRouter::addVirtualPeer(const TurtleFileHash& hash,const TurtleVirtualPeerId& virtual_peer_id,RsTurtleGenericTunnelItem::Direction dir)
|
void p3GRouter::addVirtualPeer(const TurtleFileHash& hash,const TurtleVirtualPeerId& virtual_peer_id,RsTurtleGenericTunnelItem::Direction dir)
|
||||||
{
|
{
|
||||||
|
RS_STACK_MUTEX(grMtx) ;
|
||||||
|
|
||||||
// Server side tunnels. This is incoming data. Nothing to do.
|
// Server side tunnels. This is incoming data. Nothing to do.
|
||||||
|
|
||||||
std::cerr << "p3GRouter::addVirtualPeer(). Received vpid " << virtual_peer_id << " for hash " << hash << ", direction=" << dir << std::endl;
|
std::cerr << "p3GRouter::addVirtualPeer(). Received vpid " << virtual_peer_id << " for hash " << hash << ", direction=" << dir << std::endl;
|
||||||
|
std::cerr << " adding VPID." << std::endl;
|
||||||
std::cerr << " adding server VPID." << std::endl;
|
|
||||||
|
|
||||||
_virtual_peers[hash].addVirtualPeer(virtual_peer_id) ;
|
_virtual_peers[hash].addVirtualPeer(virtual_peer_id) ;
|
||||||
|
|
||||||
@ -443,9 +576,21 @@ void p3GRouter::addVirtualPeer(const TurtleFileHash& hash,const TurtleVirtualPee
|
|||||||
}
|
}
|
||||||
void p3GRouter::removeVirtualPeer(const TurtleFileHash& hash,const TurtleVirtualPeerId& virtual_peer_id)
|
void p3GRouter::removeVirtualPeer(const TurtleFileHash& hash,const TurtleVirtualPeerId& virtual_peer_id)
|
||||||
{
|
{
|
||||||
NOT_IMPLEMENTED;
|
RS_STACK_MUTEX(grMtx) ;
|
||||||
|
|
||||||
// this is mostly for unused tunnels. So no real work is needed here. Just remove the tunnels from client/server lists.
|
std::cerr << "p3GRouter::addVirtualPeer(). Received vpid " << virtual_peer_id << " for hash " << hash << std::endl;
|
||||||
|
std::cerr << " removing VPID." << std::endl;
|
||||||
|
|
||||||
|
// make sure the VPID exists.
|
||||||
|
|
||||||
|
std::map<TurtleFileHash,GRouterTunnelInfo>::iterator it = _virtual_peers.find(hash) ;
|
||||||
|
|
||||||
|
if(it == _virtual_peers.end())
|
||||||
|
{
|
||||||
|
std::cerr << " no virtual peers at all for this hash! This is a consistency error." << std::endl;
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
it->second.removeVirtualPeer(virtual_peer_id) ;
|
||||||
}
|
}
|
||||||
void p3GRouter::connectToTurtleRouter(p3turtle *pt)
|
void p3GRouter::connectToTurtleRouter(p3turtle *pt)
|
||||||
{
|
{
|
||||||
@ -492,6 +637,8 @@ void p3GRouter::handleTunnels()
|
|||||||
|
|
||||||
// Delay after which a message is re-sent, depending on the number of attempts already made.
|
// Delay after which a message is re-sent, depending on the number of attempts already made.
|
||||||
|
|
||||||
|
RS_STACK_MUTEX(grMtx) ;
|
||||||
|
|
||||||
if(!_pending_messages.empty())
|
if(!_pending_messages.empty())
|
||||||
{
|
{
|
||||||
grouter_debug() << "p3GRouter::handleTunnels()" << std::endl;
|
grouter_debug() << "p3GRouter::handleTunnels()" << std::endl;
|
||||||
@ -554,6 +701,8 @@ void p3GRouter::routePendingObjects()
|
|||||||
// Go throught he list of pending messages.
|
// Go throught he list of pending messages.
|
||||||
// For those with a tunnel ready, send the message in the tunnel.
|
// For those with a tunnel ready, send the message in the tunnel.
|
||||||
|
|
||||||
|
RS_STACK_MUTEX(grMtx) ;
|
||||||
|
|
||||||
time_t now = time(NULL) ;
|
time_t now = time(NULL) ;
|
||||||
|
|
||||||
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it=_pending_messages.begin();it!=_pending_messages.end();++it)
|
for(std::map<GRouterMsgPropagationId, GRouterRoutingInfo>::iterator it=_pending_messages.begin();it!=_pending_messages.end();++it)
|
||||||
@ -573,7 +722,7 @@ void p3GRouter::routePendingObjects()
|
|||||||
std::cerr << " no peers available. Cannot send!!" << std::endl;
|
std::cerr << " no peers available. Cannot send!!" << std::endl;
|
||||||
continue ;
|
continue ;
|
||||||
}
|
}
|
||||||
TurtleVirtualPeerId vpid = *(vpit->second.virtual_peers.begin()) ;
|
TurtleVirtualPeerId vpid = (vpit->second.virtual_peers.begin())->first ;
|
||||||
|
|
||||||
std::cerr << " sending to " << vpid << std::endl;
|
std::cerr << " sending to " << vpid << std::endl;
|
||||||
|
|
||||||
@ -590,7 +739,75 @@ void p3GRouter::routePendingObjects()
|
|||||||
|
|
||||||
void p3GRouter::sendDataInTunnel(const TurtleVirtualPeerId& vpid,RsGRouterGenericDataItem *item)
|
void p3GRouter::sendDataInTunnel(const TurtleVirtualPeerId& vpid,RsGRouterGenericDataItem *item)
|
||||||
{
|
{
|
||||||
NOT_IMPLEMENTED ;
|
// split into chunks and send them all into the tunnel.
|
||||||
|
|
||||||
|
std::cerr << "p3GRouter::sendDataInTunnel()" << std::endl;
|
||||||
|
|
||||||
|
uint32_t size = item->serial_size();
|
||||||
|
uint8_t *data = (uint8_t*)malloc(size) ;
|
||||||
|
|
||||||
|
if(data == NULL)
|
||||||
|
{
|
||||||
|
std::cerr << " ERROR: cannot allocate memory. Size=" << size << std::endl;
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
|
||||||
|
if(!item->serialise(data,size))
|
||||||
|
{
|
||||||
|
free(data) ;
|
||||||
|
std::cerr << " ERROR: cannot serialise." << std::endl;
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t offset = 0 ;
|
||||||
|
static const uint32_t CHUNK_SIZE = 15000 ;
|
||||||
|
|
||||||
|
while(offset < size)
|
||||||
|
{
|
||||||
|
uint32_t chunk_size = std::min(size - offset, CHUNK_SIZE) ;
|
||||||
|
|
||||||
|
RsGRouterTransactionChunkItem *chunk_item = new RsGRouterTransactionChunkItem ;
|
||||||
|
chunk_item->propagation_id = item->routing_id ;
|
||||||
|
chunk_item->total_size = size;
|
||||||
|
chunk_item->chunk_size = chunk_size ;
|
||||||
|
chunk_item->chunk_data = (uint8_t*)malloc(chunk_size) ;
|
||||||
|
|
||||||
|
std::cerr << " preparing to send a chunk [" << offset << " -> " << offset + chunk_size << " / " << size << "]" << std::endl;
|
||||||
|
|
||||||
|
if(chunk_item->chunk_data == NULL)
|
||||||
|
{
|
||||||
|
std::cerr << " ERROR: Cannot allocate memory for size " << chunk_size << std::endl;
|
||||||
|
}
|
||||||
|
memcpy(chunk_item->chunk_data,&data[offset],chunk_size) ;
|
||||||
|
|
||||||
|
offset += chunk_size ;
|
||||||
|
|
||||||
|
RsTurtleGenericDataItem *turtle_item = new RsTurtleGenericDataItem ;
|
||||||
|
|
||||||
|
uint32_t turtle_data_size = chunk_item->serial_size() ;
|
||||||
|
uint8_t *turtle_data = (uint8_t*)malloc(turtle_data_size) ;
|
||||||
|
|
||||||
|
if(turtle_data == NULL)
|
||||||
|
{
|
||||||
|
std::cerr << " ERROR: Cannot allocate turtle data memory for size " << turtle_data_size << std::endl;
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
if(!chunk_item->serialise(turtle_data,turtle_data_size))
|
||||||
|
{
|
||||||
|
std::cerr << " ERROR: cannot serialise RsGRouterTransactionChunkItem." << std::endl;
|
||||||
|
free(turtle_data) ;
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
|
||||||
|
delete chunk_item ;
|
||||||
|
|
||||||
|
turtle_item->data_size = turtle_data_size ;
|
||||||
|
turtle_item->data_bytes = turtle_data ;
|
||||||
|
|
||||||
|
mTurtle->sendTurtleData(vpid,turtle_item) ;
|
||||||
|
}
|
||||||
|
|
||||||
|
free(data) ;
|
||||||
}
|
}
|
||||||
|
|
||||||
void p3GRouter::handleIncoming()
|
void p3GRouter::handleIncoming()
|
||||||
@ -616,8 +833,9 @@ void p3GRouter::handleIncoming()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void p3GRouter::locked_notifyClientAcknowledged(const GRouterMsgPropagationId& msg_id,const GRouterServiceId& service_id) const
|
void p3GRouter::locked_notifyClientAcknowledged(const GRouterMsgPropagationId& msg_id,const GRouterServiceId& service_id)
|
||||||
{
|
{
|
||||||
|
RS_STACK_MUTEX (grMtx) ;
|
||||||
#ifdef GROUTER_DEBUG
|
#ifdef GROUTER_DEBUG
|
||||||
grouter_debug() << " Key is owned by us. Notifying service that item was ACKed. msg_id=" << msg_id << ", service_id = " << service_id << "." << std::endl;
|
grouter_debug() << " Key is owned by us. Notifying service that item was ACKed. msg_id=" << msg_id << ", service_id = " << service_id << "." << std::endl;
|
||||||
#endif
|
#endif
|
||||||
@ -635,7 +853,7 @@ void p3GRouter::locked_notifyClientAcknowledged(const GRouterMsgPropagationId& m
|
|||||||
|
|
||||||
void p3GRouter::addRoutingClue(const GRouterKeyId& id,const RsPeerId& peer_id)
|
void p3GRouter::addRoutingClue(const GRouterKeyId& id,const RsPeerId& peer_id)
|
||||||
{
|
{
|
||||||
RsStackMutex mtx(grMtx) ;
|
RS_STACK_MUTEX(grMtx) ;
|
||||||
#ifdef GROUTER_DEBUG
|
#ifdef GROUTER_DEBUG
|
||||||
grouter_debug() << "Received new routing clue for key " << id << " from peer " << peer_id << std::endl;
|
grouter_debug() << "Received new routing clue for key " << id << " from peer " << peer_id << std::endl;
|
||||||
#endif
|
#endif
|
||||||
@ -644,13 +862,14 @@ void p3GRouter::addRoutingClue(const GRouterKeyId& id,const RsPeerId& peer_id)
|
|||||||
|
|
||||||
void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item)
|
void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item)
|
||||||
{
|
{
|
||||||
RsStackMutex mtx(grMtx) ;
|
RS_STACK_MUTEX(grMtx) ;
|
||||||
|
|
||||||
NOT_IMPLEMENTED;
|
NOT_IMPLEMENTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool p3GRouter::registerClientService(const GRouterServiceId& id,GRouterClientService *service)
|
bool p3GRouter::registerClientService(const GRouterServiceId& id,GRouterClientService *service)
|
||||||
{
|
{
|
||||||
RsStackMutex mtx(grMtx) ;
|
RS_STACK_MUTEX(grMtx) ;
|
||||||
_registered_services[id] = service ;
|
_registered_services[id] = service ;
|
||||||
return true ;
|
return true ;
|
||||||
}
|
}
|
||||||
@ -873,7 +1092,7 @@ bool p3GRouter::sendData(const RsGxsId& destination,const GRouterServiceId& clie
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
{
|
{
|
||||||
RsStackMutex mtx(grMtx) ;
|
RS_STACK_MUTEX(grMtx) ;
|
||||||
_pending_messages[propagation_id] = info ;
|
_pending_messages[propagation_id] = info ;
|
||||||
}
|
}
|
||||||
return true ;
|
return true ;
|
||||||
@ -893,10 +1112,17 @@ Sha1CheckSum p3GRouter::makeTunnelHash(const RsGxsId& destination,const GRouterS
|
|||||||
|
|
||||||
return Sha1CheckSum(bytes) ;
|
return Sha1CheckSum(bytes) ;
|
||||||
}
|
}
|
||||||
|
void p3GRouter::makeGxsIdAndClientId(const Sha1CheckSum& sum,RsGxsId& gxs_id,GRouterServiceId& client_id)
|
||||||
|
{
|
||||||
|
assert( gxs_id.SIZE_IN_BYTES == 16) ;
|
||||||
|
assert(Sha1CheckSum::SIZE_IN_BYTES == 20) ;
|
||||||
|
|
||||||
|
gxs_id = RsGxsId(sum.toByteArray());// takes the first 16 bytes
|
||||||
|
client_id = sum.toByteArray()[19] + (sum.toByteArray()[18] << 8) ;
|
||||||
|
}
|
||||||
bool p3GRouter::loadList(std::list<RsItem*>& items)
|
bool p3GRouter::loadList(std::list<RsItem*>& items)
|
||||||
{
|
{
|
||||||
RsStackMutex mtx(grMtx) ;
|
RS_STACK_MUTEX(grMtx) ;
|
||||||
|
|
||||||
#ifdef GROUTER_DEBUG
|
#ifdef GROUTER_DEBUG
|
||||||
grouter_debug() << "p3GRouter::loadList() : " << std::endl;
|
grouter_debug() << "p3GRouter::loadList() : " << std::endl;
|
||||||
@ -948,6 +1174,8 @@ bool p3GRouter::saveList(bool& cleanup,std::list<RsItem*>& items)
|
|||||||
grouter_debug() << " saving routing clues." << std::endl;
|
grouter_debug() << " saving routing clues." << std::endl;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
RS_STACK_MUTEX(grMtx) ;
|
||||||
|
|
||||||
_routing_matrix.saveList(items) ;
|
_routing_matrix.saveList(items) ;
|
||||||
|
|
||||||
#ifdef GROUTER_DEBUG
|
#ifdef GROUTER_DEBUG
|
||||||
@ -971,6 +1199,8 @@ bool p3GRouter::saveList(bool& cleanup,std::list<RsItem*>& items)
|
|||||||
|
|
||||||
bool p3GRouter::getRoutingMatrixInfo(RsGRouter::GRouterRoutingMatrixInfo& info)
|
bool p3GRouter::getRoutingMatrixInfo(RsGRouter::GRouterRoutingMatrixInfo& info)
|
||||||
{
|
{
|
||||||
|
RS_STACK_MUTEX(grMtx) ;
|
||||||
|
|
||||||
info.per_friend_probabilities.clear() ;
|
info.per_friend_probabilities.clear() ;
|
||||||
info.friend_ids.clear() ;
|
info.friend_ids.clear() ;
|
||||||
info.published_keys.clear() ;
|
info.published_keys.clear() ;
|
||||||
@ -978,8 +1208,6 @@ bool p3GRouter::getRoutingMatrixInfo(RsGRouter::GRouterRoutingMatrixInfo& info)
|
|||||||
std::set<RsPeerId> ids ;
|
std::set<RsPeerId> ids ;
|
||||||
mServiceControl->getPeersConnected(getServiceInfo().mServiceType,ids) ;
|
mServiceControl->getPeersConnected(getServiceInfo().mServiceType,ids) ;
|
||||||
|
|
||||||
RsStackMutex mtx(grMtx) ;
|
|
||||||
|
|
||||||
//info.published_keys = _owned_key_ids ;
|
//info.published_keys = _owned_key_ids ;
|
||||||
|
|
||||||
for(std::set<RsPeerId>::const_iterator it(ids.begin());it!=ids.end();++it)
|
for(std::set<RsPeerId>::const_iterator it(ids.begin());it!=ids.end();++it)
|
||||||
@ -999,7 +1227,8 @@ bool p3GRouter::getRoutingMatrixInfo(RsGRouter::GRouterRoutingMatrixInfo& info)
|
|||||||
}
|
}
|
||||||
bool p3GRouter::getRoutingCacheInfo(std::vector<GRouterRoutingCacheInfo>& infos)
|
bool p3GRouter::getRoutingCacheInfo(std::vector<GRouterRoutingCacheInfo>& infos)
|
||||||
{
|
{
|
||||||
RsStackMutex mtx(grMtx) ;
|
RS_STACK_MUTEX(grMtx) ;
|
||||||
|
|
||||||
infos.clear() ;
|
infos.clear() ;
|
||||||
|
|
||||||
for(std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::const_iterator it(_pending_messages.begin());it!=_pending_messages.end();++it)
|
for(std::map<GRouterMsgPropagationId,GRouterRoutingInfo>::const_iterator it(_pending_messages.begin());it!=_pending_messages.end();++it)
|
||||||
@ -1020,7 +1249,7 @@ bool p3GRouter::getRoutingCacheInfo(std::vector<GRouterRoutingCacheInfo>& infos)
|
|||||||
//
|
//
|
||||||
void p3GRouter::debugDump()
|
void p3GRouter::debugDump()
|
||||||
{
|
{
|
||||||
RsStackMutex mtx(grMtx) ;
|
RS_STACK_MUTEX(grMtx) ;
|
||||||
|
|
||||||
time_t now = time(NULL) ;
|
time_t now = time(NULL) ;
|
||||||
|
|
||||||
@ -1059,8 +1288,8 @@ void p3GRouter::debugDump()
|
|||||||
{
|
{
|
||||||
grouter_debug() << " hash: " << it->first << ", first received: " << now - it->second.last_tunnel_ok_TS << " (secs ago), last received: " << now - it->second.last_tunnel_ok_TS << std::endl;
|
grouter_debug() << " hash: " << it->first << ", first received: " << now - it->second.last_tunnel_ok_TS << " (secs ago), last received: " << now - it->second.last_tunnel_ok_TS << std::endl;
|
||||||
|
|
||||||
for(std::set<TurtleVirtualPeerId>::const_iterator it2 = it->second.virtual_peers.begin();it2!=it->second.virtual_peers.end();++it2)
|
for(std::map<TurtleVirtualPeerId,RsGRouterTransactionChunkItem*>::const_iterator it2 = it->second.virtual_peers.begin();it2!=it->second.virtual_peers.end();++it2)
|
||||||
grouter_debug() << " " << *it2 << std::endl;
|
grouter_debug() << " " << it2->first << " : cached data = " << (void*)it2->second << std::endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
grouter_debug() << " Routing matrix: " << std::endl;
|
grouter_debug() << " Routing matrix: " << std::endl;
|
||||||
|
@ -52,25 +52,25 @@ class p3turtle ;
|
|||||||
class p3IdService ;
|
class p3IdService ;
|
||||||
class RsGRouterItem ;
|
class RsGRouterItem ;
|
||||||
class RsGRouterGenericDataItem ;
|
class RsGRouterGenericDataItem ;
|
||||||
|
class RsGRouterTransactionChunkItem ;
|
||||||
class RsGRouterReceiptItem ;
|
class RsGRouterReceiptItem ;
|
||||||
|
|
||||||
|
// This class is responsible for accepting data chunks and merging them into a final object. When the object is
|
||||||
|
// complete, it is de-serialised and returned as a RsGRouterGenericDataItem*.
|
||||||
|
|
||||||
class GRouterTunnelInfo
|
class GRouterTunnelInfo
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
GRouterTunnelInfo() :first_tunnel_ok_TS(0), last_tunnel_ok_TS(0) {}
|
GRouterTunnelInfo() :first_tunnel_ok_TS(0), last_tunnel_ok_TS(0) {}
|
||||||
|
|
||||||
void addVirtualPeer(const TurtleVirtualPeerId& vpid)
|
// These two methods handle the memory management of buffers for each virtual peers.
|
||||||
{
|
|
||||||
assert(virtual_peers.find(vpid) == virtual_peers.end()) ;
|
|
||||||
time_t now = time(NULL) ;
|
|
||||||
|
|
||||||
virtual_peers.insert(vpid) ;
|
void addVirtualPeer(const TurtleVirtualPeerId& vpid) ;
|
||||||
|
void removeVirtualPeer(const TurtleVirtualPeerId& vpid) ;
|
||||||
|
|
||||||
if(first_tunnel_ok_TS == 0) first_tunnel_ok_TS = now ;
|
RsGRouterGenericDataItem *addDataChunk(const TurtleVirtualPeerId& vpid,RsGRouterTransactionChunkItem *chunk_item) ;
|
||||||
if(last_tunnel_ok_TS < now) last_tunnel_ok_TS = now ;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::set<TurtleVirtualPeerId> virtual_peers ;
|
std::map<TurtleVirtualPeerId, RsGRouterTransactionChunkItem*> virtual_peers ;
|
||||||
|
|
||||||
time_t first_tunnel_ok_TS ; // timestamp when 1st tunnel was received.
|
time_t first_tunnel_ok_TS ; // timestamp when 1st tunnel was received.
|
||||||
time_t last_tunnel_ok_TS ; // timestamp when last tunnel was received.
|
time_t last_tunnel_ok_TS ; // timestamp when last tunnel was received.
|
||||||
@ -217,7 +217,7 @@ private:
|
|||||||
static float computeMatrixContribution(float base,uint32_t time_shift,float probability) ;
|
static float computeMatrixContribution(float base,uint32_t time_shift,float probability) ;
|
||||||
static time_t computeNextTimeDelay(time_t duration) ;
|
static time_t computeNextTimeDelay(time_t duration) ;
|
||||||
|
|
||||||
void locked_notifyClientAcknowledged(const GRouterMsgPropagationId& msg_id,const GRouterServiceId& service_id) const ;
|
void locked_notifyClientAcknowledged(const GRouterMsgPropagationId& msg_id,const GRouterServiceId& service_id) ;
|
||||||
|
|
||||||
uint32_t computeRandomDistanceIncrement(const RsPeerId& pid,const GRouterKeyId& destination_id) ;
|
uint32_t computeRandomDistanceIncrement(const RsPeerId& pid,const GRouterKeyId& destination_id) ;
|
||||||
|
|
||||||
@ -226,7 +226,10 @@ private:
|
|||||||
bool verifySignedDataItem(RsGRouterGenericDataItem *item) ;
|
bool verifySignedDataItem(RsGRouterGenericDataItem *item) ;
|
||||||
bool encryptDataItem(RsGRouterGenericDataItem *item,const RsGxsId& destination_key) ;
|
bool encryptDataItem(RsGRouterGenericDataItem *item,const RsGxsId& destination_key) ;
|
||||||
bool decryptDataItem(RsGRouterGenericDataItem *item) ;
|
bool decryptDataItem(RsGRouterGenericDataItem *item) ;
|
||||||
Sha1CheckSum makeTunnelHash(const RsGxsId& destination,const GRouterServiceId& client);
|
|
||||||
|
static Sha1CheckSum makeTunnelHash(const RsGxsId& destination,const GRouterServiceId& client);
|
||||||
|
static void makeGxsIdAndClientId(const Sha1CheckSum& sum,RsGxsId& gxs_id,GRouterServiceId& client_id);
|
||||||
|
|
||||||
void sendDataInTunnel(const TurtleVirtualPeerId& vpid,RsGRouterGenericDataItem *item);
|
void sendDataInTunnel(const TurtleVirtualPeerId& vpid,RsGRouterGenericDataItem *item);
|
||||||
|
|
||||||
//===================================================//
|
//===================================================//
|
||||||
|
Loading…
Reference in New Issue
Block a user