Hope I finally corrected the connexion + one-way bug

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@1396 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2009-07-21 20:14:31 +00:00
parent 11001bd1d4
commit 4685fadbeb
4 changed files with 81 additions and 31 deletions

View File

@ -139,6 +139,10 @@ static int initLib = 0;
// setup cipher lists. // setup cipher lists.
SSL_CTX_set_cipher_list(sslctx, "DEFAULT"); SSL_CTX_set_cipher_list(sslctx, "DEFAULT");
// setup flag against read errors
// SSL_CTX_set_mode(sslctx,SSL_MODE_AUTO_RETRY);
// SSL_CTX_set_mode(sslctx,SSL_MODE_ENABLE_PARTIAL_WRITE) ;
// certificates (Set Local Server Certificate). // certificates (Set Local Server Certificate).
FILE *ownfp = fopen(cert_file, "r"); FILE *ownfp = fopen(cert_file, "r");
if (ownfp == NULL) if (ownfp == NULL)

View File

@ -235,6 +235,7 @@ int pqissl::reset()
ssl_connection = NULL; ssl_connection = NULL;
sameLAN = false; sameLAN = false;
n_read_zero = 0; n_read_zero = 0;
total_len = 0 ;
if (neededReset) if (neededReset)
{ {
@ -1341,6 +1342,9 @@ int pqissl::senddata(void *data, int len)
{ {
int tmppktlen ; int tmppktlen ;
#ifdef DEBUG_PQISSL
std::cout << "Sending data thread=" << pthread_self() << ", ssl=" << (void*)this << ", size=" << len << std::endl ;
#endif
tmppktlen = SSL_write(ssl_connection, data, len) ; tmppktlen = SSL_write(ssl_connection, data, len) ;
if (len != tmppktlen) if (len != tmppktlen)
@ -1401,7 +1405,9 @@ int pqissl::senddata(void *data, int len)
int pqissl::readdata(void *data, int len) int pqissl::readdata(void *data, int len)
{ {
int total_len = 0 ; #ifdef DEBUG_PQISSL
std::cout << "Reading data thread=" << pthread_self() << ", ssl=" << (void*)this << std::endl ;
#endif
// There is a do, because packets can be splitted into multiple ssl buffers // There is a do, because packets can be splitted into multiple ssl buffers
// when they are larger than 16384 bytes. Such packets have to be read in // when they are larger than 16384 bytes. Such packets have to be read in
@ -1410,7 +1416,22 @@ int pqissl::readdata(void *data, int len)
{ {
int tmppktlen ; int tmppktlen ;
#ifdef DEBUG_PQISSL
std::cerr << "calling SSL_read. len=" << len << ", total_len=" << total_len << std::endl ;
#endif
tmppktlen = SSL_read(ssl_connection, (void*)((unsigned long int)data+(unsigned long int)total_len), len-total_len) ; tmppktlen = SSL_read(ssl_connection, (void*)((unsigned long int)data+(unsigned long int)total_len), len-total_len) ;
#ifdef DEBUG_PQISSL
std::cerr << "have read " << tmppktlen << " bytes" << std::endl ;
std::cerr << "data[0] = "
<< (int)((uint8_t*)data)[0] << " "
<< (int)((uint8_t*)data)[1] << " "
<< (int)((uint8_t*)data)[2] << " "
<< (int)((uint8_t*)data)[3] << " "
<< (int)((uint8_t*)data)[4] << " "
<< (int)((uint8_t*)data)[5] << " "
<< (int)((uint8_t*)data)[6] << " "
<< (int)((uint8_t*)data)[7] << std::endl ;
#endif
// Need to catch errors..... // Need to catch errors.....
// //
@ -1501,6 +1522,10 @@ int pqissl::readdata(void *data, int len)
total_len+=tmppktlen ; total_len+=tmppktlen ;
} while(total_len < len) ; } while(total_len < len) ;
#ifdef DEBUG_PQISSL
std::cerr << "pqissl: have read data of length " << total_len << ", expected is " << len << std::endl ;
#endif
if (len != total_len) if (len != total_len)
{ {
std::ostringstream out; std::ostringstream out;
@ -1511,6 +1536,7 @@ int pqissl::readdata(void *data, int len)
std::cerr << out.str() ; std::cerr << out.str() ;
rslog(RSL_WARNING, pqisslzone, out.str()); rslog(RSL_WARNING, pqisslzone, out.str());
} }
total_len = 0 ; // reset the packet pointer as we have finished a packet.
n_read_zero = 0; n_read_zero = 0;
return len;//tmppktlen; return len;//tmppktlen;
} }

View File

@ -181,6 +181,7 @@ virtual int net_internal_fcntl_nonblock(int fd) { return unix_fcntl_nonblock(fd)
void *readpkt; void *readpkt;
int pktlen; int pktlen;
int total_len ; // saves the reading state accross successive calls.
int attempt_ts; int attempt_ts;

View File

@ -194,7 +194,6 @@ RsItem *pqistreamer::GetItem()
// // PQInterface // // PQInterface
int pqistreamer::tick() int pqistreamer::tick()
{ {
// std::cerr << "enterign tick, state = " << reading_state << std::endl ;
{ {
std::ostringstream out; std::ostringstream out;
out << "pqistreamer::tick()"; out << "pqistreamer::tick()";
@ -205,9 +204,7 @@ int pqistreamer::tick()
pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str()); pqioutput(PQL_DEBUG_ALL, pqistreamerzone, out.str());
} }
// std::cerr << "calling bio-> tick, state = " << reading_state << std::endl ;
bio->tick(); bio->tick();
// std::cerr << "after bio-> tick, state = " << reading_state << std::endl ;
/* short circuit everything is bio isn't active */ /* short circuit everything is bio isn't active */
if (!(bio->isactive())) if (!(bio->isactive()))
@ -220,11 +217,8 @@ int pqistreamer::tick()
* that incoming will not * that incoming will not
*/ */
// std::cerr << "calling handle incoming, state = " << reading_state << std::endl ;
handleincoming(); handleincoming();
// std::cerr << "returned from handle incoming, state = " << reading_state << std::endl ;
handleoutgoing(); handleoutgoing();
// std::cerr << "returned from handle outgoing, state = " << reading_state << std::endl ;
/* give details of the packets */ /* give details of the packets */
{ {
@ -308,11 +302,13 @@ int pqistreamer::queue_outpqi(RsItem *pqi)
// This is called by different threads, and by threads that are not the handleoutgoing thread, // This is called by different threads, and by threads that are not the handleoutgoing thread,
// so it should be protected by a mutex !! // so it should be protected by a mutex !!
#ifdef DEBUG_PQISTREAMER
if(dynamic_cast<RsFileData*>(pqi)!=NULL && (bio_flags & BIN_FLAGS_NO_DELETE)) if(dynamic_cast<RsFileData*>(pqi)!=NULL && (bio_flags & BIN_FLAGS_NO_DELETE))
{ {
std::cerr << "Having file data with flags = " << bio_flags << std::endl ; std::cerr << "Having file data with flags = " << bio_flags << std::endl ;
*(int*)0x0=1 ; *(int*)0x0=1 ;
} }
#endif
{ {
std::ostringstream out; std::ostringstream out;
@ -324,12 +320,9 @@ int pqistreamer::queue_outpqi(RsItem *pqi)
RsFileData *dta = dynamic_cast<RsFileData *>(pqi); RsFileData *dta = dynamic_cast<RsFileData *>(pqi);
bool isCntrl = (dta == NULL); bool isCntrl = (dta == NULL);
// std::cerr << "Thread (queue_outpqi): thread = " << pthread_self() << "isCntrl=" << isCntrl << std::endl ;
uint32_t pktsize = rsSerialiser->size(pqi); uint32_t pktsize = rsSerialiser->size(pqi);
void *ptr = malloc(pktsize); void *ptr = malloc(pktsize);
// std::cerr << "serializing packet of size " << pktsize << std::endl ;
if (rsSerialiser->serialise(pqi, ptr, &pktsize)) if (rsSerialiser->serialise(pqi, ptr, &pktsize))
{ {
if (isCntrl) if (isCntrl)
@ -529,7 +522,9 @@ int pqistreamer::handleincoming()
int maxin = inAllowedBytes(); int maxin = inAllowedBytes();
// std::cerr << "reading state = " << reading_state << std::endl ; #ifdef DEBUG_PQISTREAMER
std::cerr << "[" << (void*)pthread_self() << "] " << "reading state = " << reading_state << std::endl ;
#endif
switch(reading_state) switch(reading_state)
{ {
case reading_state_initial: /*std::cerr << "jumping to start" << std::endl; */ goto start_packet_read ; case reading_state_initial: /*std::cerr << "jumping to start" << std::endl; */ goto start_packet_read ;
@ -540,7 +535,9 @@ start_packet_read:
{ // scope to ensure variable visibility { // scope to ensure variable visibility
// read the basic block (minimum packet size) // read the basic block (minimum packet size)
int tmplen; int tmplen;
// std::cerr << "starting packet" << std::endl ; #ifdef DEBUG_PQISTREAMER
std::cerr << "[" << (void*)pthread_self() << "] " << "starting packet" << std::endl ;
#endif
memset(block,0,blen) ; // reset the block, to avoid uninitialized memory reads. memset(block,0,blen) ; // reset the block, to avoid uninitialized memory reads.
if (blen != (tmplen = bio->readdata(block, blen))) if (blen != (tmplen = bio->readdata(block, blen)))
@ -554,7 +551,9 @@ start_packet_read:
{ {
// most likely blocked! // most likely blocked!
pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() read blocked"); pqioutput(PQL_DEBUG_BASIC, pqistreamerzone, "pqistreamer::handleincoming() read blocked");
// std::cerr << "given up 1" << std::endl ; #ifdef DEBUG_PQISTREAMER
std::cerr << "[" << (void*)pthread_self() << "] " << "given up 1" << std::endl ;
#endif
return 0; return 0;
} }
else if (tmplen < 0) else if (tmplen < 0)
@ -563,7 +562,9 @@ start_packet_read:
// So we return without an error, and leave the machine state in 'start_read'. // So we return without an error, and leave the machine state in 'start_read'.
// //
//pqioutput(PQL_WARNING, pqistreamerzone, "pqistreamer::handleincoming() Error in bio read"); //pqioutput(PQL_WARNING, pqistreamerzone, "pqistreamer::handleincoming() Error in bio read");
// std::cerr << "given up 2, state = " << reading_state << std::endl ; #ifdef DEBUG_PQISTREAMER
std::cerr << "[" << (void*)pthread_self() << "] " << "given up 2, state = " << reading_state << std::endl ;
#endif
return 0; return 0;
} }
else // tmplen > 0 else // tmplen > 0
@ -573,16 +574,20 @@ start_packet_read:
out << "pqistreamer::handleincoming() Incomplete "; out << "pqistreamer::handleincoming() Incomplete ";
out << "(Strange) read of " << tmplen << " bytes"; out << "(Strange) read of " << tmplen << " bytes";
pqioutput(PQL_ALERT, pqistreamerzone, out.str()); pqioutput(PQL_ALERT, pqistreamerzone, out.str());
// std::cerr << "given up 3" << std::endl ; #ifdef DEBUG_PQISTREAMER
std::cerr << "[" << (void*)pthread_self() << "] " << "given up 3" << std::endl ;
#endif
return -1; return -1;
} }
} }
// std::cerr << "block 0 : " << (int)(((unsigned char*)block)[0]) << " " << (int)(((unsigned char*)block)[1]) << " " << (int)(((unsigned char*)block)[2]) << " " #ifdef DEBUG_PQISTREAMER
// << (int)(((unsigned char*)block)[3]) << " " std::cerr << "[" << (void*)pthread_self() << "] " << "block 0 : " << (int)(((unsigned char*)block)[0]) << " " << (int)(((unsigned char*)block)[1]) << " " << (int)(((unsigned char*)block)[2]) << " "
// << (int)(((unsigned char*)block)[4]) << " " << (int)(((unsigned char*)block)[3]) << " "
// << (int)(((unsigned char*)block)[5]) << " " << (int)(((unsigned char*)block)[4]) << " "
// << (int)(((unsigned char*)block)[6]) << " " << (int)(((unsigned char*)block)[5]) << " "
// << (int)(((unsigned char*)block)[7]) << " " << std::endl ; << (int)(((unsigned char*)block)[6]) << " "
<< (int)(((unsigned char*)block)[7]) << " " << std::endl ;
#endif
readbytes += blen; readbytes += blen;
reading_state = reading_state_packet_started ; reading_state = reading_state_packet_started ;
@ -593,12 +598,14 @@ continue_packet:
// workout how much more to read. // workout how much more to read.
int extralen = getRsItemSize(block) - blen; int extralen = getRsItemSize(block) - blen;
// std::cerr << "continuing packet state=" << reading_state << std::endl ; #ifdef DEBUG_PQISTREAMER
// std::cerr << "block 1 : " << (int)(((unsigned char*)block)[0]) << " " << (int)(((unsigned char*)block)[1]) << " " << (int)(((unsigned char*)block)[2]) << " " << (int)(((unsigned char*)block)[3]) std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << reading_state << std::endl ;
// << (int)(((unsigned char*)block)[4]) << " " std::cerr << "[" << (void*)pthread_self() << "] " << "block 1 : " << (int)(((unsigned char*)block)[0]) << " " << (int)(((unsigned char*)block)[1]) << " " << (int)(((unsigned char*)block)[2]) << " " << (int)(((unsigned char*)block)[3]) << " "
// << (int)(((unsigned char*)block)[5]) << " " << (int)(((unsigned char*)block)[4]) << " "
// << (int)(((unsigned char*)block)[6]) << " " << (int)(((unsigned char*)block)[5]) << " "
// << (int)(((unsigned char*)block)[7]) << " " << std::endl ; << (int)(((unsigned char*)block)[6]) << " "
<< (int)(((unsigned char*)block)[7]) << " " << std::endl ;
#endif
if (extralen > maxlen - blen) if (extralen > maxlen - blen)
{ {
pqioutput(PQL_ALERT, pqistreamerzone, "ERROR: Read Packet too Big!"); pqioutput(PQL_ALERT, pqistreamerzone, "ERROR: Read Packet too Big!");
@ -653,7 +660,7 @@ continue_packet:
if (extralen != (tmplen = bio->readdata(extradata, extralen))) if (extralen != (tmplen = bio->readdata(extradata, extralen)))
{ {
if(tmplen > 0) if(tmplen > 0)
std::cerr << "Incomplete packet read ! This is a real problem ;-)" << std::endl ; std::cerr << "[" << (void*)pthread_self() << "] " << "Incomplete packet read ! This is a real problem ;-)" << std::endl ;
if(++failed_read_attempts > max_failed_read_attempts) if(++failed_read_attempts > max_failed_read_attempts)
{ {
@ -690,7 +697,7 @@ continue_packet:
<< (int)(((unsigned char*)block)[6]) << " " << (int)(((unsigned char*)block)[6]) << " "
<< (int)(((unsigned char*)block)[7]) << " " << (int)(((unsigned char*)block)[7]) << " "
<< std::endl ; << std::endl ;
// notify->AddSysMessage(0, RS_SYS_WARNING, title, msg); notify->AddSysMessage(0, RS_SYS_WARNING, title, msg);
} }
bio->close(); bio->close();
@ -700,11 +707,21 @@ continue_packet:
} }
else else
{ {
// std::cerr << "given up 5, state = " << reading_state << std::endl ; #ifdef DEBUG_PQISTREAMER
std::cerr << "[" << (void*)pthread_self() << "] " << "given up 5, state = " << reading_state << std::endl ;
#endif
return 0 ; // this is just a SSL_WANT_READ error. Don't panic, we'll re-try the read soon. return 0 ; // this is just a SSL_WANT_READ error. Don't panic, we'll re-try the read soon.
// we assume readdata() returned either -1 or the complete read size. // we assume readdata() returned either -1 or the complete read size.
} }
} }
#ifdef DEBUG_PQISTREAMER
std::cerr << "[" << (void*)pthread_self() << "] " << "continuing packet state=" << reading_state << std::endl ;
std::cerr << "[" << (void*)pthread_self() << "] " << "block 2 : " << (int)(((unsigned char*)extradata)[0]) << " " << (int)(((unsigned char*)extradata)[1]) << " " << (int)(((unsigned char*)extradata)[2]) << " " << (int)(((unsigned char*)extradata)[3]) << " "
<< (int)(((unsigned char*)extradata)[4]) << " "
<< (int)(((unsigned char*)extradata)[5]) << " "
<< (int)(((unsigned char*)extradata)[6]) << " "
<< (int)(((unsigned char*)extradata)[7]) << " " << std::endl ;
#endif
failed_read_attempts = 0 ; failed_read_attempts = 0 ;
readbytes += extralen; readbytes += extralen;
@ -722,7 +739,9 @@ continue_packet:
// std::cerr << "Deserializing packet of size " << pktlen <<std::endl ; // std::cerr << "Deserializing packet of size " << pktlen <<std::endl ;
uint32_t pktlen = blen+extralen ; uint32_t pktlen = blen+extralen ;
// std::cerr << "deserializing. Size=" << pktlen << std::endl ; #ifdef DEBUG_PQISTREAMER
std::cerr << "[" << (void*)pthread_self() << "] " << "deserializing. Size=" << pktlen << std::endl ;
#endif
// if(pktlen == 17306) // if(pktlen == 17306)
// { // {