implemented publish key sharing between peers for channels.

git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@7582 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
csoler 2014-10-05 19:14:05 +00:00
parent 1d6bf4190e
commit d0469ccfc3
22 changed files with 511 additions and 57 deletions

View file

@ -865,6 +865,31 @@ int RsDataService::updateGroup(std::map<RsNxsGrp *, RsGxsGrpMetaData *> &grp)
return ret;
}
int RsDataService::updateGroupKeys(const RsGxsGroupId& grpId,const RsTlvSecurityKeySet& keys,uint32_t subscribe_flags)
{
RsStackMutex stack(mDbMutex);
// begin transaction
mDb->execSQL("BEGIN;");
/*!
* STORE key set
**/
ContentValue cv;
//cv.put(KEY_NXS_FLAGS, (int32_t)grpMetaPtr->mGroupFlags); ?
uint32_t offset = 0;
char keySetData[keys.TlvSize()];
keys.SetTlv(keySetData, keys.TlvSize(), &offset);
cv.put(KEY_KEY_SET, keys.TlvSize(), keySetData);
cv.put(KEY_GRP_SUBCR_FLAG, (int32_t)subscribe_flags);
mDb->sqlUpdate(GRP_TABLE_NAME, "grpId='" + grpId.toStdString() + "'", cv);
// finish transaction
return mDb->execSQL("COMMIT;");
}
bool RsDataService::validSize(RsNxsGrp* grp) const
{

View file

@ -175,6 +175,13 @@ public:
bool validSize(RsNxsMsg* msg) const;
bool validSize(RsNxsGrp* grp) const;
/*!
* Convenience function used to only update group keys. This is used when sending
* publish keys between peers.
* @return SQL error code
*/
int updateGroupKeys(const RsGxsGroupId& grpId,const RsTlvSecurityKeySet& keys, uint32_t subscribe_flags) ;
private:

View file

@ -230,6 +230,7 @@ public:
*/
virtual int updateGroupMetaData(GrpLocMetaData& meta) = 0;
virtual int updateGroupKeys(const RsGxsGroupId& grpId,const RsTlvSecurityKeySet& keys,uint32_t subscribed_flags) = 0 ;
/*!
* Completely clear out data stored in

View file

@ -87,6 +87,14 @@ RsGenExchange::RsGenExchange(RsGeneralDataService *gds, RsNetworkExchangeService
}
void RsGenExchange::setNetworkExchangeService(RsNetworkExchangeService *ns)
{
if(mNetService != NULL)
std::cerr << "(EE) Cannot override existing network exchange service. Make sure it has been deleted otherwise." << std::endl;
else
mNetService = ns ;
}
#ifdef TO_BE_DELETED_IF_NOT_USEFUL
// This class has been tested so as to see where the database gets modified.
class RsDataBaseTester
@ -1436,6 +1444,7 @@ void RsGenExchange::notifyNewGroups(std::vector<RsNxsGrp *> &groups)
}
void RsGenExchange::notifyNewMessages(std::vector<RsNxsMsg *>& messages)
{
RsStackMutex stack(mGenMtx);
@ -2019,11 +2028,12 @@ void RsGenExchange::processGroupUpdatePublish()
mGroupUpdatePublish.clear();
}
void RsGenExchange::processGroupDelete()
{
RsStackMutex stack(mGenMtx);
// get keys for group delete publish
// get keys for group delete publish
typedef std::pair<bool, RsGxsGroupId> GrpNote;
std::map<uint32_t, GrpNote> toNotify;
@ -2349,6 +2359,14 @@ bool RsGenExchange::getGroupKeys(const RsGxsGroupId &grpId, RsTlvSecurityKeySet
return true;
}
void RsGenExchange::shareGroupPublishKey(const RsGxsGroupId& grpId,const std::list<RsPeerId>& peers)
{
if(grpId.isNull())
return ;
mNetService->sharePublishKey(grpId,peers) ;
}
void RsGenExchange::processRecvdData()
{
processRecvdGroups();
@ -2356,6 +2374,7 @@ void RsGenExchange::processRecvdData()
processRecvdMessages();
performUpdateValidation();
}
@ -2628,7 +2647,7 @@ void RsGenExchange::processRecvdGroups()
#ifdef GEN_EXCH_DEBUG
std::cerr << "failed to validate incoming grp, trying again. grpId: "
<< grp->grpId << std::endl;
<< grp->grpId << std::endl;
#endif
if(gpsi.mAttempts == VALIDATE_MAX_ATTEMPTS)

View file

@ -137,6 +137,7 @@ public:
// and passes to network service.
virtual RsServiceInfo getServiceInfo() = 0;
void setNetworkExchangeService(RsNetworkExchangeService *ns) ;
/** S: Observer implementation **/
@ -632,6 +633,12 @@ public:
*/
void setMsgServiceString(uint32_t& token, const RsGxsGrpMsgIdPair& msgId, const std::string& servString );
/*!
* sets the message service string
*/
void shareGroupPublishKey(const RsGxsGroupId& grpId,const std::list<RsPeerId>& peers) ;
protected:
/** Notifications **/

View file

@ -51,7 +51,7 @@ RsGxsNetService::RsGxsNetService(uint16_t servType, RsGeneralDataService *gds,
: p3ThreadedService(), p3Config(), mTransactionN(0),
mObserver(nxsObs), mDataStore(gds), mServType(servType),
mTransactionTimeOut(TRANSAC_TIMEOUT), mNetMgr(netMgr), mNxsMutex("RsGxsNetService"),
mSyncTs(0), mSYNC_PERIOD(SYNC_PERIOD), mCircles(circles), mReputations(reputations),
mSyncTs(0), mLastKeyPublishTs(0), mSYNC_PERIOD(SYNC_PERIOD), mCircles(circles), mReputations(reputations),
mPgpUtils(pgpUtils),
mGrpAutoSync(grpAutoSync), mGrpServerUpdateItem(NULL),
mServiceInfo(serviceInfo)
@ -74,8 +74,8 @@ int RsGxsNetService::tick()
if(receivedItems())
recvNxsItemQueue();
uint32_t now = time(NULL);
uint32_t elapsed = mSYNC_PERIOD + mSyncTs;
time_t now = time(NULL);
time_t elapsed = mSYNC_PERIOD + mSyncTs;
if((elapsed) < now)
{
@ -83,6 +83,12 @@ int RsGxsNetService::tick()
mSyncTs = now;
}
if(now > 10 + mLastKeyPublishTs)
{
sharePublishKeysPending() ;
mLastKeyPublishTs = now ;
}
return 1;
}
@ -754,7 +760,8 @@ void RsGxsNetService::recvNxsItemQueue(){
{
case RS_PKT_SUBTYPE_NXS_SYNC_GRP: handleRecvSyncGroup (dynamic_cast<RsNxsSyncGrp*>(ni)) ; break ;
case RS_PKT_SUBTYPE_NXS_SYNC_MSG: handleRecvSyncMessage (dynamic_cast<RsNxsSyncMsg*>(ni)) ; break ;
default:
case RS_PKT_SUBTYPE_NXS_GRP_PUBLISH_KEY: handleRecvPublishKeys (dynamic_cast<RsNxsGroupPublishKeyItem*>(ni)) ; break ;
default:
std::cerr << "Unhandled item subtype " << (uint32_t) ni->PacketSubType() << " in RsGxsNetService: " << std::endl; break;
}
delete item ;
@ -2727,3 +2734,205 @@ void RsGxsNetService::processExplicitGroupRequests()
mExplicitRequest.clear();
}
#define NXS_NET_DEBUG
int RsGxsNetService::sharePublishKey(const RsGxsGroupId& grpId,const std::list<RsPeerId>& peers)
{
RsStackMutex stack(mNxsMutex);
mPendingPublishKeyRecipients[grpId] = peers ;
std::cerr << "RsGxsNetService::sharePublishKeys() " << (void*)this << " adding publish keys for grp " << grpId << " to sending list" << std::endl;
return true ;
}
void RsGxsNetService::sharePublishKeysPending()
{
RsStackMutex stack(mNxsMutex);
if(mPendingPublishKeyRecipients.empty())
return ;
#ifdef NXS_NET_DEBUG
std::cerr << "RsGxsNetService::sharePublishKeys() " << (void*)this << std::endl;
#endif
// get list of peers that are online
std::set<RsPeerId> peersOnline;
std::list<RsGxsGroupId> toDelete;
std::map<RsGxsGroupId,std::list<RsPeerId> >::iterator mit ;
mNetMgr->getOnlineList(mServiceInfo.mServiceType, peersOnline);
#ifdef NXS_NET_DEBUG
std::cerr << " " << peersOnline.size() << " peers online." << std::endl;
#endif
/* send public key to peers online */
for(mit = mPendingPublishKeyRecipients.begin(); mit != mPendingPublishKeyRecipients.end(); ++mit)
{
// Compute the set of peers to send to. We start with this, to avoid retrieving the data for nothing.
std::list<RsPeerId> recipients ;
std::list<RsPeerId> offline_recipients ;
for(std::list<RsPeerId>::const_iterator it(mit->second.begin());it!=mit->second.end();++it)
if(peersOnline.find(*it) != peersOnline.end())
{
#ifdef NXS_NET_DEBUG
std::cerr << " " << *it << ": online. Adding." << std::endl;
#endif
recipients.push_back(*it) ;
}
else
{
#ifdef NXS_NET_DEBUG
std::cerr << " " << *it << ": offline. Keeping for next try." << std::endl;
#endif
offline_recipients.push_back(*it) ;
}
// If empty, skip
if(recipients.empty())
{
std::cerr << " No recipients online. Skipping." << std::endl;
continue ;
}
// Get the meta data for this group Id
//
std::map<RsGxsGroupId, RsGxsGrpMetaData*> grpMetaMap;
grpMetaMap[mit->first] = NULL;
mDataStore->retrieveGxsGrpMetaData(grpMetaMap);
// Find the publish keys in the retrieved info
RsGxsGrpMetaData *grpMeta = grpMetaMap[mit->first] ;
if(grpMeta == NULL)
{
std::cerr << "(EE) RsGxsNetService::sharePublishKeys() Publish keys cannot be found for group " << mit->first << std::endl;
continue ;
}
const RsTlvSecurityKeySet& keys = grpMeta->keys;
std::map<RsGxsId, RsTlvSecurityKey>::const_iterator kit = keys.keys.begin(), kit_end = keys.keys.end();
bool publish_key_found = false;
RsTlvSecurityKey publishKey ;
for(; kit != kit_end && !publish_key_found; ++kit)
{
publish_key_found = (kit->second.keyFlags == (RSTLV_KEY_DISTRIB_PRIVATE | RSTLV_KEY_TYPE_FULL));
publishKey = kit->second ;
}
if(!publish_key_found)
{
std::cerr << "(EE) no publish key in group " << mit->first << ". Cannot share!" << std::endl;
continue ;
}
#ifdef NXS_NET_DEBUG
std::cerr << " using publish key ID=" << publishKey.keyId << ", flags=" << publishKey.keyFlags << std::endl;
#endif
for(std::list<RsPeerId>::const_iterator it(recipients.begin());it!=recipients.end();++it)
{
/* Create publish key sharing item */
RsNxsGroupPublishKeyItem *publishKeyItem = new RsNxsGroupPublishKeyItem(mServType);
publishKeyItem->clear();
publishKeyItem->grpId = mit->first;
publishKeyItem->key = publishKey ;
publishKeyItem->PeerId(*it);
sendItem(publishKeyItem);
#ifdef NXS_NET_DEBUG
std::cerr << " sent key item to " << *it << std::endl;
#endif
}
mit->second = offline_recipients ;
// If given peers have all received key(s) then stop sending for group
if(offline_recipients.empty())
toDelete.push_back(mit->first);
}
// delete pending peer list which are done with
for(std::list<RsGxsGroupId>::const_iterator lit = toDelete.begin(); lit != toDelete.end(); lit++)
mPendingPublishKeyRecipients.erase(*lit);
}
void RsGxsNetService::handleRecvPublishKeys(RsNxsGroupPublishKeyItem *item)
{
#ifdef NXS_NET_DEBUG
std::cerr << "RsGxsNetService::sharePublishKeys() " << std::endl;
#endif
RsStackMutex stack(mNxsMutex);
#ifdef NXS_NET_DEBUG
std::cerr << " PeerId : " << item->PeerId() << std::endl;
std::cerr << " GrpId: " << item->grpId << std::endl;
std::cerr << " Got key Item: " << item->key.keyId << std::endl;
#endif
// Get the meta data for this group Id
//
std::map<RsGxsGroupId, RsGxsGrpMetaData*> grpMetaMap;
grpMetaMap[item->grpId] = NULL;
mDataStore->retrieveGxsGrpMetaData(grpMetaMap);
// update the publish keys in this group meta info
RsGxsGrpMetaData *grpMeta = grpMetaMap[item->grpId] ;
// Check that the keys correspond, and that FULL keys are supplied, etc.
std::cerr << " Key received: " << std::endl;
bool admin = (item->key.keyFlags & RSTLV_KEY_DISTRIB_ADMIN) && (item->key.keyFlags & RSTLV_KEY_TYPE_FULL) ;
bool publi = (item->key.keyFlags & RSTLV_KEY_DISTRIB_PRIVATE) && (item->key.keyFlags & RSTLV_KEY_TYPE_FULL) ;
std::cerr << " Key id = " << item->key.keyId << " admin=" << admin << ", publish=" << publi << " ts=" << item->key.endTS << std::endl;
if(!(!admin && publi))
{
std::cerr << " Key is not a publish private key. Discarding!" << std::endl;
return ;
}
// Also check that we don't already have full keys for that group.
std::map<RsGxsId,RsTlvSecurityKey>::iterator it = grpMeta->keys.keys.find(item->key.keyId) ;
if(it == grpMeta->keys.keys.end())
{
std::cerr << " (EE) Key not found in known group keys. This is an inconsistency." << std::endl;
return ;
}
if((it->second.keyFlags & RSTLV_KEY_DISTRIB_PRIVATE) && (it->second.keyFlags & RSTLV_KEY_TYPE_FULL))
{
std::cerr << " (EE) Publish key already present in database. Discarding message." << std::endl;
// return ;
}
// Store/update the info.
//std::cerr << " (WW) Skipping key update, until fully debugged." << std::endl;
it->second = item->key ;
bool ret = mDataStore->updateGroupKeys(item->grpId,grpMeta->keys, grpMeta->mSubscribeFlags | GXS_SERV::GROUP_SUBSCRIBE_PUBLISH) ;
if(!ret)
std::cerr << "(EE) could not update database. Something went wrong." << std::endl;
#ifdef NXS_NET_DEBUG
else
std::cerr << " updated database with new publish keys." << std::endl;
#endif
}

View file

@ -122,8 +122,13 @@ public:
*/
int requestGrp(const std::list<RsGxsGroupId>& grpId, const RsPeerId& peerId);
/* p3Config methods */
/*!
* share publish keys for the specified group with the peers in the specified list.
*/
int sharePublishKey(const RsGxsGroupId& grpId,const std::list<RsPeerId>& peers) ;
/* p3Config methods */
public:
bool loadList(std::list<RsItem *>& load);
@ -141,7 +146,6 @@ public:
* Processes transactions and job queue
*/
void run();
private:
/*!
@ -296,6 +300,12 @@ private:
*/
void handleRecvSyncMessage(RsNxsSyncMsg* item);
/*!
* Handles an nxs item for group publish key
* @param item contaims keys/grp info
*/
void handleRecvPublishKeys(RsNxsGroupPublishKeyItem*) ;
/** E: item handlers **/
@ -331,7 +341,7 @@ private:
bool locked_canReceive(const RsGxsGrpMetaData * const grpMeta, const RsPeerId& peerId);
void processExplicitGroupRequests();
void locked_doMsgUpdateWork(const RsNxsTransac* nxsTrans, const RsGxsGroupId& grpId);
void updateServerSyncTS();
@ -344,6 +354,11 @@ private:
typedef std::vector<RsNxsGrp*> GrpFragments;
typedef std::vector<RsNxsMsg*> MsgFragments;
/*!
* Loops over pending publish key orders.
*/
void sharePublishKeysPending() ;
/*!
* Fragment a message into individual fragments which are at most 150kb
* @param msg message to fragment
@ -421,6 +436,7 @@ private:
// for an active transaction
uint32_t mTransactionTimeOut;
std::map<RsGxsGroupId,std::list<RsPeerId> > mPendingPublishKeyRecipients ;
RsPeerId mOwnId;
@ -430,6 +446,7 @@ private:
RsMutex mNxsMutex;
uint32_t mSyncTs;
uint32_t mLastKeyPublishTs;
const uint32_t mSYNC_PERIOD;

View file

@ -112,6 +112,12 @@ public:
virtual int requestGrp(const std::list<RsGxsGroupId>& grpId, const RsPeerId& peerId) = 0;
/*!
* Request for this group is sent through to peers on your network
* and how many hops from them you've indicated
*/
virtual int sharePublishKey(const RsGxsGroupId& grpId,const std::list<RsPeerId>& peers)=0 ;
};
#endif // RSGNP_H