fixed merge with master

This commit is contained in:
csoler 2020-01-11 00:08:36 +01:00
commit 2bd5bb5a3e
No known key found for this signature in database
GPG key ID: 7BCA522266C0804C
176 changed files with 6345 additions and 3927 deletions

View file

@ -1,3 +1,6 @@
# SPDX-FileCopyrightText: (C) 2004-2019 Retroshare Team <contact@retroshare.cc>
# SPDX-License-Identifier: CC0-1.0
# Doxyfile 1.7.1
# This file describes the settings to be used by the documentation system

View file

@ -1,27 +0,0 @@
/*
* "$Id: licence,v 1.1 2007-02-18 21:46:42 rmf24 Exp $"
*
* TOU + 3P/PQI + RetroShare.
*
* Copyright 2004-2006 by Robert Fernie.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/

View file

@ -1,30 +0,0 @@
=======================================================================================
README for RetroShare
=======================================================================================
RetroShare web site . . . . http://retroshare.net/index.html
Developer's blog . . . . . https://retroshareteam.wordpress.com
Documentation . . . . . . . https://retroshare.readthedocs.io/en/latest/
Support . . . . . . . . . . http://retroshare.net/support.html
Forums . . . . . . . . . . http://retroshare.sourceforge.net/forum/
Wiki . . . . . . . . . . . https://github.com/RetroShare/documentation/wiki
Old developers site . . . . http://retroshare.sourceforge.net/wiki/index.php/Developers_Corner
Project site . . . . . . . https://github.com/RetroShare/RetroShare
Relted projects/plugins . . https://github.com/RetroShare
Contact: . . . . . . . . . retroshare@lunamutt.com ,defnax@users.sourceforge.net
=========================================================================================
Compiling RetroShare
=========================================================================================
Build Scripts are avaible on GIT:
https://github.com/RetroShare/RetroShare/tree/master/build_scripts
You can find here instructions howto compile RetroShare:
https://retroshare.readthedocs.io/en/latest/developer/compilation/
You can go on over to our forum or chat lobby when you have trouble with compiling:
retroshare://forum?name=Developers%20Discussions&id=8fd22bd8f99754461e7ba1ca8a727995
retroshare://chat_room?name=Retroshare%20Devel%20%28signed%29&id=L68DB0A1E09BDA3A5
http://retroshare.sourceforge.net/forum/

View file

@ -51,18 +51,13 @@ bool LocalDirectoryUpdater::isEnabled() const
}
void LocalDirectoryUpdater::setEnabled(bool b)
{
if(mIsEnabled == b)
return ;
if(!b)
shutdown();
else if(!isRunning())
start("fs dir updater") ;
mIsEnabled = b ;
if(mIsEnabled == b) return;
if(!b) RsThread::askForStop();
else if(!RsThread::isRunning()) start("fs dir updater");
mIsEnabled = b ;
}
void LocalDirectoryUpdater::data_tick()
void LocalDirectoryUpdater::threadTick()
{
rstime_t now = time(NULL) ;

View file

@ -62,7 +62,7 @@ public:
bool ignoreDuplicates() const;
protected:
virtual void data_tick() ;
void threadTick() override; /// @see RsTickingThread
virtual void hash_callback(uint32_t client_param, const std::string& name, const RsFileHash& hash, uint64_t size);
virtual bool hash_confirm(uint32_t client_param) ;

View file

@ -87,7 +87,7 @@ static std::string friendlyUnit(uint64_t val)
return std::string(buf) + " TB";
}
void HashStorage::data_tick()
void HashStorage::threadTick()
{
FileHashJob job;
RsFileHash hash;
@ -318,14 +318,15 @@ void HashStorage::startHashThread()
void HashStorage::stopHashThread()
{
if (mRunning)
{
std::cerr << "Stopping hashing thread." << std::endl;
shutdown();
if(mRunning)
{
RsInfo() << __PRETTY_FUNCTION__ << "Stopping hashing thread."
<< std::endl;
RsThread::askForStop();
mRunning = false ;
mTotalSizeToHash = 0;
mTotalFilesToHash = 0;
std::cerr << "done." << std::endl;
}
}

View file

@ -85,9 +85,7 @@ public:
void togglePauseHashingProcess() ;
bool hashingProcessPaused();
// Functions called by the thread
virtual void data_tick() ;
void threadTick() override; /// @see RsTickingThread
friend std::ostream& operator<<(std::ostream& o,const HashStorageInfo& info) ;
private:

View file

@ -209,7 +209,7 @@ void ftController::removeFileSource(const RsFileHash& hash,const RsPeerId& peer_
std::cerr << "... not added: hash not found." << std::endl ;
#endif
}
void ftController::data_tick()
void ftController::threadTick()
{
/* check the queues */

View file

@ -109,9 +109,10 @@ class ftPendingRequest
};
class ftController: public RsTickingThread, public pqiServiceMonitor, public p3Config
class ftController:
public RsTickingThread, public pqiServiceMonitor, public p3Config
{
public:
public:
/* Setup */
ftController(ftDataMultiplex *dm, p3ServiceControl *sc, uint32_t ftServiceId);
@ -122,7 +123,7 @@ class ftController: public RsTickingThread, public pqiServiceMonitor, public p3C
bool activate();
bool isActiveAndNoPending();
virtual void data_tick();
void threadTick() override; /// @see RsTickingThread
/***************************************************************/
/********************** Controller Access **********************/

View file

@ -47,7 +47,7 @@ ftExtraList::ftExtraList()
}
void ftExtraList::data_tick()
void ftExtraList::threadTick()
{
bool todo = false;
rstime_t now = time(NULL);

View file

@ -146,10 +146,7 @@ public:
*/
void getExtraFileList(std::vector<FileInfo>& files) const ;
/***
* Thread Main Loop
**/
virtual void data_tick();
void threadTick() override; /// @see RsTickingThread
/***
* Configuration - store extra files.

View file

@ -234,28 +234,28 @@ void ftServer::StartupThreads()
void ftServer::StopThreads()
{
/* stop Dataplex */
mFtDataplex->join();
mFtDataplex->fullstop();
/* stop Controller thread */
mFtController->join();
mFtController->fullstop();
/* self contained threads */
/* stop ExtraList Thread */
mFtExtra->join();
mFtExtra->fullstop();
delete (mFtDataplex);
mFtDataplex = NULL;
mFtDataplex = nullptr;
delete (mFtController);
mFtController = NULL;
mFtController = nullptr;
delete (mFtExtra);
mFtExtra = NULL;
mFtExtra = nullptr;
/* stop Monitor Thread */
mFileDatabase->stopThreads();
delete mFileDatabase;
mFileDatabase = NULL ;
mFileDatabase = nullptr;
}
/***************************************************************/

View file

@ -544,7 +544,7 @@ bool ftTransferModule::isCheckingHash()
return mFlag == FT_TM_FLAG_CHECKING || mFlag == FT_TM_FLAG_CHUNK_CRC;
}
class HashThread: public RsSingleJobThread
class HashThread: public RsThread
{
public:
explicit HashThread(ftFileCreator *m)

View file

@ -108,7 +108,7 @@ public:
Status stat;
};
class ftTransferModule
class ftTransferModule
{
public:
ftTransferModule(ftFileCreator *fc, ftDataMultiplex *dm, ftController *c);

View file

@ -32,7 +32,6 @@
/****
* #define P3DISC_DEBUG 1
****/
#define P3DISC_DEBUG 1
/*extern*/ std::shared_ptr<RsGossipDiscovery> rsGossipDiscovery(nullptr);
@ -952,11 +951,9 @@ void p3discovery2::processContactInfo(const RsPeerId &fromId, const RsDiscContac
return ; // fresh information here.
bool should_notify_discovery = false;
auto sit= it->second.mSslIds.find(item->sslId);
DiscSslInfo& sslInfo(it->second.mSslIds[item->sslId]); // This line inserts the entry while not removing already existing data
// do not remove it!
if (!mPeerMgr->isFriend(item->sslId))
{
should_notify_discovery = true;

View file

@ -132,7 +132,7 @@ bool RsGenExchange::getGroupServerUpdateTS(const RsGxsGroupId& gid, rstime_t& gr
return mNetService->getGroupServerUpdateTS(gid,grp_server_update_TS,msg_server_update_TS) ;
}
void RsGenExchange::data_tick()
void RsGenExchange::threadTick()
{
static const double timeDelta = 0.1; // slow tick in sec

View file

@ -176,7 +176,7 @@ public:
*/
RsTokenService* getTokenService();
virtual void data_tick();
void threadTick() override; /// @see RsTickingThread
/*!
* Policy bit pattern portion

View file

@ -1981,7 +1981,7 @@ bool RsGxsNetService::locked_processTransac(RsNxsTransacItem *item)
return false;
}
void RsGxsNetService::data_tick()
void RsGxsNetService::threadTick()
{
static const double timeDelta = 0.5;

View file

@ -76,7 +76,8 @@ struct GroupRequestRecord
* Incoming transaction are in 3 different states
* 1. START 2. RECEIVING 3. END
*/
class RsGxsNetService : public RsNetworkExchangeService, public p3ThreadedService, public p3Config
class RsGxsNetService :
public RsNetworkExchangeService, public p3ThreadedService, public p3Config
{
public:
@ -207,10 +208,8 @@ public:
*/
int tick();
/*!
* Processes transactions and job queue
*/
virtual void data_tick();
void threadTick() override; /// @see RsTickingThread
private:
/*!

View file

@ -726,7 +726,7 @@ void RsGxsNetTunnelService::generateEncryptionKey(const RsGxsGroupId& group_id,c
// Service parts //
//===========================================================================================================================================//
void RsGxsNetTunnelService::data_tick()
void RsGxsNetTunnelService::threadTick()
{
while(!mPendingTurtleItems.empty())
{

View file

@ -103,7 +103,9 @@
class RsGxsNetTunnelItem ;
class RsNetworkExchangeService ;
class RsGxsNetTunnelService: public RsTurtleClientService, public RsTickingThread, public p3Config, public RsGxsDistSync
class RsGxsNetTunnelService:
public RsTurtleClientService, public RsTickingThread, public p3Config,
public RsGxsDistSync
{
public:
RsGxsNetTunnelService() ;
@ -196,9 +198,7 @@ public:
virtual bool receiveSearchRequest(unsigned char *search_request_data, uint32_t search_request_data_len, unsigned char *& search_result_data, uint32_t& search_result_data_len, uint32_t &max_allowed_hits);
virtual void receiveSearchResult(TurtleSearchRequestId request_id,unsigned char *search_result_data,uint32_t search_result_data_len);
// Overloaded from RsTickingThread
void data_tick() ;
void threadTick() override; /// @see RsTickingThread
// Overloads p3Config

View file

@ -183,7 +183,7 @@ private:
* Checks the integrity message and groups
* in rsDataService using computed hash
*/
class RsGxsIntegrityCheck : public RsSingleJobThread
class RsGxsIntegrityCheck : public RsThread
{
enum CheckState { CheckStart, CheckChecking };

View file

@ -434,8 +434,7 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::run()
totalMessageSizeAndCount[msg->metaData->mAuthorId].count++;
delete msg;
if(item != NULL)
delete item ;
delete item;
}
}

View file

@ -288,15 +288,15 @@ private:
void notifyClientService(const OutgoingRecord& pr);
/*!
* Checks the integrity message and groups
*/
class GxsTransIntegrityCleanupThread : public RsSingleJobThread
/// Checks the integrity message and groups
class GxsTransIntegrityCleanupThread : public RsThread
{
enum CheckState { CheckStart, CheckChecking };
public:
explicit GxsTransIntegrityCleanupThread(RsGeneralDataService *const dataService): mDs(dataService),mMtx("GxsTransIntegrityCheck") { mDone=false;}
explicit GxsTransIntegrityCleanupThread(
RsGeneralDataService* const dataService ):
mDs(dataService), mMtx("GxsTransIntegrityCheck"), mDone(false) {}
bool isDone();
void run();
@ -312,7 +312,7 @@ private:
GxsMsgReq mMsgToDel ;
std::map<RsGxsId,MsgSizeCount> total_message_size_and_count;
bool mDone ;
bool mDone;
};
// Overloaded from RsGenExchange.

View file

@ -1,7 +1,8 @@
/*
* RetroShare JSON API
*
* Copyright (C) 2018-2019 Gioacchino Mazzurco <gio@eigenlab.org>
* Copyright (C) 2018-2020 Gioacchino Mazzurco <gio@eigenlab.org>
* Copyright (C) 2019-2020 Asociación Civil Altermundi <info@altermundi.net>
*
* This program is free software: you can redistribute it and/or modify it under
* the terms of the GNU Affero General Public License as published by the
@ -145,6 +146,7 @@ bool RsJsonApi::parseToken(
JsonApiServer::JsonApiServer(): configMutex("JsonApiServer config"),
mService(std::make_shared<restbed::Service>()),
mServiceMutex("JsonApiServer restbed ptr"),
mListeningPort(RsJsonApi::DEFAULT_PORT),
mBindingAddress(RsJsonApi::DEFAULT_BINDING_ADDRESS)
{
@ -222,7 +224,7 @@ JsonApiServer::JsonApiServer(): configMutex("JsonApiServer config"),
rsLoginHelper->attemptLogin(account, password);
if( retval == RsInit::OK )
authorizeUser(account.toStdString(),password);
authorizeUser(account.toStdString(), password);
// serialize out parameters and return value to JSON
{
@ -309,6 +311,7 @@ JsonApiServer::JsonApiServer(): configMutex("JsonApiServer config"),
registerHandler("/rsEvents/registerEventsHandler",
[this](const std::shared_ptr<rb::Session> session)
{
const std::weak_ptr<rb::Service> weakService(mService);
const std::multimap<std::string, std::string> headers
{
{ "Connection", "keep-alive" },
@ -318,7 +321,7 @@ JsonApiServer::JsonApiServer(): configMutex("JsonApiServer config"),
size_t reqSize = static_cast<size_t>(
session->get_request()->get_header("Content-Length", 0) );
session->fetch( reqSize, [this](
session->fetch( reqSize, [weakService](
const std::shared_ptr<rb::Session> session,
const rb::Bytes& body )
{
@ -331,9 +334,17 @@ JsonApiServer::JsonApiServer(): configMutex("JsonApiServer config"),
const std::weak_ptr<rb::Session> weakSession(session);
RsEventsHandlerId_t hId = rsEvents->generateUniqueHandlerId();
std::function<void(std::shared_ptr<const RsEvent>)> multiCallback =
[this, weakSession, hId](std::shared_ptr<const RsEvent> event)
[weakSession, weakService, hId](
std::shared_ptr<const RsEvent> event )
{
mService->schedule( [weakSession, hId, event]()
auto lService = weakService.lock();
if(!lService || lService->is_down())
{
if(rsEvents) rsEvents->unregisterEventsHandler(hId);
return;
}
lService->schedule( [weakSession, hId, event]()
{
auto session = weakSession.lock();
if(!session || session->is_closed())
@ -499,10 +510,10 @@ bool JsonApiServer::authorizeUser(
return false;
}
if(!librs::util::is_alphanumeric(passwd))
if(passwd.empty())
{
RsErr() << __PRETTY_FUNCTION__ << " Password is not alphanumeric"
<< std::endl;
RsWarn() << __PRETTY_FUNCTION__ << " Password is empty, are you sure "
<< "this what you wanted?" << std::endl;
return false;
}
@ -580,28 +591,21 @@ std::vector<std::shared_ptr<rb::Resource> > JsonApiServer::getResources() const
return tab;
}
bool JsonApiServer::restart()
void JsonApiServer::restart()
{
fullstop();
RsThread::start("JSON API Server");
return true;
/* It is important to wrap into async(...) because fullstop() method can't
* be called from same thread of execution hence from JSON API thread! */
RsThread::async([this]()
{
fullstop();
RsThread::start("JSON API Server");
});
}
bool JsonApiServer::fullstop()
void JsonApiServer::onStopRequested()
{
if(!mService->is_up()) return true;
RS_STACK_MUTEX(mServiceMutex);
mService->stop();
RsThread::ask_for_stop();
while(isRunning())
{
RsDbg() << __PRETTY_FUNCTION__ << " shutting down JSON API service."
<< std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
return true;
}
uint16_t JsonApiServer::listeningPort() const { return mListeningPort; }
@ -610,23 +614,19 @@ void JsonApiServer::setBindingAddress(const std::string& bindAddress)
{ mBindingAddress = bindAddress; }
std::string JsonApiServer::getBindingAddress() const { return mBindingAddress; }
void JsonApiServer::runloop()
void JsonApiServer::run()
{
auto settings = std::make_shared<restbed::Settings>();
settings->set_port(mListeningPort);
settings->set_bind_address(mBindingAddress);
settings->set_default_header("Connection", "close");
if(mService->is_up())
{
RsWarn() << __PRETTY_FUNCTION__ << " restbed is already running. "
<< " stopping it before starting again!" << std::endl;
mService->stop();
}
/* re-allocating mService is important because it deletes the existing
* service and therefore leaves the listening port open */
mService = std::make_shared<restbed::Service>();
{
RS_STACK_MUTEX(mServiceMutex);
mService = std::make_shared<restbed::Service>();
}
for(auto& r: getResources()) mService->publish(r);
@ -634,8 +634,8 @@ void JsonApiServer::runloop()
{
RsUrl apiUrl; apiUrl.setScheme("http").setHost(mBindingAddress)
.setPort(mListeningPort);
RsDbg() << __PRETTY_FUNCTION__ << " JSON API server listening on "
<< apiUrl.toString() << std::endl;
RsInfo() << __PRETTY_FUNCTION__ << " JSON API server listening on "
<< apiUrl.toString() << std::endl;
mService->start(settings);
}
catch(std::exception& e)
@ -646,7 +646,7 @@ void JsonApiServer::runloop()
return;
}
RsInfo() << __PRETTY_FUNCTION__ << " finished!" << std::endl;
RsDbg() << __PRETTY_FUNCTION__ << " finished!" << std::endl;
}
/*static*/ void RsJsonApi::version(

View file

@ -1,7 +1,8 @@
/*
* RetroShare JSON API
*
* Copyright (C) 2018-2019 Gioacchino Mazzurco <gio@eigenlab.org>
* Copyright (C) 2018-2020 Gioacchino Mazzurco <gio@eigenlab.org>
* Copyright (C) 2019-2020 Asociación Civil Altermundi <info@altermundi.net>
*
* This program is free software: you can redistribute it and/or modify it under
* the terms of the GNU Affero General Public License as published by the
@ -62,10 +63,13 @@ public:
std::vector<std::shared_ptr<rb::Resource>> getResources() const;
/// @see RsJsonApi
bool restart() override;
void fullstop() override { RsThread::fullstop(); }
/// @see RsJsonApi
bool fullstop() override;
void restart() override;
/// @see RsJsonApi
void askForStop() override { RsThread::askForStop(); }
/// @see RsJsonApi
inline bool isRunning() override { return RsThread::isRunning(); }
@ -139,7 +143,14 @@ public:
const std::function<bool(const std::string&, const std::string&)>&
callback );
protected:
/// @see RsThread
void onStopRequested() override;
private:
/// @see RsThread
void run() override;
/// @see p3Config::setupSerialiser
RsSerialiser* setupSerialiser() override;
@ -184,10 +195,11 @@ private:
std::reference_wrapper<const JsonApiResourceProvider>,
std::less<const JsonApiResourceProvider> > mResourceProviders;
/// @see RsThread
void runloop() override;
std::shared_ptr<restbed::Service> mService;
/** Protect service only during very critical operation like resetting the
* pointer, still not 100% thread safe, but hopefully we can avoid
* crashes/freeze with this */
RsMutex mServiceMutex;
uint16_t mListeningPort;
std::string mBindingAddress;

View file

@ -1,20 +1,6 @@
################################################################################
# libretroshare.pro #
# Copyright (C) 2018, Retroshare team <retroshare.team@gmailcom> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU Lesser General Public License as #
# published by the Free Software Foundation, either version 3 of the #
# License, or (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU Lesser General Public License for more details. #
# #
# You should have received a copy of the GNU Lesser General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
################################################################################
# SPDX-FileCopyrightText: (C) 2004-2019 Retroshare Team <contact@retroshare.cc>
# SPDX-License-Identifier: CC0-1.0
!include("../../retroshare.pri"): error("Could not include file ../../retroshare.pri")
TEMPLATE = lib
@ -509,7 +495,8 @@ HEADERS += util/folderiterator.h \
util/rsdeprecate.h \
util/cxx11retrocompat.h \
util/cxx17retrocompat.h \
util/rsurl.h
util/rsurl.h \
util/rserrno.h
SOURCES += ft/ftchunkmap.cc \
ft/ftcontroller.cc \
@ -648,7 +635,8 @@ SOURCES += util/folderiterator.cc \
util/rstickevent.cc \
util/rsrecogn.cc \
util/rstime.cc \
util/rsurl.cc
util/rsurl.cc \
util/rserrno.cc
equals(RS_UPNP_LIB, miniupnpc) {
HEADERS += rs_upnp/upnputil.h rs_upnp/upnphandler_miniupnp.h

View file

@ -116,11 +116,11 @@ void AuthGPG::init(
void AuthGPG::exit()
{
if(_instance != NULL)
if(_instance)
{
_instance->join();
delete _instance ;
_instance = NULL;
_instance->fullstop();
delete _instance;
_instance = nullptr;
}
}
@ -188,7 +188,7 @@ int AuthGPG::GPGInit(const RsPgpId &ownId)
{
}
void AuthGPG::data_tick()
void AuthGPG::threadTick()
{
rstime::rs_usleep(100 * 1000); //100 msec

View file

@ -239,7 +239,7 @@ public:
virtual bool loadList(std::list<RsItem *>& load);
/*****************************************************************/
private:
private:
// Gets the certificate pointer and returns NULL if the string is invalid, or the
// cert was not found.
//
@ -271,10 +271,9 @@ public:
bool printAllKeys_locked();
bool printOwnKeys_locked();
/* own thread */
virtual void data_tick();
void threadTick() override; /// @see RsTickingThread
private:
private:
static AuthGPG *instance_gpg; // pointeur vers le singleton

View file

@ -1222,7 +1222,7 @@ int AuthSSLimpl::VerifyX509Callback(int /*preverify_ok*/, X509_STORE_CTX* ctx)
ev->mSslId = sslId;
ev->mPgpId = pgpId;
ev->mErrorMsg = errMsg;
ev->mErrorCode = RsAuthSslConnectionAutenticationEvent::AuthenticationCode::MISSING_AUTHENTICATION_INFO;
ev->mErrorCode = RsAuthSslError::MISSING_AUTHENTICATION_INFO;
rsEvents->postEvent(std::move(ev));
}
@ -1242,7 +1242,7 @@ int AuthSSLimpl::VerifyX509Callback(int /*preverify_ok*/, X509_STORE_CTX* ctx)
ev->mSslId = sslId;
ev->mSslCn = sslCn;
ev->mErrorMsg = errMsg;
ev->mErrorCode = RsAuthSslConnectionAutenticationEvent::AuthenticationCode::MISSING_AUTHENTICATION_INFO;
ev->mErrorCode = RsAuthSslError::MISSING_AUTHENTICATION_INFO;
rsEvents->postEvent(std::move(ev));
}
@ -1273,7 +1273,7 @@ int AuthSSLimpl::VerifyX509Callback(int /*preverify_ok*/, X509_STORE_CTX* ctx)
ev->mSslCn = sslCn;
ev->mPgpId = pgpId;
ev->mErrorMsg = errorMsg;
ev->mErrorCode = RsAuthSslConnectionAutenticationEvent::AuthenticationCode::MISMATCHED_PGP_ID;
ev->mErrorCode = RsAuthSslError::MISMATCHED_PGP_ID;
rsEvents->postEvent(std::move(ev));
}
@ -1299,13 +1299,18 @@ int AuthSSLimpl::VerifyX509Callback(int /*preverify_ok*/, X509_STORE_CTX* ctx)
ev->mSslCn = sslCn;
ev->mPgpId = pgpId;
switch(auth_diagnostic)
{
case RS_SSL_HANDSHAKE_DIAGNOSTIC_ISSUER_UNKNOWN: ev->mErrorCode = RsAuthSslConnectionAutenticationEvent::AuthenticationCode::NOT_A_FRIEND; break;
case RS_SSL_HANDSHAKE_DIAGNOSTIC_WRONG_SIGNATURE: ev->mErrorCode = RsAuthSslConnectionAutenticationEvent::AuthenticationCode::PGP_SIGNATURE_VALIDATION_FAILED;break;
switch(auth_diagnostic)
{
case RS_SSL_HANDSHAKE_DIAGNOSTIC_ISSUER_UNKNOWN:
ev->mErrorCode = RsAuthSslError::NOT_A_FRIEND;
break;
case RS_SSL_HANDSHAKE_DIAGNOSTIC_WRONG_SIGNATURE:
ev->mErrorCode = RsAuthSslError::PGP_SIGNATURE_VALIDATION_FAILED;
break;
default:
ev->mErrorCode = RsAuthSslConnectionAutenticationEvent::AuthenticationCode::MISSING_AUTHENTICATION_INFO;break;
}
ev->mErrorCode = RsAuthSslError::MISSING_AUTHENTICATION_INFO;
break;
}
ev->mErrorMsg = errMsg;
rsEvents->postEvent(std::move(ev));
@ -1331,7 +1336,7 @@ int AuthSSLimpl::VerifyX509Callback(int /*preverify_ok*/, X509_STORE_CTX* ctx)
ev->mSslCn = sslCn;
ev->mPgpId = pgpId;
ev->mErrorMsg = errMsg;
ev->mErrorCode = RsAuthSslConnectionAutenticationEvent::AuthenticationCode::NOT_A_FRIEND;
ev->mErrorCode = RsAuthSslError::NOT_A_FRIEND;
rsEvents->postEvent(std::move(ev));
}
@ -1595,32 +1600,6 @@ bool AuthSSLimpl::decrypt(void *&out, int &outlen, const void *in, int inlen)
return true;
}
/********************************************************************************/
/********************************************************************************/
/********************* Cert Search / Add / Remove **************************/
/********************************************************************************/
/********************************************************************************/
// void AuthSSLimpl::setCurrentConnectionAttemptInfo(const RsPgpId& gpg_id,const RsPeerId& ssl_id,const std::string& ssl_cn)
// {
// #ifdef AUTHSSL_DEBUG
// std::cerr << "AuthSSL: registering connection attempt from:" << std::endl;
// std::cerr << " GPG id: " << gpg_id << std::endl;
// std::cerr << " SSL id: " << ssl_id << std::endl;
// std::cerr << " SSL cn: " << ssl_cn << std::endl;
// #endif
// _last_gpgid_to_connect = gpg_id ;
// _last_sslid_to_connect = ssl_id ;
// _last_sslcn_to_connect = ssl_cn ;
// }
// void AuthSSLimpl::getCurrentConnectionAttemptInfo(RsPgpId& gpg_id,RsPeerId& ssl_id,std::string& ssl_cn)
// {
// gpg_id = _last_gpgid_to_connect ;
// ssl_id = _last_sslid_to_connect ;
// ssl_cn = _last_sslcn_to_connect ;
// }
/* Locked search -> internal help function */
bool AuthSSLimpl::locked_FindCert(const RsPeerId& id, X509** cert)
{

View file

@ -137,13 +137,6 @@ public:
/// SSL specific functions used in pqissl/pqissllistener
virtual SSL_CTX* getCTX() = 0;
// virtual void setCurrentConnectionAttemptInfo(
// const RsPgpId& gpg_id, const RsPeerId& ssl_id,
// const std::string& ssl_cn ) = 0;
// virtual void getCurrentConnectionAttemptInfo(
// RsPgpId& gpg_id, RsPeerId& ssl_id, std::string& ssl_cn ) = 0;
/**
* This function parse X509 certificate from the file and return some
* verified informations, like ID and signer
@ -230,14 +223,6 @@ public:
/* SSL specific functions used in pqissl/pqissllistener */
SSL_CTX* getCTX() override;
/* Restored these functions: */
// void setCurrentConnectionAttemptInfo(
// const RsPgpId& gpg_id, const RsPeerId& ssl_id,
// const std::string& ssl_cn ) override;
// void getCurrentConnectionAttemptInfo(
// RsPgpId& gpg_id, RsPeerId& ssl_id, std::string& ssl_cn ) override;
private:
bool LocalStoreCert(X509* x509);

View file

@ -475,22 +475,18 @@ void p3LinkMgrIMPL::tickMonitors()
/* notify GUI */
if (rsEvents && (peer.actions & RS_PEER_CONNECTED))
{
auto e = std::make_shared<RsConnectionEvent>() ;
e->mConnectionInfoCode = RsConnectionEvent::ConnectionEventCode::PEER_CONNECTED;
auto e = std::make_shared<RsConnectionEvent>();
e->mConnectionInfoCode = RsConnectionEventCode::PEER_CONNECTED;
e->mSslId = peer.id;
rsEvents->postEvent(e);
}
if (rsEvents && (peer.actions & RS_PEER_DISCONNECTED))
{
auto e = std::make_shared<RsConnectionEvent>() ;
e->mConnectionInfoCode = RsConnectionEvent::ConnectionEventCode::PEER_DISCONNECTED;
auto e = std::make_shared<RsConnectionEvent>();
e->mConnectionInfoCode = RsConnectionEventCode::PEER_DISCONNECTED;
e->mSslId = peer.id;
rsEvents->postEvent(e);
}
rsEvents->postEvent(e);
}
}
}

View file

@ -1804,21 +1804,24 @@ bool p3PeerMgrIMPL::addCandidateForOwnExternalAddress(const RsPeerId &from, cons
// Notify for every friend that has reported a wrong external address, except if that address is in the IP whitelist.
if((rsBanList!=NULL && !rsBanList->isAddressAccepted(addr_filtered,RSBANLIST_CHECKING_FLAGS_WHITELIST)) && (!sockaddr_storage_sameip(own_addr,addr_filtered)))
{
std::cerr << " Peer " << from << " reports a connection address (" << sockaddr_storage_iptostring(addr_filtered) <<") that is not your current external address (" << sockaddr_storage_iptostring(own_addr) << "). This is weird." << std::endl;
if((rsBanList && !rsBanList->isAddressAccepted(addr_filtered, RSBANLIST_CHECKING_FLAGS_WHITELIST))
&& !sockaddr_storage_sameip(own_addr, addr_filtered) )
{
RsInfo() << __PRETTY_FUNCTION__ << " Peer " << from
<< " reports a connection address (" << addr_filtered
<<") that is not your current external address ("
<< own_addr << "). This is weird." << std::endl;
if(rsEvents)
{
auto ev = std::make_shared<RsConnectionEvent>();
ev->mSslId = from;
ev->mStrInfo1 = sockaddr_storage_iptostring(addr);
ev->mStrInfo2 = sockaddr_storage_iptostring(own_addr);
ev->mConnectionInfoCode = RsConnectionEvent::ConnectionEventCode::PEER_REPORTS_WRONG_IP;
rsEvents->postEvent(ev);
}
}
if(rsEvents)
{
auto ev = std::make_shared<RsConnectionEvent>();
ev->mSslId = from;
ev->mOwnLocator = RsUrl(own_addr);
ev->mReportedLocator = RsUrl(addr);
ev->mConnectionInfoCode = RsConnectionEventCode::PEER_REPORTS_WRONG_IP;
rsEvents->postEvent(ev);
}
}
// we could also sweep over all connected friends and see if some report a different address.

View file

@ -366,9 +366,9 @@ int pqiperson::handleNotifyEvent_locked(NetInterface *ni, int newState,
<< " CONNECT_FAILED->marking so!" << std::endl;
#endif
activepqi->shutdown(); // STOP THREAD.
activepqi->askForStop(); // STOP THREAD.
active = false;
activepqi = NULL;
activepqi = nullptr;
}
#ifdef PERSON_DEBUG
else
@ -406,7 +406,7 @@ int pqiperson::reset_locked()
std::map<uint32_t, pqiconnect *>::iterator it;
for(it = kids.begin(); it != kids.end(); ++it)
{
(it->second) -> shutdown(); // STOP THREAD.
it->second->askForStop(); // STOP THREAD.
(it->second) -> reset();
}

View file

@ -1107,16 +1107,14 @@ int pqissl::SSL_Connection_Complete()
return 0;
}
if(rsEvents)
{
if(rsEvents)
{
X509 *x509 = SSL_get_peer_certificate(ssl_connection);
auto ev = std::make_shared<RsAuthSslConnectionAutenticationEvent>();
X509 *x509 = SSL_get_peer_certificate(ssl_connection) ;
ev->mSslId = RsX509Cert::getCertSslId(*x509);
ev->mErrorCode = RsAuthSslConnectionAutenticationEvent::AuthenticationCode::PEER_REFUSED_CONNECTION;
ev->mErrorCode = RsAuthSslError::PEER_REFUSED_CONNECTION;
rsEvents->postEvent(ev);
}
}
std::string out;
rs_sprintf(out, "pqissl::SSL_Connection_Complete()\nIssues with SSL Connect(%d)!\n", err);
@ -1267,7 +1265,7 @@ int pqissl::accept_locked( SSL *ssl, int fd,
if(rsBanList && !rsBanList->isAddressAccepted( foreign_addr, checking_flags, check_result ))
{
RsErr() << __PRETTY_FUNCTION__
RsInfo() << __PRETTY_FUNCTION__
<< " Refusing incoming SSL connection from blacklisted "
<< "foreign address " << foreign_addr
<< ". Reason: " << check_result << ". This should never happen "
@ -1275,18 +1273,15 @@ int pqissl::accept_locked( SSL *ssl, int fd,
<< std::endl;
print_stacktrace();
if(rsEvents)
{
if(rsEvents)
{
X509 *x509 = SSL_get_peer_certificate(ssl);
auto ev = std::make_shared<RsAuthSslConnectionAutenticationEvent>();
X509 *x509 = SSL_get_peer_certificate(ssl) ;
ev->mSslId = RsX509Cert::getCertSslId(*x509);
ev->mLocator = RsUrl(foreign_addr);
ev->mErrorCode = RsAuthSslConnectionAutenticationEvent::AuthenticationCode::IP_IS_BLACKLISTED;
ev->mErrorCode = RsAuthSslError::IP_IS_BLACKLISTED;
rsEvents->postEvent(ev);
}
}
reset_locked();
return failure;
@ -1343,7 +1338,7 @@ int pqissl::accept_locked( SSL *ssl, int fd,
/* shutdown existing - in all cases use the new one */
if ((ssl_connection) && (ssl_connection != ssl))
{
std::cerr << __PRETTY_FUNCTION__
RsInfo() << __PRETTY_FUNCTION__
<< " closing Previous/Existing ssl_connection" << std::endl;
SSL_shutdown(ssl_connection);
SSL_free (ssl_connection);
@ -1351,7 +1346,7 @@ int pqissl::accept_locked( SSL *ssl, int fd,
if ((sockfd > -1) && (sockfd != fd))
{
std::cerr << __PRETTY_FUNCTION__ << " closing Previous/Existing sockfd"
RsInfo() << __PRETTY_FUNCTION__ << " closing Previous/Existing sockfd"
<< std::endl;
net_internal_close(sockfd);
}
@ -1367,7 +1362,7 @@ int pqissl::accept_locked( SSL *ssl, int fd,
*/
sockaddr_storage_copy(foreign_addr, remote_addr);
std::cerr << __PRETTY_FUNCTION__ << " SUCCESSFUL connection to: "
RsInfo() << __PRETTY_FUNCTION__ << " SUCCESSFUL connection to: "
<< PeerId().toStdString() << " remoteaddr: "
<< sockaddr_storage_iptostring(remote_addr) << std::endl;

View file

@ -480,14 +480,12 @@ int pqissllistenbase::continueSSL(IncomingSSLInfo& incoming_connexion_info, bool
if(vres == X509_V_OK && nullptr != rsEvents)
{
auto ev = std::unique_ptr<RsAuthSslConnectionAutenticationEvent>(new RsAuthSslConnectionAutenticationEvent);
auto ev = std::make_shared<RsAuthSslConnectionAutenticationEvent>();
ev->mLocator = RsUrl(incoming_connexion_info.addr);
ev->mErrorCode = RsAuthSslConnectionAutenticationEvent::AuthenticationCode::MISSING_AUTHENTICATION_INFO;
rsEvents->postEvent(std::move(ev));
ev->mErrorCode = RsAuthSslError::MISSING_AUTHENTICATION_INFO;
rsEvents->postEvent(ev);
}
closeConnection(fd, incoming_connexion_info.ssl) ;
closeConnection(fd, incoming_connexion_info.ssl);
// failure -1, pending 0, sucess 1.
return -1;

View file

@ -49,7 +49,7 @@ int pqithreadstreamer::tick()
return 0;
}
void pqithreadstreamer::data_tick()
void pqithreadstreamer::threadTick()
{
uint32_t recv_timeout = 0;
uint32_t sleep_period = 0;

View file

@ -35,7 +35,7 @@ public:
virtual int tick();
protected:
virtual void data_tick();
void threadTick() override; /// @see RsTickingThread
PQInterface *mParent;
uint32_t mTimeout;

View file

@ -23,11 +23,14 @@
#include <memory>
#include <cstdint>
#include <chrono>
#include <functional>
#include "util/rsmemory.h"
#include "util/rsurl.h"
#include "serialiser/rsserializable.h"
#include "serialiser/rstypeserializer.h"
#include "util/rstime.h"
class RsEvents;

View file

@ -102,23 +102,36 @@ struct RsGxsChannelPost : RsSerializable
~RsGxsChannelPost() override;
};
enum class RsChannelEventCode: uint8_t
{
UNKNOWN = 0x00,
NEW_CHANNEL = 0x01, /// emitted when new channel is received
/// emitted when existing channel is updated
UPDATED_CHANNEL = 0x02,
/// new message reeived in a particular channel (group and msg id)
NEW_MESSAGE = 0x03,
/// existing message has been updated in a particular channel
UPDATED_MESSAGE = 0x04,
/// publish key for this channel has been received
RECEIVED_PUBLISH_KEY = 0x05,
/// subscription for channel mChannelGroupId changed.
SUBSCRIBE_STATUS_CHANGED = 0x06,
};
struct RsGxsChannelEvent: RsEvent
{
RsGxsChannelEvent()
: RsEvent(RsEventType::GXS_CHANNELS), mChannelEventCode(ChannelEventCode::UNKNOWN) {}
RsGxsChannelEvent():
RsEvent(RsEventType::GXS_CHANNELS),
mChannelEventCode(RsChannelEventCode::UNKNOWN) {}
enum class ChannelEventCode: uint8_t {
UNKNOWN = 0x00,
NEW_CHANNEL = 0x01, // emitted when new channel is received
UPDATED_CHANNEL = 0x02, // emitted when existing channel is updated
NEW_MESSAGE = 0x03, // new message reeived in a particular channel (group and msg id)
UPDATED_MESSAGE = 0x04, // existing message has been updated in a particular channel (group and msg id)
RECEIVED_PUBLISH_KEY = 0x05, // publish key for this channel has been received.
SUBSCRIBE_STATUS_CHANGED = 0x06, // subscription for channel mChannelGroupId changed.
};
ChannelEventCode mChannelEventCode;
RsGxsGroupId mChannelGroupId;
RsChannelEventCode mChannelEventCode;
RsGxsGroupId mChannelGroupId;
RsGxsMessageId mChannelMsgId;
///* @see RsEvent @see RsSerializable

View file

@ -162,32 +162,54 @@ struct RsGxsCircleDetails : RsSerializable
}
};
enum class RsGxsCircleEventCode: uint8_t
{
UNKNOWN = 0x00,
/** mCircleId contains the circle id and mGxsId is the id requesting
* membership */
CIRCLE_MEMBERSHIP_REQUEST = 0x01,
/** mCircleId is the circle that invites me, and mGxsId is my own Id that is
* invited */
CIRCLE_MEMBERSHIP_INVITE = 0x02,
/** mCircleId contains the circle id and mGxsId is the id dropping
* membership */
CIRCLE_MEMBERSHIP_LEAVE = 0x03,
/// mCircleId contains the circle id and mGxsId is the id of the new member
CIRCLE_MEMBERSHIP_JOIN = 0x04,
/** mCircleId contains the circle id and mGxsId is the id that was revoqued
* by admin */
CIRCLE_MEMBERSHIP_REVOQUED= 0x05,
};
struct RsGxsCircleEvent: RsEvent
{
RsGxsCircleEvent()
: RsEvent(RsEventType::GXS_CIRCLES), mCircleEventType(CircleEventCode::UNKNOWN) {}
: RsEvent(RsEventType::GXS_CIRCLES),
mCircleEventType(RsGxsCircleEventCode::UNKNOWN) {}
enum class CircleEventCode: uint8_t {
UNKNOWN = 0x00,
CIRCLE_MEMBERSHIP_REQUEST = 0x01, // mCircleId contains the circle id and mGxsId is the id requesting membership
CIRCLE_MEMBERSHIP_INVITE = 0x02, // mCircleId is the circle that invites me, and mGxsId is my own Id that is invited
CIRCLE_MEMBERSHIP_LEAVE = 0x03, // mCircleId contains the circle id and mGxsId is the id dropping membership
CIRCLE_MEMBERSHIP_JOIN = 0x04, // mCircleId contains the circle id and mGxsId is the id of the new member
CIRCLE_MEMBERSHIP_REVOQUED= 0x05, // mCircleId contains the circle id and mGxsId is the id that was revoqued by admin
};
CircleEventCode mCircleEventType;
RsGxsCircleId mCircleId;
RsGxsId mGxsId;
RsGxsCircleEventCode mCircleEventType;
RsGxsCircleId mCircleId;
RsGxsId mGxsId;
///* @see RsEvent @see RsSerializable
void serial_process( RsGenericSerializer::SerializeJob j,RsGenericSerializer::SerializeContext& ctx) override
void serial_process(
RsGenericSerializer::SerializeJob j,
RsGenericSerializer::SerializeContext& ctx ) override
{
RsEvent::serial_process(j, ctx);
RS_SERIAL_PROCESS(mCircleEventType);
RS_SERIAL_PROCESS(mCircleId);
RS_SERIAL_PROCESS(mGxsId);
}
~RsGxsCircleEvent() override;
};
class RsGxsCircles: public RsGxsIfaceHelper

View file

@ -104,33 +104,45 @@ struct RsGxsForumMsg : RsSerializable
~RsGxsForumMsg() override;
};
enum class RsForumEventCode: uint8_t
{
UNKNOWN = 0x00,
NEW_FORUM = 0x01, /// emitted when new forum is received
UPDATED_FORUM = 0x02, /// emitted when existing forum is updated
/// new message reeived in a particular forum
NEW_MESSAGE = 0x03,
/// existing message has been updated in a particular forum
UPDATED_MESSAGE = 0x04,
/// forum was subscribed or unsubscribed
SUBSCRIBE_STATUS_CHANGED = 0x05,
};
struct RsGxsForumEvent: RsEvent
{
RsGxsForumEvent()
: RsEvent(RsEventType::GXS_FORUMS), mForumEventCode(ForumEventCode::UNKNOWN) {}
RsGxsForumEvent()
: RsEvent(RsEventType::GXS_FORUMS),
mForumEventCode(RsForumEventCode::UNKNOWN) {}
enum class ForumEventCode: uint8_t {
UNKNOWN = 0x00,
NEW_FORUM = 0x01, // emitted when new forum is received
UPDATED_FORUM = 0x02, // emitted when existing forum is updated
NEW_MESSAGE = 0x03, // new message reeived in a particular forum (group and msg id)
UPDATED_MESSAGE = 0x04, // existing message has been updated in a particular forum (group and msg id)
SUBSCRIBE_STATUS_CHANGED = 0x05, // forum was subscribed or unsubscribed
};
ForumEventCode mForumEventCode;
RsGxsGroupId mForumGroupId;
RsForumEventCode mForumEventCode;
RsGxsGroupId mForumGroupId;
RsGxsMessageId mForumMsgId;
///* @see RsEvent @see RsSerializable
void serial_process( RsGenericSerializer::SerializeJob j,RsGenericSerializer::SerializeContext& ctx) override
void serial_process(
RsGenericSerializer::SerializeJob j,
RsGenericSerializer::SerializeContext& ctx ) override
{
RsEvent::serial_process(j, ctx);
RS_SERIAL_PROCESS(mForumEventCode);
RS_SERIAL_PROCESS(mForumGroupId);
RS_SERIAL_PROCESS(mForumMsgId);
}
}
~RsGxsForumEvent() override;
};
class RsGxsForums: public RsGxsIfaceHelper

View file

@ -43,18 +43,25 @@ public:
static const std::string DEFAULT_BINDING_ADDRESS; // 127.0.0.1
/**
* @brief Restart RsJsonApi server
* @brief Restart RsJsonApi server asynchronously.
* @jsonapi{development}
*/
virtual bool restart() = 0;
virtual void restart() = 0;
/** @brief Request RsJsonApi to stop and wait until it has stopped.
* Do not expose this method to JSON API as fullstop must not be called from
* the same thread of service execution.
*/
virtual void fullstop() = 0;
/**
* @brief Request RsJsonApi to stop and wait until ti has stopped.
* @brief Request RsJsonApi to stop asynchronously.
* @jsonapi{development}
* Be expecially carefull to call this from JSON API because you will loose
* access to the API.
* @jsonapi{development}
* If you need to wait until stopping has completed @see isRunning().
*/
virtual bool fullstop() = 0;
virtual void askForStop() = 0;
/**
* @brief Get status of the json api server
@ -128,8 +135,7 @@ public:
std::string& user, std::string& passwd );
/**
* Add new auth (user,passwd) token to the authorized set, creating the
* token user:passwd internally.
* Add new API auth (user,passwd) token to the authorized set.
* @jsonapi{development}
* @param[in] user user name to autorize, must be alphanumerinc
* @param[in] password password for the user, must be alphanumerinc

View file

@ -296,19 +296,24 @@ struct MsgTagType : RsSerializable
} //namespace Rs
} //namespace Msgs
enum class RsMailStatusEventType: uint8_t {
NEW_MESSAGE = 0x00,
MESSAGE_REMOVED = 0x01,
MESSAGE_SENT = 0x02,
MESSAGE_RECEIVED_ACK = 0x03, // means the peer received the message
FAILED_SIGNATURE = 0x04, // means the signature of the message cannot be verified
enum class RsMailStatusEventCode: uint8_t
{
NEW_MESSAGE = 0x00,
MESSAGE_REMOVED = 0x01,
MESSAGE_SENT = 0x02,
/// means the peer received the message
MESSAGE_RECEIVED_ACK = 0x03,
/// An error occurred attempting to sign the message
SIGNATURE_FAILED = 0x04,
};
struct RsMailStatusEvent : RsEvent
{
RsMailStatusEvent() : RsEvent(RsEventType::MAIL_STATUS) {}
RsMailStatusEventType mMailStatusEventCode;
RsMailStatusEventCode mMailStatusEventCode;
std::set<RsMailMessageId> mChangedMsgIds;
/// @see RsEvent

View file

@ -210,33 +210,35 @@ std::string RsPeerLastConnectString(uint32_t lastConnect);
// Connexion and security events //
//===================================================================================================//
enum class RsAuthSslError: uint8_t
{
// NO_ERROR = 0x00, // enabling break windows build
MISSING_AUTHENTICATION_INFO = 0x01,
PGP_SIGNATURE_VALIDATION_FAILED = 0x02,
MISMATCHED_PGP_ID = 0x03,
NO_CERTIFICATE_SUPPLIED = 0x04,
NOT_A_FRIEND = 0x05,
MISSING_CERTIFICATE = 0x06,
IP_IS_BLACKLISTED = 0x07,
PEER_REFUSED_CONNECTION = 0x08,
UNKNOWN_ERROR = 0x09,
};
/**
* Event triggered by AuthSSL when authentication of a connection attempt either
* fail or success
*/
struct RsAuthSslConnectionAutenticationEvent : RsEvent
{
RsAuthSslConnectionAutenticationEvent() : RsEvent(RsEventType::AUTHSSL_CONNECTION_AUTENTICATION) {}
enum class AuthenticationCode: uint8_t {
NO_CONNECTION_ERROR = 0x00,
MISSING_AUTHENTICATION_INFO = 0x01,
PGP_SIGNATURE_VALIDATION_FAILED = 0x02,
MISMATCHED_PGP_ID = 0x03,
NO_CERTIFICATE_SUPPLIED = 0x04,
NOT_A_FRIEND = 0x05,
MISSING_CERTIFICATE = 0x06,
IP_IS_BLACKLISTED = 0x07,
PEER_REFUSED_CONNECTION = 0x08,
UNKNOWN_ERROR = 0x09,
};
RsAuthSslConnectionAutenticationEvent() :
RsEvent(RsEventType::AUTHSSL_CONNECTION_AUTENTICATION) {}
RsPeerId mSslId;
std::string mSslCn;
RsPgpId mPgpId;
RsUrl mLocator;
RsUrl mLocator;
std::string mErrorMsg;
AuthenticationCode mErrorCode;
RsAuthSslError mErrorCode;
///* @see RsEvent @see RsSerializable
void serial_process( RsGenericSerializer::SerializeJob j,
@ -250,36 +252,49 @@ struct RsAuthSslConnectionAutenticationEvent : RsEvent
RS_SERIAL_PROCESS(mErrorMsg);
RS_SERIAL_PROCESS(mErrorCode);
}
~RsAuthSslConnectionAutenticationEvent() override;
};
enum class RsConnectionEventCode: uint8_t
{
UNKNOWN = 0x00,
PEER_CONNECTED = 0x01,
PEER_DISCONNECTED = 0x02,
PEER_TIME_SHIFT = 0x03, // mTimeShift = time shift in seconds
PEER_REPORTS_WRONG_IP = 0x04, // mPeerLocator = address reported, mOwnLocator = own address
};
struct RsConnectionEvent : RsEvent
{
RsConnectionEvent()
: RsEvent(RsEventType::PEER_CONNECTION),
mConnectionInfoCode(ConnectionEventCode::UNKNOWN) {}
mConnectionInfoCode(RsConnectionEventCode::UNKNOWN), mTimeShift(0) {}
enum class ConnectionEventCode: uint8_t {
UNKNOWN = 0x00,
PEER_CONNECTED = 0x01,
PEER_DISCONNECTED = 0x02,
PEER_TIME_SHIFT = 0x03, // mStrInfo1 = time shift in seconds
PEER_REPORTS_WRONG_IP = 0x04, // mStrInfo1 = address reported, mStrInfo2 = own address
};
ConnectionEventCode mConnectionInfoCode;
RsConnectionEventCode mConnectionInfoCode;
RsPeerId mSslId;
std::string mStrInfo1;
std::string mStrInfo2;
RsUrl mOwnLocator;
RsUrl mReportedLocator;
/** If there is a time shift with the peer aka
* mConnectionInfoCode == PEER_TIME_SHIFT contains the time shift value in
* seconds */
rstime_t mTimeShift;
///* @see RsEvent @see RsSerializable
void serial_process( RsGenericSerializer::SerializeJob j,RsGenericSerializer::SerializeContext& ctx) override
void serial_process(
RsGenericSerializer::SerializeJob j,
RsGenericSerializer::SerializeContext& ctx ) override
{
RsEvent::serial_process(j, ctx);
RS_SERIAL_PROCESS(mConnectionInfoCode);
RS_SERIAL_PROCESS(mSslId);
RS_SERIAL_PROCESS(mStrInfo1);
RS_SERIAL_PROCESS(mStrInfo2);
RS_SERIAL_PROCESS(mOwnLocator);
RS_SERIAL_PROCESS(mReportedLocator);
RS_SERIAL_PROCESS(mTimeShift);
}
~RsConnectionEvent() override;
};
//===================================================================================================//

View file

@ -68,18 +68,21 @@ class RsPostedGroup
std::ostream &operator<<(std::ostream &out, const RsPostedGroup &group);
std::ostream &operator<<(std::ostream &out, const RsPostedPost &post);
enum class RsPostedEventCode: uint8_t
{
UNKNOWN = 0x00,
NEW_POSTED_GROUP = 0x01,
NEW_MESSAGE = 0x02
};
struct RsGxsPostedEvent: RsEvent
{
RsGxsPostedEvent()
: RsEvent(RsEventType::GXS_POSTED), mPostedEventCode(PostedEventCode::UNKNOWN) {}
RsGxsPostedEvent():
RsEvent(RsEventType::GXS_POSTED),
mPostedEventCode(RsPostedEventCode::UNKNOWN) {}
enum class PostedEventCode: uint8_t {
UNKNOWN = 0x00,
NEW_POSTED_GROUP = 0x01,
NEW_MESSAGE = 0x02
};
PostedEventCode mPostedEventCode;
RsPostedEventCode mPostedEventCode;
RsGxsGroupId mPostedGroupId;
RsGxsMessageId mPostedMsgId;

View file

@ -4,7 +4,8 @@
* libretroshare: retroshare core library *
* *
* Copyright (C) 2012-2018 Retroshare Team <contact@retroshare.cc> *
* Copyright (C) 2018-2019 Gioacchino Mazzurco <gio@eigenlab.org> *
* Copyright (C) 2018-2020 Gioacchino Mazzurco <gio@eigenlab.org> *
* Copyright (C) 2020 Asociación Civil Altermundi <info@altermundi.net> *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU Lesser General Public License as *
@ -55,7 +56,7 @@
* Customize it trough qmake command line @see retroshare.pri
*/
#ifndef RS_EXTRA_VERSION
# define RS_EXTRA_VERSION "alpha"
# define RS_EXTRA_VERSION "-alpha"
#endif

View file

@ -34,6 +34,7 @@
#include "util/rsstring.h"
#include "rs_upnp/upnp18_retrocompat.h"
#include "util/rstime.h"
#ifdef __GNUC__
#if __GNUC__ >= 4

View file

@ -139,7 +139,7 @@ RsServer::~RsServer()
/* Thread Fn: Run the Core */
void RsServer::data_tick()
void RsServer::threadTick()
{
rstime::rs_usleep(mTimeDelta * 1000000);

View file

@ -88,9 +88,7 @@ public:
void setShutdownCallback(const std::function<void(int)>& callback)
{ mShutdownCallback = callback; }
/* Thread Fn: Run the Core */
virtual void data_tick();
void threadTick() override; /// @see RsTickingThread
/* locking stuff */
void lockRsCore()

View file

@ -1859,3 +1859,5 @@ RsPeerStateChangedEvent::RsPeerStateChangedEvent(RsPeerId sslId) :
RsEvent(RsEventType::PEER_STATE_CHANGED), mSslId(sslId) {}
RsPeers::~RsPeers() = default;
RsAuthSslConnectionAutenticationEvent::~RsAuthSslConnectionAutenticationEvent() = default;
RsConnectionEvent::~RsConnectionEvent() = default;

View file

@ -45,7 +45,7 @@
#include "retroshare/rsnotify.h"
#include "retroshare/rsiface.h"
#include "plugins/pluginmanager.h"
#include "retroshare/rsversion.h"
#include "rsserver/rsloginhandler.h"
#include "rsserver/rsaccounts.h"
@ -96,10 +96,6 @@ RsDht *rsDht = NULL ;
#ifdef RS_JSONAPI
# include "jsonapi/jsonapi.h"
#ifdef RS_WEBUI
#include "jsonapi/jsonapi.h"
#endif
#endif
#ifdef RS_BROADCAST_DISCOVERY
@ -195,6 +191,9 @@ static const int SSLPWD_LEN = 64;
void RsInit::InitRsConfig()
{
RsInfo() << " libretroshare version: " << RS_HUMAN_READABLE_VERSION
<< std::endl;
rsInitConfig = new RsInitConfig;

View file

@ -280,7 +280,7 @@ bool inline isTunnelActiveError(const std::string &answer) {
return answer.compare(0, 22, "ERROR tunnel is active") == 0;
}
void p3I2pBob::data_tick()
void p3I2pBob::threadTick()
{
int sleepTime = 0;
{

View file

@ -205,9 +205,7 @@ public:
static std::string keyToBase32Addr(const std::string &key);
// RsTickingThread interface
public:
void data_tick();
void threadTick() override; /// @see RsTickingThread
private:
int stateMachineBOB();

View file

@ -139,7 +139,7 @@ void BroadcastDiscoveryService::updatePublishedData()
BroadcastDiscoveryPack::fromPeerDetails(od).serializeToString());
}
void BroadcastDiscoveryService::data_tick()
void BroadcastDiscoveryService::threadTick()
{
auto nextRunAt = std::chrono::system_clock::now() + std::chrono::seconds(5);

View file

@ -57,8 +57,7 @@ public:
/// @see RsBroadcastDiscovery
bool disableMulticastListening() override;
/// @see RsTickingThread
void data_tick() override;
void threadTick() override; /// @see RsTickingThread
protected:
constexpr static uint16_t port = 36405;

View file

@ -257,12 +257,10 @@ void p3GxsChannels::notifyChanges(std::vector<RsGxsNotify *> &changes)
for (auto mit1 = mit->second.begin(); mit1 != mit->second.end(); ++mit1)
{
auto ev = std::make_shared<RsGxsChannelEvent>();
ev->mChannelMsgId = *mit1;
ev->mChannelGroupId = mit->first;
ev->mChannelEventCode = RsGxsChannelEvent::ChannelEventCode::NEW_MESSAGE;
rsEvents->sendEvent(ev);
ev->mChannelEventCode = RsChannelEventCode::NEW_MESSAGE;
rsEvents->postEvent(ev);
}
}
}
@ -313,11 +311,9 @@ void p3GxsChannels::notifyChanges(std::vector<RsGxsNotify *> &changes)
for (git = grpList.begin(); git != grpList.end(); ++git)
{
auto ev = std::make_shared<RsGxsChannelEvent>();
ev->mChannelGroupId = *git;
ev->mChannelEventCode = RsGxsChannelEvent::ChannelEventCode::SUBSCRIBE_STATUS_CHANGED;
rsEvents->sendEvent(ev);
ev->mChannelEventCode = RsChannelEventCode::SUBSCRIBE_STATUS_CHANGED;
rsEvents->postEvent(ev);
}
}
@ -338,11 +334,9 @@ void p3GxsChannels::notifyChanges(std::vector<RsGxsNotify *> &changes)
IndicateConfigChanged();
auto ev = std::make_shared<RsGxsChannelEvent>();
ev->mChannelGroupId = *git;
ev->mChannelEventCode = RsGxsChannelEvent::ChannelEventCode::NEW_CHANNEL;
rsEvents->sendEvent(ev);
ev->mChannelEventCode = RsChannelEventCode::NEW_CHANNEL;
rsEvents->postEvent(ev);
}
else
std::cerr << "(II) Not notifying already known channel " << *git << std::endl;
@ -358,11 +352,9 @@ void p3GxsChannels::notifyChanges(std::vector<RsGxsNotify *> &changes)
for (git = grpList.begin(); git != grpList.end(); ++git)
{
auto ev = std::make_shared<RsGxsChannelEvent>();
ev->mChannelGroupId = *git;
ev->mChannelEventCode = RsGxsChannelEvent::ChannelEventCode::RECEIVED_PUBLISH_KEY;
rsEvents->sendEvent(ev);
ev->mChannelEventCode = RsChannelEventCode::RECEIVED_PUBLISH_KEY;
rsEvents->postEvent(ev);
}
break;
}

View file

@ -504,23 +504,21 @@ void p3GxsCircles::notifyChanges(std::vector<RsGxsNotify *> &changes)
if(rsEvents && (c->getType() == RsGxsNotify::TYPE_RECEIVED_NEW) )
for (auto msgIdIt(mit->second.begin()), end(mit->second.end()); msgIdIt != end; ++msgIdIt)
{
// @Gio: should this be async?
RsGxsCircleMsg msg;
RsGxsCircleMsg msg;
getCircleRequest(RsGxsGroupId(circle_id),*msgIdIt,msg);
auto ev = std::make_shared<RsGxsCircleEvent>();
ev->mCircleId = circle_id;
ev->mGxsId = msg.mMeta.mAuthorId;
if (msg.stuff == "SUBSCRIPTION_REQUEST_UNSUBSCRIBE")
ev->mCircleEventType = RsGxsCircleEvent::CircleEventCode::CIRCLE_MEMBERSHIP_LEAVE;
ev->mCircleEventType = RsGxsCircleEventCode::CIRCLE_MEMBERSHIP_LEAVE;
else if(details.mAllowedGxsIds.find(msg.mMeta.mAuthorId) != details.mAllowedGxsIds.end())
ev->mCircleEventType = RsGxsCircleEvent::CircleEventCode::CIRCLE_MEMBERSHIP_JOIN;
ev->mCircleEventType = RsGxsCircleEventCode::CIRCLE_MEMBERSHIP_JOIN;
else
ev->mCircleEventType = RsGxsCircleEvent::CircleEventCode::CIRCLE_MEMBERSHIP_REQUEST;
ev->mCircleEventType = RsGxsCircleEventCode::CIRCLE_MEMBERSHIP_REQUEST;
rsEvents->sendEvent(ev);
rsEvents->postEvent(ev);
}
}
@ -2416,3 +2414,4 @@ RsGxsCircles::~RsGxsCircles() = default;
RsGxsCircleMsg::~RsGxsCircleMsg() = default;
RsGxsCircleDetails::~RsGxsCircleDetails() = default;
RsGxsCircleGroup::~RsGxsCircleGroup() = default;
RsGxsCircleEvent::~RsGxsCircleEvent() = default;

View file

@ -199,12 +199,10 @@ void p3GxsForums::notifyChanges(std::vector<RsGxsNotify *> &changes)
for (auto mit1 = mit->second.begin(); mit1 != mit->second.end(); ++mit1)
{
auto ev = std::make_shared<RsGxsForumEvent>();
ev->mForumMsgId = *mit1;
ev->mForumGroupId = mit->first;
ev->mForumEventCode = RsGxsForumEvent::ForumEventCode::NEW_MESSAGE;
rsEvents->sendEvent(ev);
ev->mForumEventCode = RsForumEventCode::NEW_MESSAGE;
rsEvents->postEvent(ev);
}
}
@ -256,11 +254,9 @@ void p3GxsForums::notifyChanges(std::vector<RsGxsNotify *> &changes)
for (git = grpList.begin(); git != grpList.end(); ++git)
{
auto ev = std::make_shared<RsGxsForumEvent>();
ev->mForumGroupId = *git;
ev->mForumEventCode = RsGxsForumEvent::ForumEventCode::SUBSCRIBE_STATUS_CHANGED;
rsEvents->sendEvent(ev);
ev->mForumEventCode = RsForumEventCode::SUBSCRIBE_STATUS_CHANGED;
rsEvents->postEvent(ev);
}
}
@ -278,18 +274,19 @@ void p3GxsForums::notifyChanges(std::vector<RsGxsNotify *> &changes)
{
if(mKnownForums.find(*git) == mKnownForums.end())
{
mKnownForums.insert(std::make_pair(*git,time(NULL))) ;
mKnownForums.insert(
std::make_pair(*git, time(nullptr)));
IndicateConfigChanged();
auto ev = std::make_shared<RsGxsForumEvent>();
ev->mForumGroupId = *git;
ev->mForumEventCode = RsGxsForumEvent::ForumEventCode::NEW_FORUM;
rsEvents->sendEvent(ev);
ev->mForumEventCode = RsForumEventCode::NEW_FORUM;
rsEvents->postEvent(ev);
}
else
std::cerr << "(II) Not notifying already known channel " << *git << std::endl;
RsInfo() << __PRETTY_FUNCTION__
<< " Not notifying already known forum "
<< *git << std::endl;
}
break;
}
@ -1148,3 +1145,4 @@ bool RsGxsForumGroup::canEditPosts(const RsGxsId& id) const
RsGxsForumGroup::~RsGxsForumGroup() = default;
RsGxsForumMsg::~RsGxsForumMsg() = default;
RsGxsForums::~RsGxsForums() = default;
RsGxsForumEvent::~RsGxsForumEvent() = default;

View file

@ -65,12 +65,14 @@ class Reputation
{
public:
Reputation() :
mOwnOpinion(static_cast<int32_t>(RsOpinion::NEUTRAL)), mOwnOpinionTs(0),
mOwnOpinion(static_cast<int32_t>(RsOpinion::NEUTRAL)),
mOwnOpinionTs(0),
mFriendAverage(1.0f),
/* G10h4ck: TODO shouln't this be initialized with
* RsReputation::NEUTRAL or UNKOWN? */
mReputationScore(static_cast<float>(RsOpinion::NEUTRAL)),
mIdentityFlags(0) {}
mFriendsPositive(0),
mFriendsNegative(0),
mReputationScore(1.0f),
mIdentityFlags(0),
mLastUsedTS(0) {}
void updateReputation();

View file

@ -177,12 +177,11 @@ void p3MsgService::processIncomingMsg(RsMsgItem *mi)
if (rsEvents)
{
auto ev = std::make_shared<RsMailStatusEvent>();
ev->mMailStatusEventCode = RsMailStatusEventType::NEW_MESSAGE;
auto ev = std::make_shared<RsMailStatusEvent>();
ev->mMailStatusEventCode = RsMailStatusEventCode::NEW_MESSAGE;
ev->mChangedMsgIds.insert(std::to_string(mi->msgId));
rsEvents->sendEvent(ev);
}
rsEvents->postEvent(ev);
}
imsg[mi->msgId] = mi;
RsMsgSrcId* msi = new RsMsgSrcId();
@ -337,8 +336,8 @@ int p3MsgService::checkOutgoingMessages()
bool changed = false;
std::list<RsMsgItem*> output_queue;
auto pEvent = std::make_shared<RsMailStatusEvent>();
pEvent->mMailStatusEventCode = RsMailStatusEventType::MESSAGE_SENT;
auto pEvent = std::make_shared<RsMailStatusEvent>();
pEvent->mMailStatusEventCode = RsMailStatusEventCode::MESSAGE_SENT;
{
RS_STACK_MUTEX(mMsgMtx); /********** STACK LOCKED MTX ******/
@ -898,9 +897,8 @@ bool p3MsgService::removeMsgId(const std::string &mid)
bool changed = false;
auto pEvent = std::make_shared<RsMailStatusEvent>();
pEvent->mMailStatusEventCode = RsMailStatusEventType::MESSAGE_REMOVED;
auto pEvent = std::make_shared<RsMailStatusEvent>();
pEvent->mMailStatusEventCode = RsMailStatusEventCode::MESSAGE_REMOVED;
{
RsStackMutex stack(mMsgMtx); /********** STACK LOCKED MTX ******/
@ -1268,8 +1266,8 @@ uint32_t p3MsgService::sendMail(
uint32_t ret = 0;
auto pEvent = std::make_shared<RsMailStatusEvent>();
pEvent->mMailStatusEventCode = RsMailStatusEventType::MESSAGE_SENT;
auto pEvent = std::make_shared<RsMailStatusEvent>();
pEvent->mMailStatusEventCode = RsMailStatusEventCode::MESSAGE_SENT;
auto pSend = [&](const std::set<RsGxsId>& sDest)
{
@ -2083,12 +2081,13 @@ void p3MsgService::notifyDataStatus( const GRouterMsgPropagationId& id,
NOTIFY_TYPE_ADD );
IndicateConfigChanged();
auto pEvent = std::make_shared<RsMailStatusEvent>();
pEvent->mMailStatusEventCode = RsMailStatusEventType::NEW_MESSAGE;
pEvent->mChangedMsgIds.insert(std::to_string(msg_id));
if(rsEvents) rsEvents->postEvent(pEvent);
if(rsEvents)
{
auto pEvent = std::make_shared<RsMailStatusEvent>();
pEvent->mMailStatusEventCode = RsMailStatusEventCode::NEW_MESSAGE;
pEvent->mChangedMsgIds.insert(std::to_string(msg_id));
rsEvents->postEvent(pEvent);
}
return;
}
@ -2185,11 +2184,11 @@ bool p3MsgService::notifyGxsTransSendStatus( RsGxsTransId mailId,
Dbg2() << __PRETTY_FUNCTION__ << " " << mailId << ", "
<< static_cast<uint32_t>(status) << std::endl;
auto pEvent = std::make_shared<RsMailStatusEvent>();
auto pEvent = std::make_shared<RsMailStatusEvent>();
if( status == GxsTransSendStatus::RECEIPT_RECEIVED )
{
pEvent->mMailStatusEventCode = RsMailStatusEventType::NEW_MESSAGE;
pEvent->mMailStatusEventCode = RsMailStatusEventCode::NEW_MESSAGE;
uint32_t msg_id;
{
@ -2244,20 +2243,20 @@ bool p3MsgService::notifyGxsTransSendStatus( RsGxsTransId mailId,
else if( status >= GxsTransSendStatus::FAILED_RECEIPT_SIGNATURE )
{
uint32_t msg_id;
pEvent->mMailStatusEventCode = RsMailStatusEventType::FAILED_SIGNATURE;
pEvent->mMailStatusEventCode = RsMailStatusEventCode::SIGNATURE_FAILED;
{
RS_STACK_MUTEX(gxsOngoingMutex);
std::cerr << __PRETTY_FUNCTION__ << " mail delivery "
<< "mailId: " << mailId
<< " failed with " << static_cast<uint32_t>(status);
RsErr() << __PRETTY_FUNCTION__ << " mail delivery "
<< "mailId: " << mailId
<< " failed with " << static_cast<uint32_t>(status);
auto it = gxsOngoingMessages.find(mailId);
if(it == gxsOngoingMessages.end())
{
std::cerr << " cannot find pending message to notify"
<< std::endl;
RsErr() << __PRETTY_FUNCTION__
<< " cannot find pending message to notify"
<< std::endl;
return false;
}

View file

@ -115,13 +115,11 @@ void p3PostBase::notifyChanges(std::vector<RsGxsNotify *> &changes)
if (rsEvents && msgChange->getType() == RsGxsNotify::TYPE_RECEIVED_NEW)
for (auto mit1 = mit->second.begin(); mit1 != mit->second.end(); ++mit1)
{
auto ev = std::make_shared<RsGxsPostedEvent>();
ev->mPostedMsgId = *mit1;
ev->mPostedGroupId = mit->first;
ev->mPostedEventCode = RsGxsPostedEvent::PostedEventCode::NEW_MESSAGE;
rsEvents->sendEvent(ev);
auto ev = std::make_shared<RsGxsPostedEvent>();
ev->mPostedMsgId = *mit1;
ev->mPostedGroupId = mit->first;
ev->mPostedEventCode = RsPostedEventCode::NEW_MESSAGE;
rsEvents->postEvent(ev);
}
}
}
@ -146,11 +144,9 @@ void p3PostBase::notifyChanges(std::vector<RsGxsNotify *> &changes)
if (rsEvents && groupChange->getType() == RsGxsNotify::TYPE_RECEIVED_NEW)
{
auto ev = std::make_shared<RsGxsPostedEvent>();
ev->mPostedGroupId = *git;
ev->mPostedEventCode = RsGxsPostedEvent::PostedEventCode::NEW_POSTED_GROUP;
rsEvents->sendEvent(ev);
ev->mPostedEventCode = RsPostedEventCode::NEW_POSTED_GROUP;
rsEvents->postEvent(ev);
}
}
}

View file

@ -20,6 +20,8 @@
* *
*******************************************************************************/
#include <iomanip>
#include <sys/time.h>
#include <cmath>
#include "util/rsdir.h"
#include "retroshare/rsiface.h"
@ -28,12 +30,10 @@
#include "pqi/pqistore.h"
#include "pqi/p3linkmgr.h"
#include "rsserver/p3face.h"
#include "util/cxx17retrocompat.h"
#include "services/p3rtt.h"
#include "rsitems/rsrttitems.h"
#include <sys/time.h>
#include <math.h>
/****
* #define DEBUG_RTT 1
@ -353,32 +353,29 @@ int p3rtt::storePongResult(const RsPeerId& id, uint32_t counter, double recv_ts,
while(peerInfo->mPongResults.size() > MAX_PONG_RESULTS)
{
peerInfo->mPongResults.pop_front();
}
//Wait at least 20 pongs before compute mean time offset
if(peerInfo->mPongResults.size() > 20)
{
double mean = 0;
for(std::list<RsRttPongResult>::const_iterator prIt = peerInfo->mPongResults.begin(), end = peerInfo->mPongResults.end(); prIt != end; ++ prIt)
{
mean += prIt->mOffset;
}
for(auto prIt : std::as_const(peerInfo->mPongResults))
mean += prIt.mOffset;
peerInfo->mCurrentMeanOffset = mean / peerInfo->mPongResults.size();
if(fabs(peerInfo->mCurrentMeanOffset) > 120)
{
if(rsEvents)
{
if(rsEvents)
{
auto ev = std::make_shared<RsConnectionEvent>();
ev->mSslId = peerInfo->mId;
ev->mStrInfo1 = RsUtil::NumberToString(peerInfo->mCurrentMeanOffset,false);
ev->mConnectionInfoCode = RsConnectionEvent::ConnectionEventCode::PEER_TIME_SHIFT;
ev->mTimeShift = static_cast<rstime_t>(peerInfo->mCurrentMeanOffset);
ev->mConnectionInfoCode = RsConnectionEventCode::PEER_TIME_SHIFT;
rsEvents->postEvent(ev);
}
std::cerr << "(WW) Peer:" << peerInfo->mId << " get time offset more than two minutes with you!!!" << std::endl;
}
RsWarn() << __PRETTY_FUNCTION__ << " Peer: " << peerInfo->mId
<< " have a time offset of more than two minutes with you"
<< std::endl;
}
}
return 1;

View file

@ -21,6 +21,7 @@
*******************************************************************************/
#include <string>
#include <thread>
#include "services/rseventsservice.h"
@ -137,7 +138,7 @@ bool RsEventsService::unregisterEventsHandler(RsEventsHandlerId_t hId)
return false;
}
void RsEventsService::data_tick()
void RsEventsService::threadTick()
{
auto nextRunAt = std::chrono::system_clock::now() +
std::chrono::milliseconds(200);

View file

@ -75,8 +75,7 @@ protected:
RsMutex mEventQueueMtx;
std::deque< std::shared_ptr<const RsEvent> > mEventQueue;
/// @see RsTickingThread
void data_tick() override;
void threadTick() override; /// @see RsTickingThread
void handleEvent(std::shared_ptr<const RsEvent> event);
RsEventsHandlerId_t generateUniqueHandlerId_unlocked();

View file

@ -1,20 +1,6 @@
################################################################################
# uselibretroshare.pri #
# Copyright (C) 2018, Retroshare team <retroshare.team@gmailcom> #
# #
# This program is free software: you can redistribute it and/or modify #
# it under the terms of the GNU Lesser General Public License as #
# published by the Free Software Foundation, either version 3 of the #
# License, or (at your option) any later version. #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU Lesser General Public License for more details. #
# #
# You should have received a copy of the GNU Lesser General Public License #
# along with this program. If not, see <https://www.gnu.org/licenses/>. #
################################################################################
# SPDX-FileCopyrightText: (C) 2004-2019 Retroshare Team <contact@retroshare.cc>
# SPDX-License-Identifier: CC0-1.0
RS_SRC_PATH=$$clean_path($${PWD}/../../)
RS_BUILD_PATH=$$clean_path($${OUT_PWD}/../../)

View file

@ -29,7 +29,9 @@
#include <map>
#include <string>
#include "util/rsthreads.h"
#include "util/rstime.h"
struct sockaddr ;

View file

@ -23,8 +23,10 @@
#include <list>
#include <string>
#include "util/rsthreads.h"
#include "util/rsnet.h"
#include "util/rstime.h"
struct sockaddr ;

View file

@ -0,0 +1,54 @@
/*******************************************************************************
* libretroshare/src/util: rserrno.cc *
* *
* libretroshare: retroshare core library *
* *
* Copyright (C) 2019 Gioacchino Mazzurco <gio@eigenlab.org> *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU Lesser General Public License as *
* published by the Free Software Foundation, either version 3 of the *
* License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU Lesser General Public License for more details. *
* *
* You should have received a copy of the GNU Lesser General Public License *
* along with this program. If not, see <https://www.gnu.org/licenses/>. *
* *
*******************************************************************************/
#include <cerrno>
#define RS_INTERNAL_ERRNO_CASE(e) case e: return #e
const char* rsErrnoName(int err)
{
switch (err)
{
RS_INTERNAL_ERRNO_CASE(EINVAL);
RS_INTERNAL_ERRNO_CASE(EBUSY);
RS_INTERNAL_ERRNO_CASE(EAGAIN);
RS_INTERNAL_ERRNO_CASE(EDEADLK);
RS_INTERNAL_ERRNO_CASE(EPERM);
RS_INTERNAL_ERRNO_CASE(EBADF);
RS_INTERNAL_ERRNO_CASE(EFAULT);
RS_INTERNAL_ERRNO_CASE(ENOTSOCK);
RS_INTERNAL_ERRNO_CASE(EISCONN);
RS_INTERNAL_ERRNO_CASE(ECONNREFUSED);
RS_INTERNAL_ERRNO_CASE(ETIMEDOUT);
RS_INTERNAL_ERRNO_CASE(ENETUNREACH);
RS_INTERNAL_ERRNO_CASE(EADDRINUSE);
RS_INTERNAL_ERRNO_CASE(EINPROGRESS);
RS_INTERNAL_ERRNO_CASE(EALREADY);
RS_INTERNAL_ERRNO_CASE(ENOTCONN);
RS_INTERNAL_ERRNO_CASE(EPIPE);
RS_INTERNAL_ERRNO_CASE(ECONNRESET);
RS_INTERNAL_ERRNO_CASE(EHOSTUNREACH);
RS_INTERNAL_ERRNO_CASE(EADDRNOTAVAIL);
}
return "rsErrnoName UNKNOWN ERROR CODE";
}

View file

@ -0,0 +1,24 @@
/*******************************************************************************
* libretroshare/src/util: rserrno.h *
* *
* libretroshare: retroshare core library *
* *
* Copyright (C) 2019 Gioacchino Mazzurco <gio@eigenlab.org> *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU Lesser General Public License as *
* published by the Free Software Foundation, either version 3 of the *
* License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU Lesser General Public License for more details. *
* *
* You should have received a copy of the GNU Lesser General Public License *
* along with this program. If not, see <https://www.gnu.org/licenses/>. *
* *
*******************************************************************************/
#pragma once
const char* rsErrnoName(int err);

View file

@ -207,15 +207,14 @@ bool ConvertUtf16ToUtf8(const std::wstring& source, std::string& dest)
bool is_alphanumeric(char c)
{
return (c>='0' && c<'9') || (c>='a' && c<='z') || (c>='A' && c<='Z') ;
return (c>='0' && c<='9') || (c>='a' && c<='z') || (c>='A' && c<='Z');
}
bool is_alphanumeric(const std::string& s)
{
for(uint32_t i=0;i<s.size();++i)
if(!is_alphanumeric(s[i]))
return false;
return true;
for( uint32_t i=0; i < s.size(); ++i)
if(!is_alphanumeric(s[i])) return false;
return true;
}
} } // librs::util

View file

@ -3,7 +3,9 @@
* *
* libretroshare: retroshare core library *
* *
* Copyright 2004-2007 by Robert Fernie <retroshare@lunamutt.com> *
* Copyright (C) 2004-2007 Robert Fernie <retroshare@lunamutt.com> *
* Copyright (C) 2016-2019 Gioacchino Mazzurco <gio@eigenlab.org> *
* Copyright (C) 2019-2020 Asociación Civil Altermundi <info@altermundi.net> *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU Lesser General Public License as *
@ -20,14 +22,20 @@
* *
*******************************************************************************/
#include "rsthreads.h"
#include <unistd.h> // for usleep()
#include <errno.h> // for errno
#include <iostream>
#include "util/rstime.h"
#include "util/rsdebug.h"
#include <time.h>
#include <thread>
#include <chrono>
#ifdef RSMUTEX_DEBUG
#include <cstdio>
#include <sys/time.h>
#endif
#include "rsthreads.h"
#include "util/rsdebug.h"
#include "util/rserrno.h"
#include "util/rstime.h"
#ifdef __APPLE__
int __attribute__((weak)) pthread_setname_np(const char *__buf) ;
@ -41,16 +49,14 @@ int RS_pthread_setname_np(pthread_t __target_thread, const char *__buf) {
}
#endif
#ifdef RSMUTEX_DEBUG
#include <stdio.h>
#include <sys/time.h>
#endif
/*******
* #define DEBUG_THREADS 1
* #define RSMUTEX_ABORT 1 // Catch wrong pthreads mode.
*******/
#define THREAD_DEBUG std::cerr << "[this=" << (void*)this << ", caller thread ID: " << std::hex << pthread_self() << ", thread ID: " << mTid << std::dec << "] "
#define THREAD_DEBUG RsDbg() << "[this=" << static_cast<void*>(this) \
<< ", caller thread ID: " << std::hex << pthread_self() << ", thread ID: " \
<< mTid << std::dec << "] "
#ifdef RSMUTEX_ABORT
#include <stdlib.h>
@ -60,215 +66,120 @@ int RS_pthread_setname_np(pthread_t __target_thread, const char *__buf) {
#include <iostream>
#endif
void RsThread::go()
/*static*/ void* RsThread::rsthread_init(void* p)
{
mShouldStopSemaphore.set(0) ;
mHasStoppedSemaphore.set(0) ;
RsThread* thread = reinterpret_cast<RsThread *>(p);
if(!thread) return nullptr;
runloop();
mShouldStopSemaphore.set(0);
mHasStoppedSemaphore.set(1); // last value that we modify because this is interpreted as a signal that the object can be deleted.
}
void *RsThread::rsthread_init(void* p)
{
RsThread *thread = (RsThread *) p;
if (!thread)
{
return NULL;
}
// tell the OS to free the thread resources when this function exits
// it is a replacement for pthread_join()
pthread_detach(pthread_self());
/* Using pthread_detach(...) the thread resources will be automatically
* freed when this function return, so there is no need for pthread_join()
* later. */
pthread_detach(pthread_self());
#ifdef DEBUG_THREADS
std::cerr << "[Thread ID:" << std::hex << pthread_self() << std::dec << "] thread is started. Calling runloop()..." << std::endl;
std::cerr << "[Thread ID:" << std::hex << pthread_self() << std::dec
<< "] thread is started. Calling wrapRun()..." << std::endl;
#endif
thread->go();
return NULL;
thread->wrapRun();
return nullptr;
}
RsThread::RsThread()
void RsThread::resetTid()
{
#ifdef WINDOWS_SYS
memset (&mTid, 0, sizeof(mTid));
memset (&mTid, 0, sizeof(mTid));
#else
mTid = 0;
#endif
// The thread is certainly not running. This avoids to lock down when calling shutdown on a thread that has never started.
#ifdef DEBUG_THREADS
THREAD_DEBUG << "[Thread ID:" << std::hex << pthread_self() << std::dec << "] thread object created. Initing stopped=1, should_stop=0" << std::endl;
#endif
mHasStoppedSemaphore.set(1) ;
mShouldStopSemaphore.set(0) ;
}
RsThread::~RsThread()
{
if(isRunning())
{
RsErr() << "Deleting a thread that is still running! Something is very wrong here and Retroshare is likely to crash because of this." << std::endl;
print_stacktrace();
while(isRunning())
{
std::cerr << "." << std::endl;
rstime::rs_usleep(1000*1000);
}
}
}
bool RsThread::isRunning()
{
// do we need a mutex for this ?
int sval = mHasStoppedSemaphore.value() ;
return !sval ;
}
bool RsThread::shouldStop()
{
int sval = mShouldStopSemaphore.value() ;
return sval > 0;
}
void RsTickingThread::shutdown()
{
#ifdef DEBUG_THREADS
THREAD_DEBUG << "pqithreadstreamer::shutdown()" << std::endl;
#endif
int sval = mHasStoppedSemaphore.value() ;
if(sval > 0)
{
#ifdef DEBUG_THREADS
THREAD_DEBUG << " thread not running. Quit." << std::endl;
#endif
return ;
}
ask_for_stop() ;
}
void RsThread::ask_for_stop()
{
#ifdef DEBUG_THREADS
THREAD_DEBUG << " calling stop" << std::endl;
#endif
mShouldStopSemaphore.set(1);
}
void RsTickingThread::fullstop()
{
shutdown() ;
#ifdef DEBUG_THREADS
THREAD_DEBUG << " waiting stop" << std::endl;
#endif
if(pthread_equal(mTid,pthread_self()))
{
THREAD_DEBUG << "(WW) RsTickingThread::fullstop() called by same thread. This is unexpected." << std::endl;
return ;
}
mHasStoppedSemaphore.wait_no_relock(); // Wait for semaphore value to become 1, but does not decrement it when obtained.
#ifdef DEBUG_THREADS
THREAD_DEBUG << " finished!" << std::endl;
mTid = 0;
#endif
}
void RsThread::start(const std::string &threadName)
RsThread::RsThread() : mHasStopped(true), mShouldStop(false), mLastTid()
{ resetTid(); }
bool RsThread::isRunning() { return !mHasStopped; }
bool RsThread::shouldStop() { return mShouldStop; }
void RsThread::askForStop()
{
if(isRunning())
/* Call onStopRequested() only once even if askForStop() is called multiple
* times */
if(!mShouldStop.exchange(true)) onStopRequested();
}
void RsThread::wrapRun()
{
run();
resetTid();
mHasStopped = true;
}
void RsThread::fullstop()
{
askForStop();
const pthread_t callerTid = pthread_self();
if(pthread_equal(mTid, callerTid))
{
std::cerr << "(EE) RsThread \"" << threadName
<< "\" is already running. Will not start twice!"
<< std::endl;
RsErr() << __PRETTY_FUNCTION__ << " called by same thread. This should "
<< "never happen! this: " << static_cast<void*>(this)
<< std::hex << ", callerTid: " << callerTid
<< ", mTid: " << mTid << std::dec
<< ", mFullName: " << mFullName << std::endl;
print_stacktrace();
return;
}
pthread_t tid;
void *data = (void *)this ;
#ifdef DEBUG_THREADS
THREAD_DEBUG << "pqithreadstreamer::start() initing should_stop=0" << std::endl;
#endif
mShouldStopSemaphore.set(0) ;
mHasStoppedSemaphore.set(0) ;
int err ;
// pthread_create is a memory barrier
// -> the new thread will see mIsRunning() = true
if( 0 == (err=pthread_create(&tid, 0, &rsthread_init, data)))
{
mTid = tid;
// set name
if(pthread_setname_np)
{
if(!threadName.empty())
{
// thread names are restricted to 16 characters including the terminating null byte
if(threadName.length() > 15)
{
#ifdef DEBUG_THREADS
THREAD_DEBUG << "RsThread::start called with to long name '" << threadName << "' truncating..." << std::endl;
#endif
RS_pthread_setname_np(mTid, threadName.substr(0, 15).c_str());
} else {
RS_pthread_setname_np(mTid, threadName.c_str());
}
}
}
}
else
{
THREAD_DEBUG << "Fatal error: pthread_create could not create a thread. Error returned: " << err << " !!!!!!!" << std::endl;
mHasStoppedSemaphore.set(1) ;
}
// Wait for the thread being stopped
auto i = 1;
while(!mHasStopped)
{
std::this_thread::sleep_for(std::chrono::milliseconds(200));
++i;
if(!(i%5))
RsDbg() << __PRETTY_FUNCTION__ << " " << i*0.2 << " seconds passed"
<< " waiting for thread: " << std::hex << mLastTid
<< std::dec << " " << mFullName << " to stop" << std::endl;
}
}
RsTickingThread::RsTickingThread()
bool RsThread::start(const std::string& threadName)
{
#ifdef DEBUG_THREADS
THREAD_DEBUG << "RsTickingThread::RsTickingThread()" << std::endl;
#endif
}
// Atomically check if the thread was already started and set it as running
if(mHasStopped.exchange(false))
{
mShouldStop = false;
int pError = pthread_create(
&mTid, nullptr, &rsthread_init, static_cast<void*>(this) );
if(pError)
{
RsErr() << __PRETTY_FUNCTION__ << " pthread_create could not create"
<< " new thread: " << threadName << " pError: " << pError
<< std::endl;
mHasStopped = true;
print_stacktrace();
return false;
}
RsTickingThread::~RsTickingThread()
{
fullstop();
}
void RsSingleJobThread::runloop()
{
run() ;
}
/* Store an extra copy of thread id for debugging */
mLastTid = mTid;
void RsTickingThread::runloop()
{
#ifdef DEBUG_THREADS
THREAD_DEBUG << "RsTickingThread::runloop(). Setting stopped=0" << std::endl;
#endif
/* Store thread full name as PThread is not able to keep it entirely */
mFullName = threadName;
while(1)
{
if(shouldStop())
{
#ifdef DEBUG_THREADS
THREAD_DEBUG << "pqithreadstreamer::runloop(): asked to stop. setting hasStopped=1, and returning. Thread ends." << std::endl;
#endif
return ;
}
/* Set PThread thread name which is restricted to 16 characters
* including the terminating null byte */
if(pthread_setname_np && !threadName.empty())
RS_pthread_setname_np(mTid, threadName.substr(0, 15).c_str());
data_tick();
}
return true;
}
RsErr() << __PRETTY_FUNCTION__ << " attempt to start already running thread"
<< std::endl;
print_stacktrace();
return false;
}
RsQueueThread::RsQueueThread(uint32_t min, uint32_t max, double relaxFactor )
@ -278,14 +189,14 @@ RsQueueThread::RsQueueThread(uint32_t min, uint32_t max, double relaxFactor )
mLastWork = time(NULL) ;
}
void RsQueueThread::data_tick()
void RsQueueThread::threadTick()
{
bool doneWork = false;
while(workQueued() && doWork())
{
doneWork = true;
}
rstime_t now = time(NULL);
time_t now = time(NULL);
if (doneWork)
{
mLastWork = now;
@ -310,94 +221,38 @@ void RsQueueThread::data_tick()
THREAD_DEBUG << "RsQueueThread::data_tick() no work: sleeping for: " << mLastSleep << " ms" << std::endl;
#endif
}
rstime::rs_usleep(mLastSleep * 1000); // mLastSleep msec
std::this_thread::sleep_for(std::chrono::milliseconds(mLastSleep));
}
void RsMutex::unlock()
{
#ifdef RSTHREAD_SELF_LOCKING_GUARD
if(--_cnt == 0)
{
#endif
_thread_id = 0 ;
pthread_mutex_unlock(&realMutex);
#ifdef RSTHREAD_SELF_LOCKING_GUARD
}
#endif
{
_thread_id = 0;
pthread_mutex_unlock(&realMutex);
}
void RsMutex::lock()
{
#ifdef RSMUTEX_DEBUG
pthread_t owner = _thread_id ;
#endif
int retval = 0;
#ifdef RSTHREAD_SELF_LOCKING_GUARD
if(!trylock())
if(!pthread_equal(_thread_id,pthread_self()))
#endif
retval = pthread_mutex_lock(&realMutex);
switch(retval)
int err = pthread_mutex_lock(&realMutex);
if( err != 0)
{
case 0:
break;
RsErr() << __PRETTY_FUNCTION__ << "pthread_mutex_lock returned: "
<< rsErrnoName(err)
#ifdef RSMUTEX_DEBUG
<< " name: " << name
#endif
<< std::endl;
case EINVAL:
std::cerr << "RsMutex::lock() pthread_mutex_lock returned EINVAL";
std::cerr << std::endl;
break;
case EBUSY:
std::cerr << "RsMutex::lock() pthread_mutex_lock returned EBUSY";
std::cerr << std::endl;
break;
case EAGAIN:
std::cerr << "RsMutex::lock() pthread_mutex_lock returned EAGAIN";
std::cerr << std::endl;
break;
case EDEADLK:
std::cerr << "RsMutex::lock() pthread_mutex_lock returned EDEADLK";
std::cerr << std::endl;
break;
case EPERM:
std::cerr << "RsMutex::lock() pthread_mutex_lock returned EPERM";
std::cerr << std::endl;
break;
print_stacktrace();
default:
std::cerr << "RsMutex::lock() pthread_mutex_lock returned UNKNOWN ERROR";
std::cerr << std::endl;
break;
}
/* Here is some debugging code - to catch failed locking attempts.
* Major bug is it is ever triggered.
*/
#ifdef RSMUTEX_ABORT
if (retval != 0)
{
#ifdef RSMUTEX_DEBUG
std::cerr << "RsMutex::lock() name: " << name << std::endl;
#endif
std::cerr << "RsMutex::lock() pthread_mutex_lock returned an Error. Aborting()";
std::cerr << std::endl;
abort();
#endif
}
#endif
_thread_id = pthread_self() ;
#ifdef RSTHREAD_SELF_LOCKING_GUARD
++_cnt ;
#endif
_thread_id = pthread_self();
}
#ifdef RSMUTEX_DEBUG
double RsStackMutex::getCurrentTS()
{
@ -416,3 +271,18 @@ double RsStackMutex::getCurrentTS()
#endif
RsThread::~RsThread()
{
if(!mHasStopped)
{
RsErr() << __PRETTY_FUNCTION__ << " deleting thread: " << mLastTid
<< " " << mFullName << " that is still "
<< "running! Something seems very wrong here and RetroShare is "
<< "likely to crash because of this." << std::endl;
print_stacktrace();
fullstop();
}
}
RsQueueThread::~RsQueueThread() = default;

View file

@ -3,7 +3,9 @@
* *
* libretroshare: retroshare core library *
* *
* Copyright 2004-2006 by Robert Fernie <retroshare@lunamutt.com> *
* Copyright (C) 2004-2006 Robert Fernie <retroshare@lunamutt.com> *
* Copyright (C) 2016-2020 Gioacchino Mazzurco <gio@eigenlab.org> *
* Copyright (C) 2019-2020 Asociación Civil Altermundi <info@altermundi.net> *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU Lesser General Public License as *
@ -26,243 +28,199 @@
#include <string>
#include <iostream>
#include <unistd.h>
#include <semaphore.h>
#include <atomic>
#include <thread>
#include <functional>
#include <util/rsmemory.h>
#include "util/rstime.h"
#include "util/rsmemory.h"
#include "util/rsdeprecate.h"
/* RsIface Thread Wrappers */
#undef RSTHREAD_SELF_LOCKING_GUARD
//#define RSMUTEX_DEBUG 300 // Milliseconds for print in the stderr
//#define RSMUTEX_DEBUG
//#define RSMUTEX_DEBUG
/**
* @brief Provide mutexes that keep track of the owner. Based on pthread mutex.
*/
class RsMutex
{
public:
public:
RsMutex(const std::string& name)
{
/* remove unused parameter warnings */
pthread_mutex_init(&realMutex, NULL);
_thread_id = 0 ;
RsMutex(const std::string& name) : _thread_id(0)
#ifdef RSMUTEX_DEBUG
this->_name = name;
#else
(void) name;
, _name(name)
#endif
{
pthread_mutex_init(&realMutex, nullptr);
#ifndef RSMUTEX_DEBUG
(void) name; // remove unused parameter warnings
#endif
}
~RsMutex()
{
pthread_mutex_destroy(&realMutex);
}
inline const pthread_t& owner() const { return _thread_id ; }
#ifdef RSMUTEX_DEBUG
void setName(const std::string &name)
{
this->_name = name;
}
#endif
~RsMutex() { pthread_mutex_destroy(&realMutex); }
void lock();
void unlock();
bool trylock() { return (0 == pthread_mutex_trylock(&realMutex)); }
inline const pthread_t& owner() const { return _thread_id; }
void lock();
void unlock();
bool trylock() { return (0 == pthread_mutex_trylock(&realMutex)); }
#ifdef RSMUTEX_DEBUG
const std::string& name() const { return _name ; }
#endif
private:
pthread_mutex_t realMutex;
pthread_t _thread_id ;
#ifdef RSTHREAD_SELF_LOCKING_GUARD
uint32_t _cnt ;
#endif
private:
pthread_mutex_t realMutex;
pthread_t _thread_id;
#ifdef RSMUTEX_DEBUG
std::string _name;
std::string _name;
#endif
};
/**
* @def RS_STACK_MUTEX(m)
* This macro allows you to trace which mutex in the code is locked and for how
* much time. You can use this as follows:
* @code
* {
* RS_STACK_MUTEX(myMutex);
* do_something();
* }
* @endcode
*/
#define RS_STACK_MUTEX(m) \
RsStackMutex __local_retroshare_stack_mutex_##m( \
m, __PRETTY_FUNCTION__, __FILE__, __LINE__ )
/**
* Provide mutexes that automatically lock/unlock on creation/destruction and
* have powerfull debugging facilities (if RSMUTEX_DEBUG is defined at
* compiletime).
* In most of the cases you should not use this directly instead
* @see RS_STACK_MUTEX(m)
*/
class RsStackMutex
{
public:
RsStackMutex(RsMutex &mtx)
: mMtx(mtx)
{
mMtx.lock();
#ifdef RSMUTEX_DEBUG
double ts = getCurrentTS() ;
_time_stamp = ts ;
_lineno = 0 ;
_info = "[no info]" ;
#endif
}
RsStackMutex(RsMutex &mtx,const char *function_name,const char *file_name,int lineno)
: mMtx(mtx)
#ifdef RSMUTEX_DEBUG
, _info(std::string(function_name)+" in file "+file_name),_lineno(lineno)
#endif
{
#ifdef RSMUTEX_DEBUG
double ts = getCurrentTS() ;
_time_stamp = ts ;
pthread_t owner = mMtx.owner() ;
#else
/* remove unused parameter warnings */
(void) function_name;
(void) file_name;
(void) lineno;
#endif
mMtx.lock();
#ifdef RSMUTEX_DEBUG
ts = getCurrentTS() ;
if(ts - _time_stamp > 1.0)
std::cerr << "Mutex " << (void*)&mMtx << " \"" << mtx.name() << "\""
<< " waited for " << ts - _time_stamp
<< " seconds in thread " << pthread_self()
<< " for locked thread " << owner << ". in " << _info << ":" << _lineno << std::endl;
_time_stamp = ts ; // This is to re-init the locking time without accounting for how much we waited.
#endif
}
~RsStackMutex()
{
mMtx.unlock();
#ifdef RSMUTEX_DEBUG
double ts = getCurrentTS() ;
if(ts - _time_stamp > 1.0)
std::cerr << "Mutex " << (void*)&mMtx << " \"" << mMtx.name() << "\""
<< " locked for " << ts - _time_stamp
<< " seconds in thread " << pthread_self()
<< ". in " << _info << ":" << _lineno << std::endl;
#endif
}
private:
RsMutex &mMtx;
#ifdef RSMUTEX_DEBUG
static double getCurrentTS() ;
double _time_stamp ;
std::string _info ;
int _lineno ;
#endif
};
// This macro allows you to trace which mutex in the code is locked for how much time.
// se this as follows:
//
// {
// RS_STACK_MUTEX(myMutex) ;
//
// do_something() ;
// }
//
#define RS_STACK_MUTEX(m) RsStackMutex __local_retroshare_mutex(m,__PRETTY_FUNCTION__,__FILE__,__LINE__)
// This class handles a Mutex-based semaphore, that makes it cross plateform.
class RsSemaphore
{
class RsSemStruct
{
public:
RsSemStruct() : mtx("Semaphore mutex"), val(0) {}
RsMutex mtx ;
uint32_t val ;
};
public:
RsSemaphore()
{
s = new RsSemStruct ;
}
~RsSemaphore()
{
delete s ;
}
RsStackMutex(RsMutex &mtx) : mMtx(mtx)
{
mMtx.lock();
#ifdef RSMUTEX_DEBUG
double ts = getCurrentTS();
_time_stamp = ts;
_lineno = 0;
_info = "[no info]";
#endif
}
void set(uint32_t i)
{
RS_STACK_MUTEX(s->mtx) ;
s->val = i ;
}
RsStackMutex(RsMutex &mtx, const char *function_name, const char *file_name,
int lineno) : mMtx(mtx)
#ifdef RSMUTEX_DEBUG
, _info(std::string(function_name)+" in file "+file_name), _lineno(lineno)
#endif
{
#ifdef RSMUTEX_DEBUG
double ts = getCurrentTS();
_time_stamp = ts;
pthread_t owner = mMtx.owner();
#else
// remove unused parameter warnings
(void) function_name; (void) file_name; (void) lineno;
#endif
void post()
{
RS_STACK_MUTEX(s->mtx) ;
++(s->val) ;
}
mMtx.lock();
uint32_t value()
{
RS_STACK_MUTEX(s->mtx) ;
return s->val ;
}
#ifdef RSMUTEX_DEBUG
ts = getCurrentTS();
// waits but does not re-locks the semaphore
void wait_no_relock()
{
static const uint32_t max_waiting_time_before_warning=1000 *5 ; // 5 secs
uint32_t tries=0;
if(ts - _time_stamp > 1.0)
std::cerr << "Mutex " << (void*)&mMtx << " \"" << mtx.name() << "\""
<< " waited for " << ts - _time_stamp
<< " seconds in thread " << pthread_self()
<< " for locked thread " << owner << ". in " << _info
<< ":" << _lineno << std::endl;
_time_stamp = ts ; // This is to re-init the locking time without accounting for how much we waited.
#endif
}
while(true)
{
usleep(1000) ;
if(++tries >= max_waiting_time_before_warning)
std::cerr << "(EE) Semaphore waiting for too long. Something is probably wrong in the code." << std::endl;
~RsStackMutex()
{
mMtx.unlock();
RS_STACK_MUTEX(s->mtx) ;
if(s->val > 0)
return ;
}
#ifdef RSMUTEX_DEBUG
double ts = getCurrentTS();
if(ts - _time_stamp > 1.0)
std::cerr << "Mutex " << (void*)&mMtx << " \"" << mMtx.name()
<< "\"" << " locked for " << ts - _time_stamp
<< " seconds in thread " << pthread_self()
<< ". in " << _info << ":" << _lineno << std::endl;
#endif
}
}
private:
RsSemStruct *s ;
RsMutex &mMtx;
#ifdef RSMUTEX_DEBUG
static double getCurrentTS();
double _time_stamp;
std::string _info;
int _lineno;
#endif
};
class RsThread;
/* to create a thread! */
pthread_t createThread(RsThread &thread);
/// @brief Offer basic threading functionalities.
class RsThread
{
public:
RsThread();
virtual ~RsThread() ;
void start(const std::string &threadName = "");
// Returns true if the thread is still running.
bool isRunning();
// Returns true if the thread received a stopping order and hasn't yet stopped.
bool shouldStop();
// Can be called to set the stopping flags. The stop will not be handled
// by RsThread itself, but in subclasses. If you derive your own subclass,
// you need to call shouldStop() in order to check for a possible stopping order.
void ask_for_stop();
RsThread();
virtual ~RsThread();
/**
* Execute given function on another thread without blocking the caller
* @brief start the thread and call run() on it.
* @param threadName string containing the name of the thread used for
* debugging purposes, @note inside PThread it is
* truncated to 16 characters including \0 at the end of
* the string.
* @return false on error, true otherwise
*/
bool start(const std::string& threadName = "");
/**
* @brief Check if thread is running.
* @return true if the thread is still running, false otherwise.
*/
bool isRunning();
/**
* @brief Check if the thread should stop.
* Expecially useful for subclasses which implement a @see run() method
* which may take lot of time before returning when not asked, to check if
* stop has been requested and therefore interrupting the execution ASAP
* returning in a coherent state.
* @return true if the thread received a stopping order.
*/
bool shouldStop();
/**
* @brief Asyncronously ask the thread to stop.
* The real stop will happen when the @see run() method finish.
*/
void askForStop();
/**
* Call @see askForStop() then wait it has really stopped before returning.
* It must not be called in the same thread, as it would not wait for the
* effective stop to occur as it would cause a deadlock.
*/
void fullstop();
/**
* Execute given function on a detached thread without blocking the caller
* execution.
* This can be generalized with variadic template, ATM it is enough to wrap
* any kind of function call or job into a lambda which get no paramethers
@ -272,60 +230,94 @@ public:
static void async(const std::function<void()>& fn)
{ std::thread(fn).detach(); }
/** @return RsThread full name */
const std::string& threadName() { return mFullName; }
protected:
virtual void runloop() =0; /* called once the thread is started. Should be overloaded by subclasses. */
void go() ; // this one calls runloop and also sets the flags correctly when the thread is finished running.
/**
* This method must be implemented by sublasses, will be called once the
* thread is started. Should return on request, use @see shouldStop() to
* check if stop has been requested.
*/
virtual void run() = 0;
RsSemaphore mHasStoppedSemaphore;
RsSemaphore mShouldStopSemaphore;
/**
* This method is meant to be overridden by subclasses with long running
* @see run() method and is executed asyncronously when @see askForStop()
* is called, any task necessary to stop the thread (aka inducing @see run()
* to return in a coherent state) should be done in the overridden version
* of this method, @see JsonApiServer for an usage example. */
virtual void onStopRequested() {}
static void *rsthread_init(void*) ;
pthread_t mTid;
private:
/** Call @see run() setting the appropriate flags around it*/
void wrapRun();
/// True if thread is stopped, false otherwise
std::atomic<bool> mHasStopped;
/// True if stop has been requested
std::atomic<bool> mShouldStop;
/// Passed as argument for pthread_create(), call start()
static void *rsthread_init(void*);
/// Store the id of the corresponding pthread
pthread_t mTid;
void resetTid();
/** Store thread full name for debugging because PThread is limited to 15
* char thread names */
std::string mFullName;
/** Store a copy of thread id which is never reset to 0 after initialization
* due to RsThread functioning. After RsThread initialization this member is
* only re-written with a new tread id in start(...).
* This is useful for debugging because mTid is reset at the end of wrapRun
* and that might happens concurrently (or just before) a debug message
* being printed, thus causing the debug message to print a mangled value.*/
pthread_t mLastTid;
};
/**
* Provide a detached execution loop that continuously call data_tick() once the
* thread is started
*/
class RsTickingThread: public RsThread
{
public:
RsTickingThread();
virtual ~RsTickingThread();
void shutdown();
void fullstop();
void join() { fullstop() ; } // used for compatibility
virtual void data_tick() =0;
/**
* Subclasses must implement this method, it will be called in a loop once
* the thread is started, so repetitive work (like checking if data is
* available on a socket) should be done here, at the end of this method
* sleep_for(...) or similar function should be called or the CPU will
* be used as much as possible also if there is nothing to do.
*/
virtual void threadTick() = 0;
private:
virtual void runloop() ; /* called once the thread is started. Should be overloaded by subclasses. */
};
class RsSingleJobThread: public RsThread
{
public:
virtual void run() =0;
protected:
virtual void runloop() ;
/// Implement the run loop and continuously call threadTick() in it
void run() override { while(!shouldStop()) threadTick(); }
};
// TODO: Used just one time, is this really an useful abstraction?
class RsQueueThread: public RsTickingThread
{
public:
RsQueueThread(uint32_t min, uint32_t max, double relaxFactor );
virtual ~RsQueueThread() { return; }
RsQueueThread(uint32_t min, uint32_t max, double relaxFactor);
~RsQueueThread() override;
protected:
virtual bool workQueued() = 0;
virtual bool doWork() = 0;
virtual bool workQueued() = 0;
virtual bool doWork() = 0;
virtual void data_tick() ;
void threadTick() override; /// @see RsTickingThread
private:
uint32_t mMinSleep; /* ms */
uint32_t mMaxSleep; /* ms */
uint32_t mLastSleep; /* ms */
rstime_t mLastWork; /* secs */
time_t mLastWork; /* secs */
float mRelaxFactor;
};