[TheWire] update GroupPtrs

Ensure all possible GroupPtrs are filled in on Pulse Data Requests.
 * Expand Id collection to include all the additonal RefGroupIds.
 * Perform intersection(available IDs, pulse GroupIds) before retrieving
 * Iterate over pulse tree and update GroupPtr references.
 * Enable GROUP_IDS gxs data fetches.
This commit is contained in:
drbob 2020-08-07 18:12:06 +10:00
parent c64fb331f7
commit 9a90ef694e
4 changed files with 139 additions and 81 deletions

View File

@ -50,12 +50,13 @@ enum class TokenRequestType: uint8_t
__NONE = 0x00, /// Used to detect uninitialized
GROUP_DATA = 0x01,
GROUP_META = 0x02,
POSTS = 0x03,
ALL_POSTS = 0x04,
MSG_RELATED_INFO = 0x05,
GROUP_STATISTICS = 0x06,
SERVICE_STATISTICS = 0x07,
NO_KILL_TYPE = 0x08,
GROUP_IDS = 0x03,
POSTS = 0x04,
ALL_POSTS = 0x05,
MSG_RELATED_INFO = 0x06,
GROUP_STATISTICS = 0x07,
SERVICE_STATISTICS = 0x08,
NO_KILL_TYPE = 0x09,
__MAX /// Used to detect out of range
};
@ -263,6 +264,7 @@ public:
{
case GXS_REQUEST_TYPE_GROUP_DATA: token_request_type = TokenRequestType::GROUP_DATA; break;
case GXS_REQUEST_TYPE_GROUP_META: token_request_type = TokenRequestType::GROUP_META; break;
case GXS_REQUEST_TYPE_GROUP_IDS: token_request_type = TokenRequestType::GROUP_IDS; break;
default:
RsErr() << __PRETTY_FUNCTION__ << "(EE) Unexpected request type " << opts.mReqType << "!!" << std::endl;
return false;
@ -292,6 +294,7 @@ public:
{
case GXS_REQUEST_TYPE_GROUP_DATA: token_request_type = TokenRequestType::GROUP_DATA; break;
case GXS_REQUEST_TYPE_GROUP_META: token_request_type = TokenRequestType::GROUP_META; break;
case GXS_REQUEST_TYPE_GROUP_IDS: token_request_type = TokenRequestType::GROUP_IDS; break;
default:
RsErr() << __PRETTY_FUNCTION__ << "(EE) Unexpected request type " << opts.mReqType << "!!" << std::endl;
return false;

View File

@ -176,8 +176,8 @@ class RsWirePulse
// Pointer to WireGroups
// mRefGroupPtr is opportunistically filled in, but will often be empty.
RsWireGroupSPtr mRefGroupPtr; // ORIG/RESP: N/A , REF: Reply Group
RsWireGroupSPtr mGroupPtr; // ORIG/RESP: Own Group, REF: Parent Group
RsWireGroupSPtr mRefGroupPtr; // ORIG: N/A, RESP: Parent, REF: Reply Group
RsWireGroupSPtr mGroupPtr; // ORIG: Own, RESP: Own, REF: Parent Group
// These are the direct children of this message
// split into likes, replies and retweets.

View File

@ -809,7 +809,7 @@ bool p3Wire::createLikePulse(uint32_t &token, RsWirePulse &pulse)
// WireGroup Details.
bool p3Wire::getWireGroup(const RsGxsGroupId &groupId, RsWireGroupSPtr &grp)
{
std::set<RsGxsGroupId> groupIds = { groupId };
std::list<RsGxsGroupId> groupIds = { groupId };
std::map<RsGxsGroupId, RsWireGroupSPtr> groups;
if (!fetchGroupPtrs(groupIds, groups))
{
@ -885,7 +885,6 @@ bool p3Wire::getPulsesForGroups(const std::list<RsGxsGroupId> &groupIds, std::li
return false;
}
std::cerr << "p3Wire::getPulsesForGroups() size = " << pulsePtrs.size();
std::cerr << std::endl;
{
@ -911,11 +910,6 @@ bool p3Wire::getPulsesForGroups(const std::list<RsGxsGroupId> &groupIds, std::li
pulsePtrs.resize(N);
}
// set to collect groupIds...
// this is only important if updatePulse Level > 1.
// but this is more general.
std::set<RsGxsGroupId> allGroupIds;
// for each fill in details.
std::list<RsWirePulseSPtr>::iterator it;
for (it = pulsePtrs.begin(); it != pulsePtrs.end(); it++)
@ -926,33 +920,14 @@ bool p3Wire::getPulsesForGroups(const std::list<RsGxsGroupId> &groupIds, std::li
std::cerr << std::endl;
return false;
}
if (!extractGroupIds(*it, allGroupIds))
{
std::cerr << "p3Wire::getPulsesForGroups() failed to extractGroupIds";
std::cerr << std::endl;
return false;
}
}
// fetch GroupPtrs for allGroupIds.
std::map<RsGxsGroupId, RsWireGroupSPtr> groups;
if (!fetchGroupPtrs(allGroupIds, groups))
{
std::cerr << "p3Wire::getPulsesForGroups() failed to fetchGroupPtrs";
std::cerr << std::endl;
return false;
}
// update GroupPtrs for all pulsePtrs.
for (it = pulsePtrs.begin(); it != pulsePtrs.end(); it++)
if (!updateGroups(pulsePtrs))
{
if (!updateGroupPtrs(*it, groups))
{
std::cerr << "p3Wire::getPulsesForGroups() failed to updateGroupPtrs";
std::cerr << std::endl;
return false;
}
std::cerr << "p3Wire::getPulsesForGroups() failed to updateGroups";
std::cerr << std::endl;
return false;
}
return true;
@ -980,32 +955,13 @@ bool p3Wire::getPulseFocus(const RsGxsGroupId &groupId, const RsGxsMessageId &ms
return false;
}
/* final stage is to fetch associated groups and reference them from pulses
* this could be done as part of updates, but probably more efficient to do once
* -- Future improvement.
* -- Fetch RefGroups as well, these are not necessarily available,
* so need to add dataRequest FlAG to return okay even if not all groups there.
*/
/* Fill in GroupPtrs */
std::list<RsWirePulseSPtr> pulsePtrs;
pulsePtrs.push_back(pPulse);
std::set<RsGxsGroupId> groupIds;
if (!extractGroupIds(pPulse, groupIds))
if (!updateGroups(pulsePtrs))
{
std::cerr << "p3Wire::getPulseFocus() failed to extractGroupIds";
std::cerr << std::endl;
return false;
}
std::map<RsGxsGroupId, RsWireGroupSPtr> groups;
if (!fetchGroupPtrs(groupIds, groups))
{
std::cerr << "p3Wire::getPulseFocus() failed to fetchGroupPtrs";
std::cerr << std::endl;
return false;
}
if (!updateGroupPtrs(pPulse, groups))
{
std::cerr << "p3Wire::getPulseFocus() failed to updateGroupPtrs";
std::cerr << "p3Wire::getPulseFocus() failed to updateGroups";
std::cerr << std::endl;
return false;
}
@ -1171,6 +1127,58 @@ bool p3Wire::updatePulseChildren(RsWirePulseSPtr pParent, uint32_t token)
}
}
/* High-level utility function to update mGroupPtr / mRefGroupPtr links.
* fetches associated groups and reference them from pulses
*
* extractGroupIds (owner + refs).
* fetch all available GroupIDs. (just IDs - so light).
* do intersection of IDs.
* apply IDs.
*/
bool p3Wire::updateGroups(std::list<RsWirePulseSPtr> &pulsePtrs)
{
std::set<RsGxsGroupId> pulseGroupIds;
std::list<RsWirePulseSPtr>::iterator it;
for (it = pulsePtrs.begin(); it != pulsePtrs.end(); it++)
{
if (!extractGroupIds(*it, pulseGroupIds))
{
std::cerr << "p3Wire::updateGroups() failed to extractGroupIds";
std::cerr << std::endl;
return false;
}
}
std::list<RsGxsGroupId> availGroupIds;
if (!trimToAvailGroupIds(pulseGroupIds, availGroupIds))
{
std::cerr << "p3Wire::updateGroups() failed to trimToAvailGroupIds";
std::cerr << std::endl;
return false;
}
std::map<RsGxsGroupId, RsWireGroupSPtr> groups;
if (!fetchGroupPtrs(availGroupIds, groups))
{
std::cerr << "p3Wire::updateGroups() failed to fetchGroupPtrs";
std::cerr << std::endl;
return false;
}
for (it = pulsePtrs.begin(); it != pulsePtrs.end(); it++)
{
if (!updateGroupPtrs(*it, groups))
{
std::cerr << "p3Wire::updateGroups() failed to updateGroupPtrs";
std::cerr << std::endl;
return false;
}
}
return true;
}
// this function doesn't depend on p3Wire, could make static.
bool p3Wire::extractGroupIds(RsWirePulseConstSPtr pPulse, std::set<RsGxsGroupId> &groupIds)
@ -1184,16 +1192,22 @@ bool p3Wire::extractGroupIds(RsWirePulseConstSPtr pPulse, std::set<RsGxsGroupId>
return false;
}
// install own groupId.
groupIds.insert(pPulse->mMeta.mGroupId);
/* do this recursively */
if (pPulse->mPulseType & WIRE_PULSE_TYPE_REFERENCE) {
// REPLY: mRefGroupId, PARENT was in mMeta.mGroupId.
groupIds.insert(pPulse->mRefGroupId);
/* skipping */
std::cerr << "p3Wire::extractGroupIds() skipping ref type";
std::cerr << std::endl;
return true;
}
// install own groupId.
groupIds.insert(pPulse->mMeta.mGroupId);
if (pPulse->mPulseType & WIRE_PULSE_TYPE_RESPONSE) {
// REPLY: meta.mGroupId, PARENT: mRefGroupId
groupIds.insert(pPulse->mRefGroupId);
}
/* iterate through children, recursively */
std::list<RsWirePulseSPtr>::const_iterator it;
@ -1218,7 +1232,6 @@ bool p3Wire::extractGroupIds(RsWirePulseConstSPtr pPulse, std::set<RsGxsGroupId>
}
// not bothering with LIKEs at the moment. TODO.
return true;
}
@ -1233,13 +1246,9 @@ bool p3Wire::updateGroupPtrs(RsWirePulseSPtr pPulse, const std::map<RsGxsGroupId
pPulse->mGroupPtr = git->second;
/* if Refs, GroupId refers to parent, so GroupPtr is parent's group
* It should already be in groups lists - if its not... */
// if REF, fill in mRefGroupPtr based on mRefGroupId.
if (pPulse->mPulseType & WIRE_PULSE_TYPE_REFERENCE) {
// if REF is in list, fill in (unlikely but try anyway)
// unlikely, as we are not adding RefGroupId, as can potentially fail to look up.
// need additional flag OKAY_IF_NONEXISTENT or similar.
// no error if its not there.
// if RefGroupId is in list, fill in. No error if its not there.
std::map<RsGxsGroupId, RsWireGroupSPtr>::const_iterator rgit;
rgit = groups.find(pPulse->mRefGroupId);
if (rgit != groups.end()) {
@ -1250,6 +1259,17 @@ bool p3Wire::updateGroupPtrs(RsWirePulseSPtr pPulse, const std::map<RsGxsGroupId
return true;
}
// if Response, fill in mRefGroupPtr based on mRefGroupId.
if (pPulse->mPulseType & WIRE_PULSE_TYPE_RESPONSE) {
// if RefGroupId is in list, fill in. No error if its not there.
std::map<RsGxsGroupId, RsWireGroupSPtr>::const_iterator rgit;
rgit = groups.find(pPulse->mRefGroupId);
if (rgit != groups.end()) {
pPulse->mRefGroupPtr = rgit->second;
}
// do children as well.
}
/* recursively apply to children */
std::list<RsWirePulseSPtr>::iterator it;
for (it = pPulse->mReplies.begin(); it != pPulse->mReplies.end(); it++)
@ -1276,18 +1296,51 @@ bool p3Wire::updateGroupPtrs(RsWirePulseSPtr pPulse, const std::map<RsGxsGroupId
return true;
}
bool p3Wire::fetchGroupPtrs(const std::set<RsGxsGroupId> &groupIds,
bool p3Wire::trimToAvailGroupIds(const std::set<RsGxsGroupId> &pulseGroupIds,
std::list<RsGxsGroupId> &availGroupIds)
{
/* request all groupIds */
std::cerr << "p3Wire::trimToAvailGroupIds()";
std::cerr << std::endl;
uint32_t token;
RsTokReqOptions opts;
opts.mReqType = GXS_REQUEST_TYPE_GROUP_IDS;
if (!requestGroupInfo(token, opts) || waitToken(token) != RsTokenService::COMPLETE )
{
std::cerr << "p3Wire::trimToAvailGroupIds() failed to fetch groups";
std::cerr << std::endl;
return false;
}
std::list<RsGxsGroupId> localGroupIds;
if (!RsGenExchange::getGroupList(token, localGroupIds))
{
std::cerr << "p3Wire::trimToAvailGroupIds() failed to get GroupIds";
std::cerr << std::endl;
return false;
}
/* do intersection between result ^ pulseGroups -> availGroupIds */
std::set_intersection(localGroupIds.begin(), localGroupIds.end(),
pulseGroupIds.begin(), pulseGroupIds.end(),
std::back_inserter(availGroupIds));
return true;
}
bool p3Wire::fetchGroupPtrs(const std::list<RsGxsGroupId> &groupIds,
std::map<RsGxsGroupId, RsWireGroupSPtr> &groups)
{
std::cerr << "p3Wire::fetchGroupPtrs()";
std::cerr << std::endl;
std::list<RsGxsGroupId> groupIdList(groupIds.begin(), groupIds.end());
uint32_t token;
RsTokReqOptions opts;
opts.mReqType = GXS_REQUEST_TYPE_GROUP_DATA;
if (!requestGroupInfo(token, opts, groupIdList) || waitToken(token) != RsTokenService::COMPLETE )
if (!requestGroupInfo(token, opts, groupIds) || waitToken(token) != RsTokenService::COMPLETE )
{
std::cerr << "p3Wire::fetchGroupPtrs() failed to fetch groups";
std::cerr << std::endl;
@ -1296,4 +1349,3 @@ bool p3Wire::fetchGroupPtrs(const std::set<RsGxsGroupId> &groupIds,
return getGroupPtrData(token, groups);
}

View File

@ -95,18 +95,21 @@ private:
bool updatePulseChildren(RsWirePulseSPtr pParent, uint32_t token);
// update GroupPtrs
bool updateGroups(std::list<RsWirePulseSPtr> &pulsePtrs);
// sub utility functions used by updateGroups.
bool extractGroupIds(RsWirePulseConstSPtr pPulse, std::set<RsGxsGroupId> &groupIds);
bool updateGroupPtrs(RsWirePulseSPtr pPulse,
const std::map<RsGxsGroupId, RsWireGroupSPtr> &groups);
bool fetchGroupPtrs(const std::set<RsGxsGroupId> &groupIds,
bool trimToAvailGroupIds(const std::set<RsGxsGroupId> &pulseGroupIds,
std::list<RsGxsGroupId> &availGroupIds);
bool fetchGroupPtrs(const std::list<RsGxsGroupId> &groupIds,
std::map<RsGxsGroupId, RsWireGroupSPtr> &groups);
virtual void generateDummyData();
std::string genRandomId();