Modified (all I hope) remaining pqistream -> pqistore for cache file save/loads.

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@1223 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2009-05-12 21:55:50 +00:00
parent 99828dfa41
commit 02fc02d23c
9 changed files with 53 additions and 84 deletions

View File

@ -29,7 +29,6 @@
#include "pqi/p3authmgr.h"
#include "pqi/pqibin.h"
#include "pqi/pqistore.h"
#include "pqi/pqistreamer.h"
#include "pqi/pqinotify.h"
#include <errno.h>
@ -153,11 +152,9 @@ void p3ConfigMgr::saveConfiguration()
BinMemInterface *membio = new BinMemInterface(1000, bioflags);
RsSerialiser *rss = new RsSerialiser();
rss->addSerialType(new RsGeneralConfigSerialiser());
pqistreamer stream(rss, "CONFIG", membio, 0);
pqistore store(rss, "CONFIG", membio, 0);
stream.SendItem(item);
stream.tick();
stream.tick();
store.SendItem(item);
/* sign data */
std::string signature;
@ -296,10 +293,8 @@ void p3ConfigMgr::loadConfiguration()
membio->fseek(0); /* go to start */
RsSerialiser *rss = new RsSerialiser();
rss->addSerialType(new RsGeneralConfigSerialiser());
pqistreamer stream(rss, "CONFIG", membio, 0);
pqistore stream(rss, "CONFIG", membio, 0);
stream.tick();
stream.tick();
RsItem *rsitem = stream.GetItem();
RsConfigKeyValueSet *item = dynamic_cast<RsConfigKeyValueSet *>(rsitem);
@ -402,7 +397,7 @@ bool p3Config::loadConfiguration(std::string &loadHash)
uint32_t stream_flags = BIN_FLAGS_READABLE;
BinInterface *bio = new BinFileInterface(fname.c_str(), bioflags);
pqistore stream(setupSerialiser(), bio, stream_flags);
pqistore stream(setupSerialiser(), "CONFIG", bio, stream_flags);
RsItem *item = NULL;
while(NULL != (item = stream.GetItem()))
@ -473,7 +468,7 @@ bool p3Config::saveConfiguration()
stream_flags |= BIN_FLAGS_NO_DELETE;
BinInterface *bio = new BinFileInterface(fnametmp.c_str(), bioflags);
pqistore *stream = new pqistore(setupSerialiser(), bio, stream_flags);
pqistore *stream = new pqistore(setupSerialiser(), "CONFIG", bio, stream_flags);
std::list<RsItem *>::iterator it;

View File

@ -50,9 +50,9 @@
const int pqistorezone = 9511;
pqistore::pqistore(RsSerialiser *rss, BinInterface *bio_in, int bio_flags_in)
pqistore::pqistore(RsSerialiser *rss, std::string srcId, BinInterface *bio_in, int bio_flags_in)
:PQInterface(""), rsSerialiser(rss), bio(bio_in), bio_flags(bio_flags_in),
nextPkt(NULL)
nextPkt(NULL), mSrcId(srcId)
{
{
std::ostringstream out;
@ -363,6 +363,7 @@ int pqistore::readPkt(RsItem **item_out)
return 0;
}
item->PeerId(mSrcId);
*item_out = item;
return 1;
}

View File

@ -41,7 +41,7 @@
class pqistore: public PQInterface
{
public:
pqistore(RsSerialiser *rss, BinInterface *bio_in, int bio_flagsin);
pqistore(RsSerialiser *rss, std::string srcId, BinInterface *bio_in, int bio_flagsin);
virtual ~pqistore();
// PQInterface
@ -65,6 +65,7 @@ int readPkt(RsItem **item_out);
// Temp Storage for transient data.....
RsItem *nextPkt;
std::string mSrcId;
};

View File

@ -28,6 +28,7 @@
#include <utility>
#include <ctime>
#include <iomanip>
#include "pqi/pqistore.h"
#include "pqi/pqibin.h"
#include "pqi/p3authmgr.h"
@ -142,7 +143,7 @@ bool p3Qblog::loadBlogFile(std::string filename, std::string src)
uint32_t bioflags = BIN_FLAGS_HASH_DATA | BIN_FLAGS_READABLE;
BinInterface *bio = new BinFileInterface(filename.c_str(), bioflags);
pqistreamer *stream = new pqistreamer(rsSerialiser, src, bio, 0);
pqistore *store = new pqistore(rsSerialiser, src, bio, 0);
#ifdef QBLOG_DEBUG
@ -159,8 +160,6 @@ bool p3Qblog::loadBlogFile(std::string filename, std::string src)
RsItem *item;
RsQblogMsg *newBlog;
stream->tick(); // tick to read
time_t now = time(NULL);
time_t min, max;
@ -170,7 +169,7 @@ bool p3Qblog::loadBlogFile(std::string filename, std::string src)
max = now + BLOG_MAX_FWD_OFFSET;
} /********** STACK LOCKED MTX ******/
while(NULL != (item = stream->GetItem()))
while(NULL != (item = store->GetItem()))
{
#ifdef QBLOG_DEBUG
std::cerr << "p3Qblog::loadBlogFile() Got Item:";
@ -208,11 +207,9 @@ bool p3Qblog::loadBlogFile(std::string filename, std::string src)
addBlog(newBlog); // add received blog to list
}
stream->tick(); // tick to read
}
delete stream; // stream finished with/return resource
delete store; // store finished with/return resource
return true;
}
@ -273,7 +270,7 @@ bool p3Qblog::postBlogs(void)
uint32_t bioflags = BIN_FLAGS_HASH_DATA | BIN_FLAGS_WRITEABLE;
BinInterface *bio = new BinFileInterface(fname.c_str(), bioflags);
pqistreamer *stream = new pqistreamer(rsSerialiser, mOwnId, bio,
pqistore *store = new pqistore(rsSerialiser, mOwnId, bio,
BIN_FLAGS_NO_DELETE);
{
@ -297,8 +294,7 @@ bool p3Qblog::postBlogs(void)
std::cerr << std::endl;
#endif
stream->SendItem(item);
stream->tick(); /* tick to write */
store->SendItem(item);
}
else /* if blogs belong to a friend */
{
@ -313,9 +309,6 @@ bool p3Qblog::postBlogs(void)
}
stream->tick(); /* Tick for final write! */
/* flag as new info */
CacheData data;
@ -351,7 +344,7 @@ bool p3Qblog::postBlogs(void)
refreshCache(data);
}
delete stream;
delete store;
return true;
}

View File

@ -429,7 +429,7 @@ bool p3ChatService::loadConfiguration(std::string &loadHash)
rss->addSerialType(new RsChatSerialiser());
BinFileInterface *in = new BinFileInterface(msgfile.c_str(), BIN_FLAGS_READABLE | BIN_FLAGS_HASH_DATA);
pqistore *pa_in = new pqistore(rss, in, BIN_FLAGS_READABLE);
pqistore *pa_in = new pqistore(rss, "CHATCONFIG", in, BIN_FLAGS_READABLE);
RsItem *item;
RsChatMsgItem *mitem;
@ -481,7 +481,7 @@ bool p3ChatService::saveConfiguration()
rss->addSerialType(new RsChatSerialiser());
BinFileInterface *out = new BinFileInterface(msgfiletmp.c_str(), BIN_FLAGS_WRITEABLE | BIN_FLAGS_HASH_DATA);
pqistore *pa_out = new pqistore(rss, out, BIN_FLAGS_WRITEABLE);
pqistore *pa_out = new pqistore(rss, "CHATCONFIG", out, BIN_FLAGS_WRITEABLE);
if(_own_avatar != NULL)
{

View File

@ -202,7 +202,7 @@ void p3GroupDistrib::loadFileGroups(std::string filename, std::string src, bool
/* create the serialiser to load info */
BinInterface *bio = new BinFileInterface(filename.c_str(), BIN_FLAGS_READABLE);
pqistreamer *streamer = createStreamer(bio, src, 0);
pqistore *store = createStore(bio, src, 0);
std::cerr << "loading file " << filename << std::endl ;
@ -210,8 +210,7 @@ void p3GroupDistrib::loadFileGroups(std::string filename, std::string src, bool
RsDistribGrp *newGrp;
RsDistribGrpKey *newKey;
streamer->tick();
while(NULL != (item = streamer->GetItem()))
while(NULL != (item = store->GetItem()))
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadFileGroups() Got Item:";
@ -237,11 +236,9 @@ void p3GroupDistrib::loadFileGroups(std::string filename, std::string src, bool
#endif
delete item;
}
streamer->tick();
}
delete streamer;
delete store;
/* clear publication of groups if local cache file found */
@ -269,15 +266,14 @@ void p3GroupDistrib::loadFileMsgs(std::string filename, uint16_t cacheSubId, std
/* create the serialiser to load msgs */
BinInterface *bio = new BinFileInterface(filename.c_str(), BIN_FLAGS_READABLE);
pqistreamer *streamer = createStreamer(bio, src, 0);
pqistore *store = createStore(bio, src, 0);
std::cerr << "loading file " << filename << std::endl ;
RsItem *item;
RsDistribSignedMsg *newMsg;
streamer->tick();
while(NULL != (item = streamer->GetItem()))
while(NULL != (item = store->GetItem()))
{
#ifdef DISTRIB_DEBUG
std::cerr << "p3GroupDistrib::loadFileMsgs() Got Item:";
@ -299,7 +295,6 @@ void p3GroupDistrib::loadFileMsgs(std::string filename, uint16_t cacheSubId, std
/* wrong message type */
delete item;
}
streamer->tick();
}
@ -335,7 +330,7 @@ void p3GroupDistrib::loadFileMsgs(std::string filename, uint16_t cacheSubId, std
}
}
delete streamer;
delete store;
return;
}
@ -750,7 +745,7 @@ void p3GroupDistrib::locked_publishPendingMsgs()
std::string filenametmp = path + "/" + tmpname + ".tmp";
BinInterface *bio = new BinFileInterface(filenametmp.c_str(), BIN_FLAGS_WRITEABLE | BIN_FLAGS_HASH_DATA);
pqistreamer *streamer = createStreamer(bio, mOwnId, 0); /* messages are deleted! */
pqistore *store = createStore(bio, mOwnId, 0); /* messages are deleted! */
bool resave = false;
std::list<RsDistribSignedMsg *>::iterator it;
@ -772,13 +767,10 @@ void p3GroupDistrib::locked_publishPendingMsgs()
resave = true;
}
streamer->SendItem(*it); /* deletes it */
streamer->tick();
store->SendItem(*it); /* deletes it */
}
streamer->tick(); /* once more for good luck! */
/* Extract File Information from pqistreamer */
/* Extract File Information from pqistore */
newCache.path = path;
newCache.name = tmpname;
@ -788,7 +780,7 @@ void p3GroupDistrib::locked_publishPendingMsgs()
/* cleanup */
mPendingPublish.clear();
delete streamer;
delete store;
if(!RsDirUtil::renameFile(filenametmp,filename))
{
@ -843,7 +835,7 @@ void p3GroupDistrib::publishDistribGroups()
std::string filenametmp = path + "/" + tmpname + ".tmp";
BinInterface *bio = new BinFileInterface(filenametmp.c_str(), BIN_FLAGS_WRITEABLE | BIN_FLAGS_HASH_DATA);
pqistreamer *streamer = createStreamer(bio, mOwnId, BIN_FLAGS_NO_DELETE);
pqistore *store = createStore(bio, mOwnId, BIN_FLAGS_NO_DELETE);
RsStackMutex stack(distribMtx); /****** STACK MUTEX LOCKED *******/
@ -866,8 +858,7 @@ void p3GroupDistrib::publishDistribGroups()
if (grp)
{
/* store in Cache File */
streamer->SendItem(grp); /* no delete */
streamer->tick();
store->SendItem(grp); /* no delete */
}
/* if they have public keys, publish these too */
@ -896,8 +887,7 @@ void p3GroupDistrib::publishDistribGroups()
pubKey->key.startTS = kit->second.startTS;
pubKey->key.endTS = kit->second.endTS;
streamer->SendItem(pubKey);
streamer->tick();
store->SendItem(pubKey);
delete pubKey;
}
else
@ -922,7 +912,7 @@ void p3GroupDistrib::publishDistribGroups()
}
/* Extract File Information from pqistreamer */
/* Extract File Information from pqistore */
newCache.path = path;
newCache.name = tmpname;
@ -931,7 +921,7 @@ void p3GroupDistrib::publishDistribGroups()
newCache.recvd = time(NULL);
/* cleanup */
delete streamer;
delete store;
if(!RsDirUtil::renameFile(filenametmp,filename))
{
@ -1381,15 +1371,15 @@ bool p3GroupDistrib::loadList(std::list<RsItem *> load)
* As All the child packets are Packed, we should only need RsSerialDistrib() in it.
*/
pqistreamer *p3GroupDistrib::createStreamer(BinInterface *bio, std::string src, uint32_t bioflags)
pqistore *p3GroupDistrib::createStore(BinInterface *bio, std::string src, uint32_t bioflags)
{
RsSerialiser *rsSerialiser = new RsSerialiser();
RsSerialType *serialType = new RsDistribSerialiser();
rsSerialiser->addSerialType(serialType);
pqistreamer *streamer = new pqistreamer(rsSerialiser, src, bio, bioflags);
pqistore *store = new pqistore(rsSerialiser, src, bio, bioflags);
return streamer;
return store;
}

View File

@ -27,7 +27,7 @@
#define P3_GENERIC_DISTRIB_HEADER
#include "pqi/pqi.h"
#include "pqi/pqistreamer.h"
#include "pqi/pqistore.h"
#include "pqi/p3cfgmgr.h"
#include "pqi/p3authmgr.h"
#include "services/p3service.h"
@ -340,7 +340,7 @@ uint16_t locked_determineCacheSubId();
virtual RsSerialType *createSerialiser() = 0;
/* Used to Create/Load Cache Files only */
virtual pqistreamer *createStreamer(BinInterface *bio, std::string src, uint32_t bioflags);
virtual pqistore *createStore(BinInterface *bio, std::string src, uint32_t bioflags);
virtual bool validateDistribGrp(RsDistribGrp *newGrp);
virtual bool locked_checkGroupInfo(GroupInfo &info, RsDistribGrp *newGrp);

View File

@ -145,7 +145,7 @@ void p3Ranking::loadRankFile(std::string filename, std::string src)
uint32_t bioflags = BIN_FLAGS_HASH_DATA | BIN_FLAGS_READABLE;
BinInterface *bio = new BinFileInterface(filename.c_str(), bioflags);
pqistreamer *stream = new pqistreamer(rsSerialiser, src, bio, 0);
pqistore *store = new pqistore(rsSerialiser, src, bio, 0);
time_t now = time(NULL);
time_t min, max;
@ -169,8 +169,7 @@ void p3Ranking::loadRankFile(std::string filename, std::string src)
RsItem *item;
RsRankLinkMsg *newMsg;
stream->tick(); /* Tick to read! */
while(NULL != (item = stream->GetItem()))
while(NULL != (item = store->GetItem()))
{
#ifdef RANK_DEBUG
@ -210,11 +209,9 @@ void p3Ranking::loadRankFile(std::string filename, std::string src)
newMsg->PeerId(newMsg->pid);
addRankMsg(newMsg);
}
stream->tick(); /* Tick to read! */
}
delete stream;
delete store;
}
@ -262,8 +259,7 @@ void p3Ranking::publishMsgs(bool own)
uint32_t bioflags = BIN_FLAGS_HASH_DATA | BIN_FLAGS_WRITEABLE;
BinInterface *bio = new BinFileInterface(fname.c_str(), bioflags);
pqistreamer *stream = new pqistreamer(rsSerialiser, mOwnId, bio,
BIN_FLAGS_NO_DELETE);
pqistore *store = new pqistore(rsSerialiser, mOwnId, bio, BIN_FLAGS_NO_DELETE);
{ RsStackMutex stack(mRankMtx); /********** STACK LOCKED MTX ******/
@ -288,9 +284,7 @@ void p3Ranking::publishMsgs(bool own)
item->print(std::cerr, 10);
std::cerr << std::endl;
#endif
stream->SendItem(item);
stream->tick(); /* Tick to write! */
store->SendItem(item);
}
}
else
@ -341,8 +335,7 @@ void p3Ranking::publishMsgs(bool own)
msg->print(std::cerr, 10);
std::cerr << std::endl;
#endif
stream->SendItem(msg);
stream->tick(); /* Tick to write! */
store->SendItem(msg);
/* cleanup */
delete msg;
@ -363,16 +356,12 @@ void p3Ranking::publishMsgs(bool own)
(*ait)->print(std::cerr, 10);
std::cerr << std::endl;
#endif
stream->SendItem(*ait);
stream->tick(); /* Tick to write! */
store->SendItem(*ait);
}
}
} /********** STACK LOCKED MTX ******/
stream->tick(); /* Tick for final write! */
/* flag as new info */
CacheData data;
@ -406,7 +395,7 @@ void p3Ranking::publishMsgs(bool own)
refreshCache(data);
}
delete stream;
delete store;
}
@ -1091,7 +1080,7 @@ std::string p3Ranking::anonRankMsg(std::string rid, std::wstring link, std::wstr
pqistreamer *createStreamer(std::string file, std::string src, bool reading)
pqistore *createStore(std::string file, std::string src, bool reading)
{
RsSerialiser *rsSerialiser = new RsSerialiser();
@ -1109,11 +1098,11 @@ pqistreamer *createStreamer(std::string file, std::string src, bool reading)
/* bin flags: READ | WRITE | HASH_DATA */
BinInterface *bio = new BinFileInterface(file.c_str(), bioflags);
/* streamer flags: NO_DELETE (yes) | NO_CLOSE (no) */
pqistreamer *streamer = new pqistreamer(rsSerialiser, src, bio,
/* store flags: NO_DELETE (yes) | NO_CLOSE (no) */
pqistore *store = new pqistore(rsSerialiser, src, bio,
BIN_FLAGS_NO_DELETE);
return streamer;
return store;
}
std::string generateRandomLinkId()

View File

@ -28,7 +28,7 @@
#include "dbase/cachestrapper.h"
#include "pqi/pqiservice.h"
#include "pqi/pqistreamer.h"
#include "pqi/pqistore.h"
#include "pqi/p3connmgr.h"
#include "pqi/p3cfgmgr.h"
@ -116,7 +116,7 @@ float locked_calcRank(RankGroup &grp); /* returns 0->100 */
void locked_reSortGroup(RankGroup &grp);
void sortAllMsgs();
pqistreamer *createStreamer(std::string file, std::string src, bool reading);
pqistore *createStore(std::string file, std::string src, bool reading);
/****************** p3Config STUFF *******************/