Merge pull request #842 from csoler/v0.6-GxsTransport

V0.6 gxs transport
This commit is contained in:
csoler 2017-05-26 22:04:33 +02:00 committed by GitHub
commit 7c439983de
57 changed files with 2226 additions and 613 deletions

View file

@ -1624,11 +1624,18 @@ Sha1CheckSum p3GRouter::computeDataItemHash(RsGRouterGenericDataItem *data_item)
RsTemporaryMemory mem(total_size) ;
uint32_t offset = 0 ;
signature_serializer.serialise(data_item,mem,&total_size) ;
offset += signed_data_size ;
uint32_t tmp_size = total_size ;
signature_serializer.serialise(data_item,mem,&tmp_size) ;
if(tmp_size != signed_data_size)
std::cerr << "(EE) Some error occured in p3GRouter::computeDataItemHash(). Mismatched offset/data size" << std::endl;
offset += tmp_size ;
data_item->signature.SetTlv(mem, total_size,&offset) ;
if(offset != total_size)
std::cerr << "(EE) Some error occured in p3GRouter::computeDataItemHash(). Mismatched offset/data size" << std::endl;
return RsDirUtil::sha1sum(mem,total_size) ;
}

View file

@ -1576,11 +1576,20 @@ bool RsGxsDataAccess::getGroupStatistic(GroupStatisticRequest *req)
req->mGroupStatistic.mNumChildMsgsNew = 0;
req->mGroupStatistic.mNumChildMsgsUnread = 0;
std::set<RsGxsMessageId> obsolete_msgs ; // stored message ids that are referred to as older versions of an existing message
for(uint32_t i = 0; i < msgMetaV.size(); ++i)
if(!msgMetaV[i]->mOrigMsgId.isNull() && msgMetaV[i]->mOrigMsgId!=msgMetaV[i]->mMsgId)
obsolete_msgs.insert(msgMetaV[i]->mOrigMsgId);
for(uint32_t i = 0; i < msgMetaV.size(); ++i)
{
RsGxsMsgMetaData* m = msgMetaV[i];
req->mGroupStatistic.mTotalSizeOfMsgs += m->mMsgSize + m->serial_size();
if(obsolete_msgs.find(m->mMsgId) != obsolete_msgs.end()) // skip obsolete messages.
continue;
if (IS_MSG_NEW(m->mMsgStatus))
{
if (m->mParentId.isNull())

View file

@ -16,11 +16,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "util/rsdir.h"
#include "gxstrans/p3gxstrans.h"
#include "util/stacktrace.h"
typedef unsigned int uint;
RsGxsTrans *rsGxsTrans = NULL ;
p3GxsTrans::~p3GxsTrans()
{
p3Config::saveConfiguration();
@ -31,6 +34,35 @@ p3GxsTrans::~p3GxsTrans()
}
}
bool p3GxsTrans::getStatistics(GxsTransStatistics& stats)
{
stats.prefered_group_id = mPreferredGroupId;
stats.outgoing_records.clear();
{
RS_STACK_MUTEX(mOutgoingMutex);
for ( auto it = mOutgoingQueue.begin(); it != mOutgoingQueue.end(); ++it)
{
const OutgoingRecord& pr(it->second);
RsGxsTransOutgoingRecord rec ;
rec.status = pr.status ;
rec.send_TS = pr.mailItem.meta.mPublishTs ;
rec.group_id = pr.mailItem.meta.mGroupId ;
rec.trans_id = pr.mailItem.mailId ;
rec.recipient = pr.recipient ;
rec.data_size = pr.mailData.size();
rec.data_hash = RsDirUtil::sha1sum(pr.mailData.data(),pr.mailData.size());
rec.client_service = pr.clientService ;
stats.outgoing_records.push_back(rec) ;
}
}
return true;
}
bool p3GxsTrans::sendMail( RsGxsTransId& mailId,
GxsTransSubServices service,
const RsGxsId& own_gxsid, const RsGxsId& recipient,
@ -120,9 +152,9 @@ void p3GxsTrans::handleResponse(uint32_t token, uint32_t req_type)
&& meta.mGroupId != mPreferredGroupId;
if(shoudlSubscribe)
subscribeToGroup(token, meta.mGroupId, true);
RsGenExchange::subscribeToGroup(token, meta.mGroupId, true);
else if(shoudlUnSubscribe)
subscribeToGroup(token, meta.mGroupId, false);
RsGenExchange::subscribeToGroup(token, meta.mGroupId, false);
#ifdef GXS_MAIL_GRP_DEBUG
char buff[30];
@ -337,7 +369,7 @@ void p3GxsTrans::notifyChanges(std::vector<RsGxsNotify*>& changes)
std::cout << "p3GxsTrans::notifyChanges(...) msgChange" << std::endl;
uint32_t token;
RsTokReqOptions opts; opts.mReqType = GXS_REQUEST_TYPE_MSG_DATA;
getTokenService()->requestMsgInfo( token, 0xcaca,
RsGenExchange::getTokenService()->requestMsgInfo( token, 0xcaca,
opts, msgChange->msgChangeMap );
GxsTokenQueue::queueRequest(token, MAILS_UPDATE);
@ -393,8 +425,8 @@ bool p3GxsTrans::requestGroupsData(const std::list<RsGxsGroupId>* groupIds)
// std::cout << "p3GxsTrans::requestGroupsList()" << std::endl;
uint32_t token;
RsTokReqOptions opts; opts.mReqType = GXS_REQUEST_TYPE_GROUP_DATA;
if(!groupIds) getTokenService()->requestGroupInfo(token, 0xcaca, opts);
else getTokenService()->requestGroupInfo(token, 0xcaca, opts, *groupIds);
if(!groupIds) RsGenExchange::getTokenService()->requestGroupInfo(token, 0xcaca, opts);
else RsGenExchange::getTokenService()->requestGroupInfo(token, 0xcaca, opts, *groupIds);
GxsTokenQueue::queueRequest(token, GROUPS_LIST);
return true;
}

View file

@ -26,6 +26,7 @@
#include "gxstrans/p3gxstransitems.h"
#include "services/p3idservice.h" // For p3IdService
#include "util/rsthreads.h"
#include "retroshare/rsgxstrans.h"
struct p3GxsTrans;
@ -76,18 +77,32 @@ struct GxsTransClient
* @see GxsTransClient::receiveGxsTransMail(...),
* @see GxsTransClient::notifyGxsTransSendStatus(...).
*/
struct p3GxsTrans : RsGenExchange, GxsTokenQueue, p3Config
class p3GxsTrans : public RsGenExchange, public GxsTokenQueue, public p3Config, public RsGxsTrans
{
public:
p3GxsTrans( RsGeneralDataService* gds, RsNetworkExchangeService* nes,
p3IdService& identities ) :
RsGenExchange( gds, nes, new RsGxsTransSerializer(),
RS_SERVICE_TYPE_GXS_TRANS, &identities,
AuthenPolicy(), GXS_STORAGE_PERIOD ),
GxsTokenQueue(this), mIdService(identities),
GxsTokenQueue(this),
RsGxsTrans(this),
mIdService(identities),
mServClientsMutex("p3GxsTrans client services map mutex"),
mOutgoingMutex("p3GxsTrans outgoing queue map mutex"),
mIngoingMutex("p3GxsTrans ingoing queue map mutex") {}
~p3GxsTrans();
virtual ~p3GxsTrans();
/*!
* \brief getStatistics
* Gathers all sorts of statistics about the internals of p3GxsTrans, in order to display info about the running status,
* message transport, etc.
* \param stats This structure contains all statistics information.
* \return true is the call succeeds.
*/
virtual bool getStatistics(GxsTransStatistics& stats);
/**
* Send an email to recipient, in the process author of the email is

View file

@ -23,30 +23,11 @@
#include "serialiser/rsbaseserial.h"
#include "serialiser/rstlvidset.h"
#include "retroshare/rsgxsflags.h"
#include "retroshare/rsgxstrans.h"
#include "retroshare/rsgxscircles.h" // For: GXS_CIRCLE_TYPE_PUBLIC
#include "services/p3idservice.h"
#include "serialiser/rstypeserializer.h"
/// Subservices identifiers (like port for TCP)
enum class GxsTransSubServices : uint16_t
{
UNKNOWN = 0,
TEST_SERVICE = 1,
P3_MSG_SERVICE = 2,
P3_CHAT_SERVICE = 3
};
/// Values must fit into uint8_t
enum class GxsTransItemsSubtypes : uint8_t
{
GXS_TRANS_SUBTYPE_MAIL = 1,
GXS_TRANS_SUBTYPE_RECEIPT = 2,
GXS_TRANS_SUBTYPE_GROUP = 3,
OUTGOING_RECORD_ITEM = 4
};
typedef uint64_t RsGxsTransId;
struct RsNxsTransPresignedReceipt : RsNxsMsg
{
RsNxsTransPresignedReceipt() : RsNxsMsg(RS_SERVICE_TYPE_GXS_TRANS) {}
@ -188,27 +169,6 @@ struct RsGxsTransGroupItem : RsGxsGrpItem
{ return out; }
};
enum class GxsTransSendStatus : uint8_t
{
UNKNOWN = 0,
PENDING_PROCESSING,
PENDING_PREFERRED_GROUP,
PENDING_RECEIPT_CREATE,
PENDING_RECEIPT_SIGNATURE,
PENDING_SERIALIZATION,
PENDING_PAYLOAD_CREATE,
PENDING_PAYLOAD_ENCRYPT,
PENDING_PUBLISH,
/** This will be useful so the user can know if the mail reached at least
* some friend node, in case of internet connection interruption */
//PENDING_TRANSFER,
PENDING_RECEIPT_RECEIVE,
/// Records with status >= RECEIPT_RECEIVED get deleted
RECEIPT_RECEIVED,
FAILED_RECEIPT_SIGNATURE = 240,
FAILED_ENCRYPTION
};
class RsGxsTransSerializer;
struct OutgoingRecord : RsItem
{

View file

@ -60,6 +60,8 @@ public:
public:
RsMsgMetaData mMeta;
std::set<RsGxsMessageId> mOlderVersions ;
std::string mMsg; // UTF8 encoded.
std::list<RsGxsFile> mFiles;

View file

@ -0,0 +1,103 @@
#pragma once
#include "retroshare/rstokenservice.h"
#include "retroshare/rsgxsifacehelper.h"
#include "retroshare/rsgxscommon.h"
/// Subservices identifiers (like port for TCP)
enum class GxsTransSubServices : uint16_t
{
UNKNOWN = 0x00,
TEST_SERVICE = 0x01,
P3_MSG_SERVICE = 0x02,
P3_CHAT_SERVICE = 0x03
};
/// Values must fit into uint8_t
enum class GxsTransItemsSubtypes : uint8_t
{
GXS_TRANS_SUBTYPE_MAIL = 0x01,
GXS_TRANS_SUBTYPE_RECEIPT = 0x02,
GXS_TRANS_SUBTYPE_GROUP = 0x03,
OUTGOING_RECORD_ITEM = 0x04
};
enum class GxsTransSendStatus : uint8_t
{
UNKNOWN = 0x00,
PENDING_PROCESSING = 0x01,
PENDING_PREFERRED_GROUP = 0x02,
PENDING_RECEIPT_CREATE = 0x03,
PENDING_RECEIPT_SIGNATURE = 0x04,
PENDING_SERIALIZATION = 0x05,
PENDING_PAYLOAD_CREATE = 0x06,
PENDING_PAYLOAD_ENCRYPT = 0x07,
PENDING_PUBLISH = 0x08,
/** This will be useful so the user can know if the mail reached at least
* some friend node, in case of internet connection interruption */
//PENDING_TRANSFER,
PENDING_RECEIPT_RECEIVE = 0x09,
/// Records with status >= RECEIPT_RECEIVED get deleted
RECEIPT_RECEIVED = 0x0a,
FAILED_RECEIPT_SIGNATURE = 0xf0,
FAILED_ENCRYPTION = 0xf1
};
typedef uint64_t RsGxsTransId;
class RsGxsTransGroup
{
public:
RsGroupMetaData mMeta;
};
class RsGxsTransMsg
{
public:
RsGxsTransMsg() : size(0),data(NULL) {}
virtual ~RsGxsTransMsg() { free(data) ; }
public:
RsMsgMetaData mMeta;
uint32_t size ;
uint8_t *data ;
};
struct RsGxsTransOutgoingRecord
{
GxsTransSendStatus status;
RsGxsId recipient;
RsGxsTransId trans_id;
GxsTransSubServices client_service;
uint32_t data_size ;
Sha1CheckSum data_hash ;
uint32_t send_TS ;
RsGxsGroupId group_id ;
};
class RsGxsTrans: public RsGxsIfaceHelper
{
public:
class GxsTransStatistics
{
public:
GxsTransStatistics() {}
RsGxsGroupId prefered_group_id ;
std::vector<RsGxsTransOutgoingRecord> outgoing_records;
};
RsGxsTrans(RsGxsIface *gxs) : RsGxsIfaceHelper(gxs) {}
virtual ~RsGxsTrans() {}
virtual bool getStatistics(GxsTransStatistics& stats)=0;
// virtual bool getGroupData(const uint32_t &token, std::vector<RsGxsTransGroup> &groups) = 0;
// virtual bool getPostData(const uint32_t &token, std::vector<RsGxsTransMsg> &posts) = 0;
};
extern RsGxsTrans *rsGxsTrans ;

View file

@ -1799,6 +1799,8 @@ int RsServer::StartupRetroShare()
rsPosted = mPosted;
rsGxsForums = mGxsForums;
rsGxsChannels = mGxsChannels;
rsGxsTrans = mGxsTrans;
//rsPhoto = mPhoto;
//rsWire = mWire;