Lots of little changes to libretroshare. Improvements mainly focused

on configuration storage and loading:
 * Improved Configuration Manager (almost finished)
 * Mutex protections for Configuration system
 * added Configuration storage to p3ConnectMgr.
 * added Configuration storage to p3MsgService.
 * bugfixes in p3GeneralConfig.
 * Added Config Save notification where necessary.
 * added safe FinalConfigSave before exit().
 * added RsPeerNetItem + RsPeerStunItem (serialiser)
 * reordered startup for correct config loading.
 * enabled Loading of certs in AuthXPGP.
 * move sockaddr_clear() to util/rsnet.h
 * switched p3MsgService sendMessage checking to pqiMonitor syste,.
 * corrected pqiarchive saving of PeerIds.
 * added setNetworkMode() & setVisState() to p3ConnectMgr.
 * added Mutex protection to p3ranking.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@336 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2008-02-07 16:18:34 +00:00
parent 2c69fd7eaf
commit 806b8285f2
36 changed files with 1712 additions and 599 deletions

View file

@ -51,8 +51,8 @@ const int msgservicezone = 54319;
p3MsgService::p3MsgService(p3ConnectMgr *cm)
:p3Service(RS_SERVICE_TYPE_MSG), mConnMgr(cm),
msgChanged(1), mMsgUniqueId(1)
:p3Service(RS_SERVICE_TYPE_MSG), pqiConfig(CONFIG_TYPE_MSGS),
mConnMgr(cm), msgChanged(1), mMsgUniqueId(1)
{
addSerialType(new RsMsgSerialiser());
}
@ -102,7 +102,7 @@ int p3MsgService::tick()
*/
incomingMsgs();
checkOutgoingMessages();
//checkOutgoingMessages();
return 0;
}
@ -146,12 +146,19 @@ int p3MsgService::incomingMsgs()
imsg[mi->msgId] = mi;
msgChanged.IndicateChanged();
IndicateConfigChanged(); /**** INDICATE MSG CONFIG CHANGED! *****/
/**** STACK UNLOCKED ***/
}
return 1;
}
void p3MsgService::statusChange(const std::list<pqipeer> &plist)
{
/* should do it properly! */
checkOutgoingMessages();
}
int p3MsgService::checkOutgoingMessages()
{
/* iterate through the outgoing queue
@ -215,11 +222,18 @@ int p3MsgService::checkOutgoingMessages()
}
}
if (toErase.size() > 0)
{
IndicateConfigChanged(); /**** INDICATE MSG CONFIG CHANGED! *****/
}
return 0;
}
int p3MsgService::save_config()
bool p3MsgService::saveConfiguration()
{
std::list<std::string>::iterator it;
std::string empty("");
@ -230,14 +244,14 @@ int p3MsgService::save_config()
/* now we create a pqiarchive, and stream all the msgs into it
*/
std::string msgfile = Filename();
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
std::string statelog = config_dir + "/msgs.rst";
RsSerialiser *rss = new RsSerialiser();
rss->addSerialType(new RsMsgSerialiser());
BinFileInterface *out = new BinFileInterface((char *) statelog.c_str(), BIN_FLAGS_WRITEABLE);
BinFileInterface *out = new BinFileInterface(msgfile.c_str(), BIN_FLAGS_WRITEABLE | BIN_FLAGS_HASH_DATA);
pqiarchive *pa_out = new pqiarchive(rss, out, BIN_FLAGS_WRITEABLE | BIN_FLAGS_NO_DELETE);
bool written = false;
@ -253,8 +267,6 @@ int p3MsgService::save_config()
for(mit = msgOutgoing.begin(); mit != msgOutgoing.end(); mit++)
{
//RsMsgItem *mi = (*mit)->clone();
//mi -> msgFlags |= RS_MSG_FLAGS_PENDING;
if (pa_out -> SendItem(mit->second))
{
written = true;
@ -262,30 +274,26 @@ int p3MsgService::save_config()
}
setHash(out->gethash());
delete pa_out;
return 1;
return true;
}
int p3MsgService::load_config()
bool p3MsgService::loadConfiguration(std::string &loadHash)
{
std::list<std::string>::iterator it;
std::string empty("");
std::string dir("notempty");
std::string str_true("true");
/* load msg/ft */
std::string statelog = config_dir + "/msgs.rst";
std::string msgfile = Filename();
RsSerialiser *rss = new RsSerialiser();
rss->addSerialType(new RsMsgSerialiser());
BinFileInterface *in = new BinFileInterface((char *) statelog.c_str(), BIN_FLAGS_READABLE);
BinFileInterface *in = new BinFileInterface(msgfile.c_str(), BIN_FLAGS_READABLE | BIN_FLAGS_HASH_DATA);
pqiarchive *pa_in = new pqiarchive(rss, in, BIN_FLAGS_READABLE);
RsItem *item;
RsMsgItem *mitem;
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
while((item = pa_in -> GetItem()))
{
@ -298,6 +306,8 @@ int p3MsgService::load_config()
mitem->msgId = getNewUniqueMsgId();
if (mitem -> msgFlags & RS_MSG_FLAGS_PENDING)
{
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
std::cerr << "MSG_PENDING";
std::cerr << std::endl;
mitem->print(std::cerr);
@ -305,6 +315,8 @@ int p3MsgService::load_config()
}
else
{
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
imsg[mitem->msgId] = mitem;
}
}
@ -314,9 +326,23 @@ int p3MsgService::load_config()
}
}
std::string hashin = in->gethash();
if (hashin != loadHash)
{
/* big error message! */
std::cerr << "p3MsgService::loadConfiguration() FAILED!" << std::endl;
std::cerr << "p3MsgService::loadConfiguration() FAILED!" << std::endl;
std::cerr << "p3MsgService::loadConfiguration() FAILED!" << std::endl;
std::cerr << "p3MsgService::loadConfiguration() FAILED!" << std::endl;
std::cerr << "p3MsgService::loadConfiguration() FAILED!" << std::endl;
}
setHash(hashin);
delete pa_in;
return 1;
return true;
}
@ -423,6 +449,8 @@ bool p3MsgService::removeMsgId(std::string mid)
delete mi;
msgChanged.IndicateChanged();
IndicateConfigChanged(); /**** INDICATE MSG CONFIG CHANGED! *****/
return true;
}
@ -434,6 +462,8 @@ bool p3MsgService::removeMsgId(std::string mid)
delete mi;
msgChanged.IndicateChanged();
IndicateConfigChanged(); /**** INDICATE MSG CONFIG CHANGED! *****/
return true;
}
@ -466,6 +496,9 @@ int p3MsgService::sendMessage(RsMsgItem *item)
pqioutput(PQL_DEBUG_BASIC, msgservicezone,
"p3MsgService::sendMessage()");
item -> msgId = getNewUniqueMsgId(); /* grabs Mtx as well */
{
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
/* add pending flag */
@ -473,8 +506,12 @@ int p3MsgService::sendMessage(RsMsgItem *item)
(RS_MSG_FLAGS_OUTGOING |
RS_MSG_FLAGS_PENDING);
/* STORE MsgID */
item -> msgId = getNewUniqueMsgId();
msgOutgoing[item->msgId] = item;
}
IndicateConfigChanged(); /**** INDICATE MSG CONFIG CHANGED! *****/
checkOutgoingMessages();
return 1;
}

View file

@ -41,7 +41,7 @@
class p3ConnectMgr;
class p3MsgService: public p3Service
class p3MsgService: public p3Service, public pqiConfig, public pqiMonitor
{
public:
p3MsgService(p3ConnectMgr *cm);
@ -63,18 +63,22 @@ bool MessageSend(MessageInfo &info);
void loadWelcomeMsg(); /* startup message */
int checkOutgoingMessages();
std::list<RsMsgItem *> &getMsgList();
std::list<RsMsgItem *> &getMsgOutList();
int load_config();
int save_config();
//std::list<RsMsgItem *> &getMsgList();
//std::list<RsMsgItem *> &getMsgOutList();
int tick();
int status();
/*** Overloaded from pqiConfig ****/
virtual bool loadConfiguration(std::string &loadHash);
virtual bool saveConfiguration();
/*** Overloaded from pqiConfig ****/
/*** Overloaded from pqiMonitor ***/
virtual void statusChange(const std::list<pqipeer> &plist);
int checkOutgoingMessages();
/*** Overloaded from pqiMonitor ***/
private:
uint32_t getNewUniqueMsgId();

View file

@ -49,10 +49,15 @@ p3Ranking::p3Ranking(uint16_t type, CacheTransfer *cft,
CacheStore(type, true, cft, storedir),
mStorePeriod(storePeriod)
{
{ RsStackMutex stack(mRankMtx); /********** STACK LOCKED MTX ******/
mOwnId = getAuthMgr()->OwnId();
mViewPeriod = 60 * 60 * 24 * 30; /* one Month */
mSortType = RS_RANK_ALG;
} RsStackMutex stack(mRankMtx);
// createDummyData();
return;
}
@ -100,8 +105,14 @@ void p3Ranking::loadRankFile(std::string filename, std::string src)
pqistreamer *stream = new pqistreamer(rsSerialiser, src, bio, 0);
time_t now = time(NULL);
time_t min = now - mStorePeriod;
time_t max = now + RANK_MAX_FWD_OFFSET;
time_t min, max;
{ RsStackMutex stack(mRankMtx); /********** STACK LOCKED MTX ******/
min = now - mStorePeriod;
max = now + RANK_MAX_FWD_OFFSET;
} /********** STACK LOCKED MTX ******/
#ifdef RANK_DEBUG
std::cerr << "p3Ranking::loadRankFile()";
@ -192,6 +203,8 @@ void p3Ranking::publishMsgs()
pqistreamer *stream = new pqistreamer(rsSerialiser, mOwnId, bio,
BIN_FLAGS_NO_DELETE);
{ RsStackMutex stack(mRankMtx); /********** STACK LOCKED MTX ******/
/* iterate through list */
std::map<std::string, RankGroup>::iterator it;
for(it = mData.begin(); it != mData.end(); it++)
@ -223,11 +236,18 @@ void p3Ranking::publishMsgs()
}
}
} /********** STACK LOCKED MTX ******/
stream->tick(); /* Tick for final write! */
/* flag as new info */
CacheData data;
{ RsStackMutex stack(mRankMtx); /********** STACK LOCKED MTX ******/
data.pid = mOwnId;
} /********** STACK LOCKED MTX ******/
data.cid = CacheId(CacheSource::getCacheType(), 1);
data.path = path;
@ -272,6 +292,8 @@ void p3Ranking::addRankMsg(RsRankLinkMsg *msg)
std::cerr << std::endl;
#endif
RsStackMutex stack(mRankMtx); /********** STACK LOCKED MTX ******/
std::map<std::string, RankGroup>::iterator it;
it = mData.find(rid);
if (it == mData.end())
@ -328,7 +350,7 @@ void p3Ranking::addRankMsg(RsRankLinkMsg *msg)
mRepublish = true;
}
reSortGroup(it->second);
locked_reSortGroup(it->second);
}
}
@ -337,8 +359,14 @@ void p3Ranking::addRankMsg(RsRankLinkMsg *msg)
bool p3Ranking::setSortPeriod(uint32_t period)
{
bool reSort = (mViewPeriod != period);
mViewPeriod = period;
bool reSort = false;
{
RsStackMutex stack(mRankMtx); /********** STACK LOCKED MTX ******/
reSort = (mViewPeriod != period);
mViewPeriod = period;
}
if (reSort)
{
@ -350,8 +378,13 @@ bool p3Ranking::setSortPeriod(uint32_t period)
bool p3Ranking::setSortMethod(uint32_t type)
{
bool reSort = (mSortType != type);
mSortType = type;
bool reSort = false;
{
RsStackMutex stack(mRankMtx); /********** STACK LOCKED MTX ******/
reSort = (mSortType != type);
mSortType = type;
}
if (reSort)
{
@ -363,9 +396,14 @@ bool p3Ranking::setSortMethod(uint32_t type)
bool p3Ranking::clearPeerFilter()
{
bool reSort = (mPeerFilter.size() > 0);
bool reSort = false;
{
RsStackMutex stack(mRankMtx); /********** STACK LOCKED MTX ******/
reSort = (mPeerFilter.size() > 0);
mPeerFilter.clear();
}
mPeerFilter.clear();
if (reSort)
{
@ -377,7 +415,10 @@ bool p3Ranking::clearPeerFilter()
bool p3Ranking::setPeerFilter(std::list<std::string> peers)
{
mPeerFilter = peers;
{
RsStackMutex stack(mRankMtx); /********** STACK LOCKED MTX ******/
mPeerFilter = peers;
}
sortAllMsgs();
@ -507,10 +548,9 @@ float p3Ranking::locked_calcRank(RankGroup &grp)
}
void p3Ranking::reSortGroup(RankGroup &grp)
void p3Ranking::locked_reSortGroup(RankGroup &grp)
{
std::string rid = grp.rid;
//float rank = grp.rank;
/* remove from existings rankings */
std::multimap<float, std::string>::iterator rit;
@ -532,6 +572,8 @@ void p3Ranking::reSortGroup(RankGroup &grp)
void p3Ranking::sortAllMsgs()
{
RsStackMutex stack(mRankMtx); /********** STACK LOCKED MTX ******/
/* iterate through list and re-score each one */
std::map<std::string, RankGroup>::iterator it;
@ -554,11 +596,15 @@ void p3Ranking::sortAllMsgs()
/* get Ids */
uint32_t p3Ranking::getRankingsCount()
{
RsStackMutex stack(mRankMtx); /********** STACK LOCKED MTX ******/
return mRankings.size();
}
float p3Ranking::getMaxRank()
{
RsStackMutex stack(mRankMtx); /********** STACK LOCKED MTX ******/
if (mRankings.size() == 0)
return 0;
@ -567,6 +613,8 @@ float p3Ranking::getMaxRank()
bool p3Ranking::getRankings(uint32_t first, uint32_t count, std::list<std::string> &rids)
{
RsStackMutex stack(mRankMtx); /********** STACK LOCKED MTX ******/
uint32_t i = 0;
std::multimap<float, std::string>::reverse_iterator rit;
for(rit = mRankings.rbegin(); (i < first) && (rit != mRankings.rend()); rit++);
@ -579,9 +627,13 @@ bool p3Ranking::getRankings(uint32_t first, uint32_t count, std::list<std::st
return true;
}
bool p3Ranking::getRankDetails(std::string rid, RsRankDetails &details)
{
RsStackMutex stack(mRankMtx); /********** STACK LOCKED MTX ******/
/* get the details. */
std::map<std::string, RankGroup>::iterator it;
it = mData.find(rid);
if (mData.end() == it)
@ -613,9 +665,17 @@ bool p3Ranking::getRankDetails(std::string rid, RsRankDetails &details)
void p3Ranking::tick()
{
if (mRepublish)
bool repub = false;
{
RsStackMutex stack(mRankMtx); /********** STACK LOCKED MTX ******/
repub = mRepublish;
}
if (repub)
{
publishMsgs();
RsStackMutex stack(mRankMtx); /********** STACK LOCKED MTX ******/
mRepublish = false;
}
}
@ -633,7 +693,11 @@ std::string p3Ranking::newRankMsg(std::wstring link, std::wstring title, std::ws
time_t now = time(NULL);
msg->PeerId(mOwnId);
{
RsStackMutex stack(mRankMtx); /********** STACK LOCKED MTX ******/
msg->PeerId(mOwnId);
}
msg->rid = rid;
msg->title = title;
msg->timestamp = now;
@ -654,6 +718,9 @@ bool p3Ranking::updateComment(std::string rid, std::wstring comment)
std::cerr << "p3Ranking::updateComment() rid:" << rid;
std::cerr << std::endl;
#endif
RsRankLinkMsg *msg = NULL;
{ RsStackMutex stack(mRankMtx); /********** STACK LOCKED MTX ******/
std::map<std::string, RankGroup>::iterator it;
it = mData.find(rid);
@ -668,7 +735,7 @@ bool p3Ranking::updateComment(std::string rid, std::wstring comment)
return false;
}
RsRankLinkMsg *msg = new RsRankLinkMsg();
msg = new RsRankLinkMsg();
time_t now = time(NULL);
@ -681,6 +748,8 @@ bool p3Ranking::updateComment(std::string rid, std::wstring comment)
msg->linktype = RS_LINK_TYPE_WEB;
msg->link = (it->second).link;
} /********** STACK UNLOCKED MTX ******/
#ifdef RANK_DEBUG
std::cerr << "p3Ranking::updateComment() Item:";
std::cerr << std::endl;

View file

@ -105,8 +105,10 @@ void tick();
void loadRankFile(std::string filename, std::string src);
void addRankMsg(RsRankLinkMsg *msg);
void publishMsgs();
float locked_calcRank(RankGroup &grp); /* returns 0->100 */
void reSortGroup(RankGroup &grp);
void locked_reSortGroup(RankGroup &grp);
void sortAllMsgs();
pqistreamer *createStreamer(std::string file, std::string src, bool reading);
@ -114,6 +116,10 @@ pqistreamer *createStreamer(std::string file, std::string src, bool reading);
void createDummyData();
RsMutex mRankMtx;
/***** below here is locked *****/
bool mRepublish;
uint32_t mStorePeriod;