diff --git a/libretroshare/src/pqi/pqistreamer.cc b/libretroshare/src/pqi/pqistreamer.cc index b7396ed4b..11da04dae 100644 --- a/libretroshare/src/pqi/pqistreamer.cc +++ b/libretroshare/src/pqi/pqistreamer.cc @@ -60,6 +60,7 @@ pqistreamer::pqistreamer(RsSerialiser *rss, std::string id, BinInterface *bio_in pkt_rpending = malloc(pkt_rpend_size); reading_state = reading_state_initial ; +// thread_id = pthread_self() ; // avoid uninitialized (and random) memory read. memset(pkt_rpending,0,pkt_rpend_size) ; @@ -92,6 +93,8 @@ pqistreamer::pqistreamer(RsSerialiser *rss, std::string id, BinInterface *bio_in pqistreamer::~pqistreamer() { + RsStackMutex stack(streamerMtx) ; // lock out_pkt and out_data + { std::ostringstream out; out << "pqistreamer::~pqistreamer()"; @@ -241,27 +244,31 @@ int pqistreamer::tick() } out << std::endl; - int total = 0; - for(it = out_pkt.begin(); it != out_pkt.end(); it++) { - total += getRsItemSize(*it); + RsStackMutex stack(streamerMtx) ; // lock out_pkt and out_data + int total = 0; + + for(it = out_pkt.begin(); it != out_pkt.end(); it++) + { + total += getRsItemSize(*it); + } + + out << "\t Out Packets [" << out_pkt.size() << "] => " << total; + out << " bytes" << std::endl; + + total = 0; + for(it = out_data.begin(); it != out_data.end(); it++) + { + total += getRsItemSize(*it); + } + + out << "\t Out Data [" << out_data.size() << "] => " << total; + out << " bytes" << std::endl; + + out << "\t Incoming [" << incoming.size() << "]"; + out << std::endl; } - out << "\t Out Packets [" << out_pkt.size() << "] => " << total; - out << " bytes" << std::endl; - - total = 0; - for(it = out_data.begin(); it != out_data.end(); it++) - { - total += getRsItemSize(*it); - } - - out << "\t Out Data [" << out_data.size() << "] => " << total; - out << " bytes" << std::endl; - - out << "\t Incoming [" << incoming.size() << "]"; - out << std::endl; - pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out.str()); } @@ -296,6 +303,12 @@ int pqistreamer::status() int pqistreamer::queue_outpqi(RsItem *pqi) { + RsStackMutex stack(streamerMtx) ; // lock out_pkt and out_data + + // This is called by different threads, and by threads that are not the handleoutgoing thread, + // so it should be protected by a mutex !! + + { std::ostringstream out; out << "pqistreamer::queue_outpqi()"; @@ -306,6 +319,7 @@ int pqistreamer::queue_outpqi(RsItem *pqi) RsFileData *dta = dynamic_cast(pqi); bool isCntrl = (dta == NULL); +// std::cerr << "Thread (queue_outpqi): thread = " << pthread_self() << "isCntrl=" << isCntrl << std::endl ; uint32_t pktsize = rsSerialiser->size(pqi); void *ptr = malloc(pktsize); @@ -363,6 +377,8 @@ int pqistreamer::handleincomingitem(RsItem *pqi) int pqistreamer::handleoutgoing() { + RsStackMutex stack(streamerMtx) ; // lock out_pkt and out_data + { std::ostringstream out; out << "pqistreamer::handleoutgoing()"; @@ -481,199 +497,6 @@ int pqistreamer::handleoutgoing() /* Handles reading from input stream. */ -#ifdef OLD_VERSION -int pqistreamer::handleincoming() -{ - int readbytes = 0; - - { - std::ostringstream out; - out << "pqistreamer::handleincoming()"; - pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str()); - } - - if (!(bio->isactive())) - { - inReadBytes(readbytes); - return 0; - } - - // enough space to read any packet. - int maxlen = pkt_rpend_size; - void *block = pkt_rpending; - - // initial read size: basic packet. - int blen = getRsPktBaseSize(); - - int tmplen; - uint32_t pktlen; - int maxin = inAllowedBytes(); - - while((maxin > readbytes) && (bio->moretoread())) - { - // read the basic block (minimum packet size) - if (blen != (tmplen = bio->readdata(block, blen))) - { - pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, - "pqistreamer::handleincoming() Didn't read BasePkt!"); - - // error.... (either blocked or failure) - inReadBytes(readbytes); - if (tmplen == 0) - { - - // most likely blocked! - pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, - "pqistreamer::handleincoming() read blocked"); - - return 0; - } - else if (tmplen < 0) - { - // assume the worse, that - // the stream is bust ... and jump away. - pqioutput(PQL_WARNING, pqistreamerzone, - "pqistreamer::handleincoming() Error in bio read"); - return -1; - } - else // tmplen > 0 - { - // strange case.... - std::ostringstream out; - out << "pqistreamer::handleincoming() Incomplete "; - out << "(Strange) read of " << tmplen << " bytes"; - pqioutput(PQL_ALERT, pqistreamerzone, out.str()); - return -1; - } - } - readbytes += tmplen; - pktlen = tmplen; - - // workout how much more to read. - int extralen = getRsItemSize(block) - blen; - if (extralen > maxlen - blen) - { - pqioutput(PQL_ALERT, pqistreamerzone, "ERROR: Read Packet too Big!"); - - pqiNotify *notify = getPqiNotify(); - if (notify) - { - std::string title = - "Warning: Bad Packet Read"; - - std::ostringstream msgout; - msgout << " **** WARNING **** \n"; - msgout << "Retroshare has caught a BAD Packet Read"; - msgout << "\n"; - msgout << "This is normally caused by connecting to an"; - msgout << " OLD version of Retroshare"; - msgout << "\n"; - msgout << "(M:" << maxlen << " B:" << blen << " E:" << extralen << ")\n"; - msgout << "\n"; - msgout << "\n"; - msgout << "Please get your friends to upgrade to the latest version"; - msgout << "\n"; - msgout << "\n"; - msgout << "If you are sure the error was not caused by an old version"; - msgout << "\n"; - msgout << "Please report the problem to Retroshare's developers"; - msgout << "\n"; - - std::string msg = msgout.str(); - notify->AddSysMessage(0, RS_SYS_WARNING, title, msg); - } - bio->close(); - return -1; - - // Used to exit now! exit(1); - } - - if (extralen > 0) - { - void *extradata = (void *) (((char *) block) + blen); - - if (extralen != (tmplen = bio->readdata(extradata, extralen))) - { - std::ostringstream out; - out << "Error Completing Read (read "; - out << tmplen << "/" << extralen << ")" << std::endl; - std::cerr << out.str() ; - pqioutput(PQL_ALERT, pqistreamerzone, out.str()); - - pqiNotify *notify = getPqiNotify(); - if (notify) - { - std::string title = - "Warning: Error Completing Read"; - - std::ostringstream msgout; - msgout << " **** WARNING **** \n"; - msgout << "Retroshare has experienced an unexpected Read ERROR"; - msgout << "\n"; - msgout << "(M:" << maxlen << " B:" << blen; - msgout << " E:" << extralen << " R:" << tmplen << ")\n"; - msgout << "\n"; - msgout << "Please contact the developers."; - msgout << "\n"; - - std::string msg = msgout.str(); - std::cerr << msg << std::endl ; - std::cerr << "block = " - << (int)(((unsigned char*)block)[0]) << " " - << (int)(((unsigned char*)block)[1]) << " " - << (int)(((unsigned char*)block)[2]) << " " - << (int)(((unsigned char*)block)[3]) << std::endl ; - // notify->AddSysMessage(0, RS_SYS_WARNING, title, msg); - } - bio->close(); - return -1; - - // if it is triggered ... need to modify code. - // XXXX Bug to fix! - //exit(1); - - // error.... - inReadBytes(readbytes); - return -1; - } - - readbytes += extralen; - pktlen += extralen; - } - - - // create packet, based on header. - { - std::ostringstream out; - out << "Read Data Block -> Incoming Pkt("; - out << blen + extralen << ")" << std::endl; - // std::cerr << out.str() ; - pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out.str()); - } - - // std::cerr << "Deserializing packet of size " << pktlen <deserialise(block, &pktlen); - - if ((pkt != NULL) && (0 < handleincomingitem(pkt))) - { - pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, - "Successfully Read a Packet!"); - } - else - { - pqioutput(PQL_ALERT, pqistreamerzone, - "Failed to handle Packet!"); - inReadBytes(readbytes); - return -1; - } - } - - // std::cerr << "pqistreamer:: total bytes read = " << readbytes << std::endl ; - inReadBytes(readbytes); - return 0; -} -#endif - int pqistreamer::handleincoming() { int readbytes = 0; @@ -749,7 +572,8 @@ start_packet_read: return -1; } } -// std::cerr << "block 0 : " << (int)(((unsigned char*)block)[0]) << " " << (int)(((unsigned char*)block)[1]) << " " << (int)(((unsigned char*)block)[2]) << " " << (int)(((unsigned char*)block)[3]) +// std::cerr << "block 0 : " << (int)(((unsigned char*)block)[0]) << " " << (int)(((unsigned char*)block)[1]) << " " << (int)(((unsigned char*)block)[2]) << " " +// << (int)(((unsigned char*)block)[3]) << " " // << (int)(((unsigned char*)block)[4]) << " " // << (int)(((unsigned char*)block)[5]) << " " // << (int)(((unsigned char*)block)[6]) << " " diff --git a/libretroshare/src/pqi/pqistreamer.h b/libretroshare/src/pqi/pqistreamer.h index 800cd9c5e..74b99c1b5 100644 --- a/libretroshare/src/pqi/pqistreamer.h +++ b/libretroshare/src/pqi/pqistreamer.h @@ -29,6 +29,7 @@ // Only dependent on the base stuff. #include "pqi/pqi_base.h" +#include "util/rsthreads.h" #include @@ -106,6 +107,9 @@ void inReadBytes(int ); int avgLastUpdate; // TS from which these are measured. float avgReadCount; float avgSentCount; + + RsMutex streamerMtx ; +// pthread_t thread_id; };