Merge pull request #1520 from G10h4ck/broadcast_discovery

Broadcast Domain friends IP:Port Discovery
This commit is contained in:
G10h4ck 2019-04-23 17:18:54 +02:00 committed by GitHub
commit 9a7ce1ae72
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 1118 additions and 108 deletions

View file

@ -0,0 +1,227 @@
/*******************************************************************************
* RetroShare Broadcast Domain Discovery *
* *
* Copyright (C) 2019 Gioacchino Mazzurco <gio@altermundi.net> *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU Lesser General Public License as *
* published by the Free Software Foundation, either version 3 of the *
* License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU Lesser General Public License for more details. *
* *
* You should have received a copy of the GNU Lesser General Public License *
* along with this program. If not, see <https://www.gnu.org/licenses/>. *
* *
*******************************************************************************/
#include <functional>
#include <set>
#include <thread>
#include <chrono>
#include <vector>
#include <iostream>
#include "services/broadcastdiscoveryservice.h"
#include "retroshare/rspeers.h"
#include "serialiser/rsserializable.h"
#include "serialiser/rsserializer.h"
#include "retroshare/rsevents.h"
#ifdef RS_BROADCAST_DISCOVERY_DEBUG
# include "util/radix64.h"
#endif
/*extern*/ std::shared_ptr<RsBroadcastDiscovery> rsBroadcastDiscovery(nullptr);
RsBroadcastDiscovery::~RsBroadcastDiscovery() { /* Beware of Rs prefix! */ }
RsBroadcastDiscoveryResult::~RsBroadcastDiscoveryResult() {}
RsBroadcastDiscoveryPeerFoundEvent::~RsBroadcastDiscoveryPeerFoundEvent() {}
struct BroadcastDiscoveryPack : RsSerializable
{
BroadcastDiscoveryPack() : mLocalPort(0) {}
PGPFingerprintType mPgpFingerprint;
RsPeerId mSslId;
uint16_t mLocalPort;
std::string mProfileName;
void serial_process( RsGenericSerializer::SerializeJob j,
RsGenericSerializer::SerializeContext& ctx ) override
{
RS_SERIAL_PROCESS(mPgpFingerprint);
RS_SERIAL_PROCESS(mSslId);
RS_SERIAL_PROCESS(mLocalPort);
RS_SERIAL_PROCESS(mProfileName);
}
static BroadcastDiscoveryPack fromPeerDetails(const RsPeerDetails& pd)
{
BroadcastDiscoveryPack bdp;
bdp.mPgpFingerprint = pd.fpr;
bdp.mSslId = pd.id;
bdp.mLocalPort = pd.localPort;
bdp.mProfileName = pd.name;
return bdp;
}
static BroadcastDiscoveryPack fromSerializedString(const std::string& st)
{
RsGenericSerializer::SerializeContext ctx(
reinterpret_cast<uint8_t*>(const_cast<char*>(st.data())),
static_cast<uint32_t>(st.size()) );
BroadcastDiscoveryPack bdp;
bdp.serial_process(RsGenericSerializer::DESERIALIZE, ctx);
return bdp;
}
std::string serializeToString()
{
/* After some experiments it seems very unlikely that UDP broadcast
* packets bigger then this could get trought a network */
std::vector<uint8_t> buffer(512, 0);
RsGenericSerializer::SerializeContext ctx(
buffer.data(), static_cast<uint32_t>(buffer.size()) );
serial_process(RsGenericSerializer::SERIALIZE, ctx);
return std::string(reinterpret_cast<char*>(buffer.data()), ctx.mOffset);
}
BroadcastDiscoveryPack(const BroadcastDiscoveryPack&) = default;
~BroadcastDiscoveryPack() override;
};
BroadcastDiscoveryPack::~BroadcastDiscoveryPack() {};
BroadcastDiscoveryService::BroadcastDiscoveryService(
RsPeers& pRsPeers ) :
mDiscoveredDataMutex("BroadcastDiscoveryService discovered data mutex"),
mRsPeers(pRsPeers)
{
if(mRsPeers.isHiddenNode(mRsPeers.getOwnId())) return;
mUdcParameters.set_can_discover(true);
mUdcParameters.set_can_be_discovered(true);
mUdcParameters.set_port(port);
mUdcParameters.set_application_id(appId);
mUdcEndpoint.Start(mUdcParameters, "");
updatePublishedData();
}
BroadcastDiscoveryService::~BroadcastDiscoveryService()
{ mUdcEndpoint.Stop(true); }
std::vector<RsBroadcastDiscoveryResult>
BroadcastDiscoveryService::getDiscoveredPeers()
{
std::vector<RsBroadcastDiscoveryResult> ret;
RS_STACK_MUTEX(mDiscoveredDataMutex);
for(auto&& pp: mDiscoveredData)
ret.push_back(createResult(pp.first, pp.second));
return ret;
}
void BroadcastDiscoveryService::updatePublishedData()
{
RsPeerDetails od;
mRsPeers.getPeerDetails(mRsPeers.getOwnId(), od);
mUdcEndpoint.SetUserData(
BroadcastDiscoveryPack::fromPeerDetails(od).serializeToString());
}
void BroadcastDiscoveryService::data_tick()
{
auto nextRunAt = std::chrono::system_clock::now() + std::chrono::seconds(5);
if( mUdcParameters.can_discover() &&
!mRsPeers.isHiddenNode(mRsPeers.getOwnId()) )
{
auto newEndpoints = mUdcEndpoint.ListDiscovered();
std::set< std::pair<UDC::IpPort, std::string> > mChangedData;
mDiscoveredDataMutex.lock();
for(auto&& dEndpoint: newEndpoints)
{
auto findIt = mDiscoveredData.find(dEndpoint.ip_port());
if( !dEndpoint.user_data().empty() && (
findIt == mDiscoveredData.end() ||
(*findIt).second != dEndpoint.user_data() ) )
{
mDiscoveredData[dEndpoint.ip_port()] = dEndpoint.user_data();
mChangedData.insert(std::make_pair(
dEndpoint.ip_port(),
dEndpoint.user_data() ));
}
}
mDiscoveredDataMutex.unlock();
if(!mChangedData.empty())
{
for (auto&& pp : mChangedData)
{
#ifdef RS_BROADCAST_DISCOVERY_DEBUG
{
std::string b64Data;
Radix64::encode(
reinterpret_cast<const unsigned char*>(pp.second.data()),
static_cast<int>(pp.second.size()), b64Data );
std::cerr << __PRETTY_FUNCTION__ << " Got: " << b64Data
<< " Base64 from: " << UDC::IpToString(pp.first.ip())
<< ":" << pp.first.port() << std::endl;
}
#endif // def RS_BROADCAST_DISCOVERY_DEBUG
RsBroadcastDiscoveryResult rbdr =
createResult(pp.first, pp.second);
if( rbdr.locator.hasPort() && mRsPeers.isFriend(rbdr.mSslId) &&
!mRsPeers.isOnline(rbdr.mSslId) )
{
mRsPeers.setLocalAddress(
rbdr.mSslId, rbdr.locator.host(),
rbdr.locator.port() );
mRsPeers.connectAttempt(rbdr.mSslId);
}
else
{
typedef RsBroadcastDiscoveryPeerFoundEvent Evt_t;
// Ensure rsEvents is not deleted while we use it
std::shared_ptr<RsEvents> lockedRsEvents = rsEvents;
if(lockedRsEvents)
lockedRsEvents->postEvent(
std::unique_ptr<Evt_t>(new Evt_t(rbdr)) );
}
}
}
}
/* Probably this would be better if done only on actual change */
if( mUdcParameters.can_be_discovered() &&
!mRsPeers.isHiddenNode(mRsPeers.getOwnId()) ) updatePublishedData();
std::this_thread::sleep_until(nextRunAt);
}
RsBroadcastDiscoveryResult BroadcastDiscoveryService::createResult(
const udpdiscovery::IpPort& ipp, const std::string& uData )
{
BroadcastDiscoveryPack bdp =
BroadcastDiscoveryPack::fromSerializedString(uData);
RsBroadcastDiscoveryResult rbdr;
rbdr.mPgpFingerprint = bdp.mPgpFingerprint;
rbdr.mSslId = bdp.mSslId;
rbdr.mProfileName = bdp.mProfileName;
rbdr.locator.
setScheme("ipv4").
setHost(UDC::IpToString(ipp.ip())).
setPort(bdp.mLocalPort);
return rbdr;
}

View file

@ -0,0 +1,65 @@
/*******************************************************************************
* RetroShare Broadcast Domain Discovery *
* *
* Copyright (C) 2019 Gioacchino Mazzurco <gio@altermundi.net> *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU Lesser General Public License as *
* published by the Free Software Foundation, either version 3 of the *
* License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU Lesser General Public License for more details. *
* *
* You should have received a copy of the GNU Lesser General Public License *
* along with this program. If not, see <https://www.gnu.org/licenses/>. *
* *
*******************************************************************************/
#pragma once
#include <cstdint>
#include <map>
#include <iostream>
#include <endpoint.hpp>
#include <memory>
#include <forward_list>
#include "retroshare/rsbroadcastdiscovery.h"
#include "util/rsthreads.h"
namespace UDC = udpdiscovery;
class RsPeers;
class BroadcastDiscoveryService :
public RsBroadcastDiscovery, public RsTickingThread
{
public:
// TODO: std::shared_ptr<RsPeers> mRsPeers;
BroadcastDiscoveryService(RsPeers& pRsPeers);
virtual ~BroadcastDiscoveryService() override;
/// @see RsBroadcastDiscovery
std::vector<RsBroadcastDiscoveryResult> getDiscoveredPeers() override;
/// @see RsTickingThread
void data_tick() override;
protected:
constexpr static uint16_t port = 36405;
constexpr static uint32_t appId = 904571;
void updatePublishedData();
UDC::EndpointParameters mUdcParameters;
UDC::Endpoint mUdcEndpoint;
std::map<UDC::IpPort, std::string> mDiscoveredData;
RsMutex mDiscoveredDataMutex;
RsPeers& mRsPeers; // TODO: std::shared_ptr<RsPeers> mRsPeers;
RsBroadcastDiscoveryResult createResult(
const UDC::IpPort& ipp, const std::string& uData );
};

View file

@ -0,0 +1,171 @@
/*******************************************************************************
* Retroshare events service *
* *
* libretroshare: retroshare core library *
* *
* Copyright (C) 2019 Gioacchino Mazzurco <gio@eigenlab.org> *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU Lesser General Public License as *
* published by the Free Software Foundation, either version 3 of the *
* License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU Lesser General Public License for more details. *
* *
* You should have received a copy of the GNU Lesser General Public License *
* along with this program. If not, see <https://www.gnu.org/licenses/>. *
* *
*******************************************************************************/
#include <string>
#include "services/rseventsservice.h"
/*extern*/ std::shared_ptr<RsEvents> rsEvents(nullptr);
RsEvent::~RsEvent() {};
RsEvents::~RsEvents() {};
bool isEventValid(const RsEvent& event, std::string& errorMessage)
{
if(event.mType <= RsEventType::NONE)
{
errorMessage = "Event has type NONE: " +
std::to_string(
static_cast<std::underlying_type<RsEventType>::type >(
event.mType ) );
return false;
}
if(event.mType >= RsEventType::MAX)
{
errorMessage = "Event has type >= RsEventType::MAX: " +
std::to_string(
static_cast<std::underlying_type<RsEventType>::type >(
event.mType ) );
}
return true;
}
bool RsEventsService::postEvent( std::unique_ptr<RsEvent> event,
std::string& errorMessage )
{
if(!isEventValid(*event, errorMessage))
{
std::cerr << __PRETTY_FUNCTION__ << " Error: "<< errorMessage
<< std::endl;
return false;
}
RS_STACK_MUTEX(mEventQueueMtx);
mEventQueue.push_back(std::move(event));
return true;
}
bool RsEventsService::sendEvent( const RsEvent& event,
std::string& errorMessage )
{
if(!isEventValid(event, errorMessage))
{
std::cerr << __PRETTY_FUNCTION__ << " Error: "<< errorMessage
<< std::endl;
return false;
}
handleEvent(event);
return true;
}
RsEventsHandlerId_t RsEventsService::generateUniqueHandlerId()
{
RS_STACK_MUTEX(mHandlerMapMtx);
return generateUniqueHandlerId_unlocked();
}
RsEventsHandlerId_t RsEventsService::generateUniqueHandlerId_unlocked()
{
if(++mLastHandlerId) return mLastHandlerId; // Avoid 0 after overflow
return 1;
}
bool RsEventsService::registerEventsHandler(
std::function<void(const RsEvent&)> multiCallback,
RsEventsHandlerId_t& hId )
{
RS_STACK_MUTEX(mHandlerMapMtx);
if(!hId) hId = generateUniqueHandlerId_unlocked();
mHandlerMap[hId] = std::move(multiCallback);
return true;
}
bool RsEventsService::unregisterEventsHandler(RsEventsHandlerId_t hId)
{
RS_STACK_MUTEX(mHandlerMapMtx);
auto it = mHandlerMap.find(hId);
if(it == mHandlerMap.end()) return false;
mHandlerMap.erase(it);
return true;
}
void RsEventsService::data_tick()
{
auto nextRunAt = std::chrono::system_clock::now() +
std::chrono::milliseconds(1);
std::unique_ptr<RsEvent> eventPtr(nullptr);
size_t futureEventsCounter = 0;
dispatchEventFromQueueLock:
mEventQueueMtx.lock();
if(mEventQueue.size() > futureEventsCounter)
{
eventPtr = std::move(mEventQueue.front());
mEventQueue.pop_front();
if(eventPtr->mTimePoint >= nextRunAt)
{
mEventQueue.push_back(std::move(eventPtr));
++futureEventsCounter;
}
}
mEventQueueMtx.unlock();
if(eventPtr)
{
/* It is relevant that this stays out of mEventQueueMtx */
handleEvent(*eventPtr);
eventPtr.reset(nullptr); // ensure memory is freed before sleep
goto dispatchEventFromQueueLock;
}
std::this_thread::sleep_until(nextRunAt);
}
void RsEventsService::handleEvent(const RsEvent& event)
{
std::function<void(const RsEvent&)> mCallback;
mHandlerMapMtx.lock();
auto cbpt = mHandlerMap.begin();
mHandlerMapMtx.unlock();
getHandlerFromMapLock:
mHandlerMapMtx.lock();
if(cbpt != mHandlerMap.end())
{
mCallback = cbpt->second;
++cbpt;
}
mHandlerMapMtx.unlock();
if(mCallback)
{
mCallback(event); // It is relevant that this happens outside mutex
mCallback = std::function<void(const RsEvent&)>(nullptr);
goto getHandlerFromMapLock;
}
}

View file

@ -0,0 +1,77 @@
/*******************************************************************************
* Retroshare events service *
* *
* libretroshare: retroshare core library *
* *
* Copyright (C) 2019 Gioacchino Mazzurco <gio@eigenlab.org> *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU Lesser General Public License as *
* published by the Free Software Foundation, either version 3 of the *
* License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU Lesser General Public License for more details. *
* *
* You should have received a copy of the GNU Lesser General Public License *
* along with this program. If not, see <https://www.gnu.org/licenses/>. *
* *
*******************************************************************************/
#pragma once
#include <memory>
#include <cstdint>
#include <deque>
#include "retroshare/rsevents.h"
#include "util/rsthreads.h"
class RsEventsService :
public RsEvents, public RsTickingThread
{
public:
RsEventsService():
mHandlerMapMtx("RsEventsService::mHandlerMapMtx"), mLastHandlerId(1),
mEventQueueMtx("RsEventsService::mEventQueueMtx") {}
/// @see RsEvents
bool postEvent(
std::unique_ptr<RsEvent> event,
std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string)
) override;
/// @see RsEvents
bool sendEvent(
const RsEvent& event,
std::string& errorMessage = RS_DEFAULT_STORAGE_PARAM(std::string)
) override;
/// @see RsEvents
RsEventsHandlerId_t generateUniqueHandlerId() override;
/// @see RsEvents
bool registerEventsHandler(
std::function<void(const RsEvent&)> multiCallback,
RsEventsHandlerId_t& hId = RS_DEFAULT_STORAGE_PARAM(RsEventsHandlerId_t, 0)
) override;
/// @see RsEvents
bool unregisterEventsHandler(RsEventsHandlerId_t hId) override;
protected:
RsMutex mHandlerMapMtx;
RsEventsHandlerId_t mLastHandlerId;
std::map< RsEventsHandlerId_t, std::function<void(const RsEvent&)> >
mHandlerMap;
RsMutex mEventQueueMtx;
std::deque< std::unique_ptr<RsEvent> > mEventQueue;
/// @see RsTickingThread
void data_tick() override;
void handleEvent(const RsEvent& event);
RsEventsHandlerId_t generateUniqueHandlerId_unlocked();
};