diff --git a/libretroshare/src/grouter/grouteritems.cc b/libretroshare/src/grouter/grouteritems.cc index 2ec16aba9..8a408c92f 100644 --- a/libretroshare/src/grouter/grouteritems.cc +++ b/libretroshare/src/grouter/grouteritems.cc @@ -211,6 +211,9 @@ RsGRouterRoutingInfoItem *RsGRouterSerialiser::deserialise_RsGRouterRoutingInfoI ok &= getRawUInt32(data, pktsize, &offset, &item->client_id); ok &= item->tunnel_hash.deserialise(data, pktsize, offset) ; + ok &= getRawUInt32(data, pktsize, &offset, &item->routing_flags) ; + + ok &= item->incoming_routes.GetTlv(data,pktsize,&offset) ; item->data_item = deserialise_RsGRouterGenericDataItem(&((uint8_t*)data)[offset],pktsize - offset) ; if(item->data_item != NULL) @@ -252,7 +255,7 @@ RsGRouterMatrixFriendListItem *RsGRouterSerialiser::deserialise_RsGRouterMatrixF uint32_t nb_friends = 0 ; ok &= getRawUInt32(data, pktsize, &offset, &nb_friends); // file hash - item->reverse_friend_indices.resize(nb_friends) ; + item->reverse_friend_indices.resize(nb_friends) ; for(uint32_t i=0;ok && ireverse_friend_indices[i].deserialise(data, pktsize, offset) ; @@ -573,6 +576,9 @@ uint32_t RsGRouterRoutingInfoItem::serial_size() const s += sizeof(GRouterServiceId) ; // service_id s += tunnel_hash.serial_size() ; + s += 4 ; // routing_flags + s += incoming_routes.TlvSize() ; // incoming_routes + s += data_item->serial_size(); // data_item if(receipt_item != NULL) @@ -668,6 +674,9 @@ bool RsGRouterRoutingInfoItem::serialise(void *data,uint32_t& size) const ok &= setRawUInt32(data, tlvsize, &offset, client_id) ; ok &= tunnel_hash.serialise(data, tlvsize, offset) ; + ok &= setRawUInt32(data, tlvsize, &offset, routing_flags) ; + + ok &= incoming_routes.SetTlv(data,tlvsize,&offset) ; uint32_t ns = size - offset ; ok &= data_item->serialise( &((uint8_t*)data)[offset], ns) ; diff --git a/libretroshare/src/grouter/grouteritems.h b/libretroshare/src/grouter/grouteritems.h index 667865c64..be3df1f15 100644 --- a/libretroshare/src/grouter/grouteritems.h +++ b/libretroshare/src/grouter/grouteritems.h @@ -48,9 +48,9 @@ const uint8_t RS_PKT_SUBTYPE_GROUTER_MATRIX_CLUES = 0x80 ; // it const uint8_t RS_PKT_SUBTYPE_GROUTER_FRIENDS_LIST = 0x82 ; // item to save friend lists const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO_deprecated = 0x87 ; // deprecated. Don't use. const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO_deprecated2 = 0x88 ; // item to save routing info -const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO = 0x89 ; // deprecated. Don't use. +const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO = 0x90 ; // deprecated. Don't use. -const uint8_t QOS_PRIORITY_RS_GROUTER = 3 ; // relevant for items that travel through friends +const uint8_t QOS_PRIORITY_RS_GROUTER = 4 ; // relevant for items that travel through friends /***********************************************************************************/ diff --git a/libretroshare/src/grouter/groutertypes.h b/libretroshare/src/grouter/groutertypes.h index eaac82254..b6b070540 100644 --- a/libretroshare/src/grouter/groutertypes.h +++ b/libretroshare/src/grouter/groutertypes.h @@ -61,6 +61,7 @@ static const uint32_t RS_GROUTER_DATA_STATUS_PENDING = 0x0001 ; // item is static const uint32_t RS_GROUTER_DATA_STATUS_SENT = 0x0002 ; // item is sent to tunnel or friend. No need to keep sending. static const uint32_t RS_GROUTER_DATA_STATUS_RECEIPT_OK = 0x0003 ; // item is at destination. static const uint32_t RS_GROUTER_DATA_STATUS_ONGOING = 0x0004 ; // transaction is ongoing. +static const uint32_t RS_GROUTER_DATA_STATUS_DONE = 0x0005 ; // receipt item has been forward to all routes. We can remove the item. static const uint32_t RS_GROUTER_SENDING_STATUS_TUNNEL = 0x0001 ; // item was sent in a tunnel static const uint32_t RS_GROUTER_SENDING_STATUS_FRIEND = 0x0002 ; // item was sent to a friend @@ -104,15 +105,15 @@ public: GRouterServiceId client_id ; // service ID of the client. Only valid when origin==OwnId TurtleFileHash tunnel_hash ; // tunnel hash to be used for this item + uint32_t routing_flags ; RsGRouterGenericDataItem *data_item ; RsGRouterSignedReceiptItem *receipt_item ; - std::set incoming_routes ; + RsTlvPeerIdSet incoming_routes ; // non serialised data - uint32_t routing_flags ; time_t data_transaction_TS ; static const uint32_t ROUTING_FLAGS_ALLOW_TUNNELS = 0x0001; diff --git a/libretroshare/src/grouter/p3grouter.cc b/libretroshare/src/grouter/p3grouter.cc index 96046a871..d0fbb634a 100644 --- a/libretroshare/src/grouter/p3grouter.cc +++ b/libretroshare/src/grouter/p3grouter.cc @@ -254,7 +254,7 @@ #include "grouterclientservice.h" /**********************/ -//#define GROUTER_DEBUG +#define GROUTER_DEBUG /**********************/ static const uint32_t MAX_TUNNEL_WAIT_TIME = 60 ; // wait for 60 seconds at most for a tunnel response. @@ -477,12 +477,14 @@ void p3GRouter::handleLowLevelTransactionChunkItem(RsGRouterTransactionChunkItem std::cerr << " item is a transaction item." << std::endl; #endif + RsPeerId pid = chunk_item->PeerId() ; + RsGRouterAbstractMsgItem *generic_item = NULL; { RS_STACK_MUTEX(grMtx) ; - generic_item = _incoming_data_pipes[chunk_item->PeerId()].addDataChunk(chunk_item) ; // addDataChunk takes ownership over chunk_item - generic_item->PeerId(chunk_item->PeerId()) ; + generic_item = _incoming_data_pipes[pid].addDataChunk(dynamic_cast(chunk_item->duplicate())) ;// addDataChunk takes ownership over chunk_item + generic_item->PeerId(pid) ; } // send to client off-mutex @@ -497,7 +499,7 @@ void p3GRouter::handleLowLevelTransactionChunkItem(RsGRouterTransactionChunkItem RsGRouterTransactionAcknItem ackn_item ; ackn_item.propagation_id = generic_item->routing_id ; - locked_sendTransactionData(chunk_item->PeerId(),ackn_item) ; + locked_sendTransactionData(pid,ackn_item) ; { RS_STACK_MUTEX(grMtx) ; @@ -526,8 +528,10 @@ void p3GRouter::handleLowLevelTransactionAckItem(RsGRouterTransactionAcknItem *t std::cerr << " setting new status as sent/awaiting receipt." << std::endl; #endif } +#ifdef GROUTER_DEBUG else - std::cerr << " ERROR: no routing ID corresponds to this ACK item. Inconsistency!" << std::endl; + std::cerr << " Note: no routing ID corresponds to this ACK item. This probably corresponds to a signed receipt" << std::endl; +#endif } void p3GRouter::receiveTurtleData(RsTurtleGenericTunnelItem *gitem,const RsFileHash& hash,const RsPeerId& virtual_peer_id,RsTurtleGenericTunnelItem::Direction direction) @@ -723,13 +727,13 @@ void p3GRouter::removeVirtualPeer(const TurtleFileHash& hash,const TurtleVirtual #endif _tunnels.erase(it) ; } -// #ifdef GROUTER_DEBUG -// std::cerr << " setting tunnel status in pending message." << std::endl; -// #endif -// -// for(std::map::iterator it2(_pending_messages.begin());it2!=_pending_messages.end();++it2) -// if(it2->second.tunnel_hash == hash && it->second.virtual_peers.empty()) -// it2->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_PENDING ; +#ifdef GROUTER_DEBUG + std::cerr << " setting tunnel status in pending message." << std::endl; +#endif + + for(std::map::iterator it2(_pending_messages.begin());it2!=_pending_messages.end();++it2) + if(it2->second.tunnel_hash == hash && it->second.virtual_peers.empty()) + it2->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_PENDING ; } void p3GRouter::connectToTurtleRouter(p3turtle *pt) @@ -862,12 +866,26 @@ if(!_pending_messages.empty()) else std::cerr << " doing nothing." << std::endl; #endif + + // also check that all tunnels are actually active, to remove any old dead tunnels + + if(it->second.tunnel_status == RS_GROUTER_TUNNEL_STATUS_READY) + { + std::map::iterator it2 = _tunnels.find(it->second.tunnel_hash) ; + + if(it2 == _tunnels.end() || it2->second.virtual_peers.empty()) ; + { + std::cerr << " re-setting tunnel status to PENDING, as no tunnels are actually present." << std::endl; + it->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_PENDING ; + } + } } #ifdef GROUTER_DEBUG if(!priority_list.empty()) grouter_debug() << " sorting..." << std::endl; #endif + std::sort(priority_list.begin(),priority_list.end(),item_comparator_001()) ; // take tunnels from item priority list, and enable tunnel handling, while respecting max number of active tunnels limit @@ -919,13 +937,14 @@ void p3GRouter::routePendingObjects() std::list peers ; + // For now, disable tunnels. We'll first check that the good old tunnel system works as before. if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS) - locked_collectAvailableTunnels(it->second.tunnel_hash,peers); + locked_collectAvailableTunnels(it->second.tunnel_hash,peers); // For now, disable friends. We'll first check that the good old tunnel system works as before. - // - // if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS) - // locked_collectAvailableFriends(it->second.data_item->destination_key,peers, it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN); + + // if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS) + // locked_collectAvailableFriends(it->second.data_item->destination_key,peers, it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN); if(peers.empty()) { @@ -942,9 +961,9 @@ void p3GRouter::routePendingObjects() if(!peers.empty()) std::cerr << " sending to peers:" << std::endl; #endif - for(std::list::const_iterator itpid(peers.begin());itpid!=peers.end();++itpid) - for(std::list::const_iterator it2(chunks.begin());it2!=chunks.end();++it2) - locked_sendTransactionData(*itpid,*(*it2) ) ; + for(std::list::const_iterator itpid(peers.begin());itpid!=peers.end();++itpid) + for(std::list::const_iterator it2(chunks.begin());it2!=chunks.end();++it2) + locked_sendTransactionData(*itpid,*(*it2) ) ; // delete temporary items @@ -956,12 +975,48 @@ void p3GRouter::routePendingObjects() it->second.data_status = RS_GROUTER_DATA_STATUS_ONGOING ; it->second.data_transaction_TS = now ; ; } - else if(it->second.data_status == RS_GROUTER_DATA_STATUS_ONGOING && now > MAX_TRANSACTION_ACK_WAITING_TIME + it->second.data_transaction_TS) - { - std::cerr << " waited too long for this transation. Switching back to PENDING." << std::endl; + else if(it->second.data_status == RS_GROUTER_DATA_STATUS_ONGOING && now > MAX_TRANSACTION_ACK_WAITING_TIME + it->second.data_transaction_TS) + { + std::cerr << " waited too long for this transation. Switching back to PENDING." << std::endl; - it->second.data_status = RS_GROUTER_DATA_STATUS_PENDING ; - } + it->second.data_status = RS_GROUTER_DATA_STATUS_PENDING ; + } + else if(it->second.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK) + { + // send the receipt through all incoming routes, as soon as it gets delivered. + + std::cerr << " receipt should be sent back. Trying all incoming routes..." << std::endl; + + std::list chunks ; + sliceDataItem(it->second.receipt_item,chunks) ; + + for(std::list::iterator it2=it->second.incoming_routes.ids.begin();it2!=it->second.incoming_routes.ids.end();) + if(mServiceControl->isPeerConnected(getServiceInfo().mServiceType,*it2)) + { + std::cerr << " sending receipt back to " << *it2 << " which is online." << std::endl; + + for(std::list::const_iterator it3(chunks.begin());it3!=chunks.end();++it3) + locked_sendTransactionData(*it2,*(*it3) ) ; + + // then remove from the set. + std::list::iterator it2tmp = it2 ; + ++it2tmp ; + it->second.incoming_routes.ids.erase(it2) ; + it2 = it2tmp ; + } + else + ++it2 ; + + for(std::list::const_iterator cit=chunks.begin();cit!=chunks.end();++cit) + delete *cit; + + // Because signed receipts are small items, we take the bet that if the item could be sent, then it was received. + // otherwise, we should mark that incomng route as being handled, wait for the ACK and deal with it by updating + // it->second.data_status at that time. + + if(it->second.incoming_routes.ids.empty()) + it->second.data_status = RS_GROUTER_DATA_STATUS_DONE ; + } } } @@ -1087,6 +1142,7 @@ bool p3GRouter::locked_sendTransactionData(const RsPeerId& pid,const RsGRouterTr std::cerr << " sending to pid " << pid << std::endl; #endif RsGRouterTransactionItem *item_copy = trans_item.duplicate() ; + item_copy->PeerId(pid) ; sendItem(item_copy) ; @@ -1137,15 +1193,32 @@ void p3GRouter::autoWash() } else ++it ; + + // also check all existing tunnels + + for(std::map::iterator it = _tunnels.begin();it!=_tunnels.end();++it) + { + std::list vpids_to_remove ; + for(std::set::iterator it2 = it->second.virtual_peers.begin();it2!=it->second.virtual_peers.end();++it2) + if(!mTurtle->isTurtlePeer(*it2)) + { + vpids_to_remove.push_back(*it2) ; + std::cerr << " " << *it2 << " is not an active tunnel for hash " << it->first << ". Removing virtual peer id." << std::endl; + } + + for(std::list::const_iterator it2=vpids_to_remove.begin();it2!=vpids_to_remove.end();++it2) + it->second.removeVirtualPeer(*it2) ; + } + } // look into pending items. - -#ifdef GROUTER_DEBUG - grouter_debug() << " Pending messages to route : " << _pending_messages.size() << std::endl; -#endif - +#warning move the notification for received messages in the handlign function of signed receipts. Keep the notification for unsent here. for(std::map >::const_iterator it(notified_msgs.begin());it!=notified_msgs.end();++it) + { + std::cerr << " notifying client for message id " << std::hex << it->first << " state = " << it->second.second << std::endl; it->second.first->notifyDataStatus(it->first, it->second.second) ; + } +#warning should we also clean incoming data pipes? if(items_deleted) _changed = true ; @@ -1159,7 +1232,7 @@ bool p3GRouter::sliceDataItem(RsGRouterAbstractMsgItem *item,std::listprint(std::cerr, 2) ; #endif @@ -1362,7 +1435,10 @@ void p3GRouter::handleIncomingDataItem(RsGRouterGenericDataItem *data_item) } std::cerr << " storing incoming route: from " << data_item->PeerId() << std::endl; - _pending_messages[data_item->routing_id].incoming_routes.insert(data_item->PeerId()) ; + +#warning we should make sure there's no duplicates. Possibly turn RsTlvIdSet.ids into a std::set! + if(!mTurtle->isTurtlePeer(data_item->PeerId())) + _pending_messages[data_item->routing_id].incoming_routes.ids.push_back(data_item->PeerId()) ; return ; } @@ -1439,11 +1515,11 @@ void p3GRouter::handleIncomingDataItem(RsGRouterGenericDataItem *data_item) if(ok) { #ifdef GROUTER_DEBUG - std::cerr << " sent signed receipt in tunnel " << data_item->PeerId() << std::endl; + std::cerr << " sent signed receipt to " << data_item->PeerId() << std::endl; #endif } else - std::cerr << " sending signed receipt in tunnel " << data_item->PeerId() << ": FAILED." << std::endl; + std::cerr << " sending signed receipt to " << data_item->PeerId() << ": FAILED." << std::endl; } bool p3GRouter::locked_getClientAndServiceId(const TurtleFileHash& hash, const RsGxsId& destination_key, GRouterClientService *& client, GRouterServiceId& service_id) @@ -1737,7 +1813,7 @@ bool p3GRouter::sendData(const RsGxsId& destination,const GRouterServiceId& clie info.last_friend_sent_TS = 0 ; info.last_tunnel_request_TS = 0 ; info.sending_attempts = 0 ; - info.routing_flags = GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS + info.routing_flags = GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN | GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS | GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS ; info.received_time_TS = now ; @@ -1958,6 +2034,7 @@ void p3GRouter::debugDump() grouter_debug() << " Received : " << now - it->second.received_time_TS << " secs ago."; grouter_debug() << " Last tunnel sent: " << now - it->second.last_tunnel_sent_TS << " secs ago."; grouter_debug() << " Last friend sent: " << now - it->second.last_friend_sent_TS << " secs ago."; + grouter_debug() << " Transaction TS : " << now - it->second.data_transaction_TS << " secs ago."; grouter_debug() << " Data Status : " << statusString[it->second.data_status] << std::endl; grouter_debug() << " Tunl Status : " << statusString[it->second.tunnel_status] << std::endl; grouter_debug() << " Receipt ok : " << (it->second.receipt_item != NULL) << std::endl; @@ -1983,8 +2060,8 @@ void p3GRouter::debugDump() grouter_debug() << " Routing matrix: " << std::endl; -// if(_debug_enabled) -// _routing_matrix.debugDump() ; + if(_debug_enabled) + _routing_matrix.debugDump() ; }