mirror of
https://github.com/RetroShare/RetroShare.git
synced 2024-12-27 16:39:29 -05:00
using BinToHex to display mem blocks in pqistreamer debug
This commit is contained in:
parent
18e9e1c2db
commit
dd81ce3bf3
@ -624,335 +624,307 @@ int pqistreamer::handleoutgoing_locked()
|
||||
*/
|
||||
int pqistreamer::handleincoming_locked()
|
||||
{
|
||||
int readbytes = 0;
|
||||
static const int max_failed_read_attempts = 2000 ;
|
||||
int readbytes = 0;
|
||||
static const int max_failed_read_attempts = 2000 ;
|
||||
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleincoming_locked()");
|
||||
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, "pqistreamer::handleincoming_locked()");
|
||||
#endif
|
||||
|
||||
if(!(mBio->isactive()))
|
||||
{
|
||||
mReading_state = reading_state_initial ;
|
||||
free_rpend_locked();
|
||||
return 0;
|
||||
}
|
||||
if(!(mBio->isactive()))
|
||||
{
|
||||
mReading_state = reading_state_initial ;
|
||||
free_rpend_locked();
|
||||
return 0;
|
||||
}
|
||||
else
|
||||
allocate_rpend_locked();
|
||||
allocate_rpend_locked();
|
||||
|
||||
// enough space to read any packet.
|
||||
int maxlen = mPkt_rpend_size;
|
||||
void *block = mPkt_rpending;
|
||||
// enough space to read any packet.
|
||||
int maxlen = mPkt_rpend_size;
|
||||
void *block = mPkt_rpending;
|
||||
|
||||
// initial read size: basic packet.
|
||||
int blen = getRsPktBaseSize(); // this is valid for both packet slices and normal un-sliced packets (same header size)
|
||||
// initial read size: basic packet.
|
||||
int blen = getRsPktBaseSize(); // this is valid for both packet slices and normal un-sliced packets (same header size)
|
||||
|
||||
int maxin = inAllowedBytes_locked();
|
||||
int maxin = inAllowedBytes_locked();
|
||||
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "reading state = " << mReading_state << std::endl ;
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "reading state = " << mReading_state << std::endl ;
|
||||
#endif
|
||||
switch(mReading_state)
|
||||
{
|
||||
case reading_state_initial: /*std::cerr << "jumping to start" << std::endl; */ goto start_packet_read ;
|
||||
case reading_state_packet_started: /*std::cerr << "jumping to middle" << std::endl;*/ goto continue_packet ;
|
||||
}
|
||||
switch(mReading_state)
|
||||
{
|
||||
case reading_state_initial: /*std::cerr << "jumping to start" << std::endl; */ goto start_packet_read ;
|
||||
case reading_state_packet_started: /*std::cerr << "jumping to middle" << std::endl;*/ goto continue_packet ;
|
||||
}
|
||||
|
||||
start_packet_read:
|
||||
{ // scope to ensure variable visibility
|
||||
// read the basic block (minimum packet size)
|
||||
int tmplen;
|
||||
{ // scope to ensure variable visibility
|
||||
// read the basic block (minimum packet size)
|
||||
int tmplen;
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "starting packet" << std::endl ;
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "starting packet" << std::endl ;
|
||||
#endif
|
||||
memset(block,0,blen) ; // reset the block, to avoid uninitialized memory reads.
|
||||
memset(block,0,blen) ; // reset the block, to avoid uninitialized memory reads.
|
||||
|
||||
if (blen != (tmplen = mBio->readdata(block, blen)))
|
||||
{
|
||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() Didn't read BasePkt!");
|
||||
if (blen != (tmplen = mBio->readdata(block, blen)))
|
||||
{
|
||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() Didn't read BasePkt!");
|
||||
|
||||
// error.... (either blocked or failure)
|
||||
if (tmplen == 0)
|
||||
{
|
||||
// error.... (either blocked or failure)
|
||||
if (tmplen == 0)
|
||||
{
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
// most likely blocked!
|
||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() read blocked");
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "given up 1" << std::endl ;
|
||||
// most likely blocked!
|
||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() read blocked");
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "given up 1" << std::endl ;
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
else if (tmplen < 0)
|
||||
{
|
||||
// Most likely it is that the packet is pending but could not be read by pqissl because of stream flow.
|
||||
// So we return without an error, and leave the machine state in 'start_read'.
|
||||
//
|
||||
//pqioutput(PQL_WARNING, pqistreamerzone, "pqistreamer::handleincoming() Error in bio read");
|
||||
return 0;
|
||||
}
|
||||
else if (tmplen < 0)
|
||||
{
|
||||
// Most likely it is that the packet is pending but could not be read by pqissl because of stream flow.
|
||||
// So we return without an error, and leave the machine state in 'start_read'.
|
||||
//
|
||||
//pqioutput(PQL_WARNING, pqistreamerzone, "pqistreamer::handleincoming() Error in bio read");
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "given up 2, state = " << mReading_state << std::endl ;
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "given up 2, state = " << mReading_state << std::endl ;
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
else // tmplen > 0
|
||||
{
|
||||
// strange case....This should never happen as partial reads are handled by pqissl below.
|
||||
return 0;
|
||||
}
|
||||
else // tmplen > 0
|
||||
{
|
||||
// strange case....This should never happen as partial reads are handled by pqissl below.
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
std::string out = "pqistreamer::handleincoming() Incomplete ";
|
||||
rs_sprintf_append(out, "(Strange) read of %d bytes", tmplen);
|
||||
pqioutput(PQL_ALERT, pqistreamerzone, out);
|
||||
std::string out = "pqistreamer::handleincoming() Incomplete ";
|
||||
rs_sprintf_append(out, "(Strange) read of %d bytes", tmplen);
|
||||
pqioutput(PQL_ALERT, pqistreamerzone, out);
|
||||
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "given up 3" << std::endl ;
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "given up 3" << std::endl ;
|
||||
#endif
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "block 0 : " << (int)(((unsigned char*)block)[0]) << " " << (int)(((unsigned char*)block)[1]) << " " << (int)(((unsigned char*)block)[2]) << " "
|
||||
<< (int)(((unsigned char*)block)[3]) << " "
|
||||
<< (int)(((unsigned char*)block)[4]) << " "
|
||||
<< (int)(((unsigned char*)block)[5]) << " "
|
||||
<< (int)(((unsigned char*)block)[6]) << " "
|
||||
<< (int)(((unsigned char*)block)[7]) << " " << std::endl ;
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "block 0 : " << RsUtil::BinToHex(block,8) << std::endl;
|
||||
#endif
|
||||
|
||||
readbytes += blen;
|
||||
mReading_state = reading_state_packet_started ;
|
||||
mFailed_read_attempts = 0 ; // reset failed read, as the packet has been totally read.
|
||||
}
|
||||
readbytes += blen;
|
||||
mReading_state = reading_state_packet_started ;
|
||||
mFailed_read_attempts = 0 ; // reset failed read, as the packet has been totally read.
|
||||
}
|
||||
continue_packet:
|
||||
{
|
||||
// workout how much more to read.
|
||||
{
|
||||
// workout how much more to read.
|
||||
|
||||
bool is_partial_packet = false ;
|
||||
bool is_packet_starting = (((char*)block)[0] == 0x11) ;
|
||||
bool is_packet_ending = (((char*)block)[0] == 0x12) ;
|
||||
bool is_partial_packet = false ;
|
||||
bool is_packet_starting = (((char*)block)[0] == 0x11) ;
|
||||
bool is_packet_ending = (((char*)block)[0] == 0x12) ;
|
||||
|
||||
uint32_t extralen =0;
|
||||
uint32_t slice_offset = 0 ;
|
||||
uint32_t slice_packet_id =0;
|
||||
uint32_t extralen =0;
|
||||
uint32_t slice_offset = 0 ;
|
||||
uint32_t slice_packet_id =0;
|
||||
|
||||
if( ((char*)block)[0] == 0x10 || ((char*)block)[0] == 0x11 || ((char*)block)[0] == 0x12)
|
||||
{
|
||||
extralen = (uint32_t(((uint8_t*)block)[6]) << 8) + (uint32_t(((uint8_t*)block)[7]));
|
||||
slice_offset = (uint32_t(((uint8_t*)block)[5]) << 4) + (uint32_t(((uint8_t*)block)[4]) << 12);
|
||||
slice_packet_id = (uint32_t(((uint8_t*)block)[3]) << 0) + (uint32_t(((uint8_t*)block)[2]) << 8) + (uint32_t(((uint8_t*)block)[1]) << 16);
|
||||
if( ((char*)block)[0] == 0x10 || ((char*)block)[0] == 0x11 || ((char*)block)[0] == 0x12)
|
||||
{
|
||||
extralen = (uint32_t(((uint8_t*)block)[6]) << 8) + (uint32_t(((uint8_t*)block)[7]));
|
||||
slice_offset = (uint32_t(((uint8_t*)block)[5]) << 4) + (uint32_t(((uint8_t*)block)[4]) << 12);
|
||||
slice_packet_id = (uint32_t(((uint8_t*)block)[3]) << 0) + (uint32_t(((uint8_t*)block)[2]) << 8) + (uint32_t(((uint8_t*)block)[1]) << 16);
|
||||
|
||||
std::cerr << "Reading from mem block " << RsUtil::BinToHex((char*)block,8) << ": packet_id=" << slice_packet_id << ", len=" << extralen << ", offset=" << slice_offset << std::endl;
|
||||
is_partial_packet = true ;
|
||||
}
|
||||
else
|
||||
extralen = getRsItemSize(block) - blen;
|
||||
std::cerr << "Reading from mem block " << RsUtil::BinToHex((char*)block,8) << ": packet_id=" << slice_packet_id << ", len=" << extralen << ", offset=" << slice_offset << std::endl;
|
||||
is_partial_packet = true ;
|
||||
}
|
||||
else
|
||||
extralen = getRsItemSize(block) - blen;
|
||||
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet getRsItemSize(block) = " << getRsItemSize(block) << std::endl ;
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet extralen = " << extralen << std::endl ;
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet getRsItemSize(block) = " << getRsItemSize(block) << std::endl ;
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet extralen = " << extralen << std::endl ;
|
||||
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ;
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "block 1 : " << (int)(((unsigned char*)block)[0]) << " " << (int)(((unsigned char*)block)[1]) << " " << (int)(((unsigned char*)block)[2]) << " " << (int)(((unsigned char*)block)[3]) << " "
|
||||
<< (int)(((unsigned char*)block)[4]) << " "
|
||||
<< (int)(((unsigned char*)block)[5]) << " "
|
||||
<< (int)(((unsigned char*)block)[6]) << " "
|
||||
<< (int)(((unsigned char*)block)[7]) << " " << std::endl ;
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ;
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "block 1 : " << RsUtil::BinToHex(block,8) << std::endl;
|
||||
#endif
|
||||
if (extralen > maxlen - blen)
|
||||
{
|
||||
pqioutput(PQL_ALERT, pqistreamerzone, "ERROR: Read Packet too Big!");
|
||||
if (extralen > maxlen - blen)
|
||||
{
|
||||
pqioutput(PQL_ALERT, pqistreamerzone, "ERROR: Read Packet too Big!");
|
||||
|
||||
p3Notify *notify = RsServer::notify();
|
||||
if (notify)
|
||||
{
|
||||
std::string title =
|
||||
"Warning: Bad Packet Read";
|
||||
p3Notify *notify = RsServer::notify();
|
||||
if (notify)
|
||||
{
|
||||
std::string title =
|
||||
"Warning: Bad Packet Read";
|
||||
|
||||
std::string msg;
|
||||
msg = " **** WARNING **** \n";
|
||||
msg += "Retroshare has caught a BAD Packet Read";
|
||||
msg += "\n";
|
||||
msg += "This is normally caused by connecting to an";
|
||||
msg += " OLD version of Retroshare";
|
||||
msg += "\n";
|
||||
rs_sprintf_append(msg, "(M:%d B:%d E:%d)\n", maxlen, blen, extralen);
|
||||
msg += "\n";
|
||||
rs_sprintf_append(msg, "block = %d %d %d %d %d %d %d %d\n",
|
||||
(int)(((unsigned char*)block)[0]),
|
||||
(int)(((unsigned char*)block)[1]),
|
||||
(int)(((unsigned char*)block)[2]),
|
||||
(int)(((unsigned char*)block)[3]),
|
||||
(int)(((unsigned char*)block)[4]),
|
||||
(int)(((unsigned char*)block)[5]),
|
||||
(int)(((unsigned char*)block)[6]),
|
||||
(int)(((unsigned char*)block)[7])) ;
|
||||
msg += "\n";
|
||||
msg += "Please get your friends to upgrade to the latest version";
|
||||
msg += "\n";
|
||||
msg += "\n";
|
||||
msg += "If you are sure the error was not caused by an old version";
|
||||
msg += "\n";
|
||||
msg += "Please report the problem to Retroshare's developers";
|
||||
msg += "\n";
|
||||
std::string msg;
|
||||
msg = " **** WARNING **** \n";
|
||||
msg += "Retroshare has caught a BAD Packet Read";
|
||||
msg += "\n";
|
||||
msg += "This is normally caused by connecting to an";
|
||||
msg += " OLD version of Retroshare";
|
||||
msg += "\n";
|
||||
rs_sprintf_append(msg, "(M:%d B:%d E:%d)\n", maxlen, blen, extralen);
|
||||
msg += "\n";
|
||||
msg += "block = " ;
|
||||
msg += RsUtil::BinToHex((char*)block,8);
|
||||
|
||||
notify->AddLogMessage(0, RS_SYS_WARNING, title, msg);
|
||||
msg += "\n";
|
||||
msg += "Please get your friends to upgrade to the latest version";
|
||||
msg += "\n";
|
||||
msg += "\n";
|
||||
msg += "If you are sure the error was not caused by an old version";
|
||||
msg += "\n";
|
||||
msg += "Please report the problem to Retroshare's developers";
|
||||
msg += "\n";
|
||||
|
||||
std::cerr << "pqistreamer::handle_incoming() ERROR: Read Packet too Big" << std::endl;
|
||||
std::cerr << msg;
|
||||
std::cerr << std::endl;
|
||||
notify->AddLogMessage(0, RS_SYS_WARNING, title, msg);
|
||||
|
||||
}
|
||||
mBio->close();
|
||||
mReading_state = reading_state_initial ; // restart at state 1.
|
||||
mFailed_read_attempts = 0 ;
|
||||
return -1;
|
||||
std::cerr << "pqistreamer::handle_incoming() ERROR: Read Packet too Big" << std::endl;
|
||||
std::cerr << msg;
|
||||
std::cerr << std::endl;
|
||||
|
||||
// Used to exit now! exit(1);
|
||||
}
|
||||
}
|
||||
mBio->close();
|
||||
mReading_state = reading_state_initial ; // restart at state 1.
|
||||
mFailed_read_attempts = 0 ;
|
||||
return -1;
|
||||
|
||||
if (extralen > 0)
|
||||
{
|
||||
void *extradata = (void *) (((char *) block) + blen);
|
||||
int tmplen ;
|
||||
// Used to exit now! exit(1);
|
||||
}
|
||||
|
||||
// Don't reset the block now! If pqissl is in the middle of a multiple-chunk
|
||||
// packet (larger than 16384 bytes), and pqistreamer jumped directly yo
|
||||
// continue_packet:, then readdata is going to write after the beginning of
|
||||
// extradata, yet not exactly at start -> the start of the packet would be wiped out.
|
||||
//
|
||||
// so, don't do that:
|
||||
// memset( extradata,0,extralen ) ;
|
||||
if (extralen > 0)
|
||||
{
|
||||
void *extradata = (void *) (((char *) block) + blen);
|
||||
int tmplen ;
|
||||
|
||||
if (extralen != (tmplen = mBio->readdata(extradata, extralen)))
|
||||
{
|
||||
// Don't reset the block now! If pqissl is in the middle of a multiple-chunk
|
||||
// packet (larger than 16384 bytes), and pqistreamer jumped directly yo
|
||||
// continue_packet:, then readdata is going to write after the beginning of
|
||||
// extradata, yet not exactly at start -> the start of the packet would be wiped out.
|
||||
//
|
||||
// so, don't do that:
|
||||
// memset( extradata,0,extralen ) ;
|
||||
|
||||
if (extralen != (tmplen = mBio->readdata(extradata, extralen)))
|
||||
{
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
if(tmplen > 0)
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "Incomplete packet read ! This is a real problem ;-)" << std::endl ;
|
||||
if(tmplen > 0)
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "Incomplete packet read ! This is a real problem ;-)" << std::endl ;
|
||||
#endif
|
||||
|
||||
if(++mFailed_read_attempts > max_failed_read_attempts)
|
||||
{
|
||||
std::string out;
|
||||
rs_sprintf(out, "Error Completing Read (read %d/%d)", tmplen, extralen);
|
||||
std::cerr << out << std::endl ;
|
||||
pqioutput(PQL_ALERT, pqistreamerzone, out);
|
||||
if(++mFailed_read_attempts > max_failed_read_attempts)
|
||||
{
|
||||
std::string out;
|
||||
rs_sprintf(out, "Error Completing Read (read %d/%d)", tmplen, extralen);
|
||||
std::cerr << out << std::endl ;
|
||||
pqioutput(PQL_ALERT, pqistreamerzone, out);
|
||||
|
||||
p3Notify *notify = RsServer::notify();
|
||||
if (notify)
|
||||
{
|
||||
std::string title = "Warning: Error Completing Read";
|
||||
p3Notify *notify = RsServer::notify();
|
||||
if (notify)
|
||||
{
|
||||
std::string title = "Warning: Error Completing Read";
|
||||
|
||||
std::string msgout;
|
||||
msgout = " **** WARNING **** \n";
|
||||
msgout += "Retroshare has experienced an unexpected Read ERROR";
|
||||
msgout += "\n";
|
||||
rs_sprintf_append(msgout, "(M:%d B:%d E:%d R:%d)\n", maxlen, blen, extralen, tmplen);
|
||||
msgout += "\n";
|
||||
msgout += "Note: this error might as well happen (rarely) when a peer disconnects in between a transmission of a large packet.\n";
|
||||
msgout += "If it happens manny time, please contact the developers, and send them these numbers:";
|
||||
msgout += "\n";
|
||||
std::string msgout;
|
||||
msgout = " **** WARNING **** \n";
|
||||
msgout += "Retroshare has experienced an unexpected Read ERROR";
|
||||
msgout += "\n";
|
||||
rs_sprintf_append(msgout, "(M:%d B:%d E:%d R:%d)\n", maxlen, blen, extralen, tmplen);
|
||||
msgout += "\n";
|
||||
msgout += "Note: this error might as well happen (rarely) when a peer disconnects in between a transmission of a large packet.\n";
|
||||
msgout += "If it happens manny time, please contact the developers, and send them these numbers:";
|
||||
msgout += "\n";
|
||||
|
||||
rs_sprintf_append(msgout, "block = %d %d %d %d %d %d %d %d\n",
|
||||
(int)(((unsigned char*)block)[0]),
|
||||
(int)(((unsigned char*)block)[1]),
|
||||
(int)(((unsigned char*)block)[2]),
|
||||
(int)(((unsigned char*)block)[3]),
|
||||
(int)(((unsigned char*)block)[4]),
|
||||
(int)(((unsigned char*)block)[5]),
|
||||
(int)(((unsigned char*)block)[6]),
|
||||
(int)(((unsigned char*)block)[7]));
|
||||
msgout += "block = " ;
|
||||
msgout += RsUtil::BinToHex((char*)block,8) + "\n" ;
|
||||
|
||||
//notify->AddSysMessage(0, RS_SYS_WARNING, title, msgout.str());
|
||||
std::cerr << msgout << std::endl;
|
||||
}
|
||||
|
||||
std::cerr << msgout << std::endl;
|
||||
}
|
||||
|
||||
mBio->close();
|
||||
mReading_state = reading_state_initial ; // restart at state 1.
|
||||
mFailed_read_attempts = 0 ;
|
||||
return -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
mBio->close();
|
||||
mReading_state = reading_state_initial ; // restart at state 1.
|
||||
mFailed_read_attempts = 0 ;
|
||||
return -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "given up 5, state = " << mReading_state << std::endl ;
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "given up 5, state = " << mReading_state << std::endl ;
|
||||
#endif
|
||||
return 0 ; // this is just a SSL_WANT_READ error. Don't panic, we'll re-try the read soon.
|
||||
// we assume readdata() returned either -1 or the complete read size.
|
||||
}
|
||||
}
|
||||
return 0 ; // this is just a SSL_WANT_READ error. Don't panic, we'll re-try the read soon.
|
||||
// we assume readdata() returned either -1 or the complete read size.
|
||||
}
|
||||
}
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ;
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "block 2 : " << (int)(((unsigned char*)extradata)[0]) << " " << (int)(((unsigned char*)extradata)[1]) << " " << (int)(((unsigned char*)extradata)[2]) << " " << (int)(((unsigned char*)extradata)[3]) << " "
|
||||
<< (int)(((unsigned char*)extradata)[4]) << " "
|
||||
<< (int)(((unsigned char*)extradata)[5]) << " "
|
||||
<< (int)(((unsigned char*)extradata)[6]) << " "
|
||||
<< (int)(((unsigned char*)extradata)[7]) << " " << std::endl ;
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << mReading_state << std::endl ;
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "block 2 : " << RsUtil::BinToHex(extradata,8) << std::endl;
|
||||
#endif
|
||||
|
||||
mFailed_read_attempts = 0 ;
|
||||
readbytes += extralen;
|
||||
}
|
||||
mFailed_read_attempts = 0 ;
|
||||
readbytes += extralen;
|
||||
}
|
||||
|
||||
// create packet, based on header.
|
||||
// create packet, based on header.
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
{
|
||||
std::string out;
|
||||
rs_sprintf(out, "Read Data Block -> Incoming Pkt(%d)", blen + extralen);
|
||||
//std::cerr << out ;
|
||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out);
|
||||
}
|
||||
{
|
||||
std::string out;
|
||||
rs_sprintf(out, "Read Data Block -> Incoming Pkt(%d)", blen + extralen);
|
||||
//std::cerr << out ;
|
||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, out);
|
||||
}
|
||||
#endif
|
||||
|
||||
// std::cerr << "Deserializing packet of size " << pktlen <<std::endl ;
|
||||
// std::cerr << "Deserializing packet of size " << pktlen <<std::endl ;
|
||||
|
||||
uint32_t pktlen = blen+extralen ;
|
||||
uint32_t pktlen = blen+extralen ;
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "deserializing. Size=" << pktlen << std::endl ;
|
||||
std::cerr << "[" << (void*)pthread_self() << "] " << "deserializing. Size=" << pktlen << std::endl ;
|
||||
#endif
|
||||
RsItem *pkt ;
|
||||
std::cerr << "Got packet with header " << RsUtil::BinToHex((char*)block,8) << std::endl;
|
||||
RsItem *pkt ;
|
||||
std::cerr << "Got packet with header " << RsUtil::BinToHex((char*)block,8) << std::endl;
|
||||
|
||||
if(is_partial_packet)
|
||||
{
|
||||
std::cerr << "Inputing partial packet " << RsUtil::BinToHex((char*)block,8) << std::endl;
|
||||
if(is_partial_packet)
|
||||
{
|
||||
std::cerr << "Inputing partial packet " << RsUtil::BinToHex((char*)block,8) << std::endl;
|
||||
|
||||
pkt = addPartialPacket(block,pktlen,slice_offset,slice_packet_id,is_packet_starting,is_packet_ending) ;
|
||||
}
|
||||
else
|
||||
pkt = mRsSerialiser->deserialise(block, &pktlen);
|
||||
pkt = addPartialPacket(block,pktlen,slice_offset,slice_packet_id,is_packet_starting,is_packet_ending) ;
|
||||
}
|
||||
else
|
||||
pkt = mRsSerialiser->deserialise(block, &pktlen);
|
||||
|
||||
if ((pkt != NULL) && (0 < handleincomingitem_locked(pkt,pktlen)))
|
||||
{
|
||||
if ((pkt != NULL) && (0 < handleincomingitem_locked(pkt,pktlen)))
|
||||
{
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "Successfully Read a Packet!");
|
||||
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "Successfully Read a Packet!");
|
||||
#endif
|
||||
inReadBytes_locked(pktlen); // only count deserialised packets, because that's what is actually been transfered.
|
||||
}
|
||||
else if (!is_partial_packet)
|
||||
{
|
||||
inReadBytes_locked(pktlen); // only count deserialised packets, because that's what is actually been transfered.
|
||||
}
|
||||
else if (!is_partial_packet)
|
||||
{
|
||||
#ifdef DEBUG_PQISTREAMER
|
||||
pqioutput(PQL_ALERT, pqistreamerzone, "Failed to handle Packet!");
|
||||
pqioutput(PQL_ALERT, pqistreamerzone, "Failed to handle Packet!");
|
||||
#endif
|
||||
std::cerr << "Incoming Packet could not be deserialised:" << std::endl;
|
||||
std::cerr << " Incoming peer id: " << PeerId() << std::endl;
|
||||
if(pktlen >= 8)
|
||||
std::cerr << " Packet header : " << RsUtil::BinToHex((unsigned char*)block,8) << std::endl;
|
||||
if(pktlen > 8)
|
||||
std::cerr << " Packet data : " << RsUtil::BinToHex((unsigned char*)block+8,std::min(50u,pktlen-8)) << ((pktlen>58)?"...":"") << std::endl;
|
||||
}
|
||||
std::cerr << "Incoming Packet could not be deserialised:" << std::endl;
|
||||
std::cerr << " Incoming peer id: " << PeerId() << std::endl;
|
||||
if(pktlen >= 8)
|
||||
std::cerr << " Packet header : " << RsUtil::BinToHex((unsigned char*)block,8) << std::endl;
|
||||
if(pktlen > 8)
|
||||
std::cerr << " Packet data : " << RsUtil::BinToHex((unsigned char*)block+8,std::min(50u,pktlen-8)) << ((pktlen>58)?"...":"") << std::endl;
|
||||
}
|
||||
|
||||
mReading_state = reading_state_initial ; // restart at state 1.
|
||||
mFailed_read_attempts = 0 ; // reset failed read, as the packet has been totally read.
|
||||
}
|
||||
mReading_state = reading_state_initial ; // restart at state 1.
|
||||
mFailed_read_attempts = 0 ; // reset failed read, as the packet has been totally read.
|
||||
}
|
||||
|
||||
if(maxin > readbytes && mBio->moretoread(0))
|
||||
goto start_packet_read ;
|
||||
if(maxin > readbytes && mBio->moretoread(0))
|
||||
goto start_packet_read ;
|
||||
|
||||
#ifdef DEBUG_TRANSFERS
|
||||
if (readbytes >= maxin)
|
||||
{
|
||||
std::cerr << "pqistreamer::handleincoming() Stopped reading as readbytes >= maxin. Read " << readbytes << " bytes ";
|
||||
std::cerr << std::endl;
|
||||
}
|
||||
if (readbytes >= maxin)
|
||||
{
|
||||
std::cerr << "pqistreamer::handleincoming() Stopped reading as readbytes >= maxin. Read " << readbytes << " bytes ";
|
||||
std::cerr << std::endl;
|
||||
}
|
||||
#endif
|
||||
|
||||
return 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
RsItem *pqistreamer::addPartialPacket(const void *block,uint32_t len,uint32_t slice_offset,uint32_t slice_packet_id,bool is_packet_starting,bool is_packet_ending)
|
||||
|
Loading…
Reference in New Issue
Block a user