mirror of
https://github.com/RetroShare/RetroShare.git
synced 2024-10-01 02:35:48 -04:00
Added Multiple Channels and Event support to RPC system.
* added chan_id parameter to many RPC calls, this allows RPC to support multiple SSH clients. - the combination of (chan_id, req_id) rather than req_id, should be unique now -> TODO inside rpcserver queued requests. * Modified SSH server to match the new API. Multiple client support has not been added here yet. * Modified Menu System to match these changes too. * Added an Registration Framework to RpcQueueService, to enable easy event support. This code has not been throughly tested yet. git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-gxs-b1@5500 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
parent
46c945de96
commit
9c2f7f39e7
@ -37,7 +37,7 @@
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
// RsTermServer Interface.
|
// RsTermServer Interface.
|
||||||
void MenuInterface::reset()
|
void MenuInterface::reset(uint32_t /* chan_id */)
|
||||||
{
|
{
|
||||||
mBase->reset();
|
mBase->reset();
|
||||||
mCurrentMenu = mBase;
|
mCurrentMenu = mBase;
|
||||||
@ -59,7 +59,9 @@ int MenuInterface::tick()
|
|||||||
uint8_t keypress;
|
uint8_t keypress;
|
||||||
std::string output;
|
std::string output;
|
||||||
|
|
||||||
int read = mComms->recv(&keypress, 1);
|
uint32_t chan_id = 1; // dummy - for menu system.
|
||||||
|
|
||||||
|
int read = mComms->recv(chan_id, &keypress, 1);
|
||||||
#ifdef MENU_DEBUG
|
#ifdef MENU_DEBUG
|
||||||
std::cerr << "MenuInterface::tick() read " << read << " bytes";
|
std::cerr << "MenuInterface::tick() read " << read << " bytes";
|
||||||
std::cerr << std::endl;
|
std::cerr << std::endl;
|
||||||
@ -78,7 +80,7 @@ int MenuInterface::tick()
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
/* error, NON BLOCKING is handled by recv returning 0 */
|
/* error, NON BLOCKING is handled by recv returning 0 */
|
||||||
mComms->error("Bad Input");
|
mComms->error(chan_id, "Bad Input");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -121,7 +123,7 @@ int MenuInterface::tick()
|
|||||||
|
|
||||||
if (output.size() > 0)
|
if (output.size() > 0)
|
||||||
{
|
{
|
||||||
mComms->send(output);
|
mComms->send(chan_id, output);
|
||||||
}
|
}
|
||||||
|
|
||||||
return (haveInput);
|
return (haveInput);
|
||||||
|
@ -232,7 +232,7 @@ public:
|
|||||||
uint32_t drawHeader(uint32_t drawFlags, std::string &buffer);
|
uint32_t drawHeader(uint32_t drawFlags, std::string &buffer);
|
||||||
|
|
||||||
// RsSystem Interface.
|
// RsSystem Interface.
|
||||||
virtual void reset();
|
virtual void reset(uint32_t chan_id);
|
||||||
virtual int tick();
|
virtual int tick();
|
||||||
|
|
||||||
|
|
||||||
|
@ -66,7 +66,16 @@ int StdioComms::isOkay()
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int StdioComms::error(std::string msg)
|
int StdioComms::active_channels(std::list<uint32_t> &chan_ids)
|
||||||
|
{
|
||||||
|
if (isOkay())
|
||||||
|
{
|
||||||
|
chan_ids.push_back(1); // only one possible here (stdin/stdout)
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int StdioComms::error(uint32_t chan_id, std::string msg)
|
||||||
{
|
{
|
||||||
std::cerr << "StdioComms::error(" << msg << ")";
|
std::cerr << "StdioComms::error(" << msg << ")";
|
||||||
std::cerr << std::endl;
|
std::cerr << std::endl;
|
||||||
@ -75,14 +84,14 @@ int StdioComms::error(std::string msg)
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
int StdioComms::recv_ready()
|
int StdioComms::recv_ready(uint32_t chan_id)
|
||||||
{
|
{
|
||||||
/* should be proper poll / select! - but we don't use this at the moment */
|
/* should be proper poll / select! - but we don't use this at the moment */
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int StdioComms::recv(uint8_t *buffer, int bytes)
|
int StdioComms::recv(uint32_t chan_id, uint8_t *buffer, int bytes)
|
||||||
{
|
{
|
||||||
int size = read(mIn, buffer, bytes);
|
int size = read(mIn, buffer, bytes);
|
||||||
std::cerr << "StdioComms::recv() returned: " << size;
|
std::cerr << "StdioComms::recv() returned: " << size;
|
||||||
@ -102,7 +111,7 @@ int StdioComms::recv(uint8_t *buffer, int bytes)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int StdioComms::recv(std::string &buffer, int bytes)
|
int StdioComms::recv(uint32_t chan_id, std::string &buffer, int bytes)
|
||||||
{
|
{
|
||||||
uint8_t tmpbuffer[bytes];
|
uint8_t tmpbuffer[bytes];
|
||||||
int size = read(mIn, tmpbuffer, bytes);
|
int size = read(mIn, tmpbuffer, bytes);
|
||||||
@ -114,7 +123,7 @@ int StdioComms::recv(std::string &buffer, int bytes)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// these make it easier...
|
// these make it easier...
|
||||||
int StdioComms::recv_blocking(uint8_t *buffer, int bytes)
|
int StdioComms::recv_blocking(uint32_t chan_id, uint8_t *buffer, int bytes)
|
||||||
{
|
{
|
||||||
int totalread = 0;
|
int totalread = 0;
|
||||||
while(totalread < bytes)
|
while(totalread < bytes)
|
||||||
@ -137,10 +146,10 @@ int StdioComms::recv_blocking(uint8_t *buffer, int bytes)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int StdioComms::recv_blocking(std::string &buffer, int bytes)
|
int StdioComms::recv_blocking(uint32_t chan_id, std::string &buffer, int bytes)
|
||||||
{
|
{
|
||||||
uint8_t tmpbuffer[bytes];
|
uint8_t tmpbuffer[bytes];
|
||||||
int size = recv_blocking(tmpbuffer, bytes);
|
int size = recv_blocking(chan_id, tmpbuffer, bytes);
|
||||||
|
|
||||||
if (size < 0)
|
if (size < 0)
|
||||||
return size; // error.
|
return size; // error.
|
||||||
@ -152,14 +161,14 @@ int StdioComms::recv_blocking(std::string &buffer, int bytes)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int StdioComms::send(uint8_t *buffer, int bytes)
|
int StdioComms::send(uint32_t chan_id, uint8_t *buffer, int bytes)
|
||||||
{
|
{
|
||||||
write(mOut, buffer, bytes);
|
write(mOut, buffer, bytes);
|
||||||
return bytes;
|
return bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int StdioComms::send(const std::string &output)
|
int StdioComms::send(uint32_t chan_id, const std::string &output)
|
||||||
{
|
{
|
||||||
if (output.size() > 0)
|
if (output.size() > 0)
|
||||||
{
|
{
|
||||||
|
@ -36,18 +36,20 @@ public:
|
|||||||
StdioComms(int infd, int outfd);
|
StdioComms(int infd, int outfd);
|
||||||
|
|
||||||
virtual int isOkay();
|
virtual int isOkay();
|
||||||
virtual int error(std::string msg);
|
virtual int error(uint32_t chan_id, std::string msg);
|
||||||
|
|
||||||
virtual int recv_ready();
|
virtual int active_channels(std::list<uint32_t> &chan_ids);
|
||||||
virtual int recv(uint8_t *buffer, int bytes);
|
|
||||||
virtual int recv(std::string &buffer, int bytes);
|
virtual int recv_ready(uint32_t chan_id);
|
||||||
|
virtual int recv(uint32_t chan_id, uint8_t *buffer, int bytes);
|
||||||
|
virtual int recv(uint32_t chan_id, std::string &buffer, int bytes);
|
||||||
|
|
||||||
// these make it easier...
|
// these make it easier...
|
||||||
virtual int recv_blocking(uint8_t *buffer, int bytes);
|
virtual int recv_blocking(uint32_t chan_id, uint8_t *buffer, int bytes);
|
||||||
virtual int recv_blocking(std::string &buffer, int bytes);
|
virtual int recv_blocking(uint32_t chan_id, std::string &buffer, int bytes);
|
||||||
|
|
||||||
virtual int send(uint8_t *buffer, int bytes);
|
virtual int send(uint32_t chan_id, uint8_t *buffer, int bytes);
|
||||||
virtual int send(const std::string &buffer);
|
virtual int send(uint32_t chan_id, const std::string &buffer);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int mIn, mOut;
|
int mIn, mOut;
|
||||||
|
@ -38,7 +38,7 @@ RpcProtoPeers::RpcProtoPeers(uint32_t serviceId)
|
|||||||
|
|
||||||
//RpcProtoPeers::msgsAccepted(std::list<uint32_t> &msgIds); /* not used at the moment */
|
//RpcProtoPeers::msgsAccepted(std::list<uint32_t> &msgIds); /* not used at the moment */
|
||||||
|
|
||||||
int RpcProtoPeers::processMsg(uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
int RpcProtoPeers::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
||||||
{
|
{
|
||||||
/* check the msgId */
|
/* check the msgId */
|
||||||
uint8_t topbyte = getRpcMsgIdExtension(msg_id);
|
uint8_t topbyte = getRpcMsgIdExtension(msg_id);
|
||||||
@ -83,13 +83,13 @@ int RpcProtoPeers::processMsg(uint32_t msg_id, uint32_t req_id, const std::strin
|
|||||||
switch(submsg)
|
switch(submsg)
|
||||||
{
|
{
|
||||||
case rsctrl::peers::MsgId_RequestPeers:
|
case rsctrl::peers::MsgId_RequestPeers:
|
||||||
processRequestPeers(msg_id, req_id, msg);
|
processRequestPeers(chan_id, msg_id, req_id, msg);
|
||||||
break;
|
break;
|
||||||
case rsctrl::peers::MsgId_RequestAddPeer:
|
case rsctrl::peers::MsgId_RequestAddPeer:
|
||||||
processAddPeer(msg_id, req_id, msg);
|
processAddPeer(chan_id, msg_id, req_id, msg);
|
||||||
break;
|
break;
|
||||||
case rsctrl::peers::MsgId_RequestModifyPeer:
|
case rsctrl::peers::MsgId_RequestModifyPeer:
|
||||||
processModifyPeer(msg_id, req_id, msg);
|
processModifyPeer(chan_id, msg_id, req_id, msg);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
std::cerr << "RpcProtoPeers::processMsg() ERROR should never get here";
|
std::cerr << "RpcProtoPeers::processMsg() ERROR should never get here";
|
||||||
@ -102,7 +102,7 @@ int RpcProtoPeers::processMsg(uint32_t msg_id, uint32_t req_id, const std::strin
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int RpcProtoPeers::processAddPeer(uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
int RpcProtoPeers::processAddPeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
||||||
{
|
{
|
||||||
std::cerr << "RpcProtoPeers::processAddPeer() NOT FINISHED";
|
std::cerr << "RpcProtoPeers::processAddPeer() NOT FINISHED";
|
||||||
std::cerr << std::endl;
|
std::cerr << std::endl;
|
||||||
@ -136,13 +136,13 @@ int RpcProtoPeers::processAddPeer(uint32_t msg_id, uint32_t req_id, const std::s
|
|||||||
rsctrl::peers::MsgId_ResponseAddPeer, true);
|
rsctrl::peers::MsgId_ResponseAddPeer, true);
|
||||||
|
|
||||||
// queue it.
|
// queue it.
|
||||||
queueResponse(out_msg_id, req_id, outmsg);
|
queueResponse(chan_id, out_msg_id, req_id, outmsg);
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int RpcProtoPeers::processModifyPeer(uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
int RpcProtoPeers::processModifyPeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
||||||
{
|
{
|
||||||
std::cerr << "RpcProtoPeers::processModifyPeer() NOT FINISHED";
|
std::cerr << "RpcProtoPeers::processModifyPeer() NOT FINISHED";
|
||||||
std::cerr << std::endl;
|
std::cerr << std::endl;
|
||||||
@ -177,14 +177,14 @@ int RpcProtoPeers::processModifyPeer(uint32_t msg_id, uint32_t req_id, const std
|
|||||||
rsctrl::peers::MsgId_ResponseModifyPeer, true);
|
rsctrl::peers::MsgId_ResponseModifyPeer, true);
|
||||||
|
|
||||||
// queue it.
|
// queue it.
|
||||||
queueResponse(out_msg_id, req_id, outmsg);
|
queueResponse(chan_id, out_msg_id, req_id, outmsg);
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int RpcProtoPeers::processRequestPeers(uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
||||||
{
|
{
|
||||||
std::cerr << "RpcProtoPeers::processRequestPeers()";
|
std::cerr << "RpcProtoPeers::processRequestPeers()";
|
||||||
std::cerr << std::endl;
|
std::cerr << std::endl;
|
||||||
@ -421,7 +421,7 @@ int RpcProtoPeers::processRequestPeers(uint32_t msg_id, uint32_t req_id, const s
|
|||||||
rsctrl::peers::MsgId_ResponsePeerList, true);
|
rsctrl::peers::MsgId_ResponsePeerList, true);
|
||||||
|
|
||||||
// queue it.
|
// queue it.
|
||||||
queueResponse(out_msg_id, req_id, outmsg);
|
queueResponse(chan_id, out_msg_id, req_id, outmsg);
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
@ -32,11 +32,11 @@ class RpcProtoPeers: public RpcQueueService
|
|||||||
public:
|
public:
|
||||||
RpcProtoPeers(uint32_t serviceId);
|
RpcProtoPeers(uint32_t serviceId);
|
||||||
// virtual msgsAccepted(std::list<uint32_t> &msgIds); /* not used at the moment */
|
// virtual msgsAccepted(std::list<uint32_t> &msgIds); /* not used at the moment */
|
||||||
virtual int processMsg(uint32_t msgId, uint32_t req_id, const std::string &msg);
|
virtual int processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
|
||||||
|
|
||||||
virtual int processRequestPeers(uint32_t msg_id, uint32_t req_id, const std::string &msg);
|
virtual int processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
|
||||||
virtual int processAddPeer(uint32_t msg_id, uint32_t req_id, const std::string &msg);
|
virtual int processAddPeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
|
||||||
virtual int processModifyPeer(uint32_t msg_id, uint32_t req_id, const std::string &msg);
|
virtual int processModifyPeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -39,7 +39,7 @@ RpcProtoSystem::RpcProtoSystem(uint32_t serviceId)
|
|||||||
|
|
||||||
//RpcProtoSystem::msgsAccepted(std::list<uint32_t> &msgIds); /* not used at the moment */
|
//RpcProtoSystem::msgsAccepted(std::list<uint32_t> &msgIds); /* not used at the moment */
|
||||||
|
|
||||||
int RpcProtoSystem::processMsg(uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
int RpcProtoSystem::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
||||||
{
|
{
|
||||||
/* check the msgId */
|
/* check the msgId */
|
||||||
uint8_t topbyte = getRpcMsgIdExtension(msg_id);
|
uint8_t topbyte = getRpcMsgIdExtension(msg_id);
|
||||||
@ -84,7 +84,7 @@ int RpcProtoSystem::processMsg(uint32_t msg_id, uint32_t req_id, const std::stri
|
|||||||
switch(submsg)
|
switch(submsg)
|
||||||
{
|
{
|
||||||
case rsctrl::system::MsgId_RequestSystemStatus:
|
case rsctrl::system::MsgId_RequestSystemStatus:
|
||||||
processSystemStatus(msg_id, req_id, msg);
|
processSystemStatus(chan_id, msg_id, req_id, msg);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
std::cerr << "RpcProtoSystem::processMsg() ERROR should never get here";
|
std::cerr << "RpcProtoSystem::processMsg() ERROR should never get here";
|
||||||
@ -97,7 +97,7 @@ int RpcProtoSystem::processMsg(uint32_t msg_id, uint32_t req_id, const std::stri
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int RpcProtoSystem::processSystemStatus(uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
int RpcProtoSystem::processSystemStatus(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
||||||
{
|
{
|
||||||
std::cerr << "RpcProtoSystem::processSystemStatus()";
|
std::cerr << "RpcProtoSystem::processSystemStatus()";
|
||||||
std::cerr << std::endl;
|
std::cerr << std::endl;
|
||||||
@ -205,7 +205,7 @@ int RpcProtoSystem::processSystemStatus(uint32_t msg_id, uint32_t req_id, const
|
|||||||
rsctrl::system::MsgId_ResponseSystemStatus, true);
|
rsctrl::system::MsgId_ResponseSystemStatus, true);
|
||||||
|
|
||||||
// queue it.
|
// queue it.
|
||||||
queueResponse(out_msg_id, req_id, outmsg);
|
queueResponse(chan_id, out_msg_id, req_id, outmsg);
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
@ -32,9 +32,9 @@ class RpcProtoSystem: public RpcQueueService
|
|||||||
public:
|
public:
|
||||||
RpcProtoSystem(uint32_t serviceId);
|
RpcProtoSystem(uint32_t serviceId);
|
||||||
// virtual msgsAccepted(std::list<uint32_t> &msgIds); /* not used at the moment */
|
// virtual msgsAccepted(std::list<uint32_t> &msgIds); /* not used at the moment */
|
||||||
virtual int processMsg(uint32_t msgId, uint32_t req_id, const std::string &msg);
|
virtual int processMsg(uint32_t chan_id, uint32_t msgId, uint32_t req_id, const std::string &msg);
|
||||||
|
|
||||||
virtual int processSystemStatus(uint32_t msg_id, uint32_t req_id, const std::string &msg);
|
virtual int processSystemStatus(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -38,9 +38,9 @@ RpcMediator::RpcMediator(RpcComms *c)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void RpcMediator::reset()
|
void RpcMediator::reset(uint32_t chan_id)
|
||||||
{
|
{
|
||||||
mServer->reset();
|
mServer->reset(chan_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -57,6 +57,11 @@ int RpcMediator::tick()
|
|||||||
worked = true;
|
worked = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (mServer->checkEvents())
|
||||||
|
{
|
||||||
|
worked = true;
|
||||||
|
}
|
||||||
|
|
||||||
if (worked)
|
if (worked)
|
||||||
return 1;
|
return 1;
|
||||||
else
|
else
|
||||||
@ -68,19 +73,26 @@ int RpcMediator::tick()
|
|||||||
int RpcMediator::recv()
|
int RpcMediator::recv()
|
||||||
{
|
{
|
||||||
int recvd = 0;
|
int recvd = 0;
|
||||||
while(recv_msg())
|
|
||||||
|
std::list<uint32_t> chan_ids;
|
||||||
|
std::list<uint32_t>::iterator it;
|
||||||
|
mComms->active_channels(chan_ids);
|
||||||
|
for(it = chan_ids.begin(); it != chan_ids.end(); it++)
|
||||||
{
|
{
|
||||||
recvd = 1;
|
while(recv_msg(*it))
|
||||||
|
{
|
||||||
|
recvd = 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return recvd;
|
return recvd;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int RpcMediator::recv_msg()
|
int RpcMediator::recv_msg(uint32_t chan_id)
|
||||||
{
|
{
|
||||||
/* nothing in here needs a Mutex... */
|
/* nothing in here needs a Mutex... */
|
||||||
|
|
||||||
if (!mComms->recv_ready())
|
if (!mComms->recv_ready(chan_id))
|
||||||
{
|
{
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -99,14 +111,14 @@ int RpcMediator::recv_msg()
|
|||||||
std::cerr << "RpcMediator::recv_msg() get Header: " << bufsize;
|
std::cerr << "RpcMediator::recv_msg() get Header: " << bufsize;
|
||||||
std::cerr << " bytes" << std::endl;
|
std::cerr << " bytes" << std::endl;
|
||||||
|
|
||||||
int read = mComms->recv_blocking(buffer, bufsize);
|
int read = mComms->recv_blocking(chan_id, buffer, bufsize);
|
||||||
if (read != bufsize)
|
if (read != bufsize)
|
||||||
{
|
{
|
||||||
/* error */
|
/* error */
|
||||||
std::cerr << "RpcMediator::recv_msg() Error Reading Header: " << bufsize;
|
std::cerr << "RpcMediator::recv_msg() Error Reading Header: " << bufsize;
|
||||||
std::cerr << " bytes" << std::endl;
|
std::cerr << " bytes" << std::endl;
|
||||||
|
|
||||||
mComms->error("Failed to Recv Header");
|
mComms->error(chan_id, "Failed to Recv Header");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -116,11 +128,12 @@ int RpcMediator::recv_msg()
|
|||||||
std::cerr << "RpcMediator::recv_msg() Error Deserialising Header";
|
std::cerr << "RpcMediator::recv_msg() Error Deserialising Header";
|
||||||
std::cerr << std::endl;
|
std::cerr << std::endl;
|
||||||
|
|
||||||
mComms->error("Failed to Deserialise Header");
|
mComms->error(chan_id, "Failed to Deserialise Header");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::cerr << "RpcMediator::recv_msg() MsgId: " << msg_id;
|
std::cerr << "RpcMediator::recv_msg() ChanId: " << chan_id;
|
||||||
|
std::cerr << " MsgId: " << msg_id;
|
||||||
std::cerr << " ReqId: " << req_id;
|
std::cerr << " ReqId: " << req_id;
|
||||||
std::cerr << std::endl;
|
std::cerr << std::endl;
|
||||||
|
|
||||||
@ -128,27 +141,27 @@ int RpcMediator::recv_msg()
|
|||||||
std::cerr << " bytes" << std::endl;
|
std::cerr << " bytes" << std::endl;
|
||||||
|
|
||||||
/* grab real size */
|
/* grab real size */
|
||||||
read = mComms->recv_blocking(msg_body, msg_size);
|
read = mComms->recv_blocking(chan_id, msg_body, msg_size);
|
||||||
if (read != msg_size)
|
if (read != msg_size)
|
||||||
{
|
{
|
||||||
/* error */
|
/* error */
|
||||||
std::cerr << "RpcMediator::recv_msg() Error Reading Body: " << bufsize;
|
std::cerr << "RpcMediator::recv_msg() Error Reading Body: " << bufsize;
|
||||||
std::cerr << " bytes" << std::endl;
|
std::cerr << " bytes" << std::endl;
|
||||||
|
|
||||||
mComms->error("Failed to Recv MsgBody");
|
mComms->error(chan_id, "Failed to Recv MsgBody");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
mServer->processMsg(msg_id, req_id, msg_body);
|
mServer->processMsg(chan_id, msg_id, req_id, msg_body);
|
||||||
|
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int RpcMediator::send(uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
int RpcMediator::send(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
||||||
|
|
||||||
{
|
{
|
||||||
std::cerr << "RpcMediator::send(" << msg_id << "," << req_id << ", len(";
|
std::cerr << "RpcMediator::send(" << msg_id << "," << req_id << ", len(";
|
||||||
std::cerr << msg.size() << "))";
|
std::cerr << msg.size() << ")) on chan_id: " << chan_id;
|
||||||
std::cerr << std::endl;
|
std::cerr << std::endl;
|
||||||
|
|
||||||
uint8_t buffer[kMsgHeaderSize];
|
uint8_t buffer[kMsgHeaderSize];
|
||||||
@ -164,22 +177,22 @@ int RpcMediator::send(uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!mComms->send(buffer, bufsize))
|
if (!mComms->send(chan_id, buffer, bufsize))
|
||||||
{
|
{
|
||||||
std::cerr << "RpcMediator::send() Send Header Failed";
|
std::cerr << "RpcMediator::send() Send Header Failed";
|
||||||
std::cerr << std::endl;
|
std::cerr << std::endl;
|
||||||
/* error */
|
/* error */
|
||||||
mComms->error("Failed to Send Header");
|
mComms->error(chan_id, "Failed to Send Header");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* now send the body */
|
/* now send the body */
|
||||||
if (!mComms->send(msg))
|
if (!mComms->send(chan_id, msg))
|
||||||
{
|
{
|
||||||
std::cerr << "RpcMediator::send() Send Body Failed";
|
std::cerr << "RpcMediator::send() Send Body Failed";
|
||||||
std::cerr << std::endl;
|
std::cerr << std::endl;
|
||||||
/* error */
|
/* error */
|
||||||
mComms->error("Failed to Send Msg");
|
mComms->error(chan_id, "Failed to Send Msg");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
return 1;
|
return 1;
|
||||||
|
@ -43,12 +43,12 @@ public:
|
|||||||
void setRpcServer(RpcServer *s) { mServer = s; } /* Must only be called during setup */
|
void setRpcServer(RpcServer *s) { mServer = s; } /* Must only be called during setup */
|
||||||
|
|
||||||
// Overloaded from RpcSystem.
|
// Overloaded from RpcSystem.
|
||||||
virtual void reset();
|
virtual void reset(uint32_t chan_id);
|
||||||
virtual int tick();
|
virtual int tick();
|
||||||
|
|
||||||
int recv();
|
int recv();
|
||||||
int recv_msg();
|
int recv_msg(uint32_t chan_id);
|
||||||
int send(uint32_t msg_id, uint32_t req_id, const std::string &msg);
|
int send(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
RpcComms *mComms;
|
RpcComms *mComms;
|
||||||
|
@ -29,10 +29,10 @@ RpcEcho::RpcEcho(uint32_t serviceId)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int RpcEcho::processMsg(uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
int RpcEcho::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
||||||
{
|
{
|
||||||
/* */
|
/* */
|
||||||
queueResponse(msg_id, req_id, msg);
|
queueResponse(chan_id, msg_id, req_id, msg);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,7 +31,7 @@ class RpcEcho: public RpcQueueService
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
RpcEcho(uint32_t serviceId);
|
RpcEcho(uint32_t serviceId);
|
||||||
virtual int processMsg(uint32_t msgId, uint32_t req_id, const std::string &msg);
|
virtual int processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
@ -33,9 +33,9 @@ RpcServer::RpcServer(RpcMediator *med)
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void RpcServer::reset()
|
void RpcServer::reset(uint32_t chan_id)
|
||||||
{
|
{
|
||||||
std::cerr << "RpcServer::reset()" << std::endl;
|
std::cerr << "RpcServer::reset(" << chan_id << ")" << std::endl;
|
||||||
{
|
{
|
||||||
RsStackMutex stack(mRpcMtx); /********** LOCKED MUTEX ***************/
|
RsStackMutex stack(mRpcMtx); /********** LOCKED MUTEX ***************/
|
||||||
|
|
||||||
@ -43,7 +43,7 @@ void RpcServer::reset()
|
|||||||
for(it = mAllServices.begin(); it != mAllServices.end(); it++)
|
for(it = mAllServices.begin(); it != mAllServices.end(); it++)
|
||||||
{
|
{
|
||||||
/* in mutex, but should be okay */
|
/* in mutex, but should be okay */
|
||||||
(*it)->reset();
|
(*it)->reset(chan_id);
|
||||||
}
|
}
|
||||||
|
|
||||||
// clear existing queue.
|
// clear existing queue.
|
||||||
@ -64,22 +64,24 @@ int RpcServer::addService(RpcService *service)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int RpcServer::processMsg(uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
int RpcServer::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
||||||
{
|
{
|
||||||
std::cerr << "RpcServer::processMsg(" << msg_id << "," << req_id;
|
std::cerr << "RpcServer::processMsg(" << msg_id << "," << req_id;
|
||||||
std::cerr << ", len(" << msg.size() << "))" << std::endl;
|
std::cerr << ", len(" << msg.size() << ")) from channel: " << chan_id;
|
||||||
|
std::cerr << std::endl;
|
||||||
|
|
||||||
{
|
{
|
||||||
RsStackMutex stack(mRpcMtx); /********** LOCKED MUTEX ***************/
|
RsStackMutex stack(mRpcMtx); /********** LOCKED MUTEX ***************/
|
||||||
|
|
||||||
std::list<RpcService *>::iterator it;
|
std::list<RpcService *>::iterator it;
|
||||||
for(it = mAllServices.begin(); it != mAllServices.end(); it++)
|
for(it = mAllServices.begin(); it != mAllServices.end(); it++)
|
||||||
{
|
{
|
||||||
int rt = (*it)->processMsg(msg_id, req_id, msg);
|
int rt = (*it)->processMsg(chan_id, msg_id, req_id, msg);
|
||||||
if (!rt)
|
if (!rt)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
/* remember request */
|
/* remember request */
|
||||||
queueRequest_locked(msg_id, req_id, (*it));
|
queueRequest_locked(chan_id, msg_id, req_id, (*it));
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -89,7 +91,7 @@ int RpcServer::processMsg(uint32_t msg_id, uint32_t req_id, const std::string &m
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int RpcServer::queueRequest_locked(uint32_t /* msgId */, uint32_t req_id, RpcService *service)
|
int RpcServer::queueRequest_locked(uint32_t /* chan_id */, uint32_t /* msgId */, uint32_t req_id, RpcService *service)
|
||||||
{
|
{
|
||||||
std::cerr << "RpcServer::queueRequest_locked() req_id: " << req_id;
|
std::cerr << "RpcServer::queueRequest_locked() req_id: " << req_id;
|
||||||
std::cerr << std::endl;
|
std::cerr << std::endl;
|
||||||
@ -116,18 +118,21 @@ bool RpcServer::checkPending()
|
|||||||
std::list<RpcQueuedObj>::iterator it;
|
std::list<RpcQueuedObj>::iterator it;
|
||||||
for(it = mRpcQueue.begin(); it != mRpcQueue.end();)
|
for(it = mRpcQueue.begin(); it != mRpcQueue.end();)
|
||||||
{
|
{
|
||||||
|
uint32_t out_chan_id = 0;
|
||||||
uint32_t out_msg_id = 0;
|
uint32_t out_msg_id = 0;
|
||||||
uint32_t out_req_id = it->mReqId;
|
uint32_t out_req_id = it->mReqId;
|
||||||
std::string out_msg;
|
std::string out_msg;
|
||||||
if (it->mService->getResponse(out_msg_id, out_req_id, out_msg))
|
if (it->mService->getResponse(out_chan_id, out_msg_id, out_req_id, out_msg))
|
||||||
{
|
{
|
||||||
std::cerr << "RpcServer::checkPending() Response: (";
|
std::cerr << "RpcServer::checkPending() Response: (";
|
||||||
std::cerr << out_msg_id << "," << out_req_id;
|
std::cerr << out_msg_id << "," << out_req_id;
|
||||||
std::cerr << ", len(" << out_msg.size() << "))";
|
std::cerr << ", len(" << out_msg.size() << "))";
|
||||||
|
std::cerr << " for chan_id: " << out_chan_id;
|
||||||
std::cerr << std::endl;
|
std::cerr << std::endl;
|
||||||
|
|
||||||
/* store and send after queue is processed */
|
/* store and send after queue is processed */
|
||||||
RpcQueuedMsg msg;
|
RpcQueuedMsg msg;
|
||||||
|
msg.mChanId = out_chan_id;
|
||||||
msg.mMsgId = out_msg_id;
|
msg.mMsgId = out_msg_id;
|
||||||
msg.mReqId = out_req_id;
|
msg.mReqId = out_req_id;
|
||||||
msg.mMsg = out_msg;
|
msg.mMsg = out_msg;
|
||||||
@ -153,6 +158,30 @@ bool RpcServer::checkPending()
|
|||||||
return someRemaining;
|
return someRemaining;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
bool RpcServer::checkEvents()
|
||||||
|
{
|
||||||
|
std::list<RpcQueuedMsg> msgsToSend;
|
||||||
|
bool someToSend = false;
|
||||||
|
|
||||||
|
{
|
||||||
|
RsStackMutex stack(mRpcMtx); /********** LOCKED MUTEX ***************/
|
||||||
|
|
||||||
|
std::list<RpcService *>::iterator it;
|
||||||
|
for(it = mAllServices.begin(); it != mAllServices.end(); it++)
|
||||||
|
{
|
||||||
|
if ((*it)->getEvents(msgsToSend))
|
||||||
|
someToSend = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (someToSend)
|
||||||
|
{
|
||||||
|
sendQueuedMsgs(msgsToSend);
|
||||||
|
}
|
||||||
|
return someToSend;
|
||||||
|
}
|
||||||
|
|
||||||
bool RpcServer::sendQueuedMsgs(std::list<RpcQueuedMsg> &msgs)
|
bool RpcServer::sendQueuedMsgs(std::list<RpcQueuedMsg> &msgs)
|
||||||
{
|
{
|
||||||
/* No need for lock, as mOut is the only accessed item - and that has own protection */
|
/* No need for lock, as mOut is the only accessed item - and that has own protection */
|
||||||
@ -163,7 +192,7 @@ bool RpcServer::sendQueuedMsgs(std::list<RpcQueuedMsg> &msgs)
|
|||||||
std::list<RpcQueuedMsg>::iterator it;
|
std::list<RpcQueuedMsg>::iterator it;
|
||||||
for (it = msgs.begin(); it != msgs.end(); it++)
|
for (it = msgs.begin(); it != msgs.end(); it++)
|
||||||
{
|
{
|
||||||
mMediator->send(it->mMsgId, it->mReqId, it->mMsg);
|
mMediator->send(it->mChanId, it->mMsgId, it->mReqId, it->mMsg);
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -181,16 +210,35 @@ RpcQueueService::RpcQueueService(uint32_t serviceId)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void RpcQueueService::reset()
|
void RpcQueueService::reset(uint32_t chan_id)
|
||||||
{
|
{
|
||||||
|
clearEventsForChannel(chan_id);
|
||||||
|
|
||||||
RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/
|
RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/
|
||||||
|
|
||||||
// clear existing queue.
|
std::list<uint32_t> toRemove;
|
||||||
mResponses.clear();
|
|
||||||
|
// iterate through and remove only chan_id items.
|
||||||
|
std::map<uint32_t, RpcQueuedMsg>::iterator mit;
|
||||||
|
for(mit = mResponses.begin(); mit != mResponses.end(); mit++)
|
||||||
|
{
|
||||||
|
if (mit->second.mChanId == chan_id)
|
||||||
|
toRemove.push_back(mit->first);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* remove items */
|
||||||
|
std::list<uint32_t>::iterator rit;
|
||||||
|
for(rit = toRemove.begin(); rit != toRemove.end(); rit++)
|
||||||
|
{
|
||||||
|
mit = mResponses.find(*rit);
|
||||||
|
mResponses.erase(mit);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int RpcQueueService::getResponse(uint32_t &msg_id, uint32_t &req_id, std::string &msg)
|
int RpcQueueService::getResponse(uint32_t &chan_id, uint32_t &msg_id, uint32_t &req_id, std::string &msg)
|
||||||
{
|
{
|
||||||
RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/
|
RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/
|
||||||
|
|
||||||
@ -202,6 +250,7 @@ int RpcQueueService::getResponse(uint32_t &msg_id, uint32_t &req_id, std::string
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
chan_id = it->second.mChanId;
|
||||||
msg_id = it->second.mMsgId;
|
msg_id = it->second.mMsgId;
|
||||||
msg = it->second.mMsg;
|
msg = it->second.mMsg;
|
||||||
|
|
||||||
@ -210,11 +259,12 @@ int RpcQueueService::getResponse(uint32_t &msg_id, uint32_t &req_id, std::string
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int RpcQueueService::queueResponse(uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
int RpcQueueService::queueResponse(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
|
||||||
{
|
{
|
||||||
RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/
|
RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/
|
||||||
|
|
||||||
RpcQueuedMsg qmsg;
|
RpcQueuedMsg qmsg;
|
||||||
|
qmsg.mChanId = chan_id;
|
||||||
qmsg.mMsgId = msg_id;
|
qmsg.mMsgId = msg_id;
|
||||||
qmsg.mReqId = req_id;
|
qmsg.mReqId = req_id;
|
||||||
qmsg.mMsg = msg;
|
qmsg.mMsg = msg;
|
||||||
@ -225,6 +275,210 @@ int RpcQueueService::queueResponse(uint32_t msg_id, uint32_t req_id, const std::
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/********* Events & Registration ******/
|
||||||
|
|
||||||
|
|
||||||
|
int RpcQueueService::getEvents(std::list<RpcQueuedMsg> &events)
|
||||||
|
{
|
||||||
|
RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/
|
||||||
|
|
||||||
|
std::map<uint32_t, std::list<RpcEventRegister> >::iterator it;
|
||||||
|
for(it = mEventRegister.begin(); it != mEventRegister.end(); it++)
|
||||||
|
{
|
||||||
|
locked_checkForEvents(it->first, it->second, events);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (events.empty())
|
||||||
|
{
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int RpcQueueService::locked_checkForEvents(uint32_t event, const std::list<RpcEventRegister> ®istered, std::list<RpcQueuedMsg> &events)
|
||||||
|
{
|
||||||
|
std::cerr << "RpcQueueService::locked_checkForEvents() NOT IMPLEMENTED";
|
||||||
|
std::cerr << std::endl;
|
||||||
|
std::cerr << "\tRegistered for Event Type: " << event;
|
||||||
|
std::cerr << std::endl;
|
||||||
|
|
||||||
|
std::list<RpcEventRegister>::const_iterator it;
|
||||||
|
for(it = registered.begin(); it != registered.end(); it++)
|
||||||
|
{
|
||||||
|
std::cerr << "\t\t Channel ID: " << it->mChanId;
|
||||||
|
std::cerr << " Request ID: " << it->mReqId;
|
||||||
|
std::cerr << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int RpcQueueService::registerForEvents(uint32_t chan_id, uint32_t req_id, uint32_t event_id)
|
||||||
|
{
|
||||||
|
std::cerr << "RpcQueueService::registerForEvents(ChanId: " << chan_id;
|
||||||
|
std::cerr << ", ReqId: " << req_id;
|
||||||
|
std::cerr << ", EventId: " << event_id;
|
||||||
|
std::cerr << ")";
|
||||||
|
std::cerr << std::endl;
|
||||||
|
|
||||||
|
RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/
|
||||||
|
|
||||||
|
std::map<uint32_t, std::list<RpcEventRegister> >::iterator mit;
|
||||||
|
mit = mEventRegister.find(event_id);
|
||||||
|
if (mit == mEventRegister.end())
|
||||||
|
{
|
||||||
|
std::list<RpcEventRegister> emptyList;
|
||||||
|
mEventRegister[event_id] = emptyList;
|
||||||
|
|
||||||
|
mit = mEventRegister.find(event_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
RpcEventRegister reg;
|
||||||
|
reg.mChanId = chan_id;
|
||||||
|
reg.mReqId = req_id;
|
||||||
|
reg.mEventId = event_id;
|
||||||
|
|
||||||
|
mit->second.push_back(reg);
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int RpcQueueService::deregisterForEvents(uint32_t chan_id, uint32_t req_id, uint32_t event_id)
|
||||||
|
{
|
||||||
|
std::cerr << "RpcQueueService::deregisterForEvents(ChanId: " << chan_id;
|
||||||
|
std::cerr << ", ReqId: " << req_id;
|
||||||
|
std::cerr << ", EventId: " << event_id;
|
||||||
|
std::cerr << ")";
|
||||||
|
std::cerr << std::endl;
|
||||||
|
|
||||||
|
RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/
|
||||||
|
|
||||||
|
std::map<uint32_t, std::list<RpcEventRegister> >::iterator mit;
|
||||||
|
mit = mEventRegister.find(event_id);
|
||||||
|
if (mit == mEventRegister.end())
|
||||||
|
{
|
||||||
|
std::cerr << "RpcQueueService::deregisterForEvents() ";
|
||||||
|
std::cerr << "ERROR no EventId: " << event_id;
|
||||||
|
std::cerr << std::endl;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool removed = false;
|
||||||
|
std::list<RpcEventRegister>::iterator lit;
|
||||||
|
for(lit = mit->second.begin(); lit != mit->second.end();)
|
||||||
|
{
|
||||||
|
/* remove all matches */
|
||||||
|
if ((lit->mReqId == req_id) && (lit->mChanId == chan_id))
|
||||||
|
{
|
||||||
|
lit = mit->second.erase(lit);
|
||||||
|
if (removed == true)
|
||||||
|
{
|
||||||
|
std::cerr << "RpcQueueService::deregisterForEvents() ";
|
||||||
|
std::cerr << "WARNING REMOVING MULTIPLE MATCHES";
|
||||||
|
std::cerr << std::endl;
|
||||||
|
}
|
||||||
|
removed = true;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
lit++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mit->second.empty())
|
||||||
|
{
|
||||||
|
std::cerr << "RpcQueueService::deregisterForEvents() ";
|
||||||
|
std::cerr << " Last Registrant for Event, removing List from Map";
|
||||||
|
std::cerr << std::endl;
|
||||||
|
|
||||||
|
mEventRegister.erase(mit);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int RpcQueueService::clearEventsForChannel(uint32_t chan_id)
|
||||||
|
{
|
||||||
|
std::cerr << "RpcQueueService::clearEventsForChannel(" << chan_id;
|
||||||
|
std::cerr << ")";
|
||||||
|
std::cerr << std::endl;
|
||||||
|
|
||||||
|
RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/
|
||||||
|
|
||||||
|
std::list<uint32_t> toMapRemove;
|
||||||
|
|
||||||
|
std::map<uint32_t, std::list<RpcEventRegister> >::iterator mit;
|
||||||
|
for(mit = mEventRegister.begin(); mit != mEventRegister.end(); mit++)
|
||||||
|
{
|
||||||
|
std::list<RpcEventRegister>::iterator lit;
|
||||||
|
for(lit = mit->second.begin(); lit != mit->second.end();)
|
||||||
|
{
|
||||||
|
/* remove all matches */
|
||||||
|
if (lit->mChanId == chan_id)
|
||||||
|
{
|
||||||
|
std::cerr << "RpcQueueService::clearEventsForChannel() ";
|
||||||
|
std::cerr << " Removing ReqId: " << lit->mReqId;
|
||||||
|
std::cerr << " for EventId: " << lit->mEventId;
|
||||||
|
std::cerr << std::endl;
|
||||||
|
|
||||||
|
lit = mit->second.erase(lit);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
lit++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (mit->second.empty())
|
||||||
|
{
|
||||||
|
toMapRemove.push_back(mit->first);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* remove any empty lists now */
|
||||||
|
std::list<uint32_t>::iterator rit;
|
||||||
|
for(rit = toMapRemove.begin(); rit != toMapRemove.end(); rit++)
|
||||||
|
{
|
||||||
|
mit = mEventRegister.find(*rit);
|
||||||
|
mEventRegister.erase(mit);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int RpcQueueService::printEventRegister(std::ostream &out)
|
||||||
|
{
|
||||||
|
out << "RpcQueueService::printEventRegister()";
|
||||||
|
out << std::endl;
|
||||||
|
|
||||||
|
RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/
|
||||||
|
|
||||||
|
std::list<uint32_t> toMapRemove;
|
||||||
|
|
||||||
|
std::map<uint32_t, std::list<RpcEventRegister> >::iterator mit;
|
||||||
|
for(mit = mEventRegister.begin(); mit != mEventRegister.end(); mit++)
|
||||||
|
{
|
||||||
|
out << "Event: " << mit->first;
|
||||||
|
out << std::endl;
|
||||||
|
|
||||||
|
std::list<RpcEventRegister>::iterator lit;
|
||||||
|
for(lit = mit->second.begin(); lit != mit->second.end(); lit++)
|
||||||
|
{
|
||||||
|
out << "\tRegistrant: ReqId: " << lit->mReqId;
|
||||||
|
out << "\t ChanId: " << lit->mChanId;
|
||||||
|
out << std::endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// Lower 8 bits.
|
// Lower 8 bits.
|
||||||
uint8_t getRpcMsgIdSubMsg(uint32_t msg_id)
|
uint8_t getRpcMsgIdSubMsg(uint32_t msg_id)
|
||||||
{
|
{
|
||||||
|
@ -48,6 +48,7 @@ uint32_t constructMsgId(uint8_t ext, uint16_t service, uint8_t submsg, bool is_r
|
|||||||
class RpcQueuedMsg
|
class RpcQueuedMsg
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
uint32_t mChanId;
|
||||||
uint32_t mMsgId;
|
uint32_t mMsgId;
|
||||||
uint32_t mReqId;
|
uint32_t mReqId;
|
||||||
std::string mMsg;
|
std::string mMsg;
|
||||||
@ -58,26 +59,47 @@ class RpcService
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
RpcService(uint32_t /* serviceId */ ) { return; }
|
RpcService(uint32_t /* serviceId */ ) { return; }
|
||||||
virtual void reset() { return; }
|
virtual void reset(uint32_t /* chan_id */) { return; }
|
||||||
virtual int msgsAccepted(std::list<uint32_t> & /* msgIds */) { return 0; } /* not used at the moment */
|
virtual int msgsAccepted(std::list<uint32_t> & /* msgIds */) { return 0; } /* not used at the moment */
|
||||||
virtual int processMsg(uint32_t msgId, uint32_t req_id, const std::string &msg) = 0; /* returns 0 - not handled, > 0, accepted */
|
virtual int processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) = 0; /* returns 0 - not handled, > 0, accepted */
|
||||||
virtual int getResponse(uint32_t &msgId, uint32_t &req_id, std::string &msg) = 0; /* 0 - not ready, > 0 heres the response */
|
virtual int getResponse(uint32_t &chan_id, uint32_t &msgId, uint32_t &req_id, std::string &msg) = 0; /* 0 - not ready, > 0 heres the response */
|
||||||
|
|
||||||
|
virtual int getEvents(std::list<RpcQueuedMsg> & /* events */) { return 0; } /* 0 = none, optional feature */
|
||||||
};
|
};
|
||||||
|
|
||||||
|
class RpcEventRegister
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
uint32_t mChanId;
|
||||||
|
uint32_t mReqId;
|
||||||
|
uint32_t mEventId; // THIS IS A LOCAL PARAMETER, Service Specific
|
||||||
|
};
|
||||||
|
|
||||||
/* Implements a Queue for quick implementation of Instant Response Services */
|
/* Implements a Queue for quick implementation of Instant Response Services */
|
||||||
class RpcQueueService: public RpcService
|
class RpcQueueService: public RpcService
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
RpcQueueService(uint32_t serviceId);
|
RpcQueueService(uint32_t serviceId);
|
||||||
virtual void reset();
|
virtual void reset(uint32_t chan_id);
|
||||||
virtual int getResponse(uint32_t &msgId, uint32_t &req_id, std::string &msg);
|
virtual int getResponse(uint32_t &chan_id, uint32_t &msg_id, uint32_t &req_id, std::string &msg);
|
||||||
|
virtual int getEvents(std::list<RpcQueuedMsg> &events);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
int queueResponse(uint32_t msgId, uint32_t req_id, const std::string &msg);
|
int queueResponse(uint32_t chan_id, uint32_t msgId, uint32_t req_id, const std::string &msg);
|
||||||
|
|
||||||
|
virtual int locked_checkForEvents(uint32_t event, const std::list<RpcEventRegister> ®istered, std::list<RpcQueuedMsg> &events); // Overload for functionality.
|
||||||
|
|
||||||
|
int registerForEvents(uint32_t chan_id, uint32_t req_id, uint32_t event_id);
|
||||||
|
int deregisterForEvents(uint32_t chan_id, uint32_t req_id, uint32_t event_id);
|
||||||
|
|
||||||
|
int clearEventsForChannel(uint32_t chan_id);
|
||||||
|
int printEventRegister(std::ostream &out);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
RsMutex mQueueMtx;
|
RsMutex mQueueMtx;
|
||||||
|
|
||||||
std::map<uint32_t, RpcQueuedMsg> mResponses;
|
std::map<uint32_t, RpcQueuedMsg> mResponses;
|
||||||
|
std::map<uint32_t, std::list<RpcEventRegister> > mEventRegister;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
@ -98,14 +120,15 @@ class RpcServer
|
|||||||
public:
|
public:
|
||||||
RpcServer(RpcMediator *med);
|
RpcServer(RpcMediator *med);
|
||||||
int addService(RpcService *service);
|
int addService(RpcService *service);
|
||||||
int processMsg(uint32_t msgId, uint32_t req_id, const std::string &msg);
|
int processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
|
||||||
bool checkPending();
|
bool checkPending();
|
||||||
|
bool checkEvents();
|
||||||
|
|
||||||
void reset();
|
void reset(uint32_t chan_id);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool sendQueuedMsgs(std::list<RpcQueuedMsg> &msgs);
|
bool sendQueuedMsgs(std::list<RpcQueuedMsg> &msgs);
|
||||||
int queueRequest_locked(uint32_t msgId, uint32_t req_id, RpcService *service);
|
int queueRequest_locked(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, RpcService *service);
|
||||||
|
|
||||||
RpcMediator *mMediator;
|
RpcMediator *mMediator;
|
||||||
|
|
||||||
|
@ -25,6 +25,7 @@
|
|||||||
#ifndef RS_RPC_SYSTEM_H
|
#ifndef RS_RPC_SYSTEM_H
|
||||||
#define RS_RPC_SYSTEM_H
|
#define RS_RPC_SYSTEM_H
|
||||||
|
|
||||||
|
#include <list>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <inttypes.h>
|
#include <inttypes.h>
|
||||||
|
|
||||||
@ -32,18 +33,20 @@ class RpcComms
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
virtual int isOkay() = 0;
|
virtual int isOkay() = 0;
|
||||||
virtual int error(std::string msg) = 0;
|
virtual int error(uint32_t chan_id, std::string msg) = 0;
|
||||||
|
|
||||||
virtual int recv_ready() = 0;
|
virtual int active_channels(std::list<uint32_t> &chan_ids) = 0;
|
||||||
virtual int recv(uint8_t *buffer, int bytes) = 0;
|
|
||||||
virtual int recv(std::string &buffer, int bytes) = 0;
|
virtual int recv_ready(uint32_t chan_id) = 0;
|
||||||
|
virtual int recv(uint32_t chan_id, uint8_t *buffer, int bytes) = 0;
|
||||||
|
virtual int recv(uint32_t chan_id, std::string &buffer, int bytes) = 0;
|
||||||
|
|
||||||
// these make it easier...
|
// these make it easier...
|
||||||
virtual int recv_blocking(uint8_t *buffer, int bytes) = 0;
|
virtual int recv_blocking(uint32_t chan_id, uint8_t *buffer, int bytes) = 0;
|
||||||
virtual int recv_blocking(std::string &buffer, int bytes) = 0;
|
virtual int recv_blocking(uint32_t chan_id, std::string &buffer, int bytes) = 0;
|
||||||
|
|
||||||
virtual int send(uint8_t *buffer, int bytes) = 0;
|
virtual int send(uint32_t chan_id, uint8_t *buffer, int bytes) = 0;
|
||||||
virtual int send(const std::string &buffer) = 0;
|
virtual int send(uint32_t chan_id, const std::string &buffer) = 0;
|
||||||
|
|
||||||
virtual int setSleepPeriods(float /* busy */, float /* idle */) { return 0; }
|
virtual int setSleepPeriods(float /* busy */, float /* idle */) { return 0; }
|
||||||
};
|
};
|
||||||
@ -53,7 +56,7 @@ class RpcSystem
|
|||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
/* this must be regularly ticked to update the display */
|
/* this must be regularly ticked to update the display */
|
||||||
virtual void reset() = 0;
|
virtual void reset(uint32_t chan_id) = 0;
|
||||||
virtual int tick() = 0;
|
virtual int tick() = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -471,7 +471,8 @@ int RsSshd::doRpcSystem()
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
mRpcSystem->reset(); // clear everything for new user.
|
uint32_t dummy_chan_id = 1;
|
||||||
|
mRpcSystem->reset(dummy_chan_id); // clear everything for new user.
|
||||||
|
|
||||||
bool okay = true;
|
bool okay = true;
|
||||||
while(okay)
|
while(okay)
|
||||||
@ -511,8 +512,18 @@ int RsSshd::isOkay()
|
|||||||
return (mState == RSSSHD_STATE_CONNECTED);
|
return (mState == RSSSHD_STATE_CONNECTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::list<uint32_t>::iterator it;
|
||||||
|
int RsSshd::active_channels(std::list<uint32_t> &chan_ids)
|
||||||
|
{
|
||||||
|
if (isOkay())
|
||||||
|
{
|
||||||
|
chan_ids.push_back(1); // dummy for now.
|
||||||
|
}
|
||||||
|
|
||||||
int RsSshd::error(std::string msg)
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int RsSshd::error(uint32_t chan_id, std::string msg)
|
||||||
{
|
{
|
||||||
std::cerr << "RsSshd::error(" << msg << ")";
|
std::cerr << "RsSshd::error(" << msg << ")";
|
||||||
std::cerr << std::endl;
|
std::cerr << std::endl;
|
||||||
@ -522,14 +533,14 @@ int RsSshd::error(std::string msg)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int RsSshd::recv_ready()
|
int RsSshd::recv_ready(uint32_t chan_id)
|
||||||
{
|
{
|
||||||
int bytes = ssh_channel_poll(mChannel, 0);
|
int bytes = ssh_channel_poll(mChannel, 0);
|
||||||
return bytes;
|
return bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int RsSshd::recv(uint8_t *buffer, int bytes)
|
int RsSshd::recv(uint32_t chan_id, uint8_t *buffer, int bytes)
|
||||||
{
|
{
|
||||||
#if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0,5,0)
|
#if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0,5,0)
|
||||||
int size = ssh_channel_read_nonblocking(mChannel, buffer, bytes, 0);
|
int size = ssh_channel_read_nonblocking(mChannel, buffer, bytes, 0);
|
||||||
@ -540,7 +551,7 @@ int RsSshd::recv(uint8_t *buffer, int bytes)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int RsSshd::recv(std::string &buffer, int bytes)
|
int RsSshd::recv(uint32_t chan_id, std::string &buffer, int bytes)
|
||||||
{
|
{
|
||||||
char input[bytes];
|
char input[bytes];
|
||||||
#if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0,5,0)
|
#if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0,5,0)
|
||||||
@ -556,7 +567,7 @@ int RsSshd::recv(std::string &buffer, int bytes)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int RsSshd::recv_blocking(uint8_t *buffer, int bytes)
|
int RsSshd::recv_blocking(uint32_t chan_id, uint8_t *buffer, int bytes)
|
||||||
{
|
{
|
||||||
#if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0,5,0)
|
#if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0,5,0)
|
||||||
int size = ssh_channel_read(mChannel, buffer, bytes, 0);
|
int size = ssh_channel_read(mChannel, buffer, bytes, 0);
|
||||||
@ -567,7 +578,7 @@ int RsSshd::recv_blocking(uint8_t *buffer, int bytes)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int RsSshd::recv_blocking(std::string &buffer, int bytes)
|
int RsSshd::recv_blocking(uint32_t chan_id, std::string &buffer, int bytes)
|
||||||
{
|
{
|
||||||
char input[bytes];
|
char input[bytes];
|
||||||
#if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0,5,0)
|
#if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0,5,0)
|
||||||
@ -582,7 +593,7 @@ int RsSshd::recv_blocking(std::string &buffer, int bytes)
|
|||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
int RsSshd::send(uint8_t *buffer, int bytes)
|
int RsSshd::send(uint32_t chan_id, uint8_t *buffer, int bytes)
|
||||||
{
|
{
|
||||||
#if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0,5,0)
|
#if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0,5,0)
|
||||||
ssh_channel_write(mChannel, buffer, bytes);
|
ssh_channel_write(mChannel, buffer, bytes);
|
||||||
@ -592,7 +603,7 @@ int RsSshd::send(uint8_t *buffer, int bytes)
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int RsSshd::send(const std::string &buffer)
|
int RsSshd::send(uint32_t chan_id, const std::string &buffer)
|
||||||
{
|
{
|
||||||
#if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0,5,0)
|
#if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0,5,0)
|
||||||
ssh_channel_write(mChannel, buffer.c_str(), buffer.size());
|
ssh_channel_write(mChannel, buffer.c_str(), buffer.size());
|
||||||
|
@ -83,17 +83,18 @@ int adduserpwdhash(std::string username, std::string hash);
|
|||||||
|
|
||||||
// RsComms Interface.
|
// RsComms Interface.
|
||||||
virtual int isOkay();
|
virtual int isOkay();
|
||||||
virtual int error(std::string msg);
|
virtual int error(uint32_t chan_id, std::string msg);
|
||||||
|
|
||||||
virtual int recv_ready();
|
virtual int active_channels(std::list<uint32_t> &chan_ids);
|
||||||
|
virtual int recv_ready(uint32_t chan_id);
|
||||||
|
|
||||||
virtual int recv(uint8_t *buffer, int bytes);
|
virtual int recv(uint32_t chan_id, uint8_t *buffer, int bytes);
|
||||||
virtual int recv(std::string &buffer, int bytes);
|
virtual int recv(uint32_t chan_id, std::string &buffer, int bytes);
|
||||||
virtual int recv_blocking(uint8_t *buffer, int bytes);
|
virtual int recv_blocking(uint32_t chan_id, uint8_t *buffer, int bytes);
|
||||||
virtual int recv_blocking(std::string &buffer, int bytes);
|
virtual int recv_blocking(uint32_t chan_id, std::string &buffer, int bytes);
|
||||||
|
|
||||||
virtual int send(uint8_t *buffer, int bytes);
|
virtual int send(uint32_t chan_id, uint8_t *buffer, int bytes);
|
||||||
virtual int send(const std::string &buffer);
|
virtual int send(uint32_t chan_id, const std::string &buffer);
|
||||||
|
|
||||||
virtual int setSleepPeriods(float busy, float idle);
|
virtual int setSleepPeriods(float busy, float idle);
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user