JSON API service and RsThread fixups

This commit is contained in:
Gioacchino Mazzurco 2020-01-08 14:24:46 +01:00
parent f4d7fe4dde
commit f12ec2b535
No known key found for this signature in database
GPG key ID: A1FBCA3872E87051
6 changed files with 88 additions and 53 deletions

View file

@ -146,6 +146,7 @@ bool RsJsonApi::parseToken(
JsonApiServer::JsonApiServer(): configMutex("JsonApiServer config"),
mService(std::make_shared<restbed::Service>()),
mServiceMutex("JsonApiServer restbed ptr"),
mListeningPort(RsJsonApi::DEFAULT_PORT),
mBindingAddress(RsJsonApi::DEFAULT_BINDING_ADDRESS)
{
@ -223,7 +224,7 @@ JsonApiServer::JsonApiServer(): configMutex("JsonApiServer config"),
rsLoginHelper->attemptLogin(account, password);
if( retval == RsInit::OK )
authorizeUser(account.toStdString(),password);
authorizeUser(account.toStdString(), password);
// serialize out parameters and return value to JSON
{
@ -310,6 +311,7 @@ JsonApiServer::JsonApiServer(): configMutex("JsonApiServer config"),
registerHandler("/rsEvents/registerEventsHandler",
[this](const std::shared_ptr<rb::Session> session)
{
const std::weak_ptr<rb::Service> weakService(mService);
const std::multimap<std::string, std::string> headers
{
{ "Connection", "keep-alive" },
@ -319,7 +321,7 @@ JsonApiServer::JsonApiServer(): configMutex("JsonApiServer config"),
size_t reqSize = static_cast<size_t>(
session->get_request()->get_header("Content-Length", 0) );
session->fetch( reqSize, [this](
session->fetch( reqSize, [weakService](
const std::shared_ptr<rb::Session> session,
const rb::Bytes& body )
{
@ -332,9 +334,17 @@ JsonApiServer::JsonApiServer(): configMutex("JsonApiServer config"),
const std::weak_ptr<rb::Session> weakSession(session);
RsEventsHandlerId_t hId = rsEvents->generateUniqueHandlerId();
std::function<void(std::shared_ptr<const RsEvent>)> multiCallback =
[this, weakSession, hId](std::shared_ptr<const RsEvent> event)
[weakSession, weakService, hId](
std::shared_ptr<const RsEvent> event )
{
mService->schedule( [weakSession, hId, event]()
auto lService = weakService.lock();
if(!lService || lService->is_down())
{
if(rsEvents) rsEvents->unregisterEventsHandler(hId);
return;
}
lService->schedule( [weakSession, hId, event]()
{
auto session = weakSession.lock();
if(!session || session->is_closed())
@ -500,10 +510,10 @@ bool JsonApiServer::authorizeUser(
return false;
}
if(!librs::util::is_alphanumeric(passwd))
if(passwd.empty())
{
RsErr() << __PRETTY_FUNCTION__ << " Password is not alphanumeric"
<< std::endl;
RsWarn() << __PRETTY_FUNCTION__ << " Password is empty, are you sure "
<< "this what you wanted?" << std::endl;
return false;
}
@ -581,21 +591,21 @@ std::vector<std::shared_ptr<rb::Resource> > JsonApiServer::getResources() const
return tab;
}
bool JsonApiServer::restart()
void JsonApiServer::restart()
{
fullstop();
RsThread::start("JSON API Server");
return true;
/* It is important to wrap into async(...) because fullstop() method can't
* be called from same thread of execution hence from JSON API thread! */
RsThread::async([this]()
{
fullstop();
RsThread::start("JSON API Server");
});
}
void JsonApiServer::onStopRequested()
{ if(mService->is_up()) mService->stop(); }
bool JsonApiServer::fullstop()
{
RsThread::fullstop();
return true;
RS_STACK_MUTEX(mServiceMutex);
if(mService->is_up()) mService->stop();
}
uint16_t JsonApiServer::listeningPort() const { return mListeningPort; }
@ -611,16 +621,12 @@ void JsonApiServer::run()
settings->set_bind_address(mBindingAddress);
settings->set_default_header("Connection", "close");
if(mService->is_up())
{
RsWarn() << __PRETTY_FUNCTION__ << " restbed is already running. "
<< " stopping it before starting again!" << std::endl;
mService->stop();
}
/* re-allocating mService is important because it deletes the existing
* service and therefore leaves the listening port open */
mService = std::make_shared<restbed::Service>();
{
RS_STACK_MUTEX(mServiceMutex);
mService = std::make_shared<restbed::Service>();
}
for(auto& r: getResources()) mService->publish(r);
@ -628,8 +634,8 @@ void JsonApiServer::run()
{
RsUrl apiUrl; apiUrl.setScheme("http").setHost(mBindingAddress)
.setPort(mListeningPort);
RsDbg() << __PRETTY_FUNCTION__ << " JSON API server listening on "
<< apiUrl.toString() << std::endl;
RsInfo() << __PRETTY_FUNCTION__ << " JSON API server listening on "
<< apiUrl.toString() << std::endl;
mService->start(settings);
}
catch(std::exception& e)
@ -640,7 +646,7 @@ void JsonApiServer::run()
return;
}
RsInfo() << __PRETTY_FUNCTION__ << " finished!" << std::endl;
RsDbg() << __PRETTY_FUNCTION__ << " finished!" << std::endl;
}
/*static*/ void RsJsonApi::version(

View file

@ -63,10 +63,13 @@ public:
std::vector<std::shared_ptr<rb::Resource>> getResources() const;
/// @see RsJsonApi
bool restart() override;
void fullstop() override { RsThread::fullstop(); }
/// @see RsJsonApi
bool fullstop() override;
void restart() override;
/// @see RsJsonApi
void askForStop() override { RsThread::askForStop(); }
/// @see RsJsonApi
inline bool isRunning() override { return RsThread::isRunning(); }
@ -193,6 +196,10 @@ private:
std::less<const JsonApiResourceProvider> > mResourceProviders;
std::shared_ptr<restbed::Service> mService;
/** Protect service only during very critical operation like resetting the
* pointer, still not 100% thread safe, but hopefully we can avoid
* crashes/freeze with this */
RsMutex mServiceMutex;
uint16_t mListeningPort;
std::string mBindingAddress;