/*******************************************************************************
 * libretroshare/src/services: p3wire.cc                                       *
 *                                                                             *
 * libretroshare: retroshare core library                                      *
 *                                                                             *
 * Copyright 2012-2020 by Robert Fernie                                        *
 *                                                                             *
 * This program is free software: you can redistribute it and/or modify        *
 * it under the terms of the GNU Lesser General Public License as              *
 * published by the Free Software Foundation, either version 3 of the          *
 * License, or (at your option) any later version.                             *
 *                                                                             *
 * This program is distributed in the hope that it will be useful,             *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of              *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               *
 * GNU Lesser General Public License for more details.                         *
 *                                                                             *
 * You should have received a copy of the GNU Lesser General Public License    *
 * along with this program. If not, see <https://www.gnu.org/licenses/>.       *
 *                                                                             *
 *******************************************************************************/
#include "services/p3wire.h"
#include "rsitems/rswireitems.h"

#include "util/rsrandom.h"

/****
 * #define WIRE_DEBUG 1
 ****/

RsWire *rsWire = NULL;

// Builds the GXS exchange with the Wire serialiser and the Wire authentication policy.
p3Wire::p3Wire(RsGeneralDataService* gds, RsNetworkExchangeService* nes, RsGixs *gixs)
	:RsGenExchange(gds, nes, new RsGxsWireSerialiser(), RS_SERVICE_GXS_TYPE_WIRE, gixs, wireAuthenPolicy()),
	RsWire(static_cast<RsGxsIface&>(*this)), mWireMtx("WireMtx")
{
}

const std::string WIRE_APP_NAME = "gxswire";
const uint16_t WIRE_APP_MAJOR_VERSION = 1;
const uint16_t WIRE_APP_MINOR_VERSION = 0;
const uint16_t WIRE_MIN_MAJOR_VERSION = 1;
const uint16_t WIRE_MIN_MINOR_VERSION = 0;

// Describes this service (type, name, current and minimum versions).
RsServiceInfo p3Wire::getServiceInfo()
{
	return RsServiceInfo(RS_SERVICE_GXS_TYPE_WIRE,
		WIRE_APP_NAME,
		WIRE_APP_MAJOR_VERSION,
		WIRE_APP_MINOR_VERSION,
		WIRE_MIN_MAJOR_VERSION,
		WIRE_MIN_MINOR_VERSION);
}

// Assembles the authentication policy word handed to RsGenExchange.
// The same message flags are applied to PUBLIC, RESTRICTED and PRIVATE groups;
// the group option bits are left at zero.
uint32_t p3Wire::wireAuthenPolicy()
{
	uint32_t policy = 0;
	uint8_t flag = 0;

	// Edits generally need an authors signature.
	// Wire requires all TopLevel (Orig/Reply) msgs to be signed with both PUBLISH & AUTHOR.
	// Reply References need to be signed by Author.
	flag = GXS_SERV::MSG_AUTHEN_ROOT_PUBLISH_SIGN | GXS_SERV::MSG_AUTHEN_CHILD_AUTHOR_SIGN;
	flag |= GXS_SERV::MSG_AUTHEN_ROOT_AUTHOR_SIGN;
	RsGenExchange::setAuthenPolicyFlag(flag, policy, RsGenExchange::PUBLIC_GRP_BITS);

	// expect the requirements to be the same for RESTRICTED / PRIVATE groups too.
	// This needs to be worked through / fully evaluated.
	RsGenExchange::setAuthenPolicyFlag(flag, policy, RsGenExchange::RESTRICTED_GRP_BITS);
	RsGenExchange::setAuthenPolicyFlag(flag, policy, RsGenExchange::PRIVATE_GRP_BITS);

	flag = 0;
	RsGenExchange::setAuthenPolicyFlag(flag, policy, RsGenExchange::GRP_OPTION_BITS);

	return policy;
}

// No periodic work is done by this service.
void p3Wire::service_tick()
{
	return;
}

RsTokenService* p3Wire::getTokenService()
{
	return RsGenExchange::getTokenService();
}

// Only logs that changes arrived; the change list itself is not inspected.
// NOTE(review): confirm who owns / frees the RsGxsNotify items passed in here.
void p3Wire::notifyChanges(std::vector<RsGxsNotify*>& /*changes*/)
{
	std::cerr << "p3Wire::notifyChanges() New stuff";
	std::cerr << std::endl;
}

	/* Specific Service Data */

// Collects the group items behind `token`, appending a copy of each Wire group
// (with its meta) to `groups`. Every retrieved item is deleted here, including
// items of an unexpected type. Returns the result of RsGenExchange::getGroupData.
bool p3Wire::getGroupData(const uint32_t &token, std::vector<RsWireGroup> &groups)
{
	std::cerr << "p3Wire::getGroupData()";
	std::cerr << std::endl;

	std::vector<RsGxsGrpItem*> grpData;
	const bool ok = RsGenExchange::getGroupData(token, grpData);
	if (!ok)
	{
		return ok;
	}

	for (auto* rawItem : grpData)
	{
		RsGxsWireGroupItem* item = dynamic_cast<RsGxsWireGroupItem*>(rawItem);
		if (!item)
		{
			std::cerr << "Not a WireGroupItem, deleting!" << std::endl;
			delete rawItem;
			continue;
		}

		RsWireGroup group = item->group;
		group.mMeta = item->meta;
		delete item;
		groups.push_back(group);

		std::cerr << "p3Wire::getGroupData() Adding WireGroup to Vector: ";
		std::cerr << std::endl;
		std::cerr << group;
		std::cerr << std::endl;
	}
	return ok;
}

// Same as getGroupData(), but stores each group as a shared pointer keyed by its
// group id. A later item with the same id replaces the earlier entry.
bool p3Wire::getGroupPtrData(const uint32_t &token, std::map<RsGxsGroupId, RsWireGroupSPtr> &groups)
{
	std::cerr << "p3Wire::getGroupPtrData()";
	std::cerr << std::endl;

	std::vector<RsGxsGrpItem*> grpData;
	const bool ok = RsGenExchange::getGroupData(token, grpData);
	if (!ok)
	{
		return ok;
	}

	for (auto* rawItem : grpData)
	{
		RsGxsWireGroupItem* item = dynamic_cast<RsGxsWireGroupItem*>(rawItem);
		if (!item)
		{
			std::cerr << "Not a WireGroupItem, deleting!" << std::endl;
			delete rawItem;
			continue;
		}

		RsWireGroupSPtr pGroup = std::make_shared<RsWireGroup>(item->group);
		pGroup->mMeta = item->meta;
		delete item;

		groups[pGroup->mMeta.mGroupId] = pGroup;

		std::cerr << "p3Wire::getGroupPtrData() Adding WireGroup to Vector: ";
		std::cerr << pGroup->mMeta.mGroupId.toStdString();
		std::cerr << std::endl;
	}
	return ok;
}

// Collects the message items behind `token`, appending a copy of each pulse
// (with its meta) to `pulses`. All retrieved items are deleted here.
bool p3Wire::getPulseData(const uint32_t &token, std::vector<RsWirePulse> &pulses)
{
	GxsMsgDataMap msgData;
	const bool ok = RsGenExchange::getMsgData(token, msgData);
	if (!ok)
	{
		return ok;
	}

	for (auto& entry : msgData)
	{
		for (auto* rawItem : entry.second)
		{
			RsGxsWirePulseItem* item = dynamic_cast<RsGxsWirePulseItem*>(rawItem);
			if (!item)
			{
				std::cerr << "Not a WirePulse Item, deleting!" << std::endl;
				delete rawItem;
				continue;
			}

			RsWirePulse pulse = item->pulse;
			pulse.mMeta = item->meta;
			pulses.push_back(pulse);
			delete item;
		}
	}
	return ok;
}

// Same as getPulseData(), but appends shared pointers to freshly allocated pulses.
bool p3Wire::getPulsePtrData(const uint32_t &token, std::list<RsWirePulseSPtr> &pulses)
{
	GxsMsgDataMap msgData;
	const bool ok = RsGenExchange::getMsgData(token, msgData);
	if (!ok)
	{
		return ok;
	}

	for (auto& entry : msgData)
	{
		for (auto* rawItem : entry.second)
		{
			RsGxsWirePulseItem* item = dynamic_cast<RsGxsWirePulseItem*>(rawItem);
			if (!item)
			{
				std::cerr << "Not a WirePulse Item, deleting!" << std::endl;
				delete rawItem;
				continue;
			}

			RsWirePulseSPtr pPulse = std::make_shared<RsWirePulse>(item->pulse);
			pPulse->mMeta = item->meta;
			pulses.push_back(pPulse);
			delete item;
		}
	}
	return ok;
}

// Collects the results of a "related messages" request behind `token`, appending
// a copy of each pulse (with its meta) to `pulses`. All retrieved items are deleted here.
bool p3Wire::getRelatedPulseData(const uint32_t &token, std::vector<RsWirePulse> &pulses)
{
	GxsMsgRelatedDataMap msgData;
	std::cerr << "p3Wire::getRelatedPulseData()";
	std::cerr << std::endl;

	const bool ok = RsGenExchange::getMsgRelatedData(token, msgData);
	if (!ok)
	{
		std::cerr << "p3Wire::getRelatedPulseData() is NOT OK";
		std::cerr << std::endl;
		return ok;
	}

	std::cerr << "p3Wire::getRelatedPulseData() is OK";
	std::cerr << std::endl;

	for (auto& entry : msgData)
	{
		for (auto* rawItem : entry.second)
		{
			RsGxsWirePulseItem* item = dynamic_cast<RsGxsWirePulseItem*>(rawItem);
			if (!item)
			{
				std::cerr << "Not a WirePulse Item, deleting!" << std::endl;
				delete rawItem;
				continue;
			}

			RsWirePulse pulse = item->pulse;
			pulse.mMeta = item->meta;
			pulses.push_back(pulse);
			delete item;
		}
	}
	return ok;
}

// Wraps `group` in a group item and hands it to RsGenExchange::publishGroup.
// Always returns true; completion must be tracked through `token`.
bool p3Wire::createGroup(uint32_t &token, RsWireGroup &group)
{
	RsGxsWireGroupItem* groupItem = new RsGxsWireGroupItem();
	groupItem->group = group;
	groupItem->meta = group.mMeta;

	std::cerr << "p3Wire::createGroup(): ";
	std::cerr << std::endl;
	std::cerr << group;
	std::cerr << std::endl;

	std::cerr << "p3Wire::createGroup() pushing to RsGenExchange";
	std::cerr << std::endl;

	RsGenExchange::publishGroup(token, groupItem);
	return true;
}

// Wraps `pulse` in a pulse item and hands it to RsGenExchange::publishMsg.
// Always returns true; completion must be tracked through `token`.
bool p3Wire::createPulse(uint32_t &token, RsWirePulse &pulse)
{
	std::cerr << "p3Wire::createPulse(): " << pulse;
	std::cerr << std::endl;

	RsGxsWirePulseItem* pulseItem = new RsGxsWirePulseItem();
	pulseItem->pulse = pulse;
	pulseItem->meta = pulse.mMeta;

	RsGenExchange::publishMsg(token, pulseItem);
	return true;
}

// Blocking Interfaces.

// Publishes `group` and waits for the token; true only if it reaches COMPLETE.
bool p3Wire::createGroup(RsWireGroup &group)
{
	uint32_t token;
	return createGroup(token, group) && waitToken(token) == RsTokenService::COMPLETE;
}

bool p3Wire::updateGroup(const RsWireGroup & /*group*/)
{
	// TODO
	return false;
}

// Requests group data and waits for it. An empty `groupIds` uses the request
// overload that takes no id list; otherwise only the listed ids are requested.
// Returns false if the request fails, does not complete, or yields no groups.
bool p3Wire::getGroups(const std::list<RsGxsGroupId> groupIds, std::vector<RsWireGroup> &groups)
{
	uint32_t token;
	RsTokReqOptions opts;
	opts.mReqType = GXS_REQUEST_TYPE_GROUP_DATA;

	const bool requested = groupIds.empty()
		? requestGroupInfo(token, opts)
		: requestGroupInfo(token, opts, groupIds);

	// waitToken() is only reached when the request itself succeeded.
	if (!requested || waitToken(token) != RsTokenService::COMPLETE)
	{
		return false;
	}
	return getGroupData(token, groups) && !groups.empty();
}

std::ostream &operator<<(std::ostream &out, const RsWireGroup &group)
{
	out << "RsWireGroup [ ";
	out << " Name: " << group.mMeta.mGroupName;
	out << " Tagline: " << group.mTagline;
	out << " Location: " << group.mLocation;
	out << " ]";
	return out;
}

std::ostream &operator<<(std::ostream &out, const RsWirePulse &pulse)
{
	out << "RsWirePulse [ ";
	out << "Title: " << pulse.mMeta.mMsgName;
	out << "PulseText: " << pulse.mPulseText;
	out << "]";
	return out;
}

/***** FOR TESTING *****/

// Returns 20 random lowercase letters ('a'..'z').
std::string p3Wire::genRandomId()
{
	std::string randomId;
	for (int i = 0; i < 20; i++)
	{
		randomId += static_cast<char>('a' + (RSRandom::random_u32() % 26));
	}
	return randomId;
}

void p3Wire::generateDummyData()
{
}

// New Interfaces.

// Requests the latest version of message `msgId` in group `grpId`, waits for the
// token, and stores the single result in `pPulse`. Returns false if the token
// does not complete, retrieval fails, or the result count is not exactly one.
bool p3Wire::fetchPulse(RsGxsGroupId grpId, RsGxsMessageId msgId, RsWirePulseSPtr &pPulse)
{
	std::cerr << "p3Wire::fetchPulse(" << grpId << ", " << msgId << ") waiting for token";
	std::cerr << std::endl;

	uint32_t token;
	{
		RsTokReqOptions opts;
		opts.mReqType = GXS_REQUEST_TYPE_MSG_DATA;
		opts.mOptions = RS_TOKREQOPT_MSG_LATEST;

		GxsMsgReq ids;
		ids[grpId].insert(msgId);

		getTokenService()->requestMsgInfo(
			token, RS_TOKREQ_ANSTYPE_DATA, opts, ids);
	}

	// wait for pulse request to completed.
	std::cerr << "p3Wire::fetchPulse() waiting for token";
	std::cerr << std::endl;

	const int result = waitToken(token);
	if (result != RsTokenService::COMPLETE)
	{
		std::cerr << "p3Wire::fetchPulse() token FAILED, result: " << result;
		std::cerr << std::endl;
		return false;
	}

	// retrieve Pulse.
	std::cerr << "p3Wire::fetchPulse() retrieving token";
	std::cerr << std::endl;

	bool okay = true;
	std::vector<RsWirePulse> pulses;
	if (!getPulseData(token, pulses))
	{
		std::cerr << "p3Wire::fetchPulse() ERROR failed to retrieve token";
		std::cerr << std::endl;
		okay = false;
	}
	else if (pulses.size() != 1)
	{
		// Reached for zero results as well as for more than one.
		std::cerr << "p3Wire::fetchPulse() ERROR multiple pulses";
		std::cerr << std::endl;
		okay = false;
	}
	else
	{
		// save to output pulse.
		pPulse = std::make_shared<RsWirePulse>(pulses[0]);
		std::cerr << "p3Wire::fetchPulse() retrieved token: " << *pPulse;
		std::cerr << std::endl;
	}

	if (!okay)
	{
		std::cerr << "p3Wire::fetchPulse() tokenPulse ERROR";
		std::cerr << std::endl;
		// TODO cancel other request.
		return false;
	}
	return true;
}

// New Interfaces.
// TODO: createOriginalPulse has not been tested.

// Publishes a new top-level (ORIGINAL) pulse with text `msg` in group `grpId`
// and waits for the publish token. The group must be retrievable, match `grpId`,
// and carry the GROUP_SUBSCRIBE_PUBLISH flag.
bool p3Wire::createOriginalPulse(RsGxsGroupId grpId, std::string msg)
{
	// request Group.
	std::list<RsGxsGroupId> groupIds = { grpId };
	std::vector<RsWireGroup> groups;
	if (!getGroups(groupIds, groups))
	{
		std::cerr << "p3Wire::createOriginalPulse() getGroups failed";
		std::cerr << std::endl;
		return false;
	}

	if (groups.size() != 1)
	{
		std::cerr << "p3Wire::createOriginalPulse() getGroups invalid size";
		std::cerr << std::endl;
		return false;
	}

	// ensure Group is suitable.
	const RsWireGroup &group = groups[0];
	if ((group.mMeta.mGroupId != grpId) ||
		(!(group.mMeta.mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_PUBLISH)))
	{
		std::cerr << "p3Wire::createOriginalPulse() Group unsuitable";
		std::cerr << std::endl;
		return false;
	}

	// Create Msg.
	RsWirePulse pulse;

	pulse.mMeta.mGroupId = group.mMeta.mGroupId;
	pulse.mMeta.mAuthorId = group.mMeta.mAuthorId;
	pulse.mMeta.mThreadId.clear();
	pulse.mMeta.mParentId.clear();
	pulse.mMeta.mOrigMsgId.clear();

	pulse.mPulseType = WIRE_PULSE_TYPE_ORIGINAL;
	pulse.mReplySentiment = WIRE_PULSE_SENTIMENT_NO_SENTIMENT;
	pulse.mPulseText = msg;
	// all mRefs should empty.

	uint32_t token;
	createPulse(token, pulse);

	const int result = waitToken(token);
	if (result != RsTokenService::COMPLETE)
	{
		std::cerr << "p3Wire::createOriginalPulse() Failed to create Pulse";
		std::cerr << std::endl;
		return false;
	}

	return true;
}

// TODO: createReplyPulse has not been tested.

// Responds to pulse `msgId` of group `grpId` on behalf of group `replyWith`.
// Two messages are published: a RESPONSE pulse in the `replyWith` group, and a
// REFERENCE pulse in the parent's group threaded under the parent's original id.
// `reply_type` must be exactly one of REPLY / REPUBLISH / LIKE. The sentiment
// argument is currently ignored (mReplySentiment is set to 0).
// Neither publish token is waited on, and the reference's mRefOrigMsgId and
// mRefPublishTs are copied from the not-yet-published response (see TODO below).
bool p3Wire::createReplyPulse(RsGxsGroupId grpId, RsGxsMessageId msgId, RsGxsGroupId replyWith, uint32_t reply_type, uint32_t /*sentiment*/, std::string msg)
{
	// check reply_type. can only be ONE.
	if (!((reply_type == WIRE_PULSE_TYPE_REPLY) ||
		(reply_type == WIRE_PULSE_TYPE_REPUBLISH) ||
		(reply_type == WIRE_PULSE_TYPE_LIKE)))
	{
		std::cerr << "p3Wire::createReplyPulse() reply_type is invalid";
		std::cerr << std::endl;
		return false;
	}

	// request both groups.
	std::list<RsGxsGroupId> groupIds = { grpId, replyWith };
	std::vector<RsWireGroup> groups;
	if (!getGroups(groupIds, groups))
	{
		std::cerr << "p3Wire::createReplyPulse() getGroups failed";
		std::cerr << std::endl;
		return false;
	}

	if (groups.size() != 2)
	{
		std::cerr << "p3Wire::createReplyPulse() getGroups != 2";
		std::cerr << std::endl;
		return false;
	}

	// extract group info: the two groups may come back in either order.
	RsWireGroup replyToGroup;
	RsWireGroup replyWithGroup;

	if (groups[0].mMeta.mGroupId == grpId)
	{
		replyToGroup = groups[0];
		replyWithGroup = groups[1];
	}
	else
	{
		replyToGroup = groups[1];
		replyWithGroup = groups[0];
	}

	// Each group must match the id it was requested for. The second comparison
	// previously tested replyToGroup against replyWith, which rejected every
	// request where the two groups differ.
	if ((replyToGroup.mMeta.mGroupId != grpId) ||
		(replyWithGroup.mMeta.mGroupId != replyWith))
	{
		std::cerr << "p3Wire::createReplyPulse() groupid mismatch";
		std::cerr << std::endl;
		return false;
	}

	// ensure Group is suitable.
	if ((!(replyToGroup.mMeta.mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED)) ||
		(!(replyWithGroup.mMeta.mSubscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_PUBLISH)))
	{
		std::cerr << "p3Wire::createReplyPulse() Group unsuitable";
		std::cerr << std::endl;
		return false;
	}

	// **********************************************************
	RsWirePulseSPtr replyToPulse;
	if (!fetchPulse(grpId, msgId, replyToPulse))
	{
		std::cerr << "p3Wire::createReplyPulse() fetchPulse FAILED";
		std::cerr << std::endl;
		return false;
	}

	// create Reply Msg.
	RsWirePulse responsePulse;

	responsePulse.mMeta.mGroupId = replyWithGroup.mMeta.mGroupId;
	responsePulse.mMeta.mAuthorId = replyWithGroup.mMeta.mAuthorId;
	responsePulse.mMeta.mThreadId.clear();
	responsePulse.mMeta.mParentId.clear();
	responsePulse.mMeta.mOrigMsgId.clear();

	responsePulse.mPulseType = WIRE_PULSE_TYPE_RESPONSE | reply_type;
	responsePulse.mReplySentiment = 0; // toPulseSentiment(ui.comboBox_sentiment->currentIndex());
	responsePulse.mPulseText = msg; // ui.textEdit_Pulse->toPlainText().toStdString();

	// mRefs refer to parent post.
	responsePulse.mRefGroupId = replyToPulse->mMeta.mGroupId;
	responsePulse.mRefGroupName = replyToGroup.mMeta.mGroupName;
	responsePulse.mRefOrigMsgId = replyToPulse->mMeta.mOrigMsgId;
	responsePulse.mRefAuthorId = replyToPulse->mMeta.mAuthorId;
	responsePulse.mRefPublishTs = replyToPulse->mMeta.mPublishTs;
	responsePulse.mRefPulseText = replyToPulse->mPulseText;

	uint32_t token;
	createPulse(token, responsePulse);

	// get MsgId.
	// update responsePulse.
	// TODO.

	// create ReplyTo Ref Msg.
	std::cerr << "PulseAddDialog::postRefPulse() create Reference!";
	std::cerr << std::endl;

	// Reference Pulse. posted on Parent's Group.
	RsWirePulse refPulse;

	refPulse.mMeta.mGroupId = replyToPulse->mMeta.mGroupId;
	refPulse.mMeta.mAuthorId = replyWithGroup.mMeta.mAuthorId; // own author Id.
	refPulse.mMeta.mThreadId = replyToPulse->mMeta.mOrigMsgId;
	refPulse.mMeta.mParentId = replyToPulse->mMeta.mOrigMsgId;
	refPulse.mMeta.mOrigMsgId.clear();

	refPulse.mPulseType = WIRE_PULSE_TYPE_REFERENCE | reply_type;
	refPulse.mReplySentiment = 0; //toPulseSentiment(ui.comboBox_sentiment->currentIndex());

	// Dont put parent PulseText into refPulse - it is available on Thread Msg.
	// otherwise gives impression it is correctly setup Parent / Reply...
	// when in fact the parent PublishTS, and AuthorId are wrong.
	refPulse.mPulseText = "";

	// refs refer back to own Post.
	refPulse.mRefGroupId = replyWithGroup.mMeta.mGroupId;
	refPulse.mRefGroupName = replyWithGroup.mMeta.mGroupName;
	refPulse.mRefOrigMsgId = responsePulse.mMeta.mOrigMsgId;
	refPulse.mRefAuthorId = replyWithGroup.mMeta.mAuthorId;
	refPulse.mRefPublishTs = responsePulse.mMeta.mPublishTs;
	refPulse.mRefPulseText = responsePulse.mPulseText;

	// publish Ref Msg. Goes through this instance, like the response pulse above,
	// rather than the global rsWire pointer (which is initialised to NULL).
	createPulse(token, refPulse);

	return true;
}

// Blocking, request structures for display.
#if 0 bool p3Wire::createReplyPulse(uint32_t &token, RsWirePulse &pulse) { return true; } bool p3Wire::createRepublishPulse(uint32_t &token, RsWirePulse &pulse) { return true; } bool p3Wire::createLikePulse(uint32_t &token, RsWirePulse &pulse) { return true; } #endif // WireGroup Details. bool p3Wire::getWireGroup(const RsGxsGroupId &groupId, RsWireGroupSPtr &grp) { std::set groupIds = { groupId }; std::map groups; if (!fetchGroupPtrs(groupIds, groups)) { std::cerr << "p3Wire::getWireGroup() failed to fetchGroupPtrs"; std::cerr << std::endl; return false; } if (groups.size() != 1) { std::cerr << "p3Wire::getWireGroup() invalid group size"; std::cerr << std::endl; return false; } grp = groups.begin()->second; // TODO Should fill in Counters of pulses/likes/republishes/replies return true; } bool compare_time(const RsWirePulseSPtr& first, const RsWirePulseSPtr &second) { return first->mMeta.mPublishTs > second->mMeta.mPublishTs; } // should this filter them in some way? // date, or count would be more likely. bool p3Wire::getPulsesForGroups(const std::list &groupIds, std::list &pulsePtrs) { // request all the pulses (Top-Level Thread Msgs). std::cerr << "p3Wire::getPulsesForGroups()"; std::cerr << std::endl; uint32_t token; { RsTokReqOptions opts; opts.mReqType = GXS_REQUEST_TYPE_MSG_DATA; opts.mOptions = RS_TOKREQOPT_MSG_LATEST | RS_TOKREQOPT_MSG_THREAD; getTokenService()->requestMsgInfo( token, RS_TOKREQ_ANSTYPE_DATA, opts, groupIds); } // wait for pulse request to completed. std::cerr << "p3Wire::getPulsesForGroups() waiting for token"; std::cerr << std::endl; int result = waitToken(token); if (result != RsTokenService::COMPLETE) { std::cerr << "p3Wire::getPulsesForGroups() token FAILED, result: " << result; std::cerr << std::endl; return false; } // retrieve Pulses. 
std::cerr << "p3Wire::getPulsesForGroups() retrieving token"; std::cerr << std::endl; if (!getPulsePtrData(token, pulsePtrs)) { std::cerr << "p3Wire::getPulsesForGroups() tokenPulse ERROR"; std::cerr << std::endl; return false; } std::cerr << "p3Wire::getPulsesForGroups() size = " << pulsePtrs.size(); std::cerr << " sorting and trimming"; std::cerr << std::endl; // sort and filter list. pulsePtrs.sort(compare_time); // trim to N max. uint32_t N = 10; if (pulsePtrs.size() > N) { pulsePtrs.resize(N); } // set to collect groupIds... // this is only important if updatePulse Level > 1. // but this is more general. std::set allGroupIds; // for each fill in details. std::list::iterator it; for (it = pulsePtrs.begin(); it != pulsePtrs.end(); it++) { if (!updatePulse(*it, 1)) { std::cerr << "p3Wire::getPulsesForGroups() Failed to updatePulse"; std::cerr << std::endl; return false; } if (!extractGroupIds(*it, allGroupIds)) { std::cerr << "p3Wire::getPulsesForGroups() failed to extractGroupIds"; std::cerr << std::endl; return false; } } // fetch GroupPtrs for allGroupIds. std::map groups; if (!fetchGroupPtrs(allGroupIds, groups)) { std::cerr << "p3Wire::getPulsesForGroups() failed to fetchGroupPtrs"; std::cerr << std::endl; return false; } // update GroupPtrs for all pulsePtrs. 
for (it = pulsePtrs.begin(); it != pulsePtrs.end(); it++) { if (!updateGroupPtrs(*it, groups)) { std::cerr << "p3Wire::getPulsesForGroups() failed to updateGroupPtrs"; std::cerr << std::endl; return false; } } return true; } bool p3Wire::getPulseFocus(const RsGxsGroupId &groupId, const RsGxsMessageId &msgId, int /* type */, RsWirePulseSPtr &pPulse) { std::cerr << "p3Wire::getPulseFocus("; std::cerr << "grpId: " << groupId << " msgId: " << msgId; std::cerr << " )"; std::cerr << std::endl; if (!fetchPulse(groupId, msgId, pPulse)) { std::cerr << "p3Wire::getPulseFocus() failed to fetch Pulse"; std::cerr << std::endl; return false; } if (!updatePulse(pPulse, 3)) { std::cerr << "p3Wire::getPulseFocus() failed to update Pulse"; std::cerr << std::endl; return false; } /* final stage is to fetch associated groups and reference them from pulses * this could be done as part of updates, but probably more efficient to do once * -- Future improvement. * -- Fetch RefGroups as well, these are not necessarily available, * so need to add dataRequest FlAG to return okay even if not all groups there. */ std::set groupIds; if (!extractGroupIds(pPulse, groupIds)) { std::cerr << "p3Wire::getPulseFocus() failed to extractGroupIds"; std::cerr << std::endl; return false; } std::map groups; if (!fetchGroupPtrs(groupIds, groups)) { std::cerr << "p3Wire::getPulseFocus() failed to fetchGroupPtrs"; std::cerr << std::endl; return false; } if (!updateGroupPtrs(pPulse, groups)) { std::cerr << "p3Wire::getPulseFocus() failed to updateGroupPtrs"; std::cerr << std::endl; return false; } return true; } // function to update a pulse with the (Ref) child with actual data. bool p3Wire::updatePulse(RsWirePulseSPtr pPulse, int levels) { bool okay = true; // setup logging label. 
std::ostringstream out; out << "pulse[" << (void *) pPulse.get() << "], " << levels; std::string label = out.str(); std::cerr << "p3Wire::updatePulse(" << label << ") starting"; std::cerr << std::endl; // is pPulse is a REF, then request the original. // if no original available the done. if (pPulse->mPulseType & WIRE_PULSE_TYPE_REFERENCE) { RsWirePulseSPtr fullPulse; std::cerr << "p3Wire::updatePulse(" << label << ") fetching REF ("; std::cerr << "grpId: " << pPulse->mRefGroupId << " msgId: " << pPulse->mRefOrigMsgId; std::cerr << " )"; std::cerr << std::endl; if (!fetchPulse(pPulse->mRefGroupId, pPulse->mRefOrigMsgId, fullPulse)) { std::cerr << "p3Wire::updatePulse(" << label << ") failed to fetch REF"; std::cerr << std::endl; return false; } std::cerr << "p3Wire::updatePulse(" << label << ") replacing REF"; std::cerr << std::endl; *pPulse = *fullPulse; } // Request children: (Likes / Retweets / Replies) std::cerr << "p3Wire::updatePulse(" << label << ") requesting children"; std::cerr << std::endl; uint32_t token; { RsTokReqOptions opts; opts.mReqType = GXS_REQUEST_TYPE_MSG_RELATED_DATA; // OR opts.mOptions = RS_TOKREQOPT_MSG_LATEST | RS_TOKREQOPT_MSG_PARENT; opts.mOptions = RS_TOKREQOPT_MSG_LATEST | RS_TOKREQOPT_MSG_THREAD; std::vector msgIds = { std::make_pair(pPulse->mMeta.mGroupId, pPulse->mMeta.mOrigMsgId) }; getTokenService()->requestMsgRelatedInfo( token, RS_TOKREQ_ANSTYPE_DATA, opts, msgIds); } // wait for request to complete std::cerr << "p3Wire::updatePulse(" << label << ") waiting for token"; std::cerr << std::endl; int result = waitToken(token); if (result != RsTokenService::COMPLETE) { std::cerr << "p3Wire::updatePulse(" << label << ") token FAILED, result: " << result; std::cerr << std::endl; return false; } /* load children */ okay = updatePulseChildren(pPulse, token); if (!okay) { std::cerr << "p3Wire::updatePulse(" << label << ") FAILED to update Children"; std::cerr << std::endl; return false; } /* if down to last level, no need to 
updateChildren */ if (levels <= 1) { std::cerr << "p3Wire::updatePulse(" << label << ") Level <= 1 finished"; std::cerr << std::endl; return okay; } /* recursively update children */ std::cerr << "p3Wire::updatePulse(" << label << ") updating children recursively"; std::cerr << std::endl; std::list::iterator it; for (it = pPulse->mReplies.begin(); it != pPulse->mReplies.end(); it++) { bool childOkay = updatePulse(*it, levels - 1); if (!childOkay) { std::cerr << "p3Wire::updatePulse(" << label << ") update children (reply) failed"; std::cerr << std::endl; } } for (it = pPulse->mRepublishes.begin(); it != pPulse->mRepublishes.end(); it++) { bool childOkay = updatePulse(*it, levels - 1); if (!childOkay) { std::cerr << "p3Wire::updatePulse(" << label << ") update children (repub) failed"; std::cerr << std::endl; } } return okay; } // function to update the (Ref) child with actual data. bool p3Wire::updatePulseChildren(RsWirePulseSPtr pParent, uint32_t token) { { bool okay = true; std::vector pulses; if (getRelatedPulseData(token, pulses)) { std::vector::iterator it; for (it = pulses.begin(); it != pulses.end(); it++) { std::cerr << "p3Wire::updatePulseChildren() retrieved child: " << *it; std::cerr << std::endl; RsWirePulseSPtr pPulse = std::make_shared(*it); // switch on type. 
if (it->mPulseType & WIRE_PULSE_TYPE_LIKE) { pParent->mLikes.push_back(pPulse); std::cerr << "p3Wire::updatePulseChildren() adding Like"; std::cerr << std::endl; } else if (it->mPulseType & WIRE_PULSE_TYPE_REPUBLISH) { pParent->mRepublishes.push_back(pPulse); std::cerr << "p3Wire::updatePulseChildren() adding Republish"; std::cerr << std::endl; } else if (it->mPulseType & WIRE_PULSE_TYPE_REPLY) { pParent->mReplies.push_back(pPulse); std::cerr << "p3Wire::updatePulseChildren() adding Reply"; std::cerr << std::endl; } else { std::cerr << "p3Wire::updatePulseChildren() unknown child type: " << it->mPulseType; std::cerr << std::endl; } } } else { std::cerr << "p3Wire::updatePulseChildren() ERROR failed to retrieve token"; std::cerr << std::endl; okay = false; } if (!okay) { std::cerr << "p3Wire::updatePulseChildren() token ERROR"; std::cerr << std::endl; } return okay; } } // this function doesn't depend on p3Wire, could make static. bool p3Wire::extractGroupIds(RsWirePulseConstSPtr pPulse, std::set &groupIds) { std::cerr << "p3Wire::extractGroupIds()"; std::cerr << std::endl; if (!pPulse) { std::cerr << "p3Wire::extractGroupIds() INVALID pPulse"; std::cerr << std::endl; return false; } /* do this recursively */ if (pPulse->mPulseType & WIRE_PULSE_TYPE_REFERENCE) { /* skipping */ std::cerr << "p3Wire::extractGroupIds() skipping ref type"; std::cerr << std::endl; return true; } // install own groupId. 
groupIds.insert(pPulse->mMeta.mGroupId); /* iterate through children, recursively */ std::list::const_iterator it; for (it = pPulse->mReplies.begin(); it != pPulse->mReplies.end(); it++) { bool childOkay = extractGroupIds(*it, groupIds); if (!childOkay) { std::cerr << "p3Wire::extractGroupIds() update children (reply) failed"; std::cerr << std::endl; return false; } } for (it = pPulse->mRepublishes.begin(); it != pPulse->mRepublishes.end(); it++) { bool childOkay = extractGroupIds(*it, groupIds); if (!childOkay) { std::cerr << "p3Wire::extractGroupIds() update children (repub) failed"; std::cerr << std::endl; return false; } } // not bothering with LIKEs at the moment. TODO. return true; } bool p3Wire::updateGroupPtrs(RsWirePulseSPtr pPulse, const std::map &groups) { /* don't bother with Refs... though we could */ /* do this recursively */ if (pPulse->mPulseType & WIRE_PULSE_TYPE_REFERENCE) { /* skipping */ return true; } std::map::const_iterator git; git = groups.find(pPulse->mMeta.mGroupId); if (git == groups.end()) { // error return false; } pPulse->mGroupPtr = git->second; /* recursively apply to children */ std::list::iterator it; for (it = pPulse->mReplies.begin(); it != pPulse->mReplies.end(); it++) { bool childOkay = updateGroupPtrs(*it, groups); if (!childOkay) { std::cerr << "p3Wire::updateGroupPtrs() update children (reply) failed"; std::cerr << std::endl; return false; } } for (it = pPulse->mRepublishes.begin(); it != pPulse->mRepublishes.end(); it++) { bool childOkay = updateGroupPtrs(*it, groups); if (!childOkay) { std::cerr << "p3Wire::updateGroupPtrs() update children (repub) failed"; std::cerr << std::endl; return false; } } // not bothering with LIKEs at the moment. TODO. 
return true; } bool p3Wire::fetchGroupPtrs(const std::set &groupIds, std::map &groups) { std::cerr << "p3Wire::fetchGroupPtrs()"; std::cerr << std::endl; std::list groupIdList(groupIds.begin(), groupIds.end()); uint32_t token; RsTokReqOptions opts; opts.mReqType = GXS_REQUEST_TYPE_GROUP_DATA; if (!requestGroupInfo(token, opts, groupIdList) || waitToken(token) != RsTokenService::COMPLETE ) { std::cerr << "p3Wire::fetchGroupPtrs() failed to fetch groups"; std::cerr << std::endl; return false; } return getGroupPtrData(token, groups); }