diff --git a/libretroshare/src/grouter/grouteritems.cc b/libretroshare/src/grouter/grouteritems.cc index 77b27629e..2ec16aba9 100644 --- a/libretroshare/src/grouter/grouteritems.cc +++ b/libretroshare/src/grouter/grouteritems.cc @@ -128,7 +128,8 @@ RsGRouterGenericDataItem *RsGRouterSerialiser::deserialise_RsGRouterGenericDataI ok &= getRawUInt64(data, pktsize, &offset, &item->routing_id); ok &= item->destination_key.deserialise(data, pktsize, offset) ; - ok &= getRawUInt32(data, pktsize, &offset, &item->data_size); + ok &= getRawUInt32(data, pktsize, &offset, &item->service_id); + ok &= getRawUInt32(data, pktsize, &offset, &item->data_size); if( NULL == (item->data_bytes = (uint8_t*)malloc(item->data_size))) { @@ -173,6 +174,7 @@ RsGRouterSignedReceiptItem *RsGRouterSerialiser::deserialise_RsGRouterSignedRece ok &= getRawUInt64(data, pktsize, &offset, &item->routing_id); ok &= getRawUInt32(data, pktsize, &offset, &item->flags); ok &= item->destination_key.deserialise(data, pktsize, offset); + ok &= getRawUInt32(data, pktsize, &offset, &item->service_id); ok &= item->data_hash.deserialise(data, pktsize, offset); ok &= item->signature.GetTlv(data, pktsize, &offset); // signature @@ -201,7 +203,8 @@ RsGRouterRoutingInfoItem *RsGRouterSerialiser::deserialise_RsGRouterRoutingInfoI ok &= getRawUInt32(data, pktsize, &offset, &item->data_status); ok &= getRawUInt32(data, pktsize, &offset, &item->tunnel_status); ok &= getRawTimeT(data, pktsize, &offset, item->received_time_TS); - ok &= getRawTimeT(data, pktsize, &offset, item->last_sent_TS); + ok &= getRawTimeT(data, pktsize, &offset, item->last_tunnel_sent_TS); + ok &= getRawTimeT(data, pktsize, &offset, item->last_friend_sent_TS); ok &= getRawTimeT(data, pktsize, &offset, item->last_tunnel_request_TS); ok &= getRawUInt32(data, pktsize, &offset, &item->sending_attempts); @@ -330,6 +333,7 @@ uint32_t RsGRouterGenericDataItem::serial_size() const s += sizeof(GRouterMsgPropagationId) ; // routing id s += destination_key.serial_size() ; // destination_key s += 4 ; // data_size + s += 4 ; // service id s += data_size ; // data s += signature.TlvSize() ; // signature s += 4 ; // randomized distance @@ -343,6 +347,7 @@ uint32_t RsGRouterGenericDataItem::signed_data_size() const s += sizeof(GRouterMsgPropagationId) ; // routing id s += destination_key.serial_size() ; // destination_key s += 4 ; // data_size + s += 4 ; // service id s += data_size ; // data return s ; @@ -354,6 +359,7 @@ uint32_t RsGRouterSignedReceiptItem::serial_size() const s += destination_key.serial_size() ; // destination_key s += data_hash.serial_size() ; s += 4 ; // state + s += 4 ; // service_id s += signature.TlvSize() ; // signature return s ; @@ -364,6 +370,7 @@ uint32_t RsGRouterSignedReceiptItem::signed_data_size() const s += sizeof(GRouterMsgPropagationId) ; // routing id s += destination_key.serial_size() ; // destination_key s += data_hash.serial_size() ; + s += 4 ; // service_id s += 4 ; // state return s ; @@ -422,6 +429,7 @@ bool RsGRouterGenericDataItem::serialise(void *data,uint32_t& size) const /* add mandatory parts first */ ok &= setRawUInt64(data, tlvsize, &offset, routing_id); ok &= destination_key.serialise(data, tlvsize, offset) ; + ok &= setRawUInt32(data, tlvsize, &offset, service_id); ok &= setRawUInt32(data, tlvsize, &offset, data_size); memcpy(&((uint8_t*)data)[offset],data_bytes,data_size) ; @@ -469,6 +477,7 @@ bool RsGRouterGenericDataItem::serialise_signed_data(void *data,uint32_t& size) /* add mandatory parts first */ ok &= setRawUInt64(data, tlvsize, &offset, routing_id); ok &= destination_key.serialise(data, tlvsize, offset) ; + ok &= setRawUInt32(data, tlvsize, &offset, service_id); ok &= setRawUInt32(data, tlvsize, &offset, data_size); memcpy(&((uint8_t*)data)[offset],data_bytes,data_size) ; @@ -494,6 +503,7 @@ bool RsGRouterSignedReceiptItem::serialise(void *data,uint32_t& size) const ok &= setRawUInt64(data, tlvsize, &offset, routing_id); ok &= setRawUInt32(data, tlvsize, &offset, flags); ok &= destination_key.serialise(data,tlvsize,offset) ; + ok &= setRawUInt32(data, tlvsize, &offset, service_id); ok &= data_hash.serialise(data,tlvsize,offset) ; ok &= signature.SetTlv(data,tlvsize,&offset) ; @@ -516,6 +526,7 @@ bool RsGRouterSignedReceiptItem::serialise_signed_data(void *data,uint32_t& size ok &= setRawUInt64(data, tlvsize, &offset, routing_id); ok &= setRawUInt32(data, tlvsize, &offset, flags); ok &= destination_key.serialise(data,tlvsize,offset) ; + ok &= setRawUInt32(data, tlvsize, &offset, service_id); ok &= data_hash.serialise(data,tlvsize,offset) ; if (offset != tlvsize) @@ -553,7 +564,8 @@ uint32_t RsGRouterRoutingInfoItem::serial_size() const s += 4 ; // data status_flags s += 4 ; // tunnel status_flags s += 8 ; // received_time - s += 8 ; // last_sent + s += 8 ; // last_tunnel_sent_TS + s += 8 ; // last_friend_sent_TS s += 8 ; // last_TR_TS s += 4 ; // sending attempts @@ -649,7 +661,8 @@ bool RsGRouterRoutingInfoItem::serialise(void *data,uint32_t& size) const ok &= setRawUInt32(data, tlvsize, &offset, data_status) ; ok &= setRawUInt32(data, tlvsize, &offset, tunnel_status) ; ok &= setRawTimeT(data, tlvsize, &offset, received_time_TS) ; - ok &= setRawTimeT(data, tlvsize, &offset, last_sent_TS) ; + ok &= setRawTimeT(data, tlvsize, &offset, last_tunnel_sent_TS) ; + ok &= setRawTimeT(data, tlvsize, &offset, last_friend_sent_TS) ; ok &= setRawTimeT(data, tlvsize, &offset, last_tunnel_request_TS) ; ok &= setRawUInt32(data, tlvsize, &offset, sending_attempts) ; @@ -712,7 +725,8 @@ std::ostream& RsGRouterRoutingInfoItem::print(std::ostream& o, uint16_t) o << " data status: "<< std::hex<< data_status << std::dec << std::endl ; o << " tunnel status: "<< tunnel_status << std::endl ; o << " recv time: "<< received_time_TS << std::endl ; - o << " Last sent: "<< last_sent_TS << std::endl ; + o << " Last tunnel sent: "<< last_tunnel_sent_TS << std::endl ; + o << " Last friend sent: "<< last_friend_sent_TS << std::endl ; o << " Sending attempts:"<< sending_attempts << std::endl ; o << " destination key: "<< data_item->destination_key << std::endl ; o << " Client id: "<< client_id << std::endl ; diff --git a/libretroshare/src/grouter/grouteritems.h b/libretroshare/src/grouter/grouteritems.h index 66bf02696..667865c64 100644 --- a/libretroshare/src/grouter/grouteritems.h +++ b/libretroshare/src/grouter/grouteritems.h @@ -31,23 +31,26 @@ #include "retroshare/rstypes.h" #include "retroshare/rsgrouter.h" -#include "p3grouter.h" +#include "groutermatrix.h" -const uint8_t RS_PKT_SUBTYPE_GROUTER_PUBLISH_KEY = 0x01 ; // used to publish a key -const uint8_t RS_PKT_SUBTYPE_GROUTER_ACK_deprecated = 0x03 ; // dont use! -const uint8_t RS_PKT_SUBTYPE_GROUTER_SIGNED_RECEIPT = 0x04 ; // long-distance acknowledgement of data received -const uint8_t RS_PKT_SUBTYPE_GROUTER_DATA_deprecated = 0x05 ; // dont use! -const uint8_t RS_PKT_SUBTYPE_GROUTER_DATA = 0x06 ; // used to send data to a destination (Signed by source) +const uint8_t RS_PKT_SUBTYPE_GROUTER_PUBLISH_KEY = 0x01 ; // used to publish a key +const uint8_t RS_PKT_SUBTYPE_GROUTER_ACK_deprecated = 0x03 ; // don't use! +const uint8_t RS_PKT_SUBTYPE_GROUTER_SIGNED_RECEIPT_deprecated = 0x04 ; // don't use! +const uint8_t RS_PKT_SUBTYPE_GROUTER_DATA_deprecated = 0x05 ; // don't use! +const uint8_t RS_PKT_SUBTYPE_GROUTER_DATA_deprecated2 = 0x06 ; // don't use! +const uint8_t RS_PKT_SUBTYPE_GROUTER_DATA = 0x07 ; // used to send data to a destination (Signed by source) +const uint8_t RS_PKT_SUBTYPE_GROUTER_SIGNED_RECEIPT = 0x08 ; // long-distance acknowledgement of data received -const uint8_t RS_PKT_SUBTYPE_GROUTER_TRANSACTION_CHUNK = 0x10 ; // chunk of data. Used internally. -const uint8_t RS_PKT_SUBTYPE_GROUTER_TRANSACTION_ACKN = 0x11 ; // acknowledge for finished transaction. Not necessary, but increases fiability. +const uint8_t RS_PKT_SUBTYPE_GROUTER_TRANSACTION_CHUNK = 0x10 ; // chunk of data. Used internally. +const uint8_t RS_PKT_SUBTYPE_GROUTER_TRANSACTION_ACKN = 0x11 ; // acknowledge for finished transaction. Not necessary, but increases fiability. -const uint8_t RS_PKT_SUBTYPE_GROUTER_MATRIX_CLUES = 0x80 ; // item to save matrix clues -const uint8_t RS_PKT_SUBTYPE_GROUTER_FRIENDS_LIST = 0x82 ; // item to save friend lists -const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO_deprecated = 0x87 ; // deprecated. Don't use. -const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO = 0x89 ; // item to save routing info +const uint8_t RS_PKT_SUBTYPE_GROUTER_MATRIX_CLUES = 0x80 ; // item to save matrix clues +const uint8_t RS_PKT_SUBTYPE_GROUTER_FRIENDS_LIST = 0x82 ; // item to save friend lists +const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO_deprecated = 0x87 ; // deprecated. Don't use. +const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO_deprecated2 = 0x88 ; // item to save routing info +const uint8_t RS_PKT_SUBTYPE_GROUTER_ROUTING_INFO = 0x89 ; // deprecated. Don't use. -const uint8_t QOS_PRIORITY_RS_GROUTER = 3 ; // irrelevant since all items travel through tunnels +const uint8_t QOS_PRIORITY_RS_GROUTER = 3 ; // relevant for items that travel through friends /***********************************************************************************/ @@ -59,6 +62,8 @@ class RsGRouterItem: public RsItem public: RsGRouterItem(uint8_t grouter_subtype) : RsItem(RS_PKT_VERSION_SERVICE,RS_SERVICE_TYPE_GROUTER,grouter_subtype) {} + virtual ~RsGRouterItem() {} + virtual bool serialise(void *data,uint32_t& size) const = 0 ; virtual uint32_t serial_size() const = 0 ; @@ -94,12 +99,14 @@ class RsGRouterAbstractMsgItem: public RsGRouterItem { public: RsGRouterAbstractMsgItem(uint8_t pkt_subtype) : RsGRouterItem(pkt_subtype) {} + virtual ~RsGRouterAbstractMsgItem() {} virtual uint32_t signed_data_size() const = 0 ; virtual bool serialise_signed_data(void *data,uint32_t& size) const = 0 ; GRouterMsgPropagationId routing_id ; GRouterKeyId destination_key ; + GRouterServiceId service_id ; RsTlvKeySignature signature ; // signs mid+destination_key+state uint32_t flags ; // packet was delivered, not delivered, bounced, etc }; @@ -138,6 +145,7 @@ class RsGRouterSignedReceiptItem: public RsGRouterAbstractMsgItem { public: RsGRouterSignedReceiptItem() : RsGRouterAbstractMsgItem(RS_PKT_SUBTYPE_GROUTER_SIGNED_RECEIPT) { setPriorityLevel(QOS_PRIORITY_RS_GROUTER) ; } + virtual ~RsGRouterSignedReceiptItem() {} virtual bool serialise(void *data,uint32_t& size) const ; virtual uint32_t serial_size() const ; @@ -158,10 +166,26 @@ class RsGRouterSignedReceiptItem: public RsGRouterAbstractMsgItem // Low-level data items -class RsGRouterTransactionChunkItem: public RsGRouterItem, public RsGRouterNonCopyableObject +class RsGRouterTransactionItem: public RsGRouterItem { public: - RsGRouterTransactionChunkItem() : RsGRouterItem(RS_PKT_SUBTYPE_GROUTER_TRANSACTION_CHUNK) { setPriorityLevel(QOS_PRIORITY_RS_GROUTER) ; } + RsGRouterTransactionItem(uint8_t pkt_subtype) : RsGRouterItem(pkt_subtype) {} + + virtual ~RsGRouterTransactionItem() {} + + virtual bool serialise(void *data,uint32_t& size) const =0; + virtual uint32_t serial_size() const =0; + virtual void clear() =0; + + virtual RsGRouterTransactionItem *duplicate() const = 0 ; +}; + +class RsGRouterTransactionChunkItem: public RsGRouterTransactionItem, public RsGRouterNonCopyableObject +{ + public: + RsGRouterTransactionChunkItem() : RsGRouterTransactionItem(RS_PKT_SUBTYPE_GROUTER_TRANSACTION_CHUNK) { setPriorityLevel(QOS_PRIORITY_RS_GROUTER) ; } + + virtual ~RsGRouterTransactionChunkItem() { free(chunk_data) ; } virtual bool serialise(void *data,uint32_t& size) const ; virtual uint32_t serial_size() const ; @@ -169,16 +193,26 @@ class RsGRouterTransactionChunkItem: public RsGRouterItem, public RsGRouterNonCo virtual void clear() {} virtual std::ostream& print(std::ostream &out, uint16_t indent = 0) ; + virtual RsGRouterTransactionItem *duplicate() const + { + RsGRouterTransactionChunkItem *item = new RsGRouterTransactionChunkItem ; + *item = *this ; // copy all fields + item->chunk_data = (uint8_t*)malloc(chunk_size) ; // deep copy memory chunk + memcpy(item->chunk_data,chunk_data,chunk_size) ; + return item ; + } + GRouterMsgPropagationId propagation_id ; uint32_t chunk_start ; uint32_t chunk_size ; uint32_t total_size ; uint8_t *chunk_data ; }; -class RsGRouterTransactionAcknItem: public RsGRouterItem +class RsGRouterTransactionAcknItem: public RsGRouterTransactionItem { public: - RsGRouterTransactionAcknItem() : RsGRouterItem(RS_PKT_SUBTYPE_GROUTER_TRANSACTION_ACKN) { setPriorityLevel(QOS_PRIORITY_RS_GROUTER) ; } + RsGRouterTransactionAcknItem() : RsGRouterTransactionItem(RS_PKT_SUBTYPE_GROUTER_TRANSACTION_ACKN) { setPriorityLevel(QOS_PRIORITY_RS_GROUTER) ; } + virtual ~RsGRouterTransactionAcknItem() {} virtual bool serialise(void *data,uint32_t& size) const ; virtual uint32_t serial_size() const ; @@ -186,6 +220,8 @@ class RsGRouterTransactionAcknItem: public RsGRouterItem virtual void clear() {} virtual std::ostream& print(std::ostream &out, uint16_t indent = 0) ; + virtual RsGRouterTransactionItem *duplicate() const { return new RsGRouterTransactionAcknItem(*this) ; } + GRouterMsgPropagationId propagation_id ; }; @@ -194,7 +230,7 @@ class RsGRouterTransactionAcknItem: public RsGRouterItem class RsGRouterMatrixCluesItem: public RsGRouterItem { public: - RsGRouterMatrixCluesItem() : RsGRouterItem(RS_PKT_SUBTYPE_GROUTER_MATRIX_CLUES) + RsGRouterMatrixCluesItem() : RsGRouterItem(RS_PKT_SUBTYPE_GROUTER_MATRIX_CLUES) { setPriorityLevel(0) ; } // this item is never sent through the network virtual bool serialise(void *data,uint32_t& size) const ; diff --git a/libretroshare/src/grouter/groutertypes.h b/libretroshare/src/grouter/groutertypes.h index f73f63042..eaac82254 100644 --- a/libretroshare/src/grouter/groutertypes.h +++ b/libretroshare/src/grouter/groutertypes.h @@ -58,8 +58,12 @@ static const uint32_t GROUTER_ITEM_MAX_CACHE_KEEP_TIME_DEAD= 3600 ; // DEAD static const uint32_t RS_GROUTER_DATA_STATUS_UNKNOWN = 0x0000 ; // unknown. Unused. static const uint32_t RS_GROUTER_DATA_STATUS_PENDING = 0x0001 ; // item is pending. Should be sent asap. -static const uint32_t RS_GROUTER_DATA_STATUS_SENT = 0x0002 ; // item is sent. Waiting for answer +static const uint32_t RS_GROUTER_DATA_STATUS_SENT = 0x0002 ; // item is sent to tunnel or friend. No need to keep sending. static const uint32_t RS_GROUTER_DATA_STATUS_RECEIPT_OK = 0x0003 ; // item is at destination. +static const uint32_t RS_GROUTER_DATA_STATUS_ONGOING = 0x0004 ; // transaction is ongoing. + +static const uint32_t RS_GROUTER_SENDING_STATUS_TUNNEL = 0x0001 ; // item was sent in a tunnel +static const uint32_t RS_GROUTER_SENDING_STATUS_FRIEND = 0x0002 ; // item was sent to a friend static const uint32_t RS_GROUTER_TUNNEL_STATUS_UNMANAGED = 0x0000 ; // no tunnel requested atm static const uint32_t RS_GROUTER_TUNNEL_STATUS_PENDING = 0x0001 ; // tunnel requested to turtle @@ -89,17 +93,30 @@ public: receipt_item = NULL ; } - uint32_t data_status ; // pending, waiting, etc. + uint32_t data_status ; // pending, waiting, etc. uint32_t tunnel_status ; // status of tunnel handling. + time_t received_time_TS ; // time at which the item was originally received - time_t last_sent_TS ; // last time the item was sent to friends - time_t last_tunnel_request_TS ; // last time tunnels have been asked for this item. + time_t last_tunnel_sent_TS ; // last time the item was sent to friends + time_t last_friend_sent_TS ; // last time the item was sent to friends + time_t last_tunnel_request_TS ; // last time tunnels have been asked for this item. uint32_t sending_attempts ; // number of times tunnels have been asked for this peer without success - GRouterServiceId client_id ; // service ID of the client. Only valid when origin==OwnId - TurtleFileHash tunnel_hash ; // tunnel hash to be used for this item + GRouterServiceId client_id ; // service ID of the client. Only valid when origin==OwnId + TurtleFileHash tunnel_hash ; // tunnel hash to be used for this item RsGRouterGenericDataItem *data_item ; RsGRouterSignedReceiptItem *receipt_item ; + + std::set incoming_routes ; + + // non serialised data + + uint32_t routing_flags ; + time_t data_transaction_TS ; + + static const uint32_t ROUTING_FLAGS_ALLOW_TUNNELS = 0x0001; + static const uint32_t ROUTING_FLAGS_ALLOW_FRIENDS = 0x0002; + static const uint32_t ROUTING_FLAGS_IS_ORIGIN = 0x0003; }; diff --git a/libretroshare/src/grouter/p3grouter.cc b/libretroshare/src/grouter/p3grouter.cc index 45eb7f2db..96046a871 100644 --- a/libretroshare/src/grouter/p3grouter.cc +++ b/libretroshare/src/grouter/p3grouter.cc @@ -177,6 +177,64 @@ // // => we need abstract packets and service ids. // +// Data pipeline +// ============= +// +// sendData() +// | +// +--> encrypt/sign ---> store in _pending_messages +// +// receiveTurtleData() +// | +// +-------------------------------------------------+ +// tick() | +// | | +// +--> HandleLowLevelServiceItems() | +// | | | +// | +--> handleLowLevelServiceItem(item) <------------+ +// | | +// | +--> handleIncomingTransactionAckItem() +// | | +// | +--> handleIncomingTransactionChunkItem() +// | | +// | +---> addDataChunk() +// | | +// | +---> push item to _incoming_items list +// | +// +--> handleIncoming() +// | | +// | +---> handleIncomingReceiptItem(GRouterSignedReceiptItem*) +// | | | +// | +---> handleIncomingDataItem(GRouterDataItem*) | +// | | | +// | +----------------------------+ +// | | +// | | +// | [for US?] --------------------+-----> verifySignedData() +// | | | +// | +-----> notifyClient() +// | | | +// | | +-----> send Receipt item ---+ +// | | | +// | +----> Store In _pending_messages | +// | | +// +--> routePendingObjects() | +// | | | +// | +--> locked_collectAvailablePeers()/locked_collectAvailableTunnels() | +// | | | +// | +--> sliceDataItem() | +// | | | +// | +--> locked_sendTransactionData() <-----------------------------------------------+ +// | | +// | +--> mTurtle->sendTurtleData(virtual_pid,turtle_item) / sendItem() +// | +// +--> handleTunnels() +// | | +// | +---> mTurtle->stopMonitoringTunnels(hash) ; +// | +---> mTurtle->monitoringTunnels(hash) ; +// | +// +--> autoWash() +// //////////////////////////////////////////////////////////////////////////////////////////////////////////// #include @@ -199,12 +257,13 @@ //#define GROUTER_DEBUG /**********************/ -static const uint32_t MAX_TUNNEL_WAIT_TIME = 60 ; // wait for 60 seconds at most for a tunnel response. -static const uint32_t MAX_TUNNEL_UNMANAGED_TIME = 600 ; // min time before retry tunnels for that msg. -static const uint32_t MAX_DELAY_BETWEEN_TWO_SEND = 120 ; // wait for 120 seconds before re-sending. -static const uint32_t TUNNEL_OK_WAIT_TIME = 10 ; // wait for 10 seconds after last tunnel ok, so that we have a complete set of tunnels. -static const uint32_t MAX_GROUTER_DATA_SIZE = 2*1024*1024 ; // 2MB size limit. This is of course arbitrary. -static const uint32_t MAX_RECEIPT_WAIT_TIME = 20 ; // wait for at most 20 secs for a receipt. If not, cancel. +static const uint32_t MAX_TUNNEL_WAIT_TIME = 60 ; // wait for 60 seconds at most for a tunnel response. +static const uint32_t MAX_TUNNEL_UNMANAGED_TIME = 600 ; // min time before retry tunnels for that msg. +static const uint32_t MAX_DELAY_BETWEEN_TWO_SEND = 120 ; // wait for 120 seconds before re-sending. +static const uint32_t TUNNEL_OK_WAIT_TIME = 3 ; // wait for 3 seconds after last tunnel ok, so that we have a complete set of tunnels. +static const uint32_t MAX_GROUTER_DATA_SIZE = 2*1024*1024 ; // 2MB size limit. This is of course arbitrary. +static const uint32_t MAX_RECEIPT_WAIT_TIME = 20 ; // wait for at most 20 secs for a receipt. If not, cancel. +static const uint32_t MAX_TRANSACTION_ACK_WAITING_TIME = 60 ; // wait for at most 60 secs for a ACK. If not restart the transaction. const std::string p3GRouter::SERVICE_INFO_APP_NAME = "Global Router" ; @@ -226,50 +285,66 @@ p3GRouter::p3GRouter(p3ServiceControl *sc, RsGixs *is) int p3GRouter::tick() { - time_t now = time(NULL) ; - routePendingObjects() ; + time_t now = time(NULL) ; + + // Sort incoming service data + // + handleLowLevelServiceItems() ; + + // Handle high level global router data + // + handleIncoming() ; + + // Take each item in the list of pending messages and receipts. If the destination peer is available + // or if the tunnel is available, the item will be sent there. + // + routePendingObjects() ; + + // clean things up. Remove unused requests, old stuff etc. + + autoWash() ; // Go through the list of active tunnel requests and pending objects to ask for new tunnels // or close existing tunnel requests. // handleTunnels() ; - // Update routing matrix - // - if(now > _last_matrix_update_time + RS_GROUTER_MATRIX_UPDATE_PERIOD) - { - RsStackMutex mtx(grMtx) ; + // Update routing matrix + // + if(now > _last_matrix_update_time + RS_GROUTER_MATRIX_UPDATE_PERIOD) + { + RsStackMutex mtx(grMtx) ; - _last_matrix_update_time = now ; - _routing_matrix.updateRoutingProbabilities() ; // This should be locked. - } + _last_matrix_update_time = now ; + _routing_matrix.updateRoutingProbabilities() ; // This should be locked. + } #ifdef GROUTER_DEBUG - // Debug dump everything - // - if(now > _last_debug_output_time + RS_GROUTER_DEBUG_OUTPUT_PERIOD) - { - _last_debug_output_time = now ; - if(_debug_enabled) - debugDump() ; - } + // Debug dump everything + // + if(now > _last_debug_output_time + RS_GROUTER_DEBUG_OUTPUT_PERIOD) + { + _last_debug_output_time = now ; + if(_debug_enabled) + debugDump() ; + } #endif - // If content has changed, save config, at most every RS_GROUTER_MIN_CONFIG_SAVE_PERIOD seconds appart - // Otherwise, always save at least every RS_GROUTER_MAX_CONFIG_SAVE_PERIOD seconds - // - if(_changed && now > _last_config_changed + RS_GROUTER_MIN_CONFIG_SAVE_PERIOD) - { + // If content has changed, save config, at most every RS_GROUTER_MIN_CONFIG_SAVE_PERIOD seconds appart + // Otherwise, always save at least every RS_GROUTER_MAX_CONFIG_SAVE_PERIOD seconds + // + if(_changed && now > _last_config_changed + RS_GROUTER_MIN_CONFIG_SAVE_PERIOD) + { #ifdef GROUTER_DEBUG - grouter_debug() << "p3GRouter::tick(): triggering config save." << std::endl; + grouter_debug() << "p3GRouter::tick(): triggering config save." << std::endl; #endif - _changed = false ; - _last_config_changed = now ; - IndicateConfigChanged() ; - } + _changed = false ; + _last_config_changed = now ; + IndicateConfigChanged() ; + } - return 0 ; + return 0 ; } time_t p3GRouter::computeNextTimeDelay(time_t stored_time) @@ -347,6 +422,34 @@ bool p3GRouter::unregisterKey(const RsGxsId& key_id,const GRouterServiceId& sid) return true ; } +//===========================================================================================================================// +// Service data handling // +//===========================================================================================================================// + +void p3GRouter::handleLowLevelServiceItems() +{ + // While messages read + // + RsItem *item = NULL; + + while(NULL != (item = recvItem())) + handleLowLevelServiceItem(dynamic_cast(item)) ; +} + +void p3GRouter::handleLowLevelServiceItem(RsGRouterTransactionItem *item) +{ + switch(item->PacketSubType()) + { + case RS_PKT_SUBTYPE_GROUTER_TRANSACTION_ACKN: handleLowLevelTransactionAckItem(dynamic_cast(item)) ; + break ; + case RS_PKT_SUBTYPE_GROUTER_TRANSACTION_CHUNK: handleLowLevelTransactionChunkItem(dynamic_cast(item)) ; + break ; + default: + std::cerr << "p3GRouter::handleIncoming: Unknown packet subtype " << item->PacketSubType() << std::endl ; + } + delete item; +} + //===========================================================================================================================// // Turtle management // //===========================================================================================================================// @@ -367,6 +470,66 @@ bool p3GRouter::handleTunnelRequest(const RsFileHash& hash,const RsPeerId& /*pee #endif return true ; } + +void p3GRouter::handleLowLevelTransactionChunkItem(RsGRouterTransactionChunkItem *chunk_item) +{ +#ifdef GROUTER_DEBUG + std::cerr << " item is a transaction item." << std::endl; +#endif + + RsGRouterAbstractMsgItem *generic_item = NULL; + { + RS_STACK_MUTEX(grMtx) ; + + generic_item = _incoming_data_pipes[chunk_item->PeerId()].addDataChunk(chunk_item) ; // addDataChunk takes ownership over chunk_item + generic_item->PeerId(chunk_item->PeerId()) ; + } + + // send to client off-mutex + + if(generic_item == NULL) + return ; + +#ifdef GROUTER_DEBUG + std::cerr << " transaction is finished. Passing newly created item to client." << std::endl; + std::cerr << " sending a ACK item" << std::endl; +#endif + + RsGRouterTransactionAcknItem ackn_item ; + ackn_item.propagation_id = generic_item->routing_id ; + locked_sendTransactionData(chunk_item->PeerId(),ackn_item) ; + + { + RS_STACK_MUTEX(grMtx) ; + _incoming_items.push_back(generic_item) ; + } +} + +void p3GRouter::handleLowLevelTransactionAckItem(RsGRouterTransactionAcknItem *trans_ack_item) +{ +#ifdef GROUTER_DEBUG + std::cerr << " item is a transaction ACK." << std::endl; +#endif + RS_STACK_MUTEX(grMtx) ; + + std::map::iterator it=_pending_messages.find(trans_ack_item->propagation_id) ; + + if(it != _pending_messages.end()) + { + it->second.data_status = RS_GROUTER_DATA_STATUS_SENT; + + if(mTurtle->isTurtlePeer(trans_ack_item->PeerId())) + it->second.last_tunnel_sent_TS = time(NULL) ; + else + it->second.last_friend_sent_TS = time(NULL) ; +#ifdef GROUTER_DEBUG + std::cerr << " setting new status as sent/awaiting receipt." << std::endl; +#endif + } + else + std::cerr << " ERROR: no routing ID corresponds to this ACK item. Inconsistency!" << std::endl; +} + void p3GRouter::receiveTurtleData(RsTurtleGenericTunnelItem *gitem,const RsFileHash& hash,const RsPeerId& virtual_peer_id,RsTurtleGenericTunnelItem::Direction direction) { #ifdef GROUTER_DEBUG @@ -393,97 +556,33 @@ void p3GRouter::receiveTurtleData(RsTurtleGenericTunnelItem *gitem,const RsFileH std::cerr << " data bytes : " << RsDirUtil::sha1sum((unsigned char*)item->data_bytes,item->data_size) << std::endl; #endif - RsGRouterAbstractMsgItem *generic_item = NULL ; - // Items come out of the pipe in order. We need to recover all chunks before we de-serialise the content and have it handled by handleIncoming() + RsItem *itm = RsGRouterSerialiser().deserialise(item->data_bytes,&item->data_size) ; + + itm->PeerId(virtual_peer_id) ; + + // At this point we can have either a transaction chunk, or a transaction ACK. + // We handle them both here + + RsGRouterTransactionChunkItem *chunk_item = dynamic_cast(itm) ; + RsGRouterTransactionAcknItem *trans_ack_item = NULL; + + if(chunk_item != NULL) + handleLowLevelTransactionChunkItem(chunk_item) ; + else if(NULL != (trans_ack_item = dynamic_cast(itm))) + handleLowLevelTransactionAckItem(trans_ack_item) ; + else { - RS_STACK_MUTEX(grMtx) ; - std::map::iterator it = _virtual_peers.find(hash) ; - - if(it == _virtual_peers.end()) - { - std::cerr << " ERROR: hash is not known. Cannot receive. Data is dropped." << std::endl; - return ; - } - - RsItem *itm = RsGRouterSerialiser().deserialise(item->data_bytes,&item->data_size) ; - - // At this point we can have either a transaction chunk, or a transaction ACK. - // We handle them both here - - RsGRouterTransactionChunkItem *chunk_item = dynamic_cast(itm) ; - RsGRouterTransactionAcknItem *trans_ack_item = NULL; - - if(chunk_item != NULL) - { -#ifdef GROUTER_DEBUG - std::cerr << " item is a transaction item." << std::endl; -#endif - generic_item = it->second.addDataChunk(virtual_peer_id,chunk_item) ; // addDataChunk takes ownership over chunk_item - } - else if(NULL != (trans_ack_item = dynamic_cast(itm))) - { -#ifdef GROUTER_DEBUG - std::cerr << " item is a transaction ACK." << std::endl; -#endif - - std::map::iterator it=_pending_messages.find(trans_ack_item->propagation_id) ; - - if(it != _pending_messages.end()) - { - it->second.data_status = RS_GROUTER_DATA_STATUS_SENT; -#ifdef GROUTER_DEBUG - std::cerr << " setting new status as sent/awaiting receipt." << std::endl; -#endif - } - else - std::cerr << " ERROR: no routing ID corresponds to this ACK item. Inconsistency!" << std::endl; - } - - else - { - std::cerr << " ERROR: cannot deserialise turtle item." << std::endl; - if(itm) - delete itm ; - } - } - // send to client off-mutex - - if(generic_item != NULL) - { -#ifdef GROUTER_DEBUG - std::cerr << " transaction is finished. Passing newly created item to client." << std::endl; - std::cerr << " sending a ACK item" << std::endl; -#endif - - RsGRouterTransactionAcknItem ackn_item ; - ackn_item.propagation_id = generic_item->routing_id ; - - RsTurtleGenericDataItem *turtle_data_item = new RsTurtleGenericDataItem ; - - turtle_data_item->data_size = ackn_item.serial_size() ; - turtle_data_item->data_bytes = (uint8_t*)malloc(turtle_data_item->data_size) ; - - if(! ackn_item.serialise(turtle_data_item->data_bytes,turtle_data_item->data_size)) - { - std::cerr << " ERROR: Cannot serialise ACKN item." << std::endl; - delete turtle_data_item; - return ; - } - - mTurtle->sendTurtleData(virtual_peer_id,turtle_data_item) ; - - // This is useful to send a receipt in the same tunnel while it's online. - generic_item->PeerId(virtual_peer_id) ; - - handleIncoming(hash,generic_item) ; + std::cerr << " ERROR: cannot deserialise turtle item." << std::endl; + if(itm) + delete itm ; } } void GRouterTunnelInfo::removeVirtualPeer(const TurtleVirtualPeerId& vpid) { - std::map::iterator it = virtual_peers.find(vpid) ; + std::set::iterator it = virtual_peers.find(vpid) ; if(it == virtual_peers.end()) { @@ -491,43 +590,23 @@ void GRouterTunnelInfo::removeVirtualPeer(const TurtleVirtualPeerId& vpid) return ; } - if(it->second != NULL) - { - std::cerr << " WARNING: removing a virtual peer that still holds data. The data will be lost." << std::endl; - delete it->second ; - } - virtual_peers.erase(it) ; } void GRouterTunnelInfo::addVirtualPeer(const TurtleVirtualPeerId& vpid) { - std::map::iterator it = virtual_peers.find(vpid) ; - - if(it != virtual_peers.end()) - { + if(virtual_peers.find(vpid) != virtual_peers.end()) std::cerr << " ERROR: adding a virtual peer that already exist. This is an error!" << std::endl; - delete it->second ; - } - virtual_peers[vpid] = NULL ; + virtual_peers.insert(vpid) ; time_t now = time(NULL) ; if(first_tunnel_ok_TS == 0) first_tunnel_ok_TS = now ; last_tunnel_ok_TS = now ; } -RsGRouterAbstractMsgItem *GRouterTunnelInfo::addDataChunk(const TurtleVirtualPeerId& vpid,RsGRouterTransactionChunkItem *chunk) +RsGRouterAbstractMsgItem *GRouterDataInfo::addDataChunk(RsGRouterTransactionChunkItem *chunk) { - // find the chunk - std::map::iterator it = virtual_peers.find(vpid) ; - - if(it == virtual_peers.end()) - { - std::cerr << " ERROR: no virtual peer " << vpid << " for chunk received. Dropping." << std::endl; - return NULL; - } - - if(it->second == NULL) + if(incoming_data_buffer == NULL) { if(chunk->chunk_start != 0) { @@ -535,32 +614,33 @@ RsGRouterAbstractMsgItem *GRouterTunnelInfo::addDataChunk(const TurtleVirtualPee delete chunk; return NULL; } - it->second = chunk ; + incoming_data_buffer = chunk ; } else { - if(it->second->chunk_size != chunk->chunk_start || it->second->total_size != chunk->total_size) + if(incoming_data_buffer->chunk_size != chunk->chunk_start || incoming_data_buffer->total_size != chunk->total_size) { std::cerr << " ERROR: chunk numbering is wrong. Dropping." << std::endl; delete chunk ; - delete it->second ; + delete incoming_data_buffer ; + return NULL; } - it->second->chunk_data = (uint8_t*)realloc((uint8_t*)it->second->chunk_data,it->second->chunk_size + chunk->chunk_size) ; - memcpy(&it->second->chunk_data[it->second->chunk_size],chunk->chunk_data,chunk->chunk_size) ; - it->second->chunk_size += chunk->chunk_size ; + incoming_data_buffer->chunk_data = (uint8_t*)realloc((uint8_t*)incoming_data_buffer->chunk_data,incoming_data_buffer->chunk_size + chunk->chunk_size) ; + memcpy(&incoming_data_buffer->chunk_data[incoming_data_buffer->chunk_size],chunk->chunk_data,chunk->chunk_size) ; + incoming_data_buffer->chunk_size += chunk->chunk_size ; delete chunk ; } // if finished, return it. - if(it->second->total_size == it->second->chunk_size) + if(incoming_data_buffer->total_size == incoming_data_buffer->chunk_size) { - RsItem *data_item = RsGRouterSerialiser().deserialise(it->second->chunk_data,&it->second->chunk_size) ; + RsItem *data_item = RsGRouterSerialiser().deserialise(incoming_data_buffer->chunk_data,&incoming_data_buffer->chunk_size) ; - it->second->chunk_data = NULL; - delete it->second ; - it->second= NULL ; + incoming_data_buffer->chunk_data = NULL; + delete incoming_data_buffer; + incoming_data_buffer = NULL ; return dynamic_cast(data_item) ; } @@ -612,23 +692,24 @@ void p3GRouter::addVirtualPeer(const TurtleFileHash& hash,const TurtleVirtualPee std::cerr << " adding VPID." << std::endl; #endif - _virtual_peers[hash].addVirtualPeer(virtual_peer_id) ; + _tunnels[hash].addVirtualPeer(virtual_peer_id) ; } + void p3GRouter::removeVirtualPeer(const TurtleFileHash& hash,const TurtleVirtualPeerId& virtual_peer_id) { - RS_STACK_MUTEX(grMtx) ; + RS_STACK_MUTEX(grMtx) ; #ifdef GROUTER_DEBUG - std::cerr << "p3GRouter::addVirtualPeer(). Received vpid " << virtual_peer_id << " for hash " << hash << std::endl; + std::cerr << "p3GRouter::removeVirtualPeer(). Removing vpid " << virtual_peer_id << " for hash " << hash << std::endl; std::cerr << " removing VPID." << std::endl; #endif // make sure the VPID exists. - std::map::iterator it = _virtual_peers.find(hash) ; + std::map::iterator it = _tunnels.find(hash) ; - if(it == _virtual_peers.end()) + if(it == _tunnels.end()) { std::cerr << " no virtual peers at all for this hash: " << hash << "! This is a consistency error." << std::endl; return ; @@ -640,15 +721,15 @@ void p3GRouter::removeVirtualPeer(const TurtleFileHash& hash,const TurtleVirtual #ifdef GROUTER_DEBUG std::cerr << " last virtual peer removed. Also deleting hash entry." << std::endl; #endif - _virtual_peers.erase(it) ; + _tunnels.erase(it) ; } -#ifdef GROUTER_DEBUG - std::cerr << " setting tunnel status in pending message." << std::endl; -#endif - - for(std::map::iterator it2(_pending_messages.begin());it2!=_pending_messages.end();++it2) - if(it2->second.tunnel_hash == hash && it->second.virtual_peers.empty()) - it2->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_PENDING ; +// #ifdef GROUTER_DEBUG +// std::cerr << " setting tunnel status in pending message." << std::endl; +// #endif +// +// for(std::map::iterator it2(_pending_messages.begin());it2!=_pending_messages.end();++it2) +// if(it2->second.tunnel_hash == hash && it->second.virtual_peers.empty()) +// it2->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_PENDING ; } void p3GRouter::connectToTurtleRouter(p3turtle *pt) @@ -667,16 +748,23 @@ void p3GRouter::connectToTurtleRouter(p3turtle *pt) // Each GXS id + service might have a collection of virtual peers // -> each hash has possibly multiple virtual peers associated to it. - - -template -static bool operator<(const std::pair& p1,const std::pair& p2) +class item_comparator_001 { - return p1.first < p2.first ; -} +public: + template + bool operator()(const std::pair& p1,const std::pair& p2) const + { + return p1.first < p2.first ; + } +}; void p3GRouter::handleTunnels() { + // This function is responsible for askign for tunnels, and removing requests from the turtle router. + // To remove the unnecessary TR activity generated by multiple peers trying to send the same message, + // only peers which haven't passed on any data to direct friends, or for which the best friends are not online + // will be allowed to monitor tunnels. + // Go through the list of pending messages // - if tunnels are pending for too long => remove from turtle // - if item is waiting for too long => tunnels are waitin @@ -714,11 +802,11 @@ if(!_pending_messages.empty()) { #ifdef GROUTER_DEBUG grouter_debug() << " " << std::hex << it->first << std::dec - << " data_status=" << it->second.data_status << ", tunnel_status=" << it->second.tunnel_status - << " last tried: "<< now - it->second.last_tunnel_request_TS << " (secs ago)" << ", last sent: " << now - it->second.last_sent_TS << " (secs ago) " ; + << " data_status=" << it->second.data_status << ", tunnel_status=" << it->second.tunnel_status + << " last tried: "<< now - it->second.last_tunnel_request_TS << " (secs ago)" << ", last sent: " << now - it->second.last_tunnel_sent_TS << " (secs ago) " ; #endif - if(it->second.data_status == RS_GROUTER_DATA_STATUS_PENDING) + if(it->second.data_status == RS_GROUTER_DATA_STATUS_PENDING && (it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS)) { if(it->second.tunnel_status == RS_GROUTER_TUNNEL_STATUS_UNMANAGED && it->second.last_tunnel_request_TS + MAX_TUNNEL_UNMANAGED_TIME < now) { @@ -751,28 +839,28 @@ if(!_pending_messages.empty()) grouter_debug() << std::endl; #endif } - else if(it->second.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK ) - { + else if(it->second.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK ) + { #ifdef GROUTER_DEBUG std::cerr << " closing pending tunnels." << std::endl; #endif mTurtle->stopMonitoringTunnels(it->second.tunnel_hash) ; - it->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ; - } - else if(it->second.data_status == RS_GROUTER_DATA_STATUS_SENT && it->second.last_sent_TS + MAX_RECEIPT_WAIT_TIME < now) - { + it->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ; + } + else if(it->second.data_status == RS_GROUTER_DATA_STATUS_SENT && it->second.last_tunnel_sent_TS + MAX_RECEIPT_WAIT_TIME < now) + { #ifdef GROUTER_DEBUG std::cerr << " closing pending tunnels." << std::endl; #endif mTurtle->stopMonitoringTunnels(it->second.tunnel_hash) ; - it->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ; - it->second.data_status = RS_GROUTER_DATA_STATUS_PENDING ; - } + it->second.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ; + it->second.data_status = RS_GROUTER_DATA_STATUS_PENDING ; + } #ifdef GROUTER_DEBUG - else - std::cerr << " doing nothing." << std::endl; + else + std::cerr << " doing nothing." << std::endl; #endif } #ifdef GROUTER_DEBUG @@ -780,7 +868,7 @@ if(!_pending_messages.empty()) grouter_debug() << " sorting..." << std::endl; #endif - std::sort(priority_list.begin(),priority_list.end()) ; + std::sort(priority_list.begin(),priority_list.end(),item_comparator_001()) ; // take tunnels from item priority list, and enable tunnel handling, while respecting max number of active tunnels limit @@ -795,112 +883,261 @@ if(!_pending_messages.empty()) priority_list[i].second->tunnel_status = RS_GROUTER_TUNNEL_STATUS_PENDING ; priority_list[i].second->last_tunnel_request_TS = now ; } + + } void p3GRouter::routePendingObjects() { - // Go throught he list of pending messages. - // For those with a tunnel ready, send the message in the tunnel. + // Go throught he list of pending messages. For those with a peer ready, send the message to that peer. + // The peer might be: + // - a virtual peer id that actually is a tunnel + // - a real friend node + // Tunnels and friends will used whenever available. Of course this might cause a message to arrive multiple times, but we + // don't really care since the GR takes care of duplicates already. + // + // Which tunnels are available is handled by handleTunnels() + // time_t now = time(NULL) ; - std::map > notified_msgs ; - { RS_STACK_MUTEX(grMtx) ; #ifdef GROUTER_DEBUG if(!_pending_messages.empty()) std::cerr << "p3GRouter::routePendingObjects()" << std::endl; #endif - for(std::map::iterator it=_pending_messages.begin();it!=_pending_messages.end();) - if(it->second.data_status == RS_GROUTER_DATA_STATUS_PENDING && it->second.tunnel_status == RS_GROUTER_TUNNEL_STATUS_READY && now > it->second.last_sent_TS + MAX_DELAY_BETWEEN_TWO_SEND) + for(std::map::iterator it=_pending_messages.begin();it!=_pending_messages.end();++it) + { +#ifdef GROUTER_DEBUG + std::cerr << " message " << std::hex << it->first << std::dec << std::endl; +#endif + if(it->second.data_status == RS_GROUTER_DATA_STATUS_PENDING) { -#ifdef GROUTER_DEBUG - std::cerr << " routing id: " << std::hex << it->first << std::dec ; -#endif - const TurtleFileHash& hash(it->second.tunnel_hash) ; - std::map::const_iterator vpit ; + // Look for tunnels and friends where to send the data. Send to both. - if( (vpit = _virtual_peers.find(hash)) == _virtual_peers.end()) + std::list peers ; + + if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS) + locked_collectAvailableTunnels(it->second.tunnel_hash,peers); + + // For now, disable friends. We'll first check that the good old tunnel system works as before. + // + // if(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS) + // locked_collectAvailableFriends(it->second.data_item->destination_key,peers, it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN); + + if(peers.empty()) { -#ifdef GROUTER_DEBUG - std::cerr << ". No virtual peers. Skipping now." << std::endl; -#endif - ++it ; + std::cerr << " no tunnel nor friends available" << std::endl; continue ; } - if(vpit->second.last_tunnel_ok_TS + TUNNEL_OK_WAIT_TIME > now) - { -#ifdef GROUTER_DEBUG - std::cerr << ". Still waiting delay (stabilisation)." << std::endl; -#endif - ++it ; - continue ; - } + // slice the data appropriately and send. - // for now, just take one. But in the future, we will need some policy to temporarily store objects at proxy peers, etc. + std::list chunks ; + sliceDataItem(it->second.data_item,chunks) ; #ifdef GROUTER_DEBUG - std::cerr << " " << vpit->second.virtual_peers.size() << " virtual peers available. " << std::endl; + if(!peers.empty()) + std::cerr << " sending to peers:" << std::endl; #endif + for(std::list::const_iterator itpid(peers.begin());itpid!=peers.end();++itpid) + for(std::list::const_iterator it2(chunks.begin());it2!=chunks.end();++it2) + locked_sendTransactionData(*itpid,*(*it2) ) ; - if(vpit->second.virtual_peers.empty()) - { -#ifdef GROUTER_DEBUG - std::cerr << " no peers available. Cannot send!!" << std::endl; -#endif - ++it ; - continue ; - } - TurtleVirtualPeerId vpid = (vpit->second.virtual_peers.begin())->first ; + // delete temporary items -#ifdef GROUTER_DEBUG - std::cerr << " sending to " << vpid << std::endl; -#endif + for(std::list::const_iterator cit=chunks.begin();cit!=chunks.end();++cit) + delete *cit; - sendDataInTunnel(vpid,it->second.data_item) ; + // change item state in waiting list -#ifdef GROUTER_DEBUG - std::cerr << " setting last sent time to now" << std::endl; -#endif - - it->second.last_sent_TS = now ; - - ++it ; + it->second.data_status = RS_GROUTER_DATA_STATUS_ONGOING ; + it->second.data_transaction_TS = now ; ; } - else if(it->second.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK || it->second.received_time_TS + GROUTER_ITEM_MAX_CACHE_KEEP_TIME < now) // is the item too old for cache - { + else if(it->second.data_status == RS_GROUTER_DATA_STATUS_ONGOING && now > MAX_TRANSACTION_ACK_WAITING_TIME + it->second.data_transaction_TS) + { + std::cerr << " waited too long for this transation. Switching back to PENDING." << std::endl; + + it->second.data_status = RS_GROUTER_DATA_STATUS_PENDING ; + } + } +} + +void p3GRouter::locked_collectAvailableFriends(const GRouterKeyId& gxs_id,std::list& friend_peers,bool is_origin) +{ + // The strategy is the following: + // if origin + // send to multiple neighbors : best and random + // else + // send to a single "best" neighbor (determined by threshold over routing probability), + + std::set ids ; + mServiceControl->getPeersConnected(getServiceInfo().mServiceType,ids) ; + + std::vector probas; + std::vector tmp_peers; + + for(std::set::const_iterator it(ids.begin());it!=ids.end();++it) + tmp_peers.push_back(*it) ; + + _routing_matrix.computeRoutingProbabilities(gxs_id, tmp_peers, probas) ; + #ifdef GROUTER_DEBUG - if(it->second.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK) - grouter_debug() << " Removing received cached item " << std::hex << it->first << std::dec << std::endl; - else - grouter_debug() << " Removing too-old cached item " << std::hex << it->first << std::dec << std::endl; + std::cerr << "locked_getAvailableFriends()" << std::endl; + std::cerr << " getting connected friends, computing routing probabilities" << std::endl; + for(uint32_t i=0;isecond.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK)?GROUTER_CLIENT_SERVICE_DATA_STATUS_RECEIVED:GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED ; + std::vector > mypairs ; - if(!locked_getClientAndServiceId(it->second.tunnel_hash,it->second.data_item->destination_key,client,service_id)) - std::cerr << " ERROR: cannot find client for cancelled message " << it->first << std::endl; - else - notified_msgs[it->first] = std::make_pair(client,status) ; + for(uint32_t i=0;isecond.data_item ; - if(it->second.receipt_item != NULL) - delete it->second.receipt_item ; + // now sort them up + std::sort(mypairs.begin(),mypairs.end(),item_comparator_001()) ; - std::map::iterator tmp(it) ; - ++tmp ; - _pending_messages.erase(it) ; - it = tmp ; - } - else - ++it ; + // take the max_count peers that are still above min_probability + + uint32_t n=0 ; + + for(std::vector >::const_reverse_iterator it = mypairs.rbegin();it!=mypairs.rend() && n= probability_threshold ) + friend_peers.push_back( (*it).second ), ++n ; +} + +void p3GRouter::locked_collectAvailableTunnels(const TurtleFileHash& hash,std::list& tunnel_peers) +{ + time_t now = time(NULL) ; + + // Now go through available virtual peers. Select the ones that are interesting, and set them as potential destinations. + + std::map::const_iterator vpit=_tunnels.find(hash) ; + + if(vpit == _tunnels.end()) + return ; + + if(vpit->second.virtual_peers.empty()) + { +#ifdef GROUTER_DEBUG + std::cerr << " no peers available. Cannot send!!" << std::endl; +#endif + return ; + } + if(vpit->second.last_tunnel_ok_TS + TUNNEL_OK_WAIT_TIME > now) + { +#ifdef GROUTER_DEBUG + std::cerr << ". Still waiting delay (stabilisation)." << std::endl; +#endif + return ; } + // for now, just take one. But in the future, we will need some policy to temporarily store objects at proxy peers, etc. + +#ifdef GROUTER_DEBUG + std::cerr << " " << vpit->second.virtual_peers.size() << " virtual peers available. " << std::endl; +#endif + TurtleVirtualPeerId vpid = *(vpit->second.virtual_peers.begin()) ; + + tunnel_peers.push_back(vpid) ; +} + +bool p3GRouter::locked_sendTransactionData(const RsPeerId& pid,const RsGRouterTransactionItem& trans_item) +{ + if(mTurtle->isTurtlePeer(pid)) + { +#ifdef GROUTER_DEBUG + std::cerr << " sending to tunnel vpid " << pid << std::endl; +#endif + uint32_t turtle_data_size = trans_item.serial_size() ; + uint8_t *turtle_data = (uint8_t*)malloc(turtle_data_size) ; + + if(turtle_data == NULL) + { + std::cerr << " ERROR: Cannot allocate turtle data memory for size " << turtle_data_size << std::endl; + return false ; + } + if(!trans_item.serialise(turtle_data,turtle_data_size)) + { + std::cerr << " ERROR: cannot serialise RsGRouterTransactionChunkItem." << std::endl; + + free(turtle_data) ; + return false ; + } + + RsTurtleGenericDataItem *turtle_item = new RsTurtleGenericDataItem ; + + turtle_item->data_size = turtle_data_size ; + turtle_item->data_bytes = turtle_data ; + +#ifdef GROUTER_DEBUG + std::cerr << " sending to vpid " << pid << std::endl; +#endif + mTurtle->sendTurtleData(pid,turtle_item) ; + + return true ; + } + else + { +#ifdef GROUTER_DEBUG + std::cerr << " sending to pid " << pid << std::endl; +#endif + RsGRouterTransactionItem *item_copy = trans_item.duplicate() ; + + sendItem(item_copy) ; + + return true ; + } +} + +void p3GRouter::autoWash() +{ + bool items_deleted = false ; + std::map > notified_msgs ; + { + RS_STACK_MUTEX(grMtx) ; + + + time_t now = time(NULL) ; + + for(std::map::iterator it=_pending_messages.begin();it!=_pending_messages.end();) + if(it->second.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK + || ((it->second.received_time_TS + GROUTER_ITEM_MAX_CACHE_KEEP_TIME < now) && !(it->second.routing_flags & GRouterRoutingInfo::ROUTING_FLAGS_IS_ORIGIN))) // is the item too old for cache + { +#ifdef GROUTER_DEBUG + if(it->second.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK) + grouter_debug() << " Removing received cached item " << std::hex << it->first << std::dec << std::endl; + else + grouter_debug() << " Removing too-old cached item " << std::hex << it->first << std::dec << std::endl; +#endif + GRouterClientService *client = NULL ; + GRouterServiceId service_id = 0; + + uint32_t status = (it->second.data_status == RS_GROUTER_DATA_STATUS_RECEIPT_OK)?GROUTER_CLIENT_SERVICE_DATA_STATUS_RECEIVED:GROUTER_CLIENT_SERVICE_DATA_STATUS_FAILED ; + + if(!locked_getClientAndServiceId(it->second.tunnel_hash,it->second.data_item->destination_key,client,service_id)) + std::cerr << " ERROR: cannot find client for cancelled message " << it->first << std::endl; + else + notified_msgs[it->first] = std::make_pair(client,status) ; + + delete it->second.data_item ; + if(it->second.receipt_item != NULL) + delete it->second.receipt_item ; + + std::map::iterator tmp(it) ; + ++tmp ; + _pending_messages.erase(it) ; + it = tmp ; + + items_deleted = true ; + } + else + ++it ; + } // look into pending items. #ifdef GROUTER_DEBUG @@ -909,111 +1146,106 @@ void p3GRouter::routePendingObjects() for(std::map >::const_iterator it(notified_msgs.begin());it!=notified_msgs.end();++it) it->second.first->notifyDataStatus(it->first, it->second.second) ; + + if(items_deleted) + _changed = true ; } -bool p3GRouter::sendDataInTunnel(const TurtleVirtualPeerId& vpid,RsGRouterAbstractMsgItem *item) +bool p3GRouter::sliceDataItem(RsGRouterAbstractMsgItem *item,std::list& chunks) { - // split into chunks and send them all into the tunnel. - -#ifdef GROUTER_DEBUG - std::cerr << "p3GRouter::sendDataInTunnel()" << std::endl; - std::cerr << "item dump before send:" << std::endl; - item->print(std::cerr, 2) ; -#endif - - uint32_t size = item->serial_size(); - - RsTemporaryMemory data(size) ; // data will be freed on return, whatever the route taken. - - if(data == NULL) + try { - std::cerr << " ERROR: cannot allocate memory. Size=" << size << std::endl; - return false; - } + // Split the item into chunks. This function ensures that chunks in the list are valid. Memory ownership is left to the + // calling client. In case of error, all allocated memory is deleted. - if(!item->serialise(data,size)) +#ifdef GROUTER_DEBUG + std::cerr << "p3GRouter::sendDataInTunnel()" << std::endl; + std::cerr << "item dump before send:" << std::endl; + item->print(std::cerr, 2) ; +#endif + + uint32_t size = item->serial_size(); + + RsTemporaryMemory data(size) ; // data will be freed on return, whatever the route taken. + + if(data == NULL) + { + std::cerr << " ERROR: cannot allocate memory. Size=" << size << std::endl; + throw ; + } + + if(!item->serialise(data,size)) + { + std::cerr << " ERROR: cannot serialise." << std::endl; + throw ; + } + + uint32_t offset = 0 ; + static const uint32_t CHUNK_SIZE = 15000 ; + + while(offset < size) + { + uint32_t chunk_size = std::min(size - offset, CHUNK_SIZE) ; + + RsGRouterTransactionChunkItem *chunk_item = new RsGRouterTransactionChunkItem ; + + chunk_item->propagation_id = item->routing_id ; + chunk_item->total_size = size; + chunk_item->chunk_start= offset; + chunk_item->chunk_size = chunk_size ; + chunk_item->chunk_data = (uint8_t*)malloc(chunk_size) ; +#ifdef GROUTER_DEBUG + std::cerr << " preparing to send a chunk [" << offset << " -> " << offset + chunk_size << " / " << size << "]" << std::endl; +#endif + + if(chunk_item->chunk_data == NULL) + { + std::cerr << " ERROR: Cannot allocate memory for size " << chunk_size << std::endl; + delete chunk_item; + throw ; + } + memcpy(chunk_item->chunk_data,&data[offset],chunk_size) ; + + offset += chunk_size ; + + chunks.push_back(chunk_item) ; + } + + return true ; + } + catch(...) { - std::cerr << " ERROR: cannot serialise." << std::endl; - return false; + for(std::list::const_iterator it(chunks.begin());it!=chunks.end();++it) + delete *it ; + + chunks.clear() ; + + return false ; } - - uint32_t offset = 0 ; - static const uint32_t CHUNK_SIZE = 15000 ; - - while(offset < size) - { - uint32_t chunk_size = std::min(size - offset, CHUNK_SIZE) ; - - RsGRouterTransactionChunkItem *chunk_item = new RsGRouterTransactionChunkItem ; - chunk_item->propagation_id = item->routing_id ; - chunk_item->total_size = size; - chunk_item->chunk_start= offset; - chunk_item->chunk_size = chunk_size ; - chunk_item->chunk_data = (uint8_t*)malloc(chunk_size) ; - -#ifdef GROUTER_DEBUG - std::cerr << " preparing to send a chunk [" << offset << " -> " << offset + chunk_size << " / " << size << "]" << std::endl; -#endif - - if(chunk_item->chunk_data == NULL) - { - std::cerr << " ERROR: Cannot allocate memory for size " << chunk_size << std::endl; - delete chunk_item; - return false; - } - memcpy(chunk_item->chunk_data,&data[offset],chunk_size) ; - - offset += chunk_size ; - - RsTurtleGenericDataItem *turtle_item = new RsTurtleGenericDataItem ; - - uint32_t turtle_data_size = chunk_item->serial_size() ; - uint8_t *turtle_data = (uint8_t*)malloc(turtle_data_size) ; - - if(turtle_data == NULL) - { - std::cerr << " ERROR: Cannot allocate turtle data memory for size " << turtle_data_size << std::endl; - delete chunk_item; - return false; - } - if(!chunk_item->serialise(turtle_data,turtle_data_size)) - { - std::cerr << " ERROR: cannot serialise RsGRouterTransactionChunkItem." << std::endl; - delete chunk_item; - free(turtle_data) ; - return false; - } - - delete chunk_item ; - - turtle_item->data_size = turtle_data_size ; - turtle_item->data_bytes = turtle_data ; - -#ifdef GROUTER_DEBUG - std::cerr << " sending to vpid " << vpid << std::endl; -#endif - mTurtle->sendTurtleData(vpid,turtle_item) ; - } - - return true ; } -void p3GRouter::handleIncoming(const TurtleFileHash& hash,RsGRouterAbstractMsgItem *item) +void p3GRouter::handleIncoming() { - RsGRouterGenericDataItem *generic_data_item ; - RsGRouterSignedReceiptItem *receipt_item ; + while(!_incoming_items.empty()) + { + RsGRouterAbstractMsgItem *item = _incoming_items.front() ; + _incoming_items.pop_front() ; - if(NULL != (generic_data_item = dynamic_cast(item))) - handleIncomingDataItem(hash,generic_data_item) ; - else if(NULL != (receipt_item = dynamic_cast(item))) - handleIncomingReceiptItem(hash,receipt_item) ; - else - std::cerr << "Item has unknown type (not data nor signed receipt). Dropping!" << std::endl; + RsGRouterGenericDataItem *generic_data_item ; + RsGRouterSignedReceiptItem *receipt_item ; - delete item ; + if(NULL != (generic_data_item = dynamic_cast(item))) + handleIncomingDataItem(generic_data_item) ; + else if(NULL != (receipt_item = dynamic_cast(item))) + handleIncomingReceiptItem(receipt_item) ; + else + std::cerr << "Item has unknown type (not data nor signed receipt). Dropping!" << std::endl; + + delete item ; + } } -void p3GRouter::handleIncomingReceiptItem(const TurtleFileHash& hash,RsGRouterSignedReceiptItem *receipt_item) +void p3GRouter::handleIncomingReceiptItem(RsGRouterSignedReceiptItem *receipt_item) { bool changed = false ; #ifdef GROUTER_DEBUG @@ -1070,31 +1302,75 @@ void p3GRouter::handleIncomingReceiptItem(const TurtleFileHash& hash,RsGRouterSi IndicateConfigChanged() ; } -void p3GRouter::handleIncomingDataItem(const TurtleFileHash& hash,RsGRouterGenericDataItem *generic_item) +void p3GRouter::handleIncomingDataItem(RsGRouterGenericDataItem *data_item) { #ifdef GROUTER_DEBUG - std::cerr << "Handling incoming data item. Passing to client." << std::endl; + std::cerr << "Handling incoming data item. " << std::endl; std::cerr << "Item content:" << std::endl; - generic_item->print(std::cerr,2) ; + data_item->print(std::cerr,2) ; #endif GRouterClientService *client = NULL ; - GRouterServiceId service_id = 0; + GRouterServiceId service_id = data_item->service_id ; + bool item_is_for_us = false ; + // Find client and service ID from destination key. { RS_STACK_MUTEX(grMtx) ; - if(!locked_getClientAndServiceId(hash,generic_item->destination_key,client,service_id)) + std::map::const_iterator its = _registered_services.find(service_id) ; + + if(its == _registered_services.end()) { - std::cerr << " ERROR: cannot find client service for this hash/key combination." << std::endl; + std::cerr << " ERROR: client id " << service_id << " not registered. Consistency error." << std::endl; return ; } + client = its->second ; + + // also check wether this item is for us or not + + item_is_for_us = _owned_key_ids.find( makeTunnelHash(data_item->destination_key,service_id) ) != _owned_key_ids.end() ; } - // We don't do proxy yet, so the item is necessarily for us. + if(!item_is_for_us) + { + RS_STACK_MUTEX(grMtx) ; + + std::cerr << " item is not for us. Storing/forwarding." << std::endl; + + // item is not for us. We store it in pending messages and will deal with it later + + std::map::iterator it = _pending_messages.find(data_item->routing_id) ; + + if(it == _pending_messages.end()) + { + std::cerr << " item is new. Storing it and forwarding it." << std::endl; + + GRouterRoutingInfo& info(_pending_messages[data_item->routing_id]) ; + + info.data_item = data_item ; + info.receipt_item = NULL ; + info.data_status = RS_GROUTER_DATA_STATUS_PENDING ; + info.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ; + info.last_tunnel_sent_TS = 0 ; + info.last_friend_sent_TS = 0 ; + info.last_tunnel_request_TS = 0 ; + info.sending_attempts = 0 ; + info.routing_flags = GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS | GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS ; + info.received_time_TS = time(NULL) ; + info.tunnel_hash = makeTunnelHash(data_item->destination_key,data_item->service_id) ; + } + + std::cerr << " storing incoming route: from " << data_item->PeerId() << std::endl; + _pending_messages[data_item->routing_id].incoming_routes.insert(data_item->PeerId()) ; + + return ; + } + + // Item is for us. // The item's signature must be checked, and the item needs to be decrypted. - if(verifySignedDataItem(generic_item)) // we should get proper flags out of this + if(verifySignedDataItem(data_item)) // we should get proper flags out of this { #ifdef GROUTER_DEBUG std::cerr << " verifying item signature: CHECKED!" ; @@ -1107,9 +1383,9 @@ void p3GRouter::handleIncomingDataItem(const TurtleFileHash& hash,RsGRouterGener // compute the hash before decryption. - Sha1CheckSum data_hash = RsDirUtil::sha1sum(generic_item->data_bytes,generic_item->data_size) ; + Sha1CheckSum data_hash = RsDirUtil::sha1sum(data_item->data_bytes,data_item->data_size) ; - if(!decryptDataItem(generic_item)) + if(!decryptDataItem(data_item)) { std::cerr << " decrypting item : FAILED! Item will be dropped." << std::endl; return ; @@ -1121,26 +1397,25 @@ void p3GRouter::handleIncomingDataItem(const TurtleFileHash& hash,RsGRouterGener // make a copy of the data, since the item will be deleted. - uint8_t *data_copy = (uint8_t*)malloc(generic_item->data_size) ; - memcpy(data_copy,generic_item->data_bytes,generic_item->data_size) ; + uint8_t *data_copy = (uint8_t*)malloc(data_item->data_size) ; + memcpy(data_copy,data_item->data_bytes,data_item->data_size) ; - client->receiveGRouterData(generic_item->destination_key,generic_item->signature.keyId,service_id,data_copy,generic_item->data_size); + client->receiveGRouterData(data_item->destination_key,data_item->signature.keyId,service_id,data_copy,data_item->data_size); // No we need to send a signed receipt to the sender. - RsGRouterSignedReceiptItem *receipt_item = new RsGRouterSignedReceiptItem ; - receipt_item->data_hash = data_hash ; - receipt_item->routing_id = generic_item->routing_id ; - receipt_item->destination_key = generic_item->signature.keyId ; - receipt_item->flags = 0 ; + RsGRouterSignedReceiptItem receipt_item ; + receipt_item.data_hash = data_hash ; + receipt_item.routing_id = data_item->routing_id ; + receipt_item.destination_key = data_item->signature.keyId ; + receipt_item.flags = 0 ; #ifdef GROUTER_DEBUG std::cerr << " preparing signed receipt." << std::endl; #endif - if(!signDataItem(receipt_item,generic_item->destination_key)) + if(!signDataItem(&receipt_item,data_item->destination_key)) { - delete receipt_item; std::cerr << " signing: FAILED. Receipt dropped. ERROR." << std::endl; return ; } @@ -1150,18 +1425,25 @@ void p3GRouter::handleIncomingDataItem(const TurtleFileHash& hash,RsGRouterGener // Normally (proxy mode) we should store the signed receipt so that it can be sent back, and handle it // in the routePendingObjects() method. + std::list chunks ; + sliceDataItem(&receipt_item,chunks) ; - if(!sendDataInTunnel(generic_item->PeerId(),receipt_item)) + bool ok = true ; + + for(std::list::const_iterator it(chunks.begin());it!=chunks.end();++it) { - std::cerr << " sending signed receipt in tunnel " << generic_item->PeerId() << ": FAILED." << std::endl; - delete receipt_item ; - return ; + ok = ok && locked_sendTransactionData(data_item->PeerId(),**it) ; + delete *it ; } + if(ok) + { #ifdef GROUTER_DEBUG - std::cerr << " sent signed receipt in tunnel " << generic_item->PeerId() << std::endl; + std::cerr << " sent signed receipt in tunnel " << data_item->PeerId() << std::endl; #endif - delete receipt_item; + } + else + std::cerr << " sending signed receipt in tunnel " << data_item->PeerId() << ": FAILED." << std::endl; } bool p3GRouter::locked_getClientAndServiceId(const TurtleFileHash& hash, const RsGxsId& destination_key, GRouterClientService *& client, GRouterServiceId& service_id) @@ -1218,31 +1500,6 @@ bool p3GRouter::encryptDataItem(RsGRouterGenericDataItem *item,const RsGxsId& de std::cerr << " Decrypted size = " << item->data_size << std::endl; #endif -#ifdef SUSPENDED - RsTlvSecurityKey encryption_key ; - - // get the key, and let the cache find it. - for(int i=0;i<4;++i) - if(mIdService->getKey(destination_key,encryption_key)) - break ; - else - usleep(500*1000) ; // sleep half a sec. - - if(encryption_key.keyId.isNull()) - { - std::cerr << " (EE) Cannot get encryption key for id " << destination_key << std::endl; - return false ; - } - - uint8_t *encrypted_data =NULL; - int encrypted_size =0; - - if(!GxsSecurity::encrypt(encrypted_data,encrypted_size,item->data_bytes,item->data_size,encryption_key)) - { - std::cerr << " (EE) Encryption failed." << std::endl; - return false ; - } -#endif uint8_t *encrypted_data =NULL; uint32_t encrypted_size =0; uint32_t error_status ; @@ -1263,7 +1520,7 @@ bool p3GRouter::encryptDataItem(RsGRouterGenericDataItem *item,const RsGxsId& de #ifdef GROUTER_DEBUG std::cerr << " Encrypted size = " << encrypted_size << std::endl; - std::cerr << " First bytes of encrypted data: " << RsUtil::BinToHex((const char *)encrypted_data,std::min(encrypted_size,30)) << "..."<< std::endl; + std::cerr << " First bytes of encrypted data: " << RsUtil::BinToHex((const char *)encrypted_data,std::min(encrypted_size,30u)) << "..."<< std::endl; std::cerr << " Encrypted data hash = " << RsDirUtil::sha1sum((const uint8_t *)encrypted_data,encrypted_size) << std::endl; #endif return true ; @@ -1281,38 +1538,13 @@ bool p3GRouter::decryptDataItem(RsGRouterGenericDataItem *item) uint32_t decrypted_size =0; uint32_t error_status ; - if(mGixs->decryptData(item->data_bytes,item->data_size,decrypted_data,decrypted_size,item->destination_key,error_status)) + if(!mGixs->decryptData(item->data_bytes,item->data_size,decrypted_data,decrypted_size,item->destination_key,error_status)) { if(error_status == RsGixs::RS_GIXS_ERROR_KEY_NOT_AVAILABLE) std::cerr << "(EE) Cannot decrypt incoming message. Key " << item->destination_key << " unknown." << std::endl; else std::cerr << "(EE) Cannot decrypt incoming message. Unknown error. " << std::endl; } -#ifdef SUSPENDED - RsTlvSecurityKey encryption_key ; - - // get the key, and let the cache find it. - for(int i=0;i<4;++i) - if(mIdService->getPrivateKey(item->destination_key,encryption_key)) - break ; - else - usleep(500*1000) ; // sleep half a sec. - - if(encryption_key.keyId.isNull()) - { - std::cerr << " (EE) Cannot get encryption key for id " << item->destination_key << std::endl; - return false ; - } - - uint8_t *decrypted_data =NULL; - int decrypted_size =0; - - if(!GxsSecurity::decrypt(decrypted_data,decrypted_size,item->data_bytes,item->data_size,encryption_key)) - { - std::cerr << " (EE) Decryption failed." << std::endl; - return false ; - } -#endif free(item->data_bytes) ; item->data_bytes = decrypted_data ; @@ -1343,18 +1575,7 @@ bool p3GRouter::signDataItem(RsGRouterAbstractMsgItem *item,const RsGxsId& signi if(!item->serialise_signed_data(data,data_size)) throw std::runtime_error("Cannot serialise signed data.") ; -#ifdef SUSPENDED - if(!mIdService->getPrivateKey(signing_id,signature_key)) - throw std::runtime_error("Cannot get signature key for id " + signing_id.toStdString()) ; -#ifdef GROUTER_DEBUG - std::cerr << " Signing..." << std::endl; - std::cerr << "First bytes of signed data: " << RsUtil::BinToHex((const char *)data,std::min(data_size,30u)) << "..."<< std::endl; -#endif - - if(!GxsSecurity::getSignature((char *)data,data_size,signature_key,item->signature)) - throw std::runtime_error("Cannot sign for id " + signing_id.toStdString() + ". Signature call failed.") ; -#endif uint32_t error_status ; if(!mGixs->signData(data,data_size,signing_id,item->signature,error_status)) @@ -1391,27 +1612,7 @@ bool p3GRouter::verifySignedDataItem(RsGRouterAbstractMsgItem *item) if(!item->serialise_signed_data(data,data_size)) throw std::runtime_error("Cannot serialise signed data.") ; -#ifdef SUSPENDED - for(int i=0;i<6;++i) - if(!mIdService->getKey(item->signature.keyId,signature_key) || signature_key.keyData.bin_data == NULL) - { - std::cerr << " Cannot get key. Waiting for caching. try " << i << "/6" << std::endl; - usleep(500 * 1000) ; // sleep for 500 msec. - } - else - break ; - if(signature_key.keyData.bin_data == NULL) - throw std::runtime_error("No key for checking signature from " + item->signature.keyId.toStdString()); - -#ifdef GROUTER_DEBUG - std::cerr << " Validating signature for data hash: " << RsDirUtil::sha1sum(data,data_size) << " and key_id = " << item->signature.keyId << std::endl; - std::cerr << " First bytes of signed data: " << RsUtil::BinToHex((const char *)data,std::min(data_size,30u)) << "..."<< std::endl; -#endif - - if(!GxsSecurity::validateSignature((char*)data,data_size,signature_key,item->signature)) - throw std::runtime_error("Signature was verified and it doesn't check! This is a security issue!") ; -#endif uint32_t error_status ; if(!mGixs->validateData(data,data_size,item->signature,true,error_status)) @@ -1467,6 +1668,9 @@ bool p3GRouter::cancel(GRouterMsgPropagationId mid) bool p3GRouter::sendData(const RsGxsId& destination,const GRouterServiceId& client_id,const uint8_t *data, uint32_t data_size,const RsGxsId& signing_id, GRouterMsgPropagationId &propagation_id) { +// std::cerr << "GRouter currently disabled." << std::endl; +// return false; + if(data_size > MAX_GROUTER_DATA_SIZE) { std::cerr << "GRouter max size limit exceeded (size=" << data_size << ", max=" << MAX_GROUTER_DATA_SIZE << "). Please send a smaller object!" << std::endl; @@ -1489,8 +1693,9 @@ bool p3GRouter::sendData(const RsGxsId& destination,const GRouterServiceId& clie data_item->data_size = data_size ; data_item->routing_id = propagation_id ; data_item->randomized_distance = 0 ; + data_item->service_id = client_id ; data_item->destination_key = destination ; - data_item->flags = 0 ; + data_item->flags = 0 ; // this is unused for now. // First, encrypt. @@ -1528,15 +1733,18 @@ bool p3GRouter::sendData(const RsGxsId& destination,const GRouterServiceId& clie info.receipt_item = NULL ; info.data_status = RS_GROUTER_DATA_STATUS_PENDING ; info.tunnel_status = RS_GROUTER_TUNNEL_STATUS_UNMANAGED ; - info.last_sent_TS = 0 ; + info.last_tunnel_sent_TS = 0 ; + info.last_friend_sent_TS = 0 ; info.last_tunnel_request_TS = 0 ; info.sending_attempts = 0 ; + info.routing_flags = GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS + | GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_TUNNELS + | GRouterRoutingInfo::ROUTING_FLAGS_ALLOW_FRIENDS ; info.received_time_TS = now ; info.tunnel_hash = makeTunnelHash(destination,client_id) ; - info.client_id = client_id ; #ifdef GROUTER_DEBUG - grouter_debug() << "p3GRouter::sendGRouterData(): pushing the followign item in the msg pending list:" << std::endl; + grouter_debug() << "p3GRouter::sendGRouterData(): pushing the following item in the msg pending list:" << std::endl; grouter_debug() << " routing id = " << propagation_id << std::endl; grouter_debug() << " data_item.size = " << info.data_item->data_size << std::endl; grouter_debug() << " data_item.byte = " << RsDirUtil::sha1sum(info.data_item->data_bytes,info.data_item->data_size) << std::endl; @@ -1547,7 +1755,7 @@ bool p3GRouter::sendData(const RsGxsId& destination,const GRouterServiceId& clie grouter_debug() << " sending attempt= " << info.sending_attempts << std::endl; grouter_debug() << " distance = " << info.data_item->randomized_distance << std::endl; grouter_debug() << " recv time = " << info.received_time_TS << std::endl; - grouter_debug() << " client id = " << std::hex << info.client_id << std::dec << std::endl; + grouter_debug() << " client id = " << std::hex << data_item->service_id << std::dec << std::endl; grouter_debug() << " tunnel hash = " << info.tunnel_hash << std::endl; #endif @@ -1704,7 +1912,7 @@ bool p3GRouter::getRoutingCacheInfo(std::vector& infos) cinfo.destination = it->second.data_item->destination_key ; cinfo.routing_time = it->second.received_time_TS ; cinfo.last_tunnel_attempt_time = it->second.last_tunnel_request_TS ; - cinfo.last_sent_time = it->second.last_sent_TS ; + cinfo.last_sent_time = std::max(it->second.last_tunnel_sent_TS,it->second.last_friend_sent_TS) ; cinfo.receipt_available = (it->second.receipt_item != NULL); cinfo.data_status = it->second.data_status ; cinfo.tunnel_status = it->second.tunnel_status ; @@ -1745,26 +1953,35 @@ void p3GRouter::debugDump() for(std::map::iterator it(_pending_messages.begin());it!=_pending_messages.end();++it) { - grouter_debug() << " Msg id : " << std::hex << it->first << std::dec ; - grouter_debug() << " Destination: " << it->second.data_item->destination_key ; - grouter_debug() << " Received : " << now - it->second.received_time_TS << " secs ago."; - grouter_debug() << " Last sent : " << now - it->second.last_sent_TS << " secs ago."; - grouter_debug() << " Data Status: " << statusString[it->second.data_status] << std::endl; - grouter_debug() << " Tunl Status: " << statusString[it->second.tunnel_status] << std::endl; - grouter_debug() << " Receipt ok : " << (it->second.receipt_item != NULL) << std::endl; + grouter_debug() << " Msg id : " << std::hex << it->first << std::dec ; + grouter_debug() << " Destination : " << it->second.data_item->destination_key ; + grouter_debug() << " Received : " << now - it->second.received_time_TS << " secs ago."; + grouter_debug() << " Last tunnel sent: " << now - it->second.last_tunnel_sent_TS << " secs ago."; + grouter_debug() << " Last friend sent: " << now - it->second.last_friend_sent_TS << " secs ago."; + grouter_debug() << " Data Status : " << statusString[it->second.data_status] << std::endl; + grouter_debug() << " Tunl Status : " << statusString[it->second.tunnel_status] << std::endl; + grouter_debug() << " Receipt ok : " << (it->second.receipt_item != NULL) << std::endl; } grouter_debug() << " Tunnels: " << std::endl; - for(std::map::const_iterator it(_virtual_peers.begin());it!=_virtual_peers.end();++it) + for(std::map::const_iterator it(_tunnels.begin());it!=_tunnels.end();++it) { grouter_debug() << " hash: " << it->first << ", first received: " << now - it->second.last_tunnel_ok_TS << " (secs ago), last received: " << now - it->second.last_tunnel_ok_TS << std::endl; - for(std::map::const_iterator it2 = it->second.virtual_peers.begin();it2!=it->second.virtual_peers.end();++it2) - grouter_debug() << " " << it2->first << " : cached data = " << (void*)it2->second << std::endl; + for(std::set::const_iterator it2 = it->second.virtual_peers.begin();it2!=it->second.virtual_peers.end();++it2) + grouter_debug() << " " << (*it2) << std::endl; } - grouter_debug() << " Routing matrix: " << std::endl; + grouter_debug() << " Incoming data pipes: " << std::endl; + + for(std::map::const_iterator it(_incoming_data_pipes.begin());it!=_incoming_data_pipes.end();++it) + if(it->second.incoming_data_buffer != NULL) + grouter_debug() << " " << it->first << ": offset=" << it->second.incoming_data_buffer->chunk_start << " size=" << it->second.incoming_data_buffer->chunk_size << " over " << it->second.incoming_data_buffer->total_size << std::endl; + else + grouter_debug() << " " << it->first << " empty." << std::endl; + + grouter_debug() << " Routing matrix: " << std::endl; // if(_debug_enabled) // _routing_matrix.debugDump() ; diff --git a/libretroshare/src/grouter/p3grouter.h b/libretroshare/src/grouter/p3grouter.h index ca61ec63d..0b5f0568a 100644 --- a/libretroshare/src/grouter/p3grouter.h +++ b/libretroshare/src/grouter/p3grouter.h @@ -44,7 +44,6 @@ // To be put in pqi/p3cfgmgr.h // static const uint32_t CONFIG_TYPE_GROUTER = 0x0016 ; - static const uint32_t RS_GROUTER_DATA_FLAGS_ENCRYPTED = 0x0001 ; class p3LinkMgr ; @@ -52,9 +51,10 @@ class p3turtle ; class RsGixs ; class RsGRouterItem ; class RsGRouterGenericDataItem ; -class RsGRouterTransactionChunkItem ; class RsGRouterSignedReceiptItem ; class RsGRouterAbstractMsgItem ; +class RsGRouterTransactionItem ; +class RsGRouterTransactionAcknItem ; // This class is responsible for accepting data chunks and merging them into a final object. When the object is // complete, it is de-serialised and returned as a RsGRouterGenericDataItem*. @@ -69,13 +69,28 @@ public: void addVirtualPeer(const TurtleVirtualPeerId& vpid) ; void removeVirtualPeer(const TurtleVirtualPeerId& vpid) ; - RsGRouterAbstractMsgItem *addDataChunk(const TurtleVirtualPeerId& vpid,RsGRouterTransactionChunkItem *chunk_item) ; - - std::map virtual_peers ; + std::set virtual_peers ; time_t first_tunnel_ok_TS ; // timestamp when 1st tunnel was received. time_t last_tunnel_ok_TS ; // timestamp when last tunnel was received. }; +class GRouterDataInfo +{ + // ! This class does not have a copy constructor that duplicates the incoming data buffer. This is on purpose! +public: + GRouterDataInfo() + { + incoming_data_buffer = NULL ; + } + + void clear() { delete incoming_data_buffer ; incoming_data_buffer = NULL ;} + + // These two methods handle the memory management of buffers for each virtual peers. + + RsGRouterAbstractMsgItem *addDataChunk(RsGRouterTransactionChunkItem *chunk_item) ; + RsGRouterTransactionChunkItem *incoming_data_buffer ; +}; + class p3GRouter: public RsGRouter, public RsTurtleClientService, public p3Service, public p3Config { public: @@ -196,6 +211,15 @@ protected: virtual void removeVirtualPeer(const TurtleFileHash& hash,const TurtleVirtualPeerId& virtual_peer_id) ; private: + //===================================================// + // Low level item sorting // + //===================================================// + + void handleLowLevelServiceItems() ; + void handleLowLevelServiceItem(RsGRouterTransactionItem*) ; + void handleLowLevelTransactionChunkItem(RsGRouterTransactionChunkItem *chunk_item); + void handleLowLevelTransactionAckItem(RsGRouterTransactionAcknItem*) ; + class nullstream: public std::ostream {}; std::ostream& grouter_debug() const @@ -207,10 +231,16 @@ private: void routePendingObjects() ; void handleTunnels() ; + void autoWash() ; - void handleIncoming(const TurtleFileHash &hash, RsGRouterAbstractMsgItem *) ; - void handleIncomingReceiptItem(const TurtleFileHash &hash, RsGRouterSignedReceiptItem *receipt_item) ; - void handleIncomingDataItem(const TurtleFileHash &hash, RsGRouterGenericDataItem *data_item) ; + //===================================================// + // High level item sorting // + //===================================================// + + void handleIncoming() ; + + void handleIncomingReceiptItem(RsGRouterSignedReceiptItem *receipt_item) ; + void handleIncomingDataItem(RsGRouterGenericDataItem *data_item) ; bool locked_getClientAndServiceId(const TurtleFileHash& hash, const RsGxsId& destination_key, GRouterClientService *& client, GRouterServiceId& service_id); @@ -219,6 +249,7 @@ private: // static float computeMatrixContribution(float base,uint32_t time_shift,float probability) ; static time_t computeNextTimeDelay(time_t duration) ; + static bool sliceDataItem(RsGRouterAbstractMsgItem *,std::list& chunks) ; uint32_t computeRandomDistanceIncrement(const RsPeerId& pid,const GRouterKeyId& destination_id) ; @@ -231,7 +262,10 @@ private: static Sha1CheckSum makeTunnelHash(const RsGxsId& destination,const GRouterServiceId& client); static void makeGxsIdAndClientId(const TurtleFileHash &sum,RsGxsId& gxs_id,GRouterServiceId& client_id); - bool sendDataInTunnel(const TurtleVirtualPeerId& vpid,RsGRouterAbstractMsgItem *item); + bool locked_sendTransactionData(const RsPeerId& pid,const RsGRouterTransactionItem& item); + + void locked_collectAvailableFriends(const GRouterKeyId &gxs_id,std::list& friend_peers, bool is_origin); + void locked_collectAvailableTunnels(const TurtleFileHash& hash,std::list& tunnel_peers); //===================================================// // p3Config methods // @@ -284,10 +318,16 @@ private: // std::map _pending_messages;// pending messages - std::map _virtual_peers ; + // Stores virtual peers that appear/disappear as the result of the turtle router client + // + std::map _tunnels ; + + // Stores incoming data from any peers (virtual and real) into chunks that get aggregated until finished. + // + std::map _incoming_data_pipes ; // Queue of incoming items. Might be receipts or data. Should always be empty (not a storage place) - std::list _incoming_items ; + std::list _incoming_items ; // Data handling methods // @@ -299,6 +339,7 @@ private: p3ServiceControl *mServiceControl ; p3turtle *mTurtle ; RsGixs *mGixs ; + p3LinkMgr *mLinkMgr ; // Multi-thread protection mutex. // diff --git a/retroshare-gui/src/gui/msgs/MessageComposer.cpp b/retroshare-gui/src/gui/msgs/MessageComposer.cpp index 33497c865..7193d25a5 100644 --- a/retroshare-gui/src/gui/msgs/MessageComposer.cpp +++ b/retroshare-gui/src/gui/msgs/MessageComposer.cpp @@ -1768,7 +1768,12 @@ void MessageComposer::addRecipient(enumType type, const RsPeerId& pid) } void MessageComposer::addRecipient(enumType type, const RsGxsId& gxs_id) { - _distant_peers.insert(gxs_id) ; + static bool already = false ; + if(!already) + { + QMessageBox::warning(NULL,"Distant messaging not stable","Distant messaging is currently unstable. Do not expect too much from it.") ; + already = true ; + } int rowCount = ui.recipientWidget->rowCount(); int row; diff --git a/retroshare-gui/src/gui/msgs/MessageComposer.h b/retroshare-gui/src/gui/msgs/MessageComposer.h index 81fb4ba31..4f42eebdd 100644 --- a/retroshare-gui/src/gui/msgs/MessageComposer.h +++ b/retroshare-gui/src/gui/msgs/MessageComposer.h @@ -239,7 +239,6 @@ private: Ui::MessageComposer ui; std::list _recList ; - std::set _distant_peers ; // we keep a list of them, in order to know which peer is a GXS id. }; #endif