mirror of
https://github.com/RetroShare/RetroShare.git
synced 2025-05-27 18:12:21 -04:00
Major changes to the networking core of retroshare to introduce the new serialiser.
- Added new serialiser (PQItem -> RsItem), removed old one. - switched packet sorting from ChanId (array of ids) to PeerId (string) - introduced cleaner service interface (pqiservice). - moved p3disc to service interface. - modified streamers to use the new serialiser. - moved msg/chat to service interface. - removed old source code. (supernode / p3loopback). I've disabled UDP connections / Proxy and Channels for the moment. The code it still here, but is not compiled. The Proxy and Channels will become services, and the UDP connections will be reworked in the near future. git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@274 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
parent
07d33009b9
commit
2c9c31eaf0
36 changed files with 866 additions and 8116 deletions
|
@ -27,7 +27,8 @@
|
|||
|
||||
|
||||
#include "pqi/pqistreamer.h"
|
||||
#include "pqi/pqipacket.h"
|
||||
#include "serialiser/rsserial.h"
|
||||
#include "serialiser/rsbaseitems.h" /***** For RsFileData *****/
|
||||
#include <iostream>
|
||||
#include <fstream>
|
||||
|
||||
|
@ -36,10 +37,10 @@
|
|||
|
||||
const int pqistreamerzone = 8221;
|
||||
|
||||
const int PQISTREAM_ABS_MAX = 10000000; /* 10 MB/sec (actually per loop) */
|
||||
const int PQISTREAM_ABS_MAX = 100000000; /* 100 MB/sec (actually per loop) */
|
||||
|
||||
pqistreamer::pqistreamer(BinInterface *bio_in, int bio_flags_in)
|
||||
:bio(bio_in), bio_flags(bio_flags_in),
|
||||
pqistreamer::pqistreamer(RsSerialiser *rss, std::string id, BinInterface *bio_in, int bio_flags_in)
|
||||
:PQInterface(id), rsSerialiser(rss), bio(bio_in), bio_flags(bio_flags_in),
|
||||
pkt_wpending(NULL),
|
||||
totalRead(0), totalSent(0),
|
||||
currRead(0), currSent(0),
|
||||
|
@ -48,7 +49,7 @@ pqistreamer::pqistreamer(BinInterface *bio_in, int bio_flags_in)
|
|||
avgLastUpdate = currReadTS = currSentTS = time(NULL);
|
||||
|
||||
/* allocated once */
|
||||
pkt_rpend_size = pqipkt_maxsize();
|
||||
pkt_rpend_size = getRsPktMaxSize();
|
||||
pkt_rpending = malloc(pkt_rpend_size);
|
||||
|
||||
// 100 B/s (minimal)
|
||||
|
@ -107,7 +108,7 @@ pqistreamer::~pqistreamer()
|
|||
{
|
||||
void *pkt = out_pkt.front();
|
||||
out_pkt.pop_front();
|
||||
pqipkt_delete(pkt);
|
||||
free(pkt);
|
||||
}
|
||||
|
||||
// clean up outgoing (data packets)
|
||||
|
@ -115,12 +116,12 @@ pqistreamer::~pqistreamer()
|
|||
{
|
||||
void *pkt = out_data.front();
|
||||
out_data.pop_front();
|
||||
pqipkt_delete(pkt);
|
||||
free(pkt);
|
||||
}
|
||||
|
||||
if (pkt_wpending)
|
||||
{
|
||||
pqipkt_delete(pkt_wpending);
|
||||
free(pkt_wpending);
|
||||
pkt_wpending = NULL;
|
||||
}
|
||||
|
||||
|
@ -129,7 +130,7 @@ pqistreamer::~pqistreamer()
|
|||
// clean up outgoing.
|
||||
while(incoming.size() > 0)
|
||||
{
|
||||
PQItem *i = incoming.front();
|
||||
RsItem *i = incoming.front();
|
||||
incoming.pop_front();
|
||||
delete i;
|
||||
}
|
||||
|
@ -138,7 +139,7 @@ pqistreamer::~pqistreamer()
|
|||
|
||||
|
||||
// Get/Send Items.
|
||||
int pqistreamer::SendItem(PQItem *si)
|
||||
int pqistreamer::SendItem(RsItem *si)
|
||||
{
|
||||
{
|
||||
std::ostringstream out;
|
||||
|
@ -150,7 +151,7 @@ int pqistreamer::SendItem(PQItem *si)
|
|||
return queue_outpqi(si);
|
||||
}
|
||||
|
||||
PQItem *pqistreamer::GetItem()
|
||||
RsItem *pqistreamer::GetItem()
|
||||
{
|
||||
{
|
||||
std::ostringstream out;
|
||||
|
@ -158,12 +159,12 @@ PQItem *pqistreamer::GetItem()
|
|||
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str());
|
||||
}
|
||||
|
||||
std::list<PQItem *>::iterator it;
|
||||
std::list<RsItem *>::iterator it;
|
||||
|
||||
it = incoming.begin();
|
||||
if (it == incoming.end()) { return NULL; }
|
||||
|
||||
PQItem *osr = (*it);
|
||||
RsItem *osr = (*it);
|
||||
incoming.erase(it);
|
||||
return osr;
|
||||
}
|
||||
|
@ -175,14 +176,7 @@ int pqistreamer::tick()
|
|||
std::ostringstream out;
|
||||
out << "pqistreamer::tick()";
|
||||
out << std::endl;
|
||||
if (getContact())
|
||||
{
|
||||
out << getContact() -> Name() << ": currRead/Sent: " << currRead << "/" << currSent;
|
||||
}
|
||||
else
|
||||
{
|
||||
out << "Unknown Contact" << ": currRead/Sent: " << currRead << "/" << currSent;
|
||||
}
|
||||
out << PeerId() << ": currRead/Sent: " << currRead << "/" << currSent;
|
||||
out << std::endl;
|
||||
|
||||
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str());
|
||||
|
@ -210,10 +204,8 @@ int pqistreamer::tick()
|
|||
|
||||
std::ostringstream out;
|
||||
out << "pqistreamer::tick() Queued Data:";
|
||||
if (getContact())
|
||||
{
|
||||
out << " for " << getContact() -> Name();
|
||||
}
|
||||
out << " for " << PeerId();
|
||||
|
||||
if (bio->isactive())
|
||||
{
|
||||
out << " (active)";
|
||||
|
@ -227,7 +219,7 @@ int pqistreamer::tick()
|
|||
int total = 0;
|
||||
for(it = out_pkt.begin(); it != out_pkt.end(); it++)
|
||||
{
|
||||
total += pqipkt_rawlen(*it);
|
||||
total += getRsItemSize(*it);
|
||||
}
|
||||
|
||||
out << "\t Out Packets [" << out_pkt.size() << "] => " << total;
|
||||
|
@ -236,7 +228,7 @@ int pqistreamer::tick()
|
|||
total = 0;
|
||||
for(it = out_data.begin(); it != out_data.end(); it++)
|
||||
{
|
||||
total += pqipkt_rawlen(*it);
|
||||
total += getRsItemSize(*it);
|
||||
}
|
||||
|
||||
out << "\t Out Data [" << out_data.size() << "] => " << total;
|
||||
|
@ -277,7 +269,7 @@ int pqistreamer::status()
|
|||
//
|
||||
/**************** HANDLE OUTGOING TRANSLATION + TRANSMISSION ******/
|
||||
|
||||
int pqistreamer::queue_outpqi(PQItem *pqi)
|
||||
int pqistreamer::queue_outpqi(RsItem *pqi)
|
||||
{
|
||||
{
|
||||
std::ostringstream out;
|
||||
|
@ -286,12 +278,13 @@ int pqistreamer::queue_outpqi(PQItem *pqi)
|
|||
}
|
||||
|
||||
/* decide which type of packet it is */
|
||||
PQFileData *dta = dynamic_cast<PQFileData *>(pqi);
|
||||
RsFileData *dta = dynamic_cast<RsFileData *>(pqi);
|
||||
bool isCntrl = (dta == NULL);
|
||||
|
||||
|
||||
void *ptr = pqipkt_makepkt(pqi);
|
||||
if (NULL != ptr)
|
||||
uint32_t pktsize = rsSerialiser->size(pqi);
|
||||
void *ptr = malloc(pktsize);
|
||||
if (rsSerialiser->serialise(pqi, ptr, &pktsize))
|
||||
{
|
||||
if (isCntrl)
|
||||
{
|
||||
|
@ -304,6 +297,11 @@ int pqistreamer::queue_outpqi(PQItem *pqi)
|
|||
delete pqi;
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* cleanup serialiser */
|
||||
free(ptr);
|
||||
}
|
||||
|
||||
std::ostringstream out;
|
||||
out << "pqistreamer::queue_outpqi() Null Pkt generated!";
|
||||
|
@ -316,7 +314,7 @@ int pqistreamer::queue_outpqi(PQItem *pqi)
|
|||
return 1; // keep error internal.
|
||||
}
|
||||
|
||||
int pqistreamer::handleincomingitem(PQItem *pqi)
|
||||
int pqistreamer::handleincomingitem(RsItem *pqi)
|
||||
{
|
||||
{
|
||||
std::ostringstream out;
|
||||
|
@ -325,7 +323,7 @@ int pqistreamer::handleincomingitem(PQItem *pqi)
|
|||
}
|
||||
|
||||
// Use overloaded Contact function
|
||||
pqi -> p = getContact();
|
||||
pqi -> PeerId(PeerId());
|
||||
incoming.push_back(pqi);
|
||||
return 1;
|
||||
}
|
||||
|
@ -351,7 +349,7 @@ int pqistreamer::handleoutgoing()
|
|||
/* if we are not active - clear anything in the queues. */
|
||||
for(it = out_pkt.begin(); it != out_pkt.end(); )
|
||||
{
|
||||
pqipkt_delete(*it);
|
||||
free(*it);
|
||||
it = out_pkt.erase(it);
|
||||
|
||||
std::ostringstream out;
|
||||
|
@ -360,7 +358,7 @@ int pqistreamer::handleoutgoing()
|
|||
}
|
||||
for(it = out_data.begin(); it != out_data.end(); )
|
||||
{
|
||||
pqipkt_delete(*it);
|
||||
free(*it);
|
||||
it = out_data.erase(it);
|
||||
|
||||
std::ostringstream out;
|
||||
|
@ -371,7 +369,7 @@ int pqistreamer::handleoutgoing()
|
|||
/* also remove the pending packets */
|
||||
if (pkt_wpending)
|
||||
{
|
||||
pqipkt_delete(pkt_wpending);
|
||||
free(pkt_wpending);
|
||||
pkt_wpending = NULL;
|
||||
}
|
||||
|
||||
|
@ -413,7 +411,7 @@ int pqistreamer::handleoutgoing()
|
|||
std::ostringstream out;
|
||||
out << "Sending Out Pkt!";
|
||||
// write packet.
|
||||
len = pqipkt_rawlen(pkt_wpending);
|
||||
len = getRsItemSize(pkt_wpending);
|
||||
if (len != (ss = bio->senddata(pkt_wpending, len)))
|
||||
{
|
||||
out << "Problems with Send Data!";
|
||||
|
@ -429,7 +427,7 @@ int pqistreamer::handleoutgoing()
|
|||
out << " Success!" << std::endl;
|
||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out.str());
|
||||
|
||||
pqipkt_delete(pkt_wpending);
|
||||
free(pkt_wpending);
|
||||
pkt_wpending = NULL;
|
||||
|
||||
sentbytes += len;
|
||||
|
@ -465,10 +463,10 @@ int pqistreamer::handleincoming()
|
|||
void *block = pkt_rpending;
|
||||
|
||||
// initial read size: basic packet.
|
||||
int blen = pqipkt_basesize();
|
||||
int blen = getRsPktBaseSize();
|
||||
|
||||
int tmplen;
|
||||
int pktlen;
|
||||
uint32_t pktlen;
|
||||
int maxin = inAllowedBytes();
|
||||
|
||||
while((maxin > readbytes) && (bio->moretoread()))
|
||||
|
@ -512,7 +510,7 @@ int pqistreamer::handleincoming()
|
|||
pktlen = tmplen;
|
||||
|
||||
// workout how much more to read.
|
||||
int extralen = pqipkt_rawlen(block) - blen;
|
||||
int extralen = getRsItemSize(block) - blen;
|
||||
if (extralen > maxlen - blen)
|
||||
{
|
||||
pqioutput(PQL_ALERT, pqistreamerzone, "ERROR: Read Packet too Big!");
|
||||
|
@ -553,11 +551,7 @@ int pqistreamer::handleincoming()
|
|||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out.str());
|
||||
}
|
||||
|
||||
PQItem *pkt = NULL;
|
||||
if (pqipkt_check(block, pktlen))
|
||||
{
|
||||
pkt = pqipkt_create(block);
|
||||
}
|
||||
RsItem *pkt = rsSerialiser->deserialise(block, &pktlen);
|
||||
|
||||
if ((pkt != NULL) && (0 < handleincomingitem(pkt)))
|
||||
{
|
||||
|
@ -589,7 +583,7 @@ float pqistreamer::outTimeSlice()
|
|||
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str());
|
||||
}
|
||||
|
||||
fixme("pqistreamer::outTimeSlice()", 1);
|
||||
//fixme("pqistreamer::outTimeSlice()", 1);
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue