mirror of
https://github.com/RetroShare/RetroShare.git
synced 2024-10-01 02:35:48 -04:00
Added a Mutex around out_pkt and out_data in pqistreamer.cc, as a thread race between queue_outpqi and handleoutgoing is most probably responsible for random packet corruption.
git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@1068 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
parent
cc993dab25
commit
05659d6b7c
@ -60,6 +60,7 @@ pqistreamer::pqistreamer(RsSerialiser *rss, std::string id, BinInterface *bio_in
|
|||||||
pkt_rpending = malloc(pkt_rpend_size);
|
pkt_rpending = malloc(pkt_rpend_size);
|
||||||
reading_state = reading_state_initial ;
|
reading_state = reading_state_initial ;
|
||||||
|
|
||||||
|
// thread_id = pthread_self() ;
|
||||||
// avoid uninitialized (and random) memory read.
|
// avoid uninitialized (and random) memory read.
|
||||||
memset(pkt_rpending,0,pkt_rpend_size) ;
|
memset(pkt_rpending,0,pkt_rpend_size) ;
|
||||||
|
|
||||||
@ -92,6 +93,8 @@ pqistreamer::pqistreamer(RsSerialiser *rss, std::string id, BinInterface *bio_in
|
|||||||
|
|
||||||
pqistreamer::~pqistreamer()
|
pqistreamer::~pqistreamer()
|
||||||
{
|
{
|
||||||
|
RsStackMutex stack(streamerMtx) ; // lock out_pkt and out_data
|
||||||
|
|
||||||
{
|
{
|
||||||
std::ostringstream out;
|
std::ostringstream out;
|
||||||
out << "pqistreamer::~pqistreamer()";
|
out << "pqistreamer::~pqistreamer()";
|
||||||
@ -241,27 +244,31 @@ int pqistreamer::tick()
|
|||||||
}
|
}
|
||||||
out << std::endl;
|
out << std::endl;
|
||||||
|
|
||||||
int total = 0;
|
|
||||||
for(it = out_pkt.begin(); it != out_pkt.end(); it++)
|
|
||||||
{
|
{
|
||||||
total += getRsItemSize(*it);
|
RsStackMutex stack(streamerMtx) ; // lock out_pkt and out_data
|
||||||
|
int total = 0;
|
||||||
|
|
||||||
|
for(it = out_pkt.begin(); it != out_pkt.end(); it++)
|
||||||
|
{
|
||||||
|
total += getRsItemSize(*it);
|
||||||
|
}
|
||||||
|
|
||||||
|
out << "\t Out Packets [" << out_pkt.size() << "] => " << total;
|
||||||
|
out << " bytes" << std::endl;
|
||||||
|
|
||||||
|
total = 0;
|
||||||
|
for(it = out_data.begin(); it != out_data.end(); it++)
|
||||||
|
{
|
||||||
|
total += getRsItemSize(*it);
|
||||||
|
}
|
||||||
|
|
||||||
|
out << "\t Out Data [" << out_data.size() << "] => " << total;
|
||||||
|
out << " bytes" << std::endl;
|
||||||
|
|
||||||
|
out << "\t Incoming [" << incoming.size() << "]";
|
||||||
|
out << std::endl;
|
||||||
}
|
}
|
||||||
|
|
||||||
out << "\t Out Packets [" << out_pkt.size() << "] => " << total;
|
|
||||||
out << " bytes" << std::endl;
|
|
||||||
|
|
||||||
total = 0;
|
|
||||||
for(it = out_data.begin(); it != out_data.end(); it++)
|
|
||||||
{
|
|
||||||
total += getRsItemSize(*it);
|
|
||||||
}
|
|
||||||
|
|
||||||
out << "\t Out Data [" << out_data.size() << "] => " << total;
|
|
||||||
out << " bytes" << std::endl;
|
|
||||||
|
|
||||||
out << "\t Incoming [" << incoming.size() << "]";
|
|
||||||
out << std::endl;
|
|
||||||
|
|
||||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out.str());
|
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out.str());
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -296,6 +303,12 @@ int pqistreamer::status()
|
|||||||
|
|
||||||
int pqistreamer::queue_outpqi(RsItem *pqi)
|
int pqistreamer::queue_outpqi(RsItem *pqi)
|
||||||
{
|
{
|
||||||
|
RsStackMutex stack(streamerMtx) ; // lock out_pkt and out_data
|
||||||
|
|
||||||
|
// This is called by different threads, and by threads that are not the handleoutgoing thread,
|
||||||
|
// so it should be protected by a mutex !!
|
||||||
|
|
||||||
|
|
||||||
{
|
{
|
||||||
std::ostringstream out;
|
std::ostringstream out;
|
||||||
out << "pqistreamer::queue_outpqi()";
|
out << "pqistreamer::queue_outpqi()";
|
||||||
@ -306,6 +319,7 @@ int pqistreamer::queue_outpqi(RsItem *pqi)
|
|||||||
RsFileData *dta = dynamic_cast<RsFileData *>(pqi);
|
RsFileData *dta = dynamic_cast<RsFileData *>(pqi);
|
||||||
bool isCntrl = (dta == NULL);
|
bool isCntrl = (dta == NULL);
|
||||||
|
|
||||||
|
// std::cerr << "Thread (queue_outpqi): thread = " << pthread_self() << "isCntrl=" << isCntrl << std::endl ;
|
||||||
|
|
||||||
uint32_t pktsize = rsSerialiser->size(pqi);
|
uint32_t pktsize = rsSerialiser->size(pqi);
|
||||||
void *ptr = malloc(pktsize);
|
void *ptr = malloc(pktsize);
|
||||||
@ -363,6 +377,8 @@ int pqistreamer::handleincomingitem(RsItem *pqi)
|
|||||||
|
|
||||||
int pqistreamer::handleoutgoing()
|
int pqistreamer::handleoutgoing()
|
||||||
{
|
{
|
||||||
|
RsStackMutex stack(streamerMtx) ; // lock out_pkt and out_data
|
||||||
|
|
||||||
{
|
{
|
||||||
std::ostringstream out;
|
std::ostringstream out;
|
||||||
out << "pqistreamer::handleoutgoing()";
|
out << "pqistreamer::handleoutgoing()";
|
||||||
@ -481,199 +497,6 @@ int pqistreamer::handleoutgoing()
|
|||||||
|
|
||||||
/* Handles reading from input stream.
|
/* Handles reading from input stream.
|
||||||
*/
|
*/
|
||||||
#ifdef OLD_VERSION
|
|
||||||
int pqistreamer::handleincoming()
|
|
||||||
{
|
|
||||||
int readbytes = 0;
|
|
||||||
|
|
||||||
{
|
|
||||||
std::ostringstream out;
|
|
||||||
out << "pqistreamer::handleincoming()";
|
|
||||||
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!(bio->isactive()))
|
|
||||||
{
|
|
||||||
inReadBytes(readbytes);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// enough space to read any packet.
|
|
||||||
int maxlen = pkt_rpend_size;
|
|
||||||
void *block = pkt_rpending;
|
|
||||||
|
|
||||||
// initial read size: basic packet.
|
|
||||||
int blen = getRsPktBaseSize();
|
|
||||||
|
|
||||||
int tmplen;
|
|
||||||
uint32_t pktlen;
|
|
||||||
int maxin = inAllowedBytes();
|
|
||||||
|
|
||||||
while((maxin > readbytes) && (bio->moretoread()))
|
|
||||||
{
|
|
||||||
// read the basic block (minimum packet size)
|
|
||||||
if (blen != (tmplen = bio->readdata(block, blen)))
|
|
||||||
{
|
|
||||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone,
|
|
||||||
"pqistreamer::handleincoming() Didn't read BasePkt!");
|
|
||||||
|
|
||||||
// error.... (either blocked or failure)
|
|
||||||
inReadBytes(readbytes);
|
|
||||||
if (tmplen == 0)
|
|
||||||
{
|
|
||||||
|
|
||||||
// most likely blocked!
|
|
||||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone,
|
|
||||||
"pqistreamer::handleincoming() read blocked");
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
else if (tmplen < 0)
|
|
||||||
{
|
|
||||||
// assume the worse, that
|
|
||||||
// the stream is bust ... and jump away.
|
|
||||||
pqioutput(PQL_WARNING, pqistreamerzone,
|
|
||||||
"pqistreamer::handleincoming() Error in bio read");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
else // tmplen > 0
|
|
||||||
{
|
|
||||||
// strange case....
|
|
||||||
std::ostringstream out;
|
|
||||||
out << "pqistreamer::handleincoming() Incomplete ";
|
|
||||||
out << "(Strange) read of " << tmplen << " bytes";
|
|
||||||
pqioutput(PQL_ALERT, pqistreamerzone, out.str());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
readbytes += tmplen;
|
|
||||||
pktlen = tmplen;
|
|
||||||
|
|
||||||
// workout how much more to read.
|
|
||||||
int extralen = getRsItemSize(block) - blen;
|
|
||||||
if (extralen > maxlen - blen)
|
|
||||||
{
|
|
||||||
pqioutput(PQL_ALERT, pqistreamerzone, "ERROR: Read Packet too Big!");
|
|
||||||
|
|
||||||
pqiNotify *notify = getPqiNotify();
|
|
||||||
if (notify)
|
|
||||||
{
|
|
||||||
std::string title =
|
|
||||||
"Warning: Bad Packet Read";
|
|
||||||
|
|
||||||
std::ostringstream msgout;
|
|
||||||
msgout << " **** WARNING **** \n";
|
|
||||||
msgout << "Retroshare has caught a BAD Packet Read";
|
|
||||||
msgout << "\n";
|
|
||||||
msgout << "This is normally caused by connecting to an";
|
|
||||||
msgout << " OLD version of Retroshare";
|
|
||||||
msgout << "\n";
|
|
||||||
msgout << "(M:" << maxlen << " B:" << blen << " E:" << extralen << ")\n";
|
|
||||||
msgout << "\n";
|
|
||||||
msgout << "\n";
|
|
||||||
msgout << "Please get your friends to upgrade to the latest version";
|
|
||||||
msgout << "\n";
|
|
||||||
msgout << "\n";
|
|
||||||
msgout << "If you are sure the error was not caused by an old version";
|
|
||||||
msgout << "\n";
|
|
||||||
msgout << "Please report the problem to Retroshare's developers";
|
|
||||||
msgout << "\n";
|
|
||||||
|
|
||||||
std::string msg = msgout.str();
|
|
||||||
notify->AddSysMessage(0, RS_SYS_WARNING, title, msg);
|
|
||||||
}
|
|
||||||
bio->close();
|
|
||||||
return -1;
|
|
||||||
|
|
||||||
// Used to exit now! exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (extralen > 0)
|
|
||||||
{
|
|
||||||
void *extradata = (void *) (((char *) block) + blen);
|
|
||||||
|
|
||||||
if (extralen != (tmplen = bio->readdata(extradata, extralen)))
|
|
||||||
{
|
|
||||||
std::ostringstream out;
|
|
||||||
out << "Error Completing Read (read ";
|
|
||||||
out << tmplen << "/" << extralen << ")" << std::endl;
|
|
||||||
std::cerr << out.str() ;
|
|
||||||
pqioutput(PQL_ALERT, pqistreamerzone, out.str());
|
|
||||||
|
|
||||||
pqiNotify *notify = getPqiNotify();
|
|
||||||
if (notify)
|
|
||||||
{
|
|
||||||
std::string title =
|
|
||||||
"Warning: Error Completing Read";
|
|
||||||
|
|
||||||
std::ostringstream msgout;
|
|
||||||
msgout << " **** WARNING **** \n";
|
|
||||||
msgout << "Retroshare has experienced an unexpected Read ERROR";
|
|
||||||
msgout << "\n";
|
|
||||||
msgout << "(M:" << maxlen << " B:" << blen;
|
|
||||||
msgout << " E:" << extralen << " R:" << tmplen << ")\n";
|
|
||||||
msgout << "\n";
|
|
||||||
msgout << "Please contact the developers.";
|
|
||||||
msgout << "\n";
|
|
||||||
|
|
||||||
std::string msg = msgout.str();
|
|
||||||
std::cerr << msg << std::endl ;
|
|
||||||
std::cerr << "block = "
|
|
||||||
<< (int)(((unsigned char*)block)[0]) << " "
|
|
||||||
<< (int)(((unsigned char*)block)[1]) << " "
|
|
||||||
<< (int)(((unsigned char*)block)[2]) << " "
|
|
||||||
<< (int)(((unsigned char*)block)[3]) << std::endl ;
|
|
||||||
// notify->AddSysMessage(0, RS_SYS_WARNING, title, msg);
|
|
||||||
}
|
|
||||||
bio->close();
|
|
||||||
return -1;
|
|
||||||
|
|
||||||
// if it is triggered ... need to modify code.
|
|
||||||
// XXXX Bug to fix!
|
|
||||||
//exit(1);
|
|
||||||
|
|
||||||
// error....
|
|
||||||
inReadBytes(readbytes);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
readbytes += extralen;
|
|
||||||
pktlen += extralen;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// create packet, based on header.
|
|
||||||
{
|
|
||||||
std::ostringstream out;
|
|
||||||
out << "Read Data Block -> Incoming Pkt(";
|
|
||||||
out << blen + extralen << ")" << std::endl;
|
|
||||||
// std::cerr << out.str() ;
|
|
||||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out.str());
|
|
||||||
}
|
|
||||||
|
|
||||||
// std::cerr << "Deserializing packet of size " << pktlen <<std::endl ;
|
|
||||||
RsItem *pkt = rsSerialiser->deserialise(block, &pktlen);
|
|
||||||
|
|
||||||
if ((pkt != NULL) && (0 < handleincomingitem(pkt)))
|
|
||||||
{
|
|
||||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone,
|
|
||||||
"Successfully Read a Packet!");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
pqioutput(PQL_ALERT, pqistreamerzone,
|
|
||||||
"Failed to handle Packet!");
|
|
||||||
inReadBytes(readbytes);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// std::cerr << "pqistreamer:: total bytes read = " << readbytes << std::endl ;
|
|
||||||
inReadBytes(readbytes);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int pqistreamer::handleincoming()
|
int pqistreamer::handleincoming()
|
||||||
{
|
{
|
||||||
int readbytes = 0;
|
int readbytes = 0;
|
||||||
@ -749,7 +572,8 @@ start_packet_read:
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// std::cerr << "block 0 : " << (int)(((unsigned char*)block)[0]) << " " << (int)(((unsigned char*)block)[1]) << " " << (int)(((unsigned char*)block)[2]) << " " << (int)(((unsigned char*)block)[3])
|
// std::cerr << "block 0 : " << (int)(((unsigned char*)block)[0]) << " " << (int)(((unsigned char*)block)[1]) << " " << (int)(((unsigned char*)block)[2]) << " "
|
||||||
|
// << (int)(((unsigned char*)block)[3]) << " "
|
||||||
// << (int)(((unsigned char*)block)[4]) << " "
|
// << (int)(((unsigned char*)block)[4]) << " "
|
||||||
// << (int)(((unsigned char*)block)[5]) << " "
|
// << (int)(((unsigned char*)block)[5]) << " "
|
||||||
// << (int)(((unsigned char*)block)[6]) << " "
|
// << (int)(((unsigned char*)block)[6]) << " "
|
||||||
|
@ -29,6 +29,7 @@
|
|||||||
|
|
||||||
// Only dependent on the base stuff.
|
// Only dependent on the base stuff.
|
||||||
#include "pqi/pqi_base.h"
|
#include "pqi/pqi_base.h"
|
||||||
|
#include "util/rsthreads.h"
|
||||||
|
|
||||||
#include <list>
|
#include <list>
|
||||||
|
|
||||||
@ -106,6 +107,9 @@ void inReadBytes(int );
|
|||||||
int avgLastUpdate; // TS from which these are measured.
|
int avgLastUpdate; // TS from which these are measured.
|
||||||
float avgReadCount;
|
float avgReadCount;
|
||||||
float avgSentCount;
|
float avgSentCount;
|
||||||
|
|
||||||
|
RsMutex streamerMtx ;
|
||||||
|
// pthread_t thread_id;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user