mirror of
https://github.com/RetroShare/RetroShare.git
synced 2025-02-25 17:21:27 -05:00
added debug output and fixed one bug in pqistreamer
This commit is contained in:
parent
0191072326
commit
01da2fbe9e
@ -129,9 +129,14 @@ int FsBioInterface::readdata(void *data, int len)
|
|||||||
|
|
||||||
int FsBioInterface::senddata(void *data, int len)
|
int FsBioInterface::senddata(void *data, int len)
|
||||||
{
|
{
|
||||||
// int written = write(mCLintConnt, data, len);
|
// shouldn't we better send in multiple packets, similarly to how we read?
|
||||||
// return written;
|
|
||||||
return len;
|
RsDbg() << "FsBioInterface: sending data packet of size " << len ;
|
||||||
|
|
||||||
|
int written = write(mCLintConnt, data, len);
|
||||||
|
RsDbg() << "FsBioInterface: done.";
|
||||||
|
|
||||||
|
return written;
|
||||||
}
|
}
|
||||||
int FsBioInterface::netstatus()
|
int FsBioInterface::netstatus()
|
||||||
{
|
{
|
||||||
|
@ -55,30 +55,35 @@ bool FsClient::requestFriends(const std::string& address,uint16_t port,uint32_t
|
|||||||
|
|
||||||
for(auto item:response)
|
for(auto item:response)
|
||||||
{
|
{
|
||||||
auto *encrypted_response_item = dynamic_cast<RsFriendServerEncryptedServerResponseItem*>(item);
|
// auto *encrypted_response_item = dynamic_cast<RsFriendServerEncryptedServerResponseItem*>(item);
|
||||||
|
|
||||||
if(!encrypted_response_item)
|
// if(!encrypted_response_item)
|
||||||
{
|
// {
|
||||||
delete item;
|
// delete item;
|
||||||
continue;
|
// continue;
|
||||||
}
|
// }
|
||||||
|
|
||||||
// For now, also handle unencrypted response items. Will be disabled in production
|
// For now, also handle unencrypted response items. Will be disabled in production
|
||||||
|
|
||||||
auto *response_item = dynamic_cast<RsFriendServerServerResponseItem*>(item);
|
auto *response_item = dynamic_cast<RsFriendServerServerResponseItem*>(item);
|
||||||
|
|
||||||
if(!response_item)
|
if(response_item)
|
||||||
{
|
handleServerResponse(response_item);
|
||||||
delete item;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
for(const auto& it:response_item->friend_invites)
|
delete item;
|
||||||
friend_certificates.insert(it);
|
|
||||||
}
|
}
|
||||||
return friend_certificates.size();
|
return friend_certificates.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void FsClient::handleServerResponse(RsFriendServerServerResponseItem *item)
|
||||||
|
{
|
||||||
|
std::cerr << "Received a response item from server: " << std::endl;
|
||||||
|
std::cerr << *item << std::endl;
|
||||||
|
|
||||||
|
// for(const auto& it:response_item->friend_invites)
|
||||||
|
// friend_certificates.insert(it);
|
||||||
|
}
|
||||||
|
|
||||||
bool FsClient::sendItem(const std::string& address,uint16_t port,RsItem *item,std::list<RsItem*>& response)
|
bool FsClient::sendItem(const std::string& address,uint16_t port,RsItem *item,std::list<RsItem*>& response)
|
||||||
{
|
{
|
||||||
// open a connection
|
// open a connection
|
||||||
@ -131,28 +136,46 @@ bool FsClient::sendItem(const std::string& address,uint16_t port,RsItem *item,st
|
|||||||
|
|
||||||
FsBioInterface *bio = new FsBioInterface(CreateSocket); // deleted by ~pqistreamer()
|
FsBioInterface *bio = new FsBioInterface(CreateSocket); // deleted by ~pqistreamer()
|
||||||
|
|
||||||
pqithreadstreamer p(this,rss,RsPeerId(),bio,BIN_FLAGS_READABLE | BIN_FLAGS_NO_DELETE | BIN_FLAGS_NO_CLOSE);
|
pqithreadstreamer p(this,rss,RsPeerId(),bio,BIN_FLAGS_READABLE | BIN_FLAGS_WRITEABLE | BIN_FLAGS_NO_CLOSE);
|
||||||
p.start();
|
p.start();
|
||||||
|
|
||||||
uint32_t ss;
|
uint32_t ss;
|
||||||
p.SendItem(item,ss);
|
p.SendItem(item,ss);
|
||||||
|
bool should_close = false;
|
||||||
|
|
||||||
while(true)
|
while(true)
|
||||||
{
|
{
|
||||||
p.tick();
|
p.tick(); // ticks bio
|
||||||
|
|
||||||
RsItem *item = GetItem();
|
RsItem *item = GetItem();
|
||||||
|
|
||||||
|
RsDbg() << "Ticking for response...";
|
||||||
|
|
||||||
if(item)
|
if(item)
|
||||||
{
|
{
|
||||||
response.push_back(item);
|
response.push_back(item);
|
||||||
std::cerr << "Got a response item: " << std::endl;
|
std::cerr << "Got a response item: " << std::endl;
|
||||||
std::cerr << *item << std::endl;
|
std::cerr << *item << std::endl;
|
||||||
|
|
||||||
|
if(dynamic_cast<RsFriendServerStatusItem*>(item) != nullptr)
|
||||||
|
{
|
||||||
|
RsDbg() << "End of transmission. " ;
|
||||||
|
should_close = true;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(!bio->isactive()) // socket has probably closed
|
if(!bio->isactive()) // socket has probably closed
|
||||||
{
|
{
|
||||||
RsDbg() << "(client side) Socket has been closed by server.";
|
RsDbg() << "(client side) Socket has been closed by server.";
|
||||||
|
should_close =true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||||
|
}
|
||||||
|
|
||||||
|
if(should_close)
|
||||||
|
{
|
||||||
RsDbg() << " Stopping/killing pqistreamer" ;
|
RsDbg() << " Stopping/killing pqistreamer" ;
|
||||||
p.fullstop();
|
p.fullstop();
|
||||||
|
|
||||||
@ -161,10 +184,6 @@ bool FsClient::sendItem(const std::string& address,uint16_t port,RsItem *item,st
|
|||||||
CreateSocket=0;
|
CreateSocket=0;
|
||||||
|
|
||||||
RsDbg() << " Exiting loop." ;
|
RsDbg() << " Exiting loop." ;
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
|
@ -42,6 +42,7 @@ protected:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
bool sendItem(const std::string &address, uint16_t port, RsItem *item, std::list<RsItem *> &response);
|
bool sendItem(const std::string &address, uint16_t port, RsItem *item, std::list<RsItem *> &response);
|
||||||
|
void handleServerResponse(RsFriendServerServerResponseItem *item);
|
||||||
|
|
||||||
std::list<RsItem*> mIncomingItems;
|
std::list<RsItem*> mIncomingItems;
|
||||||
};
|
};
|
||||||
|
@ -34,6 +34,7 @@ const uint8_t RS_PKT_SUBTYPE_FS_CLIENT_PUBLISH = 0x01 ;
|
|||||||
const uint8_t RS_PKT_SUBTYPE_FS_CLIENT_REMOVE = 0x02 ;
|
const uint8_t RS_PKT_SUBTYPE_FS_CLIENT_REMOVE = 0x02 ;
|
||||||
const uint8_t RS_PKT_SUBTYPE_FS_SERVER_RESPONSE = 0x03 ;
|
const uint8_t RS_PKT_SUBTYPE_FS_SERVER_RESPONSE = 0x03 ;
|
||||||
const uint8_t RS_PKT_SUBTYPE_FS_SERVER_ENCRYPTED_RESPONSE = 0x04 ;
|
const uint8_t RS_PKT_SUBTYPE_FS_SERVER_ENCRYPTED_RESPONSE = 0x04 ;
|
||||||
|
const uint8_t RS_PKT_SUBTYPE_FS_SERVER_STATUS = 0x05 ;
|
||||||
|
|
||||||
class RsFriendServerItem: public RsItem
|
class RsFriendServerItem: public RsItem
|
||||||
{
|
{
|
||||||
@ -71,6 +72,27 @@ public:
|
|||||||
std::string pgp_public_key_b64;
|
std::string pgp_public_key_b64;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class RsFriendServerStatusItem: public RsFriendServerItem
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
RsFriendServerStatusItem() : RsFriendServerItem(RS_PKT_SUBTYPE_FS_SERVER_STATUS) {}
|
||||||
|
|
||||||
|
void serial_process(RsGenericSerializer::SerializeJob j,RsGenericSerializer::SerializeContext& ctx) override
|
||||||
|
{
|
||||||
|
RS_SERIAL_PROCESS(status);
|
||||||
|
}
|
||||||
|
|
||||||
|
enum ConnectionStatus: uint8_t
|
||||||
|
{
|
||||||
|
UNKNOWN = 0x00,
|
||||||
|
END_OF_TRANSMISSION = 0x01
|
||||||
|
};
|
||||||
|
|
||||||
|
// specific members for that item
|
||||||
|
|
||||||
|
ConnectionStatus status;
|
||||||
|
};
|
||||||
|
|
||||||
class RsFriendServerClientRemoveItem: public RsFriendServerItem
|
class RsFriendServerClientRemoveItem: public RsFriendServerItem
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -152,6 +174,7 @@ struct FsSerializer : RsServiceSerializer
|
|||||||
case RS_PKT_SUBTYPE_FS_CLIENT_REMOVE: return new RsFriendServerClientRemoveItem();
|
case RS_PKT_SUBTYPE_FS_CLIENT_REMOVE: return new RsFriendServerClientRemoveItem();
|
||||||
case RS_PKT_SUBTYPE_FS_CLIENT_PUBLISH: return new RsFriendServerClientPublishItem();
|
case RS_PKT_SUBTYPE_FS_CLIENT_PUBLISH: return new RsFriendServerClientPublishItem();
|
||||||
case RS_PKT_SUBTYPE_FS_SERVER_RESPONSE: return new RsFriendServerServerResponseItem();
|
case RS_PKT_SUBTYPE_FS_SERVER_RESPONSE: return new RsFriendServerServerResponseItem();
|
||||||
|
case RS_PKT_SUBTYPE_FS_SERVER_STATUS: return new RsFriendServerStatusItem();
|
||||||
case RS_PKT_SUBTYPE_FS_SERVER_ENCRYPTED_RESPONSE: return new RsFriendServerEncryptedServerResponseItem();
|
case RS_PKT_SUBTYPE_FS_SERVER_ENCRYPTED_RESPONSE: return new RsFriendServerEncryptedServerResponseItem();
|
||||||
default:
|
default:
|
||||||
RsErr() << "Unknown subitem type " << item_sub_id << " in FsSerialiser" ;
|
RsErr() << "Unknown subitem type " << item_sub_id << " in FsSerialiser" ;
|
||||||
|
@ -357,6 +357,7 @@ int pqistreamer::status()
|
|||||||
// this method is overloaded by pqiqosstreamer
|
// this method is overloaded by pqiqosstreamer
|
||||||
void pqistreamer::locked_storeInOutputQueue(void *ptr,int,int)
|
void pqistreamer::locked_storeInOutputQueue(void *ptr,int,int)
|
||||||
{
|
{
|
||||||
|
RsDbg() << "Storing packet " << std::hex << ptr << std::dec << " in outqueue.";
|
||||||
mOutPkts.push_back(ptr);
|
mOutPkts.push_back(ptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -375,9 +376,9 @@ int pqistreamer::queue_outpqi_locked(RsItem *pqi,uint32_t& pktsize)
|
|||||||
if(ptr == NULL)
|
if(ptr == NULL)
|
||||||
return 0 ;
|
return 0 ;
|
||||||
|
|
||||||
#ifdef DEBUG_PQISTREAMER
|
//#ifdef DEBUG_PQISTREAMER
|
||||||
std::cerr << "pqistreamer::queue_outpqi() serializing packet with packet size : " << pktsize << std::endl;
|
std::cerr << "pqistreamer::queue_outpqi() serializing packet with packet size : " << pktsize << std::endl;
|
||||||
#endif
|
//#endif
|
||||||
|
|
||||||
/*******************************************************************************************/
|
/*******************************************************************************************/
|
||||||
// keep info for stats for a while. Only keep the items for the last two seconds. sec n is ongoing and second n-1
|
// keep info for stats for a while. Only keep the items for the last two seconds. sec n is ongoing and second n-1
|
||||||
@ -1430,8 +1431,12 @@ void *pqistreamer::locked_pop_out_data(uint32_t /*max_slice_size*/, uint32_t &si
|
|||||||
{
|
{
|
||||||
res = *(mOutPkts.begin());
|
res = *(mOutPkts.begin());
|
||||||
mOutPkts.pop_front();
|
mOutPkts.pop_front();
|
||||||
|
|
||||||
|
// In pqistreamer, we do not split outgoing packets. For now only pqiQoSStreamer supports packet slicing.
|
||||||
|
size = getRsItemSize(res);
|
||||||
|
|
||||||
#ifdef DEBUG_TRANSFERS
|
#ifdef DEBUG_TRANSFERS
|
||||||
std::cerr << "pqistreamer::locked_pop_out_data() getting next pkt from mOutPkts queue";
|
std::cerr << "pqistreamer::locked_pop_out_data() getting next pkt " << std::hex << res << std::dec << " from mOutPkts queue";
|
||||||
std::cerr << std::endl;
|
std::cerr << std::endl;
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
@ -17,8 +17,6 @@ void FriendServer::threadTick()
|
|||||||
{
|
{
|
||||||
// Listen to the network interface, capture incoming data etc.
|
// Listen to the network interface, capture incoming data etc.
|
||||||
|
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
|
||||||
|
|
||||||
RsItem *item;
|
RsItem *item;
|
||||||
|
|
||||||
while(nullptr != (item = mni->GetItem()))
|
while(nullptr != (item = mni->GetItem()))
|
||||||
@ -43,6 +41,7 @@ void FriendServer::threadTick()
|
|||||||
}
|
}
|
||||||
delete item;
|
delete item;
|
||||||
}
|
}
|
||||||
|
std::this_thread::sleep_for(std::chrono::milliseconds(200));
|
||||||
|
|
||||||
static rstime_t last_autowash_TS = time(nullptr);
|
static rstime_t last_autowash_TS = time(nullptr);
|
||||||
rstime_t now = time(nullptr);
|
rstime_t now = time(nullptr);
|
||||||
@ -78,6 +77,8 @@ void FriendServer::handleClientPublish(const RsFriendServerClientPublishItem *it
|
|||||||
// No need to test for it==mCurrentClients.end() because it will be directly caught by the exception handling below even before.
|
// No need to test for it==mCurrentClients.end() because it will be directly caught by the exception handling below even before.
|
||||||
// Respond with a list of potential friends
|
// Respond with a list of potential friends
|
||||||
|
|
||||||
|
RsDbg() << "Sending response item to " << item->PeerId() ;
|
||||||
|
|
||||||
RsFriendServerServerResponseItem *sr_item = new RsFriendServerServerResponseItem;
|
RsFriendServerServerResponseItem *sr_item = new RsFriendServerServerResponseItem;
|
||||||
|
|
||||||
sr_item->nonce = pi->second.last_nonce;
|
sr_item->nonce = pi->second.last_nonce;
|
||||||
@ -85,17 +86,30 @@ void FriendServer::handleClientPublish(const RsFriendServerClientPublishItem *it
|
|||||||
sr_item->PeerId(item->PeerId());
|
sr_item->PeerId(item->PeerId());
|
||||||
|
|
||||||
mni->SendItem(sr_item);
|
mni->SendItem(sr_item);
|
||||||
|
|
||||||
}
|
}
|
||||||
catch(std::exception& e)
|
catch(std::exception& e)
|
||||||
{
|
{
|
||||||
RsErr() << "ERROR: " << e.what() ;
|
RsErr() << "ERROR: " << e.what() ;
|
||||||
|
|
||||||
|
RsFriendServerStatusItem *status_item = new RsFriendServerStatusItem;
|
||||||
|
status_item->status = RsFriendServerStatusItem::END_OF_TRANSMISSION;
|
||||||
|
status_item->PeerId(item->PeerId());
|
||||||
|
mni->SendItem(status_item);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close client connection from server side, to tell the client that nothing more is coming.
|
// Close client connection from server side, to tell the client that nothing more is coming.
|
||||||
|
//RsDbg() << "Closing client connection." ;
|
||||||
|
//mni->closeConnection(item->PeerId());
|
||||||
|
|
||||||
RsDbg() << "Closing client connection." ;
|
RsDbg() << "Sending end-of-stream item to " << item->PeerId() ;
|
||||||
|
|
||||||
mni->closeConnection(item->PeerId());
|
RsFriendServerStatusItem *status_item = new RsFriendServerStatusItem;
|
||||||
|
status_item->status = RsFriendServerStatusItem::END_OF_TRANSMISSION;
|
||||||
|
status_item->PeerId(item->PeerId());
|
||||||
|
|
||||||
|
mni->SendItem(status_item);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::map<std::string, bool> FriendServer::computeListOfFriendInvites(uint32_t nb_reqs_invites, const RsPeerId &pid, const RsPgpFingerprint &fpr)
|
std::map<std::string, bool> FriendServer::computeListOfFriendInvites(uint32_t nb_reqs_invites, const RsPeerId &pid, const RsPgpFingerprint &fpr)
|
||||||
@ -126,16 +140,11 @@ std::map<RsPeerId,PeerInfo>::iterator FriendServer::handleIncomingClientData(con
|
|||||||
RsPgpId pgp_id ;
|
RsPgpId pgp_id ;
|
||||||
std::vector<uint8_t> key_binary_data ;
|
std::vector<uint8_t> key_binary_data ;
|
||||||
|
|
||||||
key_binary_data = Radix64::decode(pgp_public_key_b64);
|
// key_binary_data = Radix64::decode(pgp_public_key_b64);
|
||||||
|
|
||||||
if(key_binary_data.empty())
|
if(RsBase64::decode(pgp_public_key_b64,key_binary_data))
|
||||||
throw std::runtime_error(" Cannot decode client pgp public key: \"" + pgp_public_key_b64 + "\". Wrong format??");
|
throw std::runtime_error(" Cannot decode client pgp public key: \"" + pgp_public_key_b64 + "\". Wrong format??");
|
||||||
|
|
||||||
// Apparently RsBase64 doesn't work correctly.
|
|
||||||
//
|
|
||||||
// if(!RsBase64::decode(item->pgp_public_key_b64,key_binary_data))
|
|
||||||
// throw std::runtime_error(" Cannot decode client pgp public key: \"" + item->pgp_public_key_b64 + "\". Wrong format??");
|
|
||||||
|
|
||||||
RsDbg() << " Public key radix is fine." ;
|
RsDbg() << " Public key radix is fine." ;
|
||||||
|
|
||||||
if(!mPgpHandler->LoadCertificateFromBinaryData(key_binary_data.data(),key_binary_data.size(), pgp_id, error_string))
|
if(!mPgpHandler->LoadCertificateFromBinaryData(key_binary_data.data(),key_binary_data.size(), pgp_id, error_string))
|
||||||
@ -236,9 +245,9 @@ void FriendServer::run()
|
|||||||
void FriendServer::autoWash()
|
void FriendServer::autoWash()
|
||||||
{
|
{
|
||||||
rstime_t now = time(nullptr);
|
rstime_t now = time(nullptr);
|
||||||
|
RsDbg() << "autoWash..." ;
|
||||||
|
|
||||||
for(std::map<RsPeerId,PeerInfo>::iterator it(mCurrentClientPeers.begin());it!=mCurrentClientPeers.end();)
|
for(std::map<RsPeerId,PeerInfo>::iterator it(mCurrentClientPeers.begin());it!=mCurrentClientPeers.end();)
|
||||||
{
|
|
||||||
if(it->second.last_connection_TS + MAXIMUM_PEER_INACTIVE_DELAY < now)
|
if(it->second.last_connection_TS + MAXIMUM_PEER_INACTIVE_DELAY < now)
|
||||||
{
|
{
|
||||||
RsDbg() << "Removing client peer " << it->first << " because it's inactive for more than " << MAXIMUM_PEER_INACTIVE_DELAY << " seconds." ;
|
RsDbg() << "Removing client peer " << it->first << " because it's inactive for more than " << MAXIMUM_PEER_INACTIVE_DELAY << " seconds." ;
|
||||||
@ -247,7 +256,10 @@ void FriendServer::autoWash()
|
|||||||
mCurrentClientPeers.erase(it);
|
mCurrentClientPeers.erase(it);
|
||||||
it = tmp;
|
it = tmp;
|
||||||
}
|
}
|
||||||
}
|
else
|
||||||
|
++it;
|
||||||
|
|
||||||
|
RsDbg() << "done." ;
|
||||||
}
|
}
|
||||||
|
|
||||||
void FriendServer::debugPrint()
|
void FriendServer::debugPrint()
|
||||||
|
@ -95,9 +95,7 @@ void FsNetworkInterface::threadTick()
|
|||||||
|
|
||||||
RS_STACK_MUTEX(mFsNiMtx);
|
RS_STACK_MUTEX(mFsNiMtx);
|
||||||
for(auto& it:mConnections)
|
for(auto& it:mConnections)
|
||||||
{
|
|
||||||
it.second.pqi_thread->tick();
|
it.second.pqi_thread->tick();
|
||||||
}
|
|
||||||
|
|
||||||
rstime::rs_usleep(1000*200);
|
rstime::rs_usleep(1000*200);
|
||||||
}
|
}
|
||||||
@ -210,7 +208,8 @@ int FsNetworkInterface::SendItem(RsItem *item)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
return it->second.pqi_thread->SendItem(item);
|
uint32_t ss;
|
||||||
|
return it->second.pqi_thread->SendItem(item,ss);
|
||||||
}
|
}
|
||||||
|
|
||||||
void FsNetworkInterface::closeConnection(const RsPeerId& peer_id)
|
void FsNetworkInterface::closeConnection(const RsPeerId& peer_id)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user