Cleanup RsThread and related classes

Removed antipattern abstraction RsSingleJobThread
Rename runloop() method to run() in RsThread
Ported few classes ineriting from RsSingleJobThread to RsThread
RsThread use std::atomic instead of self implemented strange binary semaphores
Removed RsTickingThread::shutdown() use RsThread::askForStop() instead
Removed RsTickingThread::fullstop() use RsThread::fullstop() instead
Stop properly JSON API server in retroshare-gui
Centralize errno traslation to literal in util/rserrno.*
This commit is contained in:
Gioacchino Mazzurco 2019-10-31 18:23:38 +01:00
parent 358aa1e0ab
commit df87fe53b1
No known key found for this signature in database
GPG key ID: A1FBCA3872E87051
43 changed files with 490 additions and 587 deletions

View file

@ -51,18 +51,13 @@ bool LocalDirectoryUpdater::isEnabled() const
} }
void LocalDirectoryUpdater::setEnabled(bool b) void LocalDirectoryUpdater::setEnabled(bool b)
{ {
if(mIsEnabled == b) if(mIsEnabled == b) return;
return ; if(!b) RsThread::askForStop();
else if(!RsThread::isRunning()) start("fs dir updater");
if(!b)
shutdown();
else if(!isRunning())
start("fs dir updater") ;
mIsEnabled = b ; mIsEnabled = b ;
} }
void LocalDirectoryUpdater::data_tick() void LocalDirectoryUpdater::threadTick()
{ {
rstime_t now = time(NULL) ; rstime_t now = time(NULL) ;

View file

@ -62,7 +62,7 @@ public:
bool ignoreDuplicates() const; bool ignoreDuplicates() const;
protected: protected:
virtual void data_tick() ; void threadTick() override; /// @see RsTickingThread
virtual void hash_callback(uint32_t client_param, const std::string& name, const RsFileHash& hash, uint64_t size); virtual void hash_callback(uint32_t client_param, const std::string& name, const RsFileHash& hash, uint64_t size);
virtual bool hash_confirm(uint32_t client_param) ; virtual bool hash_confirm(uint32_t client_param) ;

View file

@ -87,7 +87,7 @@ static std::string friendlyUnit(uint64_t val)
return std::string(buf) + " TB"; return std::string(buf) + " TB";
} }
void HashStorage::data_tick() void HashStorage::threadTick()
{ {
FileHashJob job; FileHashJob job;
RsFileHash hash; RsFileHash hash;
@ -318,14 +318,15 @@ void HashStorage::startHashThread()
void HashStorage::stopHashThread() void HashStorage::stopHashThread()
{ {
if (mRunning) if(mRunning)
{ {
std::cerr << "Stopping hashing thread." << std::endl; RsInfo() << __PRETTY_FUNCTION__ << "Stopping hashing thread."
shutdown(); << std::endl;
RsThread::askForStop();
mRunning = false ; mRunning = false ;
mTotalSizeToHash = 0; mTotalSizeToHash = 0;
mTotalFilesToHash = 0; mTotalFilesToHash = 0;
std::cerr << "done." << std::endl;
} }
} }

View file

@ -85,9 +85,7 @@ public:
void togglePauseHashingProcess() ; void togglePauseHashingProcess() ;
bool hashingProcessPaused(); bool hashingProcessPaused();
// Functions called by the thread void threadTick() override; /// @see RsTickingThread
virtual void data_tick() ;
friend std::ostream& operator<<(std::ostream& o,const HashStorageInfo& info) ; friend std::ostream& operator<<(std::ostream& o,const HashStorageInfo& info) ;
private: private:

View file

@ -209,7 +209,7 @@ void ftController::removeFileSource(const RsFileHash& hash,const RsPeerId& peer_
std::cerr << "... not added: hash not found." << std::endl ; std::cerr << "... not added: hash not found." << std::endl ;
#endif #endif
} }
void ftController::data_tick() void ftController::threadTick()
{ {
/* check the queues */ /* check the queues */

View file

@ -109,9 +109,10 @@ class ftPendingRequest
}; };
class ftController: public RsTickingThread, public pqiServiceMonitor, public p3Config class ftController:
public RsTickingThread, public pqiServiceMonitor, public p3Config
{ {
public: public:
/* Setup */ /* Setup */
ftController(ftDataMultiplex *dm, p3ServiceControl *sc, uint32_t ftServiceId); ftController(ftDataMultiplex *dm, p3ServiceControl *sc, uint32_t ftServiceId);
@ -122,7 +123,7 @@ class ftController: public RsTickingThread, public pqiServiceMonitor, public p3C
bool activate(); bool activate();
bool isActiveAndNoPending(); bool isActiveAndNoPending();
virtual void data_tick(); void threadTick() override; /// @see RsTickingThread
/***************************************************************/ /***************************************************************/
/********************** Controller Access **********************/ /********************** Controller Access **********************/

View file

@ -47,7 +47,7 @@ ftExtraList::ftExtraList()
} }
void ftExtraList::data_tick() void ftExtraList::threadTick()
{ {
bool todo = false; bool todo = false;
rstime_t now = time(NULL); rstime_t now = time(NULL);

View file

@ -146,10 +146,7 @@ public:
*/ */
void getExtraFileList(std::vector<FileInfo>& files) const ; void getExtraFileList(std::vector<FileInfo>& files) const ;
/*** void threadTick() override; /// @see RsTickingThread
* Thread Main Loop
**/
virtual void data_tick();
/*** /***
* Configuration - store extra files. * Configuration - store extra files.

View file

@ -234,28 +234,28 @@ void ftServer::StartupThreads()
void ftServer::StopThreads() void ftServer::StopThreads()
{ {
/* stop Dataplex */ /* stop Dataplex */
mFtDataplex->join(); mFtDataplex->fullstop();
/* stop Controller thread */ /* stop Controller thread */
mFtController->join(); mFtController->fullstop();
/* self contained threads */ /* self contained threads */
/* stop ExtraList Thread */ /* stop ExtraList Thread */
mFtExtra->join(); mFtExtra->fullstop();
delete (mFtDataplex); delete (mFtDataplex);
mFtDataplex = NULL; mFtDataplex = nullptr;
delete (mFtController); delete (mFtController);
mFtController = NULL; mFtController = nullptr;
delete (mFtExtra); delete (mFtExtra);
mFtExtra = NULL; mFtExtra = nullptr;
/* stop Monitor Thread */ /* stop Monitor Thread */
mFileDatabase->stopThreads(); mFileDatabase->stopThreads();
delete mFileDatabase; delete mFileDatabase;
mFileDatabase = NULL ; mFileDatabase = nullptr;
} }
/***************************************************************/ /***************************************************************/

View file

@ -544,7 +544,7 @@ bool ftTransferModule::isCheckingHash()
return mFlag == FT_TM_FLAG_CHECKING || mFlag == FT_TM_FLAG_CHUNK_CRC; return mFlag == FT_TM_FLAG_CHECKING || mFlag == FT_TM_FLAG_CHUNK_CRC;
} }
class HashThread: public RsSingleJobThread class HashThread: public RsThread
{ {
public: public:
explicit HashThread(ftFileCreator *m) explicit HashThread(ftFileCreator *m)

View file

@ -132,7 +132,7 @@ bool RsGenExchange::getGroupServerUpdateTS(const RsGxsGroupId& gid, rstime_t& gr
return mNetService->getGroupServerUpdateTS(gid,grp_server_update_TS,msg_server_update_TS) ; return mNetService->getGroupServerUpdateTS(gid,grp_server_update_TS,msg_server_update_TS) ;
} }
void RsGenExchange::data_tick() void RsGenExchange::threadTick()
{ {
static const double timeDelta = 0.1; // slow tick in sec static const double timeDelta = 0.1; // slow tick in sec

View file

@ -176,7 +176,7 @@ public:
*/ */
RsTokenService* getTokenService(); RsTokenService* getTokenService();
virtual void data_tick(); void threadTick() override; /// @see RsTickingThread
/*! /*!
* Policy bit pattern portion * Policy bit pattern portion

View file

@ -1981,7 +1981,7 @@ bool RsGxsNetService::locked_processTransac(RsNxsTransacItem *item)
return false; return false;
} }
void RsGxsNetService::data_tick() void RsGxsNetService::threadTick()
{ {
static const double timeDelta = 0.5; static const double timeDelta = 0.5;

View file

@ -76,7 +76,8 @@ struct GroupRequestRecord
* Incoming transaction are in 3 different states * Incoming transaction are in 3 different states
* 1. START 2. RECEIVING 3. END * 1. START 2. RECEIVING 3. END
*/ */
class RsGxsNetService : public RsNetworkExchangeService, public p3ThreadedService, public p3Config class RsGxsNetService :
public RsNetworkExchangeService, public p3ThreadedService, public p3Config
{ {
public: public:
@ -207,10 +208,8 @@ public:
*/ */
int tick(); int tick();
/*! void threadTick() override; /// @see RsTickingThread
* Processes transactions and job queue
*/
virtual void data_tick();
private: private:
/*! /*!

View file

@ -726,7 +726,7 @@ void RsGxsNetTunnelService::generateEncryptionKey(const RsGxsGroupId& group_id,c
// Service parts // // Service parts //
//===========================================================================================================================================// //===========================================================================================================================================//
void RsGxsNetTunnelService::data_tick() void RsGxsNetTunnelService::threadTick()
{ {
while(!mPendingTurtleItems.empty()) while(!mPendingTurtleItems.empty())
{ {

View file

@ -103,7 +103,9 @@
class RsGxsNetTunnelItem ; class RsGxsNetTunnelItem ;
class RsNetworkExchangeService ; class RsNetworkExchangeService ;
class RsGxsNetTunnelService: public RsTurtleClientService, public RsTickingThread, public p3Config, public RsGxsDistSync class RsGxsNetTunnelService:
public RsTurtleClientService, public RsTickingThread, public p3Config,
public RsGxsDistSync
{ {
public: public:
RsGxsNetTunnelService() ; RsGxsNetTunnelService() ;
@ -196,9 +198,7 @@ public:
virtual bool receiveSearchRequest(unsigned char *search_request_data, uint32_t search_request_data_len, unsigned char *& search_result_data, uint32_t& search_result_data_len, uint32_t &max_allowed_hits); virtual bool receiveSearchRequest(unsigned char *search_request_data, uint32_t search_request_data_len, unsigned char *& search_result_data, uint32_t& search_result_data_len, uint32_t &max_allowed_hits);
virtual void receiveSearchResult(TurtleSearchRequestId request_id,unsigned char *search_result_data,uint32_t search_result_data_len); virtual void receiveSearchResult(TurtleSearchRequestId request_id,unsigned char *search_result_data,uint32_t search_result_data_len);
// Overloaded from RsTickingThread void threadTick() override; /// @see RsTickingThread
void data_tick() ;
// Overloads p3Config // Overloads p3Config

View file

@ -183,7 +183,7 @@ private:
* Checks the integrity message and groups * Checks the integrity message and groups
* in rsDataService using computed hash * in rsDataService using computed hash
*/ */
class RsGxsIntegrityCheck : public RsSingleJobThread class RsGxsIntegrityCheck : public RsThread
{ {
enum CheckState { CheckStart, CheckChecking }; enum CheckState { CheckStart, CheckChecking };

View file

@ -434,8 +434,7 @@ void p3GxsTrans::GxsTransIntegrityCleanupThread::run()
totalMessageSizeAndCount[msg->metaData->mAuthorId].count++; totalMessageSizeAndCount[msg->metaData->mAuthorId].count++;
delete msg; delete msg;
if(item != NULL) delete item;
delete item ;
} }
} }

View file

@ -288,15 +288,15 @@ private:
void notifyClientService(const OutgoingRecord& pr); void notifyClientService(const OutgoingRecord& pr);
/*! /// Checks the integrity message and groups
* Checks the integrity message and groups class GxsTransIntegrityCleanupThread : public RsThread
*/
class GxsTransIntegrityCleanupThread : public RsSingleJobThread
{ {
enum CheckState { CheckStart, CheckChecking }; enum CheckState { CheckStart, CheckChecking };
public: public:
explicit GxsTransIntegrityCleanupThread(RsGeneralDataService *const dataService): mDs(dataService),mMtx("GxsTransIntegrityCheck") { mDone=false;} explicit GxsTransIntegrityCleanupThread(
RsGeneralDataService* const dataService ):
mDs(dataService), mMtx("GxsTransIntegrityCheck"), mDone(false) {}
bool isDone(); bool isDone();
void run(); void run();
@ -312,7 +312,7 @@ private:
GxsMsgReq mMsgToDel ; GxsMsgReq mMsgToDel ;
std::map<RsGxsId,MsgSizeCount> total_message_size_and_count; std::map<RsGxsId,MsgSizeCount> total_message_size_and_count;
bool mDone ; bool mDone;
}; };
// Overloaded from RsGenExchange. // Overloaded from RsGenExchange.

View file

@ -140,6 +140,9 @@ public:
callback ); callback );
private: private:
/// @see RsThread
void run() override;
/// @see p3Config::setupSerialiser /// @see p3Config::setupSerialiser
RsSerialiser* setupSerialiser() override; RsSerialiser* setupSerialiser() override;

View file

@ -495,7 +495,8 @@ HEADERS += util/folderiterator.h \
util/rsdeprecate.h \ util/rsdeprecate.h \
util/cxx11retrocompat.h \ util/cxx11retrocompat.h \
util/cxx17retrocompat.h \ util/cxx17retrocompat.h \
util/rsurl.h util/rsurl.h \
util/rserrno.h
SOURCES += ft/ftchunkmap.cc \ SOURCES += ft/ftchunkmap.cc \
ft/ftcontroller.cc \ ft/ftcontroller.cc \
@ -634,7 +635,8 @@ SOURCES += util/folderiterator.cc \
util/rstickevent.cc \ util/rstickevent.cc \
util/rsrecogn.cc \ util/rsrecogn.cc \
util/rstime.cc \ util/rstime.cc \
util/rsurl.cc util/rsurl.cc \
util/rserrno.cc
equals(RS_UPNP_LIB, miniupnpc) { equals(RS_UPNP_LIB, miniupnpc) {
HEADERS += rs_upnp/upnputil.h rs_upnp/upnphandler_miniupnp.h HEADERS += rs_upnp/upnputil.h rs_upnp/upnphandler_miniupnp.h

View file

@ -116,11 +116,11 @@ void AuthGPG::init(
void AuthGPG::exit() void AuthGPG::exit()
{ {
if(_instance != NULL) if(_instance)
{ {
_instance->join(); _instance->fullstop();
delete _instance ; delete _instance;
_instance = NULL; _instance = nullptr;
} }
} }
@ -188,7 +188,7 @@ int AuthGPG::GPGInit(const RsPgpId &ownId)
{ {
} }
void AuthGPG::data_tick() void AuthGPG::threadTick()
{ {
rstime::rs_usleep(100 * 1000); //100 msec rstime::rs_usleep(100 * 1000); //100 msec

View file

@ -239,7 +239,7 @@ public:
virtual bool loadList(std::list<RsItem *>& load); virtual bool loadList(std::list<RsItem *>& load);
/*****************************************************************/ /*****************************************************************/
private: private:
// Gets the certificate pointer and returns NULL if the string is invalid, or the // Gets the certificate pointer and returns NULL if the string is invalid, or the
// cert was not found. // cert was not found.
// //
@ -271,10 +271,9 @@ public:
bool printAllKeys_locked(); bool printAllKeys_locked();
bool printOwnKeys_locked(); bool printOwnKeys_locked();
/* own thread */ void threadTick() override; /// @see RsTickingThread
virtual void data_tick();
private: private:
static AuthGPG *instance_gpg; // pointeur vers le singleton static AuthGPG *instance_gpg; // pointeur vers le singleton

View file

@ -366,9 +366,9 @@ int pqiperson::handleNotifyEvent_locked(NetInterface *ni, int newState,
<< " CONNECT_FAILED->marking so!" << std::endl; << " CONNECT_FAILED->marking so!" << std::endl;
#endif #endif
activepqi->shutdown(); // STOP THREAD. activepqi->askForStop(); // STOP THREAD.
active = false; active = false;
activepqi = NULL; activepqi = nullptr;
} }
#ifdef PERSON_DEBUG #ifdef PERSON_DEBUG
else else
@ -406,7 +406,7 @@ int pqiperson::reset_locked()
std::map<uint32_t, pqiconnect *>::iterator it; std::map<uint32_t, pqiconnect *>::iterator it;
for(it = kids.begin(); it != kids.end(); ++it) for(it = kids.begin(); it != kids.end(); ++it)
{ {
(it->second) -> shutdown(); // STOP THREAD. it->second->askForStop(); // STOP THREAD.
(it->second) -> reset(); (it->second) -> reset();
} }

View file

@ -49,7 +49,7 @@ int pqithreadstreamer::tick()
return 0; return 0;
} }
void pqithreadstreamer::data_tick() void pqithreadstreamer::threadTick()
{ {
uint32_t recv_timeout = 0; uint32_t recv_timeout = 0;
uint32_t sleep_period = 0; uint32_t sleep_period = 0;

View file

@ -35,7 +35,7 @@ public:
virtual int tick(); virtual int tick();
protected: protected:
virtual void data_tick(); void threadTick() override; /// @see RsTickingThread
PQInterface *mParent; PQInterface *mParent;
uint32_t mTimeout; uint32_t mTimeout;

View file

@ -23,10 +23,13 @@
#include <memory> #include <memory>
#include <cstdint> #include <cstdint>
#include <chrono>
#include <functional>
#include "util/rsmemory.h" #include "util/rsmemory.h"
#include "serialiser/rsserializable.h" #include "serialiser/rsserializable.h"
#include "serialiser/rstypeserializer.h" #include "serialiser/rstypeserializer.h"
#include "util/rstime.h"
class RsEvents; class RsEvents;

View file

@ -34,6 +34,7 @@
#include "util/rsstring.h" #include "util/rsstring.h"
#include "rs_upnp/upnp18_retrocompat.h" #include "rs_upnp/upnp18_retrocompat.h"
#include "util/rstime.h"
#ifdef __GNUC__ #ifdef __GNUC__
#if __GNUC__ >= 4 #if __GNUC__ >= 4

View file

@ -139,7 +139,7 @@ RsServer::~RsServer()
/* Thread Fn: Run the Core */ /* Thread Fn: Run the Core */
void RsServer::data_tick() void RsServer::threadTick()
{ {
rstime::rs_usleep(mTimeDelta * 1000000); rstime::rs_usleep(mTimeDelta * 1000000);

View file

@ -88,9 +88,7 @@ public:
void setShutdownCallback(const std::function<void(int)>& callback) void setShutdownCallback(const std::function<void(int)>& callback)
{ mShutdownCallback = callback; } { mShutdownCallback = callback; }
void threadTick() override; /// @see RsTickingThread
/* Thread Fn: Run the Core */
virtual void data_tick();
/* locking stuff */ /* locking stuff */
void lockRsCore() void lockRsCore()

View file

@ -280,7 +280,7 @@ bool inline isTunnelActiveError(const std::string &answer) {
return answer.compare(0, 22, "ERROR tunnel is active") == 0; return answer.compare(0, 22, "ERROR tunnel is active") == 0;
} }
void p3I2pBob::data_tick() void p3I2pBob::threadTick()
{ {
int sleepTime = 0; int sleepTime = 0;
{ {

View file

@ -205,9 +205,7 @@ public:
static std::string keyToBase32Addr(const std::string &key); static std::string keyToBase32Addr(const std::string &key);
// RsTickingThread interface void threadTick() override; /// @see RsTickingThread
public:
void data_tick();
private: private:
int stateMachineBOB(); int stateMachineBOB();

View file

@ -139,7 +139,7 @@ void BroadcastDiscoveryService::updatePublishedData()
BroadcastDiscoveryPack::fromPeerDetails(od).serializeToString()); BroadcastDiscoveryPack::fromPeerDetails(od).serializeToString());
} }
void BroadcastDiscoveryService::data_tick() void BroadcastDiscoveryService::threadTick()
{ {
auto nextRunAt = std::chrono::system_clock::now() + std::chrono::seconds(5); auto nextRunAt = std::chrono::system_clock::now() + std::chrono::seconds(5);

View file

@ -57,8 +57,7 @@ public:
/// @see RsBroadcastDiscovery /// @see RsBroadcastDiscovery
bool disableMulticastListening() override; bool disableMulticastListening() override;
/// @see RsTickingThread void threadTick() override; /// @see RsTickingThread
void data_tick() override;
protected: protected:
constexpr static uint16_t port = 36405; constexpr static uint16_t port = 36405;

View file

@ -21,6 +21,7 @@
*******************************************************************************/ *******************************************************************************/
#include <string> #include <string>
#include <thread>
#include "services/rseventsservice.h" #include "services/rseventsservice.h"
@ -117,7 +118,7 @@ bool RsEventsService::unregisterEventsHandler(RsEventsHandlerId_t hId)
return true; return true;
} }
void RsEventsService::data_tick() void RsEventsService::threadTick()
{ {
auto nextRunAt = std::chrono::system_clock::now() + auto nextRunAt = std::chrono::system_clock::now() +
std::chrono::milliseconds(200); std::chrono::milliseconds(200);

View file

@ -71,8 +71,7 @@ protected:
RsMutex mEventQueueMtx; RsMutex mEventQueueMtx;
std::deque< std::shared_ptr<const RsEvent> > mEventQueue; std::deque< std::shared_ptr<const RsEvent> > mEventQueue;
/// @see RsTickingThread void threadTick() override; /// @see RsTickingThread
void data_tick() override;
void handleEvent(std::shared_ptr<const RsEvent> event); void handleEvent(std::shared_ptr<const RsEvent> event);
RsEventsHandlerId_t generateUniqueHandlerId_unlocked(); RsEventsHandlerId_t generateUniqueHandlerId_unlocked();

View file

@ -29,7 +29,9 @@
#include <map> #include <map>
#include <string> #include <string>
#include "util/rsthreads.h" #include "util/rsthreads.h"
#include "util/rstime.h"
struct sockaddr ; struct sockaddr ;

View file

@ -23,8 +23,10 @@
#include <list> #include <list>
#include <string> #include <string>
#include "util/rsthreads.h" #include "util/rsthreads.h"
#include "util/rsnet.h" #include "util/rsnet.h"
#include "util/rstime.h"
struct sockaddr ; struct sockaddr ;

View file

@ -0,0 +1,54 @@
/*******************************************************************************
* libretroshare/src/util: rserrno.cc *
* *
* libretroshare: retroshare core library *
* *
* Copyright (C) 2019 Gioacchino Mazzurco <gio@eigenlab.org> *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU Lesser General Public License as *
* published by the Free Software Foundation, either version 3 of the *
* License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU Lesser General Public License for more details. *
* *
* You should have received a copy of the GNU Lesser General Public License *
* along with this program. If not, see <https://www.gnu.org/licenses/>. *
* *
*******************************************************************************/
#include <cerrno>
#define RS_INTERNAL_ERRNO_CASE(e) case e: return #e
const char* rsErrnoName(int err)
{
switch (err)
{
RS_INTERNAL_ERRNO_CASE(EINVAL);
RS_INTERNAL_ERRNO_CASE(EBUSY);
RS_INTERNAL_ERRNO_CASE(EAGAIN);
RS_INTERNAL_ERRNO_CASE(EDEADLK);
RS_INTERNAL_ERRNO_CASE(EPERM);
RS_INTERNAL_ERRNO_CASE(EBADF);
RS_INTERNAL_ERRNO_CASE(EFAULT);
RS_INTERNAL_ERRNO_CASE(ENOTSOCK);
RS_INTERNAL_ERRNO_CASE(EISCONN);
RS_INTERNAL_ERRNO_CASE(ECONNREFUSED);
RS_INTERNAL_ERRNO_CASE(ETIMEDOUT);
RS_INTERNAL_ERRNO_CASE(ENETUNREACH);
RS_INTERNAL_ERRNO_CASE(EADDRINUSE);
RS_INTERNAL_ERRNO_CASE(EINPROGRESS);
RS_INTERNAL_ERRNO_CASE(EALREADY);
RS_INTERNAL_ERRNO_CASE(ENOTCONN);
RS_INTERNAL_ERRNO_CASE(EPIPE);
RS_INTERNAL_ERRNO_CASE(ECONNRESET);
RS_INTERNAL_ERRNO_CASE(EHOSTUNREACH);
RS_INTERNAL_ERRNO_CASE(EADDRNOTAVAIL);
}
return "rsErrnoName UNKNOWN ERROR CODE";
}

View file

@ -0,0 +1,24 @@
/*******************************************************************************
* libretroshare/src/util: rserrno.h *
* *
* libretroshare: retroshare core library *
* *
* Copyright (C) 2019 Gioacchino Mazzurco <gio@eigenlab.org> *
* *
* This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU Lesser General Public License as *
* published by the Free Software Foundation, either version 3 of the *
* License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU Lesser General Public License for more details. *
* *
* You should have received a copy of the GNU Lesser General Public License *
* along with this program. If not, see <https://www.gnu.org/licenses/>. *
* *
*******************************************************************************/
#pragma once
const char* rsErrnoName(int err);

View file

@ -3,7 +3,8 @@
* * * *
* libretroshare: retroshare core library * * libretroshare: retroshare core library *
* * * *
* Copyright 2004-2007 by Robert Fernie <retroshare@lunamutt.com> * * Copyright (C) 2004-2007 Robert Fernie <retroshare@lunamutt.com> *
* Copyright (C) 2016-2019 Gioacchino Mazzurco <gio@eigenlab.org> *
* * * *
* This program is free software: you can redistribute it and/or modify * * This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU Lesser General Public License as * * it under the terms of the GNU Lesser General Public License as *
@ -20,14 +21,20 @@
* * * *
*******************************************************************************/ *******************************************************************************/
#include "rsthreads.h"
#include <unistd.h> // for usleep()
#include <errno.h> // for errno
#include <iostream> #include <iostream>
#include "util/rstime.h" #include <time.h>
#include "util/rsdebug.h" #include <thread>
#include <chrono>
#ifdef RSMUTEX_DEBUG
#include <cstdio>
#include <sys/time.h>
#endif
#include "rsthreads.h"
#include "util/rsdebug.h"
#include "util/rserrno.h"
#include "util/rstime.h"
#ifdef __APPLE__ #ifdef __APPLE__
int __attribute__((weak)) pthread_setname_np(const char *__buf) ; int __attribute__((weak)) pthread_setname_np(const char *__buf) ;
@ -41,16 +48,14 @@ int RS_pthread_setname_np(pthread_t __target_thread, const char *__buf) {
} }
#endif #endif
#ifdef RSMUTEX_DEBUG
#include <stdio.h>
#include <sys/time.h>
#endif
/******* /*******
* #define DEBUG_THREADS 1 * #define DEBUG_THREADS 1
* #define RSMUTEX_ABORT 1 // Catch wrong pthreads mode. * #define RSMUTEX_ABORT 1 // Catch wrong pthreads mode.
*******/ *******/
#define THREAD_DEBUG std::cerr << "[this=" << (void*)this << ", caller thread ID: " << std::hex << pthread_self() << ", thread ID: " << mTid << std::dec << "] " #define THREAD_DEBUG RsDbg() << "[this=" << static_cast<void*>(this) \
<< ", caller thread ID: " << std::hex << pthread_self() << ", thread ID: " \
<< mTid << std::dec << "] "
#ifdef RSMUTEX_ABORT #ifdef RSMUTEX_ABORT
#include <stdlib.h> #include <stdlib.h>
@ -60,215 +65,99 @@ int RS_pthread_setname_np(pthread_t __target_thread, const char *__buf) {
#include <iostream> #include <iostream>
#endif #endif
void RsThread::go() /*static*/ void* RsThread::rsthread_init(void* p)
{ {
mShouldStopSemaphore.set(0) ; RsThread* thread = reinterpret_cast<RsThread *>(p);
mHasStoppedSemaphore.set(0) ; if(!thread) return nullptr;
runloop(); /* Using pthread_detach(...) the thread resources will be automatically
* freed when this function return, so there is no need for pthread_join()
mShouldStopSemaphore.set(0); * later. */
mHasStoppedSemaphore.set(1); // last value that we modify because this is interpreted as a signal that the object can be deleted.
}
void *RsThread::rsthread_init(void* p)
{
RsThread *thread = (RsThread *) p;
if (!thread)
{
return NULL;
}
// tell the OS to free the thread resources when this function exits
// it is a replacement for pthread_join()
pthread_detach(pthread_self()); pthread_detach(pthread_self());
#ifdef DEBUG_THREADS #ifdef DEBUG_THREADS
std::cerr << "[Thread ID:" << std::hex << pthread_self() << std::dec << "] thread is started. Calling runloop()..." << std::endl; std::cerr << "[Thread ID:" << std::hex << pthread_self() << std::dec
<< "] thread is started. Calling wrapRun()..." << std::endl;
#endif #endif
thread->go(); thread->wrapRun();
return NULL; return nullptr;
} }
RsThread::RsThread()
RsThread::RsThread() : mHasStopped(true), mShouldStop(false)
{ {
#ifdef WINDOWS_SYS #ifdef WINDOWS_SYS
memset (&mTid, 0, sizeof(mTid)); memset (&mTid, 0, sizeof(mTid));
#else #else
mTid = 0; mTid = 0;
#endif #endif
// The thread is certainly not running. This avoids to lock down when calling shutdown on a thread that has never started.
#ifdef DEBUG_THREADS
THREAD_DEBUG << "[Thread ID:" << std::hex << pthread_self() << std::dec << "] thread object created. Initing stopped=1, should_stop=0" << std::endl;
#endif
mHasStoppedSemaphore.set(1) ;
mShouldStopSemaphore.set(0) ;
} }
RsThread::~RsThread() bool RsThread::isRunning() { return !mHasStopped; }
bool RsThread::shouldStop() { return mShouldStop; }
void RsThread::askForStop()
{ {
if(isRunning()) /* Call onStopRequested() only once even if askForStop() is called multiple
* times */
if(!mShouldStop.exchange(true))
RsThread::async([&](){ onStopRequested(); });
}
void RsThread::wrapRun()
{
run();
mHasStopped = true;
}
void RsThread::fullstop()
{
askForStop();
const pthread_t callerTid = pthread_self();
if(pthread_equal(mTid, callerTid))
{ {
RsErr() << "Deleting a thread that is still running! Something is very wrong here and Retroshare is likely to crash because of this." << std::endl; RsErr() << __PRETTY_FUNCTION__ << " called by same thread. This should "
print_stacktrace(); << "never happen! this: " << static_cast<void*>(this)
<< std::hex << ", callerTid: " << callerTid
while(isRunning()) << ", mTid: " << mTid << std::dec << std::endl;
{
std::cerr << "." << std::endl;
rstime::rs_usleep(1000*1000);
}
}
}
bool RsThread::isRunning()
{
// do we need a mutex for this ?
int sval = mHasStoppedSemaphore.value() ;
return !sval ;
}
bool RsThread::shouldStop()
{
int sval = mShouldStopSemaphore.value() ;
return sval > 0;
}
void RsTickingThread::shutdown()
{
#ifdef DEBUG_THREADS
THREAD_DEBUG << "pqithreadstreamer::shutdown()" << std::endl;
#endif
int sval = mHasStoppedSemaphore.value() ;
if(sval > 0)
{
#ifdef DEBUG_THREADS
THREAD_DEBUG << " thread not running. Quit." << std::endl;
#endif
return ;
}
ask_for_stop() ;
}
void RsThread::ask_for_stop()
{
#ifdef DEBUG_THREADS
THREAD_DEBUG << " calling stop" << std::endl;
#endif
mShouldStopSemaphore.set(1);
}
void RsTickingThread::fullstop()
{
shutdown() ;
#ifdef DEBUG_THREADS
THREAD_DEBUG << " waiting stop" << std::endl;
#endif
if(pthread_equal(mTid,pthread_self()))
{
THREAD_DEBUG << "(WW) RsTickingThread::fullstop() called by same thread. This is unexpected." << std::endl;
return ;
}
mHasStoppedSemaphore.wait_no_relock(); // Wait for semaphore value to become 1, but does not decrement it when obtained.
#ifdef DEBUG_THREADS
THREAD_DEBUG << " finished!" << std::endl;
#endif
}
void RsThread::start(const std::string &threadName)
{
if(isRunning())
{
std::cerr << "(EE) RsThread \"" << threadName
<< "\" is already running. Will not start twice!"
<< std::endl;
print_stacktrace(); print_stacktrace();
return; return;
} }
pthread_t tid;
void *data = (void *)this ;
#ifdef DEBUG_THREADS while(!mHasStopped); // Wait for the thread being stopped
THREAD_DEBUG << "pqithreadstreamer::start() initing should_stop=0" << std::endl; }
#endif
mShouldStopSemaphore.set(0) ;
mHasStoppedSemaphore.set(0) ;
int err ; bool RsThread::start(const std::string& threadName)
{
// pthread_create is a memory barrier // Atomically check if the thread was already started and set it as running
// -> the new thread will see mIsRunning() = true if(mHasStopped.exchange(false))
if( 0 == (err=pthread_create(&tid, 0, &rsthread_init, data)))
{ {
mTid = tid; mShouldStop = false;
int pError = pthread_create(
&mTid, nullptr, &rsthread_init, static_cast<void*>(this) );
if(pError)
{
RsErr() << __PRETTY_FUNCTION__ << " pthread_create could not create"
<< " a new thread. pError: " << pError << std::endl;
mHasStopped = true;
print_stacktrace();
return false;
}
// set name /* Set thread name which is restricted to 16 characters including the
* terminating null byte */
if(pthread_setname_np) if(pthread_setname_np && !threadName.empty())
{
if(!threadName.empty())
{
// thread names are restricted to 16 characters including the terminating null byte
if(threadName.length() > 15)
{
#ifdef DEBUG_THREADS
THREAD_DEBUG << "RsThread::start called with to long name '" << threadName << "' truncating..." << std::endl;
#endif
RS_pthread_setname_np(mTid, threadName.substr(0, 15).c_str()); RS_pthread_setname_np(mTid, threadName.substr(0, 15).c_str());
} else {
RS_pthread_setname_np(mTid, threadName.c_str());
}
}
}
}
else
{
THREAD_DEBUG << "Fatal error: pthread_create could not create a thread. Error returned: " << err << " !!!!!!!" << std::endl;
mHasStoppedSemaphore.set(1) ;
}
}
return true;
RsTickingThread::RsTickingThread()
{
#ifdef DEBUG_THREADS
THREAD_DEBUG << "RsTickingThread::RsTickingThread()" << std::endl;
#endif
}
RsTickingThread::~RsTickingThread()
{
fullstop();
}
void RsSingleJobThread::runloop()
{
run() ;
}
void RsTickingThread::runloop()
{
#ifdef DEBUG_THREADS
THREAD_DEBUG << "RsTickingThread::runloop(). Setting stopped=0" << std::endl;
#endif
while(1)
{
if(shouldStop())
{
#ifdef DEBUG_THREADS
THREAD_DEBUG << "pqithreadstreamer::runloop(): asked to stop. setting hasStopped=1, and returning. Thread ends." << std::endl;
#endif
return ;
} }
data_tick(); RsErr() << __PRETTY_FUNCTION__ << " attempt to start already running thread"
} << std::endl;
print_stacktrace();
return false;
} }
RsQueueThread::RsQueueThread(uint32_t min, uint32_t max, double relaxFactor ) RsQueueThread::RsQueueThread(uint32_t min, uint32_t max, double relaxFactor )
@ -278,14 +167,14 @@ RsQueueThread::RsQueueThread(uint32_t min, uint32_t max, double relaxFactor )
mLastWork = time(NULL) ; mLastWork = time(NULL) ;
} }
void RsQueueThread::data_tick() void RsQueueThread::threadTick()
{ {
bool doneWork = false; bool doneWork = false;
while(workQueued() && doWork()) while(workQueued() && doWork())
{ {
doneWork = true; doneWork = true;
} }
rstime_t now = time(NULL); time_t now = time(NULL);
if (doneWork) if (doneWork)
{ {
mLastWork = now; mLastWork = now;
@ -310,94 +199,38 @@ void RsQueueThread::data_tick()
THREAD_DEBUG << "RsQueueThread::data_tick() no work: sleeping for: " << mLastSleep << " ms" << std::endl; THREAD_DEBUG << "RsQueueThread::data_tick() no work: sleeping for: " << mLastSleep << " ms" << std::endl;
#endif #endif
} }
rstime::rs_usleep(mLastSleep * 1000); // mLastSleep msec
std::this_thread::sleep_for(std::chrono::milliseconds(mLastSleep));
} }
void RsMutex::unlock() void RsMutex::unlock()
{ {
#ifdef RSTHREAD_SELF_LOCKING_GUARD _thread_id = 0;
if(--_cnt == 0)
{
#endif
_thread_id = 0 ;
pthread_mutex_unlock(&realMutex); pthread_mutex_unlock(&realMutex);
#ifdef RSTHREAD_SELF_LOCKING_GUARD
}
#endif
} }
void RsMutex::lock() void RsMutex::lock()
{ {
#ifdef RSMUTEX_DEBUG int err = pthread_mutex_lock(&realMutex);
pthread_t owner = _thread_id ; if( err != 0)
#endif
int retval = 0;
#ifdef RSTHREAD_SELF_LOCKING_GUARD
if(!trylock())
if(!pthread_equal(_thread_id,pthread_self()))
#endif
retval = pthread_mutex_lock(&realMutex);
switch(retval)
{ {
case 0: RsErr() << __PRETTY_FUNCTION__ << "pthread_mutex_lock returned: "
break; << rsErrnoName(err)
#ifdef RSMUTEX_DEBUG
<< " name: " << name
#endif
<< std::endl;
case EINVAL: print_stacktrace();
std::cerr << "RsMutex::lock() pthread_mutex_lock returned EINVAL";
std::cerr << std::endl;
break;
case EBUSY:
std::cerr << "RsMutex::lock() pthread_mutex_lock returned EBUSY";
std::cerr << std::endl;
break;
case EAGAIN:
std::cerr << "RsMutex::lock() pthread_mutex_lock returned EAGAIN";
std::cerr << std::endl;
break;
case EDEADLK:
std::cerr << "RsMutex::lock() pthread_mutex_lock returned EDEADLK";
std::cerr << std::endl;
break;
case EPERM:
std::cerr << "RsMutex::lock() pthread_mutex_lock returned EPERM";
std::cerr << std::endl;
break;
default:
std::cerr << "RsMutex::lock() pthread_mutex_lock returned UNKNOWN ERROR";
std::cerr << std::endl;
break;
}
/* Here is some debugging code - to catch failed locking attempts.
* Major bug is it is ever triggered.
*/
#ifdef RSMUTEX_ABORT #ifdef RSMUTEX_ABORT
if (retval != 0)
{
#ifdef RSMUTEX_DEBUG
std::cerr << "RsMutex::lock() name: " << name << std::endl;
#endif
std::cerr << "RsMutex::lock() pthread_mutex_lock returned an Error. Aborting()";
std::cerr << std::endl;
abort(); abort();
#endif
} }
#endif
_thread_id = pthread_self();
_thread_id = pthread_self() ;
#ifdef RSTHREAD_SELF_LOCKING_GUARD
++_cnt ;
#endif
} }
#ifdef RSMUTEX_DEBUG #ifdef RSMUTEX_DEBUG
double RsStackMutex::getCurrentTS() double RsStackMutex::getCurrentTS()
{ {
@ -416,3 +249,24 @@ double RsStackMutex::getCurrentTS()
#endif #endif
RsThread::~RsThread()
{
if(isRunning())
{
RsErr() << __PRETTY_FUNCTION__ << " deleting a thread that is still "
<< "running! Something seems very wrong here and RetroShare is "
<< "likely to crash because of this." << std::endl;
print_stacktrace();
askForStop();
while(isRunning())
{
RsErr() << __PRETTY_FUNCTION__ << "waiting 1s for stop..."
<< std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
}
RsQueueThread::~RsQueueThread() = default;

View file

@ -3,7 +3,8 @@
* * * *
* libretroshare: retroshare core library * * libretroshare: retroshare core library *
* * * *
* Copyright 2004-2006 by Robert Fernie <retroshare@lunamutt.com> * * Copyright (C) 2004-2006 Robert Fernie <retroshare@lunamutt.com> *
* Copyright (C) 2016-2019 Gioacchino Mazzurco <gio@eigenlab.org> *
* * * *
* This program is free software: you can redistribute it and/or modify * * This program is free software: you can redistribute it and/or modify *
* it under the terms of the GNU Lesser General Public License as * * it under the terms of the GNU Lesser General Public License as *
@ -26,47 +27,38 @@
#include <string> #include <string>
#include <iostream> #include <iostream>
#include <unistd.h> #include <unistd.h>
#include <semaphore.h> #include <atomic>
#include <thread> #include <thread>
#include <functional> #include <functional>
#include <util/rsmemory.h> #include "util/rsmemory.h"
#include "util/rstime.h" #include "util/rsdeprecate.h"
/* RsIface Thread Wrappers */
#undef RSTHREAD_SELF_LOCKING_GUARD
//#define RSMUTEX_DEBUG 300 // Milliseconds for print in the stderr
//#define RSMUTEX_DEBUG //#define RSMUTEX_DEBUG
/**
* @brief Provide mutexes that keep track of the owner. Based on pthread mutex.
*/
class RsMutex class RsMutex
{ {
public: public:
RsMutex(const std::string& name) RsMutex(const std::string& name) : _thread_id(0)
{
/* remove unused parameter warnings */
pthread_mutex_init(&realMutex, NULL);
_thread_id = 0 ;
#ifdef RSMUTEX_DEBUG #ifdef RSMUTEX_DEBUG
this->_name = name; , _name(name)
#else #endif
(void) name; {
pthread_mutex_init(&realMutex, nullptr);
#ifndef RSMUTEX_DEBUG
(void) name; // remove unused parameter warnings
#endif #endif
} }
~RsMutex()
{
pthread_mutex_destroy(&realMutex);
}
inline const pthread_t& owner() const { return _thread_id ; } ~RsMutex() { pthread_mutex_destroy(&realMutex); }
#ifdef RSMUTEX_DEBUG
void setName(const std::string &name) inline const pthread_t& owner() const { return _thread_id; }
{
this->_name = name;
}
#endif
void lock(); void lock();
void unlock(); void unlock();
@ -76,60 +68,78 @@ class RsMutex
const std::string& name() const { return _name ; } const std::string& name() const { return _name ; }
#endif #endif
private: private:
pthread_mutex_t realMutex; pthread_mutex_t realMutex;
pthread_t _thread_id ; pthread_t _thread_id;
#ifdef RSTHREAD_SELF_LOCKING_GUARD
uint32_t _cnt ;
#endif
#ifdef RSMUTEX_DEBUG #ifdef RSMUTEX_DEBUG
std::string _name; std::string _name;
#endif #endif
}; };
/**
* @def RS_STACK_MUTEX(m)
* This macro allows you to trace which mutex in the code is locked and for how
* much time. You can use this as follows:
* @code
* {
* RS_STACK_MUTEX(myMutex);
* do_something();
* }
* @endcode
*/
#define RS_STACK_MUTEX(m) \
RsStackMutex __local_retroshare_stack_mutex_##m( \
m, __PRETTY_FUNCTION__, __FILE__, __LINE__ )
/**
* Provide mutexes that automatically lock/unlock on creation/destruction and
* have powerfull debugging facilities (if RSMUTEX_DEBUG is defined at
* compiletime).
* In most of the cases you should not use this directly instead
* @see RS_STACK_MUTEX(m)
*/
class RsStackMutex class RsStackMutex
{ {
public: public:
RsStackMutex(RsMutex &mtx) RsStackMutex(RsMutex &mtx) : mMtx(mtx)
: mMtx(mtx)
{ {
mMtx.lock(); mMtx.lock();
#ifdef RSMUTEX_DEBUG #ifdef RSMUTEX_DEBUG
double ts = getCurrentTS() ; double ts = getCurrentTS();
_time_stamp = ts ; _time_stamp = ts;
_lineno = 0 ; _lineno = 0;
_info = "[no info]" ; _info = "[no info]";
#endif #endif
} }
RsStackMutex(RsMutex &mtx,const char *function_name,const char *file_name,int lineno)
: mMtx(mtx) RsStackMutex(RsMutex &mtx, const char *function_name, const char *file_name,
int lineno) : mMtx(mtx)
#ifdef RSMUTEX_DEBUG #ifdef RSMUTEX_DEBUG
, _info(std::string(function_name)+" in file "+file_name),_lineno(lineno) , _info(std::string(function_name)+" in file "+file_name), _lineno(lineno)
#endif #endif
{ {
#ifdef RSMUTEX_DEBUG #ifdef RSMUTEX_DEBUG
double ts = getCurrentTS() ; double ts = getCurrentTS();
_time_stamp = ts ; _time_stamp = ts;
pthread_t owner = mMtx.owner() ; pthread_t owner = mMtx.owner();
#else #else
/* remove unused parameter warnings */ // remove unused parameter warnings
(void) function_name; (void) function_name; (void) file_name; (void) lineno;
(void) file_name;
(void) lineno;
#endif #endif
mMtx.lock(); mMtx.lock();
#ifdef RSMUTEX_DEBUG #ifdef RSMUTEX_DEBUG
ts = getCurrentTS() ; ts = getCurrentTS();
if(ts - _time_stamp > 1.0) if(ts - _time_stamp > 1.0)
std::cerr << "Mutex " << (void*)&mMtx << " \"" << mtx.name() << "\"" std::cerr << "Mutex " << (void*)&mMtx << " \"" << mtx.name() << "\""
<< " waited for " << ts - _time_stamp << " waited for " << ts - _time_stamp
<< " seconds in thread " << pthread_self() << " seconds in thread " << pthread_self()
<< " for locked thread " << owner << ". in " << _info << ":" << _lineno << std::endl; << " for locked thread " << owner << ". in " << _info
<< ":" << _lineno << std::endl;
_time_stamp = ts ; // This is to re-init the locking time without accounting for how much we waited. _time_stamp = ts ; // This is to re-init the locking time without accounting for how much we waited.
#endif #endif
} }
@ -137,132 +147,78 @@ class RsStackMutex
~RsStackMutex() ~RsStackMutex()
{ {
mMtx.unlock(); mMtx.unlock();
#ifdef RSMUTEX_DEBUG #ifdef RSMUTEX_DEBUG
double ts = getCurrentTS() ; double ts = getCurrentTS();
if(ts - _time_stamp > 1.0) if(ts - _time_stamp > 1.0)
std::cerr << "Mutex " << (void*)&mMtx << " \"" << mMtx.name() << "\"" std::cerr << "Mutex " << (void*)&mMtx << " \"" << mMtx.name()
<< " locked for " << ts - _time_stamp << "\"" << " locked for " << ts - _time_stamp
<< " seconds in thread " << pthread_self() << " seconds in thread " << pthread_self()
<< ". in " << _info << ":" << _lineno << std::endl; << ". in " << _info << ":" << _lineno << std::endl;
#endif #endif
} }
private: private:
RsMutex &mMtx; RsMutex &mMtx;
#ifdef RSMUTEX_DEBUG #ifdef RSMUTEX_DEBUG
static double getCurrentTS() ; static double getCurrentTS();
double _time_stamp ; double _time_stamp;
std::string _info ; std::string _info;
int _lineno ; int _lineno;
#endif #endif
}; };
// This macro allows you to trace which mutex in the code is locked for how much time.
// se this as follows:
//
// {
// RS_STACK_MUTEX(myMutex) ;
//
// do_something() ;
// }
//
#define RS_STACK_MUTEX(m) RsStackMutex __local_retroshare_mutex(m,__PRETTY_FUNCTION__,__FILE__,__LINE__)
// This class handles a Mutex-based semaphore, that makes it cross plateform.
class RsSemaphore
{
class RsSemStruct
{
public:
RsSemStruct() : mtx("Semaphore mutex"), val(0) {}
RsMutex mtx ;
uint32_t val ;
};
public:
RsSemaphore()
{
s = new RsSemStruct ;
}
~RsSemaphore()
{
delete s ;
}
void set(uint32_t i)
{
RS_STACK_MUTEX(s->mtx) ;
s->val = i ;
}
void post()
{
RS_STACK_MUTEX(s->mtx) ;
++(s->val) ;
}
uint32_t value()
{
RS_STACK_MUTEX(s->mtx) ;
return s->val ;
}
// waits but does not re-locks the semaphore
void wait_no_relock()
{
static const uint32_t max_waiting_time_before_warning=1000 *5 ; // 5 secs
uint32_t tries=0;
while(true)
{
usleep(1000) ;
if(++tries >= max_waiting_time_before_warning)
std::cerr << "(EE) Semaphore waiting for too long. Something is probably wrong in the code." << std::endl;
RS_STACK_MUTEX(s->mtx) ;
if(s->val > 0)
return ;
}
}
private:
RsSemStruct *s ;
};
class RsThread;
/* to create a thread! */
pthread_t createThread(RsThread &thread);
/// @brief Offer basic threading functionalities.
class RsThread class RsThread
{ {
public: public:
RsThread(); RsThread();
virtual ~RsThread() ; virtual ~RsThread();
void start(const std::string &threadName = "");
// Returns true if the thread is still running.
bool isRunning();
// Returns true if the thread received a stopping order and hasn't yet stopped.
bool shouldStop();
// Can be called to set the stopping flags. The stop will not be handled
// by RsThread itself, but in subclasses. If you derive your own subclass,
// you need to call shouldStop() in order to check for a possible stopping order.
void ask_for_stop();
/** /**
* Execute given function on another thread without blocking the caller * @brief start the thread and call run() on it.
* @param threadName string containing the name of the thread used for
* debugging purposes, it is truncated to 16 characters
* including \0 at the end of the string.
* @return false on error, true otherwise
*/
bool start(const std::string& threadName = "");
/**
* @brief Check if thread is running.
* @return true if the thread is still running, false otherwise.
*/
bool isRunning();
/**
* @brief Check if the thread should stop.
* Expecially useful for subclasses which implement a @see run() method
* which may take lot of time before returning when not asked, to check if
* stop has been requested and therefore interrupting the execution ASAP
* returning in a coherent state.
* @return true if the thread received a stopping order.
*/
bool shouldStop();
/**
* @brief Asyncronously ask the thread to stop.
* The real stop will happen when the @see run() method finish.
*/
void askForStop();
/**
* Call @see askForStop() then wait it has really stopped before returning.
* It must not be called in the same thread, as it would not wait for the
* effective stop to occur as it would cause a deadlock.
*/
void fullstop();
/**
* Execute given function on a detached thread without blocking the caller
* execution. * execution.
* This can be generalized with variadic template, ATM it is enough to wrap * This can be generalized with variadic template, ATM it is enough to wrap
* any kind of function call or job into a lambda which get no paramethers * any kind of function call or job into a lambda which get no paramethers
@ -273,59 +229,77 @@ public:
{ std::thread(fn).detach(); } { std::thread(fn).detach(); }
protected: protected:
virtual void runloop() =0; /* called once the thread is started. Should be overloaded by subclasses. */ /**
void go() ; // this one calls runloop and also sets the flags correctly when the thread is finished running. * This method must be implemented by sublasses, will be called once the
* thread is started. Should return on request, use @see shouldStop() to
* check if stop has been requested.
*/
virtual void run() = 0;
RsSemaphore mHasStoppedSemaphore; /**
RsSemaphore mShouldStopSemaphore; * This method is meant to be overridden by subclasses with long running
* @see run() method and is executed asyncronously when @see askForStop()
* is called, any task necessary to stop the thread (aka inducing @see run()
* to return in a coherent state) should be done in the overridden version
* of this method, @see JsonApiServer for an usage example. */
virtual void onStopRequested() {}
static void *rsthread_init(void*) ; private:
/** Call @see run() setting the appropriate flags around it*/
void wrapRun();
/// True if thread is stopped, false otherwise
std::atomic<bool> mHasStopped;
/// True if stop has been requested
std::atomic<bool> mShouldStop;
/// Passed as argument for pthread_create(), call start()
static void *rsthread_init(void*);
/// Store the id of the corresponding pthread
pthread_t mTid; pthread_t mTid;
}; };
/**
* Provide a detached execution loop that continuously call data_tick() once the
* thread is started
*/
class RsTickingThread: public RsThread class RsTickingThread: public RsThread
{ {
public: public:
RsTickingThread();
virtual ~RsTickingThread();
void shutdown(); /**
void fullstop(); * Subclasses must implement this method, it will be called in a loop once
void join() { fullstop() ; } // used for compatibility * the thread is started, so repetitive work (like checking if data is
* available on a socket) should be done here, at the end of this method
virtual void data_tick() =0; * sleep_for(...) or similar function should be called or the CPU will
* be used as much as possible also if there is nothing to do.
*/
virtual void threadTick() = 0;
private: private:
virtual void runloop() ; /* called once the thread is started. Should be overloaded by subclasses. */ /// Implement the run loop and continuously call threadTick() in it
}; void run() override { while(!shouldStop()) threadTick(); }
class RsSingleJobThread: public RsThread
{
public:
virtual void run() =0;
protected:
virtual void runloop() ;
}; };
// TODO: Used just one time, is this really an useful abstraction?
class RsQueueThread: public RsTickingThread class RsQueueThread: public RsTickingThread
{ {
public: public:
RsQueueThread(uint32_t min, uint32_t max, double relaxFactor);
RsQueueThread(uint32_t min, uint32_t max, double relaxFactor ); ~RsQueueThread() override;
virtual ~RsQueueThread() { return; }
protected: protected:
virtual bool workQueued() = 0; virtual bool workQueued() = 0;
virtual bool doWork() = 0; virtual bool doWork() = 0;
virtual void data_tick() ;
void threadTick() override; /// @see RsTickingThread
private: private:
uint32_t mMinSleep; /* ms */ uint32_t mMinSleep; /* ms */
uint32_t mMaxSleep; /* ms */ uint32_t mMaxSleep; /* ms */
uint32_t mLastSleep; /* ms */ uint32_t mLastSleep; /* ms */
rstime_t mLastWork; /* secs */ time_t mLastWork; /* secs */
float mRelaxFactor; float mRelaxFactor;
}; };