GxsChannels are basically complete now!:

- Fixed up Automatic downloads for GxsChannels. 
 - Tweaked GXS backend to make Meta Changes happen before requests.
 - Added some new Macros.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@6231 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2013-03-16 12:31:31 +00:00
parent 0c604d08d9
commit a4aa011207
6 changed files with 296 additions and 158 deletions

View File

@ -99,16 +99,18 @@ void RsGenExchange::run()
void RsGenExchange::tick()
{
// Meta Changes should happen first.
// This is important, as services want to change Meta, then get results.
// Services shouldn't rely on this ordering - but some do.
processGrpMetaChanges();
processMsgMetaChanges();
mDataAccess->processRequests();
publishGrps();
publishMsgs();
processGrpMetaChanges();
processMsgMetaChanges();
processRecvdData();
if(!mNotifications.empty())

View File

@ -93,7 +93,9 @@ virtual bool getRelatedPosts(const uint32_t &token, std::vector<RsGxsChannelPost
//////////////////////////////////////////////////////////////////////////////
virtual void setMessageReadStatus(uint32_t& token, const RsGxsGrpMsgIdPair& msgId, bool read) = 0;
virtual void setChannelAutoDownload(uint32_t& token, const RsGxsGroupId& groupId, bool autoDownload) = 0;
virtual bool setChannelAutoDownload(const RsGxsGroupId &groupId, bool enabled) = 0;
virtual bool getChannelAutoDownload(const RsGxsGroupId &groupid) = 0;
//virtual void setChannelAutoDownload(uint32_t& token, const RsGxsGroupId& groupId, bool autoDownload) = 0;
//virtual bool setMessageStatus(const std::string &msgId, const uint32_t status, const uint32_t statusMask);
//virtual bool setGroupSubscribeFlags(const std::string &groupId, uint32_t subscribeFlags, uint32_t subscribeMask);

View File

@ -106,10 +106,15 @@ namespace GXS_SERV {
// GENERIC GXS MACROS
#define IS_MSG_NEW(status) (status & GXS_SERV::GXS_MSG_STATUS_UNPROCESSED)
#define IS_MSG_UNREAD(status) (status & GXS_SERV::GXS_MSG_STATUS_UNREAD)
#define IS_GROUP_ADMIN(subscribeFlags) (subscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_ADMIN)
#define IS_GROUP_PUBLISHER(subscribeFlags) (subscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_PUBLISH)
#define IS_GROUP_SUBSCRIBED(subscribeFlags) (subscribeFlags & (GXS_SERV::GROUP_SUBSCRIBE_ADMIN | GXS_SERV::GROUP_SUBSCRIBE_SUBSCRIBED))
#define IS_GROUP_NOT_SUBSCRIBED(subscribeFlags) (subscribeFlags & GXS_SERV::GROUP_SUBSCRIBE_NOT_SUBSCRIBED)
#define IS_MSG_UNPROCESSED(status) (status & GXS_SERV::GXS_MSG_STATUS_UNPROCESSED)
#endif // RSGXSFLAGS_H

View File

@ -31,6 +31,8 @@
#include "retroshare/rsgxsflags.h"
#include "retroshare/rsfiles.h"
#include <stdio.h>
// For Dummy Msgs.
@ -55,6 +57,9 @@ RsGxsChannels *rsGxsChannels = NULL;
#define CHANNEL_TESTEVENT_DUMMYDATA 0x0002
#define DUMMYDATA_PERIOD 60 // long enough for some RsIdentities to be generated.
#define CHANNEL_DOWNLOAD_PERIOD (3600 * 24 * 7)
#define CHANNEL_MAX_AUTO_DL (1024 * 1024 * 1024)
/********************************************************************************/
/******************* Startup / Tick ******************************************/
/********************************************************************************/
@ -90,6 +95,13 @@ uint32_t p3GxsChannels::channelsAuthenPolicy()
}
/** Overloaded to cache new groups **/
RsGenExchange::ServiceCreate_Return p3GxsChannels::service_CreateGroup(RsGxsGrpItem* grpItem, RsTlvSecurityKeySet& keySet)
{
updateSubscribedGroup(grpItem->meta);
return SERVICE_CREATE_SUCCESS;
}
void p3GxsChannels::notifyChanges(std::vector<RsGxsNotify *> &changes)
{
@ -126,17 +138,26 @@ void p3GxsChannels::notifyChanges(std::vector<RsGxsNotify *> &changes)
}
}
}
/* shouldn't need to worry about groups - as they need to be subscribed to */
}
request_SpecificSubscribedGroups(unprocessedGroups);
RsGxsIfaceHelper::receiveChanges(changes);
}
void p3GxsChannels::service_tick()
{
static time_t last_dummy_tick = 0;
if (time(NULL) > last_dummy_tick + 5)
{
dummy_tick();
last_dummy_tick = time(NULL);
}
RsTickEvent::tick_events();
GxsTokenQueue::checkRequests();
@ -284,13 +305,32 @@ bool p3GxsChannels::getRelatedPosts(const uint32_t &token, std::vector<RsGxsChan
/********************************************************************************************/
/********************************************************************************************/
void p3GxsChannels::setChannelAutoDownload(uint32_t&, const RsGxsGroupId&, bool)
#if 0
bool p3GxsChannels::setChannelAutoDownload(uint32_t &token, const RsGxsGroupId &groupId, bool autoDownload)
{
std::cerr << "p3GxsChannels::setChannelAutoDownload() TODO";
std::cerr << "p3GxsChannels::setChannelAutoDownload()";
std::cerr << std::endl;
// we don't actually use the token at this point....
//bool p3GxsChannels::setAutoDownload(const RsGxsGroupId &groupId, bool enabled)
return;
}
#endif
bool p3GxsChannels::setChannelAutoDownload(const RsGxsGroupId &groupId, bool enabled)
{
return setAutoDownload(groupId, enabled);
}
bool p3GxsChannels::getChannelAutoDownload(const RsGxsGroupId &groupId)
{
return autoDownloadEnabled(groupId);
}
void p3GxsChannels::request_AllSubscribedGroups()
@ -309,6 +349,8 @@ void p3GxsChannels::request_AllSubscribedGroups()
RsGenExchange::getTokenService()->requestGroupInfo(token, ansType, opts);
GxsTokenQueue::queueRequest(token, GXSCHANNELS_SUBSCRIBED_META);
#define PERIODIC_ALL_PROCESS 300 // TESTING every 5 minutes.
RsTickEvent::schedule_in(CHANNEL_PROCESS, PERIODIC_ALL_PROCESS);
}
@ -426,7 +468,7 @@ void p3GxsChannels::request_SpecificUnprocessedPosts(std::list<std::pair<RsGxsGr
uint32_t ansType = RS_TOKREQ_ANSTYPE_DATA;
RsTokReqOptions opts;
opts.mReqType = GXS_REQUEST_TYPE_GROUP_DATA;
opts.mReqType = GXS_REQUEST_TYPE_MSG_DATA;
uint32_t token = 0;
@ -453,21 +495,26 @@ void p3GxsChannels::request_GroupUnprocessedPosts(const std::list<RsGxsGroupId>
uint32_t ansType = RS_TOKREQ_ANSTYPE_DATA;
RsTokReqOptions opts;
opts.mReqType = GXS_REQUEST_TYPE_GROUP_DATA;
opts.mReqType = GXS_REQUEST_TYPE_MSG_DATA;
uint32_t token = 0;
RsGenExchange::getTokenService()->requestGroupInfo(token, ansType, opts);
RsGenExchange::getTokenService()->requestMsgInfo(token, ansType, opts, grouplist);
GxsTokenQueue::queueRequest(token, GXSCHANNELS_UNPROCESSED_GENERIC);
}
void p3GxsChannels::load_SpecificUnprocessedPosts(const uint32_t &token)
{
std::cerr << "p3GxsChannels::load_SpecificUnprocessedPosts";
std::cerr << std::endl;
std::vector<RsGxsChannelPost> posts;
if (!getPostData(token, posts))
{
std::cerr << "p3GxsChannels::load_SpecificUnprocessedPosts ERROR";
std::cerr << std::endl;
return;
}
@ -483,11 +530,14 @@ void p3GxsChannels::load_SpecificUnprocessedPosts(const uint32_t &token)
void p3GxsChannels::load_GroupUnprocessedPosts(const uint32_t &token)
{
std::vector<RsGxsChannelPost> posts;
std::cerr << "p3GxsChannels::load_GroupUnprocessedPosts";
std::cerr << std::endl;
std::vector<RsGxsChannelPost> posts;
if (!getPostData(token, posts))
{
std::cerr << "p3GxsChannels::load_GroupUnprocessedPosts ERROR";
std::cerr << std::endl;
return;
}
@ -499,27 +549,55 @@ void p3GxsChannels::load_GroupUnprocessedPosts(const uint32_t &token)
}
}
void p3GxsChannels::handleUnprocessedPost(const RsGxsChannelPost &msg)
{
std::cerr << "p3GxsChannels::handleUnprocessedPost() GroupId: " << msg.mMeta.mGroupId << " MsgId: " << msg.mMeta.mMsgId;
std::cerr << std::endl;
if (!IS_MSG_UNPROCESSED(msg.mMeta.mMsgStatus))
{
std::cerr << "p3GxsChannels::handleUnprocessedPost() Msg already Processed";
std::cerr << std::endl;
return;
}
/* check that autodownload is set */
if (autoDownloadEnabled(msg.mMeta.mGroupId))
{
std::cerr << "p3GxsChannels::handleUnprocessedPost() AutoDownload Enabled ... handling";
std::cerr << std::endl;
/* check the date is not too old */
time_t age = time(NULL) - msg.mMeta.mPublishTs;
if (age < (time_t) CHANNEL_DOWNLOAD_PERIOD )
{
/* start download */
// NOTE WE DON'T HANDLE PRIVATE CHANNELS HERE.
// MORE THOUGHT HAS TO GO INTO THAT STUFF.
std::cerr << "p3GxsChannels::handleUnprocessedPost() START DOWNLOAD (TODO)";
std::cerr << "p3GxsChannels::handleUnprocessedPost() START DOWNLOAD";
std::cerr << std::endl;
std::list<RsGxsFile>::const_iterator fit;
for(fit = msg.mFiles.begin(); fit != msg.mFiles.end(); fit++)
{
std::string fname = fit->mName;
std::string hash = fit->mHash;
uint64_t size = fit->mSize;
std::list<std::string> srcIds;
std::string localpath = "";
TransferRequestFlags flags = RS_FILE_REQ_BACKGROUND | RS_FILE_REQ_ANONYMOUS_ROUTING;
if (size < CHANNEL_MAX_AUTO_DL)
{
rsFiles->FileRequest(fname, hash, size, localpath, flags, srcIds);
}
}
}
/* mark as processed */
uint32_t token;
@ -578,7 +656,7 @@ bool p3GxsChannels::autoDownloadEnabled(const RsGxsGroupId &id)
std::map<RsGxsGroupId, RsGroupMetaData>::iterator it;
it = mSubscribedGroups.find(id);
if (it != mSubscribedGroups.end())
if (it == mSubscribedGroups.end())
{
std::cerr << "p3GxsChannels::autoDownloadEnabled() No Entry";
std::cerr << std::endl;
@ -631,7 +709,7 @@ bool p3GxsChannels::setAutoDownload(const RsGxsGroupId &groupId, bool enabled)
std::map<RsGxsGroupId, RsGroupMetaData>::iterator it;
it = mSubscribedGroups.find(groupId);
if (it != mSubscribedGroups.end())
if (it == mSubscribedGroups.end())
{
std::cerr << "p3GxsChannels::setAutoDownload() Missing Group";
std::cerr << std::endl;
@ -654,7 +732,9 @@ bool p3GxsChannels::setAutoDownload(const RsGxsGroupId &groupId, bool enabled)
ss.mAutoDownload = enabled;
std::string serviceString = ss.save();
uint32_t token;
RsGenExchange::setGroupServiceString(token, groupId, serviceString);
it->second.mServiceString = serviceString; // update Local Cache.
RsGenExchange::setGroupServiceString(token, groupId, serviceString); // update dbase.
/* now reload it */
std::list<RsGxsGroupId> groups;
@ -754,10 +834,10 @@ bool p3GxsChannels::ExtraFileRemove(const std::string &hash)
/* so we need the same tick idea as wiki for generating dummy channels
*/
#define MAX_GEN_GROUPS 3
#define MAX_GEN_POSTS 8
#define MAX_GEN_COMMENTS 50
#define MAX_GEN_VOTES 500
#define MAX_GEN_GROUPS 20
#define MAX_GEN_POSTS 500
#define MAX_GEN_COMMENTS 600
#define MAX_GEN_VOTES 700
std::string p3GxsChannels::genRandomId()
{

View File

@ -66,6 +66,9 @@ virtual void service_tick();
protected:
// Overloaded to cache new groups.
virtual RsGenExchange::ServiceCreate_Return service_CreateGroup(RsGxsGrpItem* grpItem, RsTlvSecurityKeySet& keySet);
virtual void notifyChanges(std::vector<RsGxsNotify*>& changes);
// Overloaded from RsTickEvent.
@ -89,7 +92,9 @@ virtual bool getRelatedPosts(const uint32_t &token, std::vector<RsGxsChannelPost
virtual bool createGroup(uint32_t &token, RsGxsChannelGroup &group);
virtual bool createPost(uint32_t &token, RsGxsChannelPost &post);
virtual void setChannelAutoDownload(uint32_t&, const RsGxsGroupId&, bool);
// no tokens... should be cached.
virtual bool setChannelAutoDownload(const RsGxsGroupId &groupId, bool enabled);
virtual bool getChannelAutoDownload(const RsGxsGroupId &groupid);
/* Comment service - Provide RsGxsCommentService - redirect to p3GxsCommentService */
virtual bool getCommentData(const uint32_t &token, std::vector<RsGxsComment> &msgs)
@ -164,8 +169,8 @@ static uint32_t channelsAuthenPolicy();
// Local Cache of Subscribed Groups. and AutoDownload Flag.
void updateSubscribedGroup(const RsGroupMetaData &group);
void clearUnsubscribedGroup(const RsGxsGroupId &id);
bool autoDownloadEnabled(const RsGxsGroupId &id);
bool setAutoDownload(const RsGxsGroupId &groupId, bool enabled);
bool autoDownloadEnabled(const RsGxsGroupId &id);

View File

@ -40,16 +40,17 @@
/****
* #define DEBUG_IDS 1
* #define GXSID_GEN_DUMMY_DATA 1
* #define ENABLE_PGP_SIGNATURES 1
****/
#define GXSID_GEN_DUMMY_DATA 1
#define ENABLE_PGP_SIGNATURES 1
#define ID_REQUEST_LIST 0x0001
#define ID_REQUEST_IDENTITY 0x0002
#define ID_REQUEST_REPUTATION 0x0003
#define ID_REQUEST_OPINION 0x0004
#define ENABLE_PGP_SIGNATURES 1
RsIdentity *rsIdentity = NULL;
@ -161,6 +162,48 @@ void p3IdService::notifyChanges(std::vector<RsGxsNotify *> &changes)
std::cerr << "p3IdService::notifyChanges()";
std::cerr << std::endl;
/* iterate through and grab any new messages */
std::list<RsGxsGroupId> unprocessedGroups;
std::vector<RsGxsNotify *>::iterator it;
for(it = changes.begin(); it != changes.end(); it++)
{
RsGxsGroupChange *groupChange = dynamic_cast<RsGxsGroupChange *>(*it);
RsGxsMsgChange *msgChange = dynamic_cast<RsGxsMsgChange *>(*it);
if (msgChange)
{
std::cerr << "p3IdService::notifyChanges() Found Message Change Notification";
std::cerr << std::endl;
std::map<RsGxsGroupId, std::vector<RsGxsMessageId> > &msgChangeMap = msgChange->msgChangeMap;
std::map<RsGxsGroupId, std::vector<RsGxsMessageId> >::iterator mit;
for(mit = msgChangeMap.begin(); mit != msgChangeMap.end(); mit++)
{
std::cerr << "p3IdService::notifyChanges() Msgs for Group: " << mit->first;
std::cerr << std::endl;
}
}
/* shouldn't need to worry about groups - as they need to be subscribed to */
if (groupChange)
{
std::cerr << "p3IdService::notifyChanges() Found Message Change Notification";
std::cerr << std::endl;
std::list<RsGxsGroupId> &groupList = groupChange->mGrpIdList;
std::list<RsGxsGroupId>::iterator git;
for(git = groupList.begin(); git != groupList.end(); git++)
{
std::cerr << "p3IdService::notifyChanges() Auto Subscribe to Incoming Groups: " << *git;
std::cerr << std::endl;
uint32_t token;
RsGenExchange::subscribeToGroup(token, *git, true);
}
}
}
RsGxsIfaceHelper::receiveChanges(changes);
}
@ -1343,7 +1386,7 @@ RsGenExchange::ServiceCreate_Return p3IdService::service_CreateGroup(RsGxsGrpIte
/* do signature */
#if ENABLE_PGP_SIGNATURES
#ifdef ENABLE_PGP_SIGNATURES
#define MAX_SIGN_SIZE 2048
uint8_t signarray[MAX_SIGN_SIZE];
unsigned int sign_size = MAX_SIGN_SIZE;
@ -1369,6 +1412,7 @@ RsGenExchange::ServiceCreate_Return p3IdService::service_CreateGroup(RsGxsGrpIte
/* done! */
#else
item->group.mPgpIdSign = "";
createStatus = SERVICE_CREATE_SUCCESS;
#endif
}
@ -1685,7 +1729,7 @@ bool p3IdService::checkId(const RsGxsIdGroup &grp, PGPIdType &pgpId)
std::cerr << "p3IdService::checkId() HASH MATCH!";
std::cerr << std::endl;
#if ENABLE_PGP_SIGNATURES
#ifdef ENABLE_PGP_SIGNATURES
/* miracle match! */
/* check signature too */
if (AuthGPG::getAuthGPG()->VerifySignBin((void *) hash.toByteArray(), hash.SIZE_IN_BYTES,