fixed mutex problem in turtle-GXS search

This commit is contained in:
csoler 2018-06-17 21:23:16 +02:00
parent 84194b6234
commit c67084b7de
No known key found for this signature in database
GPG Key ID: 7BCA522266C0804C
3 changed files with 89 additions and 55 deletions

View File

@ -870,8 +870,6 @@ int p3turtle::handleIncoming()
//
void p3turtle::handleSearchRequest(RsTurtleSearchRequestItem *item)
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
// take a look at the item and test against inconsistent values
// - If the item destimation is
@ -891,26 +889,55 @@ void p3turtle::handleSearchRequest(RsTurtleSearchRequestItem *item)
return ;
}
if(_search_requests_origins.size() > MAX_ALLOWED_SR_IN_CACHE)
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
if(_search_requests_origins.size() > MAX_ALLOWED_SR_IN_CACHE)
{
#ifdef P3TURTLE_DEBUG
std::cerr << " Dropping, because the search request cache is full." << std::endl ;
std::cerr << " Dropping, because the search request cache is full." << std::endl ;
#endif
std::cerr << " More than " << MAX_ALLOWED_SR_IN_CACHE << " search request in cache. A peer is probably trying to flood your network See the depth charts to find him." << std::endl;
return ;
std::cerr << " More than " << MAX_ALLOWED_SR_IN_CACHE << " search request in cache. A peer is probably trying to flood your network See the depth charts to find him." << std::endl;
return ;
}
// If the item contains an already handled search request, give up. This
// happens when the same search request gets relayed by different peers
//
if(_search_requests_origins.find(item->request_id) != _search_requests_origins.end())
{
#ifdef P3TURTLE_DEBUG
std::cerr << " This is a bouncing request. Ignoring and deleting it." << std::endl ;
#endif
return ;
}
}
// If the item contains an already handled search request, give up. This
// happens when the same search request gets relayed by different peers
//
if(_search_requests_origins.find(item->request_id) != _search_requests_origins.end())
// Perform local search off-mutex,because this might call some services that are above turtle in the mutex chain.
uint32_t search_result_count = 0;
if(item->PeerId() != _own_id) // is the request not coming from us?
{
#ifdef P3TURTLE_DEBUG
std::cerr << " This is a bouncing request. Ignoring and deleting it." << std::endl ;
std::cerr << " Request not from us. Performing local search" << std::endl ;
#endif
return ;
std::list<RsTurtleSearchResultItem*> search_results ;
performLocalSearch(item,search_result_count,search_results) ;
for(auto it(search_results.begin());it!=search_results.end();++it)
{
(*it)->request_id = item->request_id ;
(*it)->depth = 0 ;
(*it)->PeerId(item->PeerId()) ;
sendItem(*it) ;
}
}
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
// This is a new request. Let's add it to the request map, and forward it to
// open peers.
@ -918,23 +945,9 @@ void p3turtle::handleSearchRequest(RsTurtleSearchRequestItem *item)
req.origin = item->PeerId() ;
req.time_stamp = time(NULL) ;
req.depth = item->depth ;
req.result_count = 0;
req.result_count = search_result_count;
req.keywords = item->GetKeywords() ;
// If it's not for us, perform a local search. If something found, forward the search result back.
if(item->PeerId() != _own_id)
{
#ifdef P3TURTLE_DEBUG
std::cerr << " Request not from us. Performing local search" << std::endl ;
#endif
std::list<RsTurtleSearchResultItem*> search_results ;
locked_performLocalSearch(item,req,search_results) ;
for(auto it(search_results.begin());it!=search_results.end();++it)
sendItem(*it) ;
}
req.service_id = item->serviceId() ;
// if enough has been sent back already, do not sarch further
@ -1001,13 +1014,13 @@ void p3turtle::handleSearchRequest(RsTurtleSearchRequestItem *item)
// This function should be removed in the future, when file search will also use generic search items.
void p3turtle::locked_performLocalSearch(RsTurtleSearchRequestItem *item,TurtleSearchRequestInfo& req,std::list<RsTurtleSearchResultItem*>& search_results)
void p3turtle::performLocalSearch(RsTurtleSearchRequestItem *item,uint32_t& req_result_count,std::list<RsTurtleSearchResultItem*>& search_results)
{
RsTurtleFileSearchRequestItem *ftsearch = dynamic_cast<RsTurtleFileSearchRequestItem*>(item) ;
if(ftsearch != NULL)
{
locked_performLocalSearch_files(ftsearch,req,search_results) ;
performLocalSearch_files(ftsearch,req_result_count,search_results) ;
return ;
}
@ -1015,22 +1028,29 @@ void p3turtle::locked_performLocalSearch(RsTurtleSearchRequestItem *item,TurtleS
if(gnsearch != NULL)
{
locked_performLocalSearch_generic(gnsearch,req,search_results) ;
performLocalSearch_generic(gnsearch,req_result_count,search_results) ;
return ;
}
}
void p3turtle::locked_performLocalSearch_generic(RsTurtleGenericSearchRequestItem *item,TurtleSearchRequestInfo& req,std::list<RsTurtleSearchResultItem*>& result)
void p3turtle::performLocalSearch_generic(RsTurtleGenericSearchRequestItem *item, uint32_t& req_result_count, std::list<RsTurtleSearchResultItem*>& result)
{
unsigned char *search_result_data = NULL ;
uint32_t search_result_data_len = 0 ;
auto it = _registered_services.find(item->service_id) ;
RsTurtleClientService *client = NULL ;
if(it == _registered_services.end())
return ;
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
auto it = _registered_services.find(item->service_id) ;
if(it->second->receiveSearchRequest(item->search_data,item->search_data_len,search_result_data,search_result_data_len))
if(it == _registered_services.end())
return ;
client = it->second ;
}
if(client->receiveSearchRequest(item->search_data,item->search_data_len,search_result_data,search_result_data_len))
{
RsTurtleGenericSearchResultItem *result_item = new RsTurtleGenericSearchResultItem ;
@ -1041,7 +1061,7 @@ void p3turtle::locked_performLocalSearch_generic(RsTurtleGenericSearchRequestIte
}
}
void p3turtle::locked_performLocalSearch_files(RsTurtleFileSearchRequestItem *item,TurtleSearchRequestInfo& req,std::list<RsTurtleSearchResultItem*>& result)
void p3turtle::performLocalSearch_files(RsTurtleFileSearchRequestItem *item,uint32_t& req_result_count,std::list<RsTurtleSearchResultItem*>& result)
{
#ifdef P3TURTLE_DEBUG
std::cerr << "Performing rsFiles->search()" << std::endl ;
@ -1066,21 +1086,17 @@ void p3turtle::locked_performLocalSearch_files(RsTurtleFileSearchRequestItem *it
res_item = new RsTurtleFTSearchResultItem ;
item_size = 0 ;
res_item->depth = 0 ;
res_item->request_id = item->request_id ;
res_item->PeerId(item->PeerId()) ; // send back to the same guy
result.push_back(res_item) ;
}
res_item->result.push_back(*it);
// Let's chop search results items into several chunks of finite size to avoid exceeding streamer's capacity.
//
++req.result_count ; // increase hit number for this particular search request.
++req_result_count ; // increase hit number for this particular search request.
item_size += 8 /* size */ + it->hash.serial_size() + it->name.size() ;
if(item_size > RSTURTLE_MAX_SEARCH_RESPONSE_SIZE || req.result_count >= TURTLE_SEARCH_RESULT_MAX_HITS)
if(item_size > RSTURTLE_MAX_SEARCH_RESPONSE_SIZE || req_result_count >= TURTLE_SEARCH_RESULT_MAX_HITS)
{
#ifdef P3TURTLE_DEBUG
std::cerr << " Sending back chunk of size " << item_size << ", for " << res_item->result.size() << " elements." << std::endl ;
@ -1119,7 +1135,12 @@ void p3turtle::handleSearchResult(RsTurtleSearchResultItem *item)
{
it->second.result_count += item->count() ;
results_to_notify_off_mutex.push_back(std::make_pair(item,it->second.client)) ;
auto it2 = _registered_services.find(it->second.service_id) ;
if(it2 != _registered_services.end())
results_to_notify_off_mutex.push_back(std::make_pair(item,it2->second)) ;
else
std::cerr << "(EE) cannot find client service for ID " << std::hex << it->second.service_id << std::dec << ": search result item will be dropped." << std::endl;
}
else
{ // Nope, so forward it back.
@ -2092,10 +2113,20 @@ void p3turtle::monitorTunnels(const RsFileHash& hash,RsTurtleClientService *clie
//
bool p3turtle::performLocalHashSearch(const TurtleFileHash& hash,const RsPeerId& peer_id,RsTurtleClientService *& service)
{
if(_registered_services.empty())
std::cerr << "Turtle router has no services registered. Tunnel requests cannot be handled." << std::endl;
std::map<uint16_t,RsTurtleClientService*> client_map ;
{
RsStackMutex stack(mTurtleMtx); /********** STACK LOCKED MTX ******/
for(auto it(_registered_services.begin());it!=_registered_services.end();++it)
if(_registered_services.empty())
{
std::cerr << "Turtle router has no services registered. Tunnel requests cannot be handled." << std::endl;
return false ;
}
client_map = _registered_services ;
}
for(auto it(client_map.begin());it!=client_map.end();++it)
if( (*it).second->handleTunnelRequest(hash,peer_id))
{
service = it->second ;

View File

@ -169,11 +169,11 @@ class TurtleSearchRequestInfo
{
public:
TurtlePeerId origin ; // where the request came from.
uint32_t time_stamp ; // last time the tunnel was actually used. Used for cleaning old tunnels.
int depth ; // depth of the request. Used to optimize tunnel length.
uint32_t result_count; // responses to this request. Useful to avoid spamming tunnel responses.
std::string keywords;
RsTurtleClientService *client;// client who issues the request. This is null if the request does not have a local origin.
uint32_t time_stamp ; // last time the tunnel was actually used. Used for cleaning old tunnels.
int depth ; // depth of the request. Used to optimize tunnel length.
uint32_t result_count; // responses to this request. Useful to avoid spamming tunnel responses.
std::string keywords;
uint16_t service_id; // ID of the client service who issues the request. This is null if the request does not have a local origin.
};
class TurtleTunnelRequestInfo
{
@ -397,9 +397,9 @@ class p3turtle: public p3Service, public RsTurtle, public p3Config
//------ Functions connecting the turtle router to other components.----------//
/// Performs a search calling local cache and search structure.
void locked_performLocalSearch (RsTurtleSearchRequestItem *item,TurtleSearchRequestInfo& req,std::list<RsTurtleSearchResultItem*>& result) ;
void locked_performLocalSearch_files (RsTurtleFileSearchRequestItem *item,TurtleSearchRequestInfo& req,std::list<RsTurtleSearchResultItem*>& result) ;
void locked_performLocalSearch_generic(RsTurtleGenericSearchRequestItem *item,TurtleSearchRequestInfo& req,std::list<RsTurtleSearchResultItem*>& result) ;
void performLocalSearch (RsTurtleSearchRequestItem *item, uint32_t& req_result_count,std::list<RsTurtleSearchResultItem*>& result) ;
void performLocalSearch_files (RsTurtleFileSearchRequestItem *item, uint32_t& req_result_count,std::list<RsTurtleSearchResultItem*>& result) ;
void performLocalSearch_generic(RsTurtleGenericSearchRequestItem *item, uint32_t& req_result_count,std::list<RsTurtleSearchResultItem*>& result) ;
/// Returns true if the file with given hash is hosted locally, and accessible in anonymous mode the supplied peer.
virtual bool performLocalHashSearch(const TurtleFileHash& hash,const RsPeerId& client_peer_id,RsTurtleClientService *& service);

View File

@ -82,6 +82,7 @@ class RsTurtleSearchRequestItem: public RsTurtleItem
virtual RsTurtleSearchRequestItem *clone() const = 0 ; // used for cloning in routing methods
virtual std::string GetKeywords() = 0;
virtual uint16_t serviceId() = 0 ;
uint32_t request_id ; // randomly generated request id.
uint16_t depth ; // Used for limiting search depth.
@ -93,6 +94,7 @@ class RsTurtleFileSearchRequestItem: public RsTurtleSearchRequestItem
RsTurtleFileSearchRequestItem(uint32_t subtype) : RsTurtleSearchRequestItem(subtype) {}
virtual ~RsTurtleFileSearchRequestItem() {}
virtual uint16_t serviceId() { return RS_SERVICE_TYPE_FILE_TRANSFER ; }
virtual void search(std::list<TurtleFileInfo> &) const =0;
};
@ -150,6 +152,7 @@ class RsTurtleGenericSearchRequestItem: public RsTurtleSearchRequestItem
unsigned char *search_data ;
std::string GetKeywords() { return std::string("Generic search " + RsUtil::BinToHex(search_data,search_data_len,10)); }
virtual uint16_t serviceId() { return service_id ; }
virtual RsTurtleSearchRequestItem *clone() const ;
void clear() { free(search_data); search_data=NULL; search_data_len=0; }