mirror of
https://github.com/RetroShare/RetroShare.git
synced 2025-07-23 14:41:04 -04:00
added item splitting and checksum checking in p3filelists
This commit is contained in:
parent
2343c91055
commit
3f9acb5ff8
5 changed files with 130 additions and 5 deletions
|
@ -38,3 +38,5 @@ static const std::string HASH_CACHE_FILE_NAME = "hash_cache.bin" ; // hard-co
|
||||||
|
|
||||||
static const uint32_t MIN_INTERVAL_BETWEEN_HASH_CACHE_SAVE = 20 ; // never save hash cache more often than every 20 secs.
|
static const uint32_t MIN_INTERVAL_BETWEEN_HASH_CACHE_SAVE = 20 ; // never save hash cache more often than every 20 secs.
|
||||||
static const uint32_t MIN_INTERVAL_BETWEEN_REMOTE_DIRECTORY_SAVE = 23 ; // never save remote directories more often than this
|
static const uint32_t MIN_INTERVAL_BETWEEN_REMOTE_DIRECTORY_SAVE = 23 ; // never save remote directories more often than this
|
||||||
|
|
||||||
|
static const uint32_t MAX_DIR_SYNC_RESPONSE_DATA_SIZE = 20000 ; // Maximum RsItem data size in bytes for serialised directory transmission
|
||||||
|
|
|
@ -606,12 +606,13 @@ bool p3FileDatabase::findChildPointer(void *ref, int row, void *& result, FileSe
|
||||||
|
|
||||||
return true ;
|
return true ;
|
||||||
}
|
}
|
||||||
else for(uint32_t i=0;i<mRemoteDirectories.size();++i)
|
else if(row < mRemoteDirectories.size())
|
||||||
{
|
{
|
||||||
convertEntryIndexToPointer(mRemoteDirectories[i]->root(),i+1,result);
|
convertEntryIndexToPointer(mRemoteDirectories[row]->root(),row+1,result);
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
return false;
|
||||||
|
|
||||||
uint32_t fi;
|
uint32_t fi;
|
||||||
DirectoryStorage::EntryIndex e ;
|
DirectoryStorage::EntryIndex e ;
|
||||||
|
@ -1077,11 +1078,120 @@ void p3FileDatabase::handleDirSyncRequest(RsFileListsSyncRequestItem *item)
|
||||||
|
|
||||||
// sends the response.
|
// sends the response.
|
||||||
|
|
||||||
|
splitAndSendItem(ritem) ;
|
||||||
|
}
|
||||||
|
|
||||||
|
void p3FileDatabase::splitAndSendItem(RsFileListsSyncResponseItem *ritem)
|
||||||
|
{
|
||||||
|
ritem->checksum = RsDirUtil::sha1sum((uint8_t*)ritem->directory_content_data.bin_data,ritem->directory_content_data.bin_len);
|
||||||
|
|
||||||
|
while(ritem->directory_content_data.bin_len > MAX_DIR_SYNC_RESPONSE_DATA_SIZE)
|
||||||
|
{
|
||||||
|
P3FILELISTS_DEBUG() << "Chopping off partial chunk of size " << MAX_DIR_SYNC_RESPONSE_DATA_SIZE << " from item data of size " << ritem->directory_content_data.bin_len << std::endl;
|
||||||
|
|
||||||
|
RsFileListsSyncResponseItem *subitem = new RsFileListsSyncResponseItem() ;
|
||||||
|
|
||||||
|
subitem->entry_hash = ritem->entry_hash;
|
||||||
|
subitem->flags = ritem->flags | RsFileListsItem::FLAGS_SYNC_PARTIAL;
|
||||||
|
subitem->last_known_recurs_modf_TS = ritem->last_known_recurs_modf_TS;
|
||||||
|
subitem->request_id = ritem->request_id;
|
||||||
|
subitem->checksum = ritem->checksum ;
|
||||||
|
|
||||||
|
// copy a subpart of the data
|
||||||
|
subitem->directory_content_data.tlvtype = ritem->directory_content_data.tlvtype ;
|
||||||
|
subitem->directory_content_data.setBinData(ritem->directory_content_data.bin_data, MAX_DIR_SYNC_RESPONSE_DATA_SIZE) ;
|
||||||
|
|
||||||
|
// update ritem to chop off the data that was sent.
|
||||||
|
memmove(ritem->directory_content_data.bin_data, &((unsigned char*)ritem->directory_content_data.bin_data)[MAX_DIR_SYNC_RESPONSE_DATA_SIZE], ritem->directory_content_data.bin_len - MAX_DIR_SYNC_RESPONSE_DATA_SIZE) ;
|
||||||
|
ritem->directory_content_data.bin_len -= MAX_DIR_SYNC_RESPONSE_DATA_SIZE ;
|
||||||
|
|
||||||
|
// send
|
||||||
|
subitem->PeerId(ritem->PeerId()) ;
|
||||||
|
|
||||||
|
sendItem(subitem) ;
|
||||||
|
|
||||||
|
// fix up last chunk
|
||||||
|
if(ritem->directory_content_data.bin_len <= MAX_DIR_SYNC_RESPONSE_DATA_SIZE)
|
||||||
|
ritem->flags |= RsFileListsItem::FLAGS_SYNC_PARTIAL_END ;
|
||||||
|
}
|
||||||
|
|
||||||
sendItem(ritem) ;
|
sendItem(ritem) ;
|
||||||
}
|
}
|
||||||
|
|
||||||
void p3FileDatabase::handleDirSyncResponse(RsFileListsSyncResponseItem *item)
|
// This function should not take memory ownership of ritem, so it makes copies.
|
||||||
|
|
||||||
|
RsFileListsSyncResponseItem *p3FileDatabase::recvAndRebuildItem(RsFileListsSyncResponseItem *ritem)
|
||||||
{
|
{
|
||||||
|
#warning make sure about how robust that is to disconnections, etc.
|
||||||
|
if(!(ritem->flags & RsFileListsItem::FLAGS_SYNC_PARTIAL ))
|
||||||
|
return ritem ;
|
||||||
|
|
||||||
|
// item is a partial item. Look first for a starting entry
|
||||||
|
|
||||||
|
P3FILELISTS_DEBUG() << "Item from peer " << ritem->PeerId() << " is partial. Size = " << ritem->directory_content_data.bin_len << std::endl;
|
||||||
|
|
||||||
|
RS_STACK_MUTEX(mFLSMtx) ;
|
||||||
|
|
||||||
|
bool is_ending = (ritem->flags & RsFileListsItem::FLAGS_SYNC_PARTIAL_END);
|
||||||
|
std::map<DirSyncRequestId,RsFileListsSyncResponseItem*>::iterator it = mPartialResponseItems.find(ritem->request_id) ;
|
||||||
|
|
||||||
|
if(it == mPartialResponseItems.end())
|
||||||
|
{
|
||||||
|
if(is_ending)
|
||||||
|
{
|
||||||
|
P3FILELISTS_ERROR() << "Impossible situation: partial item ended right away. Dropping..." << std::endl;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
P3FILELISTS_DEBUG() << "Creating new item buffer" << std::endl;
|
||||||
|
|
||||||
|
mPartialResponseItems[ritem->request_id] = new RsFileListsSyncResponseItem(*ritem) ;
|
||||||
|
return NULL ;
|
||||||
|
}
|
||||||
|
else if(it->second->checksum != ritem->checksum)
|
||||||
|
{
|
||||||
|
P3FILELISTS_ERROR() << "Impossible situation: partial items with different checksums. Dropping..." << std::endl;
|
||||||
|
mPartialResponseItems.erase(it);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// collapse the item at the end of the existing partial item. Dont delete ritem as it will be by the caller.
|
||||||
|
|
||||||
|
it->second->directory_content_data.bin_data = realloc(it->second->directory_content_data.bin_data,it->second->directory_content_data.bin_len + ritem->directory_content_data.bin_len) ;
|
||||||
|
memcpy(it->second->directory_content_data.bin_data,ritem->directory_content_data.bin_data,ritem->directory_content_data.bin_len);
|
||||||
|
|
||||||
|
// if finished, return the item
|
||||||
|
|
||||||
|
if(is_ending)
|
||||||
|
{
|
||||||
|
P3FILELISTS_DEBUG() << "Item is complete. Returning it" << std::endl;
|
||||||
|
|
||||||
|
RsFileListsSyncResponseItem *ret = it->second ;
|
||||||
|
mPartialResponseItems.erase(it) ;
|
||||||
|
|
||||||
|
ret->flags &= ~RsFileListsItem::FLAGS_SYNC_PARTIAL_END ;
|
||||||
|
ret->flags &= ~RsFileListsItem::FLAGS_SYNC_PARTIAL ;
|
||||||
|
|
||||||
|
return ret ;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
return NULL ;
|
||||||
|
}
|
||||||
|
|
||||||
|
void p3FileDatabase::handleDirSyncResponse(RsFileListsSyncResponseItem *sitem)
|
||||||
|
{
|
||||||
|
RsFileListsSyncResponseItem *item = recvAndRebuildItem(sitem) ;
|
||||||
|
|
||||||
|
if(!item)
|
||||||
|
return ;
|
||||||
|
|
||||||
|
// check the hash. If anything goes wrong (in the chunking for instance) the hash will not match
|
||||||
|
|
||||||
|
if(RsDirUtil::sha1sum((uint8_t*)item->directory_content_data.bin_data,item->directory_content_data.bin_len) != item->checksum)
|
||||||
|
{
|
||||||
|
P3FILELISTS_ERROR() << "Checksum error in response item " << std::hex << item->request_id << std::dec << " . This is unexpected, and might be due to connection problems." << std::endl;
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
|
||||||
P3FILELISTS_DEBUG() << "Handling sync response for directory with hash " << item->entry_hash << std::endl;
|
P3FILELISTS_DEBUG() << "Handling sync response for directory with hash " << item->entry_hash << std::endl;
|
||||||
|
|
||||||
EntryIndex entry_index = DirectoryStorage::NO_INDEX;
|
EntryIndex entry_index = DirectoryStorage::NO_INDEX;
|
||||||
|
|
|
@ -173,6 +173,11 @@ class p3FileDatabase: public p3Service, public p3Config, public ftSearch //, pub
|
||||||
|
|
||||||
static DirSyncRequestId makeDirSyncReqId(const RsPeerId& peer_id, const RsFileHash &hash) ;
|
static DirSyncRequestId makeDirSyncReqId(const RsPeerId& peer_id, const RsFileHash &hash) ;
|
||||||
|
|
||||||
|
// utility functions to send items with some maximum size.
|
||||||
|
|
||||||
|
void splitAndSendItem(RsFileListsSyncResponseItem *ritem);
|
||||||
|
RsFileListsSyncResponseItem *recvAndRebuildItem(RsFileListsSyncResponseItem *ritem);
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
* \brief generateAndSendSyncRequest
|
* \brief generateAndSendSyncRequest
|
||||||
* \param rds Remote directory storage for the request
|
* \param rds Remote directory storage for the request
|
||||||
|
@ -221,6 +226,7 @@ class p3FileDatabase: public p3Service, public p3Config, public ftSearch //, pub
|
||||||
|
|
||||||
time_t mLastRemoteDirSweepTS ; // TS for friend list update
|
time_t mLastRemoteDirSweepTS ; // TS for friend list update
|
||||||
std::map<DirSyncRequestId,DirSyncRequestData> mPendingSyncRequests ; // pending requests, waiting for an answer
|
std::map<DirSyncRequestId,DirSyncRequestData> mPendingSyncRequests ; // pending requests, waiting for an answer
|
||||||
|
std::map<DirSyncRequestId,RsFileListsSyncResponseItem *> mPartialResponseItems;
|
||||||
|
|
||||||
void locked_recursSweepRemoteDirectory(RemoteDirectoryStorage *rds, DirectoryStorage::EntryIndex e, int depth);
|
void locked_recursSweepRemoteDirectory(RemoteDirectoryStorage *rds, DirectoryStorage::EntryIndex e, int depth);
|
||||||
|
|
||||||
|
|
|
@ -152,6 +152,7 @@ bool RsFileListsSyncResponseItem::serialise(void *data, uint32_t& size) const
|
||||||
/* RsFileListsSyncMsgItem */
|
/* RsFileListsSyncMsgItem */
|
||||||
|
|
||||||
ok &= entry_hash.serialise(data, size, offset);
|
ok &= entry_hash.serialise(data, size, offset);
|
||||||
|
ok &= checksum.serialise(data, size, offset);
|
||||||
ok &= setRawUInt32(data, size, &offset, flags );
|
ok &= setRawUInt32(data, size, &offset, flags );
|
||||||
ok &= setRawUInt32(data, size, &offset, last_known_recurs_modf_TS);
|
ok &= setRawUInt32(data, size, &offset, last_known_recurs_modf_TS);
|
||||||
ok &= setRawUInt64(data, size, &offset, request_id);
|
ok &= setRawUInt64(data, size, &offset, request_id);
|
||||||
|
@ -231,6 +232,7 @@ RsFileListsSyncResponseItem* RsFileListsSerialiser::deserialFileListsSyncRespons
|
||||||
uint64_t request_id; // use to determine if changes that have occured since last hash
|
uint64_t request_id; // use to determine if changes that have occured since last hash
|
||||||
|
|
||||||
ok &= item->entry_hash.deserialise(data, *size, offset);
|
ok &= item->entry_hash.deserialise(data, *size, offset);
|
||||||
|
ok &= item->checksum.deserialise(data, *size, offset);
|
||||||
ok &= getRawUInt32(data, *size, &offset, &item->flags);
|
ok &= getRawUInt32(data, *size, &offset, &item->flags);
|
||||||
ok &= getRawUInt32(data, *size, &offset, &item->last_known_recurs_modf_TS);
|
ok &= getRawUInt32(data, *size, &offset, &item->last_known_recurs_modf_TS);
|
||||||
ok &= getRawUInt64(data, *size, &offset, &item->request_id);
|
ok &= getRawUInt64(data, *size, &offset, &item->request_id);
|
||||||
|
@ -312,6 +314,7 @@ uint32_t RsFileListsSyncResponseItem::serial_size()const
|
||||||
uint32_t s = 8; //header size
|
uint32_t s = 8; //header size
|
||||||
|
|
||||||
s += RsFileHash::serial_size(); // entry hash
|
s += RsFileHash::serial_size(); // entry hash
|
||||||
|
s += RsFileHash::serial_size(); // checksum
|
||||||
s += 4; // flags
|
s += 4; // flags
|
||||||
s += 4; // last_known_recurs_modf_TS
|
s += 4; // last_known_recurs_modf_TS
|
||||||
s += 8; // request_id
|
s += 8; // request_id
|
||||||
|
@ -348,6 +351,7 @@ std::ostream& RsFileListsSyncResponseItem::print(std::ostream &out, uint16_t ind
|
||||||
uint16_t int_Indent = indent + 2;
|
uint16_t int_Indent = indent + 2;
|
||||||
|
|
||||||
printIndent(out , int_Indent); out << "Entry hash: " << entry_hash << std::endl;
|
printIndent(out , int_Indent); out << "Entry hash: " << entry_hash << std::endl;
|
||||||
|
printIndent(out , int_Indent); out << "Checksum : " << checksum << std::endl;
|
||||||
printIndent(out , int_Indent); out << "Flags: " << (uint32_t) flags << std::endl;
|
printIndent(out , int_Indent); out << "Flags: " << (uint32_t) flags << std::endl;
|
||||||
printIndent(out , int_Indent); out << "Last modf TS: " << last_known_recurs_modf_TS << std::endl;
|
printIndent(out , int_Indent); out << "Last modf TS: " << last_known_recurs_modf_TS << std::endl;
|
||||||
printIndent(out , int_Indent); out << "request id: " << std::hex << request_id << std::dec << std::endl;
|
printIndent(out , int_Indent); out << "request id: " << std::hex << request_id << std::dec << std::endl;
|
||||||
|
|
|
@ -66,6 +66,8 @@ public:
|
||||||
static const uint32_t FLAGS_SYNC_DIR_CONTENT = 0x0004 ;
|
static const uint32_t FLAGS_SYNC_DIR_CONTENT = 0x0004 ;
|
||||||
static const uint32_t FLAGS_ENTRY_UP_TO_DATE = 0x0008 ;
|
static const uint32_t FLAGS_ENTRY_UP_TO_DATE = 0x0008 ;
|
||||||
static const uint32_t FLAGS_ENTRY_WAS_REMOVED = 0x0010 ;
|
static const uint32_t FLAGS_ENTRY_WAS_REMOVED = 0x0010 ;
|
||||||
|
static const uint32_t FLAGS_SYNC_PARTIAL = 0x0020 ;
|
||||||
|
static const uint32_t FLAGS_SYNC_PARTIAL_END = 0x0040 ;
|
||||||
};
|
};
|
||||||
|
|
||||||
/*!
|
/*!
|
||||||
|
@ -102,6 +104,7 @@ public:
|
||||||
virtual uint32_t serial_size() const ;
|
virtual uint32_t serial_size() const ;
|
||||||
|
|
||||||
RsFileHash entry_hash ; // hash of the directory to sync
|
RsFileHash entry_hash ; // hash of the directory to sync
|
||||||
|
RsFileHash checksum ; // checksum of the bindary data, for checking
|
||||||
uint32_t flags; // is it a partial/final item (used for large items only)
|
uint32_t flags; // is it a partial/final item (used for large items only)
|
||||||
uint32_t last_known_recurs_modf_TS; // time of last modification, computed over all files+directories below.
|
uint32_t last_known_recurs_modf_TS; // time of last modification, computed over all files+directories below.
|
||||||
uint64_t request_id; // use to determine if changes that have occured since last hash
|
uint64_t request_id; // use to determine if changes that have occured since last hash
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue