Work-in-progress to reduce latency time:

* service->SendItem() now goes direct to pqistreamer buffer.
 * split p3FastService out of p3Service.
	p3FastService removes the recv buffer for faster processing.
	p3Service maintains its original interface, so derivate classes can remain unchanged.
 * Added uint32_t usec (wait period) to BinInterface.moretoread() & cansend() for future threading.
 * Added Mutex protection to pqistreamer, pqissl and derivatives of both.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.6-initdev@6783 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2013-10-01 10:11:34 +00:00
parent 8e7fe9f79b
commit a7dd9ad9e3
24 changed files with 713 additions and 574 deletions

View File

@ -65,5 +65,20 @@ virtual RsRawItem *GetRsRawItem() = 0;
}; };
/* interface to allow outgoing messages to be sent directly
* through to the pqiperson, rather than being queued
*/
class pqiPublisher
{
public:
virtual ~pqiPublisher() { return; }
virtual bool sendItem(RsRawItem *item) = 0;
};
#endif // PQI_TOP_HEADER #endif // PQI_TOP_HEADER

View File

@ -219,10 +219,11 @@ class PQInterface: public RateInterface
virtual std::string PeerId() { return peerId; } virtual std::string PeerId() { return peerId; }
// the callback from NetInterface Connection Events. // the callback from NetInterface Connection Events.
virtual int notifyEvent(NetInterface *ni, int event) virtual int notifyEvent(NetInterface *ni, int event, const struct sockaddr_storage &remote_peer_address)
{ {
(void) ni; /* remove unused parameter warnings */ (void) ni; /* remove unused parameter warnings */
(void) event; /* remove unused parameter warnings */ (void) event; /* remove unused parameter warnings */
(void) remote_peer_address;
return 0; return 0;
} }
@ -278,11 +279,13 @@ virtual int readdata(void *data, int len) = 0;
/** /**
* Is more particular the case of the sending data through a socket (internet) * Is more particular the case of the sending data through a socket (internet)
* moretoread and candsend, take a microsec timeout argument.
*
*/ */
virtual int netstatus() = 0; virtual int netstatus() = 0;
virtual int isactive() = 0; virtual int isactive() = 0;
virtual bool moretoread() = 0; virtual bool moretoread(uint32_t usec) = 0;
virtual bool cansend() = 0; virtual bool cansend(uint32_t usec) = 0;
/** /**
* method for streamer to shutdown bininterface * method for streamer to shutdown bininterface

View File

@ -239,7 +239,7 @@ int pqiarchive::writePkt(RsItem *pqi)
} }
if (!(bio->cansend())) if (!(bio->cansend(0)))
{ {
std::string out = "pqiarchive::writePkt() BIO cannot write!\nDiscarding:\n"; std::string out = "pqiarchive::writePkt() BIO cannot write!\nDiscarding:\n";
pqi -> print_string(out); pqi -> print_string(out);
@ -321,7 +321,7 @@ int pqiarchive::readPkt(RsItem **item_out, long *ts_out)
{ {
pqioutput(PQL_DEBUG_ALL, pqiarchivezone, "pqiarchive::readPkt()"); pqioutput(PQL_DEBUG_ALL, pqiarchivezone, "pqiarchive::readPkt()");
if ((!(bio->isactive())) || (!(bio->moretoread()))) if ((!(bio->isactive())) || (!(bio->moretoread(0))))
{ {
return 0; return 0;
} }

View File

@ -302,7 +302,7 @@ uint64_t BinEncryptedFileInterface::bytecount()
return cpyCount; return cpyCount;
} }
bool BinEncryptedFileInterface::moretoread() bool BinEncryptedFileInterface::moretoread(uint32_t /* usec */)
{ {
if(haveData) if(haveData)
return (cpyCount < sizeData); return (cpyCount < sizeData);
@ -545,7 +545,8 @@ int NetBinDummy::connect(const struct sockaddr_storage &raddr)
std::cerr << std::endl; std::cerr << std::endl;
if (parent()) if (parent())
{ {
parent()->notifyEvent(this, CONNECT_FAILED); struct sockaddr_storage addr = raddr;
parent()->notifyEvent(this, CONNECT_FAILED, raddr);
} }
} }
else if (!dummyConnected) else if (!dummyConnected)
@ -595,7 +596,10 @@ int NetBinDummy::disconnect()
if (parent()) if (parent())
{ {
parent()->notifyEvent(this, CONNECT_FAILED); struct sockaddr_storage addr;
sockaddr_storage_clear(addr);
parent()->notifyEvent(this, CONNECT_FAILED, addr);
} }
return 1; return 1;
@ -627,7 +631,12 @@ int NetBinDummy::tick()
dummyConnected = true; dummyConnected = true;
toConnect = false; toConnect = false;
if (parent()) if (parent())
parent()->notifyEvent(this, CONNECT_SUCCESS); {
struct sockaddr_storage addr;
sockaddr_storage_clear(addr);
parent()->notifyEvent(this, CONNECT_SUCCESS, addr);
}
} }
else else
{ {
@ -681,7 +690,7 @@ int NetBinDummy::isactive()
return dummyConnected; return dummyConnected;
} }
bool NetBinDummy::moretoread() bool NetBinDummy::moretoread(uint32_t /* usec */)
{ {
std::cerr << "NetBinDummy::moretoread() "; std::cerr << "NetBinDummy::moretoread() ";
printNetBinID(std::cerr, PeerId(), type); printNetBinID(std::cerr, PeerId(), type);
@ -690,7 +699,7 @@ bool NetBinDummy::moretoread()
return false; return false;
} }
bool NetBinDummy::cansend() bool NetBinDummy::cansend(uint32_t /* usec */)
{ {
std::cerr << "NetBinDummy::cansend() "; std::cerr << "NetBinDummy::cansend() ";
printNetBinID(std::cerr, PeerId(), type); printNetBinID(std::cerr, PeerId(), type);

View File

@ -52,7 +52,7 @@ virtual int senddata(void *data, int len);
virtual int readdata(void *data, int len); virtual int readdata(void *data, int len);
virtual int netstatus() { return 1;} virtual int netstatus() { return 1;}
virtual int isactive() { return (buf != NULL);} virtual int isactive() { return (buf != NULL);}
virtual bool moretoread() virtual bool moretoread(uint32_t /* usec */ )
{ {
if ((buf) && (bin_flags | BIN_FLAGS_READABLE)) if ((buf) && (bin_flags | BIN_FLAGS_READABLE))
{ {
@ -65,7 +65,10 @@ virtual bool moretoread()
} }
virtual int close(); virtual int close();
virtual bool cansend() { return (bin_flags | BIN_FLAGS_WRITEABLE); } virtual bool cansend(uint32_t /* usec */)
{
return (bin_flags | BIN_FLAGS_WRITEABLE);
}
virtual bool bandwidthLimited() { return false; } virtual bool bandwidthLimited() { return false; }
//! if HASHing is switched on //! if HASHing is switched on
@ -118,7 +121,7 @@ public:
int close(); int close();
uint64_t bytecount(); uint64_t bytecount();
bool moretoread(); bool moretoread(uint32_t usec);
private: private:
@ -156,7 +159,7 @@ virtual int senddata(void *data, int len);
virtual int readdata(void *data, int len); virtual int readdata(void *data, int len);
virtual int netstatus() { return 1; } virtual int netstatus() { return 1; }
virtual int isactive() { return 1; } virtual int isactive() { return 1; }
virtual bool moretoread() virtual bool moretoread(uint32_t /* usec */)
{ {
if ((buf) && (bin_flags | BIN_FLAGS_READABLE )) if ((buf) && (bin_flags | BIN_FLAGS_READABLE ))
{ {
@ -169,7 +172,10 @@ virtual bool moretoread()
} }
virtual int close(); virtual int close();
virtual bool cansend() { return (bin_flags | BIN_FLAGS_WRITEABLE); } virtual bool cansend(uint32_t /* usec */)
{
return (bin_flags | BIN_FLAGS_WRITEABLE);
}
virtual bool bandwidthLimited() { return false; } virtual bool bandwidthLimited() { return false; }
virtual std::string gethash(); virtual std::string gethash();
@ -217,8 +223,8 @@ virtual int senddata(void *data, int len);
virtual int readdata(void *data, int len); virtual int readdata(void *data, int len);
virtual int netstatus(); virtual int netstatus();
virtual int isactive(); virtual int isactive();
virtual bool moretoread(); virtual bool moretoread(uint32_t usec);
virtual bool cansend(); virtual bool cansend(uint32_t usec);
virtual int close(); virtual int close();
virtual std::string gethash(); virtual std::string gethash();

View File

@ -46,10 +46,17 @@ class SearchModule
// Presents a P3 Face to the world! // Presents a P3 Face to the world!
// and funnels data through to a PQInterface. // and funnels data through to a PQInterface.
// //
class pqihandler: public P3Interface class pqihandler: public P3Interface, public pqiPublisher
{ {
public: public:
pqihandler(SecurityPolicy *Global); pqihandler(SecurityPolicy *Global);
/**** Overloaded from pqiPublisher ****/
virtual bool sendItem(RsRawItem *item)
{
return SendRsRawItem(item);
}
bool AddSearchModule(SearchModule *mod); bool AddSearchModule(SearchModule *mod);
bool RemoveSearchModule(SearchModule *mod); bool RemoveSearchModule(SearchModule *mod);

View File

@ -172,7 +172,7 @@ int pqiperson::tick()
// callback function for the child - notify of a change. // callback function for the child - notify of a change.
// This is only used for out-of-band info.... // This is only used for out-of-band info....
// otherwise could get dangerous loops. // otherwise could get dangerous loops.
int pqiperson::notifyEvent(NetInterface *ni, int newState) int pqiperson::notifyEvent(NetInterface *ni, int newState, const struct sockaddr_storage &remote_peer_address)
{ {
{ {
std::string out = "pqiperson::notifyEvent() Id: " + PeerId() + "\n"; std::string out = "pqiperson::notifyEvent() Id: " + PeerId() + "\n";
@ -218,8 +218,6 @@ int pqiperson::notifyEvent(NetInterface *ni, int newState)
/* notify */ /* notify */
if (pqipg) { if (pqipg) {
struct sockaddr_storage remote_peer_address;
pqi->getConnectAddress(remote_peer_address);
pqipg->notifyConnect(PeerId(), type, true, remote_peer_address); pqipg->notifyConnect(PeerId(), type, true, remote_peer_address);
} }
@ -288,9 +286,7 @@ int pqiperson::notifyEvent(NetInterface *ni, int newState)
/* notify up */ /* notify up */
if (pqipg) if (pqipg)
{ {
struct sockaddr_storage raddr; pqipg->notifyConnect(PeerId(), type, false, remote_peer_address);
sockaddr_storage_clear(raddr);
pqipg->notifyConnect(PeerId(), type, false, raddr);
} }
return 1; return 1;

View File

@ -135,7 +135,7 @@ virtual int status();
virtual int tick(); virtual int tick();
// overloaded callback function for the child - notify of a change. // overloaded callback function for the child - notify of a change.
int notifyEvent(NetInterface *ni, int event); virtual int notifyEvent(NetInterface *ni, int event, const struct sockaddr_storage &addr);
// PQInterface for rate control overloaded.... // PQInterface for rate control overloaded....
virtual int getQueueSize(bool in); virtual int getQueueSize(bool in);

View File

@ -62,7 +62,7 @@ int pqipersongrp::tickServiceRecv()
RsRawItem *pqi = NULL; RsRawItem *pqi = NULL;
int i = 0; int i = 0;
pqioutput(PQL_DEBUG_ALL, pqipersongrpzone, "pqipersongrp::tickTunnelServer()"); pqioutput(PQL_DEBUG_ALL, pqipersongrpzone, "pqipersongrp::tickServiceRecv()");
//p3ServiceServer::tick(); //p3ServiceServer::tick();
@ -70,8 +70,8 @@ int pqipersongrp::tickServiceRecv()
{ {
++i; ++i;
pqioutput(PQL_DEBUG_BASIC, pqipersongrpzone, pqioutput(PQL_DEBUG_BASIC, pqipersongrpzone,
"pqipersongrp::tickTunnelServer() Incoming TunnelItem"); "pqipersongrp::tickServiceRecv() Incoming TunnelItem");
incoming(pqi); recvItem(pqi);
} }
if (0 < i) if (0 < i)
@ -82,6 +82,11 @@ int pqipersongrp::tickServiceRecv()
} }
// handle the tunnel services. // handle the tunnel services.
// Improvements:
// This function is no longer necessary, and data is pushed directly to pqihandler.
#if 0
int pqipersongrp::tickServiceSend() int pqipersongrp::tickServiceSend()
{ {
RsRawItem *pqi = NULL; RsRawItem *pqi = NULL;
@ -106,10 +111,12 @@ int pqipersongrp::tickServiceSend()
return 0; return 0;
} }
#endif
// init // init
pqipersongrp::pqipersongrp(SecurityPolicy *glob, unsigned long flags) pqipersongrp::pqipersongrp(SecurityPolicy *glob, unsigned long flags)
:pqihandler(glob), pqil(NULL), initFlags(flags) :pqihandler(glob), p3ServiceServer(this), pqil(NULL), initFlags(flags)
{ {
} }
@ -130,6 +137,7 @@ int pqipersongrp::tick()
int i = 0; int i = 0;
#if 0
if (tickServiceSend()) if (tickServiceSend())
{ {
i = 1; i = 1;
@ -137,14 +145,18 @@ int pqipersongrp::tick()
std::cerr << "pqipersongrp::tick() moreToTick from tickServiceSend()" << std::endl; std::cerr << "pqipersongrp::tick() moreToTick from tickServiceSend()" << std::endl;
#endif #endif
} }
#endif
if (pqihandler::tick()) /* does actual Send/Recv */
#if 0
if (pqihandler::tick()) /* does Send/Recv */
{ {
i = 1; i = 1;
#ifdef PGRP_DEBUG #ifdef PGRP_DEBUG
std::cerr << "pqipersongrp::tick() moreToTick from pqihandler::tick()" << std::endl; std::cerr << "pqipersongrp::tick() moreToTick from pqihandler::tick()" << std::endl;
#endif #endif
} }
#endif
if (tickServiceRecv()) if (tickServiceRecv())
@ -155,6 +167,19 @@ int pqipersongrp::tick()
#endif #endif
} }
p3ServiceServer::tick();
#if 1
if (pqihandler::tick()) /* does Send/Recv */
{
i = 1;
#ifdef PGRP_DEBUG
std::cerr << "pqipersongrp::tick() moreToTick from pqihandler::tick()" << std::endl;
#endif
}
#endif
return i; return i;
} }

View File

@ -114,9 +114,9 @@ virtual int checkOutgoingRsItem(RsItem *item, int global)
private: private:
// The tunnelserver operation. // The serviceserver operation.
int tickServiceRecv(); int tickServiceRecv();
int tickServiceSend(); //int tickServiceSend();
pqilistener *pqil; pqilistener *pqil;
unsigned long initFlags; unsigned long initFlags;

View File

@ -37,7 +37,10 @@ int pqiQoSstreamer::getQueueSize(bool in)
if(in) if(in)
return pqistreamer::getQueueSize(in) ; return pqistreamer::getQueueSize(in) ;
else else
{
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
return qos_queue_size() ; return qos_queue_size() ;
}
} }
void pqiQoSstreamer::locked_storeInOutputQueue(void *ptr,int priority) void pqiQoSstreamer::locked_storeInOutputQueue(void *ptr,int priority)

View File

@ -37,7 +37,7 @@ class pqiQoSstreamer: public pqistreamer, public pqiQoS
static const float PQI_QOS_STREAMER_ALPHA = 2.0 ; static const float PQI_QOS_STREAMER_ALPHA = 2.0 ;
virtual void locked_storeInOutputQueue(void *ptr,int priority) ; virtual void locked_storeInOutputQueue(void *ptr,int priority) ;
virtual int out_queue_size() const { return _total_item_count ; } virtual int locked_out_queue_size() const { return _total_item_count ; }
virtual void locked_clear_out_queue() ; virtual void locked_clear_out_queue() ;
virtual int locked_compute_out_pkt_size() const { return _total_item_size ; } virtual int locked_compute_out_pkt_size() const { return _total_item_size ; }
virtual void *locked_pop_out_data() ; virtual void *locked_pop_out_data() ;

View File

@ -3,11 +3,11 @@
* *
* 3P/PQI network interface for RetroShare. * 3P/PQI network interface for RetroShare.
* *
* Copyright 2004-2008 by Robert Fernie. * Copyright 2004-2013 by Robert Fernie.
* *
* This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public * modify it under the terms of the GNU Library General Public
* License Version 2 as published by the Free Software Foundation. * License Version 2.1 as published by the Free Software Foundation.
* *
* This library is distributed in the hope that it will be useful, * This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of * but WITHOUT ANY WARRANTY; without even the implied warranty of
@ -33,7 +33,18 @@ const int pqiservicezone = 60478;
* #define SERVICE_DEBUG 1 * #define SERVICE_DEBUG 1
****/ ****/
p3ServiceServer::p3ServiceServer() : srvMtx("p3ServiceServer") void pqiService::setServiceServer(p3ServiceServer *server)
{
mServiceServer = server;
}
bool pqiService::send(RsRawItem *item)
{
return mServiceServer->sendItem(item);
}
p3ServiceServer::p3ServiceServer(pqiPublisher *pub) : mPublisher(pub), srvMtx("p3ServiceServer")
{ {
RsStackMutex stack(srvMtx); /********* LOCKED *********/ RsStackMutex stack(srvMtx); /********* LOCKED *********/
@ -42,7 +53,6 @@ p3ServiceServer::p3ServiceServer() : srvMtx("p3ServiceServer")
"p3ServiceServer::p3ServiceServer()"); "p3ServiceServer::p3ServiceServer()");
#endif #endif
rrit = services.begin();
return; return;
} }
@ -55,6 +65,7 @@ int p3ServiceServer::addService(pqiService *ts)
"p3ServiceServer::addService()"); "p3ServiceServer::addService()");
#endif #endif
std::map<uint32_t, pqiService *>::iterator it; std::map<uint32_t, pqiService *>::iterator it;
it = services.find(ts -> getType()); it = services.find(ts -> getType());
if (it != services.end()) if (it != services.end())
@ -63,12 +74,13 @@ int p3ServiceServer::addService(pqiService *ts)
return -1; return -1;
} }
ts->setServiceServer(this);
services[ts -> getType()] = ts; services[ts -> getType()] = ts;
rrit = services.begin();
return 1; return 1;
} }
int p3ServiceServer::incoming(RsRawItem *item) bool p3ServiceServer::recvItem(RsRawItem *item)
{ {
RsStackMutex stack(srvMtx); /********* LOCKED *********/ RsStackMutex stack(srvMtx); /********* LOCKED *********/
@ -92,12 +104,8 @@ int p3ServiceServer::incoming(RsRawItem *item)
pqioutput(PQL_DEBUG_BASIC, pqiservicezone, pqioutput(PQL_DEBUG_BASIC, pqiservicezone,
"p3ServiceServer::incoming() Service: No Service - deleting"); "p3ServiceServer::incoming() Service: No Service - deleting");
#endif #endif
// delete it.
delete item; delete item;
return false;
// it exists already!
return -1;
} }
{ {
@ -107,75 +115,28 @@ int p3ServiceServer::incoming(RsRawItem *item)
pqioutput(PQL_DEBUG_BASIC, pqiservicezone, out); pqioutput(PQL_DEBUG_BASIC, pqiservicezone, out);
#endif #endif
return (it->second) -> receive(item); return (it->second) -> recv(item);
} }
delete item; delete item;
return -1; return false;
} }
RsRawItem *p3ServiceServer::outgoing() bool p3ServiceServer::sendItem(RsRawItem *item)
{ {
RsStackMutex stack(srvMtx); /********* LOCKED *********/
#ifdef SERVICE_DEBUG #ifdef SERVICE_DEBUG
pqioutput(PQL_DEBUG_ALL, pqiservicezone, std::cerr << "p3ServiceServer::sendItem()";
"p3ServiceServer::outgoing()"); std::cerr << std::endl;
item -> print_string(out);
std::cerr << std::endl;
#endif #endif
if (rrit != services.end()) /* any filtering ??? */
{
rrit++;
}
else
{
rrit = services.begin();
}
std::map<uint32_t, pqiService *>::iterator sit = rrit; mPublisher->sendItem(item);
// run to the end. return true;
RsRawItem *item;
// run through to the end,
for(;rrit != services.end();rrit++)
{
if (NULL != (item = (rrit -> second) -> send()))
{
#ifdef SERVICE_DEBUG
std::string out;
rs_sprintf(out, "p3ServiceServer::outgoing() Got Item From: %p\n", rrit -> second);
item -> print_string(out);
std::cerr << out << std::endl;
pqioutput(PQL_DEBUG_BASIC, pqiservicezone, out);
#endif
return item;
}
}
// from the beginning to where we started.
for(rrit = services.begin();rrit != sit; rrit++)
{
if (NULL != (item = (rrit -> second) -> send()))
{
#ifdef SERVICE_DEBUG
std::string out;
rs_sprintf(out, "p3ServiceServer::outgoing() Got Item From: %p\n", rrit -> second);
item -> print_string(out);
pqioutput(PQL_DEBUG_BASIC, pqiservicezone, out);
std::cerr << out << std::endl;
#endif
return item;
}
}
return NULL;
} }

View File

@ -27,6 +27,7 @@
#ifndef PQI_SERVICE_HEADER #ifndef PQI_SERVICE_HEADER
#define PQI_SERVICE_HEADER #define PQI_SERVICE_HEADER
#include "pqi/pqi.h"
#include "pqi/pqi_base.h" #include "pqi/pqi_base.h"
#include "util/rsthreads.h" #include "util/rsthreads.h"
@ -51,20 +52,23 @@
// DataType is defined in the serialiser directory. // DataType is defined in the serialiser directory.
class RsRawItem; class RsRawItem;
class p3ServiceServer;
class pqiService class pqiService
{ {
protected: protected:
pqiService(uint32_t t) // our type of packets. pqiService(uint32_t t) // our type of packets.
:type(t) { return; } :type(t), mServiceServer(NULL) { return; }
virtual ~pqiService() { return; } virtual ~pqiService() { return; }
public: public:
void setServiceServer(p3ServiceServer *server);
// //
virtual int receive(RsRawItem *) = 0; virtual bool recv(RsRawItem *) = 0;
virtual RsRawItem * send() = 0; virtual bool send(RsRawItem *item);
uint32_t getType() { return type; } uint32_t getType() { return type; }
@ -72,28 +76,35 @@ virtual int tick() { return 0; }
private: private:
uint32_t type; uint32_t type;
p3ServiceServer *mServiceServer; // const, no need for mutex.
}; };
#include <map> #include <map>
/* We are pushing the packets back through p3ServiceServer2,
* so that we can filter services at this level later...
* if we decide not to do this, pqiService2 can call through
* to the base level pqiPublisher instead.
*/
class p3ServiceServer class p3ServiceServer
{ {
public: public:
p3ServiceServer(); p3ServiceServer(pqiPublisher *pub);
int addService(pqiService *); int addService(pqiService *);
int incoming(RsRawItem *); bool recvItem(RsRawItem *);
RsRawItem *outgoing(); bool sendItem(RsRawItem *);
int tick(); int tick();
private: private:
pqiPublisher *mPublisher; // constant no need for mutex.
RsMutex srvMtx; RsMutex srvMtx;
std::map<uint32_t, pqiService *> services; std::map<uint32_t, pqiService *> services;
std::map<uint32_t, pqiService *>::iterator rrit;
}; };

View File

@ -93,6 +93,7 @@ static const int PQISSL_SSL_CONNECT_TIMEOUT = 30;
pqissl::pqissl(pqissllistener *l, PQInterface *parent, p3LinkMgr *lm) pqissl::pqissl(pqissllistener *l, PQInterface *parent, p3LinkMgr *lm)
:NetBinInterface(parent, parent->PeerId()), :NetBinInterface(parent, parent->PeerId()),
mSslMtx("pqissl"),
waiting(WAITING_NOT), active(false), certvalid(false), waiting(WAITING_NOT), active(false), certvalid(false),
sslmode(PQISSL_ACTIVE), ssl_connection(NULL), sockfd(-1), sslmode(PQISSL_ACTIVE), ssl_connection(NULL), sockfd(-1),
pqil(l), // no init for remote_addr. pqil(l), // no init for remote_addr.
@ -100,9 +101,11 @@ pqissl::pqissl(pqissllistener *l, PQInterface *parent, p3LinkMgr *lm)
attempt_ts(0), attempt_ts(0),
sameLAN(false), n_read_zero(0), mReadZeroTS(0), sameLAN(false), n_read_zero(0), mReadZeroTS(0),
mConnectDelay(0), mConnectTS(0), mConnectDelay(0), mConnectTS(0),
mConnectTimeout(0), mTimeoutTS(0), mLinkMgr(lm) mConnectTimeout(0), mTimeoutTS(0), mLinkMgr(lm)
{ {
RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
/* set address to zero */ /* set address to zero */
sockaddr_storage_clear(remote_addr); sockaddr_storage_clear(remote_addr);
@ -144,6 +147,8 @@ pqissl::pqissl(pqissllistener *l, PQInterface *parent, p3LinkMgr *lm)
int pqissl::connect(const struct sockaddr_storage &raddr) int pqissl::connect(const struct sockaddr_storage &raddr)
{ {
RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
// reset failures // reset failures
remote_addr = raddr; remote_addr = raddr;
@ -175,10 +180,13 @@ int pqissl::disconnect()
return reset(); return reset();
} }
int pqissl::getConnectAddress(struct sockaddr_storage &raddr) { int pqissl::getConnectAddress(struct sockaddr_storage &raddr)
raddr = remote_addr; {
// TODO. RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
return (!sockaddr_storage_isnull(remote_addr));
raddr = remote_addr;
// TODO.
return (!sockaddr_storage_isnull(remote_addr));
} }
/* BinInterface version of reset() for pqistreamer */ /* BinInterface version of reset() for pqistreamer */
@ -191,8 +199,17 @@ int pqissl::close()
// put back on the listening queue. // put back on the listening queue.
int pqissl::reset() int pqissl::reset()
{ {
std::string outLog; RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
return reset_locked();
}
int pqissl::reset_locked()
{
std::string outLog;
bool neededReset = false;
/* a reset shouldn't cause us to stop listening /* a reset shouldn't cause us to stop listening
* only reasons for stoplistening() are; * only reasons for stoplistening() are;
* *
@ -211,7 +228,6 @@ int pqissl::reset()
outLog += "\n"; outLog += "\n";
#endif #endif
bool neededReset = false;
if (ssl_connection != NULL) if (ssl_connection != NULL)
{ {
@ -259,7 +275,9 @@ int pqissl::reset()
// clean up the streamer // clean up the streamer
if (parent()) if (parent())
{ {
parent() -> notifyEvent(this, NET_CONNECT_FAILED); struct sockaddr_storage addr;
sockaddr_storage_clear(addr);
parent() -> notifyEvent(this, NET_CONNECT_FAILED, addr);
} }
} }
return 1; return 1;
@ -267,6 +285,8 @@ int pqissl::reset()
bool pqissl::connect_parameter(uint32_t type, const std::string &value) bool pqissl::connect_parameter(uint32_t type, const std::string &value)
{ {
RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
(void) value; (void) value;
return false; return false;
} }
@ -274,6 +294,8 @@ bool pqissl::connect_parameter(uint32_t type, const std::string &value)
bool pqissl::connect_parameter(uint32_t type, uint32_t value) bool pqissl::connect_parameter(uint32_t type, uint32_t value)
{ {
RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
#ifdef PQISSL_LOG_DEBUG #ifdef PQISSL_LOG_DEBUG
{ {
std::string out = "pqissl::connect_parameter() Peer: " + PeerId(); std::string out = "pqissl::connect_parameter() Peer: " + PeerId();
@ -319,6 +341,8 @@ bool pqissl::connect_parameter(uint32_t type, uint32_t value)
void pqissl::getCryptoParams(RsPeerCryptoParams& params) void pqissl::getCryptoParams(RsPeerCryptoParams& params)
{ {
RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
if(active) if(active)
{ {
params.connexion_state = 1 ; params.connexion_state = 1 ;
@ -350,6 +374,7 @@ void pqissl::getCryptoParams(RsPeerCryptoParams& params)
int pqissl::status() int pqissl::status()
{ {
RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
#ifdef PQISSL_LOG_DEBUG #ifdef PQISSL_LOG_DEBUG
std::string out = "pqissl::status()"; std::string out = "pqissl::status()";
@ -388,6 +413,8 @@ int pqissl::status()
// tick...... // tick......
int pqissl::tick() int pqissl::tick()
{ {
RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
//pqistreamer::tick(); //pqistreamer::tick();
// continue existing connection attempt. // continue existing connection attempt.
@ -480,7 +507,7 @@ int pqissl::ConnectAttempt()
rslog(RSL_ALERT, pqisslzone, rslog(RSL_ALERT, pqisslzone,
"pqissl::ConnectAttempt() STATE = Unknown - calling reset()"); "pqissl::ConnectAttempt() STATE = Unknown - calling reset()");
reset(); reset_locked();
break; break;
} }
rslog(RSL_ALERT, pqisslzone, "pqissl::ConnectAttempt() Unknown"); rslog(RSL_ALERT, pqisslzone, "pqissl::ConnectAttempt() Unknown");
@ -510,8 +537,11 @@ int pqissl::Failed_Connection()
if (parent()) if (parent())
{ {
parent() -> notifyEvent(this, NET_CONNECT_UNREACHABLE); struct sockaddr_storage addr;
sockaddr_storage_clear(addr);
parent() -> notifyEvent(this, NET_CONNECT_UNREACHABLE, addr);
} }
waiting = WAITING_NOT; waiting = WAITING_NOT;
return 1; return 1;
@ -810,7 +840,7 @@ int pqissl::Basic_Connection_Complete()
/* as sockfd is valid, this should close it all up */ /* as sockfd is valid, this should close it all up */
rslog(RSL_ALERT, pqisslzone, "pqissl::Basic_Connection_Complete() -> calling reset()"); rslog(RSL_ALERT, pqisslzone, "pqissl::Basic_Connection_Complete() -> calling reset()");
reset(); reset_locked();
return -1; return -1;
} }
@ -827,7 +857,7 @@ int pqissl::Basic_Connection_Complete()
rslog(RSL_ALERT, pqisslzone, rslog(RSL_ALERT, pqisslzone,
"pqissl::Basic_Connection_Complete() problem with the socket descriptor. Aborting"); "pqissl::Basic_Connection_Complete() problem with the socket descriptor. Aborting");
rslog(RSL_ALERT, pqisslzone, "pqissl::Basic_Connection_Complete() -> calling reset()"); rslog(RSL_ALERT, pqisslzone, "pqissl::Basic_Connection_Complete() -> calling reset()");
reset(); reset_locked();
return -1; return -1;
} }
@ -1128,7 +1158,7 @@ int pqissl::SSL_Connection_Complete()
Extract_Failed_SSL_Certificate(); Extract_Failed_SSL_Certificate();
rslog(RSL_ALERT, pqisslzone, "pqissl::SSL_Connection_Complete() -> calling reset()"); rslog(RSL_ALERT, pqisslzone, "pqissl::SSL_Connection_Complete() -> calling reset()");
reset(); reset_locked();
waiting = WAITING_FAIL_INTERFACE; waiting = WAITING_FAIL_INTERFACE;
return -1; return -1;
@ -1207,7 +1237,7 @@ int pqissl::Authorise_SSL_Connection()
"pqissl::Authorise_SSL_Connection() Connection Timed Out!"); "pqissl::Authorise_SSL_Connection() Connection Timed Out!");
/* as sockfd is valid, this should close it all up */ /* as sockfd is valid, this should close it all up */
rslog(RSL_ALERT, pqisslzone, "pqissl::Authorise_Connection_Complete() -> calling reset()"); rslog(RSL_ALERT, pqisslzone, "pqissl::Authorise_Connection_Complete() -> calling reset()");
reset(); reset_locked();
} }
int err; int err;
@ -1233,7 +1263,7 @@ int pqissl::Authorise_SSL_Connection()
rslog(RSL_ALERT, pqisslzone, "pqissl::Authorise_Connection_Complete() -> calling reset()"); rslog(RSL_ALERT, pqisslzone, "pqissl::Authorise_Connection_Complete() -> calling reset()");
// Failed completely // Failed completely
reset(); reset_locked();
return -1; return -1;
} }
@ -1245,7 +1275,7 @@ int pqissl::Authorise_SSL_Connection()
rslog(RSL_ALERT, pqisslzone, "pqissl::Authorise_Connection_Complete() -> calling reset()"); rslog(RSL_ALERT, pqisslzone, "pqissl::Authorise_Connection_Complete() -> calling reset()");
// Failed completely // Failed completely
reset(); reset_locked();
return -1; return -1;
} }
@ -1269,7 +1299,7 @@ int pqissl::Authorise_SSL_Connection()
// then okay... // then okay...
rslog(RSL_WARNING, pqisslzone, "pqissl::Authorise_SSL_Connection() Accepting Conn. Peer: " + PeerId()); rslog(RSL_WARNING, pqisslzone, "pqissl::Authorise_SSL_Connection() Accepting Conn. Peer: " + PeerId());
accept(ssl_connection, sockfd, remote_addr); accept_locked(ssl_connection, sockfd, remote_addr);
return 1; return 1;
} }
@ -1278,11 +1308,20 @@ int pqissl::Authorise_SSL_Connection()
// else shutdown ssl connection. // else shutdown ssl connection.
rslog(RSL_ALERT, pqisslzone, "pqissl::Authorise_Connection_Complete() -> calling reset()"); rslog(RSL_ALERT, pqisslzone, "pqissl::Authorise_Connection_Complete() -> calling reset()");
reset(); reset_locked();
return 0; return 0;
} }
/* This function is public, and callable from pqilistener - so must be mutex protected */
int pqissl::accept(SSL *ssl, int fd, const struct sockaddr_storage &foreign_addr) // initiate incoming connection. int pqissl::accept(SSL *ssl, int fd, const struct sockaddr_storage &foreign_addr) // initiate incoming connection.
{
RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
return accept_locked(ssl, fd, foreign_addr);
}
int pqissl::accept_locked(SSL *ssl, int fd, const struct sockaddr_storage &foreign_addr) // initiate incoming connection.
{ {
if (waiting != WAITING_NOT) if (waiting != WAITING_NOT)
{ {
@ -1340,7 +1379,7 @@ int pqissl::accept(SSL *ssl, int fd, const struct sockaddr_storage &foreign_addr
"pqissl::accept() STATE = Unknown - ignore?"); "pqissl::accept() STATE = Unknown - ignore?");
rslog(RSL_ALERT, pqisslzone, "pqissl::accept() -> calling reset()"); rslog(RSL_ALERT, pqisslzone, "pqissl::accept() -> calling reset()");
reset(); reset_locked();
break; break;
} }
@ -1421,7 +1460,7 @@ int pqissl::accept(SSL *ssl, int fd, const struct sockaddr_storage &foreign_addr
waiting = WAITING_FAIL_INTERFACE; waiting = WAITING_FAIL_INTERFACE;
// failed completely. // failed completely.
rslog(RSL_ALERT, pqisslzone, "pqissl::accept() -> calling reset()"); rslog(RSL_ALERT, pqisslzone, "pqissl::accept() -> calling reset()");
reset(); reset_locked();
return -1; return -1;
} }
else else
@ -1440,18 +1479,22 @@ int pqissl::accept(SSL *ssl, int fd, const struct sockaddr_storage &foreign_addr
// Notify the pqiperson.... (Both Connect/Receive) // Notify the pqiperson.... (Both Connect/Receive)
if (parent()) if (parent())
{ {
parent() -> notifyEvent(this, NET_CONNECT_SUCCESS); struct sockaddr_storage addr = remote_addr;
parent() -> notifyEvent(this, NET_CONNECT_SUCCESS, addr);
} }
return 1; return 1;
} }
/********** Implementation of BinInterface ************************** /********** Implementation of BinInterface **************************
* All the rest of the BinInterface. * All the rest of the BinInterface.
* This functions much be Mutex protected.
* *
*/ */
int pqissl::senddata(void *data, int len) int pqissl::senddata(void *data, int len)
{ {
RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
int tmppktlen ; int tmppktlen ;
#ifdef PQISSL_DEBUG #ifdef PQISSL_DEBUG
@ -1493,7 +1536,7 @@ int pqissl::senddata(void *data, int len)
} }
rslog(RSL_ALERT, pqisslzone, "pqissl::senddata() -> calling reset()"); rslog(RSL_ALERT, pqisslzone, "pqissl::senddata() -> calling reset()");
reset(); reset_locked();
return -1; return -1;
} }
else if (err == SSL_ERROR_WANT_WRITE) else if (err == SSL_ERROR_WANT_WRITE)
@ -1518,7 +1561,7 @@ int pqissl::senddata(void *data, int len)
rslog(RSL_ALERT, pqisslzone, out); rslog(RSL_ALERT, pqisslzone, out);
rslog(RSL_ALERT, pqisslzone, "pqissl::senddata() -> calling reset()"); rslog(RSL_ALERT, pqisslzone, "pqissl::senddata() -> calling reset()");
reset(); reset_locked();
return -1; return -1;
} }
} }
@ -1527,6 +1570,8 @@ int pqissl::senddata(void *data, int len)
int pqissl::readdata(void *data, int len) int pqissl::readdata(void *data, int len)
{ {
RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
#ifdef PQISSL_DEBUG #ifdef PQISSL_DEBUG
std::cout << "Reading data thread=" << pthread_self() << ", ssl=" << (void*)this << std::endl ; std::cout << "Reading data thread=" << pthread_self() << ", ssl=" << (void*)this << std::endl ;
#endif #endif
@ -1598,7 +1643,7 @@ int pqissl::readdata(void *data, int len)
rs_sprintf_append(out, " ReadZero Age: %ld", time(NULL) - mReadZeroTS); rs_sprintf_append(out, " ReadZero Age: %ld", time(NULL) - mReadZeroTS);
rslog(RSL_ALERT, pqisslzone, "pqissl::readdata() -> calling reset()"); rslog(RSL_ALERT, pqisslzone, "pqissl::readdata() -> calling reset()");
reset(); reset_locked();
} }
rslog(RSL_ALERT, pqisslzone, out); rslog(RSL_ALERT, pqisslzone, out);
@ -1635,7 +1680,7 @@ int pqissl::readdata(void *data, int len)
} }
rslog(RSL_ALERT, pqisslzone, "pqissl::readdata() -> calling reset()"); rslog(RSL_ALERT, pqisslzone, "pqissl::readdata() -> calling reset()");
reset(); reset_locked();
std::cerr << out << std::endl ; std::cerr << out << std::endl ;
return -1; return -1;
} }
@ -1666,7 +1711,7 @@ int pqissl::readdata(void *data, int len)
std::cerr << out << std::endl ; std::cerr << out << std::endl ;
rslog(RSL_ALERT, pqisslzone, "pqissl::readdata() -> calling reset()"); rslog(RSL_ALERT, pqisslzone, "pqissl::readdata() -> calling reset()");
reset(); reset_locked();
return -1; return -1;
} }
@ -1702,11 +1747,15 @@ int pqissl::netstatus()
int pqissl::isactive() int pqissl::isactive()
{ {
RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
return active; return active;
} }
bool pqissl::moretoread() bool pqissl::moretoread(uint32_t usec)
{ {
RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
#ifdef PQISSL_DEBUG #ifdef PQISSL_DEBUG
{ {
std::string out; std::string out;
@ -1726,7 +1775,7 @@ bool pqissl::moretoread()
struct timeval timeout; struct timeval timeout;
timeout.tv_sec = 0; timeout.tv_sec = 0;
timeout.tv_usec = 0; timeout.tv_usec = usec;
if (select(sockfd + 1, &ReadFDs, &WriteFDs, &ExceptFDs, &timeout) < 0) if (select(sockfd + 1, &ReadFDs, &WriteFDs, &ExceptFDs, &timeout) < 0)
{ {
@ -1744,7 +1793,7 @@ bool pqissl::moretoread()
// this is a definite bad socket!. // this is a definite bad socket!.
// reset. // reset.
rslog(RSL_ALERT, pqisslzone, "pqissl::moretoread() -> calling reset()"); rslog(RSL_ALERT, pqisslzone, "pqissl::moretoread() -> calling reset()");
reset(); reset_locked();
return 0; return 0;
} }
@ -1784,8 +1833,10 @@ bool pqissl::moretoread()
} }
bool pqissl::cansend() bool pqissl::cansend(uint32_t usec)
{ {
RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
#ifdef PQISSL_DEBUG #ifdef PQISSL_DEBUG
rslog(RSL_DEBUG_ALL, pqisslzone, rslog(RSL_DEBUG_ALL, pqisslzone,
"pqissl::cansend() polling socket!"); "pqissl::cansend() polling socket!");
@ -1804,7 +1855,8 @@ bool pqissl::cansend()
struct timeval timeout; struct timeval timeout;
timeout.tv_sec = 0; timeout.tv_sec = 0;
timeout.tv_usec = 0; timeout.tv_usec = usec;
if (select(sockfd + 1, &ReadFDs, &WriteFDs, &ExceptFDs, &timeout) < 0) if (select(sockfd + 1, &ReadFDs, &WriteFDs, &ExceptFDs, &timeout) < 0)
{ {
@ -1824,7 +1876,7 @@ bool pqissl::cansend()
// this is a definite bad socket!. // this is a definite bad socket!.
// reset. // reset.
rslog(RSL_ALERT, pqisslzone, "pqissl::cansend() -> calling reset()"); rslog(RSL_ALERT, pqisslzone, "pqissl::cansend() -> calling reset()");
reset(); reset_locked();
return 0; return 0;
} }

View File

@ -109,14 +109,37 @@ virtual int senddata(void*, int);
virtual int readdata(void*, int); virtual int readdata(void*, int);
virtual int netstatus(); virtual int netstatus();
virtual int isactive(); virtual int isactive();
virtual bool moretoread(); virtual bool moretoread(uint32_t usec);
virtual bool cansend(); virtual bool cansend(uint32_t usec);
virtual int close(); /* BinInterface version of reset() */ virtual int close(); /* BinInterface version of reset() */
virtual std::string gethash(); /* not used here */ virtual std::string gethash(); /* not used here */
virtual bool bandwidthLimited() { return true ; } // replace by !sameLAN to avoid bandwidth limiting on LAN virtual bool bandwidthLimited() { return true ; } // replace by !sameLAN to avoid bandwidth limiting on LAN
public:
/* Completion of the SSL connection,
* this is public, so it can be called by
* the listener (should make friends??)
*/
int accept(SSL *ssl, int fd, const struct sockaddr_storage &foreign_addr);
void getCryptoParams(RsPeerCryptoParams& params) ;
protected: protected:
/* no mutex protection for these ones */
p3LinkMgr *mLinkMgr;
pqissllistener *pqil;
RsMutex mSslMtx; /**** MUTEX protects data and fn below ****/
virtual int reset_locked();
int accept_locked(SSL *ssl, int fd, const struct sockaddr_storage &foreign_addr);
// A little bit of information to describe // A little bit of information to describe
// the SSL state, this is needed // the SSL state, this is needed
// to allow full Non-Blocking Connect behaviour. // to allow full Non-Blocking Connect behaviour.
@ -124,7 +147,6 @@ protected:
// to complete an SSL. // to complete an SSL.
int ConnectAttempt(); int ConnectAttempt();
int waiting;
virtual int Failed_Connection(); virtual int Failed_Connection();
@ -143,18 +165,6 @@ int Authorise_SSL_Connection();
int Extract_Failed_SSL_Certificate(); // try to get cert anyway. int Extract_Failed_SSL_Certificate(); // try to get cert anyway.
public:
/* Completion of the SSL connection,
* this is public, so it can be called by
* the listener (should make friends??)
*/
int accept(SSL *ssl, int fd, const struct sockaddr_storage &foreign_addr);
void getCryptoParams(RsPeerCryptoParams& params) ;
protected:
//protected internal fns that are overloaded for udp case. //protected internal fns that are overloaded for udp case.
virtual int net_internal_close(int fd); virtual int net_internal_close(int fd);
@ -166,13 +176,14 @@ virtual int net_internal_fcntl_nonblock(int fd);
bool active; bool active;
bool certvalid; bool certvalid;
int waiting;
// addition for udp (tcp version == ACTIVE). // addition for udp (tcp version == ACTIVE).
int sslmode; int sslmode;
SSL *ssl_connection; SSL *ssl_connection;
int sockfd; int sockfd;
pqissllistener *pqil;
struct sockaddr_storage remote_addr; struct sockaddr_storage remote_addr;
void *readpkt; void *readpkt;
@ -193,7 +204,6 @@ virtual int net_internal_fcntl_nonblock(int fd);
uint32_t mConnectTimeout; uint32_t mConnectTimeout;
time_t mTimeoutTS; time_t mTimeoutTS;
p3LinkMgr *mLinkMgr;
private: private:
// ssl only fns. // ssl only fns.

View File

@ -60,6 +60,10 @@ pqisslproxy::~pqisslproxy()
{ {
rslog(RSL_ALERT, pqisslproxyzone, rslog(RSL_ALERT, pqisslproxyzone,
"pqisslproxy::~pqisslproxy -> destroying pqisslproxy"); "pqisslproxy::~pqisslproxy -> destroying pqisslproxy");
stoplistening();
reset();
return; return;
} }
@ -455,31 +459,40 @@ int pqisslproxy::Proxy_Connection_Complete()
bool pqisslproxy::connect_parameter(uint32_t type, const std::string &value) bool pqisslproxy::connect_parameter(uint32_t type, const std::string &value)
{ {
if (type == NET_PARAM_CONNECT_DOMAIN_ADDRESS) {
{ RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
std::string out;
rs_sprintf(out, "pqisslproxy::connect_parameter() Peer: %s DOMAIN_ADDRESS: %s", PeerId().c_str(), value.c_str()); if (type == NET_PARAM_CONNECT_DOMAIN_ADDRESS)
rslog(RSL_WARNING, pqisslproxyzone, out); {
std::string out;
rs_sprintf(out, "pqisslproxy::connect_parameter() Peer: %s DOMAIN_ADDRESS: %s", PeerId().c_str(), value.c_str());
rslog(RSL_WARNING, pqisslproxyzone, out);
mDomainAddress = value;
std::cerr << out << std::endl;
return true;
}
}
mDomainAddress = value;
std::cerr << out << std::endl;
return true;
}
return pqissl::connect_parameter(type, value); return pqissl::connect_parameter(type, value);
} }
bool pqisslproxy::connect_parameter(uint32_t type, uint32_t value) bool pqisslproxy::connect_parameter(uint32_t type, uint32_t value)
{ {
if (type == NET_PARAM_CONNECT_REMOTE_PORT) {
{ RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
std::string out;
rs_sprintf(out, "pqisslproxy::connect_parameter() Peer: %s REMOTE_PORT: %lu", PeerId().c_str(), value);
rslog(RSL_WARNING, pqisslproxyzone, out);
mRemotePort = value; if (type == NET_PARAM_CONNECT_REMOTE_PORT)
std::cerr << out << std::endl; {
return true; std::string out;
} rs_sprintf(out, "pqisslproxy::connect_parameter() Peer: %s REMOTE_PORT: %lu", PeerId().c_str(), value);
rslog(RSL_WARNING, pqisslproxyzone, out);
mRemotePort = value;
std::cerr << out << std::endl;
return true;
}
}
return pqissl::connect_parameter(type, value); return pqissl::connect_parameter(type, value);
} }

View File

@ -54,6 +54,8 @@ pqissludp::pqissludp(PQInterface *parent, p3LinkMgr *lm)
:pqissl(NULL, parent, lm), tou_bio(NULL), :pqissl(NULL, parent, lm), tou_bio(NULL),
listen_checktime(0), mConnectPeriod(PQI_SSLUDP_DEF_CONN_PERIOD) listen_checktime(0), mConnectPeriod(PQI_SSLUDP_DEF_CONN_PERIOD)
{ {
RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
sockaddr_storage_clear(remote_addr); sockaddr_storage_clear(remote_addr);
return; return;
} }
@ -74,6 +76,8 @@ pqissludp::~pqissludp()
stoplistening(); /* remove from p3proxy listenqueue */ stoplistening(); /* remove from p3proxy listenqueue */
reset(); reset();
RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
if (tou_bio) // this should be in the reset? if (tou_bio) // this should be in the reset?
{ {
BIO_free(tou_bio); BIO_free(tou_bio);
@ -81,12 +85,13 @@ pqissludp::~pqissludp()
return; return;
} }
int pqissludp::reset() int pqissludp::reset_locked()
{ {
/* reset for next time.*/ /* reset for next time.*/
mConnectFlags = 0; mConnectFlags = 0;
mConnectPeriod = PQI_SSLUDP_DEF_CONN_PERIOD; mConnectPeriod = PQI_SSLUDP_DEF_CONN_PERIOD;
return pqissl::reset();
return pqissl::reset_locked();
} }
@ -216,7 +221,7 @@ int pqissludp::Initiate_Connection()
rslog(RSL_WARNING, pqissludpzone, "pqissludp::Initiate_Connection() Invalid (0.0.0.0) Remote Address, Aborting Connect."); rslog(RSL_WARNING, pqissludpzone, "pqissludp::Initiate_Connection() Invalid (0.0.0.0) Remote Address, Aborting Connect.");
waiting = WAITING_FAIL_INTERFACE; waiting = WAITING_FAIL_INTERFACE;
reset(); reset_locked();
return -1; return -1;
} }
@ -322,7 +327,7 @@ int pqissludp::Initiate_Connection()
rslog(RSL_WARNING, pqissludpzone, out); rslog(RSL_WARNING, pqissludpzone, out);
reset(); reset_locked();
return -1; return -1;
} }
@ -356,7 +361,7 @@ int pqissludp::Basic_Connection_Complete()
/* as sockfd is valid, this should close it all up */ /* as sockfd is valid, this should close it all up */
reset(); reset_locked();
return -1; return -1;
} }
@ -387,7 +392,7 @@ int pqissludp::Basic_Connection_Complete()
rs_sprintf_append(out, "Error: Connection Failed: %d - %s", err, socket_errorType(err).c_str()); rs_sprintf_append(out, "Error: Connection Failed: %d - %s", err, socket_errorType(err).c_str());
rslog(RSL_DEBUG_BASIC, pqissludpzone, out); rslog(RSL_DEBUG_BASIC, pqissludpzone, out);
reset(); reset_locked();
// Then send unreachable message. // Then send unreachable message.
waiting = WAITING_FAIL_INTERFACE; waiting = WAITING_FAIL_INTERFACE;
@ -478,73 +483,90 @@ int pqissludp::stoplistening()
bool pqissludp::connect_parameter(uint32_t type, uint32_t value) bool pqissludp::connect_parameter(uint32_t type, uint32_t value)
{ {
//std::cerr << "pqissludp::connect_parameter() type: " << type << "value: " << value << std::endl;
if (type == NET_PARAM_CONNECT_PERIOD)
{ {
std::string out; RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
rs_sprintf(out, "pqissludp::connect_parameter() Peer: %s PERIOD: %lu", PeerId().c_str(), value);
rslog(RSL_WARNING, pqissludpzone, out); //std::cerr << "pqissludp::connect_parameter() type: " << type << "value: " << value << std::endl;
if (type == NET_PARAM_CONNECT_PERIOD)
mConnectPeriod = value; {
std::cerr << out << std::endl; std::string out;
return true; rs_sprintf(out, "pqissludp::connect_parameter() Peer: %s PERIOD: %lu", PeerId().c_str(), value);
rslog(RSL_WARNING, pqissludpzone, out);
mConnectPeriod = value;
std::cerr << out << std::endl;
return true;
}
else if (type == NET_PARAM_CONNECT_FLAGS)
{
std::string out;
rs_sprintf(out, "pqissludp::connect_parameter() Peer: %s FLAGS: %lu", PeerId().c_str(), value);
rslog(RSL_WARNING, pqissludpzone, out);
mConnectFlags = value;
std::cerr << out<< std::endl;
return true;
}
else if (type == NET_PARAM_CONNECT_BANDWIDTH)
{
std::string out;
rs_sprintf(out, "pqissludp::connect_parameter() Peer: %s BANDWIDTH: %lu", PeerId().c_str(), value);
rslog(RSL_WARNING, pqissludpzone, out);
mConnectBandwidth = value;
std::cerr << out << std::endl;
return true;
}
} }
else if (type == NET_PARAM_CONNECT_FLAGS)
{
std::string out;
rs_sprintf(out, "pqissludp::connect_parameter() Peer: %s FLAGS: %lu", PeerId().c_str(), value);
rslog(RSL_WARNING, pqissludpzone, out);
mConnectFlags = value;
std::cerr << out<< std::endl;
return true;
}
else if (type == NET_PARAM_CONNECT_BANDWIDTH)
{
std::string out;
rs_sprintf(out, "pqissludp::connect_parameter() Peer: %s BANDWIDTH: %lu", PeerId().c_str(), value);
rslog(RSL_WARNING, pqissludpzone, out);
mConnectBandwidth = value;
std::cerr << out << std::endl;
return true;
}
return pqissl::connect_parameter(type, value); return pqissl::connect_parameter(type, value);
} }
bool pqissludp::connect_additional_address(uint32_t type, const struct sockaddr_storage &addr) bool pqissludp::connect_additional_address(uint32_t type, const struct sockaddr_storage &addr)
{ {
if (type == NET_PARAM_CONNECT_PROXY)
{ {
std::string out; RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
rs_sprintf(out, "pqissludp::connect_additional_address() Peer: %s PROXYADDR: ", PeerId().c_str());
out += sockaddr_storage_tostring(addr); if (type == NET_PARAM_CONNECT_PROXY)
rslog(RSL_WARNING, pqissludpzone, out); {
std::string out;
mConnectProxyAddr = addr; rs_sprintf(out, "pqissludp::connect_additional_address() Peer: %s PROXYADDR: ", PeerId().c_str());
out += sockaddr_storage_tostring(addr);
std::cerr << out << std::endl; rslog(RSL_WARNING, pqissludpzone, out);
return true;
} mConnectProxyAddr = addr;
else if (type == NET_PARAM_CONNECT_SOURCE)
{ std::cerr << out << std::endl;
std::string out; return true;
rs_sprintf(out, "pqissludp::connect_additional_address() Peer: %s SRCADDR: ", PeerId().c_str()); }
out += sockaddr_storage_tostring(addr); else if (type == NET_PARAM_CONNECT_SOURCE)
rslog(RSL_WARNING, pqissludpzone, out); {
std::string out;
mConnectSrcAddr = addr; rs_sprintf(out, "pqissludp::connect_additional_address() Peer: %s SRCADDR: ", PeerId().c_str());
out += sockaddr_storage_tostring(addr);
std::cerr << out << std::endl; rslog(RSL_WARNING, pqissludpzone, out);
return true;
mConnectSrcAddr = addr;
std::cerr << out << std::endl;
return true;
}
} }
return pqissl::connect_additional_address(type, addr); return pqissl::connect_additional_address(type, addr);
} }
/********** PQI STREAMER OVERLOADING *********************************/ /********** PQI STREAMER OVERLOADING *********************************/
bool pqissludp::moretoread() bool pqissludp::moretoread(uint32_t usec)
{ {
RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
if (usec)
{
std::cerr << "pqissludp::moretoread() usec parameter not implemented";
std::cerr << std::endl;
}
{ {
std::string out = "pqissludp::moretoread()"; std::string out = "pqissludp::moretoread()";
rs_sprintf_append(out, " polling socket (%d)", sockfd); rs_sprintf_append(out, " polling socket (%d)", sockfd);
@ -590,7 +612,7 @@ bool pqissludp::moretoread()
rslog(RSL_WARNING, pqissludpzone, out); rslog(RSL_WARNING, pqissludpzone, out);
} }
reset(); reset_locked();
return 0; return 0;
} }
@ -603,8 +625,16 @@ bool pqissludp::moretoread()
} }
bool pqissludp::cansend() bool pqissludp::cansend(uint32_t usec)
{ {
RsStackMutex stack(mSslMtx); /**** LOCKED MUTEX ****/
if (usec)
{
std::cerr << "pqissludp::cansend() usec parameter not implemented";
std::cerr << std::endl;
}
rslog(RSL_DEBUG_ALL, pqissludpzone, rslog(RSL_DEBUG_ALL, pqissludpzone,
"pqissludp::cansend() polling socket!"); "pqissludp::cansend() polling socket!");

View File

@ -62,23 +62,24 @@ virtual ~pqissludp();
virtual int listen(); virtual int listen();
virtual int stoplistening(); virtual int stoplistening();
virtual int tick(); virtual int tick();
virtual int reset();
virtual bool connect_parameter(uint32_t type, uint32_t value); virtual bool connect_parameter(uint32_t type, uint32_t value);
virtual bool connect_additional_address(uint32_t type, const struct sockaddr_storage &addr); virtual bool connect_additional_address(uint32_t type, const struct sockaddr_storage &addr);
// BinInterface. // BinInterface.
// These are reimplemented. // These are reimplemented.
virtual bool moretoread(); virtual bool moretoread(uint32_t usec);
virtual bool cansend(); virtual bool cansend(uint32_t usec);
/* UDP always through firewalls -> always bandwidth Limited */ /* UDP always through firewalls -> always bandwidth Limited */
virtual bool bandwidthLimited() { return true; } virtual bool bandwidthLimited() { return true; }
protected:
// pqissludp specific. // pqissludp specific.
// called to initiate a connection; // called to initiate a connection;
int attach(); int attach();
protected: virtual int reset_locked();
virtual int Initiate_Connection(); virtual int Initiate_Connection();
virtual int Basic_Connection_Complete(); virtual int Basic_Connection_Complete();

View File

@ -224,7 +224,7 @@ int pqistore::writePkt(RsItem *pqi)
} }
if (!(bio->cansend())) if (!(bio->cansend(0)))
{ {
std::string out; std::string out;
rs_sprintf(out, "pqistore::writePkt() BIO cannot write!\niscarding:\n"); rs_sprintf(out, "pqistore::writePkt() BIO cannot write!\niscarding:\n");
@ -278,7 +278,7 @@ int pqistore::readPkt(RsItem **item_out)
pqioutput(PQL_DEBUG_ALL, pqistorezone, "pqistore::readPkt()"); pqioutput(PQL_DEBUG_ALL, pqistorezone, "pqistore::readPkt()");
#endif #endif
if ((!(bio->isactive())) || (!(bio->moretoread()))) if ((!(bio->isactive())) || (!(bio->moretoread(0))))
{ {
return 0; return 0;
} }
@ -418,7 +418,7 @@ bool pqiSSLstore::getEncryptedItems(std::list<RsItem* >& rsItemList)
{ {
if (NULL != (item = GetItem())) if (NULL != (item = GetItem()))
rsItemList.push_back(item); rsItemList.push_back(item);
} while (enc_bio->isactive() && enc_bio->moretoread()); } while (enc_bio->isactive() && enc_bio->moretoread(0));
return true; return true;
} }
@ -471,7 +471,7 @@ int pqiSSLstore::readPkt(RsItem **item_out)
pqioutput(PQL_DEBUG_ALL, pqistorezone, "pqistore::readPkt()"); pqioutput(PQL_DEBUG_ALL, pqistorezone, "pqistore::readPkt()");
#endif #endif
if ((!(enc_bio->isactive())) || (!(enc_bio->moretoread()))) if ((!(enc_bio->isactive())) || (!(enc_bio->moretoread(0))))
{ {
return 0; return 0;
} }

View File

@ -45,6 +45,7 @@ const int PQISTREAM_ABS_MAX = 100000000; /* 100 MB/sec (actually per loop) */
#define DEBUG_PQISTREAMER 1 #define DEBUG_PQISTREAMER 1
***/ ***/
#define DEBUG_PQISTREAMER 1
#ifdef DEBUG_TRANSFERS #ifdef DEBUG_TRANSFERS
#include "util/rsprint.h" #include "util/rsprint.h"
@ -52,22 +53,24 @@ const int PQISTREAM_ABS_MAX = 100000000; /* 100 MB/sec (actually per loop) */
pqistreamer::pqistreamer(RsSerialiser *rss, std::string id, BinInterface *bio_in, int bio_flags_in) pqistreamer::pqistreamer(RsSerialiser *rss, std::string id, BinInterface *bio_in, int bio_flags_in)
:PQInterface(id), rsSerialiser(rss), bio(bio_in), bio_flags(bio_flags_in), :PQInterface(id), mStreamerMtx("pqistreamer"),
pkt_wpending(NULL), mRsSerialiser(rss), mBio(bio_in), mBio_flags(bio_flags_in),
totalRead(0), totalSent(0), mPkt_wpending(NULL),
currRead(0), currSent(0), mTotalRead(0), mTotalSent(0),
avgReadCount(0), avgSentCount(0), streamerMtx("pqistreamer") mCurrRead(0), mCurrSent(0),
mAvgReadCount(0), mAvgSentCount(0)
{ {
avgLastUpdate = currReadTS = currSentTS = time(NULL); RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
mAvgLastUpdate = mCurrReadTS = mCurrSentTS = time(NULL);
/* allocated once */ /* allocated once */
pkt_rpend_size = getRsPktMaxSize(); mPkt_rpend_size = getRsPktMaxSize();
pkt_rpending = malloc(pkt_rpend_size); mPkt_rpending = malloc(mPkt_rpend_size);
reading_state = reading_state_initial ; mReading_state = reading_state_initial ;
// thread_id = pthread_self() ;
// avoid uninitialized (and random) memory read. // avoid uninitialized (and random) memory read.
memset(pkt_rpending,0,pkt_rpend_size) ; memset(mPkt_rpending,0,mPkt_rpend_size) ;
// 100 B/s (minimal) // 100 B/s (minimal)
setMaxRate(true, 0.1); setMaxRate(true, 0.1);
@ -83,48 +86,48 @@ pqistreamer::pqistreamer(RsSerialiser *rss, std::string id, BinInterface *bio_in
exit(1); exit(1);
} }
failed_read_attempts = 0 ; // reset failed read, as no packet is still read. mFailed_read_attempts = 0; // reset failed read, as no packet is still read.
return; return;
} }
pqistreamer::~pqistreamer() pqistreamer::~pqistreamer()
{ {
RsStackMutex stack(streamerMtx) ; // lock out_pkt and out_data RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::~pqistreamer() Destruction!"); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::~pqistreamer() Destruction!");
if (bio_flags & BIN_FLAGS_NO_CLOSE) if (mBio_flags & BIN_FLAGS_NO_CLOSE)
{ {
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::~pqistreamer() Not Closing BinInterface!"); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::~pqistreamer() Not Closing BinInterface!");
} }
else if (bio) else if (mBio)
{ {
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::~pqistreamer() Deleting BinInterface!"); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::~pqistreamer() Deleting BinInterface!");
delete bio; delete mBio;
} }
/* clean up serialiser */ /* clean up serialiser */
if (rsSerialiser) if (mRsSerialiser)
delete rsSerialiser; delete mRsSerialiser;
// clean up outgoing. (cntrl packets) // clean up outgoing. (cntrl packets)
locked_clear_out_queue() ; locked_clear_out_queue() ;
if (pkt_wpending) if (mPkt_wpending)
{ {
free(pkt_wpending); free(mPkt_wpending);
pkt_wpending = NULL; mPkt_wpending = NULL;
} }
free(pkt_rpending); free(mPkt_rpending);
// clean up incoming. // clean up incoming.
while(incoming.size() > 0) while(mIncoming.size() > 0)
{ {
RsItem *i = incoming.front(); RsItem *i = mIncoming.front();
incoming.pop_front(); mIncoming.pop_front();
delete i; delete i;
} }
return; return;
@ -143,7 +146,9 @@ int pqistreamer::SendItem(RsItem *si,uint32_t& out_size)
} }
#endif #endif
return queue_outpqi(si,out_size); RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
return queue_outpqi_locked(si,out_size);
} }
RsItem *pqistreamer::GetItem() RsItem *pqistreamer::GetItem()
@ -152,12 +157,13 @@ RsItem *pqistreamer::GetItem()
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::GetItem()"); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::GetItem()");
#endif #endif
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
if(incoming.empty()) if(mIncoming.empty())
return NULL; return NULL;
RsItem *osr = incoming.front() ; RsItem *osr = mIncoming.front() ;
incoming.pop_front() ; mIncoming.pop_front() ;
return osr; return osr;
} }
@ -165,19 +171,21 @@ RsItem *pqistreamer::GetItem()
// // PQInterface // // PQInterface
int pqistreamer::tick() int pqistreamer::tick()
{ {
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
{ {
std::string out = "pqistreamer::tick()\n" + PeerId(); std::string out = "pqistreamer::tick()\n" + PeerId();
rs_sprintf_append(out, ": currRead/Sent: %d/%d", currRead, currSent); rs_sprintf_append(out, ": currRead/Sent: %d/%d", mCurrRead, mCurrSent);
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out);
} }
#endif #endif
bio->tick(); mBio->tick();
/* short circuit everything is bio isn't active */ /* short circuit everything is bio isn't active */
if (!(bio->isactive())) if (!(mBio->isactive()))
{ {
return 0; return 0;
} }
@ -187,8 +195,8 @@ int pqistreamer::tick()
* that incoming will not * that incoming will not
*/ */
handleincoming(); handleincoming_locked();
handleoutgoing(); handleoutgoing_locked();
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
/* give details of the packets */ /* give details of the packets */
@ -197,7 +205,7 @@ int pqistreamer::tick()
std::string out = "pqistreamer::tick() Queued Data: for " + PeerId(); std::string out = "pqistreamer::tick() Queued Data: for " + PeerId();
if (bio->isactive()) if (mBio->isactive())
{ {
out += " (active)"; out += " (active)";
} }
@ -208,11 +216,10 @@ int pqistreamer::tick()
out += "\n"; out += "\n";
{ {
RsStackMutex stack(streamerMtx) ; // lock out_pkt and out_data int total = locked_compute_out_pkt_size() ;
int total = compute_out_pkt_size() ;
rs_sprintf_append(out, "\t Out Packets [%d] => %d bytes\n", out_queue_size(), total); rs_sprintf_append(out, "\t Out Packets [%d] => %d bytes\n", locked_out_queue_size(), total);
rs_sprintf_append(out, "\t Incoming [%d]\n", incoming.size()); rs_sprintf_append(out, "\t Incoming [%d]\n", mIncoming.size());
} }
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out); pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out);
@ -220,7 +227,7 @@ int pqistreamer::tick()
#endif #endif
/* if there is more stuff in the queues */ /* if there is more stuff in the queues */
if ((incoming.size() > 0) || (out_queue_size() > 0)) if ((mIncoming.size() > 0) || (locked_out_queue_size() > 0))
{ {
return 1; return 1;
} }
@ -229,13 +236,16 @@ int pqistreamer::tick()
int pqistreamer::status() int pqistreamer::status()
{ {
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::status()"); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::status()");
if (bio->isactive()) if (mBio->isactive())
{ {
std::string out; std::string out;
rs_sprintf(out, "Data in:%d out:%d", totalRead, totalSent); rs_sprintf(out, "Data in:%d out:%d", mTotalRead, mTotalSent);
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out); pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out);
} }
#endif #endif
@ -245,44 +255,32 @@ int pqistreamer::status()
void pqistreamer::locked_storeInOutputQueue(void *ptr,int) void pqistreamer::locked_storeInOutputQueue(void *ptr,int)
{ {
out_pkt.push_back(ptr); mOutPkts.push_back(ptr);
} }
// //
/**************** HANDLE OUTGOING TRANSLATION + TRANSMISSION ******/ /**************** HANDLE OUTGOING TRANSLATION + TRANSMISSION ******/
int pqistreamer::queue_outpqi(RsItem *pqi,uint32_t& pktsize) int pqistreamer::queue_outpqi_locked(RsItem *pqi,uint32_t& pktsize)
{ {
pktsize = 0 ; pktsize = 0 ;
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
std::cerr << "pqistreamer::queue_outpqi() called." << std::endl; std::cerr << "pqistreamer::queue_outpqi() called." << std::endl;
#endif #endif
RsStackMutex stack(streamerMtx) ; // lock out_pkt and out_data
// This is called by different threads, and by threads that are not the handleoutgoing thread,
// so it should be protected by a mutex !!
#ifdef DEBUG_PQISTREAMER
if(dynamic_cast<RsFileData*>(pqi)!=NULL && (bio_flags & BIN_FLAGS_NO_DELETE))
{
std::cerr << "Having file data with flags = " << bio_flags << std::endl ;
}
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::queue_outpqi()");
#endif
/* decide which type of packet it is */ /* decide which type of packet it is */
pktsize = rsSerialiser->size(pqi); pktsize = mRsSerialiser->size(pqi);
void *ptr = malloc(pktsize); void *ptr = malloc(pktsize);
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
std::cerr << "pqistreamer::queue_outpqi() serializing packet with packet size : " << pktsize << std::endl; std::cerr << "pqistreamer::queue_outpqi() serializing packet with packet size : " << pktsize << std::endl;
#endif #endif
if (rsSerialiser->serialise(pqi, ptr, &pktsize)) if (mRsSerialiser->serialise(pqi, ptr, &pktsize))
{ {
locked_storeInOutputQueue(ptr,pqi->priority_level()) ; locked_storeInOutputQueue(ptr,pqi->priority_level()) ;
if (!(bio_flags & BIN_FLAGS_NO_DELETE)) if (!(mBio_flags & BIN_FLAGS_NO_DELETE))
{ {
delete pqi; delete pqi;
} }
@ -298,41 +296,42 @@ int pqistreamer::queue_outpqi(RsItem *pqi,uint32_t& pktsize)
pqi -> print_string(out); pqi -> print_string(out);
pqioutput(PQL_ALERT, pqistreamerzone, out); pqioutput(PQL_ALERT, pqistreamerzone, out);
if (!(bio_flags & BIN_FLAGS_NO_DELETE)) if (!(mBio_flags & BIN_FLAGS_NO_DELETE))
{ {
delete pqi; delete pqi;
} }
return 1; // keep error internal. return 1; // keep error internal.
} }
int pqistreamer::handleincomingitem(RsItem *pqi) int pqistreamer::handleincomingitem_locked(RsItem *pqi)
{ {
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleincomingitem()"); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleincomingitem_locked()");
#endif #endif
// timestamp last received packet. // timestamp last received packet.
mLastIncomingTs = time(NULL); mLastIncomingTs = time(NULL);
// Use overloaded Contact function // Use overloaded Contact function
pqi -> PeerId(PeerId()); pqi -> PeerId(PeerId());
incoming.push_back(pqi); mIncoming.push_back(pqi);
return 1; return 1;
} }
time_t pqistreamer::getLastIncomingTS() time_t pqistreamer::getLastIncomingTS()
{ {
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
return mLastIncomingTs; return mLastIncomingTs;
} }
int pqistreamer::handleoutgoing() int pqistreamer::handleoutgoing_locked()
{ {
RsStackMutex stack(streamerMtx) ; // lock out_pkt and out_data
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleoutgoing()"); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleoutgoing_locked()");
#endif #endif
int maxbytes = outAllowedBytes(); int maxbytes = outAllowedBytes_locked();
int sentbytes = 0; int sentbytes = 0;
int len; int len;
int ss; int ss;
@ -341,19 +340,19 @@ int pqistreamer::handleoutgoing()
std::list<void *>::iterator it; std::list<void *>::iterator it;
// if not connection, or cannot send anything... pause. // if not connection, or cannot send anything... pause.
if (!(bio->isactive())) if (!(mBio->isactive()))
{ {
/* if we are not active - clear anything in the queues. */ /* if we are not active - clear anything in the queues. */
locked_clear_out_queue() ; locked_clear_out_queue() ;
/* also remove the pending packets */ /* also remove the pending packets */
if (pkt_wpending) if (mPkt_wpending)
{ {
free(pkt_wpending); free(mPkt_wpending);
pkt_wpending = NULL; mPkt_wpending = NULL;
} }
outSentBytes(sentbytes); outSentBytes_locked(sentbytes);
return 0; return 0;
} }
@ -364,41 +363,41 @@ int pqistreamer::handleoutgoing()
{ {
sent = false; sent = false;
if ((!(bio->cansend())) || (maxbytes < sentbytes)) if ((!(mBio->cansend(0))) || (maxbytes < sentbytes))
{ {
#ifdef DEBUG_TRANSFERS #ifdef DEBUG_TRANSFERS
if (maxbytes < sentbytes) if (maxbytes < sentbytes)
{ {
std::cerr << "pqistreamer::handleoutgoing() Stopped sending sentbytes > maxbytes. Sent " << sentbytes << " bytes "; std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending sentbytes > maxbytes. Sent " << sentbytes << " bytes ";
std::cerr << std::endl; std::cerr << std::endl;
} }
else else
{ {
std::cerr << "pqistreamer::handleoutgoing() Stopped sending at cansend() is false"; std::cerr << "pqistreamer::handleoutgoing_locked() Stopped sending at cansend() is false";
std::cerr << std::endl; std::cerr << std::endl;
} }
#endif #endif
outSentBytes(sentbytes); outSentBytes_locked(sentbytes);
return 0; return 0;
} }
// send a out_pkt., else send out_data. unless // send a out_pkt., else send out_data. unless
// there is a pending packet. // there is a pending packet.
if (!pkt_wpending) if (!mPkt_wpending)
pkt_wpending = locked_pop_out_data() ; mPkt_wpending = locked_pop_out_data() ;
if (pkt_wpending) if (mPkt_wpending)
{ {
// write packet. // write packet.
len = getRsItemSize(pkt_wpending); len = getRsItemSize(mPkt_wpending);
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
std::cout << "Sending Out Pkt of size " << len << " !" << std::endl; std::cout << "Sending Out Pkt of size " << len << " !" << std::endl;
#endif #endif
if (len != (ss = bio->senddata(pkt_wpending, len))) if (len != (ss = mBio->senddata(mPkt_wpending, len)))
{ {
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
std::string out; std::string out;
@ -407,60 +406,60 @@ int pqistreamer::handleoutgoing()
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out); pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out);
#endif #endif
outSentBytes(sentbytes); outSentBytes_locked(sentbytes);
// pkt_wpending will kept til next time. // pkt_wpending will kept til next time.
// ensuring exactly the same data is written (openSSL requirement). // ensuring exactly the same data is written (openSSL requirement).
return -1; return -1;
} }
#ifdef DEBUG_TRANSFERS #ifdef DEBUG_TRANSFERS
std::cerr << "pqistreamer::handleoutgoing() Sent Packet len: " << len << " @ " << RsUtil::AccurateTimeString(); std::cerr << "pqistreamer::handleoutgoing_locked() Sent Packet len: " << len << " @ " << RsUtil::AccurateTimeString();
std::cerr << std::endl; std::cerr << std::endl;
#endif #endif
free(pkt_wpending); free(mPkt_wpending);
pkt_wpending = NULL; mPkt_wpending = NULL;
sentbytes += len; sentbytes += len;
sent = true; sent = true;
} }
} }
outSentBytes(sentbytes); outSentBytes_locked(sentbytes);
return 1; return 1;
} }
/* Handles reading from input stream. /* Handles reading from input stream.
*/ */
int pqistreamer::handleincoming() int pqistreamer::handleincoming_locked()
{ {
int readbytes = 0; int readbytes = 0;
static const int max_failed_read_attempts = 2000 ; static const int max_failed_read_attempts = 2000 ;
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleincoming()"); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleincoming_locked()");
#endif #endif
if(!(bio->isactive())) if(!(mBio->isactive()))
{ {
reading_state = reading_state_initial ; mReading_state = reading_state_initial ;
inReadBytes(readbytes); inReadBytes_locked(readbytes);
return 0; return 0;
} }
// enough space to read any packet. // enough space to read any packet.
int maxlen = pkt_rpend_size; int maxlen = mPkt_rpend_size;
void *block = pkt_rpending; void *block = mPkt_rpending;
// initial read size: basic packet. // initial read size: basic packet.
int blen = getRsPktBaseSize(); int blen = getRsPktBaseSize();
int maxin = inAllowedBytes(); int maxin = inAllowedBytes_locked();
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
std::cerr << "[" << (void*)pthread_self() << "] " << "reading state = " << reading_state << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "reading state = " << mReading_state << std::endl ;
#endif #endif
switch(reading_state) switch(mReading_state)
{ {
case reading_state_initial: /*std::cerr << "jumping to start" << std::endl; */ goto start_packet_read ; case reading_state_initial: /*std::cerr << "jumping to start" << std::endl; */ goto start_packet_read ;
case reading_state_packet_started: /*std::cerr << "jumping to middle" << std::endl;*/ goto continue_packet ; case reading_state_packet_started: /*std::cerr << "jumping to middle" << std::endl;*/ goto continue_packet ;
@ -475,11 +474,11 @@ start_packet_read:
#endif #endif
memset(block,0,blen) ; // reset the block, to avoid uninitialized memory reads. memset(block,0,blen) ; // reset the block, to avoid uninitialized memory reads.
if (blen != (tmplen = bio->readdata(block, blen))) if (blen != (tmplen = mBio->readdata(block, blen)))
{ {
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() Didn't read BasePkt!"); pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() Didn't read BasePkt!");
inReadBytes(readbytes); inReadBytes_locked(readbytes);
// error.... (either blocked or failure) // error.... (either blocked or failure)
if (tmplen == 0) if (tmplen == 0)
@ -498,7 +497,7 @@ start_packet_read:
// //
//pqioutput(PQL_WARNING, pqistreamerzone, "pqistreamer::handleincoming() Error in bio read"); //pqioutput(PQL_WARNING, pqistreamerzone, "pqistreamer::handleincoming() Error in bio read");
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
std::cerr << "[" << (void*)pthread_self() << "] " << "given up 2, state = " << reading_state << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "given up 2, state = " << mReading_state << std::endl ;
#endif #endif
return 0; return 0;
} }
@ -525,8 +524,8 @@ start_packet_read:
#endif #endif
readbytes += blen; readbytes += blen;
reading_state = reading_state_packet_started ; mReading_state = reading_state_packet_started ;
failed_read_attempts = 0 ; // reset failed read, as the packet has been totally read. mFailed_read_attempts = 0 ; // reset failed read, as the packet has been totally read.
} }
continue_packet: continue_packet:
{ {
@ -537,7 +536,7 @@ continue_packet:
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet getRsItemSize(block) = " << getRsItemSize(block) << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet getRsItemSize(block) = " << getRsItemSize(block) << std::endl ;
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet extralen = " << extralen << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet extralen = " << extralen << std::endl ;
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << reading_state << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ;
std::cerr << "[" << (void*)pthread_self() << "] " << "block 1 : " << (int)(((unsigned char*)block)[0]) << " " << (int)(((unsigned char*)block)[1]) << " " << (int)(((unsigned char*)block)[2]) << " " << (int)(((unsigned char*)block)[3]) << " " std::cerr << "[" << (void*)pthread_self() << "] " << "block 1 : " << (int)(((unsigned char*)block)[0]) << " " << (int)(((unsigned char*)block)[1]) << " " << (int)(((unsigned char*)block)[2]) << " " << (int)(((unsigned char*)block)[3]) << " "
<< (int)(((unsigned char*)block)[4]) << " " << (int)(((unsigned char*)block)[4]) << " "
<< (int)(((unsigned char*)block)[5]) << " " << (int)(((unsigned char*)block)[5]) << " "
@ -588,9 +587,9 @@ continue_packet:
std::cerr << std::endl; std::cerr << std::endl;
} }
bio->close(); mBio->close();
reading_state = reading_state_initial ; // restart at state 1. mReading_state = reading_state_initial ; // restart at state 1.
failed_read_attempts = 0 ; mFailed_read_attempts = 0 ;
return -1; return -1;
// Used to exit now! exit(1); // Used to exit now! exit(1);
@ -609,14 +608,14 @@ continue_packet:
// so, don't do that: // so, don't do that:
// memset( extradata,0,extralen ) ; // memset( extradata,0,extralen ) ;
if (extralen != (tmplen = bio->readdata(extradata, extralen))) if (extralen != (tmplen = mBio->readdata(extradata, extralen)))
{ {
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
if(tmplen > 0) if(tmplen > 0)
std::cerr << "[" << (void*)pthread_self() << "] " << "Incomplete packet read ! This is a real problem ;-)" << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "Incomplete packet read ! This is a real problem ;-)" << std::endl ;
#endif #endif
if(++failed_read_attempts > max_failed_read_attempts) if(++mFailed_read_attempts > max_failed_read_attempts)
{ {
std::string out; std::string out;
rs_sprintf(out, "Error Completing Read (read %d/%d)", tmplen, extralen); rs_sprintf(out, "Error Completing Read (read %d/%d)", tmplen, extralen);
@ -653,22 +652,22 @@ continue_packet:
std::cerr << msgout << std::endl; std::cerr << msgout << std::endl;
} }
bio->close(); mBio->close();
reading_state = reading_state_initial ; // restart at state 1. mReading_state = reading_state_initial ; // restart at state 1.
failed_read_attempts = 0 ; mFailed_read_attempts = 0 ;
return -1; return -1;
} }
else else
{ {
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
std::cerr << "[" << (void*)pthread_self() << "] " << "given up 5, state = " << reading_state << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "given up 5, state = " << mReading_state << std::endl ;
#endif #endif
return 0 ; // this is just a SSL_WANT_READ error. Don't panic, we'll re-try the read soon. return 0 ; // this is just a SSL_WANT_READ error. Don't panic, we'll re-try the read soon.
// we assume readdata() returned either -1 or the complete read size. // we assume readdata() returned either -1 or the complete read size.
} }
} }
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << reading_state << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ;
std::cerr << "[" << (void*)pthread_self() << "] " << "block 2 : " << (int)(((unsigned char*)extradata)[0]) << " " << (int)(((unsigned char*)extradata)[1]) << " " << (int)(((unsigned char*)extradata)[2]) << " " << (int)(((unsigned char*)extradata)[3]) << " " std::cerr << "[" << (void*)pthread_self() << "] " << "block 2 : " << (int)(((unsigned char*)extradata)[0]) << " " << (int)(((unsigned char*)extradata)[1]) << " " << (int)(((unsigned char*)extradata)[2]) << " " << (int)(((unsigned char*)extradata)[3]) << " "
<< (int)(((unsigned char*)extradata)[4]) << " " << (int)(((unsigned char*)extradata)[4]) << " "
<< (int)(((unsigned char*)extradata)[5]) << " " << (int)(((unsigned char*)extradata)[5]) << " "
@ -676,7 +675,7 @@ continue_packet:
<< (int)(((unsigned char*)extradata)[7]) << " " << std::endl ; << (int)(((unsigned char*)extradata)[7]) << " " << std::endl ;
#endif #endif
failed_read_attempts = 0 ; mFailed_read_attempts = 0 ;
readbytes += extralen; readbytes += extralen;
} }
@ -704,9 +703,9 @@ continue_packet:
// fclose(f) ; // fclose(f) ;
// exit(-1) ; // exit(-1) ;
// } // }
RsItem *pkt = rsSerialiser->deserialise(block, &pktlen); RsItem *pkt = mRsSerialiser->deserialise(block, &pktlen);
if ((pkt != NULL) && (0 < handleincomingitem(pkt))) if ((pkt != NULL) && (0 < handleincomingitem_locked(pkt)))
{ {
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "Successfully Read a Packet!"); pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "Successfully Read a Packet!");
@ -719,11 +718,11 @@ continue_packet:
#endif #endif
} }
reading_state = reading_state_initial ; // restart at state 1. mReading_state = reading_state_initial ; // restart at state 1.
failed_read_attempts = 0 ; // reset failed read, as the packet has been totally read. mFailed_read_attempts = 0 ; // reset failed read, as the packet has been totally read.
} }
if(maxin > readbytes && bio->moretoread()) if(maxin > readbytes && mBio->moretoread(0))
goto start_packet_read ; goto start_packet_read ;
#ifdef DEBUG_TRANSFERS #ifdef DEBUG_TRANSFERS
@ -734,14 +733,14 @@ continue_packet:
} }
#endif #endif
inReadBytes(readbytes); inReadBytes_locked(readbytes);
return 0; return 0;
} }
/* BandWidth Management Assistance */ /* BandWidth Management Assistance */
float pqistreamer::outTimeSlice() float pqistreamer::outTimeSlice_locked()
{ {
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::outTimeSlice()"); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::outTimeSlice()");
@ -752,87 +751,87 @@ float pqistreamer::outTimeSlice()
} }
// very simple..... // very simple.....
int pqistreamer::outAllowedBytes() int pqistreamer::outAllowedBytes_locked()
{ {
int t = time(NULL); // get current timestep. int t = time(NULL); // get current timestep.
/* allow a lot if not bandwidthLimited */ /* allow a lot if not bandwidthLimited */
if (!bio->bandwidthLimited()) if (!mBio->bandwidthLimited())
{ {
currSent = 0; mCurrSent = 0;
currSentTS = t; mCurrSentTS = t;
return PQISTREAM_ABS_MAX; return PQISTREAM_ABS_MAX;
} }
int dt = t - currSentTS; int dt = t - mCurrSentTS;
// limiter -> for when currSentTs -> 0. // limiter -> for when currSentTs -> 0.
if (dt > 5) if (dt > 5)
dt = 5; dt = 5;
int maxout = (int) (getMaxRate(false) * 1000.0); int maxout = (int) (getMaxRate(false) * 1000.0);
currSent -= dt * maxout; mCurrSent -= dt * maxout;
if (currSent < 0) if (mCurrSent < 0)
{ {
currSent = 0; mCurrSent = 0;
} }
currSentTS = t; mCurrSentTS = t;
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
{ {
std::string out; std::string out;
rs_sprintf(out, "pqistreamer::outAllowedBytes() is %d/%d", maxout - currSent, maxout); rs_sprintf(out, "pqistreamer::outAllowedBytes() is %d/%d", maxout - mCurrSent, maxout);
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out);
} }
#endif #endif
return maxout - currSent; return maxout - mCurrSent;
} }
int pqistreamer::inAllowedBytes() int pqistreamer::inAllowedBytes_locked()
{ {
int t = time(NULL); // get current timestep. int t = time(NULL); // get current timestep.
/* allow a lot if not bandwidthLimited */ /* allow a lot if not bandwidthLimited */
if (!bio->bandwidthLimited()) if (!mBio->bandwidthLimited())
{ {
currReadTS = t; mCurrReadTS = t;
currRead = 0; mCurrRead = 0;
return PQISTREAM_ABS_MAX; return PQISTREAM_ABS_MAX;
} }
int dt = t - currReadTS; int dt = t - mCurrReadTS;
// limiter -> for when currReadTs -> 0. // limiter -> for when currReadTs -> 0.
if (dt > 5) if (dt > 5)
dt = 5; dt = 5;
int maxin = (int) (getMaxRate(true) * 1000.0); int maxin = (int) (getMaxRate(true) * 1000.0);
currRead -= dt * maxin; mCurrRead -= dt * maxin;
if (currRead < 0) if (mCurrRead < 0)
{ {
currRead = 0; mCurrRead = 0;
} }
currReadTS = t; mCurrReadTS = t;
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
{ {
std::string out; std::string out;
rs_sprintf(out, "pqistreamer::inAllowedBytes() is %d/%d", maxin - currRead, maxin); rs_sprintf(out, "pqistreamer::inAllowedBytes() is %d/%d", maxin - mCurrRead, maxin);
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out);
} }
#endif #endif
return maxin - currRead; return maxin - mCurrRead;
} }
static const float AVG_PERIOD = 5; // sec static const float AVG_PERIOD = 5; // sec
static const float AVG_FRAC = 0.8; // for low pass filter. static const float AVG_FRAC = 0.8; // for low pass filter.
void pqistreamer::outSentBytes(int outb) void pqistreamer::outSentBytes_locked(int outb)
{ {
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
{ {
@ -861,29 +860,29 @@ void pqistreamer::outSentBytes(int outb)
totalSent += outb; mTotalSent += outb;
currSent += outb; mCurrSent += outb;
avgSentCount += outb; mAvgSentCount += outb;
int t = time(NULL); // get current timestep. int t = time(NULL); // get current timestep.
if (t - avgLastUpdate > AVG_PERIOD) if (t - mAvgLastUpdate > AVG_PERIOD)
{ {
float avgReadpSec = getRate(true); float avgReadpSec = getRate(true);
float avgSentpSec = getRate(false); float avgSentpSec = getRate(false);
avgReadpSec *= AVG_FRAC; avgReadpSec *= AVG_FRAC;
avgReadpSec += (1.0 - AVG_FRAC) * avgReadCount / avgReadpSec += (1.0 - AVG_FRAC) * mAvgReadCount /
(1000.0 * (t - avgLastUpdate)); (1000.0 * (t - mAvgLastUpdate));
avgSentpSec *= AVG_FRAC; avgSentpSec *= AVG_FRAC;
avgSentpSec += (1.0 - AVG_FRAC) * avgSentCount / avgSentpSec += (1.0 - AVG_FRAC) * mAvgSentCount /
(1000.0 * (t - avgLastUpdate)); (1000.0 * (t - mAvgLastUpdate));
/* pretend our rate is zero if we are /* pretend our rate is zero if we are
* not bandwidthLimited(). * not bandwidthLimited().
*/ */
if (bio->bandwidthLimited()) if (mBio->bandwidthLimited())
{ {
setRate(true, avgReadpSec); setRate(true, avgReadpSec);
setRate(false, avgSentpSec); setRate(false, avgSentpSec);
@ -895,14 +894,14 @@ void pqistreamer::outSentBytes(int outb)
} }
avgLastUpdate = t; mAvgLastUpdate = t;
avgReadCount = 0; mAvgReadCount = 0;
avgSentCount = 0; mAvgSentCount = 0;
} }
return; return;
} }
void pqistreamer::inReadBytes(int inb) void pqistreamer::inReadBytes_locked(int inb)
{ {
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
{ {
@ -912,45 +911,50 @@ void pqistreamer::inReadBytes(int inb)
} }
#endif #endif
totalRead += inb; mTotalRead += inb;
currRead += inb; mCurrRead += inb;
avgReadCount += inb; mAvgReadCount += inb;
return; return;
} }
int pqistreamer::getQueueSize(bool in) int pqistreamer::getQueueSize(bool in)
{ {
RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
if (in) if (in)
return incoming.size(); return mIncoming.size();
return out_queue_size(); return locked_out_queue_size();
} }
void pqistreamer::getRates(RsBwRates &rates) void pqistreamer::getRates(RsBwRates &rates)
{ {
RateInterface::getRates(rates); RateInterface::getRates(rates);
rates.mQueueIn = incoming.size();
rates.mQueueOut = out_queue_size(); RsStackMutex stack(mStreamerMtx); /**** LOCKED MUTEX ****/
rates.mQueueIn = mIncoming.size();
rates.mQueueOut = locked_out_queue_size();
} }
int pqistreamer::out_queue_size() const int pqistreamer::locked_out_queue_size() const
{ {
// Warning: because out_pkt is a list, calling size // Warning: because out_pkt is a list, calling size
// is O(n) ! Makign algorithms pretty inefficient. We should record how many // is O(n) ! Makign algorithms pretty inefficient. We should record how many
// items get stored and discarded to have a proper size value at any time // items get stored and discarded to have a proper size value at any time
// //
return out_pkt.size() ; return mOutPkts.size() ;
} }
void pqistreamer::locked_clear_out_queue() void pqistreamer::locked_clear_out_queue()
{ {
for(std::list<void*>::iterator it = out_pkt.begin(); it != out_pkt.end(); ) for(std::list<void*>::iterator it = mOutPkts.begin(); it != mOutPkts.end(); )
{ {
free(*it); free(*it);
it = out_pkt.erase(it); it = mOutPkts.erase(it);
#ifdef DEBUG_PQISTREAMER #ifdef DEBUG_PQISTREAMER
std::string out = "pqistreamer::handleoutgoing() Not active -> Clearing Pkt!"; std::string out = "pqistreamer::locked_clear_out_queue() Not active -> Clearing Pkt!";
// std::cerr << out ; std::cerr << out << std::endl;
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out); pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out);
#endif #endif
} }
@ -960,7 +964,7 @@ int pqistreamer::locked_compute_out_pkt_size() const
{ {
int total = 0 ; int total = 0 ;
for(std::list<void*>::const_iterator it = out_pkt.begin(); it != out_pkt.end(); ++it) for(std::list<void*>::const_iterator it = mOutPkts.begin(); it != mOutPkts.end(); ++it)
total += getRsItemSize(*it); total += getRsItemSize(*it);
return total ; return total ;
@ -970,12 +974,12 @@ void *pqistreamer::locked_pop_out_data()
{ {
void *res = NULL ; void *res = NULL ;
if (!out_pkt.empty()) if (!mOutPkts.empty())
{ {
res = *(out_pkt.begin()); res = *(mOutPkts.begin());
out_pkt.pop_front(); mOutPkts.pop_front();
#ifdef DEBUG_TRANSFERS #ifdef DEBUG_TRANSFERS
std::cerr << "pqistreamer::handleoutgoing() getting next pkt from out_pkt queue"; std::cerr << "pqistreamer::locked_pop_out_data() getting next pkt from mOutPkts queue";
std::cerr << std::endl; std::cerr << std::endl;
#endif #endif
} }

View File

@ -67,69 +67,71 @@ class pqistreamer: public PQInterface
// These methods are redefined in pqiQoSstreamer // These methods are redefined in pqiQoSstreamer
// //
virtual void locked_storeInOutputQueue(void *ptr,int priority) ; virtual void locked_storeInOutputQueue(void *ptr,int priority) ;
virtual int out_queue_size() const ; virtual int locked_out_queue_size() const ;
virtual void locked_clear_out_queue() ; virtual void locked_clear_out_queue() ;
virtual int locked_compute_out_pkt_size() const ; virtual int locked_compute_out_pkt_size() const ;
virtual void *locked_pop_out_data() ; virtual void *locked_pop_out_data() ;
protected:
RsMutex mStreamerMtx ; // Protects data, fns below, protected so pqiqos can use it too.
private: private:
// to filter functions - detect filecancel/data and act! int queue_outpqi_locked(RsItem *i,uint32_t& serialized_size);
int queue_outpqi(RsItem *i,uint32_t& serialized_size); int handleincomingitem_locked(RsItem *i);
int handleincomingitem(RsItem *i);
// ticked regularly (manages out queues and sending // ticked regularly (manages out queues and sending
// via above interfaces. // via above interfaces.
virtual int handleoutgoing(); virtual int handleoutgoing_locked();
virtual int handleincoming(); virtual int handleincoming_locked();
// Bandwidth/Streaming Management. // Bandwidth/Streaming Management.
float outTimeSlice(); float outTimeSlice_locked();
int outAllowedBytes_locked();
void outSentBytes_locked(int );
int inAllowedBytes_locked();
void inReadBytes_locked(int );
int outAllowedBytes();
void outSentBytes(int );
int inAllowedBytes();
void inReadBytes(int );
// RsSerialiser - determines which packets can be serialised. // RsSerialiser - determines which packets can be serialised.
RsSerialiser *rsSerialiser; RsSerialiser *mRsSerialiser;
// Binary Interface for IO, initialisated at startup. // Binary Interface for IO, initialisated at startup.
BinInterface *bio; BinInterface *mBio;
unsigned int bio_flags; // BIN_FLAGS_NO_CLOSE | BIN_FLAGS_NO_DELETE unsigned int mBio_flags; // BIN_FLAGS_NO_CLOSE | BIN_FLAGS_NO_DELETE
void *pkt_wpending; // storage for pending packet to write. void *mPkt_wpending; // storage for pending packet to write.
int pkt_rpend_size; // size of pkt_rpending. int mPkt_rpend_size; // size of pkt_rpending.
void *pkt_rpending; // storage for read in pending packets. void *mPkt_rpending; // storage for read in pending packets.
enum {reading_state_packet_started=1, enum {reading_state_packet_started=1,
reading_state_initial=0 } ; reading_state_initial=0 } ;
int reading_state ; int mReading_state ;
int failed_read_attempts ; int mFailed_read_attempts ;
// Temp Storage for transient data..... // Temp Storage for transient data.....
std::list<void *> out_pkt; // Cntrl / Search / Results queue std::list<void *> mOutPkts; // Cntrl / Search / Results queue
std::list<RsItem *> incoming; std::list<RsItem *> mIncoming;
// data for network stats. // data for network stats.
int totalRead; int mTotalRead;
int totalSent; int mTotalSent;
// these are representative (but not exact) // these are representative (but not exact)
int currRead; int mCurrRead;
int currSent; int mCurrSent;
int currReadTS; // TS from which these are measured. int mCurrReadTS; // TS from which these are measured.
int currSentTS; int mCurrSentTS;
int avgLastUpdate; // TS from which these are measured. int mAvgLastUpdate; // TS from which these are measured.
float avgReadCount; float mAvgReadCount;
float avgSentCount; float mAvgSentCount;
time_t mLastIncomingTs; time_t mLastIncomingTs;
RsMutex streamerMtx ; // WHAT IS THIS PROTECTING. XXX
// pthread_t thread_id;A
}; };

View File

@ -3,11 +3,11 @@
* *
* 3P/PQI network interface for RetroShare. * 3P/PQI network interface for RetroShare.
* *
* Copyright 2004-2008 by Robert Fernie. * Copyright 2004-2013 by Robert Fernie.
* *
* This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public * modify it under the terms of the GNU Library General Public
* License Version 2 as published by the Free Software Foundation. * License Version 2.1 as published by the Free Software Foundation.
* *
* This library is distributed in the hope that it will be useful, * This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of * but WITHOUT ANY WARRANTY; without even the implied warranty of
@ -32,18 +32,14 @@
* #define SERV_DEBUG 1 * #define SERV_DEBUG 1
****/ ****/
void p3Service::addSerialType(RsSerialType *st)
{
rsSerialiser->addSerialType(st);
}
RsItem *p3Service::recvItem() RsItem *p3Service::recvItem()
{ {
srvMtx.lock(); /***** LOCK MUTEX *****/ RsStackMutex stack(srvMtx); /***** LOCK MUTEX *****/
if (recv_queue.size() == 0) if (recv_queue.size() == 0)
{ {
srvMtx.unlock(); /***** UNLOCK MUTEX *****/
return NULL; /* nothing there! */ return NULL; /* nothing there! */
} }
@ -51,75 +47,81 @@ RsItem *p3Service::recvItem()
RsItem *item = recv_queue.front(); RsItem *item = recv_queue.front();
recv_queue.pop_front(); recv_queue.pop_front();
srvMtx.unlock(); /***** UNLOCK MUTEX *****/
return item; return item;
} }
bool p3Service::receivedItems() bool p3Service::receivedItems()
{ {
srvMtx.lock(); /***** LOCK MUTEX *****/ RsStackMutex stack(srvMtx); /***** LOCK MUTEX *****/
bool moreData = (recv_queue.size() != 0); return (!recv_queue.empty());
srvMtx.unlock(); /***** UNLOCK MUTEX *****/
return moreData;
} }
int p3Service::sendItem(RsItem *item) bool p3Service::recvItem(RsItem *item)
{ {
srvMtx.lock(); /***** LOCK MUTEX *****/ if (item)
{
RsStackMutex stack(srvMtx); /***** LOCK MUTEX *****/
send_queue.push_back(item); recv_queue.push_back(item);
}
srvMtx.unlock(); /***** UNLOCK MUTEX *****/
return 1;
} }
void p3FastService::addSerialType(RsSerialType *st)
{
rsSerialiser->addSerialType(st);
}
// overloaded pqiService interface. // overloaded pqiService interface.
int p3Service::receive(RsRawItem *raw) bool p3FastService::recv(RsRawItem *raw)
{ {
srvMtx.lock(); /***** LOCK MUTEX *****/ RsItem *item = NULL;
#ifdef SERV_DEBUG
std::cerr << "p3Service::receive()";
std::cerr << std::endl;
#endif
/* convert to RsServiceItem */
uint32_t size = raw->getRawLength();
RsItem *item = rsSerialiser->deserialise(raw->getRawData(), &size);
if ((!item) || (size != raw->getRawLength()))
{ {
/* error in conversion */ RsStackMutex stack(srvMtx); /***** LOCK MUTEX *****/
#ifdef SERV_DEBUG
std::cerr << "p3Service::receive() Error" << std::endl; #ifdef SERV_DEBUG
std::cerr << "p3Service::receive() Size: " << size << std::endl; std::cerr << "p3Service::recv()";
std::cerr << "p3Service::receive() RawLength: " << raw->getRawLength() << std::endl; std::cerr << std::endl;
#endif #endif
if (item) /* convert to RsServiceItem */
uint32_t size = raw->getRawLength();
item = rsSerialiser->deserialise(raw->getRawData(), &size);
if ((!item) || (size != raw->getRawLength()))
{ {
#ifdef SERV_DEBUG /* error in conversion */
std::cerr << "p3Service::receive() Bad Item:"; #ifdef SERV_DEBUG
std::cerr << std::endl; std::cerr << "p3Service::recv() Error" << std::endl;
item->print(std::cerr, 0); std::cerr << "p3Service::recv() Size: " << size << std::endl;
std::cerr << std::endl; std::cerr << "p3Service::recv() RawLength: " << raw->getRawLength() << std::endl;
#endif #endif
delete item;
item=NULL ; if (item)
{
#ifdef SERV_DEBUG
std::cerr << "p3Service::recv() Bad Item:";
std::cerr << std::endl;
item->print(std::cerr, 0);
std::cerr << std::endl;
#endif
delete item;
item=NULL ;
}
} }
} }
/* if we have something - pass it on */ /* if we have something - pass it on */
if (item) if (item)
{ {
#ifdef SERV_DEBUG #ifdef SERV_DEBUG
std::cerr << "p3Service::receive() item:"; std::cerr << "p3Service::recv() item:";
std::cerr << std::endl; std::cerr << std::endl;
item->print(std::cerr, 0); item->print(std::cerr, 0);
std::cerr << std::endl; std::cerr << std::endl;
@ -127,33 +129,22 @@ int p3Service::receive(RsRawItem *raw)
/* ensure PeerId is transferred */ /* ensure PeerId is transferred */
item->PeerId(raw->PeerId()); item->PeerId(raw->PeerId());
recv_queue.push_back(item); recvItem(item);
} }
/* cleanup input */ /* cleanup input */
delete raw; delete raw;
srvMtx.unlock(); /***** UNLOCK MUTEX *****/
return (item != NULL); return (item != NULL);
} }
RsRawItem *p3Service::send()
int p3FastService::sendItem(RsItem *si)
{ {
srvMtx.lock(); /***** LOCK MUTEX *****/ RsStackMutex stack(srvMtx); /***** LOCK MUTEX *****/
if (send_queue.size() == 0)
{
srvMtx.unlock(); /***** UNLOCK MUTEX *****/
return NULL; /* nothing there! */
}
/* get something off front */
RsItem *si = send_queue.front();
send_queue.pop_front();
#ifdef SERV_DEBUG #ifdef SERV_DEBUG
std::cerr << "p3Service::send() Sending item:"; std::cerr << "p3Service::sendItem() Sending item:";
std::cerr << std::endl; std::cerr << std::endl;
si->print(std::cerr, 0); si->print(std::cerr, 0);
std::cerr << std::endl; std::cerr << std::endl;
@ -170,8 +161,7 @@ RsRawItem *p3Service::send()
/* can't convert! */ /* can't convert! */
delete si; delete si;
srvMtx.unlock(); /***** UNLOCK MUTEX *****/ return 0;
return NULL;
} }
RsRawItem *raw = new RsRawItem(si->PacketId(), size); RsRawItem *raw = new RsRawItem(si->PacketId(), size);
@ -203,7 +193,7 @@ RsRawItem *p3Service::send()
if(si->priority_level() == QOS_PRIORITY_UNKNOWN) if(si->priority_level() == QOS_PRIORITY_UNKNOWN)
{ {
std::cerr << "************************************************************" << std::endl; std::cerr << "************************************************************" << std::endl;
std::cerr << "********** Warning: p3service::send() ********" << std::endl; std::cerr << "********** Warning: p3Service::send() ********" << std::endl;
std::cerr << "********** Warning: caught a RsItem with undefined ********" << std::endl; std::cerr << "********** Warning: caught a RsItem with undefined ********" << std::endl;
std::cerr << "********** priority level. That should not ********" << std::endl; std::cerr << "********** priority level. That should not ********" << std::endl;
std::cerr << "********** happen. Please fix your items! ********" << std::endl; std::cerr << "********** happen. Please fix your items! ********" << std::endl;
@ -215,38 +205,11 @@ RsRawItem *p3Service::send()
/* cleanup */ /* cleanup */
delete si; delete si;
srvMtx.unlock(); /***** UNLOCK MUTEX *****/
#ifdef SERV_DEBUG #ifdef SERV_DEBUG
std::cerr << "p3Service::send() returning RawItem."; std::cerr << "p3Service::send() returning RawItem.";
std::cerr << std::endl; std::cerr << std::endl;
#endif #endif
return raw; return pqiService::send(raw);
} }
std::string generateRandomServiceId()
{
std::string out;
/********************************** WINDOWS/UNIX SPECIFIC PART ******************/
#ifndef WINDOWS_SYS
/* 4 bytes per random number: 4 x 4 = 16 bytes */
for(int i = 0; i < 4; i++)
{
uint32_t rint = random();
rs_sprintf_append(out, "%08x", rint);
}
#else
srand(time(NULL));
/* 2 bytes per random number: 8 x 2 = 16 bytes */
for(int i = 0; i < 8; i++)
{
uint16_t rint = rand(); /* only gives 16 bits */
rs_sprintf_append(out, "%04x", rint);
}
#endif
/********************************** WINDOWS/UNIX SPECIFIC PART ******************/
return out;
}

View File

@ -52,13 +52,14 @@ std::string generateRandomServiceId();
//TODO : encryption and upload / download rate implementation //TODO : encryption and upload / download rate implementation
class p3Service: public pqiService
class p3FastService: public pqiService
{ {
protected: protected:
p3Service(uint16_t type) p3FastService(uint16_t type)
:pqiService((((uint32_t) RS_PKT_VERSION_SERVICE) << 24) + (((uint32_t) type) << 8)), :pqiService((((uint32_t) RS_PKT_VERSION_SERVICE) << 24) + (((uint32_t) type) << 8)),
srvMtx("p3Service"), rsSerialiser(NULL) srvMtx("p3FastService"), rsSerialiser(NULL)
{ {
rsSerialiser = new RsSerialiser(); rsSerialiser = new RsSerialiser();
return; return;
@ -66,33 +67,59 @@ class p3Service: public pqiService
public: public:
virtual ~p3Service() { delete rsSerialiser; return; } virtual ~p3FastService() { delete rsSerialiser; return; }
/*************** INTERFACE ******************************/ /*************** INTERFACE ******************************/
/* called from Thread/tick/GUI */
int sendItem(RsItem *); int sendItem(RsItem *);
RsItem * recvItem();
bool receivedItems();
virtual int tick() { return 0; } virtual int tick() { return 0; }
/*************** INTERFACE ******************************/ /*************** INTERFACE ******************************/
public: public:
// overloaded pqiService interface. // overloaded pqiService interface.
virtual int receive(RsRawItem *); virtual bool recv(RsRawItem *);
virtual RsRawItem * send();
// called by recv().
virtual bool recvItem(RsItem *item) = 0;
protected: protected:
void addSerialType(RsSerialType *); void addSerialType(RsSerialType *);
private: RsMutex srvMtx; /* below locked by Mutex */
RsMutex srvMtx;
/* below locked by Mutex */
RsSerialiser *rsSerialiser; RsSerialiser *rsSerialiser;
std::list<RsItem *> recv_queue, send_queue; };
class p3Service: public p3FastService
{
protected:
p3Service(uint16_t type)
:p3FastService(type)
{
return;
}
public:
/*************** INTERFACE ******************************/
/* called from Thread/tick/GUI */
//int sendItem(RsItem *);
RsItem * recvItem();
bool receivedItems();
//virtual int tick() { return 0; }
/*************** INTERFACE ******************************/
public:
// overloaded p3FastService interface.
virtual bool recvItem(RsItem *item);
private:
/* below locked by srvMtx Mutex */
std::list<RsItem *> recv_queue;
}; };
@ -110,16 +137,17 @@ class nullService: public pqiService
public: public:
// overloaded NULL pqiService interface. // overloaded NULL pqiService interface.
virtual int receive(RsRawItem *item) virtual bool recv(RsRawItem *item)
{ {
/* drop any items */ /* drop any items */
delete item; delete item;
return 1; return true;
} }
virtual RsRawItem * send() virtual bool send(RsRawItem *item)
{ {
return NULL; delete item;
return true;
} }
}; };