/*
 * libretroshare/src/pqi pqiperson.cc
 *
 * 3P/PQI network interface for RetroShare.
 *
 * Copyright 2004-2006 by Robert Fernie.
 *
 * This library is free software; you can redistribute it and/or
 * modify it under the terms of the GNU Library General Public
 * License Version 2 as published by the Free Software Foundation.
 *
 * This library is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * Library General Public License for more details.
 *
 * You should have received a copy of the GNU Library General Public
 * License along with this library; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA.
 *
 * Please report all bugs and problems to "retroshare@lunamutt.com".
 *
 */

#include "pqi/pqi.h"
#include "pqi/pqiperson.h"
#include "pqi/pqipersongrp.h"
#include "pqi/pqissl.h"

const int pqipersonzone = 82371;
#include "util/rsdebug.h"
#include "util/rsstring.h"
#include "retroshare/rspeers.h"

/****
 * #define PERSON_DEBUG 1
 ****/

/**
 * Builds a pqiperson for the peer `id`, owned by the group `pg`.
 *
 * The object starts inactive: no active child connection, no connection
 * attempt in progress and no heartbeat recorded.
 *
 * @param id  peer identifier, forwarded to the PQInterface base class.
 * @param pg  group that is notified of connection events and receives
 *            incoming items; stored as-is (not owned).
 */
pqiperson::pqiperson(const RsPeerId& id, pqipersongrp *pg)
	:PQInterface(id), mNotifyMtx("pqiperson-notify"), mPersonMtx("pqiperson"),
	active(false), activepqi(NULL),
	inConnectAttempt(false), waittimes(0),
	pqipg(pg)
{
	/* must check id! */

	// Set in the body rather than the initializer list so that the result
	// does not depend on the member declaration order in the header.
	// 0 means "no heartbeat seen yet" (see tick()).
	lastHeartbeatReceived = 0;
}

/**
 * Destroys the object and deletes every child pqiconnect that was registered
 * through addChildInterface().
 */
pqiperson::~pqiperson()
{
	RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/

	// clean up the children.
	std::map<uint32_t, pqiconnect *>::iterator it;
	for(it = kids.begin(); it != kids.end(); ++it)
	{
		delete it->second;
	}
	kids.clear();
}

// The PQInterface interface.
int pqiperson::SendItem(RsItem *i,uint32_t& serialized_size) { RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ if (active) { // every outgoing item goes through this function, so try to not waste cpu cycles // check if debug output is wanted, to avoid unecessary work // getZoneLevel() locks a global mutex and does a lookup in a map or returns a default value // (not sure if this is a performance problem) if (PQL_DEBUG_BASIC <= getZoneLevel(pqipersonzone)) { std::string out = "pqiperson::SendItem() Active: Sending On\n"; i->print_string(out, 5); // this can be very expensive #ifdef PERSON_DEBUG std::cerr << out << std::endl; #endif pqioutput(PQL_DEBUG_BASIC, pqipersonzone, out); } return activepqi -> SendItem(i,serialized_size); } else { if (PQL_DEBUG_BASIC <= getZoneLevel(pqipersonzone)) { std::string out = "pqiperson::SendItem()"; out += " Not Active: Used to put in ToGo Store\n"; out += " Now deleting..."; pqioutput(PQL_DEBUG_BASIC, pqipersonzone, out); } delete i; } return 0; // queued. } RsItem *pqiperson::GetItem() { RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ if (active) return activepqi -> GetItem(); // else not possible. return NULL; } bool pqiperson::RecvItem(RsItem *item) { #ifdef PERSON_DEBUG std::cerr << "pqiperson::RecvItem()"; std::cerr << std::endl; #endif return pqipg->recvItem((RsRawItem *) item); } int pqiperson::status() { RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ if (active) return activepqi -> status(); return -1; } int pqiperson::receiveHeartbeat() { RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::receiveHeartbeat() from peer : " + PeerId().toStdString()); lastHeartbeatReceived = time(NULL); return true ; } // tick...... int pqiperson::tick() { int activeTick = 0; { RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ //if lastHeartbeatReceived is 0, it might be not activated so don't do a net reset. 
if (active && (lastHeartbeatReceived != 0) && (time(NULL) - lastHeartbeatReceived) > HEARTBEAT_REPEAT_TIME * 5) { int ageLastIncoming = time(NULL) - activepqi->getLastIncomingTS(); std::string out = "pqiperson::tick() WARNING No heartbeat from: " + PeerId().toStdString(); //out << " assume dead. calling pqissl::reset(), LastHeartbeat was: "; rs_sprintf_append(out, " LastHeartbeat was: %ld secs ago", time(NULL) - lastHeartbeatReceived); rs_sprintf_append(out, " LastIncoming was: %d secs ago", ageLastIncoming); pqioutput(PQL_WARNING, pqipersonzone, out); #define NO_PACKET_TIMEOUT 60 if (ageLastIncoming > NO_PACKET_TIMEOUT) { out = "pqiperson::tick() " + PeerId().toStdString(); out += " No Heartbeat & No Packets -> assume dead. calling pqissl::reset()"; pqioutput(PQL_WARNING, pqipersonzone, out); this->reset_locked(); } } { std::string out = "pqiperson::tick() Id: " + PeerId().toStdString() + " "; if (active) out += "***Active***"; else out += ">>InActive<<"; out += "\n"; rs_sprintf_append(out, "Activepqi: %p inConnectAttempt: ", activepqi); if (inConnectAttempt) out += "In Connection Attempt"; else out += " Not Connecting "; out += "\n"; // tick the children. std::map::iterator it; for(it = kids.begin(); it != kids.end(); ++it) { if (0 < (it->second) -> tick()) { activeTick = 1; } rs_sprintf_append(out, "\tTicking Child: %d\n", it->first); } pqioutput(PQL_DEBUG_ALL, pqipersonzone, out); } // end of pqioutput. } // handle Notify Events that were generated. processNotifyEvents(); return activeTick; } // callback function for the child - notify of a change. // This is only used for out-of-band info.... // otherwise could get dangerous loops. // - Actually, now we have - must store and process later. int pqiperson::notifyEvent(NetInterface *ni, int newState, const struct sockaddr_storage &remote_peer_address) { #ifdef PERSON_DEBUG std::cerr << "pqiperson::notifyEvent() adding event to Queue. 
newState=" << newState << " from IP = " << sockaddr_storage_tostring(remote_peer_address) << std::endl; #endif if (mPersonMtx.trylock()) { handleNotifyEvent_locked(ni, newState, remote_peer_address); mPersonMtx.unlock(); return 1; } RsStackMutex stack(mNotifyMtx); /**** LOCK MUTEX ****/ mNotifyQueue.push_back(NotifyData(ni, newState, remote_peer_address)); return 1; } void pqiperson::processNotifyEvents() { NetInterface *ni; int state; struct sockaddr_storage addr; while(1) { { RsStackMutex stack(mNotifyMtx); /**** LOCK MUTEX ****/ if (mNotifyQueue.empty()) { return; } NotifyData &data = mNotifyQueue.front(); ni = data.mNi; state = data.mState; addr = data.mAddr; mNotifyQueue.pop_front(); } RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ handleNotifyEvent_locked(ni, state, addr); } return; } int pqiperson::handleNotifyEvent_locked(NetInterface *ni, int newState, const struct sockaddr_storage &remote_peer_address) { { std::string out = "pqiperson::notifyEvent() Id: " + PeerId().toStdString() + "\n"; rs_sprintf_append(out, "Message: %d from: %p\n", newState, ni); rs_sprintf_append(out, "Active pqi : %p", activepqi); pqioutput(PQL_DEBUG_BASIC, pqipersonzone, out); } /* find the pqi, */ pqiconnect *pqi = NULL; uint32_t type = 0; std::map::iterator it; /* start again */ int i = 0; for(it = kids.begin(); it != kids.end(); ++it) { std::string out; rs_sprintf(out, "pqiperson::connectattempt() Kid# %d of %u\n", i, kids.size()); rs_sprintf_append(out, " type: %u in_ni: %p", it->first, ni); pqioutput(PQL_DEBUG_BASIC, pqipersonzone, out); i++; if ((it->second)->thisNetInterface(ni)) { pqi = (it->second); type = (it->first); } } if (!pqi) { pqioutput(PQL_WARNING, pqipersonzone, "Unknown notfyEvent Source!"); return -1; } switch(newState) { case CONNECT_RECEIVED: case CONNECT_SUCCESS: /* notify */ if (pqipg) { pqissl *ssl = dynamic_cast(ni); if(ssl != NULL) pqipg->notifyConnect(PeerId(), type, true, ssl->actAsServer(), remote_peer_address); else 
pqipg->notifyConnect(PeerId(), type, true, false, remote_peer_address); } if ((active) && (activepqi != pqi)) // already connected - trouble { pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::notifyEvent() Id: " + PeerId().toStdString() + " CONNECT_SUCCESS+active-> activing new connection, shutting others"); // This is the RESET that's killing the connections..... //activepqi -> reset(); // this causes a recursive call back into this fn. // which cleans up state. // we only do this if its not going to mess with new conn. } /* now install a new one. */ { pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::notifyEvent() Id: " + PeerId().toStdString() + " CONNECT_SUCCESS->marking so! (resetting others)"); // mark as active. active = true; lastHeartbeatReceived = 0; activepqi = pqi; inConnectAttempt = false; activepqi->start(); // STARTUP THREAD. /* reset all other children? (clear up long UDP attempt) */ for(it = kids.begin(); it != kids.end(); ++it) { if (!(it->second)->thisNetInterface(ni)) { std::string out; rs_sprintf(out, "Resetting pqi ref : %p", &(it->second)); pqioutput(PQL_DEBUG_BASIC, pqipersonzone, out); it->second->reset(); } else { //std::cerr << "Active pqi : not resetting." << std::endl; } } return 1; } break; case CONNECT_UNREACHABLE: case CONNECT_FIREWALLED: case CONNECT_FAILED: if (active) { if (activepqi == pqi) { pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::notifyEvent() Id: " + PeerId().toStdString() + " CONNECT_FAILED->marking so!"); activepqi->shutdown(); // STOP THREAD. 
active = false; activepqi = NULL; } else { pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::notifyEvent() Id: " + PeerId().toStdString() + " CONNECT_FAILED-> from an unactive connection, don't flag the peer as not connected, just try next attempt !"); } } else { pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::notifyEvent() Id: " + PeerId().toStdString() + " CONNECT_FAILED+NOT active -> try connect again"); } /* notify up */ if (pqipg) { pqipg->notifyConnect(PeerId(), type, false, false, remote_peer_address); } return 1; break; default: break; } return -1; } /***************** Not PQInterface Fns ***********************/ int pqiperson::reset() { RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ return reset_locked(); } int pqiperson::reset_locked() { pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::reset() resetting all pqiconnect for Id: " + PeerId().toStdString()); std::map::iterator it; for(it = kids.begin(); it != kids.end(); ++it) { (it->second) -> shutdown(); // STOP THREAD. (it->second) -> reset(); } activepqi = NULL; active = false; lastHeartbeatReceived = 0; return 1; } int pqiperson::fullstopthreads() { RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::fullstopthreads() for Id: " + PeerId().toStdString()); std::map::iterator it; for(it = kids.begin(); it != kids.end(); ++it) { (it->second) -> fullstop(); // WAIT FOR THREAD TO STOP. } activepqi = NULL; active = false; lastHeartbeatReceived = 0; return 1; } int pqiperson::addChildInterface(uint32_t type, pqiconnect *pqi) { RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ { std::string out; rs_sprintf(out, "pqiperson::addChildInterface() : Id %s %u", PeerId().toStdString().c_str(), type); pqioutput(PQL_DEBUG_BASIC, pqipersonzone, out); } kids[type] = pqi; return 1; } /***************** PRIVATE FUNCTIONS ***********************/ // functions to iterate over the connects and change state. 
int pqiperson::listen() { RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ pqioutput(PQL_DEBUG_BASIC, pqipersonzone, "pqiperson::listen() Id: " + PeerId().toStdString()); if (!active) { std::map::iterator it; for(it = kids.begin(); it != kids.end(); ++it) { // set them all listening. (it->second) -> listen(); } } return 1; } int pqiperson::stoplistening() { RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ pqioutput(PQL_DEBUG_BASIC, pqipersonzone, "pqiperson::stoplistening() Id: " + PeerId().toStdString()); std::map::iterator it; for(it = kids.begin(); it != kids.end(); ++it) { // set them all listening. (it->second) -> stoplistening(); } return 1; } int pqiperson::connect(uint32_t type, const struct sockaddr_storage &raddr, const struct sockaddr_storage &proxyaddr, const struct sockaddr_storage &srcaddr, uint32_t delay, uint32_t period, uint32_t timeout, uint32_t flags, uint32_t bandwidth, const std::string &domain_addr, uint16_t domain_port) { RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ #ifdef PERSON_DEBUG #endif { std::string out = "pqiperson::connect() Id: " + PeerId().toStdString(); rs_sprintf_append(out, " type: %u", type); out += " addr: "; out += sockaddr_storage_tostring(raddr); out += " proxyaddr: "; out += sockaddr_storage_tostring(proxyaddr); out += " srcaddr: "; out += sockaddr_storage_tostring(srcaddr); rs_sprintf_append(out, " delay: %u", delay); rs_sprintf_append(out, " period: %u", period); rs_sprintf_append(out, " timeout: %u", timeout); rs_sprintf_append(out, " flags: %u", flags); rs_sprintf_append(out, " bandwidth: %u", bandwidth); //std::cerr << out.str(); pqioutput(PQL_WARNING, pqipersonzone, out); } std::map::iterator it; it = kids.find(type); if (it == kids.end()) { #ifdef PERSON_DEBUG //pqioutput(PQL_DEBUG_BASIC, pqipersonzone, "pqiperson::connect() missing pqiconnect"); #endif /* notify of fail! 
*/ pqipg->notifyConnect(PeerId(), type, false, false, raddr); return 0; } #ifdef PERSON_DEBUG std::cerr << "pqiperson::connect() WARNING, resetting for new connection attempt" << std::endl; #endif /* set the parameters */ pqioutput(PQL_WARNING, pqipersonzone, "pqiperson::connect reset() before connection attempt"); (it->second)->reset(); #ifdef PERSON_DEBUG std::cerr << "pqiperson::connect() WARNING, clearing rate cap" << std::endl; #endif setRateCap_locked(0,0); #ifdef PERSON_DEBUG std::cerr << "pqiperson::connect() setting connect_parameters" << std::endl; #endif // These two are universal. (it->second)->connect_parameter(NET_PARAM_CONNECT_DELAY, delay); (it->second)->connect_parameter(NET_PARAM_CONNECT_TIMEOUT, timeout); // these 5 are only used by UDP connections. (it->second)->connect_parameter(NET_PARAM_CONNECT_PERIOD, period); (it->second)->connect_parameter(NET_PARAM_CONNECT_FLAGS, flags); (it->second)->connect_parameter(NET_PARAM_CONNECT_BANDWIDTH, bandwidth); (it->second)->connect_additional_address(NET_PARAM_CONNECT_PROXY, proxyaddr); (it->second)->connect_additional_address(NET_PARAM_CONNECT_SOURCE, srcaddr); // These are used by Proxy/Hidden (it->second)->connect_parameter(NET_PARAM_CONNECT_DOMAIN_ADDRESS, domain_addr); (it->second)->connect_parameter(NET_PARAM_CONNECT_REMOTE_PORT, domain_port); (it->second)->connect(raddr); // flag if we started a new connectionAttempt. inConnectAttempt = true; return 1; } void pqiperson::getRates(RsBwRates &rates) { RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ // get the rate from the active one. if ((!active) || (activepqi == NULL)) return; activepqi -> getRates(rates); } int pqiperson::gatherOutQueueStatistics(std::vector& per_service,std::vector& per_priority) { RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ // get the rate from the active one. 
if ((!active) || (activepqi == NULL)) return 0; return activepqi -> gatherOutQueueStatistics(per_service,per_priority); } int pqiperson::getQueueSize(bool in) { RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ // get the rate from the active one. if ((!active) || (activepqi == NULL)) return 0; return activepqi -> getQueueSize(in); } bool pqiperson::getCryptoParams(RsPeerCryptoParams& params) { RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ if(active && activepqi != NULL) return activepqi->getCryptoParams(params) ; else { params.connexion_state = 0 ; params.cipher_name.clear() ; params.cipher_bits_1 = 0 ; params.cipher_bits_2 = 0 ; params.cipher_version.clear() ; return false ; } } bool pqiconnect::getCryptoParams(RsPeerCryptoParams& params) { pqissl *ssl = dynamic_cast(ni) ; if(ssl != NULL) { ssl->getCryptoParams(params) ; return true ; } else { params.connexion_state = 0 ; params.cipher_name.clear() ; params.cipher_bits_1 = 0 ; params.cipher_bits_2 = 0 ; params.cipher_version.clear() ; return false ; } } float pqiperson::getRate(bool in) { RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ // get the rate from the active one. if ((!active) || (activepqi == NULL)) return 0; return activepqi -> getRate(in); } void pqiperson::setMaxRate(bool in, float val) { RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ // set to all of them. (and us) PQInterface::setMaxRate(in, val); // clean up the children. std::map::iterator it; for(it = kids.begin(); it != kids.end(); ++it) { (it->second) -> setMaxRate(in, val); } } void pqiperson::setRateCap(float val_in, float val_out) { // This methods might be called all the way down from pqiperson::tick() down // to pqissludp while completing a UDP connexion, causing a deadlock. // // We need to make sure the mutex is not already locked by current thread. If so, we call the // locked version directly if not, we lock, call, and unlock, possibly waiting if the // lock is already acquired by another thread. 
// // The lock cannot be locked by the same thread between the first test and // the "else" statement, so there is no possibility for this code to fail. // // We could actually put that code in RsMutex::lock() if(pthread_equal(mPersonMtx.owner(),pthread_self())) // 1 - unlocked, or already locked by same thread return setRateCap_locked(val_in, val_out); // -> do nothing else { // 2 - lock was free or locked by different thread => wait. RsStackMutex stack(mPersonMtx); /**** LOCK MUTEX ****/ setRateCap_locked(val_in, val_out); } } void pqiperson::setRateCap_locked(float val_in, float val_out) { // set to all of them. (and us) PQInterface::setRateCap(val_in, val_out); // clean up the children. std::map::iterator it; for(it = kids.begin(); it != kids.end(); ++it) { (it->second) -> setRateCap(val_in, val_out); } }