From ce7710d18303844df3202a75925b6ce5c3eaad72 Mon Sep 17 00:00:00 2001 From: csoler Date: Thu, 15 Jan 2015 20:33:24 +0000 Subject: [PATCH] implemented data transmission code (not fully working yet) git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.6-NewGRouterModel@7848 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/grouter/grouteritems.cc | 88 +++++- libretroshare/src/grouter/grouteritems.h | 33 ++- libretroshare/src/grouter/p3grouter.cc | 325 ++++++++++++++++++---- libretroshare/src/grouter/p3grouter.h | 33 ++- 4 files changed, 410 insertions(+), 69 deletions(-) diff --git a/libretroshare/src/grouter/grouteritems.cc b/libretroshare/src/grouter/grouteritems.cc index 1a80b066e..3ee524d29 100644 --- a/libretroshare/src/grouter/grouteritems.cc +++ b/libretroshare/src/grouter/grouteritems.cc @@ -57,7 +57,42 @@ RsItem *RsGRouterSerialiser::deserialise(void *data, uint32_t *pktsize) } return NULL; } +RsGRouterTransactionChunkItem *RsGRouterSerialiser::deserialise_RsGRouterTransactionChunkItem(void *data, uint32_t tlvsize) const +{ + uint32_t offset = 8; // skip the header + uint32_t rssize = getRsItemSize(data); + bool ok = true ; + RsGRouterTransactionChunkItem *item = new RsGRouterTransactionChunkItem() ; + + /* add mandatory parts first */ + ok &= getRawUInt64(data, tlvsize, &offset, &item->propagation_id); + ok &= getRawUInt32(data, tlvsize, &offset, &item->chunk_start); + ok &= getRawUInt32(data, tlvsize, &offset, &item->chunk_size); + ok &= getRawUInt32(data, tlvsize, &offset, &item->total_size); + + if( NULL == (item->chunk_data = (uint8_t*)malloc(item->chunk_size))) + { + std::cerr << __PRETTY_FUNCTION__ << ": Cannot allocate memory for chunk " << item->chunk_size << std::endl; + return NULL ; + } + if(item->chunk_size + offset >= rssize) + { + std::cerr << __PRETTY_FUNCTION__ << ": Cannot read beyond item size. Serialisation error!" << std::endl; + return NULL ; + } + + memcpy(item->chunk_data,&((uint8_t*)data)[offset],item->chunk_size) ; + offset += item->chunk_size ; + + if (offset != rssize || !ok) + { + std::cerr << __PRETTY_FUNCTION__ << ": error while deserialising! Item will be dropped." << std::endl; + return NULL ; + } + + return item; +} RsGRouterGenericDataItem *RsGRouterSerialiser::deserialise_RsGRouterGenericDataItem(void *data, uint32_t pktsize) const { uint32_t offset = 8; // skip the header @@ -76,7 +111,13 @@ RsGRouterGenericDataItem *RsGRouterSerialiser::deserialise_RsGRouterGenericDataI return NULL ; } - memcpy(item->data_bytes,&((uint8_t*)data)[offset],item->data_size) ; + if(item->data_size + offset >= rssize) + { + std::cerr << __PRETTY_FUNCTION__ << ": Cannot read beyond item size. Serialisation error!" << std::endl; + return NULL ; + } + + memcpy(item->data_bytes,&((uint8_t*)data)[offset],item->data_size) ; offset += item->data_size ; ok &= item->signature.GetTlv(data, pktsize, &offset) ; @@ -257,6 +298,42 @@ s += destination_key.serial_size() ; // destination_key return s ; } +uint32_t RsGRouterTransactionChunkItem::serial_size() const +{ + uint32_t s = 8 ; // header + s += sizeof(GRouterMsgPropagationId) ; // routing id + s += 4 ; // chunk_start + s += 4 ; // chunk_size + s += 4 ; // total_size + s += chunk_size ; // data + + return s; +} +bool RsGRouterTransactionChunkItem::serialise(void *data,uint32_t& size) const +{ + uint32_t tlvsize,offset=0; + bool ok = true; + + if(!serialise_header(data,size,tlvsize,offset)) + return false ; + + /* add mandatory parts first */ + ok &= setRawUInt64(data, tlvsize, &offset, propagation_id); + ok &= setRawUInt32(data, tlvsize, &offset, chunk_start); + ok &= setRawUInt32(data, tlvsize, &offset, chunk_size); + ok &= setRawUInt32(data, tlvsize, &offset, total_size); + + memcpy(&((uint8_t*)data)[offset],chunk_data,chunk_size) ; + offset += chunk_size ; + + if (offset != tlvsize) + { + ok = false; + std::cerr << "RsGRouterGenericDataItem::serialisedata() size error! " << std::endl; + } + + return ok; +} bool RsGRouterGenericDataItem::serialise(void *data,uint32_t& size) const { uint32_t tlvsize,offset=0; @@ -514,6 +591,15 @@ std::ostream& RsGRouterMatrixCluesItem::print(std::ostream& o, uint16_t) return o ; } +std::ostream& RsGRouterTransactionChunkItem::print(std::ostream& o, uint16_t) +{ + o << "RsGRouterTransactionChunkItem:" << std::endl ; + o << " total_size: " << total_size << std::endl; + o << " chunk_size: " << chunk_size << std::endl; + o << " chunk_start: " << chunk_start << std::endl; + + return o ; +} std::ostream& RsGRouterMatrixFriendListItem::print(std::ostream& o, uint16_t) { o << "RsGRouterMatrixCluesItem:" << std::endl ; diff --git a/libretroshare/src/grouter/grouteritems.h b/libretroshare/src/grouter/grouteritems.h index 8ea832be9..8ba75f761 100644 --- a/libretroshare/src/grouter/grouteritems.h +++ b/libretroshare/src/grouter/grouteritems.h @@ -33,11 +33,13 @@ #include "retroshare/rsgrouter.h" #include "p3grouter.h" -const uint8_t RS_PKT_SUBTYPE_GROUTER_PUBLISH_KEY = 0x01 ; // used to publish a key -const uint8_t RS_PKT_SUBTYPE_GROUTER_ACK_deprecated = 0x03 ; // acknowledgement of data received -const uint8_t RS_PKT_SUBTYPE_GROUTER_RECEIPT = 0x04 ; // acknowledgement of data received -const uint8_t RS_PKT_SUBTYPE_GROUTER_DATA_deprecated = 0x05 ; // used to send data to a destination -const uint8_t RS_PKT_SUBTYPE_GROUTER_DATA = 0x06 ; // used to send data to a destination (Signed by source) +const uint8_t RS_PKT_SUBTYPE_GROUTER_PUBLISH_KEY = 0x01 ; // used to publish a key +const uint8_t RS_PKT_SUBTYPE_GROUTER_ACK_deprecated = 0x03 ; // acknowledgement of data received +const uint8_t RS_PKT_SUBTYPE_GROUTER_RECEIPT = 0x04 ; // acknowledgement of data received +const uint8_t RS_PKT_SUBTYPE_GROUTER_DATA_deprecated = 0x05 ; // used to send data to a destination +const uint8_t RS_PKT_SUBTYPE_GROUTER_DATA = 0x06 ; // used to send data to a destination (Signed by source) + +const uint8_t RS_PKT_SUBTYPE_GROUTER_TRANSACTION_CHUNK = 0x10 ; // chunk of data. Used internally. const uint8_t RS_PKT_SUBTYPE_GROUTER_MATRIX_CLUES = 0x80 ; // item to save matrix clues const uint8_t RS_PKT_SUBTYPE_GROUTER_FRIENDS_LIST = 0x82 ; // item to save friend lists @@ -138,6 +140,26 @@ class RsGRouterReceiptItem: public RsGRouterItem RsTlvKeySignature signature ; // signs mid+destination_key+state }; +// Low-level data items + +class RsGRouterTransactionChunkItem: public RsGRouterItem +{ + public: + RsGRouterTransactionChunkItem() : RsGRouterItem(RS_PKT_SUBTYPE_GROUTER_TRANSACTION_CHUNK) { setPriorityLevel(QOS_PRIORITY_RS_GROUTER) ; } + + virtual bool serialise(void *data,uint32_t& size) const ; + virtual uint32_t serial_size() const ; + + virtual void clear() {} + virtual std::ostream& print(std::ostream &out, uint16_t indent = 0) ; + + GRouterMsgPropagationId propagation_id ; + uint32_t chunk_start ; + uint32_t chunk_size ; + uint32_t total_size ; + uint8_t *chunk_data ; +}; + // Items for saving the routing matrix information. class RsGRouterMatrixCluesItem: public RsGRouterItem @@ -216,6 +238,7 @@ class RsGRouterSerialiser: public RsSerialType private: RsGRouterGenericDataItem *deserialise_RsGRouterGenericDataItem(void *data,uint32_t size) const ; + RsGRouterTransactionChunkItem *deserialise_RsGRouterTransactionChunkItem(void *data,uint32_t size) const ; RsGRouterReceiptItem *deserialise_RsGRouterReceiptItem(void *data,uint32_t size) const ; RsGRouterMatrixCluesItem *deserialise_RsGRouterMatrixCluesItem(void *data,uint32_t size) const ; RsGRouterMatrixFriendListItem *deserialise_RsGRouterMatrixFriendListItem(void *data,uint32_t size) const ; diff --git a/libretroshare/src/grouter/p3grouter.cc b/libretroshare/src/grouter/p3grouter.cc index dd2b8eb46..23d63c2d5 100644 --- a/libretroshare/src/grouter/p3grouter.cc +++ b/libretroshare/src/grouter/p3grouter.cc @@ -395,22 +395,23 @@ bool p3GRouter::unregisterKey(const RsGxsId& key_id,const GRouterServiceId& sid) // Turtle management // //===========================================================================================================================// -bool p3GRouter::handleTunnelRequest(const RsFileHash& /*hash*/,const RsPeerId& /*peer_id*/) +bool p3GRouter::handleTunnelRequest(const RsFileHash& hash,const RsPeerId& /*peer_id*/) { - NOT_IMPLEMENTED; - // tunnel request is answered according to the following rules: // - we are the destination => always accept // - we know the destination and have RCPT items to send back => always accept // - we know the destination and have a route (according to matrix) => accept with high probability // - we don't know the destination => accept with very low probability + if(_owned_key_ids.find(hash) == _owned_key_ids.end()) + return false ; + + std::cerr << "p3GRouter::handleTunnelRequest(). Got req for hash " << hash << ", responding OK" << std::endl; + return false ; } -void p3GRouter::receiveTurtleData(RsTurtleGenericTunnelItem */*item*/,const RsFileHash& hash,const RsPeerId& virtual_peer_id,RsTurtleGenericTunnelItem::Direction direction) +void p3GRouter::receiveTurtleData(RsTurtleGenericTunnelItem *gitem,const RsFileHash& hash,const RsPeerId& virtual_peer_id,RsTurtleGenericTunnelItem::Direction direction) { - NOT_IMPLEMENTED; - std::cerr << "p3GRouter::receiveTurtleData() " << std::endl; std::cerr << " Received data for hash : " << hash << std::endl; std::cerr << " Virtual peer id : " << virtual_peer_id << std::endl; @@ -418,16 +419,148 @@ void p3GRouter::receiveTurtleData(RsTurtleGenericTunnelItem */*item*/,const RsFi // turtle data is received. // This function + // - possibly packs multi-item blocks back together // - converts it into a grouter generic item (by deserialising it) - // - + + RsTurtleGenericDataItem *item = dynamic_cast(gitem) ; + + if(item == NULL) + { + std::cerr << " ERROR: item is not a data item. That is an error." << std::endl; + return ; + } + std::cerr << " data size : " << item->data_size << std::endl; + std::cerr << " data bytes : " << RsDirUtil::sha1sum((unsigned char*)item->data_bytes,item->data_size) << std::endl; + + RsGRouterGenericDataItem *generic_item = NULL ; + + { + RS_STACK_MUTEX(grMtx) ; + + // Items come out of the pipe in order. We need to recover all chunks before we de-serialise the content and have it handled by handleIncoming() + + std::map::iterator it = _virtual_peers.find(hash) ; + + if(it == _virtual_peers.end()) + { + std::cerr << " ERROR: hash is not known. Cannot receive. Data is dropped." << std::endl; + return ; + } + + RsItem *itm = RsGRouterSerialiser().deserialise(item->data_bytes,&item->data_size) ; + RsGRouterTransactionChunkItem *chunk_item = dynamic_cast(itm) ; + + if(chunk_item == NULL) + { + std::cerr << " ERROR: cannot deserialise turtle item into a GRouterTransactionChunk item." << std::endl; + if(itm) + delete itm ; + return ; + } + generic_item = it->second.addDataChunk(virtual_peer_id,chunk_item) ; + + if(generic_item != NULL) + _incoming_items.push_back(generic_item) ; + } } + +void GRouterTunnelInfo::removeVirtualPeer(const TurtleVirtualPeerId& vpid) +{ + std::map::iterator it = virtual_peers.find(vpid) ; + + if(it == virtual_peers.end()) + { + std::cerr << " ERROR: removing a virtual peer that does not exist. This is an error!" << std::endl; + return ; + } + + if(it->second != NULL) + { + std::cerr << " WARNING: removing a virtual peer that still holds data. The data will be lost." << std::endl; + delete it->second ; + } + + virtual_peers.erase(it) ; +} +void GRouterTunnelInfo::addVirtualPeer(const TurtleVirtualPeerId& vpid) +{ + std::map::iterator it = virtual_peers.find(vpid) ; + + if(it != virtual_peers.end()) + { + std::cerr << " ERROR: adding a virtual peer that already exist. This is an error!" << std::endl; + delete it->second ; + } + + virtual_peers[vpid] = NULL ; + + time_t now = time(NULL) ; + + if(first_tunnel_ok_TS == 0) first_tunnel_ok_TS = now ; + if(last_tunnel_ok_TS < now) last_tunnel_ok_TS = now ; + +} +RsGRouterGenericDataItem *GRouterTunnelInfo::addDataChunk(const TurtleVirtualPeerId& vpid,RsGRouterTransactionChunkItem *chunk) +{ + // find the chunk + std::map::iterator it = virtual_peers.find(vpid) ; + + if(it == virtual_peers.end()) + { + std::cerr << " ERROR: no virtual peer " << vpid << " for chunk received. Dropping." << std::endl; + return NULL; + } + + if(it->second == NULL) + { + if(chunk->chunk_start != 0) + { + std::cerr << " ERROR: chunk numbering is wrong. First chunk is not starting at 0. Dropping." << std::endl; + delete chunk; + return NULL; + } + it->second = chunk ; + } + else + { + if(it->second->chunk_size != chunk->chunk_start || it->second->total_size != chunk->total_size) + { + std::cerr << " ERROR: chunk numbering is wrong. Dropping." << std::endl; + delete chunk ; + delete it->second ; + } + it->second->chunk_data = (uint8_t*)realloc((uint8_t*)it->second->chunk_data,it->second->chunk_size + chunk->chunk_size) ; + memcpy(&it->second->chunk_data[it->second->chunk_size],chunk->chunk_data,chunk->chunk_size) ; + it->second->chunk_size += chunk->chunk_size ; + + delete chunk ; + } + + // if finished, return it. + + if(it->second->total_size == it->second->chunk_size) + { + RsGRouterGenericDataItem *data_item= new RsGRouterGenericDataItem ; + data_item->data_bytes = it->second->chunk_data ; + data_item->data_size = it->second->chunk_size ; + it->second->chunk_data = NULL; + delete it->second ; + it->second= NULL ; + + return data_item ; + } + else + return NULL ; +} + void p3GRouter::addVirtualPeer(const TurtleFileHash& hash,const TurtleVirtualPeerId& virtual_peer_id,RsTurtleGenericTunnelItem::Direction dir) { + RS_STACK_MUTEX(grMtx) ; + // Server side tunnels. This is incoming data. Nothing to do. std::cerr << "p3GRouter::addVirtualPeer(). Received vpid " << virtual_peer_id << " for hash " << hash << ", direction=" << dir << std::endl; - - std::cerr << " adding server VPID." << std::endl; + std::cerr << " adding VPID." << std::endl; _virtual_peers[hash].addVirtualPeer(virtual_peer_id) ; @@ -443,9 +576,21 @@ void p3GRouter::addVirtualPeer(const TurtleFileHash& hash,const TurtleVirtualPee } void p3GRouter::removeVirtualPeer(const TurtleFileHash& hash,const TurtleVirtualPeerId& virtual_peer_id) { - NOT_IMPLEMENTED; + RS_STACK_MUTEX(grMtx) ; - // this is mostly for unused tunnels. So no real work is needed here. Just remove the tunnels from client/server lists. + std::cerr << "p3GRouter::addVirtualPeer(). Received vpid " << virtual_peer_id << " for hash " << hash << std::endl; + std::cerr << " removing VPID." << std::endl; + + // make sure the VPID exists. + + std::map::iterator it = _virtual_peers.find(hash) ; + + if(it == _virtual_peers.end()) + { + std::cerr << " no virtual peers at all for this hash! This is a consistency error." << std::endl; + return ; + } + it->second.removeVirtualPeer(virtual_peer_id) ; } void p3GRouter::connectToTurtleRouter(p3turtle *pt) { @@ -492,6 +637,8 @@ void p3GRouter::handleTunnels() // Delay after which a message is re-sent, depending on the number of attempts already made. + RS_STACK_MUTEX(grMtx) ; + if(!_pending_messages.empty()) { grouter_debug() << "p3GRouter::handleTunnels()" << std::endl; @@ -554,35 +701,37 @@ void p3GRouter::routePendingObjects() // Go throught he list of pending messages. // For those with a tunnel ready, send the message in the tunnel. + RS_STACK_MUTEX(grMtx) ; + time_t now = time(NULL) ; for(std::map::iterator it=_pending_messages.begin();it!=_pending_messages.end();++it) if(it->second.data_status == RS_GROUTER_DATA_STATUS_PENDING && it->second.tunnel_status == RS_GROUTER_TUNNEL_STATUS_READY) - { - const TurtleFileHash& hash(it->second.tunnel_hash) ; - std::map::const_iterator vpit ; + { + const TurtleFileHash& hash(it->second.tunnel_hash) ; + std::map::const_iterator vpit ; - if( (vpit = _virtual_peers.find(hash)) != _virtual_peers.end()) + if( (vpit = _virtual_peers.find(hash)) != _virtual_peers.end()) + { + // for now, just take one. But in the future, we will need some policy to temporarily store objects at proxy peers, etc. + + std::cerr << " " << vpit->second.virtual_peers.size() << " virtual peers available. " << std::endl; + + if(vpit->second.virtual_peers.empty()) { - // for now, just take one. But in the future, we will need some policy to temporarily store objects at proxy peers, etc. - - std::cerr << " " << vpit->second.virtual_peers.size() << " virtual peers available. " << std::endl; - - if(vpit->second.virtual_peers.empty()) - { - std::cerr << " no peers available. Cannot send!!" << std::endl; - continue ; - } - TurtleVirtualPeerId vpid = *(vpit->second.virtual_peers.begin()) ; - - std::cerr << " sending to " << vpid << std::endl; - - sendDataInTunnel(vpid,it->second.data_item) ; - - it->second.data_status == RS_GROUTER_DATA_STATUS_SENT ; - it->second.last_sent_TS = now ; + std::cerr << " no peers available. Cannot send!!" << std::endl; + continue ; } + TurtleVirtualPeerId vpid = (vpit->second.virtual_peers.begin())->first ; + + std::cerr << " sending to " << vpid << std::endl; + + sendDataInTunnel(vpid,it->second.data_item) ; + + it->second.data_status == RS_GROUTER_DATA_STATUS_SENT ; + it->second.last_sent_TS = now ; } + } // Also route back some ACKs if necessary. // [..] @@ -590,7 +739,75 @@ void p3GRouter::routePendingObjects() void p3GRouter::sendDataInTunnel(const TurtleVirtualPeerId& vpid,RsGRouterGenericDataItem *item) { - NOT_IMPLEMENTED ; + // split into chunks and send them all into the tunnel. + + std::cerr << "p3GRouter::sendDataInTunnel()" << std::endl; + + uint32_t size = item->serial_size(); + uint8_t *data = (uint8_t*)malloc(size) ; + + if(data == NULL) + { + std::cerr << " ERROR: cannot allocate memory. Size=" << size << std::endl; + return ; + } + + if(!item->serialise(data,size)) + { + free(data) ; + std::cerr << " ERROR: cannot serialise." << std::endl; + return ; + } + + uint32_t offset = 0 ; + static const uint32_t CHUNK_SIZE = 15000 ; + + while(offset < size) + { + uint32_t chunk_size = std::min(size - offset, CHUNK_SIZE) ; + + RsGRouterTransactionChunkItem *chunk_item = new RsGRouterTransactionChunkItem ; + chunk_item->propagation_id = item->routing_id ; + chunk_item->total_size = size; + chunk_item->chunk_size = chunk_size ; + chunk_item->chunk_data = (uint8_t*)malloc(chunk_size) ; + + std::cerr << " preparing to send a chunk [" << offset << " -> " << offset + chunk_size << " / " << size << "]" << std::endl; + + if(chunk_item->chunk_data == NULL) + { + std::cerr << " ERROR: Cannot allocate memory for size " << chunk_size << std::endl; + } + memcpy(chunk_item->chunk_data,&data[offset],chunk_size) ; + + offset += chunk_size ; + + RsTurtleGenericDataItem *turtle_item = new RsTurtleGenericDataItem ; + + uint32_t turtle_data_size = chunk_item->serial_size() ; + uint8_t *turtle_data = (uint8_t*)malloc(turtle_data_size) ; + + if(turtle_data == NULL) + { + std::cerr << " ERROR: Cannot allocate turtle data memory for size " << turtle_data_size << std::endl; + return ; + } + if(!chunk_item->serialise(turtle_data,turtle_data_size)) + { + std::cerr << " ERROR: cannot serialise RsGRouterTransactionChunkItem." << std::endl; + free(turtle_data) ; + return ; + } + + delete chunk_item ; + + turtle_item->data_size = turtle_data_size ; + turtle_item->data_bytes = turtle_data ; + + mTurtle->sendTurtleData(vpid,turtle_item) ; + } + + free(data) ; } void p3GRouter::handleIncoming() @@ -616,8 +833,9 @@ void p3GRouter::handleIncoming() } } -void p3GRouter::locked_notifyClientAcknowledged(const GRouterMsgPropagationId& msg_id,const GRouterServiceId& service_id) const +void p3GRouter::locked_notifyClientAcknowledged(const GRouterMsgPropagationId& msg_id,const GRouterServiceId& service_id) { + RS_STACK_MUTEX (grMtx) ; #ifdef GROUTER_DEBUG grouter_debug() << " Key is owned by us. Notifying service that item was ACKed. msg_id=" << msg_id << ", service_id = " << service_id << "." << std::endl; #endif @@ -635,7 +853,7 @@ void p3GRouter::locked_notifyClientAcknowledged(const GRouterMsgPropagationId& m void p3GRouter::addRoutingClue(const GRouterKeyId& id,const RsPeerId& peer_id) { - RsStackMutex mtx(grMtx) ; + RS_STACK_MUTEX(grMtx) ; #ifdef GROUTER_DEBUG grouter_debug() << "Received new routing clue for key " << id << " from peer " << peer_id << std::endl; #endif @@ -644,13 +862,14 @@ void p3GRouter::addRoutingClue(const GRouterKeyId& id,const RsPeerId& peer_id) void p3GRouter::handleRecvDataItem(RsGRouterGenericDataItem *item) { - RsStackMutex mtx(grMtx) ; + RS_STACK_MUTEX(grMtx) ; + NOT_IMPLEMENTED; } bool p3GRouter::registerClientService(const GRouterServiceId& id,GRouterClientService *service) { - RsStackMutex mtx(grMtx) ; + RS_STACK_MUTEX(grMtx) ; _registered_services[id] = service ; return true ; } @@ -873,7 +1092,7 @@ bool p3GRouter::sendData(const RsGxsId& destination,const GRouterServiceId& clie #endif { - RsStackMutex mtx(grMtx) ; + RS_STACK_MUTEX(grMtx) ; _pending_messages[propagation_id] = info ; } return true ; @@ -893,10 +1112,17 @@ Sha1CheckSum p3GRouter::makeTunnelHash(const RsGxsId& destination,const GRouterS return Sha1CheckSum(bytes) ; } +void p3GRouter::makeGxsIdAndClientId(const Sha1CheckSum& sum,RsGxsId& gxs_id,GRouterServiceId& client_id) +{ + assert( gxs_id.SIZE_IN_BYTES == 16) ; + assert(Sha1CheckSum::SIZE_IN_BYTES == 20) ; + gxs_id = RsGxsId(sum.toByteArray());// takes the first 16 bytes + client_id = sum.toByteArray()[19] + (sum.toByteArray()[18] << 8) ; +} bool p3GRouter::loadList(std::list& items) { - RsStackMutex mtx(grMtx) ; + RS_STACK_MUTEX(grMtx) ; #ifdef GROUTER_DEBUG grouter_debug() << "p3GRouter::loadList() : " << std::endl; @@ -948,7 +1174,9 @@ bool p3GRouter::saveList(bool& cleanup,std::list& items) grouter_debug() << " saving routing clues." << std::endl; #endif - _routing_matrix.saveList(items) ; + RS_STACK_MUTEX(grMtx) ; + + _routing_matrix.saveList(items) ; #ifdef GROUTER_DEBUG grouter_debug() << " saving pending items." << std::endl; @@ -971,15 +1199,15 @@ bool p3GRouter::saveList(bool& cleanup,std::list& items) bool p3GRouter::getRoutingMatrixInfo(RsGRouter::GRouterRoutingMatrixInfo& info) { - info.per_friend_probabilities.clear() ; + RS_STACK_MUTEX(grMtx) ; + + info.per_friend_probabilities.clear() ; info.friend_ids.clear() ; info.published_keys.clear() ; std::set ids ; mServiceControl->getPeersConnected(getServiceInfo().mServiceType,ids) ; - RsStackMutex mtx(grMtx) ; - //info.published_keys = _owned_key_ids ; for(std::set::const_iterator it(ids.begin());it!=ids.end();++it) @@ -999,8 +1227,9 @@ bool p3GRouter::getRoutingMatrixInfo(RsGRouter::GRouterRoutingMatrixInfo& info) } bool p3GRouter::getRoutingCacheInfo(std::vector& infos) { - RsStackMutex mtx(grMtx) ; - infos.clear() ; + RS_STACK_MUTEX(grMtx) ; + + infos.clear() ; for(std::map::const_iterator it(_pending_messages.begin());it!=_pending_messages.end();++it) { @@ -1020,7 +1249,7 @@ bool p3GRouter::getRoutingCacheInfo(std::vector& infos) // void p3GRouter::debugDump() { - RsStackMutex mtx(grMtx) ; + RS_STACK_MUTEX(grMtx) ; time_t now = time(NULL) ; @@ -1059,8 +1288,8 @@ void p3GRouter::debugDump() { grouter_debug() << " hash: " << it->first << ", first received: " << now - it->second.last_tunnel_ok_TS << " (secs ago), last received: " << now - it->second.last_tunnel_ok_TS << std::endl; - for(std::set::const_iterator it2 = it->second.virtual_peers.begin();it2!=it->second.virtual_peers.end();++it2) - grouter_debug() << " " << *it2 << std::endl; + for(std::map::const_iterator it2 = it->second.virtual_peers.begin();it2!=it->second.virtual_peers.end();++it2) + grouter_debug() << " " << it2->first << " : cached data = " << (void*)it2->second << std::endl; } grouter_debug() << " Routing matrix: " << std::endl; diff --git a/libretroshare/src/grouter/p3grouter.h b/libretroshare/src/grouter/p3grouter.h index c7e93e3fa..f46035064 100644 --- a/libretroshare/src/grouter/p3grouter.h +++ b/libretroshare/src/grouter/p3grouter.h @@ -52,28 +52,28 @@ class p3turtle ; class p3IdService ; class RsGRouterItem ; class RsGRouterGenericDataItem ; +class RsGRouterTransactionChunkItem ; class RsGRouterReceiptItem ; +// This class is responsible for accepting data chunks and merging them into a final object. When the object is +// complete, it is de-serialised and returned as a RsGRouterGenericDataItem*. + class GRouterTunnelInfo { - public: - GRouterTunnelInfo() :first_tunnel_ok_TS(0), last_tunnel_ok_TS(0) {} +public: + GRouterTunnelInfo() :first_tunnel_ok_TS(0), last_tunnel_ok_TS(0) {} - void addVirtualPeer(const TurtleVirtualPeerId& vpid) - { - assert(virtual_peers.find(vpid) == virtual_peers.end()) ; - time_t now = time(NULL) ; + // These two methods handle the memory management of buffers for each virtual peers. - virtual_peers.insert(vpid) ; + void addVirtualPeer(const TurtleVirtualPeerId& vpid) ; + void removeVirtualPeer(const TurtleVirtualPeerId& vpid) ; - if(first_tunnel_ok_TS == 0) first_tunnel_ok_TS = now ; - if(last_tunnel_ok_TS < now) last_tunnel_ok_TS = now ; - } + RsGRouterGenericDataItem *addDataChunk(const TurtleVirtualPeerId& vpid,RsGRouterTransactionChunkItem *chunk_item) ; - std::set virtual_peers ; + std::map virtual_peers ; - time_t first_tunnel_ok_TS ; // timestamp when 1st tunnel was received. - time_t last_tunnel_ok_TS ; // timestamp when last tunnel was received. + time_t first_tunnel_ok_TS ; // timestamp when 1st tunnel was received. + time_t last_tunnel_ok_TS ; // timestamp when last tunnel was received. }; class p3GRouter: public RsGRouter, public RsTurtleClientService, public p3Service, public p3Config { @@ -217,7 +217,7 @@ private: static float computeMatrixContribution(float base,uint32_t time_shift,float probability) ; static time_t computeNextTimeDelay(time_t duration) ; - void locked_notifyClientAcknowledged(const GRouterMsgPropagationId& msg_id,const GRouterServiceId& service_id) const ; + void locked_notifyClientAcknowledged(const GRouterMsgPropagationId& msg_id,const GRouterServiceId& service_id) ; uint32_t computeRandomDistanceIncrement(const RsPeerId& pid,const GRouterKeyId& destination_id) ; @@ -226,7 +226,10 @@ private: bool verifySignedDataItem(RsGRouterGenericDataItem *item) ; bool encryptDataItem(RsGRouterGenericDataItem *item,const RsGxsId& destination_key) ; bool decryptDataItem(RsGRouterGenericDataItem *item) ; - Sha1CheckSum makeTunnelHash(const RsGxsId& destination,const GRouterServiceId& client); + + static Sha1CheckSum makeTunnelHash(const RsGxsId& destination,const GRouterServiceId& client); + static void makeGxsIdAndClientId(const Sha1CheckSum& sum,RsGxsId& gxs_id,GRouterServiceId& client_id); + void sendDataInTunnel(const TurtleVirtualPeerId& vpid,RsGRouterGenericDataItem *item); //===================================================//