From b587301b5a63e2a1b3a1b1b965cb0f7e6bf72ca2 Mon Sep 17 00:00:00 2001 From: drbob Date: Wed, 2 Oct 2013 03:21:04 +0000 Subject: [PATCH] Added a thread per active peer - to reduce RTT and increase throughout. * Added pqithreadstreamer, tweaked pqistreamer to support derivation. * Shifted RTT from p3Service to p3FastService. * Disabled lots of debug. git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.6-initdev@6787 b45a01b8-16f6-495d-af2f-9b41ad6348cc --- libretroshare/src/libretroshare.pro | 2 + libretroshare/src/pqi/p3netmgr.cc | 4 - libretroshare/src/pqi/pqi_base.h | 1 + libretroshare/src/pqi/pqihandler.cc | 2 - libretroshare/src/pqi/pqiperson.cc | 247 ++-- libretroshare/src/pqi/pqiperson.h | 41 +- libretroshare/src/pqi/pqipersongrp.cc | 20 +- libretroshare/src/pqi/pqipersongrp.h | 4 + libretroshare/src/pqi/pqiqosstreamer.cc | 4 +- libretroshare/src/pqi/pqiqosstreamer.h | 6 +- libretroshare/src/pqi/pqissl.cc | 26 +- libretroshare/src/pqi/pqisslpersongrp.cc | 6 +- libretroshare/src/pqi/pqissludp.cc | 27 +- libretroshare/src/pqi/pqistreamer.cc | 63 +- libretroshare/src/pqi/pqistreamer.h | 12 +- libretroshare/src/pqi/pqithreadstreamer.cc | 174 +++ libretroshare/src/pqi/pqithreadstreamer.h | 62 + libretroshare/src/services/p3discovery2.cc | 1241 +------------------- libretroshare/src/services/p3rtt.cc | 62 +- libretroshare/src/services/p3rtt.h | 6 +- libretroshare/src/services/p3service.cc | 1 + libretroshare/src/util/rsnet_ss.cc | 61 + 22 files changed, 658 insertions(+), 1414 deletions(-) create mode 100644 libretroshare/src/pqi/pqithreadstreamer.cc create mode 100644 libretroshare/src/pqi/pqithreadstreamer.h diff --git a/libretroshare/src/libretroshare.pro b/libretroshare/src/libretroshare.pro index 682dfdcee..d4058dc10 100644 --- a/libretroshare/src/libretroshare.pro +++ b/libretroshare/src/libretroshare.pro @@ -352,6 +352,7 @@ HEADERS += pqi/authssl.h \ pqi/pqisslproxy.h \ pqi/pqistore.h \ pqi/pqistreamer.h \ + pqi/pqithreadstreamer.h \ pqi/pqiqosstreamer.h \ pqi/sslfns.h \ pqi/pqinetstatebox.h @@ -473,6 +474,7 @@ SOURCES += pqi/authgpg.cc \ pqi/pqisslproxy.cc \ pqi/pqistore.cc \ pqi/pqistreamer.cc \ + pqi/pqithreadstreamer.cc \ pqi/pqiqosstreamer.cc \ pqi/sslfns.cc \ pqi/pqinetstatebox.cc diff --git a/libretroshare/src/pqi/p3netmgr.cc b/libretroshare/src/pqi/p3netmgr.cc index 960a88ec5..c76b54bdb 100644 --- a/libretroshare/src/pqi/p3netmgr.cc +++ b/libretroshare/src/pqi/p3netmgr.cc @@ -74,10 +74,6 @@ const uint32_t MIN_TIME_BETWEEN_NET_RESET = 5; * #define NETMGR_DEBUG_STATEBOX 1 ***/ -#define NETMGR_DEBUG 1 -#define NETMGR_DEBUG_RESET 1 -#define NETMGR_DEBUG_TICK 1 -#define NETMGR_DEBUG_STATEBOX 1 pqiNetStatus::pqiNetStatus() :mLocalAddrOk(false), mExtAddrOk(false), mExtAddrStableOk(false), diff --git a/libretroshare/src/pqi/pqi_base.h b/libretroshare/src/pqi/pqi_base.h index f41cf854a..b2ffcd491 100644 --- a/libretroshare/src/pqi/pqi_base.h +++ b/libretroshare/src/pqi/pqi_base.h @@ -210,6 +210,7 @@ class PQInterface: public RateInterface * Retrieve RsItem from a facility */ virtual RsItem *GetItem() = 0; + virtual bool RecvItem(RsItem *item) { return false; } /* alternative for for GetItem(), when we want to push */ /** * also there are tick + person id functions. diff --git a/libretroshare/src/pqi/pqihandler.cc b/libretroshare/src/pqi/pqihandler.cc index d1dcf0fe8..214f33c86 100644 --- a/libretroshare/src/pqi/pqihandler.cc +++ b/libretroshare/src/pqi/pqihandler.cc @@ -295,8 +295,6 @@ int pqihandler::SendRsRawItem(RsRawItem *ns) return queueOutRsItem(ns) ; } - - // inputs. This is a very basic // system that is completely biased and slow... // someone please fix. diff --git a/libretroshare/src/pqi/pqiperson.cc b/libretroshare/src/pqi/pqiperson.cc index f4a42e75a..8c45fff1e 100644 --- a/libretroshare/src/pqi/pqiperson.cc +++ b/libretroshare/src/pqi/pqiperson.cc @@ -35,10 +35,9 @@ const int pqipersonzone = 82371; * #define PERSON_DEBUG 1 ****/ -#define PERSON_DEBUG 1 - pqiperson::pqiperson(std::string id, pqipersongrp *pg) - :PQInterface(id), active(false), activepqi(NULL), + :PQInterface(id), mNotifyMtx("pqiperson-notify"), mPersonMtx("pqiperson"), + active(false), activepqi(NULL), inConnectAttempt(false), waittimes(0), pqipg(pg) { @@ -50,6 +49,8 @@ pqiperson::pqiperson(std::string id, pqipersongrp *pg) pqiperson::~pqiperson() { + RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ + // clean up the children. std::map::iterator it; for(it = kids.begin(); it != kids.end(); it++) @@ -64,6 +65,8 @@ pqiperson::~pqiperson() // The PQInterface interface. int pqiperson::SendItem(RsItem *i,uint32_t& serialized_size) { + RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ + std::string out = "pqiperson::SendItem()"; if (active) { @@ -86,14 +89,27 @@ int pqiperson::SendItem(RsItem *i,uint32_t& serialized_size) RsItem *pqiperson::GetItem() { + RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ + if (active) return activepqi -> GetItem(); // else not possible. return NULL; } +bool pqiperson::RecvItem(RsItem *item) +{ + std::cerr << "pqiperson::RecvItem()"; + std::cerr << std::endl; + + return pqipg->recvItem((RsRawItem *) item); +} + + int pqiperson::status() { + RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ + if (active) return activepqi -> status(); return -1; @@ -101,6 +117,8 @@ int pqiperson::status() int pqiperson::receiveHeartbeat() { + RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ + pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::receiveHeartbeat() from peer : " + PeerId()); lastHeartbeatReceived = time(NULL); @@ -110,61 +128,68 @@ int pqiperson::receiveHeartbeat() // tick...... int pqiperson::tick() { - //if lastHeartbeatReceived is 0, it might be not activated so don't do a net reset. - if (active && (lastHeartbeatReceived != 0) && - (time(NULL) - lastHeartbeatReceived) > HEARTBEAT_REPEAT_TIME * 5) + int activeTick = 0; { - int ageLastIncoming = time(NULL) - activepqi->getLastIncomingTS(); - std::string out = "pqiperson::tick() WARNING No heartbeat from: " + PeerId(); - //out << " assume dead. calling pqissl::reset(), LastHeartbeat was: "; - rs_sprintf_append(out, " LastHeartbeat was: %ld secs ago", time(NULL) - lastHeartbeatReceived); - rs_sprintf_append(out, " LastIncoming was: %d secs ago", ageLastIncoming); - pqioutput(PQL_WARNING, pqipersonzone, out); - -#define NO_PACKET_TIMEOUT 60 - - if (ageLastIncoming > NO_PACKET_TIMEOUT) + RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ + + //if lastHeartbeatReceived is 0, it might be not activated so don't do a net reset. + if (active && (lastHeartbeatReceived != 0) && + (time(NULL) - lastHeartbeatReceived) > HEARTBEAT_REPEAT_TIME * 5) { - out = "pqiperson::tick() " + PeerId(); - out += " No Heartbeat & No Packets -> assume dead. calling pqissl::reset()"; + int ageLastIncoming = time(NULL) - activepqi->getLastIncomingTS(); + std::string out = "pqiperson::tick() WARNING No heartbeat from: " + PeerId(); + //out << " assume dead. calling pqissl::reset(), LastHeartbeat was: "; + rs_sprintf_append(out, " LastHeartbeat was: %ld secs ago", time(NULL) - lastHeartbeatReceived); + rs_sprintf_append(out, " LastIncoming was: %d secs ago", ageLastIncoming); pqioutput(PQL_WARNING, pqipersonzone, out); - - this->reset(); + + #define NO_PACKET_TIMEOUT 60 + + if (ageLastIncoming > NO_PACKET_TIMEOUT) + { + out = "pqiperson::tick() " + PeerId(); + out += " No Heartbeat & No Packets -> assume dead. calling pqissl::reset()"; + pqioutput(PQL_WARNING, pqipersonzone, out); + + this->reset(); + } + } - + + + { + std::string out = "pqiperson::tick() Id: " + PeerId() + " "; + if (active) + out += "***Active***"; + else + out += ">>InActive<<"; + + out += "\n"; + rs_sprintf_append(out, "Activepqi: %p inConnectAttempt: ", activepqi); + + if (inConnectAttempt) + out += "In Connection Attempt"; + else + out += " Not Connecting "; + out += "\n"; + + // tick the children. + std::map::iterator it; + for(it = kids.begin(); it != kids.end(); it++) + { + if (0 < (it->second) -> tick()) + { + activeTick = 1; + } + rs_sprintf_append(out, "\tTicking Child: %d\n", it->first); + } + + pqioutput(PQL_DEBUG_ALL, pqipersonzone, out); + } // end of pqioutput. } - int activeTick = 0; - - { - std::string out = "pqiperson::tick() Id: " + PeerId() + " "; - if (active) - out += "***Active***"; - else - out += ">>InActive<<"; - - out += "\n"; - rs_sprintf_append(out, "Activepqi: %p inConnectAttempt: ", activepqi); - - if (inConnectAttempt) - out += "In Connection Attempt"; - else - out += " Not Connecting "; - out += "\n"; - - // tick the children. - std::map::iterator it; - for(it = kids.begin(); it != kids.end(); it++) - { - if (0 < (it->second) -> tick()) - { - activeTick = 1; - } - rs_sprintf_append(out, "\tTicking Child: %d\n", it->first); - } - - pqioutput(PQL_DEBUG_ALL, pqipersonzone, out); - } // end of pqioutput. + // handle Notify Events that were generated. + processNotifyEvents(); return activeTick; } @@ -172,8 +197,60 @@ int pqiperson::tick() // callback function for the child - notify of a change. // This is only used for out-of-band info.... // otherwise could get dangerous loops. +// - Actually, now we have - must store and process later. int pqiperson::notifyEvent(NetInterface *ni, int newState, const struct sockaddr_storage &remote_peer_address) { + if (mPersonMtx.trylock()) + { + handleNotifyEvent_locked(ni, newState, remote_peer_address); + + mPersonMtx.unlock(); + + return 1; + } + + + RsStackMutex stack(mNotifyMtx); /**** LOCK MUTEX ****/ + + mNotifyQueue.push_back(NotifyData(ni, newState, remote_peer_address)); + + return 1; +} + + +void pqiperson::processNotifyEvents() +{ + NetInterface *ni; + int state; + struct sockaddr_storage addr; + + while(1) + { + { + RsStackMutex stack(mNotifyMtx); /**** LOCK MUTEX ****/ + + if (mNotifyQueue.empty()) + { + return; + } + NotifyData &data = mNotifyQueue.front(); + ni = data.mNi; + state = data.mState; + addr = data.mAddr; + + mNotifyQueue.pop_front(); + } + + RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ + handleNotifyEvent_locked(ni, state, addr); + } + return; +} + + +int pqiperson::handleNotifyEvent_locked(NetInterface *ni, int newState, const struct sockaddr_storage &remote_peer_address) +{ + { std::string out = "pqiperson::notifyEvent() Id: " + PeerId() + "\n"; rs_sprintf_append(out, "Message: %d from: %p\n", newState, ni); @@ -243,6 +320,8 @@ int pqiperson::notifyEvent(NetInterface *ni, int newState, const struct sockadd activepqi = pqi; inConnectAttempt = false; + activepqi->start(); // STARTUP THREAD. + /* reset all other children? (clear up long UDP attempt) */ for(it = kids.begin(); it != kids.end(); it++) { @@ -270,6 +349,7 @@ int pqiperson::notifyEvent(NetInterface *ni, int newState, const struct sockadd { pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::notifyEvent() Id: " + PeerId() + " CONNECT_FAILED->marking so!"); + activepqi->stop(); // STOP THREAD. active = false; activepqi = NULL; } @@ -302,11 +382,14 @@ int pqiperson::notifyEvent(NetInterface *ni, int newState, const struct sockadd int pqiperson::reset() { + RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ + pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::reset() resetting all pqiconnect for Id: " + PeerId()); std::map::iterator it; for(it = kids.begin(); it != kids.end(); it++) { + (it->second) -> stop(); // STOP THREAD. (it->second) -> reset(); } @@ -317,8 +400,29 @@ int pqiperson::reset() return 1; } +int pqiperson::fullstopthreads() +{ + RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ + + pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::fullstopthreads() for Id: " + PeerId()); + + std::map::iterator it; + for(it = kids.begin(); it != kids.end(); it++) + { + (it->second) -> fullstop(); // WAIT FOR THREAD TO STOP. + } + + activepqi = NULL; + active = false; + lastHeartbeatReceived = 0; + + return 1; +} + int pqiperson::addChildInterface(uint32_t type, pqiconnect *pqi) { + RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ + { std::string out; rs_sprintf(out, "pqiperson::addChildInterface() : Id %s %u", PeerId().c_str(), type); @@ -335,6 +439,8 @@ int pqiperson::addChildInterface(uint32_t type, pqiconnect *pqi) int pqiperson::listen() { + RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ + pqioutput(PQL_DEBUG_BASIC, pqipersonzone, "pqiperson::listen() Id: " + PeerId()); if (!active) @@ -352,6 +458,8 @@ int pqiperson::listen() int pqiperson::stoplistening() { + RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ + pqioutput(PQL_DEBUG_BASIC, pqipersonzone, "pqiperson::stoplistening() Id: " + PeerId()); std::map::iterator it; @@ -368,6 +476,8 @@ int pqiperson::connect(uint32_t type, const struct sockaddr_storage &raddr, uint32_t delay, uint32_t period, uint32_t timeout, uint32_t flags, uint32_t bandwidth, const std::string &domain_addr, uint16_t domain_port) { + RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ + #ifdef PERSON_DEBUG #endif { @@ -413,7 +523,7 @@ int pqiperson::connect(uint32_t type, const struct sockaddr_storage &raddr, #ifdef PERSON_DEBUG std::cerr << "pqiperson::connect() WARNING, clearing rate cap" << std::endl; #endif - setRateCap(0,0); + setRateCap_locked(0,0); #ifdef PERSON_DEBUG std::cerr << "pqiperson::connect() setting connect_parameters" << std::endl; @@ -444,25 +554,10 @@ int pqiperson::connect(uint32_t type, const struct sockaddr_storage &raddr, } -pqiconnect *pqiperson::getKid(uint32_t type) -{ - std::map::iterator it; - - if (kids.empty()) { - return NULL; - } - - it = kids.find(type); - if (it == kids.end()) - { - return NULL; - } else { - return it->second; - } -} - void pqiperson::getRates(RsBwRates &rates) { + RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ + // get the rate from the active one. if ((!active) || (activepqi == NULL)) return; @@ -471,6 +566,8 @@ void pqiperson::getRates(RsBwRates &rates) int pqiperson::getQueueSize(bool in) { + RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ + // get the rate from the active one. if ((!active) || (activepqi == NULL)) return 0; @@ -480,6 +577,8 @@ int pqiperson::getQueueSize(bool in) float pqiperson::getRate(bool in) { + RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ + // get the rate from the active one. if ((!active) || (activepqi == NULL)) return 0; @@ -488,6 +587,8 @@ float pqiperson::getRate(bool in) void pqiperson::setMaxRate(bool in, float val) { + RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ + // set to all of them. (and us) PQInterface::setMaxRate(in, val); // clean up the children. @@ -499,6 +600,12 @@ void pqiperson::setMaxRate(bool in, float val) } void pqiperson::setRateCap(float val_in, float val_out) +{ + RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ + return setRateCap_locked(val_in, val_out); +} + +void pqiperson::setRateCap_locked(float val_in, float val_out) { // set to all of them. (and us) PQInterface::setRateCap(val_in, val_out); diff --git a/libretroshare/src/pqi/pqiperson.h b/libretroshare/src/pqi/pqiperson.h index d0d0f05b2..bcb2de6a8 100644 --- a/libretroshare/src/pqi/pqiperson.h +++ b/libretroshare/src/pqi/pqiperson.h @@ -30,6 +30,7 @@ #include "pqi/pqi.h" +#include "util/rsnet.h" #include @@ -44,12 +45,13 @@ static const int CONNECT_FAILED = 5; static const int HEARTBEAT_REPEAT_TIME = 5; #include "pqi/pqiqosstreamer.h" +#include "pqi/pqithreadstreamer.h" class pqiconnect: public pqiQoSstreamer, public NetInterface { public: - pqiconnect(RsSerialiser *rss, NetBinInterface *ni_in) - :pqiQoSstreamer(rss, ni_in->PeerId(), ni_in, 0), // pqistreamer will cleanup NetInterface. + pqiconnect(PQInterface *parent, RsSerialiser *rss, NetBinInterface *ni_in) + :pqiQoSstreamer(parent, rss, ni_in->PeerId(), ni_in, 0), // pqistreamer will cleanup NetInterface. NetInterface(NULL, ni_in->PeerId()), // No need for callback ni(ni_in) { @@ -75,7 +77,6 @@ virtual bool connect_parameter(uint32_t type, std::string value) { return ni -> virtual bool connect_additional_address(uint32_t type, const struct sockaddr_storage &addr) { return ni -> connect_additional_address(type, addr);} - virtual int getConnectAddress(struct sockaddr_storage &raddr){ return ni->getConnectAddress(raddr); } // get the contact from the net side! @@ -101,6 +102,25 @@ protected: class pqipersongrp; + +class NotifyData +{ + public: + NotifyData() + :mNi(NULL), mState(0) + { + sockaddr_storage_clear(mAddr); + } + + NotifyData(NetInterface *ni, int state, const struct sockaddr_storage &addr) + :mNi(ni), mState(state), mAddr(addr) { return; } + + NetInterface *mNi; + int mState; + struct sockaddr_storage mAddr; +}; + + class pqiperson: public PQInterface { public: @@ -117,6 +137,8 @@ int connect(uint32_t type, const struct sockaddr_storage &raddr, uint32_t delay, uint32_t period, uint32_t timeout, uint32_t flags, uint32_t bandwidth, const std::string &domain_addr, uint16_t domain_port); +int fullstopthreads(); + int receiveHeartbeat(); // add in connection method. int addChildInterface(uint32_t type, pqiconnect *pqi); @@ -130,6 +152,7 @@ virtual int SendItem(RsItem *item) return SendItem(item,serialized_size) ; } virtual RsItem *GetItem(); +virtual bool RecvItem(RsItem *item); virtual int status(); virtual int tick(); @@ -144,10 +167,20 @@ virtual float getRate(bool in); virtual void setMaxRate(bool in, float val); virtual void setRateCap(float val_in, float val_out); -pqiconnect *getKid(uint32_t type); private: +void processNotifyEvents(); +int handleNotifyEvent_locked(NetInterface *ni, int event, const struct sockaddr_storage &addr); + + RsMutex mNotifyMtx; /**** LOCKS Notify Queue ****/ + + std::list mNotifyQueue; + + RsMutex mPersonMtx; /**** LOCKS below ****/ + +void setRateCap_locked(float val_in, float val_out); + std::map kids; bool active; pqiconnect *activepqi; diff --git a/libretroshare/src/pqi/pqipersongrp.cc b/libretroshare/src/pqi/pqipersongrp.cc index 42aa7b110..e3128c6ed 100644 --- a/libretroshare/src/pqi/pqipersongrp.cc +++ b/libretroshare/src/pqi/pqipersongrp.cc @@ -56,6 +56,21 @@ static std::list waitingIds; * pqilistener and when accessing pqihandlers data. */ + + // New speedy recv. +bool pqipersongrp::RecvRsRawItem(RsRawItem *item) +{ + std::cerr << "pqipersongrp::RecvRsRawItem()"; + std::cerr << std::endl; + + p3ServiceServer::recvItem(item); + + return true; +} + + + + // handle the tunnel services. int pqipersongrp::tickServiceRecv() { @@ -433,6 +448,7 @@ int pqipersongrp::removePeer(std::string id) p -> stoplistening(); pqioutput(PQL_WARNING, pqipersongrpzone, "pqipersongrp::removePeer() => reset() called before deleting person"); p -> reset(); + p -> fullstopthreads(); delete p; mods.erase(it); } @@ -679,7 +695,7 @@ pqiperson * pqipersongrpDummy::locked_createPerson(std::string id, pqilistener * RsSerialiser *rss = new RsSerialiser(); rss->addSerialType(new RsServiceSerialiser()); - pqiconnect *pqic = new pqiconnect(rss, d1); + pqiconnect *pqic = new pqiconnect(pqip, rss, d1); pqip -> addChildInterface(PQI_CONNECT_TCP, pqic); @@ -689,7 +705,7 @@ pqiperson * pqipersongrpDummy::locked_createPerson(std::string id, pqilistener * RsSerialiser *rss2 = new RsSerialiser(); rss2->addSerialType(new RsServiceSerialiser()); - pqiconnect *pqic2 = new pqiconnect(rss2, d2); + pqiconnect *pqic2 = new pqiconnect(pqip, rss2, d2); pqip -> addChildInterface(PQI_CONNECT_UDP, pqic2); diff --git a/libretroshare/src/pqi/pqipersongrp.h b/libretroshare/src/pqi/pqipersongrp.h index cc79d89cf..4e8cd66a8 100644 --- a/libretroshare/src/pqi/pqipersongrp.h +++ b/libretroshare/src/pqi/pqipersongrp.h @@ -80,6 +80,10 @@ int connectPeer(std::string id #endif ); + // New speedy recv. +virtual bool RecvRsRawItem(RsRawItem *item); + + /* Work-around to dodgy pointer stuff */ int tagHeartbeatRecvd(std::string id); diff --git a/libretroshare/src/pqi/pqiqosstreamer.cc b/libretroshare/src/pqi/pqiqosstreamer.cc index f42cb761c..5a8bd8167 100644 --- a/libretroshare/src/pqi/pqiqosstreamer.cc +++ b/libretroshare/src/pqi/pqiqosstreamer.cc @@ -25,8 +25,8 @@ #include "pqiqosstreamer.h" -pqiQoSstreamer::pqiQoSstreamer(RsSerialiser *rss, std::string peerid, BinInterface *bio_in, int bio_flagsin) - : pqistreamer(rss,peerid,bio_in,bio_flagsin), pqiQoS(PQI_QOS_STREAMER_MAX_LEVELS, PQI_QOS_STREAMER_ALPHA) +pqiQoSstreamer::pqiQoSstreamer(PQInterface *parent, RsSerialiser *rss, std::string peerid, BinInterface *bio_in, int bio_flagsin) + : pqithreadstreamer(parent,rss,peerid,bio_in,bio_flagsin), pqiQoS(PQI_QOS_STREAMER_MAX_LEVELS, PQI_QOS_STREAMER_ALPHA) { _total_item_size = 0 ; _total_item_count = 0 ; diff --git a/libretroshare/src/pqi/pqiqosstreamer.h b/libretroshare/src/pqi/pqiqosstreamer.h index 2e95d52d1..9f75ce5d8 100644 --- a/libretroshare/src/pqi/pqiqosstreamer.h +++ b/libretroshare/src/pqi/pqiqosstreamer.h @@ -26,12 +26,12 @@ #pragma once #include "pqiqos.h" -#include "pqistreamer.h" +#include "pqithreadstreamer.h" -class pqiQoSstreamer: public pqistreamer, public pqiQoS +class pqiQoSstreamer: public pqithreadstreamer, public pqiQoS { public: - pqiQoSstreamer(RsSerialiser *rss, std::string peerid, BinInterface *bio_in, int bio_flagsin); + pqiQoSstreamer(PQInterface *parent, RsSerialiser *rss, std::string peerid, BinInterface *bio_in, int bio_flagsin); static const uint32_t PQI_QOS_STREAMER_MAX_LEVELS = 10 ; static const float PQI_QOS_STREAMER_ALPHA = 2.0 ; diff --git a/libretroshare/src/pqi/pqissl.cc b/libretroshare/src/pqi/pqissl.cc index b8c916da8..32fefbffa 100644 --- a/libretroshare/src/pqi/pqissl.cc +++ b/libretroshare/src/pqi/pqissl.cc @@ -1316,6 +1316,9 @@ int pqissl::Authorise_SSL_Connection() /* This function is public, and callable from pqilistener - so must be mutex protected */ int pqissl::accept(SSL *ssl, int fd, const struct sockaddr_storage &foreign_addr) // initiate incoming connection. { + std::cerr << "pqissl::accept()"; + std::cerr << std::endl; + RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/ return accept_locked(ssl, fd, foreign_addr); @@ -1476,6 +1479,9 @@ int pqissl::accept_locked(SSL *ssl, int fd, const struct sockaddr_storage &forei active = true; waiting = WAITING_NOT; + std::cerr << "pqissl::accept_locked() connection complete - notifying parent"; + std::cerr << std::endl; + // Notify the pqiperson.... (Both Connect/Receive) if (parent()) { @@ -1770,7 +1776,7 @@ bool pqissl::moretoread(uint32_t usec) FD_ZERO(&ExceptFDs); FD_SET(sockfd, &ReadFDs); - FD_SET(sockfd, &WriteFDs); + // Dont set WriteFDs. FD_SET(sockfd, &ExceptFDs); struct timeval timeout; @@ -1797,22 +1803,6 @@ bool pqissl::moretoread(uint32_t usec) return 0; } - if (FD_ISSET(sockfd, &WriteFDs)) - { -#ifdef PQISSL_DEBUG - // write can work. - rslog(RSL_DEBUG_ALL, pqisslzone, - "pqissl::moretoread() Can Write!"); -#endif - } - else - { -#ifdef PQISSL_DEBUG - // write can work. - rslog(RSL_DEBUG_BASIC, pqisslzone, - "pqissl::moretoread() Can *NOT* Write!"); -#endif - } if (FD_ISSET(sockfd, &ReadFDs)) { @@ -1849,7 +1839,7 @@ bool pqissl::cansend(uint32_t usec) FD_ZERO(&WriteFDs); FD_ZERO(&ExceptFDs); - FD_SET(sockfd, &ReadFDs); + // Dont Set ReadFDs. FD_SET(sockfd, &WriteFDs); FD_SET(sockfd, &ExceptFDs); diff --git a/libretroshare/src/pqi/pqisslpersongrp.cc b/libretroshare/src/pqi/pqisslpersongrp.cc index 1766359d5..a0fdb6733 100644 --- a/libretroshare/src/pqi/pqisslpersongrp.cc +++ b/libretroshare/src/pqi/pqisslpersongrp.cc @@ -95,7 +95,7 @@ pqiperson * pqisslpersongrp::locked_createPerson(std::string id, pqilistener *li RsSerialiser *rss = new RsSerialiser(); rss->addSerialType(new RsServiceSerialiser()); - pqiconnect *pqisc = new pqiconnect(rss, pqis); + pqiconnect *pqisc = new pqiconnect(pqip, rss, pqis); pqip -> addChildInterface(PQI_CONNECT_HIDDEN_TCP, pqisc); } @@ -118,7 +118,7 @@ pqiperson * pqisslpersongrp::locked_createPerson(std::string id, pqilistener *li RsSerialiser *rss = new RsSerialiser(); rss->addSerialType(new RsServiceSerialiser()); - pqiconnect *pqisc = new pqiconnect(rss, pqis); + pqiconnect *pqisc = new pqiconnect(pqip, rss, pqis); pqip -> addChildInterface(PQI_CONNECT_TCP, pqisc); @@ -128,7 +128,7 @@ pqiperson * pqisslpersongrp::locked_createPerson(std::string id, pqilistener *li RsSerialiser *rss2 = new RsSerialiser(); rss2->addSerialType(new RsServiceSerialiser()); - pqiconnect *pqiusc = new pqiconnect(rss2, pqius); + pqiconnect *pqiusc = new pqiconnect(pqip, rss2, pqius); // add a ssl + proxy interface. // Add Proxy First. diff --git a/libretroshare/src/pqi/pqissludp.cc b/libretroshare/src/pqi/pqissludp.cc index 71be7dd71..fef91d8db 100644 --- a/libretroshare/src/pqi/pqissludp.cc +++ b/libretroshare/src/pqi/pqissludp.cc @@ -561,18 +561,24 @@ bool pqissludp::moretoread(uint32_t usec) { RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/ - if (usec) - { - std::cerr << "pqissludp::moretoread() usec parameter not implemented"; - std::cerr << std::endl; - } - { std::string out = "pqissludp::moretoread()"; rs_sprintf_append(out, " polling socket (%d)", sockfd); rslog(RSL_DEBUG_ALL, pqissludpzone, out); } + if (usec) + { + std::cerr << "pqissludp::moretoread() usec parameter: " << usec; + std::cerr << std::endl; + + if (0 < tou_maxread(sockfd)) + { + return true; + } + usleep(usec); + } + /* check for more to read first ... if nothing... check error */ /* <===================== UDP Difference *******************/ @@ -631,8 +637,15 @@ bool pqissludp::cansend(uint32_t usec) if (usec) { - std::cerr << "pqissludp::cansend() usec parameter not implemented"; + std::cerr << "pqissludp::cansend() usec parameter: " << usec; std::cerr << std::endl; + + if (0 < tou_maxwrite(sockfd)) + { + return true; + } + + usleep(usec); } rslog(RSL_DEBUG_ALL, pqissludpzone, diff --git a/libretroshare/src/pqi/pqistreamer.cc b/libretroshare/src/pqi/pqistreamer.cc index d9e48293e..ca4b533ab 100644 --- a/libretroshare/src/pqi/pqistreamer.cc +++ b/libretroshare/src/pqi/pqistreamer.cc @@ -45,7 +45,6 @@ const int PQISTREAM_ABS_MAX = 100000000; /* 100 MB/sec (actually per loop) */ #define DEBUG_PQISTREAMER 1 ***/ -#define DEBUG_PQISTREAMER 1 #ifdef DEBUG_TRANSFERS #include "util/rsprint.h" @@ -54,7 +53,7 @@ const int PQISTREAM_ABS_MAX = 100000000; /* 100 MB/sec (actually per loop) */ pqistreamer::pqistreamer(RsSerialiser *rss, std::string id, BinInterface *bio_in, int bio_flags_in) :PQInterface(id), mStreamerMtx("pqistreamer"), - mRsSerialiser(rss), mBio(bio_in), mBio_flags(bio_flags_in), + mBio(bio_in), mBio_flags(bio_flags_in), mRsSerialiser(rss), mPkt_wpending(NULL), mTotalRead(0), mTotalSent(0), mCurrRead(0), mCurrSent(0), @@ -171,10 +170,10 @@ RsItem *pqistreamer::GetItem() // // PQInterface int pqistreamer::tick() { - RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ #ifdef DEBUG_PQISTREAMER { + RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ std::string out = "pqistreamer::tick()\n" + PeerId(); rs_sprintf_append(out, ": currRead/Sent: %d/%d", mCurrRead, mCurrSent); @@ -182,22 +181,15 @@ int pqistreamer::tick() } #endif - mBio->tick(); - - /* short circuit everything is bio isn't active */ - if (!(mBio->isactive())) + if (!tick_bio()) { return 0; } + tick_recv(0); + tick_send(0); - /* must do both, as outgoing will catch some bad sockets, - * that incoming will not - */ - - handleincoming_locked(); - handleoutgoing_locked(); - + RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ #ifdef DEBUG_PQISTREAMER /* give details of the packets */ { @@ -234,6 +226,49 @@ int pqistreamer::tick() return 0; } +int pqistreamer::tick_bio() +{ + RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ + mBio->tick(); + + /* short circuit everything is bio isn't active */ + if (!(mBio->isactive())) + { + return 0; + } + return 1; +} + + +int pqistreamer::tick_recv(uint32_t timeout) +{ + RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ + + if (mBio->moretoread(timeout)) + { + handleincoming_locked(); + } + return 1; +} + + +int pqistreamer::tick_send(uint32_t timeout) +{ + RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ + + /* short circuit everything is bio isn't active */ + if (!(mBio->isactive())) + { + return 0; + } + + if (mBio->cansend(timeout)) + { + handleoutgoing_locked(); + } + return 1; +} + int pqistreamer::status() { RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/ diff --git a/libretroshare/src/pqi/pqistreamer.h b/libretroshare/src/pqi/pqistreamer.h index db08072c7..ce6555532 100644 --- a/libretroshare/src/pqi/pqistreamer.h +++ b/libretroshare/src/pqi/pqistreamer.h @@ -62,6 +62,11 @@ class pqistreamer: public PQInterface virtual void getRates(RsBwRates &rates); virtual int getQueueSize(bool in); // extracting data. protected: + + int tick_bio(); + int tick_send(uint32_t timeout); + int tick_recv(uint32_t timeout); + /* Implementation */ // These methods are redefined in pqiQoSstreamer @@ -76,6 +81,10 @@ class pqistreamer: public PQInterface protected: RsMutex mStreamerMtx ; // Protects data, fns below, protected so pqiqos can use it too. + // Binary Interface for IO, initialisated at startup. + BinInterface *mBio; + unsigned int mBio_flags; // BIN_FLAGS_NO_CLOSE | BIN_FLAGS_NO_DELETE + private: int queue_outpqi_locked(RsItem *i,uint32_t& serialized_size); int handleincomingitem_locked(RsItem *i); @@ -98,9 +107,6 @@ class pqistreamer: public PQInterface // RsSerialiser - determines which packets can be serialised. RsSerialiser *mRsSerialiser; - // Binary Interface for IO, initialisated at startup. - BinInterface *mBio; - unsigned int mBio_flags; // BIN_FLAGS_NO_CLOSE | BIN_FLAGS_NO_DELETE void *mPkt_wpending; // storage for pending packet to write. int mPkt_rpend_size; // size of pkt_rpending. diff --git a/libretroshare/src/pqi/pqithreadstreamer.cc b/libretroshare/src/pqi/pqithreadstreamer.cc new file mode 100644 index 000000000..196059665 --- /dev/null +++ b/libretroshare/src/pqi/pqithreadstreamer.cc @@ -0,0 +1,174 @@ +/* + * pqithreadstreamer.cc + * + * 3P/PQI network interface for RetroShare. + * + * Copyright 2004-2013 by Robert Fernie. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License Version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + * USA. + * + * Please report all bugs and problems to "retroshare@lunamutt.com". + * + */ + + +#include "pqi/pqithreadstreamer.h" + +#define DEFAULT_STREAMER_TIMEOUT 10000 // 10 ms. +#define DEFAULT_STREAMER_SLEEP 1000 // 1 ms. +#define DEFAULT_STREAMER_IDLE_SLEEP 1000000 // 1 sec + +pqithreadstreamer::pqithreadstreamer(PQInterface *parent, RsSerialiser *rss, std::string id, BinInterface *bio_in, int bio_flags_in) +:pqistreamer(rss, id, bio_in, bio_flags_in), mThreadMutex("pqithreadstreamer"), mParent(parent), mTimeout(0) +{ + mTimeout = DEFAULT_STREAMER_TIMEOUT; + mSleepPeriod = DEFAULT_STREAMER_SLEEP; + return; +} + +bool pqithreadstreamer::RecvItem(RsItem *item) +{ + return mParent->RecvItem(item); +} + +int pqithreadstreamer::tick() +{ + tick_bio(); + + return 0; +} + +void pqithreadstreamer::start() +{ + { + RsStackMutex stack(mThreadMutex); + mToRun = true; + } + RsThread::start(); +} + +void pqithreadstreamer::run() +{ + std::cerr << "pqithreadstream::run()"; + std::cerr << std::endl; + + { + RsStackMutex stack(mThreadMutex); + mRunning = true; + } + + while(1) + { + { + RsStackMutex stack(mThreadMutex); + if (!mToRun) + { + std::cerr << "pqithreadstream::run() stopping"; + std::cerr << std::endl; + + mRunning = false; + return; + } + } + data_tick(); + } +} + +void pqithreadstreamer::stop() +{ + RsStackMutex stack(mThreadMutex); + + std::cerr << "pqithreadstream::stop()"; + std::cerr << std::endl; + + mToRun = false; +} + +void pqithreadstreamer::fullstop() +{ + stop(); + + while(1) + { + RsStackMutex stack(mThreadMutex); + if (!mRunning) + { + std::cerr << "pqithreadstream::fullstop() complete"; + std::cerr << std::endl; + return; + } + usleep(1000); + } +} + +bool pqithreadstreamer::threadrunning() +{ + RsStackMutex stack(mThreadMutex); + return mRunning; +} + + +int pqithreadstreamer::data_tick() +{ + //std::cerr << "pqithreadstream::data_tick()"; + //std::cerr << std::endl; + + uint32_t recv_timeout = 0; + uint32_t sleep_period = 0; + bool isactive = false; + { + RsStackMutex stack(mStreamerMtx); + recv_timeout = mTimeout; + sleep_period = mSleepPeriod; + isactive = mBio->isactive(); + } + + if (!isactive) + { + usleep(DEFAULT_STREAMER_IDLE_SLEEP); + return 0; + } + + + //std::cerr << "pqithreadstream::data_tick() tick_recv"; + //std::cerr << std::endl; + + tick_recv(recv_timeout); + + // Push Items, Outside of Mutex. + RsItem *incoming = NULL; + while((incoming = GetItem())) + { + RecvItem(incoming); + } + + //std::cerr << "pqithreadstream::data_tick() tick_send"; + //std::cerr << std::endl; + + tick_send(0); + + if (sleep_period) + { + //std::cerr << "pqithreadstream::data_tick() usleep"; + //std::cerr << std::endl; + + usleep(sleep_period); + } + return 1; +} + + + + diff --git a/libretroshare/src/pqi/pqithreadstreamer.h b/libretroshare/src/pqi/pqithreadstreamer.h new file mode 100644 index 000000000..7c07aeb3d --- /dev/null +++ b/libretroshare/src/pqi/pqithreadstreamer.h @@ -0,0 +1,62 @@ +/* + * libretroshare/src/pqi pqithreadstreamer.h + * + * 3P/PQI network interface for RetroShare. + * + * Copyright 2004-2013 by Robert Fernie. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License Version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + * USA. + * + * Please report all bugs and problems to "retroshare@lunamutt.com". + * + */ + +#ifndef MRK_PQI_THREAD_STREAMER_HEADER +#define MRK_PQI_THREAD_STREAMER_HEADER + +#include "pqi/pqistreamer.h" +#include "util/rsthreads.h" + +class pqithreadstreamer: public pqistreamer, public RsThread +{ + public: + pqithreadstreamer(PQInterface *parent, RsSerialiser *rss, std::string peerid, BinInterface *bio_in, int bio_flagsin); + +virtual void run(); +virtual void start(); +virtual void stop(); +virtual void fullstop(); +virtual bool threadrunning(); + +virtual bool RecvItem(RsItem *item); +virtual int tick(); + +protected: + +int data_tick(); + + PQInterface *mParent; + uint32_t mTimeout; + uint32_t mSleepPeriod; + +private: + /* thread variables */ + RsMutex mThreadMutex; + bool mRunning; + bool mToRun; + +}; + +#endif //MRK_PQI_THREAD_STREAMER_HEADER diff --git a/libretroshare/src/services/p3discovery2.cc b/libretroshare/src/services/p3discovery2.cc index 606c6e631..0fb722fd8 100644 --- a/libretroshare/src/services/p3discovery2.cc +++ b/libretroshare/src/services/p3discovery2.cc @@ -32,8 +32,9 @@ // Interface pointer. RsDisc *rsDisc = NULL; -#define P3DISC_DEBUG 1 - +/**** + * #define P3DISC_DEBUG 1 + ****/ bool populateContactInfo(const peerState &detail, RsDiscContactItem *pkt) @@ -297,7 +298,7 @@ int p3discovery2::handleIncoming() else { #ifdef P3DISC_DEBUG - std::cerr << "p3disc::handleIncoming() Unknown Received Message!" << std::endl; + std::cerr << "p3discovery2::handleIncoming() Unknown Received Message!" << std::endl; item -> print(std::cerr); std::cerr << std::endl; #endif @@ -306,7 +307,7 @@ int p3discovery2::handleIncoming() } #ifdef P3DISC_DEBUG - std::cerr << "p3disc::handleIncoming() finished." << std::endl; + std::cerr << "p3discovery2::handleIncoming() finished." << std::endl; #endif return nhandled; @@ -985,7 +986,7 @@ void p3discovery2::sendPGPCertificate(const PGPID &aboutId, const SSLID &toId) if ((linkType & RS_NET_CONN_SPEED_TRICKLE) || (linkType & RS_NET_CONN_SPEED_LOW)) { - std::cerr << "p3disc::sendPGPCertificate() Not sending Certificates to: " << toId; + std::cerr << "p3discovery2::sendPGPCertificate() Not sending Certificates to: " << toId; std::cerr << " (low bandwidth)" << std::endl; return; } @@ -1023,7 +1024,7 @@ void p3discovery2::recvPGPCertificate(const SSLID &fromId, RsDiscPgpCertItem *it if (pstate.vs_disc != RS_VS_DISC_FULL) { #ifdef P3DISC_DEBUG - std::cerr << "p3disc::recvPGPCertificate() Not Loading Certificates as in MINIMAL MODE"; + std::cerr << "p3discovery2::recvPGPCertificate() Not Loading Certificates as in MINIMAL MODE"; std::cerr << std::endl; #endif @@ -1278,1231 +1279,3 @@ void p3discovery2::setGPGOperation(AuthGPGOperation *operation) } - -#if 0 -/***************************************************************************************/ -/***************************************************************************************/ -/************** OLD CODE ***************************/ -/***************************************************************************************/ -/***************************************************************************************/ - - -#include "retroshare/rsiface.h" -#include "retroshare/rspeers.h" -#include "services/p3disc.h" - -#include "pqi/p3peermgr.h" -#include "pqi/p3linkmgr.h" -#include "pqi/p3netmgr.h" - -#include "pqi/authssl.h" -#include "pqi/authgpg.h" - -#include -#include -#include - -const uint32_t AUTODISC_LDI_SUBTYPE_PING = 0x01; -const uint32_t AUTODISC_LDI_SUBTYPE_RPLY = 0x02; - -#include "util/rsdebug.h" -#include "util/rsprint.h" -#include "util/rsversion.h" - -const int pqidisczone = 2482; - -//static int convertTDeltaToTRange(double tdelta); -//static int convertTRangeToTDelta(int trange); - -// Operating System specific includes. -#include "pqi/pqinetwork.h" - -/* DISC FLAGS */ - -const uint32_t P3DISC_FLAGS_USE_DISC = 0x0001; -const uint32_t P3DISC_FLAGS_USE_DHT = 0x0002; -const uint32_t P3DISC_FLAGS_EXTERNAL_ADDR = 0x0004; -const uint32_t P3DISC_FLAGS_STABLE_UDP = 0x0008; -const uint32_t P3DISC_FLAGS_PEER_ONLINE = 0x0010; -const uint32_t P3DISC_FLAGS_OWN_DETAILS = 0x0020; -const uint32_t P3DISC_FLAGS_PEER_TRUSTS_ME = 0x0040; -const uint32_t P3DISC_FLAGS_ASK_VERSION = 0x0080; - - -/***** - * #define P3DISC_DEBUG 1 - ****/ - -//#define P3DISC_DEBUG 1 - -/****************************************************************************************** - ****************************** NEW DISCOVERY ******************************************* - ****************************************************************************************** - *****************************************************************************************/ - -p3disc::p3disc(p3PeerMgr *pm, p3LinkMgr *lm, p3NetMgr *nm, pqipersongrp *pqih) - :p3Service(RS_SERVICE_TYPE_DISC), - p3Config(CONFIG_TYPE_P3DISC), - mPeerMgr(pm), mLinkMgr(lm), mNetMgr(nm), - mPqiPersonGrp(pqih), mDiscMtx("p3disc") -{ - RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/ - - addSerialType(new RsDiscSerialiser()); - - mLastSentHeartbeatTime = time(NULL); - mDiscEnabled = true; - - //add own version to versions map - versions[AuthSSL::getAuthSSL()->OwnId()] = RsUtil::retroshareVersion(); -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::p3disc() setup"; - std::cerr << std::endl; -#endif - - return; -} - -int p3disc::tick() -{ - //send a heartbeat to all connected peers - time_t hbTime; - { - RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/ - hbTime = mLastSentHeartbeatTime; - } - - if (time(NULL) - hbTime > HEARTBEAT_REPEAT_TIME) - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::tick() sending heartbeat to all peers" << std::endl; -#endif - - std::list peers; - std::list::const_iterator pit; - - mLinkMgr->getOnlineList(peers); - for (pit = peers.begin(); pit != peers.end(); ++pit) - { - sendHeartbeat(*pit); - } - - /* check our Discovery flag */ - peerState detail; - mPeerMgr->getOwnNetStatus(detail); - - RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/ - - mDiscEnabled = (!(detail.visState & RS_VIS_STATE_NODISC)); - mLastSentHeartbeatTime = time(NULL); - } - - return handleIncoming(); -} - -int p3disc::handleIncoming() -{ - RsItem *item = NULL; - -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::handleIncoming()" << std::endl; -#endif - - int nhandled = 0; - // While messages read - while(NULL != (item = recvItem())) - { - RsDiscAskInfo *inf = NULL; - RsDiscReply *dri = NULL; - RsDiscVersion *dvi = NULL; - RsDiscHeartbeat *dta = NULL; - -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::handleIncoming() Received Message!" << std::endl; - item -> print(std::cerr); - std::cerr << std::endl; -#endif - - // if discovery reply then respond if haven't already. - if (NULL != (dri = dynamic_cast (item))) - { - if(rsPeers->servicePermissionFlags_sslid(item->PeerId()) & RS_SERVICE_PERM_DISCOVERY) - recvDiscReply(dri); - else - delete item ; - } - else if (NULL != (dvi = dynamic_cast (item))) - { - recvPeerVersionMsg(dvi); - nhandled++; - delete item; - } - else if (NULL != (inf = dynamic_cast (item))) /* Ping */ - { - if(rsPeers->servicePermissionFlags_sslid(item->PeerId()) & RS_SERVICE_PERM_DISCOVERY) - recvAskInfo(inf); - - nhandled++; - delete item; - } - else if (NULL != (dta = dynamic_cast (item))) - { - recvHeartbeatMsg(dta); - nhandled++ ; - delete item; - } - else - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::handleIncoming() Unknown Received Message!" << std::endl; - item -> print(std::cerr); - std::cerr << std::endl; -#endif - delete item; - } - } - -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::handleIncoming() finished." << std::endl; -#endif - - return nhandled; -} - - - - - /************* from pqiMonitor *******************/ -void p3disc::statusChange(const std::list &plist) -{ -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::statusChange()" << std::endl; -#endif - - std::list::const_iterator pit; - /* if any have switched to 'connected' then we notify */ - for(pit = plist.begin(); pit != plist.end(); pit++) - { - if ((pit->state & RS_PEER_S_FRIEND) && (pit->actions & RS_PEER_CONNECTED)) - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::statusChange() Starting Disc with: " << pit->id << std::endl; -#endif - sendOwnVersion(pit->id); - - if(rsPeers->servicePermissionFlags_sslid(pit->id) & RS_SERVICE_PERM_DISCOVERY) - sendAllInfoToJustConnectedPeer(pit->id); - - sendJustConnectedPeerInfoToAllPeer(pit->id); - } - else if (!(pit->state & RS_PEER_S_FRIEND) && (pit->actions & RS_PEER_MOVED)) - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::statusChange() Removing Friend: " << pit->id << std::endl; -#endif - removeFriend(pit->id); - } - else if ((pit->state & RS_PEER_S_FRIEND) && (pit->actions & RS_PEER_NEW)) - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::statusChange() Adding Friend: " << pit->id << std::endl; -#endif - askInfoToAllPeers(pit->id); - } - } -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::statusChange() finished." << std::endl; -#endif - return; -} - -void p3disc::sendAllInfoToJustConnectedPeer(const std::string &id) -{ - /* get a peer lists */ - -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::sendAllInfoToJustConnectedPeer() id: " << id << std::endl; -#endif - - std::list friendIds; - std::list::iterator it; - std::set gpgIds; - std::set::iterator git; - - /* We send our full friends list - if we have Discovery Enabled */ - if (mDiscEnabled) - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::sendAllInfoToJustConnectedPeer() Discovery Enabled, sending Friend List" << std::endl; -#endif - mLinkMgr->getFriendList(friendIds); - - /* send them a list of all friend's details */ - for(it = friendIds.begin(); it != friendIds.end(); it++) - { - /* get details */ - peerState detail; - if (!mPeerMgr->getFriendNetStatus(*it, detail)) - { - /* major error! */ -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::sendAllInfoToJustConnectedPeer() No Info, Skipping: " << *it; - std::cerr << std::endl; -#endif - continue; - } - - if (!(detail.visState & RS_VIS_STATE_NODISC)) - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::sendAllInfoToJustConnectedPeer() Adding GPGID: " << detail.gpg_id; - std::cerr << " (SSLID: " << *it << ")"; - std::cerr << std::endl; -#endif - gpgIds.insert(detail.gpg_id); - } - else - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::sendAllInfoToJustConnectedPeer() DISC OFF for GPGID: " << detail.gpg_id; - std::cerr << " (SSLID: " << *it << ")"; - std::cerr << std::endl; -#endif - } - } - } - - //add own info, this info is sent whether discovery is enabled - or not. - gpgIds.insert(rsPeers->getGPGOwnId()); - - { - RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/ - - /* refresh with new list */ - std::list &idList = mSendIdList[id]; - idList.clear(); - for(git = gpgIds.begin(); git != gpgIds.end(); git++) - { - idList.push_back(*git); - } - } - - #ifdef P3DISC_DEBUG - std::cerr << "p3disc::sendAllInfoToJustConnectedPeer() finished." << std::endl; - #endif -} - -void p3disc::sendJustConnectedPeerInfoToAllPeer(const std::string &connectedPeerId) -{ - -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::sendJustConnectedPeerInfoToAllPeer() connectedPeerId : " << connectedPeerId << std::endl; -#endif - - /* only ask info if discovery is on */ - { - RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/ - if (!mDiscEnabled) - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::sendJustConnectedPeerInfoToAllPeer() Disc Disabled => NULL OP" << std::endl; -#endif - return; - } - } - - /* get details */ - peerState detail; - if (!mPeerMgr->getFriendNetStatus(connectedPeerId, detail)) - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::sendJustConnectedPeerInfoToAllPeer() No NetStatus => FAILED" << std::endl; -#endif - return; - } - - if (detail.visState & RS_VIS_STATE_NODISC) - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::sendJustConnectedPeerInfoToAllPeer() Peer Disc Discable => NULL OP" << std::endl; -#endif - return; - } - - - std::string gpg_connectedPeerId = rsPeers->getGPGId(connectedPeerId); - std::list onlineIds; - - /* get a peer lists */ - rsPeers->getOnlineList(onlineIds); - - { - RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/ - - /* append gpg id's of all friend's to the sending list */ - - std::list::iterator it; - for (it = onlineIds.begin(); it != onlineIds.end(); it++) - if(rsPeers->servicePermissionFlags_sslid(*it) & RS_SERVICE_PERM_DISCOVERY) - { - std::list &idList = mSendIdList[*it]; - - if (std::find(idList.begin(), idList.end(), gpg_connectedPeerId) == idList.end()) - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::sendJustConnectedPeerInfoToAllPeer() adding to queue for: "; - std::cerr << *it << std::endl; -#endif - idList.push_back(gpg_connectedPeerId); - } - else - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::sendJustConnectedPeerInfoToAllPeer() already in queue for: "; - std::cerr << *it << std::endl; -#endif - } - } - } -} - - -bool isDummyFriend(const std::string &id) -{ - bool ret = (id.substr(0,5) == "dummy"); - return ret; -} - - /* (dest (to), source (cert)) */ -RsDiscReply *p3disc::createDiscReply(const std::string &to, const std::string &about) -{ - -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::createDiscReply() called. Sending details of: " << about << " to: " << to << std::endl; -#endif - - std::string aboutGpgId = rsPeers->getGPGId(about); - if (aboutGpgId.empty()) { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::createDiscReply() no info about this id" << std::endl; -#endif - return NULL; - } - - - // Construct a message - RsDiscReply *di = new RsDiscReply(); - - // Fill the message - // Set Target as input cert. - di -> PeerId(to); - di -> aboutId = aboutGpgId; - - // set the ip addresse list. - std::list sslChilds; - rsPeers->getAssociatedSSLIds(aboutGpgId, sslChilds); - bool shouldWeSendGPGKey = false;//the GPG key is send only if we've got a valid friend with DISC enabled - - { - RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/ - if (!mDiscEnabled) - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::createDiscReply() Disc Disabled, removing all friend SSL Ids"; - std::cerr << std::endl; -#endif - sslChilds.clear(); - } - } - - std::list::iterator it; - for (it = sslChilds.begin(); it != sslChilds.end(); it++) - { - /* skip dummy ones - until they are removed fully */ - if (isDummyFriend(*it)) - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::createDiscReply() Skipping Dummy Child SSL Id:" << *it; - std::cerr << std::endl; -#endif - continue; - } - - peerState detail; - if (!mPeerMgr->getFriendNetStatus(*it, detail) - || detail.visState & RS_VIS_STATE_NODISC) - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::createDiscReply() Skipping cos No Details or NODISC flag id: " << *it; - std::cerr << std::endl; -#endif - continue; - } - -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::createDiscReply() Found Child SSL Id:" << *it; - std::cerr << std::endl; -#endif - -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::createDiscReply() Adding Child SSL Id Details"; - std::cerr << std::endl; -#endif - shouldWeSendGPGKey = true; - - RsPeerNetItem rsPeerNetItem ; - rsPeerNetItem.clear(); - - rsPeerNetItem.pid = detail.id; - rsPeerNetItem.gpg_id = detail.gpg_id; - rsPeerNetItem.location = detail.location; - rsPeerNetItem.netMode = detail.netMode; - rsPeerNetItem.visState = detail.visState; - rsPeerNetItem.lastContact = detail.lastcontact; - rsPeerNetItem.currentlocaladdr = detail.localaddr; - rsPeerNetItem.currentremoteaddr = detail.serveraddr; - rsPeerNetItem.dyndns = detail.dyndns; - detail.ipAddrs.mLocal.loadTlv(rsPeerNetItem.localAddrList); - detail.ipAddrs.mExt.loadTlv(rsPeerNetItem.extAddrList); - - - di->rsPeerList.push_back(rsPeerNetItem); - } - - - //send own details - if (about == rsPeers->getGPGOwnId()) - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::createDiscReply() Adding Own Id Details"; - std::cerr << std::endl; -#endif - peerState detail; - if (mPeerMgr->getOwnNetStatus(detail)) - { - shouldWeSendGPGKey = true; - RsPeerNetItem rsPeerNetItem ; - rsPeerNetItem.clear(); - rsPeerNetItem.pid = detail.id; - rsPeerNetItem.gpg_id = detail.gpg_id; - rsPeerNetItem.location = detail.location; - rsPeerNetItem.netMode = detail.netMode; - rsPeerNetItem.visState = detail.visState; - rsPeerNetItem.lastContact = time(NULL); - rsPeerNetItem.currentlocaladdr = detail.localaddr; - rsPeerNetItem.currentremoteaddr = detail.serveraddr; - rsPeerNetItem.dyndns = detail.dyndns; - detail.ipAddrs.mLocal.loadTlv(rsPeerNetItem.localAddrList); - detail.ipAddrs.mExt.loadTlv(rsPeerNetItem.extAddrList); - - di->rsPeerList.push_back(rsPeerNetItem); - } - } - - if (!shouldWeSendGPGKey) { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::createDiscReply() GPG key should not be send, no friend with disc on found about it." << std::endl; -#endif - // cleanup! - delete di; - return NULL; - } - - return di; -} - -void p3disc::sendOwnVersion(std::string to) -{ -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::sendOwnVersion() Sending rs version to: " << to << std::endl; -#endif - - /* only ask info if discovery is on */ - if (!mDiscEnabled) - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::sendOwnVersion() Disc Disabled => NULL OP" << std::endl; -#endif - return; - } - - - RsDiscVersion *di = new RsDiscVersion(); - di->PeerId(to); - di->version = RsUtil::retroshareVersion(); - - /* send the message */ - sendItem(di); - -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::sendOwnVersion() finished." << std::endl; -#endif -} - -void p3disc::sendHeartbeat(std::string to) -{ - { - std::string out = "p3disc::sendHeartbeat() to : " + to; -#ifdef P3DISC_DEBUG - std::cerr << out << std::endl; -#endif - rslog(RSL_WARNING, pqidisczone, out); - } - - - RsDiscHeartbeat *di = new RsDiscHeartbeat(); - di->PeerId(to); - - /* send the message */ - sendItem(di); - -#ifdef P3DISC_DEBUG - std::cerr << "Sent tick Message" << std::endl; -#endif -} - -void p3disc::askInfoToAllPeers(std::string about) -{ - -#ifdef P3DISC_DEBUG - std::cerr <<"p3disc::askInfoToAllPeers() about " << about << std::endl; -#endif - - // We Still Ask even if Disc isn't Enabled... if they want to give us the info ;) - - - peerState connectState; - if (!mPeerMgr->getFriendNetStatus(about, connectState) || (connectState.visState & RS_VIS_STATE_NODISC)) - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::askInfoToAllPeers() friend disc is off, don't send the request." << std::endl; -#endif - return; - } - - std::string aboutGpgId = rsPeers->getGPGId(about); - if (aboutGpgId == "") - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::askInfoToAllPeers() no gpg id found" << std::endl; -#endif - } - - std::list onlineIds; - std::list::iterator it; - - rsPeers->getOnlineList(onlineIds); - - /* ask info to trusted friends */ - for(it = onlineIds.begin(); it != onlineIds.end(); it++) - { - RsDiscAskInfo *di = new RsDiscAskInfo(); - di->PeerId(*it); - di->gpg_id = aboutGpgId; - sendItem(di); -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::askInfoToAllPeers() question sent to : " << *it << std::endl; -#endif - } -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::askInfoToAllPeers() finished." << std::endl; -#endif -} - -void p3disc::recvPeerDetails(RsDiscReply *item, const std::string &certGpgId) -{ - -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::recvPeerFriendMsg() From: " << item->PeerId() << " About " << item->aboutId << std::endl; -#endif - - if (certGpgId.empty()) - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::recvPeerFriendMsg() gpg cert Id is empty - cert not transmitted" << std::endl; -#endif - } - else if (item->aboutId == "" || item->aboutId != certGpgId) - { - std::cerr << "p3disc::recvPeerFriendMsg() Error : about id is not the same as gpg id." << std::endl; - return; - } - - bool should_notify_discovery = false ; - std::string item_gpg_id = rsPeers->getGPGId(item->PeerId()) ; - - for (std::list::iterator pit = item->rsPeerList.begin(); pit != item->rsPeerList.end(); pit++) - { - if(isDummyFriend(pit->pid)) - { - continue; - } - - bool new_info = false; - addDiscoveryData(item->PeerId(), pit->pid,item_gpg_id, - item->aboutId, pit->currentlocaladdr, pit->currentremoteaddr, 0, time(NULL),new_info); - - if(new_info) - should_notify_discovery = true ; - -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::recvPeerFriendMsg() Peer Config Item:" << std::endl; - pit->print(std::cerr, 10); - std::cerr << std::endl; -#endif - - if (pit->pid != rsPeers->getOwnId()) - { - // Apparently, the connect manager won't add a friend if the gpg id is not - // trusted. However, this should be tested here for consistency and security - // in case of modifications in mConnMgr. - // - - // Check if already friend. - if(AuthGPG::getAuthGPG()->isGPGAccepted(pit->gpg_id) || pit->gpg_id == AuthGPG::getAuthGPG()->getGPGOwnId()) - { - if (!mPeerMgr->isFriend(pit->pid)) - { - // Add with no disc by default. If friend already exists, it will do nothing - // NO DISC is important - otherwise, we'll just enter a nasty loop, - // where every addition triggers requests, then they are cleaned up, and readded... - - // This way we get their addresses, but don't advertise them until we get a - // connection. -#ifdef P3DISC_DEBUG - std::cerr << "--> Adding to friends list " << pit->pid << " - " << pit->gpg_id << std::endl; -#endif - mPeerMgr->addFriend(pit->pid, pit->gpg_id, pit->netMode, RS_VIS_STATE_NODISC,(time_t)0,RS_SERVICE_PERM_ALL); - } - } - - /* skip if not one of our peers */ - if (!mPeerMgr->isFriend(pit->pid)) - { - /* THESE ARE OUR FRIEND OF FRIENDS ... pass this information along to NetMgr & DHT... - * as we can track FOF and use them as potential Proxies / Relays - */ - - /* add into NetMgr and non-search, so we can detect connect attempts */ - mNetMgr->netAssistFriend(pit->pid,false); - - /* inform NetMgr that we know this peer */ - mNetMgr->netAssistKnownPeer(pit->pid, pit->currentremoteaddr, - NETASSIST_KNOWN_PEER_FOF | NETASSIST_KNOWN_PEER_OFFLINE); - - continue; - } - - if (item->PeerId() == pit->pid) - { -#ifdef P3DISC_DEBUG - std::cerr << "Info sent by the peer itself -> updating self info:" << std::endl; - std::cerr << " -> current local addr = " << pit->currentlocaladdr << std::endl; - std::cerr << " -> current remote addr = " << pit->currentremoteaddr << std::endl; - std::cerr << " -> visState = " << std::hex << pit->visState << std::dec; - std::cerr << " -> network mode: " << pit->netMode << std::endl; - std::cerr << " -> location: " << pit->location << std::endl; - std::cerr << std::endl; -#endif - - bool peerDataChanged = false; - - // When the peer sends his own list of IPs, the info replaces the existing info, because the - // peer is the primary source of his own IPs. - if (mPeerMgr->setNetworkMode(pit->pid, pit->netMode)) { - peerDataChanged = true; - } - if (mPeerMgr->setLocation(pit->pid, pit->location)) { - peerDataChanged = true; - } - if (mPeerMgr->setLocalAddress(pit->pid, pit->currentlocaladdr)) { - peerDataChanged = true; - } - if (mPeerMgr->setExtAddress(pit->pid, pit->currentremoteaddr)) { - peerDataChanged = true; - } - if (mPeerMgr->setVisState(pit->pid, pit->visState)) { - peerDataChanged = true; - } - if (mPeerMgr->setDynDNS(pit->pid, pit->dyndns)) { - peerDataChanged = true; - } - - if (peerDataChanged == true) - { - // inform all connected peers of change - sendJustConnectedPeerInfoToAllPeer(pit->pid); - } - } - - // always update historical address list... this should be enough to let us connect. - - pqiIpAddrSet addrsFromPeer; - addrsFromPeer.mLocal.extractFromTlv(pit->localAddrList); - addrsFromPeer.mExt.extractFromTlv(pit->extAddrList); - -#ifdef P3DISC_DEBUG - std::cerr << "Setting address list to peer " << pit->pid << ", to be:" << std::endl ; - - addrsFromPeer.printAddrs(std::cerr); - std::cerr << std::endl; -#endif - mPeerMgr->updateAddressList(pit->pid, addrsFromPeer); - - } -#ifdef P3DISC_DEBUG - else - { - std::cerr << "Skipping info about own id " << pit->pid << std::endl ; - } -#endif - - } - - rsicontrol->getNotify().notifyListChange(NOTIFY_LIST_NEIGHBOURS, NOTIFY_TYPE_MOD); - - if(should_notify_discovery) - rsicontrol->getNotify().notifyDiscInfoChanged(); - - /* cleanup (handled by caller) */ -} - -void p3disc::recvPeerVersionMsg(RsDiscVersion *item) -{ -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::recvPeerVersionMsg() From: " << item->PeerId(); - std::cerr << std::endl; -#endif - - // dont need protection - versions[item->PeerId()] = item->version; - - return; -} - -void p3disc::recvHeartbeatMsg(RsDiscHeartbeat *item) -{ -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::recvHeartbeatMsg() From: " << item->PeerId(); - std::cerr << std::endl; -#endif - - mPqiPersonGrp->tagHeartbeatRecvd(item->PeerId()); - - return; -} - -void p3disc::recvAskInfo(RsDiscAskInfo *item) -{ -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::recvAskInfo() From: " << item->PeerId(); - std::cerr << std::endl; -#endif - RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/ - - /* only provide info if discovery is on */ - if (!mDiscEnabled) - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::recvAskInfo() Disc Disabled => NULL OP"; - std::cerr << std::endl; -#endif - return; - } - - std::list &idList = mSendIdList[item->PeerId()]; - - if (std::find(idList.begin(), idList.end(), item->gpg_id) == idList.end()) { - idList.push_back(item->gpg_id); - } -} - -void p3disc::recvDiscReply(RsDiscReply *dri) -{ - RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/ - -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::recvDiscReply() From: " << dri->PeerId() << " About: " << dri->aboutId; - std::cerr << std::endl; -#endif - - /* search pending item and remove it, when already exist */ - std::list::iterator it; - for (it = mPendingDiscReplyInList.begin(); it != mPendingDiscReplyInList.end(); it++) - { - if ((*it)->PeerId() == dri->PeerId() && (*it)->aboutId == dri->aboutId) - { - delete (*it); - mPendingDiscReplyInList.erase(it); - break; - } - } - - // add item to list for later process - - if(mDiscEnabled || dri->aboutId == rsPeers->getGPGId(dri->PeerId())) - mPendingDiscReplyInList.push_back(dri); // no delete - else - delete dri ; -} - - - - -void p3disc::removeFriend(std::string /*ssl_id*/) -{ -} - -/*************************************************************************************/ -/* AuthGPGService */ -/*************************************************************************************/ -AuthGPGOperation *p3disc::getGPGOperation() -{ - { - RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/ - - /* process disc reply in list */ - if (mPendingDiscReplyInList.empty() == false) { - RsDiscReply *item = mPendingDiscReplyInList.front(); - - return new AuthGPGOperationLoadOrSave(true, item->aboutId, item->certGPG, item); - } - } - - /* process disc reply out list */ - - std::string destId; - std::string srcId; - - { - RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/ - - while (!mSendIdList.empty()) - { - std::map >::iterator it = mSendIdList.begin(); - - if (!it->second.empty() && mLinkMgr->isOnline(it->first)) { - std::string gpgId = it->second.front(); - it->second.pop_front(); - - destId = it->first; - srcId = gpgId; - - break; - } else { - /* peer is not online anymore ... try next */ - mSendIdList.erase(it); - } - } - } - - if (!destId.empty() && !srcId.empty()) { - RsDiscReply *item = createDiscReply(destId, srcId); - if (item) { - return new AuthGPGOperationLoadOrSave(false, item->aboutId, "", item); - } - } - - return NULL; -} - -void p3disc::setGPGOperation(AuthGPGOperation *operation) -{ - AuthGPGOperationLoadOrSave *loadOrSave = dynamic_cast(operation); - if (loadOrSave) { - if (loadOrSave->m_load) { - /* search in pending in list */ - RsDiscReply *item = NULL; - - { - RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/ - - std::list::iterator it; - it = std::find(mPendingDiscReplyInList.begin(), mPendingDiscReplyInList.end(), loadOrSave->m_userdata); - if (it != mPendingDiscReplyInList.end()) { - item = *it; - mPendingDiscReplyInList.erase(it); - } - } - - if (item) { - recvPeerDetails(item, loadOrSave->m_certGpgId); - delete item; - } - } else { - RsDiscReply *item = (RsDiscReply*) loadOrSave->m_userdata; - - if (item) - { -// It is okay to send an empty certificate! - This is to reduce the network load at connection time. -// Hopefully, we'll get the stripped down certificates working soon!... even then still be okay to send null. -#if 0 - if (loadOrSave->m_certGpg.empty()) - { -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::setGPGOperation() don't send details because the gpg cert is not good" << std::endl; -#endif - delete item; - return; - } -#endif - - /* for Relay Connections (and other slow ones) we don't want to - * to waste bandwidth sending certificates. So don't add it. - */ - - uint32_t linkType = mLinkMgr->getLinkType(item->PeerId()); - if ((linkType & RS_NET_CONN_SPEED_TRICKLE) || - (linkType & RS_NET_CONN_SPEED_LOW)) - { - std::cerr << "p3disc::setGPGOperation() Send DiscReply Packet to: "; - std::cerr << item->PeerId(); - std::cerr << " without Certificate (low bandwidth)" << std::endl; - } - else - { - // Attaching Certificate. - item->certGPG = loadOrSave->m_certGpg; - } - -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::setGPGOperation() About to Send Message:" << std::endl; - item->print(std::cerr, 5); -#endif - - // Send off message - sendItem(item); - -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::cbkGPGOperationSave() discovery reply sent." << std::endl; -#endif - } - } - return; - } - - /* ignore other operations */ -} - -/*************************************************************************************/ -/* Storing Network Graph */ -/*************************************************************************************/ -int p3disc::addDiscoveryData(const std::string& fromId, const std::string& aboutId,const std::string& from_gpg_id,const std::string& about_gpg_id, const struct sockaddr_in& laddr, const struct sockaddr_in& raddr, uint32_t flags, time_t ts,bool& new_info) -{ - RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/ - - new_info = false ; - - gpg_neighbors[from_gpg_id].insert(about_gpg_id) ; - -#ifdef P3DISC_DEBUG - std::cerr << "Adding discovery data " << fromId << " - " << aboutId << std::endl ; -#endif - /* Store Network information */ - std::map::iterator it; - if (neighbours.end() == (it = neighbours.find(aboutId))) - { - /* doesn't exist */ - autoneighbour an; - - /* an data */ - an.id = aboutId; - an.validAddrs = false; - an.discFlags = 0; - an.ts = 0; - - neighbours[aboutId] = an; - - it = neighbours.find(aboutId); - new_info = true ; - } - - /* it always valid */ - - /* just update packet */ - - autoserver as; - - as.id = fromId; - as.localAddr = laddr; - as.remoteAddr = raddr; - as.discFlags = flags; - as.ts = ts; - - bool authDetails = (as.id == it->second.id); - - /* KEY decision about address */ - if ((authDetails) || - ((!(it->second.authoritative)) && (as.ts > it->second.ts))) - { - /* copy details to an */ - it->second.authoritative = authDetails; - it->second.ts = as.ts; - it->second.validAddrs = true; - it->second.localAddr = as.localAddr; - it->second.remoteAddr = as.remoteAddr; - it->second.discFlags = as.discFlags; - } - - if(it->second.neighbour_of.find(fromId) == it->second.neighbour_of.end()) - { - (it->second).neighbour_of[fromId] = as; - new_info =true ; - } - - /* do we update network address info??? */ - return 1; - -} - - - -/*************************************************************************************/ -/* Extracting Network Graph Details */ -/*************************************************************************************/ -bool p3disc::potentialproxies(const std::string& id, std::list &proxyIds) -{ - /* find id -> and extract the neighbour_of ids */ - - if(id == rsPeers->getOwnId()) // SSL id // This is treated appart, because otherwise we don't receive any disc info about us - return rsPeers->getFriendList(proxyIds) ; - - RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/ - - std::map::iterator it; - std::map::iterator sit; - if (neighbours.end() == (it = neighbours.find(id))) - { - return false; - } - - for(sit = it->second.neighbour_of.begin(); sit != it->second.neighbour_of.end(); sit++) - { - proxyIds.push_back(sit->first); - } - return true; -} - - -bool p3disc::potentialGPGproxies(const std::string& gpg_id, std::list &proxyGPGIds) -{ - /* find id -> and extract the neighbour_of ids */ - - if(gpg_id == rsPeers->getGPGOwnId()) // SSL id // This is treated appart, because otherwise we don't receive any disc info about us - return rsPeers->getGPGAcceptedList(proxyGPGIds) ; - - RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/ - - std::map >::iterator it = gpg_neighbors.find(gpg_id) ; - - if(it == gpg_neighbors.end()) - return false; - - for(std::set::const_iterator sit(it->second.begin()); sit != it->second.end(); ++sit) - proxyGPGIds.push_back(*sit); - - return true; -} - -void p3disc::getversions(std::map &versions) -{ - versions = this->versions; -} - -void p3disc::getWaitingDiscCount(unsigned int *sendCount, unsigned int *recvCount) -{ - if (sendCount == NULL && recvCount == NULL) { - /* Nothing to do */ - return; - } - - RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/ - - if (sendCount) { - *sendCount = 0; - - std::map >::iterator it; - for (it = mSendIdList.begin(); it != mSendIdList.end(); it++) { - *sendCount += it->second.size(); - } - } - - if (recvCount) { - *recvCount = mPendingDiscReplyInList.size(); - } -} - -#ifdef UNUSED_CODE -int p3disc::idServers() -{ - RsStackMutex stack(mDiscMtx); /********** STACK LOCKED MTX ******/ - - std::map::iterator nit; - std::map::iterator sit; - int cts = time(NULL); - - std::string out = "::::AutoDiscovery Neighbours::::\n"; - for(nit = neighbours.begin(); nit != neighbours.end(); nit++) - { - out += "Neighbour: " + (nit->second).id + "\n"; - rs_sprintf_append(out, "-> LocalAddr: %s:%u\n", rs_inet_ntoa(nit->second.localAddr.sin_addr).c_str(), ntohs(nit->second.localAddr.sin_port)); - rs_sprintf_append(out, "-> RemoteAddr: %s:%u\n", rs_inet_ntoa(nit->second.remoteAddr.sin_addr).c_str(), ntohs(nit->second.remoteAddr.sin_port)); - rs_sprintf_append(out, " Last Contact: %ld sec ago\n", cts - (nit->second.ts)); - - rs_sprintf_append(out, " -->DiscFlags: 0x%x\n", nit->second.discFlags); - - for(sit = (nit->second.neighbour_of).begin(); - sit != (nit->second.neighbour_of).end(); sit++) - { - out += "\tConnected via: " + (sit->first) + "\n"; - rs_sprintf_append(out, "\t\tLocalAddr: %s:%u\n", rs_inet_ntoa(sit->second.localAddr.sin_addr).c_str(), ntohs(sit->second.localAddr.sin_port)); - rs_sprintf_append(out, "\t\tRemoteAddr: %s:%u\n", rs_inet_ntoa(sit->second.remoteAddr.sin_addr).c_str(), ntohs(sit->second.remoteAddr.sin_port)); - - rs_sprintf_append(out, "\t\tLast Contact: %ld sec ago\n", cts - (sit->second.ts)); - rs_sprintf_append(out, "\t\tDiscFlags: 0x%x\n", sit->second.discFlags); - } - } - -#ifdef P3DISC_DEBUG - std::cerr << "p3disc::idServers()" << std::endl; - std::cerr << out; -#endif - - return 1; -} -#endif - -// tdelta -> trange. -// -inf...<0 0 (invalid) -// 0.. <9 1 -// 9...<99 2 -// 99...<999 3 -// 999...<9999 4 -// etc... - -//int convertTDeltaToTRange(double tdelta) -//{ -// if (tdelta < 0) -// return 0; -// int trange = 1 + (int) log10(tdelta + 1.0); -// return trange; -// -//} - -// trange -> tdelta -// -inf...0 -1 (invalid) -// 1 8 -// 2 98 -// 3 998 -// 4 9998 -// etc... - -//int convertTRangeToTDelta(int trange) -//{ -// if (trange <= 0) -// return -1; -// -// return (int) (pow(10.0, trange) - 1.5); // (int) xxx98.5 -> xxx98 -//} - -#endif // #if 0 diff --git a/libretroshare/src/services/p3rtt.cc b/libretroshare/src/services/p3rtt.cc index 91fb51b9e..bea7ce492 100644 --- a/libretroshare/src/services/p3rtt.cc +++ b/libretroshare/src/services/p3rtt.cc @@ -118,7 +118,7 @@ static double convert64bitsToTs(uint64_t bits) p3rtt::p3rtt(p3LinkMgr *lm) - :p3Service(RS_SERVICE_TYPE_RTT), /* p3Config(CONFIG_TYPE_RTT), */ mRttMtx("p3rtt"), mLinkMgr(lm) + :p3FastService(RS_SERVICE_TYPE_RTT), mRttMtx("p3rtt"), mLinkMgr(lm) { addSerialType(new RsRttSerialiser()); @@ -130,7 +130,6 @@ p3rtt::p3rtt(p3LinkMgr *lm) int p3rtt::tick() { - processIncoming(); sendPackets(); return 0; @@ -212,57 +211,30 @@ void p3rtt::sendPingMeasurements() } - - -int p3rtt::processIncoming() +bool p3rtt::recvItem(RsItem *item) { - /* for each packet - pass to specific handler */ - RsItem *item = NULL; - while(NULL != (item = recvItem())) + switch(item->PacketSubType()) { - switch(item->PacketSubType()) + default: + break; + case RS_PKT_SUBTYPE_RTT_PING: { - default: - break; - case RS_PKT_SUBTYPE_RTT_PING: - { - handlePing(item); - } - break; - case RS_PKT_SUBTYPE_RTT_PONG: - { - handlePong(item); - } - break; - -#if 0 - /* THESE ARE ALL FUTURISTIC DATA TYPES */ - case RS_DATA_ITEM: - { - handleData(item); - } - break; - - case RS_BANDWIDTH_PING_ITEM: - { - handleBandwidthPing(item); - } - break; - - case RS_BANDWIDTH_PONG_ITEM: - { - handleBandwidthPong(item); - } - break; -#endif + handlePing(item); } - - /* clean up */ - delete item; + break; + case RS_PKT_SUBTYPE_RTT_PONG: + { + handlePong(item); + } + break; } + + /* clean up */ + delete item; return true ; } + int p3rtt::handlePing(RsItem *item) { /* cast to right type */ diff --git a/libretroshare/src/services/p3rtt.h b/libretroshare/src/services/p3rtt.h index 68c90fdfb..e2e42fb24 100644 --- a/libretroshare/src/services/p3rtt.h +++ b/libretroshare/src/services/p3rtt.h @@ -60,7 +60,7 @@ class RttPeerInfo * Used to test Latency. */ -class p3rtt: public RsRtt, public p3Service +class p3rtt: public RsRtt, public p3FastService { public: p3rtt(p3LinkMgr *cm); @@ -74,10 +74,10 @@ virtual uint32_t getPongResults(std::string id, int n, std::listsin_family = AF_INET; @@ -285,8 +300,10 @@ bool operator<(const struct sockaddr_storage &a, const struct sockaddr_storage & bool sockaddr_storage_same(const struct sockaddr_storage &addr, const struct sockaddr_storage &addr2) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_same()"; std::cerr << std::endl; +#endif if (!sockaddr_storage_samefamily(addr, addr2)) return false; @@ -310,16 +327,20 @@ bool sockaddr_storage_same(const struct sockaddr_storage &addr, const struct soc bool sockaddr_storage_samefamily(const struct sockaddr_storage &addr, const struct sockaddr_storage &addr2) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_samefamily()"; std::cerr << std::endl; +#endif return (addr.ss_family == addr2.ss_family); } bool sockaddr_storage_sameip(const struct sockaddr_storage &addr, const struct sockaddr_storage &addr2) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_sameip()"; std::cerr << std::endl; +#endif if (!sockaddr_storage_samefamily(addr, addr2)) return false; @@ -343,8 +364,10 @@ bool sockaddr_storage_sameip(const struct sockaddr_storage &addr, const struct s bool sockaddr_storage_samenet(const struct sockaddr_storage &addr, const struct sockaddr_storage &addr2) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_samenet()"; std::cerr << std::endl; +#endif if (!sockaddr_storage_samefamily(addr, addr2)) return false; @@ -367,8 +390,10 @@ bool sockaddr_storage_samenet(const struct sockaddr_storage &addr, const struct bool sockaddr_storage_samesubnet(const struct sockaddr_storage &addr, const struct sockaddr_storage &addr2) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_samesubnet()"; std::cerr << std::endl; +#endif if (!sockaddr_storage_samefamily(addr, addr2)) return false; @@ -463,8 +488,10 @@ std::string sockaddr_storage_porttostring(const struct sockaddr_storage &addr) /********************************* Net Checks ***********************************/ bool sockaddr_storage_isnull(const struct sockaddr_storage &addr) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_isnull()"; std::cerr << std::endl; +#endif if (addr.ss_family == 0) return true; @@ -486,8 +513,10 @@ bool sockaddr_storage_isnull(const struct sockaddr_storage &addr) bool sockaddr_storage_isValidNet(const struct sockaddr_storage &addr) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_isValidNet()"; std::cerr << std::endl; +#endif switch(addr.ss_family) { @@ -507,8 +536,10 @@ bool sockaddr_storage_isValidNet(const struct sockaddr_storage &addr) bool sockaddr_storage_isLoopbackNet(const struct sockaddr_storage &addr) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_isLoopbackNet()"; std::cerr << std::endl; +#endif switch(addr.ss_family) { @@ -529,8 +560,10 @@ bool sockaddr_storage_isLoopbackNet(const struct sockaddr_storage &addr) bool sockaddr_storage_isPrivateNet(const struct sockaddr_storage &addr) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_isPrivateNet()"; std::cerr << std::endl; +#endif switch(addr.ss_family) { @@ -551,8 +584,10 @@ bool sockaddr_storage_isPrivateNet(const struct sockaddr_storage &addr) bool sockaddr_storage_isExternalNet(const struct sockaddr_storage &addr) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_isExternalNet()"; std::cerr << std::endl; +#endif switch(addr.ss_family) { @@ -608,8 +643,10 @@ const struct sockaddr_in6 *to_const_ipv6_ptr(const struct sockaddr_storage &addr bool sockaddr_storage_ipv4_zeroip(struct sockaddr_storage &addr) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_ipv4_zeroip()"; std::cerr << std::endl; +#endif struct sockaddr_in *ipv4_ptr = to_ipv4_ptr(addr); memset(&(ipv4_ptr->sin_addr), 0, sizeof(ipv4_ptr->sin_addr)); @@ -619,8 +656,10 @@ bool sockaddr_storage_ipv4_zeroip(struct sockaddr_storage &addr) bool sockaddr_storage_ipv4_copyip(struct sockaddr_storage &dst, const struct sockaddr_storage &src) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_ipv4_copyip()"; std::cerr << std::endl; +#endif struct sockaddr_in *dst_ptr = to_ipv4_ptr(dst); const struct sockaddr_in *src_ptr = to_const_ipv4_ptr(src); @@ -644,8 +683,10 @@ uint16_t sockaddr_storage_ipv4_port(const struct sockaddr_storage &addr) bool sockaddr_storage_ipv4_setport(struct sockaddr_storage &addr, uint16_t port) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_ipv4_setport()"; std::cerr << std::endl; +#endif struct sockaddr_in *ipv4_ptr = to_ipv4_ptr(addr); ipv4_ptr->sin_port = htons(port); @@ -700,8 +741,10 @@ bool sockaddr_storage_ipv6_setport(struct sockaddr_storage &addr, uint16_t port) bool sockaddr_storage_ipv4_lessthan(const struct sockaddr_storage &addr, const struct sockaddr_storage &addr2) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_ipv4_lessthan()"; std::cerr << std::endl; +#endif const struct sockaddr_in *ptr1 = to_const_ipv4_ptr(addr); const struct sockaddr_in *ptr2 = to_const_ipv4_ptr(addr2); @@ -715,8 +758,10 @@ bool sockaddr_storage_ipv4_lessthan(const struct sockaddr_storage &addr, const s bool sockaddr_storage_ipv4_same(const struct sockaddr_storage &addr, const struct sockaddr_storage &addr2) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_ipv4_same()"; std::cerr << std::endl; +#endif const struct sockaddr_in *ptr1 = to_const_ipv4_ptr(addr); const struct sockaddr_in *ptr2 = to_const_ipv4_ptr(addr2); @@ -727,8 +772,10 @@ bool sockaddr_storage_ipv4_same(const struct sockaddr_storage &addr, const struc bool sockaddr_storage_ipv4_sameip(const struct sockaddr_storage &addr, const struct sockaddr_storage &addr2) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_ipv4_sameip()"; std::cerr << std::endl; +#endif const struct sockaddr_in *ptr1 = to_const_ipv4_ptr(addr); const struct sockaddr_in *ptr2 = to_const_ipv4_ptr(addr2); @@ -742,8 +789,10 @@ bool sockaddr_storage_ipv4_samenet(const struct sockaddr_storage &addr, const st (void) addr; (void) addr2; +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_ipv4_samenet()"; std::cerr << std::endl; +#endif const struct sockaddr_in *ptr1 = to_const_ipv4_ptr(addr); @@ -756,8 +805,10 @@ bool sockaddr_storage_ipv4_samesubnet(const struct sockaddr_storage &addr, const (void) addr; (void) addr2; +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_ipv4_samesubnet() using pqinetwork::isSameSubnet()"; std::cerr << std::endl; +#endif const struct sockaddr_in *ptr1 = to_const_ipv4_ptr(addr); const struct sockaddr_in *ptr2 = to_const_ipv4_ptr(addr2); @@ -866,8 +917,10 @@ std::string sockaddr_storage_ipv6_iptostring(const struct sockaddr_storage & /* /********************************* Net Checks ***********************************/ bool sockaddr_storage_ipv4_isnull(const struct sockaddr_storage &addr) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_ipv4_isnull()"; std::cerr << std::endl; +#endif const struct sockaddr_in *ptr1 = to_const_ipv4_ptr(addr); if (ptr1->sin_family != AF_INET) @@ -883,8 +936,10 @@ bool sockaddr_storage_ipv4_isnull(const struct sockaddr_storage &addr) bool sockaddr_storage_ipv4_isValidNet(const struct sockaddr_storage &addr) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_ipv4_isValidNet()"; std::cerr << std::endl; +#endif const struct sockaddr_in *ptr1 = to_const_ipv4_ptr(addr); if (ptr1->sin_family != AF_INET) @@ -897,8 +952,10 @@ bool sockaddr_storage_ipv4_isValidNet(const struct sockaddr_storage &addr) bool sockaddr_storage_ipv4_isLoopbackNet(const struct sockaddr_storage &addr) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_ipv4_isLoopbackNet()"; std::cerr << std::endl; +#endif const struct sockaddr_in *ptr1 = to_const_ipv4_ptr(addr); @@ -911,8 +968,10 @@ bool sockaddr_storage_ipv4_isLoopbackNet(const struct sockaddr_storage &addr) bool sockaddr_storage_ipv4_isPrivateNet(const struct sockaddr_storage &addr) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_ipv4_isPrivateNet()"; std::cerr << std::endl; +#endif const struct sockaddr_in *ptr1 = to_const_ipv4_ptr(addr); @@ -925,8 +984,10 @@ bool sockaddr_storage_ipv4_isPrivateNet(const struct sockaddr_storage &addr) bool sockaddr_storage_ipv4_isExternalNet(const struct sockaddr_storage &addr) { +#ifdef SS_DEBUG std::cerr << "sockaddr_storage_ipv4_isExternalNet()"; std::cerr << std::endl; +#endif const struct sockaddr_in *ptr1 = to_const_ipv4_ptr(addr); if (ptr1->sin_family != AF_INET)