diff --git a/libretroshare/src/pqi/p3cfgmgr.cc b/libretroshare/src/pqi/p3cfgmgr.cc index af1749e63..f8230956a 100644 --- a/libretroshare/src/pqi/p3cfgmgr.cc +++ b/libretroshare/src/pqi/p3cfgmgr.cc @@ -29,7 +29,6 @@ #include "pqi/p3authmgr.h" #include "pqi/pqibin.h" #include "pqi/pqistore.h" -#include "pqi/pqistreamer.h" #include "pqi/pqinotify.h" #include @@ -153,11 +152,9 @@ void p3ConfigMgr::saveConfiguration() BinMemInterface *membio = new BinMemInterface(1000, bioflags); RsSerialiser *rss = new RsSerialiser(); rss->addSerialType(new RsGeneralConfigSerialiser()); - pqistreamer stream(rss, "CONFIG", membio, 0); + pqistore store(rss, "CONFIG", membio, 0); - stream.SendItem(item); - stream.tick(); - stream.tick(); + store.SendItem(item); /* sign data */ std::string signature; @@ -296,10 +293,8 @@ void p3ConfigMgr::loadConfiguration() membio->fseek(0); /* go to start */ RsSerialiser *rss = new RsSerialiser(); rss->addSerialType(new RsGeneralConfigSerialiser()); - pqistreamer stream(rss, "CONFIG", membio, 0); + pqistore stream(rss, "CONFIG", membio, 0); - stream.tick(); - stream.tick(); RsItem *rsitem = stream.GetItem(); RsConfigKeyValueSet *item = dynamic_cast(rsitem); @@ -402,7 +397,7 @@ bool p3Config::loadConfiguration(std::string &loadHash) uint32_t stream_flags = BIN_FLAGS_READABLE; BinInterface *bio = new BinFileInterface(fname.c_str(), bioflags); - pqistore stream(setupSerialiser(), bio, stream_flags); + pqistore stream(setupSerialiser(), "CONFIG", bio, stream_flags); RsItem *item = NULL; while(NULL != (item = stream.GetItem())) @@ -473,7 +468,7 @@ bool p3Config::saveConfiguration() stream_flags |= BIN_FLAGS_NO_DELETE; BinInterface *bio = new BinFileInterface(fnametmp.c_str(), bioflags); - pqistore *stream = new pqistore(setupSerialiser(), bio, stream_flags); + pqistore *stream = new pqistore(setupSerialiser(), "CONFIG", bio, stream_flags); std::list::iterator it; diff --git a/libretroshare/src/pqi/pqistore.cc b/libretroshare/src/pqi/pqistore.cc index 8850d9c1b..c940e4a8c 100644 --- a/libretroshare/src/pqi/pqistore.cc +++ b/libretroshare/src/pqi/pqistore.cc @@ -50,9 +50,9 @@ const int pqistorezone = 9511; -pqistore::pqistore(RsSerialiser *rss, BinInterface *bio_in, int bio_flags_in) +pqistore::pqistore(RsSerialiser *rss, std::string srcId, BinInterface *bio_in, int bio_flags_in) :PQInterface(""), rsSerialiser(rss), bio(bio_in), bio_flags(bio_flags_in), - nextPkt(NULL) + nextPkt(NULL), mSrcId(srcId) { { std::ostringstream out; @@ -363,6 +363,7 @@ int pqistore::readPkt(RsItem **item_out) return 0; } + item->PeerId(mSrcId); *item_out = item; return 1; } diff --git a/libretroshare/src/pqi/pqistore.h b/libretroshare/src/pqi/pqistore.h index 6be3980fb..83ba6d7e1 100644 --- a/libretroshare/src/pqi/pqistore.h +++ b/libretroshare/src/pqi/pqistore.h @@ -41,7 +41,7 @@ class pqistore: public PQInterface { public: - pqistore(RsSerialiser *rss, BinInterface *bio_in, int bio_flagsin); + pqistore(RsSerialiser *rss, std::string srcId, BinInterface *bio_in, int bio_flagsin); virtual ~pqistore(); // PQInterface @@ -65,6 +65,7 @@ int readPkt(RsItem **item_out); // Temp Storage for transient data..... RsItem *nextPkt; + std::string mSrcId; }; diff --git a/libretroshare/src/services/p3Qblog.cc b/libretroshare/src/services/p3Qblog.cc index 5e8a30c04..124ffd660 100644 --- a/libretroshare/src/services/p3Qblog.cc +++ b/libretroshare/src/services/p3Qblog.cc @@ -28,6 +28,7 @@ #include #include #include +#include "pqi/pqistore.h" #include "pqi/pqibin.h" #include "pqi/p3authmgr.h" @@ -142,7 +143,7 @@ bool p3Qblog::loadBlogFile(std::string filename, std::string src) uint32_t bioflags = BIN_FLAGS_HASH_DATA | BIN_FLAGS_READABLE; BinInterface *bio = new BinFileInterface(filename.c_str(), bioflags); - pqistreamer *stream = new pqistreamer(rsSerialiser, src, bio, 0); + pqistore *store = new pqistore(rsSerialiser, src, bio, 0); #ifdef QBLOG_DEBUG @@ -159,8 +160,6 @@ bool p3Qblog::loadBlogFile(std::string filename, std::string src) RsItem *item; RsQblogMsg *newBlog; - stream->tick(); // tick to read - time_t now = time(NULL); time_t min, max; @@ -170,7 +169,7 @@ bool p3Qblog::loadBlogFile(std::string filename, std::string src) max = now + BLOG_MAX_FWD_OFFSET; } /********** STACK LOCKED MTX ******/ - while(NULL != (item = stream->GetItem())) + while(NULL != (item = store->GetItem())) { #ifdef QBLOG_DEBUG std::cerr << "p3Qblog::loadBlogFile() Got Item:"; @@ -208,11 +207,9 @@ bool p3Qblog::loadBlogFile(std::string filename, std::string src) addBlog(newBlog); // add received blog to list } - - stream->tick(); // tick to read } - delete stream; // stream finished with/return resource + delete store; // store finished with/return resource return true; } @@ -273,7 +270,7 @@ bool p3Qblog::postBlogs(void) uint32_t bioflags = BIN_FLAGS_HASH_DATA | BIN_FLAGS_WRITEABLE; BinInterface *bio = new BinFileInterface(fname.c_str(), bioflags); - pqistreamer *stream = new pqistreamer(rsSerialiser, mOwnId, bio, + pqistore *store = new pqistore(rsSerialiser, mOwnId, bio, BIN_FLAGS_NO_DELETE); { @@ -297,8 +294,7 @@ bool p3Qblog::postBlogs(void) std::cerr << std::endl; #endif - stream->SendItem(item); - stream->tick(); /* tick to write */ + store->SendItem(item); } else /* if blogs belong to a friend */ { @@ -313,9 +309,6 @@ bool p3Qblog::postBlogs(void) } - - stream->tick(); /* Tick for final write! */ - /* flag as new info */ CacheData data; @@ -351,7 +344,7 @@ bool p3Qblog::postBlogs(void) refreshCache(data); } - delete stream; + delete store; return true; } diff --git a/libretroshare/src/services/p3chatservice.cc b/libretroshare/src/services/p3chatservice.cc index 91b295948..ceeb07f2a 100644 --- a/libretroshare/src/services/p3chatservice.cc +++ b/libretroshare/src/services/p3chatservice.cc @@ -429,7 +429,7 @@ bool p3ChatService::loadConfiguration(std::string &loadHash) rss->addSerialType(new RsChatSerialiser()); BinFileInterface *in = new BinFileInterface(msgfile.c_str(), BIN_FLAGS_READABLE | BIN_FLAGS_HASH_DATA); - pqistore *pa_in = new pqistore(rss, in, BIN_FLAGS_READABLE); + pqistore *pa_in = new pqistore(rss, "CHATCONFIG", in, BIN_FLAGS_READABLE); RsItem *item; RsChatMsgItem *mitem; @@ -481,7 +481,7 @@ bool p3ChatService::saveConfiguration() rss->addSerialType(new RsChatSerialiser()); BinFileInterface *out = new BinFileInterface(msgfiletmp.c_str(), BIN_FLAGS_WRITEABLE | BIN_FLAGS_HASH_DATA); - pqistore *pa_out = new pqistore(rss, out, BIN_FLAGS_WRITEABLE); + pqistore *pa_out = new pqistore(rss, "CHATCONFIG", out, BIN_FLAGS_WRITEABLE); if(_own_avatar != NULL) { diff --git a/libretroshare/src/services/p3distrib.cc b/libretroshare/src/services/p3distrib.cc index 700323823..94bca34d6 100644 --- a/libretroshare/src/services/p3distrib.cc +++ b/libretroshare/src/services/p3distrib.cc @@ -202,7 +202,7 @@ void p3GroupDistrib::loadFileGroups(std::string filename, std::string src, bool /* create the serialiser to load info */ BinInterface *bio = new BinFileInterface(filename.c_str(), BIN_FLAGS_READABLE); - pqistreamer *streamer = createStreamer(bio, src, 0); + pqistore *store = createStore(bio, src, 0); std::cerr << "loading file " << filename << std::endl ; @@ -210,8 +210,7 @@ void p3GroupDistrib::loadFileGroups(std::string filename, std::string src, bool RsDistribGrp *newGrp; RsDistribGrpKey *newKey; - streamer->tick(); - while(NULL != (item = streamer->GetItem())) + while(NULL != (item = store->GetItem())) { #ifdef DISTRIB_DEBUG std::cerr << "p3GroupDistrib::loadFileGroups() Got Item:"; @@ -237,11 +236,9 @@ void p3GroupDistrib::loadFileGroups(std::string filename, std::string src, bool #endif delete item; } - streamer->tick(); } - delete streamer; - + delete store; /* clear publication of groups if local cache file found */ @@ -269,15 +266,14 @@ void p3GroupDistrib::loadFileMsgs(std::string filename, uint16_t cacheSubId, std /* create the serialiser to load msgs */ BinInterface *bio = new BinFileInterface(filename.c_str(), BIN_FLAGS_READABLE); - pqistreamer *streamer = createStreamer(bio, src, 0); + pqistore *store = createStore(bio, src, 0); std::cerr << "loading file " << filename << std::endl ; RsItem *item; RsDistribSignedMsg *newMsg; - streamer->tick(); - while(NULL != (item = streamer->GetItem())) + while(NULL != (item = store->GetItem())) { #ifdef DISTRIB_DEBUG std::cerr << "p3GroupDistrib::loadFileMsgs() Got Item:"; @@ -299,7 +295,6 @@ void p3GroupDistrib::loadFileMsgs(std::string filename, uint16_t cacheSubId, std /* wrong message type */ delete item; } - streamer->tick(); } @@ -335,7 +330,7 @@ void p3GroupDistrib::loadFileMsgs(std::string filename, uint16_t cacheSubId, std } } - delete streamer; + delete store; return; } @@ -750,7 +745,7 @@ void p3GroupDistrib::locked_publishPendingMsgs() std::string filenametmp = path + "/" + tmpname + ".tmp"; BinInterface *bio = new BinFileInterface(filenametmp.c_str(), BIN_FLAGS_WRITEABLE | BIN_FLAGS_HASH_DATA); - pqistreamer *streamer = createStreamer(bio, mOwnId, 0); /* messages are deleted! */ + pqistore *store = createStore(bio, mOwnId, 0); /* messages are deleted! */ bool resave = false; std::list::iterator it; @@ -772,13 +767,10 @@ void p3GroupDistrib::locked_publishPendingMsgs() resave = true; } - streamer->SendItem(*it); /* deletes it */ - streamer->tick(); + store->SendItem(*it); /* deletes it */ } - streamer->tick(); /* once more for good luck! */ - - /* Extract File Information from pqistreamer */ + /* Extract File Information from pqistore */ newCache.path = path; newCache.name = tmpname; @@ -788,7 +780,7 @@ void p3GroupDistrib::locked_publishPendingMsgs() /* cleanup */ mPendingPublish.clear(); - delete streamer; + delete store; if(!RsDirUtil::renameFile(filenametmp,filename)) { @@ -843,7 +835,7 @@ void p3GroupDistrib::publishDistribGroups() std::string filenametmp = path + "/" + tmpname + ".tmp"; BinInterface *bio = new BinFileInterface(filenametmp.c_str(), BIN_FLAGS_WRITEABLE | BIN_FLAGS_HASH_DATA); - pqistreamer *streamer = createStreamer(bio, mOwnId, BIN_FLAGS_NO_DELETE); + pqistore *store = createStore(bio, mOwnId, BIN_FLAGS_NO_DELETE); RsStackMutex stack(distribMtx); /****** STACK MUTEX LOCKED *******/ @@ -866,8 +858,7 @@ void p3GroupDistrib::publishDistribGroups() if (grp) { /* store in Cache File */ - streamer->SendItem(grp); /* no delete */ - streamer->tick(); + store->SendItem(grp); /* no delete */ } /* if they have public keys, publish these too */ @@ -896,8 +887,7 @@ void p3GroupDistrib::publishDistribGroups() pubKey->key.startTS = kit->second.startTS; pubKey->key.endTS = kit->second.endTS; - streamer->SendItem(pubKey); - streamer->tick(); + store->SendItem(pubKey); delete pubKey; } else @@ -922,7 +912,7 @@ void p3GroupDistrib::publishDistribGroups() } - /* Extract File Information from pqistreamer */ + /* Extract File Information from pqistore */ newCache.path = path; newCache.name = tmpname; @@ -931,7 +921,7 @@ void p3GroupDistrib::publishDistribGroups() newCache.recvd = time(NULL); /* cleanup */ - delete streamer; + delete store; if(!RsDirUtil::renameFile(filenametmp,filename)) { @@ -1381,15 +1371,15 @@ bool p3GroupDistrib::loadList(std::list load) * As All the child packets are Packed, we should only need RsSerialDistrib() in it. */ -pqistreamer *p3GroupDistrib::createStreamer(BinInterface *bio, std::string src, uint32_t bioflags) +pqistore *p3GroupDistrib::createStore(BinInterface *bio, std::string src, uint32_t bioflags) { RsSerialiser *rsSerialiser = new RsSerialiser(); RsSerialType *serialType = new RsDistribSerialiser(); rsSerialiser->addSerialType(serialType); - pqistreamer *streamer = new pqistreamer(rsSerialiser, src, bio, bioflags); + pqistore *store = new pqistore(rsSerialiser, src, bio, bioflags); - return streamer; + return store; } diff --git a/libretroshare/src/services/p3distrib.h b/libretroshare/src/services/p3distrib.h index 2f421aaa9..2fd3cc483 100644 --- a/libretroshare/src/services/p3distrib.h +++ b/libretroshare/src/services/p3distrib.h @@ -27,7 +27,7 @@ #define P3_GENERIC_DISTRIB_HEADER #include "pqi/pqi.h" -#include "pqi/pqistreamer.h" +#include "pqi/pqistore.h" #include "pqi/p3cfgmgr.h" #include "pqi/p3authmgr.h" #include "services/p3service.h" @@ -340,7 +340,7 @@ uint16_t locked_determineCacheSubId(); virtual RsSerialType *createSerialiser() = 0; /* Used to Create/Load Cache Files only */ -virtual pqistreamer *createStreamer(BinInterface *bio, std::string src, uint32_t bioflags); +virtual pqistore *createStore(BinInterface *bio, std::string src, uint32_t bioflags); virtual bool validateDistribGrp(RsDistribGrp *newGrp); virtual bool locked_checkGroupInfo(GroupInfo &info, RsDistribGrp *newGrp); diff --git a/libretroshare/src/services/p3ranking.cc b/libretroshare/src/services/p3ranking.cc index 5d74fa1a0..be7581be3 100644 --- a/libretroshare/src/services/p3ranking.cc +++ b/libretroshare/src/services/p3ranking.cc @@ -145,7 +145,7 @@ void p3Ranking::loadRankFile(std::string filename, std::string src) uint32_t bioflags = BIN_FLAGS_HASH_DATA | BIN_FLAGS_READABLE; BinInterface *bio = new BinFileInterface(filename.c_str(), bioflags); - pqistreamer *stream = new pqistreamer(rsSerialiser, src, bio, 0); + pqistore *store = new pqistore(rsSerialiser, src, bio, 0); time_t now = time(NULL); time_t min, max; @@ -169,8 +169,7 @@ void p3Ranking::loadRankFile(std::string filename, std::string src) RsItem *item; RsRankLinkMsg *newMsg; - stream->tick(); /* Tick to read! */ - while(NULL != (item = stream->GetItem())) + while(NULL != (item = store->GetItem())) { #ifdef RANK_DEBUG @@ -210,11 +209,9 @@ void p3Ranking::loadRankFile(std::string filename, std::string src) newMsg->PeerId(newMsg->pid); addRankMsg(newMsg); } - - stream->tick(); /* Tick to read! */ } - delete stream; + delete store; } @@ -262,8 +259,7 @@ void p3Ranking::publishMsgs(bool own) uint32_t bioflags = BIN_FLAGS_HASH_DATA | BIN_FLAGS_WRITEABLE; BinInterface *bio = new BinFileInterface(fname.c_str(), bioflags); - pqistreamer *stream = new pqistreamer(rsSerialiser, mOwnId, bio, - BIN_FLAGS_NO_DELETE); + pqistore *store = new pqistore(rsSerialiser, mOwnId, bio, BIN_FLAGS_NO_DELETE); { RsStackMutex stack(mRankMtx); /********** STACK LOCKED MTX ******/ @@ -288,9 +284,7 @@ void p3Ranking::publishMsgs(bool own) item->print(std::cerr, 10); std::cerr << std::endl; #endif - stream->SendItem(item); - stream->tick(); /* Tick to write! */ - + store->SendItem(item); } } else @@ -341,8 +335,7 @@ void p3Ranking::publishMsgs(bool own) msg->print(std::cerr, 10); std::cerr << std::endl; #endif - stream->SendItem(msg); - stream->tick(); /* Tick to write! */ + store->SendItem(msg); /* cleanup */ delete msg; @@ -363,16 +356,12 @@ void p3Ranking::publishMsgs(bool own) (*ait)->print(std::cerr, 10); std::cerr << std::endl; #endif - stream->SendItem(*ait); - stream->tick(); /* Tick to write! */ + store->SendItem(*ait); } } } /********** STACK LOCKED MTX ******/ - - stream->tick(); /* Tick for final write! */ - /* flag as new info */ CacheData data; @@ -406,7 +395,7 @@ void p3Ranking::publishMsgs(bool own) refreshCache(data); } - delete stream; + delete store; } @@ -1091,7 +1080,7 @@ std::string p3Ranking::anonRankMsg(std::string rid, std::wstring link, std::wstr -pqistreamer *createStreamer(std::string file, std::string src, bool reading) +pqistore *createStore(std::string file, std::string src, bool reading) { RsSerialiser *rsSerialiser = new RsSerialiser(); @@ -1109,11 +1098,11 @@ pqistreamer *createStreamer(std::string file, std::string src, bool reading) /* bin flags: READ | WRITE | HASH_DATA */ BinInterface *bio = new BinFileInterface(file.c_str(), bioflags); - /* streamer flags: NO_DELETE (yes) | NO_CLOSE (no) */ - pqistreamer *streamer = new pqistreamer(rsSerialiser, src, bio, + /* store flags: NO_DELETE (yes) | NO_CLOSE (no) */ + pqistore *store = new pqistore(rsSerialiser, src, bio, BIN_FLAGS_NO_DELETE); - return streamer; + return store; } std::string generateRandomLinkId() diff --git a/libretroshare/src/services/p3ranking.h b/libretroshare/src/services/p3ranking.h index 685fc737f..852058b41 100644 --- a/libretroshare/src/services/p3ranking.h +++ b/libretroshare/src/services/p3ranking.h @@ -28,7 +28,7 @@ #include "dbase/cachestrapper.h" #include "pqi/pqiservice.h" -#include "pqi/pqistreamer.h" +#include "pqi/pqistore.h" #include "pqi/p3connmgr.h" #include "pqi/p3cfgmgr.h" @@ -116,7 +116,7 @@ float locked_calcRank(RankGroup &grp); /* returns 0->100 */ void locked_reSortGroup(RankGroup &grp); void sortAllMsgs(); -pqistreamer *createStreamer(std::string file, std::string src, bool reading); +pqistore *createStore(std::string file, std::string src, bool reading); /****************** p3Config STUFF *******************/