Merge upstream

This commit is contained in:
hunbernd 2020-01-13 11:21:27 +01:00
commit 2b2a9f9a89
10 changed files with 92 additions and 57 deletions

View File

@ -146,6 +146,7 @@ bool RsJsonApi::parseToken(
JsonApiServer::JsonApiServer(): configMutex("JsonApiServer config"), JsonApiServer::JsonApiServer(): configMutex("JsonApiServer config"),
mService(std::make_shared<restbed::Service>()), mService(std::make_shared<restbed::Service>()),
mServiceMutex("JsonApiServer restbed ptr"),
mListeningPort(RsJsonApi::DEFAULT_PORT), mListeningPort(RsJsonApi::DEFAULT_PORT),
mBindingAddress(RsJsonApi::DEFAULT_BINDING_ADDRESS) mBindingAddress(RsJsonApi::DEFAULT_BINDING_ADDRESS)
{ {
@ -310,6 +311,7 @@ JsonApiServer::JsonApiServer(): configMutex("JsonApiServer config"),
registerHandler("/rsEvents/registerEventsHandler", registerHandler("/rsEvents/registerEventsHandler",
[this](const std::shared_ptr<rb::Session> session) [this](const std::shared_ptr<rb::Session> session)
{ {
const std::weak_ptr<rb::Service> weakService(mService);
const std::multimap<std::string, std::string> headers const std::multimap<std::string, std::string> headers
{ {
{ "Connection", "keep-alive" }, { "Connection", "keep-alive" },
@ -319,7 +321,7 @@ JsonApiServer::JsonApiServer(): configMutex("JsonApiServer config"),
size_t reqSize = static_cast<size_t>( size_t reqSize = static_cast<size_t>(
session->get_request()->get_header("Content-Length", 0) ); session->get_request()->get_header("Content-Length", 0) );
session->fetch( reqSize, [this]( session->fetch( reqSize, [weakService](
const std::shared_ptr<rb::Session> session, const std::shared_ptr<rb::Session> session,
const rb::Bytes& body ) const rb::Bytes& body )
{ {
@ -332,9 +334,17 @@ JsonApiServer::JsonApiServer(): configMutex("JsonApiServer config"),
const std::weak_ptr<rb::Session> weakSession(session); const std::weak_ptr<rb::Session> weakSession(session);
RsEventsHandlerId_t hId = rsEvents->generateUniqueHandlerId(); RsEventsHandlerId_t hId = rsEvents->generateUniqueHandlerId();
std::function<void(std::shared_ptr<const RsEvent>)> multiCallback = std::function<void(std::shared_ptr<const RsEvent>)> multiCallback =
[this, weakSession, hId](std::shared_ptr<const RsEvent> event) [weakSession, weakService, hId](
std::shared_ptr<const RsEvent> event )
{ {
mService->schedule( [weakSession, hId, event]() auto lService = weakService.lock();
if(!lService || lService->is_down())
{
if(rsEvents) rsEvents->unregisterEventsHandler(hId);
return;
}
lService->schedule( [weakSession, hId, event]()
{ {
auto session = weakSession.lock(); auto session = weakSession.lock();
if(!session || session->is_closed()) if(!session || session->is_closed())
@ -500,10 +510,10 @@ bool JsonApiServer::authorizeUser(
return false; return false;
} }
if(!librs::util::is_alphanumeric(passwd)) if(passwd.empty())
{ {
RsErr() << __PRETTY_FUNCTION__ << " Password is not alphanumeric" RsWarn() << __PRETTY_FUNCTION__ << " Password is empty, are you sure "
<< std::endl; << "this what you wanted?" << std::endl;
return false; return false;
} }
@ -581,21 +591,21 @@ std::vector<std::shared_ptr<rb::Resource> > JsonApiServer::getResources() const
return tab; return tab;
} }
bool JsonApiServer::restart() void JsonApiServer::restart()
{
/* It is important to wrap into async(...) because fullstop() method can't
* be called from same thread of execution hence from JSON API thread! */
RsThread::async([this]()
{ {
fullstop(); fullstop();
RsThread::start("JSON API Server"); RsThread::start("JSON API Server");
});
return true;
} }
void JsonApiServer::onStopRequested() void JsonApiServer::onStopRequested()
{ if(mService->is_up()) mService->stop(); }
bool JsonApiServer::fullstop()
{ {
RsThread::fullstop(); RS_STACK_MUTEX(mServiceMutex);
return true; mService->stop();
} }
uint16_t JsonApiServer::listeningPort() const { return mListeningPort; } uint16_t JsonApiServer::listeningPort() const { return mListeningPort; }
@ -611,16 +621,12 @@ void JsonApiServer::run()
settings->set_bind_address(mBindingAddress); settings->set_bind_address(mBindingAddress);
settings->set_default_header("Connection", "close"); settings->set_default_header("Connection", "close");
if(mService->is_up())
{
RsWarn() << __PRETTY_FUNCTION__ << " restbed is already running. "
<< " stopping it before starting again!" << std::endl;
mService->stop();
}
/* re-allocating mService is important because it deletes the existing /* re-allocating mService is important because it deletes the existing
* service and therefore leaves the listening port open */ * service and therefore leaves the listening port open */
{
RS_STACK_MUTEX(mServiceMutex);
mService = std::make_shared<restbed::Service>(); mService = std::make_shared<restbed::Service>();
}
for(auto& r: getResources()) mService->publish(r); for(auto& r: getResources()) mService->publish(r);
@ -628,7 +634,7 @@ void JsonApiServer::run()
{ {
RsUrl apiUrl; apiUrl.setScheme("http").setHost(mBindingAddress) RsUrl apiUrl; apiUrl.setScheme("http").setHost(mBindingAddress)
.setPort(mListeningPort); .setPort(mListeningPort);
RsDbg() << __PRETTY_FUNCTION__ << " JSON API server listening on " RsInfo() << __PRETTY_FUNCTION__ << " JSON API server listening on "
<< apiUrl.toString() << std::endl; << apiUrl.toString() << std::endl;
mService->start(settings); mService->start(settings);
} }
@ -640,7 +646,7 @@ void JsonApiServer::run()
return; return;
} }
RsInfo() << __PRETTY_FUNCTION__ << " finished!" << std::endl; RsDbg() << __PRETTY_FUNCTION__ << " finished!" << std::endl;
} }
/*static*/ void RsJsonApi::version( /*static*/ void RsJsonApi::version(

View File

@ -63,10 +63,13 @@ public:
std::vector<std::shared_ptr<rb::Resource>> getResources() const; std::vector<std::shared_ptr<rb::Resource>> getResources() const;
/// @see RsJsonApi /// @see RsJsonApi
bool restart() override; void fullstop() override { RsThread::fullstop(); }
/// @see RsJsonApi /// @see RsJsonApi
bool fullstop() override; void restart() override;
/// @see RsJsonApi
void askForStop() override { RsThread::askForStop(); }
/// @see RsJsonApi /// @see RsJsonApi
inline bool isRunning() override { return RsThread::isRunning(); } inline bool isRunning() override { return RsThread::isRunning(); }
@ -193,6 +196,10 @@ private:
std::less<const JsonApiResourceProvider> > mResourceProviders; std::less<const JsonApiResourceProvider> > mResourceProviders;
std::shared_ptr<restbed::Service> mService; std::shared_ptr<restbed::Service> mService;
/** Protect service only during very critical operation like resetting the
* pointer, still not 100% thread safe, but hopefully we can avoid
* crashes/freeze with this */
RsMutex mServiceMutex;
uint16_t mListeningPort; uint16_t mListeningPort;
std::string mBindingAddress; std::string mBindingAddress;

View File

@ -43,18 +43,25 @@ public:
static const std::string DEFAULT_BINDING_ADDRESS; // 127.0.0.1 static const std::string DEFAULT_BINDING_ADDRESS; // 127.0.0.1
/** /**
* @brief Restart RsJsonApi server * @brief Restart RsJsonApi server asynchronously.
* @jsonapi{development} * @jsonapi{development}
*/ */
virtual bool restart() = 0; virtual void restart() = 0;
/** @brief Request RsJsonApi to stop and wait until it has stopped.
* Do not expose this method to JSON API as fullstop must not be called from
* the same thread of service execution.
*/
virtual void fullstop() = 0;
/** /**
* @brief Request RsJsonApi to stop and wait until ti has stopped. * @brief Request RsJsonApi to stop asynchronously.
* @jsonapi{development}
* Be expecially carefull to call this from JSON API because you will loose * Be expecially carefull to call this from JSON API because you will loose
* access to the API. * access to the API.
* @jsonapi{development} * If you need to wait until stopping has completed @see isRunning().
*/ */
virtual bool fullstop() = 0; virtual void askForStop() = 0;
/** /**
* @brief Get status of the json api server * @brief Get status of the json api server
@ -128,8 +135,7 @@ public:
std::string& user, std::string& passwd ); std::string& user, std::string& passwd );
/** /**
* Add new auth (user,passwd) token to the authorized set, creating the * Add new API auth (user,passwd) token to the authorized set.
* token user:passwd internally.
* @jsonapi{development} * @jsonapi{development}
* @param[in] user user name to autorize, must be alphanumerinc * @param[in] user user name to autorize, must be alphanumerinc
* @param[in] password password for the user, must be alphanumerinc * @param[in] password password for the user, must be alphanumerinc

View File

@ -247,7 +247,7 @@ void p3GxsChannels::notifyChanges(std::vector<RsGxsNotify *> &changes)
RsGxsMsgChange *msgChange = dynamic_cast<RsGxsMsgChange *>(*it); RsGxsMsgChange *msgChange = dynamic_cast<RsGxsMsgChange *>(*it);
if (msgChange) if (msgChange)
{ {
if (msgChange->getType() == RsGxsNotify::TYPE_RECEIVED_NEW) if (msgChange->getType() == RsGxsNotify::TYPE_RECEIVED_NEW|| msgChange->getType() == RsGxsNotify::TYPE_PUBLISHED)
{ {
/* message received */ /* message received */
if (rsEvents) if (rsEvents)

View File

@ -501,7 +501,7 @@ void p3GxsCircles::notifyChanges(std::vector<RsGxsNotify *> &changes)
RsGxsCircleDetails details; RsGxsCircleDetails details;
getCircleDetails(circle_id,details); getCircleDetails(circle_id,details);
if(rsEvents && (c->getType() == RsGxsNotify::TYPE_RECEIVED_NEW) ) if(rsEvents && (c->getType() == RsGxsNotify::TYPE_RECEIVED_NEW|| c->getType() == RsGxsNotify::TYPE_PUBLISHED) )
for (auto msgIdIt(mit->second.begin()), end(mit->second.end()); msgIdIt != end; ++msgIdIt) for (auto msgIdIt(mit->second.begin()), end(mit->second.end()); msgIdIt != end; ++msgIdIt)
{ {
RsGxsCircleMsg msg; RsGxsCircleMsg msg;

View File

@ -191,7 +191,7 @@ void p3GxsForums::notifyChanges(std::vector<RsGxsNotify *> &changes)
RsGxsMsgChange *msgChange = dynamic_cast<RsGxsMsgChange *>(*it); RsGxsMsgChange *msgChange = dynamic_cast<RsGxsMsgChange *>(*it);
if (msgChange) if (msgChange)
{ {
if (msgChange->getType() == RsGxsNotify::TYPE_RECEIVED_NEW) /* message received */ if (msgChange->getType() == RsGxsNotify::TYPE_RECEIVED_NEW || msgChange->getType() == RsGxsNotify::TYPE_PUBLISHED) /* message received */
if (rsEvents) if (rsEvents)
{ {
std::map<RsGxsGroupId, std::set<RsGxsMessageId> >& msgChangeMap = msgChange->msgChangeMap; std::map<RsGxsGroupId, std::set<RsGxsMessageId> >& msgChangeMap = msgChange->msgChangeMap;

View File

@ -112,7 +112,7 @@ void p3PostBase::notifyChanges(std::vector<RsGxsNotify *> &changes)
// It could be taken a step further and directly request these msgs for an update. // It could be taken a step further and directly request these msgs for an update.
addGroupForProcessing(mit->first); addGroupForProcessing(mit->first);
if (rsEvents && msgChange->getType() == RsGxsNotify::TYPE_RECEIVED_NEW) if (rsEvents && (msgChange->getType() == RsGxsNotify::TYPE_RECEIVED_NEW || msgChange->getType() == RsGxsNotify::TYPE_PUBLISHED))
for (auto mit1 = mit->second.begin(); mit1 != mit->second.end(); ++mit1) for (auto mit1 = mit->second.begin(); mit1 != mit->second.end(); ++mit1)
{ {
auto ev = std::make_shared<RsGxsPostedEvent>(); auto ev = std::make_shared<RsGxsPostedEvent>();

View File

@ -207,14 +207,13 @@ bool ConvertUtf16ToUtf8(const std::wstring& source, std::string& dest)
bool is_alphanumeric(char c) bool is_alphanumeric(char c)
{ {
return (c>='0' && c<'9') || (c>='a' && c<='z') || (c>='A' && c<='Z') ; return (c>='0' && c<='9') || (c>='a' && c<='z') || (c>='A' && c<='Z');
} }
bool is_alphanumeric(const std::string& s) bool is_alphanumeric(const std::string& s)
{ {
for( uint32_t i=0; i < s.size(); ++i) for( uint32_t i=0; i < s.size(); ++i)
if(!is_alphanumeric(s[i])) if(!is_alphanumeric(s[i])) return false;
return false;
return true; return true;
} }

View File

@ -85,7 +85,7 @@ int RS_pthread_setname_np(pthread_t __target_thread, const char *__buf) {
return nullptr; return nullptr;
} }
RsThread::RsThread() : mHasStopped(true), mShouldStop(false) void RsThread::resetTid()
{ {
#ifdef WINDOWS_SYS #ifdef WINDOWS_SYS
memset (&mTid, 0, sizeof(mTid)); memset (&mTid, 0, sizeof(mTid));
@ -94,6 +94,9 @@ RsThread::RsThread() : mHasStopped(true), mShouldStop(false)
#endif #endif
} }
RsThread::RsThread() : mHasStopped(true), mShouldStop(false), mLastTid()
{ resetTid(); }
bool RsThread::isRunning() { return !mHasStopped; } bool RsThread::isRunning() { return !mHasStopped; }
bool RsThread::shouldStop() { return mShouldStop; } bool RsThread::shouldStop() { return mShouldStop; }
@ -102,13 +105,13 @@ void RsThread::askForStop()
{ {
/* Call onStopRequested() only once even if askForStop() is called multiple /* Call onStopRequested() only once even if askForStop() is called multiple
* times */ * times */
if(!mShouldStop.exchange(true)) if(!mShouldStop.exchange(true)) onStopRequested();
RsThread::async([&](){ onStopRequested(); });
} }
void RsThread::wrapRun() void RsThread::wrapRun()
{ {
run(); run();
resetTid();
mHasStopped = true; mHasStopped = true;
} }
@ -122,7 +125,8 @@ void RsThread::fullstop()
RsErr() << __PRETTY_FUNCTION__ << " called by same thread. This should " RsErr() << __PRETTY_FUNCTION__ << " called by same thread. This should "
<< "never happen! this: " << static_cast<void*>(this) << "never happen! this: " << static_cast<void*>(this)
<< std::hex << ", callerTid: " << callerTid << std::hex << ", callerTid: " << callerTid
<< ", mTid: " << mTid << std::dec << std::endl; << ", mTid: " << mTid << std::dec
<< ", mFullName: " << mFullName << std::endl;
print_stacktrace(); print_stacktrace();
return; return;
} }
@ -134,9 +138,9 @@ void RsThread::fullstop()
std::this_thread::sleep_for(std::chrono::milliseconds(200)); std::this_thread::sleep_for(std::chrono::milliseconds(200));
++i; ++i;
if(!(i%5)) if(!(i%5))
RsInfo() << __PRETTY_FUNCTION__ << " " << i*0.2 << " seconds passed" RsDbg() << __PRETTY_FUNCTION__ << " " << i*0.2 << " seconds passed"
<< " waiting for thread: " << mTid << " " << mFullName << " waiting for thread: " << std::hex << mLastTid
<< " to stop" << std::endl; << std::dec << " " << mFullName << " to stop" << std::endl;
} }
} }
@ -158,6 +162,9 @@ bool RsThread::start(const std::string& threadName)
return false; return false;
} }
/* Store an extra copy of thread id for debugging */
mLastTid = mTid;
/* Store thread full name as PThread is not able to keep it entirely */ /* Store thread full name as PThread is not able to keep it entirely */
mFullName = threadName; mFullName = threadName;
@ -266,10 +273,10 @@ double RsStackMutex::getCurrentTS()
RsThread::~RsThread() RsThread::~RsThread()
{ {
if(isRunning()) if(!mHasStopped)
{ {
RsErr() << __PRETTY_FUNCTION__ << " deleting thread: " << mTid << " " RsErr() << __PRETTY_FUNCTION__ << " deleting thread: " << mLastTid
<< mFullName << " that is still " << " " << mFullName << " that is still "
<< "running! Something seems very wrong here and RetroShare is " << "running! Something seems very wrong here and RetroShare is "
<< "likely to crash because of this." << std::endl; << "likely to crash because of this." << std::endl;
print_stacktrace(); print_stacktrace();

View File

@ -264,9 +264,19 @@ private:
/// Store the id of the corresponding pthread /// Store the id of the corresponding pthread
pthread_t mTid; pthread_t mTid;
void resetTid();
/// Store thread full name /** Store thread full name for debugging because PThread is limited to 15
* char thread names */
std::string mFullName; std::string mFullName;
/** Store a copy of thread id which is never reset to 0 after initialization
* due to RsThread functioning. After RsThread initialization this member is
* only re-written with a new tread id in start(...).
* This is useful for debugging because mTid is reset at the end of wrapRun
* and that might happens concurrently (or just before) a debug message
* being printed, thus causing the debug message to print a mangled value.*/
pthread_t mLastTid;
}; };
/** /**