fixed a few bugs in existing msg fragmentation code. Signature checking still not working, not NXS_FRAG stays disabled

This commit is contained in:
csoler 2016-01-18 23:53:06 -05:00
parent 2b52456409
commit 68a039540e
2 changed files with 56 additions and 14 deletions

View File

@ -207,8 +207,8 @@
NXS_NET_DEBUG_4 vetting NXS_NET_DEBUG_4 vetting
NXS_NET_DEBUG_5 summary of transactions (useful to just know what comes in/out) NXS_NET_DEBUG_5 summary of transactions (useful to just know what comes in/out)
***/ ***/
//#define NXS_NET_DEBUG_0 1 #define NXS_NET_DEBUG_0 1
//#define NXS_NET_DEBUG_1 1 #define NXS_NET_DEBUG_1 1
//#define NXS_NET_DEBUG_2 1 //#define NXS_NET_DEBUG_2 1
//#define NXS_NET_DEBUG_3 1 //#define NXS_NET_DEBUG_3 1
//#define NXS_NET_DEBUG_4 1 //#define NXS_NET_DEBUG_4 1
@ -216,6 +216,7 @@
//#define NXS_NET_DEBUG_6 1 //#define NXS_NET_DEBUG_6 1
#define GIXS_CUT_OFF 0 #define GIXS_CUT_OFF 0
//#define NXS_FRAG
// The constant below have a direct influence on how fast forums/channels/posted/identity groups propagate and on the overloading of queues: // The constant below have a direct influence on how fast forums/channels/posted/identity groups propagate and on the overloading of queues:
// //
@ -238,8 +239,8 @@
#if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) || defined(NXS_NET_DEBUG_4) || defined(NXS_NET_DEBUG_5) || defined(NXS_NET_DEBUG_6) #if defined(NXS_NET_DEBUG_0) || defined(NXS_NET_DEBUG_1) || defined(NXS_NET_DEBUG_2) || defined(NXS_NET_DEBUG_3) || defined(NXS_NET_DEBUG_4) || defined(NXS_NET_DEBUG_5) || defined(NXS_NET_DEBUG_6)
static const RsPeerId peer_to_print = RsPeerId(std::string("")) ; static const RsPeerId peer_to_print = RsPeerId(std::string("")) ;
static const RsGxsGroupId group_id_to_print = RsGxsGroupId(std::string("" )) ; // use this to allow to this group id only, or "" for all IDs static const RsGxsGroupId group_id_to_print = RsGxsGroupId(std::string("93ea28698ce9f693d014cc55a0dfb716" )) ; // use this to allow to this group id only, or "" for all IDs
static const uint32_t service_to_print = 0 ; // use this to allow to this service id only, or 0 for all services static const uint32_t service_to_print = 0x215 ; // use this to allow to this service id only, or 0 for all services
// warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h. E.g. 0x0215 for forums) // warning. Numbers should be SERVICE IDS (see serialiser/rsserviceids.h. E.g. 0x0215 for forums)
class nullstream: public std::ostream {}; class nullstream: public std::ostream {};
@ -898,7 +899,8 @@ bool RsGxsNetService::fragmentMsg(RsNxsMsg& msg, MsgFragments& msgFragments) con
uint32_t msgSize = msg.msg.TlvSize(); uint32_t msgSize = msg.msg.TlvSize();
uint32_t dataLeft = msgSize; uint32_t dataLeft = msgSize;
uint8_t nFragments = ceil(float(msgSize)/FRAGMENT_SIZE); uint8_t nFragments = ceil(float(msgSize)/FRAGMENT_SIZE);
char buffer[FRAGMENT_SIZE];
RsTemporaryMemory buffer(FRAGMENT_SIZE);
int currPos = 0; int currPos = 0;
@ -964,10 +966,25 @@ RsNxsMsg* RsGxsNetService::deFragmentMsg(MsgFragments& msgFragments) const
if(msgFragments.size() == 1) if(msgFragments.size() == 1)
{ {
RsNxsMsg* m = msgFragments.front(); RsNxsMsg* m = msgFragments.front();
if(m->count > 1)
if(m->count > 1) // normally mcount should be exactly 1, but if not initialised (old versions) it's going to be 0
{
// delete everything
std::cerr << "(WW) Cannot deFragment message set. m->count=" << m->count << ", but msgFragments.size()=" << msgFragments.size() << ". Incomplete? Dropping all." << std::endl;
for(uint32_t i=0;i<msgFragments.size();++i)
delete msgFragments[i] ;
msgFragments.clear();
return NULL; return NULL;
}
else else
{
// single piece. No need to say anything. Just return it.
msgFragments.clear();
return m; return m;
}
} }
// first determine total size for binary data // first determine total size for binary data
@ -977,9 +994,21 @@ RsNxsMsg* RsGxsNetService::deFragmentMsg(MsgFragments& msgFragments) const
for(; mit != msgFragments.end(); ++mit) for(; mit != msgFragments.end(); ++mit)
datSize += (*mit)->msg.bin_len; datSize += (*mit)->msg.bin_len;
char* data = new char[datSize]; RsTemporaryMemory data(datSize) ;
if(!data)
{
for(uint32_t i=0;i<msgFragments.size();++i)
delete msgFragments[i] ;
msgFragments.clear();
return NULL ;
}
uint32_t currPos = 0; uint32_t currPos = 0;
std::cerr << "(II) deFragmenting long message of size " << datSize << ", from " << msgFragments.size() << " pieces." << std::endl;
for(mit = msgFragments.begin(); mit != msgFragments.end(); ++mit) for(mit = msgFragments.begin(); mit != msgFragments.end(); ++mit)
{ {
RsNxsMsg* msg = *mit; RsNxsMsg* msg = *mit;
@ -995,10 +1024,17 @@ RsNxsMsg* RsGxsNetService::deFragmentMsg(MsgFragments& msgFragments) const
msg->transactionNumber = m.transactionNumber; msg->transactionNumber = m.transactionNumber;
msg->meta = m.meta; msg->meta = m.meta;
delete[] data; // now clean!
for(uint32_t i=0;i<msgFragments.size();++i)
delete msgFragments[i] ;
msgFragments.clear();
return msg; return msg;
} }
// This is unused apparently, since groups are never large. Anyway, we keep it in case we need it.
RsNxsGrp* RsGxsNetService::deFragmentGrp(GrpFragments& grpFragments) const RsNxsGrp* RsGxsNetService::deFragmentGrp(GrpFragments& grpFragments) const
{ {
if(grpFragments.empty()) return NULL; if(grpFragments.empty()) return NULL;
@ -1257,7 +1293,7 @@ struct MsgFragCollate
bool operator()(RsNxsMsg* msg) { return msg->msgId == mMsgId;} bool operator()(RsNxsMsg* msg) { return msg->msgId == mMsgId;}
}; };
void RsGxsNetService::collateMsgFragments(MsgFragments fragments, std::map<RsGxsMessageId, MsgFragments>& partFragments) const void RsGxsNetService::collateMsgFragments(MsgFragments& fragments, std::map<RsGxsMessageId, MsgFragments>& partFragments) const
{ {
// get all unique message Ids; // get all unique message Ids;
MsgFragments::iterator vit = fragments.begin(); MsgFragments::iterator vit = fragments.begin();
@ -2289,13 +2325,13 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
std::cerr << "RsGxsNetService::processCompletedTransactions(): item did not caste to msg" << std::endl; std::cerr << "RsGxsNetService::processCompletedTransactions(): item did not caste to msg" << std::endl;
} }
#ifdef NSXS_FRAG #ifdef NXS_FRAG
std::map<RsGxsGroupId, MsgFragments > collatedMsgs; std::map<RsGxsMessageId, MsgFragments > collatedMsgs;
collateMsgFragments(msgs, collatedMsgs); collateMsgFragments(msgs, collatedMsgs); // this destroys msgs whatsoever and recovers memory when needed
msgs.clear(); msgs.clear();
std::map<RsGxsGroupId, MsgFragments >::iterator mit = collatedMsgs.begin(); std::map<RsGxsMessageId, MsgFragments >::iterator mit = collatedMsgs.begin();
for(; mit != collatedMsgs.end(); ++mit) for(; mit != collatedMsgs.end(); ++mit)
{ {
MsgFragments& f = mit->second; MsgFragments& f = mit->second;
@ -2304,6 +2340,7 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
if(msg) if(msg)
msgs.push_back(msg); msgs.push_back(msg);
} }
collatedMsgs.clear();
#endif #endif
#ifdef NXS_NET_DEBUG_0 #ifdef NXS_NET_DEBUG_0
GXSNETDEBUG_PG(tr->mTransaction->PeerId(),grpId) << " ...and notifying observer of " << msgs.size() << " new messages." << std::endl; GXSNETDEBUG_PG(tr->mTransaction->PeerId(),grpId) << " ...and notifying observer of " << msgs.size() << " new messages." << std::endl;
@ -3176,12 +3213,17 @@ void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr)
msg->transactionNumber = transN; msg->transactionNumber = transN;
#ifndef NXS_FRAG #ifndef NXS_FRAG
msg->count = 1;
msg->pos = 0;
newTr->mItems.push_back(msg); newTr->mItems.push_back(msg);
msgSize++; msgSize++;
#else #else
MsgFragments fragments; MsgFragments fragments;
fragmentMsg(*msg, fragments); fragmentMsg(*msg, fragments);
delete msg ;
MsgFragments::iterator mit = fragments.begin(); MsgFragments::iterator mit = fragments.begin();
for(; mit != fragments.end(); ++mit) for(; mit != fragments.end(); ++mit)

View File

@ -435,7 +435,7 @@ private:
* @param fragments message fragments which are not necessarily from the same message * @param fragments message fragments which are not necessarily from the same message
* @param partFragments the partitioned fragments (into message ids) * @param partFragments the partitioned fragments (into message ids)
*/ */
void collateMsgFragments(MsgFragments fragments, std::map<RsGxsMessageId, MsgFragments>& partFragments) const; void collateMsgFragments(MsgFragments &fragments, std::map<RsGxsMessageId, MsgFragments>& partFragments) const;
/*! /*!
* Note that if all fragments for a group are not found then its fragments are dropped * Note that if all fragments for a group are not found then its fragments are dropped