Updated gxs net test

tested fragmentation with 1-item coalescing
created groups now have admin/subscribed/publish flags (no publish optimisation yet)
was incorrectly applying GROUP_SUBSCRIBE_MASK when setting the group subscribed state; now using the correct mask. Added comments clarifying the purpose of the subscribe mask
Implemented fragmentation, but placed it under preprocessor flags for phase 2.
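
Aside: a minimal standalone sketch of the fragmentation round trip the test exercises, assuming fragments are fixed-size chunks reassembled by simple concatenation; the Blob type and the FRAGMENT_SIZE value are illustrative stand-ins, not the actual RsNxsMsg/RsGxsNetService API.

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

typedef std::vector<uint8_t> Blob;

// Assumed fragment size, for illustration only.
static const size_t FRAGMENT_SIZE = 150000;

// Split a payload into FRAGMENT_SIZE chunks (the last chunk may be smaller).
std::vector<Blob> fragment(const Blob& data)
{
    std::vector<Blob> frags;
    for(size_t pos = 0; pos < data.size(); pos += FRAGMENT_SIZE)
    {
        size_t len = std::min(FRAGMENT_SIZE, data.size() - pos);
        frags.push_back(Blob(data.begin() + pos, data.begin() + pos + len));
    }
    return frags;
}

// Reassemble by concatenating the chunks in order.
Blob defragment(const std::vector<Blob>& frags)
{
    Blob out;
    for(size_t i = 0; i < frags.size(); ++i)
        out.insert(out.end(), frags[i].begin(), frags[i].end());
    return out;
}

int main()
{
    Blob msg(400000, 0xAB);            // one large payload
    std::vector<Blob> frags = fragment(msg);
    assert(frags.size() == 3);         // 150000 + 150000 + 100000
    assert(defragment(frags) == msg);  // round trip preserves the data
    return 0;
}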



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@6234 b45a01b8-16f6-495d-af2f-9b41ad6348cc
chrisparker126 2013-03-16 16:44:33 +00:00
parent 206b230f33
commit ea2788b2d2
5 changed files with 94 additions and 31 deletions

View file

@ -905,9 +905,11 @@ void RsGenExchange::groupsChanged(std::list<RsGxsGroupId>& grpIds)
bool RsGenExchange::subscribeToGroup(uint32_t& token, const RsGxsGroupId& grpId, bool subscribe)
{
if(subscribe)
setGroupSubscribeFlags(token, grpId, GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED, GXS_SERV::GROUP_SUBSCRIBE_MASK);
setGroupSubscribeFlags(token, grpId, GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED,
(GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED | GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED));
else
setGroupSubscribeFlags(token, grpId, 0, GXS_SERV::GROUP_SUBSCRIBE_MASK);
setGroupSubscribeFlags(token, grpId, GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED,
(GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED | GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED));
return true;
}
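
Aside: the change above narrows the mask to just the two subscription bits so that unrelated bits (e.g. ADMIN) survive a subscribe/unsubscribe. Assuming setGroupSubscribeFlags performs the usual masked update, the semantics look roughly like this sketch (flag values are illustrative, not the rsgxsflags.h constants):

#include <cstdint>
#include <iostream>

// Illustrative flag values only.
static const uint32_t SUBSCRIBED     = 0x01;
static const uint32_t NOT_SUBSCRIBED = 0x02;
static const uint32_t ADMIN          = 0x04;

// Assumed masked-update semantics: only the bits selected by the mask
// are overwritten, every other bit of the current value is preserved.
uint32_t applyMask(uint32_t current, uint32_t value, uint32_t mask)
{
    return (current & ~mask) | (value & mask);
}

int main()
{
    uint32_t flags = ADMIN | NOT_SUBSCRIBED;

    // Subscribe: touch only the two subscription bits, keep ADMIN intact.
    flags = applyMask(flags, SUBSCRIBED, SUBSCRIBED | NOT_SUBSCRIBED);
    std::cout << std::hex << flags << std::endl; // prints 5 (ADMIN | SUBSCRIBED)

    // A broader mask would overwrite bits such as ADMIN as well.
    return 0;
}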
@ -1502,7 +1504,7 @@ bool RsGenExchange::processGrpMask(const RsGxsGroupId& grpId, ContentValue &grpC
ok &= grpCv.getAsInt32(key+GXS_MASK, mask);
// remove mask entry so it doesn't affect
// remove mask entry so it doesn't affect actual entry
grpCv.removeKeyValue(key+GXS_MASK);
// apply mask to current value
@ -1798,7 +1800,10 @@ void RsGenExchange::publishGrps()
grp->metaData = new RsGxsGrpMetaData();
grpItem->meta.mPublishTs = time(NULL);
*(grp->metaData) = grpItem->meta;
grp->metaData->mSubscribeFlags = GXS_SERV::GROUP_SUBSCRIBE_ADMIN;
// TODO: change when publish key optimisation added (public groups don't have publish key)
grp->metaData->mSubscribeFlags = GXS_SERV::GROUP_SUBSCRIBE_ADMIN | GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED
| GXS_SERV::GROUP_SUBSCRIBE_PUBLISH;
create = createGroup(grp, privatekeySet, publicKeySet);
@ -2164,6 +2169,7 @@ void RsGenExchange::processRecvdGroups()
if(ret == VALIDATE_SUCCESS)
{
meta->mGroupStatus = GXS_SERV::GXS_GRP_STATUS_UNPROCESSED | GXS_SERV::GXS_GRP_STATUS_UNREAD;
meta->mSubscribeFlags = GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED;
grps.insert(std::make_pair(grp, meta));
grpIds.push_back(grp->grpId);

View file

@ -30,9 +30,8 @@
#include "rsgxsnetservice.h"
#include "retroshare/rsgxsflags.h"
/**
* #define NXS_NET_DEBUG 1
**/
#define NXS_NET_DEBUG 1
#define SYNC_PERIOD 12 // in microseconds every 10 seconds (1 second for testing)
#define TRANSAC_TIMEOUT 5 // 5 seconds
@ -150,11 +149,14 @@ bool RsGxsNetService::fragmentMsg(RsNxsMsg& msg, MsgFragments& msgFragments) con
msgFrag->grpId = msg.grpId;
msgFrag->msgId = msg.msgId;
msgFrag->meta = msg.meta;
msgFrag->transactionNumber = msg.transactionNumber;
msgFrag->pos = i;
msgFrag->PeerId(msg.PeerId());
msgFrag->count = nFragments;
uint32_t fragSize = std::min(dataLeft, FRAGMENT_SIZE);
memcpy(buffer, ((char*)msg.msg.bin_data) + currPos, fragSize);
msgFrag->msg.setBinData(buffer, fragSize);
currPos += fragSize;
dataLeft -= fragSize;
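
Aside: each fragment above carries the parent ids plus a pos/count pair; assuming the fragment count is a ceiling division of the payload size by FRAGMENT_SIZE, a quick check of that arithmetic (values are illustrative):

#include <cstdint>
#include <iostream>

// Assumed fragment-count calculation: ceiling division of the payload
// size by the fragment size (at least one fragment for an empty payload).
uint32_t fragmentCount(uint32_t dataSize, uint32_t fragSize)
{
    if(dataSize == 0)
        return 1;
    return (dataSize + fragSize - 1) / fragSize;
}

int main()
{
    const uint32_t FRAG = 150000; // illustrative fragment size
    std::cout << fragmentCount(100000, FRAG) << std::endl; // 1
    std::cout << fragmentCount(150000, FRAG) << std::endl; // 1
    std::cout << fragmentCount(400000, FRAG) << std::endl; // 3
    return 0;
}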
@ -184,6 +186,7 @@ bool RsGxsNetService::fragmentGrp(RsNxsGrp& grp, GrpFragments& grpFragments) con
uint32_t fragSize = std::min(dataLeft, FRAGMENT_SIZE);
memcpy(buffer, ((char*)grp.grp.bin_data) + currPos, fragSize);
grpFrag->grp.setBinData(buffer, fragSize);
currPos += fragSize;
dataLeft -= fragSize;
@ -197,6 +200,17 @@ RsNxsMsg* RsGxsNetService::deFragmentMsg(MsgFragments& msgFragments) const
{
if(msgFragments.empty()) return NULL;
// if there is only one fragment with a count of 1 or less then
// the fragment is the msg
if(msgFragments.size() == 1)
{
RsNxsMsg* m = msgFragments.front();
if(m->count > 1)
return NULL;
else
return m;
}
// first determine total size for binary data
MsgFragments::iterator mit = msgFragments.begin();
uint32_t datSize = 0;
@ -338,12 +352,12 @@ void RsGxsNetService::collateMsgFragments(MsgFragments fragments, std::map<RsGxs
MsgFragCollate(msgId));
// something will always be found for a msg id
for(vit = fragments.begin(); vit != bound; )
for(vit = fragments.begin(); vit != bound; vit++ )
{
partFragments[msgId].push_back(*vit);
vit = fragments.erase(vit);
}
fragments.erase(fragments.begin(), bound);
MsgFragments& f = partFragments[msgId];
RsNxsMsg* msg = *(f.begin());
@ -363,6 +377,7 @@ void RsGxsNetService::collateMsgFragments(MsgFragments fragments, std::map<RsGxs
fragments.clear();
}
bool RsGxsNetService::loadList(std::list<RsItem*>& load)
{
return false;
@ -897,6 +912,22 @@ void RsGxsNetService::locked_processCompletedIncomingTrans(NxsTransaction* tr)
}
}
#ifdef NXS_FRAG
std::map<RsGxsGroupId, MsgFragments > collatedMsgs;
collateMsgFragments(msgs, collatedMsgs);
msgs.clear();
std::map<RsGxsGroupId, MsgFragments >::iterator mit = collatedMsgs.begin();
for(; mit != collatedMsgs.end(); mit++)
{
MsgFragments& f = mit->second;
RsNxsMsg* msg = deFragmentMsg(f);
if(msg)
msgs.push_back(msg);
}
#endif
// notify listener of msgs
mObserver->notifyNewMessages(msgs);
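
Aside: a rough sketch of the collate step used on the receive side above, assuming fragments are grouped by message id and a message is ready for de-fragmentation once the collected parts reach the advertised count; the Frag type is a plain stand-in, not the RsNxsMsg interface.

#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical fragment record; the real items carry grpId, msgId, pos and count.
struct Frag
{
    std::string msgId;
    uint32_t pos;
    uint32_t count;
    std::vector<uint8_t> data;
};

typedef std::vector<Frag> FragList;

// Group incoming fragments by message id so each message can be
// reassembled independently.
std::map<std::string, FragList> collate(const FragList& incoming)
{
    std::map<std::string, FragList> byMsg;
    for(size_t i = 0; i < incoming.size(); ++i)
        byMsg[incoming[i].msgId].push_back(incoming[i]);
    return byMsg;
}

// A message is ready once all advertised parts have arrived
// (a count of 1 or less means it was never fragmented).
bool readyToDefragment(const FragList& parts)
{
    return !parts.empty() && parts.size() >= parts.front().count;
}

int main()
{
    FragList in;
    Frag a = { "msg-1", 0, 2, std::vector<uint8_t>(10, 0x01) };
    Frag b = { "msg-1", 1, 2, std::vector<uint8_t>(10, 0x02) };
    in.push_back(a);
    in.push_back(b);

    std::map<std::string, FragList> byMsg = collate(in);
    return readyToDefragment(byMsg["msg-1"]) ? 0 : 1; // both parts present -> 0
}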
@ -1276,8 +1307,22 @@ void RsGxsNetService::locked_genSendMsgsTransaction(NxsTransaction* tr)
RsNxsMsg* msg = *vit;
msg->PeerId(peerId);
msg->transactionNumber = transN;
#ifndef NXS_FRAG
newTr->mItems.push_back(msg);
msgSize++;
#else
MsgFragments fragments;
fragmentMsg(*msg, fragments);
MsgFragments::iterator mit = fragments.begin();
for(; mit != fragments.end(); mit++)
{
newTr->mItems.push_back(*mit);
msgSize++;
}
#endif
}
}