diff --git a/retroshare-nogui/src/menu/menu.cc b/retroshare-nogui/src/menu/menu.cc index bfe55e048..46852a4f3 100644 --- a/retroshare-nogui/src/menu/menu.cc +++ b/retroshare-nogui/src/menu/menu.cc @@ -37,7 +37,7 @@ */ // RsTermServer Interface. -void MenuInterface::reset() +void MenuInterface::reset(uint32_t /* chan_id */) { mBase->reset(); mCurrentMenu = mBase; @@ -59,7 +59,9 @@ int MenuInterface::tick() uint8_t keypress; std::string output; - int read = mComms->recv(&keypress, 1); + uint32_t chan_id = 1; // dummy - for menu system. + + int read = mComms->recv(chan_id, &keypress, 1); #ifdef MENU_DEBUG std::cerr << "MenuInterface::tick() read " << read << " bytes"; std::cerr << std::endl; @@ -78,7 +80,7 @@ int MenuInterface::tick() else { /* error, NON BLOCKING is handled by recv returning 0 */ - mComms->error("Bad Input"); + mComms->error(chan_id, "Bad Input"); return -1; } @@ -121,7 +123,7 @@ int MenuInterface::tick() if (output.size() > 0) { - mComms->send(output); + mComms->send(chan_id, output); } return (haveInput); diff --git a/retroshare-nogui/src/menu/menu.h b/retroshare-nogui/src/menu/menu.h index 0feaec1b5..c667e64dd 100644 --- a/retroshare-nogui/src/menu/menu.h +++ b/retroshare-nogui/src/menu/menu.h @@ -232,7 +232,7 @@ public: uint32_t drawHeader(uint32_t drawFlags, std::string &buffer); // RsSystem Interface. - virtual void reset(); + virtual void reset(uint32_t chan_id); virtual int tick(); diff --git a/retroshare-nogui/src/menu/stdiocomms.cc b/retroshare-nogui/src/menu/stdiocomms.cc index 08d1abb76..99d982616 100644 --- a/retroshare-nogui/src/menu/stdiocomms.cc +++ b/retroshare-nogui/src/menu/stdiocomms.cc @@ -66,7 +66,16 @@ int StdioComms::isOkay() } -int StdioComms::error(std::string msg) +int StdioComms::active_channels(std::list &chan_ids) +{ + if (isOkay()) + { + chan_ids.push_back(1); // only one possible here (stdin/stdout) + } + return 1; +} + +int StdioComms::error(uint32_t chan_id, std::string msg) { std::cerr << "StdioComms::error(" << msg << ")"; std::cerr << std::endl; @@ -75,14 +84,14 @@ int StdioComms::error(std::string msg) -int StdioComms::recv_ready() +int StdioComms::recv_ready(uint32_t chan_id) { /* should be proper poll / select! - but we don't use this at the moment */ return 1; } -int StdioComms::recv(uint8_t *buffer, int bytes) +int StdioComms::recv(uint32_t chan_id, uint8_t *buffer, int bytes) { int size = read(mIn, buffer, bytes); std::cerr << "StdioComms::recv() returned: " << size; @@ -102,7 +111,7 @@ int StdioComms::recv(uint8_t *buffer, int bytes) } -int StdioComms::recv(std::string &buffer, int bytes) +int StdioComms::recv(uint32_t chan_id, std::string &buffer, int bytes) { uint8_t tmpbuffer[bytes]; int size = read(mIn, tmpbuffer, bytes); @@ -114,7 +123,7 @@ int StdioComms::recv(std::string &buffer, int bytes) } // these make it easier... -int StdioComms::recv_blocking(uint8_t *buffer, int bytes) +int StdioComms::recv_blocking(uint32_t chan_id, uint8_t *buffer, int bytes) { int totalread = 0; while(totalread < bytes) @@ -137,10 +146,10 @@ int StdioComms::recv_blocking(uint8_t *buffer, int bytes) } -int StdioComms::recv_blocking(std::string &buffer, int bytes) +int StdioComms::recv_blocking(uint32_t chan_id, std::string &buffer, int bytes) { uint8_t tmpbuffer[bytes]; - int size = recv_blocking(tmpbuffer, bytes); + int size = recv_blocking(chan_id, tmpbuffer, bytes); if (size < 0) return size; // error. @@ -152,14 +161,14 @@ int StdioComms::recv_blocking(std::string &buffer, int bytes) } -int StdioComms::send(uint8_t *buffer, int bytes) +int StdioComms::send(uint32_t chan_id, uint8_t *buffer, int bytes) { write(mOut, buffer, bytes); return bytes; } -int StdioComms::send(const std::string &output) +int StdioComms::send(uint32_t chan_id, const std::string &output) { if (output.size() > 0) { diff --git a/retroshare-nogui/src/menu/stdiocomms.h b/retroshare-nogui/src/menu/stdiocomms.h index bad510d6c..bc559af4c 100644 --- a/retroshare-nogui/src/menu/stdiocomms.h +++ b/retroshare-nogui/src/menu/stdiocomms.h @@ -36,18 +36,20 @@ public: StdioComms(int infd, int outfd); virtual int isOkay(); - virtual int error(std::string msg); + virtual int error(uint32_t chan_id, std::string msg); - virtual int recv_ready(); - virtual int recv(uint8_t *buffer, int bytes); - virtual int recv(std::string &buffer, int bytes); + virtual int active_channels(std::list &chan_ids); + + virtual int recv_ready(uint32_t chan_id); + virtual int recv(uint32_t chan_id, uint8_t *buffer, int bytes); + virtual int recv(uint32_t chan_id, std::string &buffer, int bytes); // these make it easier... - virtual int recv_blocking(uint8_t *buffer, int bytes); - virtual int recv_blocking(std::string &buffer, int bytes); + virtual int recv_blocking(uint32_t chan_id, uint8_t *buffer, int bytes); + virtual int recv_blocking(uint32_t chan_id, std::string &buffer, int bytes); - virtual int send(uint8_t *buffer, int bytes); - virtual int send(const std::string &buffer); + virtual int send(uint32_t chan_id, uint8_t *buffer, int bytes); + virtual int send(uint32_t chan_id, const std::string &buffer); private: int mIn, mOut; diff --git a/retroshare-nogui/src/rpc/proto/rpcprotopeers.cc b/retroshare-nogui/src/rpc/proto/rpcprotopeers.cc index 7943f3248..aefe0312d 100644 --- a/retroshare-nogui/src/rpc/proto/rpcprotopeers.cc +++ b/retroshare-nogui/src/rpc/proto/rpcprotopeers.cc @@ -38,7 +38,7 @@ RpcProtoPeers::RpcProtoPeers(uint32_t serviceId) //RpcProtoPeers::msgsAccepted(std::list &msgIds); /* not used at the moment */ -int RpcProtoPeers::processMsg(uint32_t msg_id, uint32_t req_id, const std::string &msg) +int RpcProtoPeers::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) { /* check the msgId */ uint8_t topbyte = getRpcMsgIdExtension(msg_id); @@ -83,13 +83,13 @@ int RpcProtoPeers::processMsg(uint32_t msg_id, uint32_t req_id, const std::strin switch(submsg) { case rsctrl::peers::MsgId_RequestPeers: - processRequestPeers(msg_id, req_id, msg); + processRequestPeers(chan_id, msg_id, req_id, msg); break; case rsctrl::peers::MsgId_RequestAddPeer: - processAddPeer(msg_id, req_id, msg); + processAddPeer(chan_id, msg_id, req_id, msg); break; case rsctrl::peers::MsgId_RequestModifyPeer: - processModifyPeer(msg_id, req_id, msg); + processModifyPeer(chan_id, msg_id, req_id, msg); break; default: std::cerr << "RpcProtoPeers::processMsg() ERROR should never get here"; @@ -102,7 +102,7 @@ int RpcProtoPeers::processMsg(uint32_t msg_id, uint32_t req_id, const std::strin } -int RpcProtoPeers::processAddPeer(uint32_t msg_id, uint32_t req_id, const std::string &msg) +int RpcProtoPeers::processAddPeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) { std::cerr << "RpcProtoPeers::processAddPeer() NOT FINISHED"; std::cerr << std::endl; @@ -136,13 +136,13 @@ int RpcProtoPeers::processAddPeer(uint32_t msg_id, uint32_t req_id, const std::s rsctrl::peers::MsgId_ResponseAddPeer, true); // queue it. - queueResponse(out_msg_id, req_id, outmsg); + queueResponse(chan_id, out_msg_id, req_id, outmsg); return 1; } -int RpcProtoPeers::processModifyPeer(uint32_t msg_id, uint32_t req_id, const std::string &msg) +int RpcProtoPeers::processModifyPeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) { std::cerr << "RpcProtoPeers::processModifyPeer() NOT FINISHED"; std::cerr << std::endl; @@ -177,14 +177,14 @@ int RpcProtoPeers::processModifyPeer(uint32_t msg_id, uint32_t req_id, const std rsctrl::peers::MsgId_ResponseModifyPeer, true); // queue it. - queueResponse(out_msg_id, req_id, outmsg); + queueResponse(chan_id, out_msg_id, req_id, outmsg); return 1; } -int RpcProtoPeers::processRequestPeers(uint32_t msg_id, uint32_t req_id, const std::string &msg) +int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) { std::cerr << "RpcProtoPeers::processRequestPeers()"; std::cerr << std::endl; @@ -421,7 +421,7 @@ int RpcProtoPeers::processRequestPeers(uint32_t msg_id, uint32_t req_id, const s rsctrl::peers::MsgId_ResponsePeerList, true); // queue it. - queueResponse(out_msg_id, req_id, outmsg); + queueResponse(chan_id, out_msg_id, req_id, outmsg); return 1; } diff --git a/retroshare-nogui/src/rpc/proto/rpcprotopeers.h b/retroshare-nogui/src/rpc/proto/rpcprotopeers.h index 969aa1a59..92e91d88a 100644 --- a/retroshare-nogui/src/rpc/proto/rpcprotopeers.h +++ b/retroshare-nogui/src/rpc/proto/rpcprotopeers.h @@ -32,11 +32,11 @@ class RpcProtoPeers: public RpcQueueService public: RpcProtoPeers(uint32_t serviceId); // virtual msgsAccepted(std::list &msgIds); /* not used at the moment */ - virtual int processMsg(uint32_t msgId, uint32_t req_id, const std::string &msg); + virtual int processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); - virtual int processRequestPeers(uint32_t msg_id, uint32_t req_id, const std::string &msg); - virtual int processAddPeer(uint32_t msg_id, uint32_t req_id, const std::string &msg); - virtual int processModifyPeer(uint32_t msg_id, uint32_t req_id, const std::string &msg); + virtual int processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); + virtual int processAddPeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); + virtual int processModifyPeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); }; diff --git a/retroshare-nogui/src/rpc/proto/rpcprotosystem.cc b/retroshare-nogui/src/rpc/proto/rpcprotosystem.cc index abdc7d626..29c7c5cbc 100644 --- a/retroshare-nogui/src/rpc/proto/rpcprotosystem.cc +++ b/retroshare-nogui/src/rpc/proto/rpcprotosystem.cc @@ -39,7 +39,7 @@ RpcProtoSystem::RpcProtoSystem(uint32_t serviceId) //RpcProtoSystem::msgsAccepted(std::list &msgIds); /* not used at the moment */ -int RpcProtoSystem::processMsg(uint32_t msg_id, uint32_t req_id, const std::string &msg) +int RpcProtoSystem::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) { /* check the msgId */ uint8_t topbyte = getRpcMsgIdExtension(msg_id); @@ -84,7 +84,7 @@ int RpcProtoSystem::processMsg(uint32_t msg_id, uint32_t req_id, const std::stri switch(submsg) { case rsctrl::system::MsgId_RequestSystemStatus: - processSystemStatus(msg_id, req_id, msg); + processSystemStatus(chan_id, msg_id, req_id, msg); break; default: std::cerr << "RpcProtoSystem::processMsg() ERROR should never get here"; @@ -97,7 +97,7 @@ int RpcProtoSystem::processMsg(uint32_t msg_id, uint32_t req_id, const std::stri } -int RpcProtoSystem::processSystemStatus(uint32_t msg_id, uint32_t req_id, const std::string &msg) +int RpcProtoSystem::processSystemStatus(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) { std::cerr << "RpcProtoSystem::processSystemStatus()"; std::cerr << std::endl; @@ -205,7 +205,7 @@ int RpcProtoSystem::processSystemStatus(uint32_t msg_id, uint32_t req_id, const rsctrl::system::MsgId_ResponseSystemStatus, true); // queue it. - queueResponse(out_msg_id, req_id, outmsg); + queueResponse(chan_id, out_msg_id, req_id, outmsg); return 1; } diff --git a/retroshare-nogui/src/rpc/proto/rpcprotosystem.h b/retroshare-nogui/src/rpc/proto/rpcprotosystem.h index 57dfcb1de..db29cf207 100644 --- a/retroshare-nogui/src/rpc/proto/rpcprotosystem.h +++ b/retroshare-nogui/src/rpc/proto/rpcprotosystem.h @@ -32,9 +32,9 @@ class RpcProtoSystem: public RpcQueueService public: RpcProtoSystem(uint32_t serviceId); // virtual msgsAccepted(std::list &msgIds); /* not used at the moment */ - virtual int processMsg(uint32_t msgId, uint32_t req_id, const std::string &msg); + virtual int processMsg(uint32_t chan_id, uint32_t msgId, uint32_t req_id, const std::string &msg); - virtual int processSystemStatus(uint32_t msg_id, uint32_t req_id, const std::string &msg); + virtual int processSystemStatus(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); }; diff --git a/retroshare-nogui/src/rpc/rpc.cc b/retroshare-nogui/src/rpc/rpc.cc index 368baf22e..6fd5a0b74 100644 --- a/retroshare-nogui/src/rpc/rpc.cc +++ b/retroshare-nogui/src/rpc/rpc.cc @@ -38,9 +38,9 @@ RpcMediator::RpcMediator(RpcComms *c) return; } -void RpcMediator::reset() +void RpcMediator::reset(uint32_t chan_id) { - mServer->reset(); + mServer->reset(chan_id); } @@ -57,6 +57,11 @@ int RpcMediator::tick() worked = true; } + if (mServer->checkEvents()) + { + worked = true; + } + if (worked) return 1; else @@ -68,19 +73,26 @@ int RpcMediator::tick() int RpcMediator::recv() { int recvd = 0; - while(recv_msg()) + + std::list chan_ids; + std::list::iterator it; + mComms->active_channels(chan_ids); + for(it = chan_ids.begin(); it != chan_ids.end(); it++) { - recvd = 1; + while(recv_msg(*it)) + { + recvd = 1; + } } return recvd; } -int RpcMediator::recv_msg() +int RpcMediator::recv_msg(uint32_t chan_id) { /* nothing in here needs a Mutex... */ - if (!mComms->recv_ready()) + if (!mComms->recv_ready(chan_id)) { return 0; } @@ -99,14 +111,14 @@ int RpcMediator::recv_msg() std::cerr << "RpcMediator::recv_msg() get Header: " << bufsize; std::cerr << " bytes" << std::endl; - int read = mComms->recv_blocking(buffer, bufsize); + int read = mComms->recv_blocking(chan_id, buffer, bufsize); if (read != bufsize) { /* error */ std::cerr << "RpcMediator::recv_msg() Error Reading Header: " << bufsize; std::cerr << " bytes" << std::endl; - mComms->error("Failed to Recv Header"); + mComms->error(chan_id, "Failed to Recv Header"); return 0; } @@ -116,11 +128,12 @@ int RpcMediator::recv_msg() std::cerr << "RpcMediator::recv_msg() Error Deserialising Header"; std::cerr << std::endl; - mComms->error("Failed to Deserialise Header"); + mComms->error(chan_id, "Failed to Deserialise Header"); return 0; } - std::cerr << "RpcMediator::recv_msg() MsgId: " << msg_id; + std::cerr << "RpcMediator::recv_msg() ChanId: " << chan_id; + std::cerr << " MsgId: " << msg_id; std::cerr << " ReqId: " << req_id; std::cerr << std::endl; @@ -128,27 +141,27 @@ int RpcMediator::recv_msg() std::cerr << " bytes" << std::endl; /* grab real size */ - read = mComms->recv_blocking(msg_body, msg_size); + read = mComms->recv_blocking(chan_id, msg_body, msg_size); if (read != msg_size) { /* error */ std::cerr << "RpcMediator::recv_msg() Error Reading Body: " << bufsize; std::cerr << " bytes" << std::endl; - mComms->error("Failed to Recv MsgBody"); + mComms->error(chan_id, "Failed to Recv MsgBody"); return 0; } - mServer->processMsg(msg_id, req_id, msg_body); + mServer->processMsg(chan_id, msg_id, req_id, msg_body); return 1; } -int RpcMediator::send(uint32_t msg_id, uint32_t req_id, const std::string &msg) +int RpcMediator::send(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) { std::cerr << "RpcMediator::send(" << msg_id << "," << req_id << ", len("; - std::cerr << msg.size() << "))"; + std::cerr << msg.size() << ")) on chan_id: " << chan_id; std::cerr << std::endl; uint8_t buffer[kMsgHeaderSize]; @@ -164,22 +177,22 @@ int RpcMediator::send(uint32_t msg_id, uint32_t req_id, const std::string &msg) return 0; } - if (!mComms->send(buffer, bufsize)) + if (!mComms->send(chan_id, buffer, bufsize)) { std::cerr << "RpcMediator::send() Send Header Failed"; std::cerr << std::endl; /* error */ - mComms->error("Failed to Send Header"); + mComms->error(chan_id, "Failed to Send Header"); return 0; } /* now send the body */ - if (!mComms->send(msg)) + if (!mComms->send(chan_id, msg)) { std::cerr << "RpcMediator::send() Send Body Failed"; std::cerr << std::endl; /* error */ - mComms->error("Failed to Send Msg"); + mComms->error(chan_id, "Failed to Send Msg"); return 0; } return 1; diff --git a/retroshare-nogui/src/rpc/rpc.h b/retroshare-nogui/src/rpc/rpc.h index ff1817cd1..33e9dec9e 100644 --- a/retroshare-nogui/src/rpc/rpc.h +++ b/retroshare-nogui/src/rpc/rpc.h @@ -43,12 +43,12 @@ public: void setRpcServer(RpcServer *s) { mServer = s; } /* Must only be called during setup */ // Overloaded from RpcSystem. -virtual void reset(); +virtual void reset(uint32_t chan_id); virtual int tick(); int recv(); - int recv_msg(); - int send(uint32_t msg_id, uint32_t req_id, const std::string &msg); + int recv_msg(uint32_t chan_id); + int send(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); private: RpcComms *mComms; diff --git a/retroshare-nogui/src/rpc/rpcecho.cc b/retroshare-nogui/src/rpc/rpcecho.cc index ff01ccaf8..b11220bb8 100644 --- a/retroshare-nogui/src/rpc/rpcecho.cc +++ b/retroshare-nogui/src/rpc/rpcecho.cc @@ -29,10 +29,10 @@ RpcEcho::RpcEcho(uint32_t serviceId) return; } -int RpcEcho::processMsg(uint32_t msg_id, uint32_t req_id, const std::string &msg) +int RpcEcho::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) { /* */ - queueResponse(msg_id, req_id, msg); + queueResponse(chan_id, msg_id, req_id, msg); return 1; } diff --git a/retroshare-nogui/src/rpc/rpcecho.h b/retroshare-nogui/src/rpc/rpcecho.h index a72871382..b34446310 100644 --- a/retroshare-nogui/src/rpc/rpcecho.h +++ b/retroshare-nogui/src/rpc/rpcecho.h @@ -31,7 +31,7 @@ class RpcEcho: public RpcQueueService { public: RpcEcho(uint32_t serviceId); - virtual int processMsg(uint32_t msgId, uint32_t req_id, const std::string &msg); + virtual int processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); }; diff --git a/retroshare-nogui/src/rpc/rpcserver.cc b/retroshare-nogui/src/rpc/rpcserver.cc index 0f167d1da..4b796097b 100644 --- a/retroshare-nogui/src/rpc/rpcserver.cc +++ b/retroshare-nogui/src/rpc/rpcserver.cc @@ -33,9 +33,9 @@ RpcServer::RpcServer(RpcMediator *med) } -void RpcServer::reset() +void RpcServer::reset(uint32_t chan_id) { - std::cerr << "RpcServer::reset()" << std::endl; + std::cerr << "RpcServer::reset(" << chan_id << ")" << std::endl; { RsStackMutex stack(mRpcMtx); /********** LOCKED MUTEX ***************/ @@ -43,7 +43,7 @@ void RpcServer::reset() for(it = mAllServices.begin(); it != mAllServices.end(); it++) { /* in mutex, but should be okay */ - (*it)->reset(); + (*it)->reset(chan_id); } // clear existing queue. @@ -64,22 +64,24 @@ int RpcServer::addService(RpcService *service) } -int RpcServer::processMsg(uint32_t msg_id, uint32_t req_id, const std::string &msg) +int RpcServer::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) { std::cerr << "RpcServer::processMsg(" << msg_id << "," << req_id; - std::cerr << ", len(" << msg.size() << "))" << std::endl; + std::cerr << ", len(" << msg.size() << ")) from channel: " << chan_id; + std::cerr << std::endl; + { RsStackMutex stack(mRpcMtx); /********** LOCKED MUTEX ***************/ std::list::iterator it; for(it = mAllServices.begin(); it != mAllServices.end(); it++) { - int rt = (*it)->processMsg(msg_id, req_id, msg); + int rt = (*it)->processMsg(chan_id, msg_id, req_id, msg); if (!rt) continue; /* remember request */ - queueRequest_locked(msg_id, req_id, (*it)); + queueRequest_locked(chan_id, msg_id, req_id, (*it)); return 1; } } @@ -89,7 +91,7 @@ int RpcServer::processMsg(uint32_t msg_id, uint32_t req_id, const std::string &m return 0; } -int RpcServer::queueRequest_locked(uint32_t /* msgId */, uint32_t req_id, RpcService *service) +int RpcServer::queueRequest_locked(uint32_t /* chan_id */, uint32_t /* msgId */, uint32_t req_id, RpcService *service) { std::cerr << "RpcServer::queueRequest_locked() req_id: " << req_id; std::cerr << std::endl; @@ -116,18 +118,21 @@ bool RpcServer::checkPending() std::list::iterator it; for(it = mRpcQueue.begin(); it != mRpcQueue.end();) { + uint32_t out_chan_id = 0; uint32_t out_msg_id = 0; uint32_t out_req_id = it->mReqId; std::string out_msg; - if (it->mService->getResponse(out_msg_id, out_req_id, out_msg)) + if (it->mService->getResponse(out_chan_id, out_msg_id, out_req_id, out_msg)) { std::cerr << "RpcServer::checkPending() Response: ("; std::cerr << out_msg_id << "," << out_req_id; std::cerr << ", len(" << out_msg.size() << "))"; + std::cerr << " for chan_id: " << out_chan_id; std::cerr << std::endl; /* store and send after queue is processed */ RpcQueuedMsg msg; + msg.mChanId = out_chan_id; msg.mMsgId = out_msg_id; msg.mReqId = out_req_id; msg.mMsg = out_msg; @@ -153,6 +158,30 @@ bool RpcServer::checkPending() return someRemaining; } + +bool RpcServer::checkEvents() +{ + std::list msgsToSend; + bool someToSend = false; + + { + RsStackMutex stack(mRpcMtx); /********** LOCKED MUTEX ***************/ + + std::list::iterator it; + for(it = mAllServices.begin(); it != mAllServices.end(); it++) + { + if ((*it)->getEvents(msgsToSend)) + someToSend = true; + } + } + + if (someToSend) + { + sendQueuedMsgs(msgsToSend); + } + return someToSend; +} + bool RpcServer::sendQueuedMsgs(std::list &msgs) { /* No need for lock, as mOut is the only accessed item - and that has own protection */ @@ -163,7 +192,7 @@ bool RpcServer::sendQueuedMsgs(std::list &msgs) std::list::iterator it; for (it = msgs.begin(); it != msgs.end(); it++) { - mMediator->send(it->mMsgId, it->mReqId, it->mMsg); + mMediator->send(it->mChanId, it->mMsgId, it->mReqId, it->mMsg); } return true; } @@ -181,16 +210,35 @@ RpcQueueService::RpcQueueService(uint32_t serviceId) } -void RpcQueueService::reset() +void RpcQueueService::reset(uint32_t chan_id) { + clearEventsForChannel(chan_id); + RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/ - // clear existing queue. - mResponses.clear(); + std::list toRemove; + + // iterate through and remove only chan_id items. + std::map::iterator mit; + for(mit = mResponses.begin(); mit != mResponses.end(); mit++) + { + if (mit->second.mChanId == chan_id) + toRemove.push_back(mit->first); + } + + /* remove items */ + std::list::iterator rit; + for(rit = toRemove.begin(); rit != toRemove.end(); rit++) + { + mit = mResponses.find(*rit); + mResponses.erase(mit); + } + + return; } -int RpcQueueService::getResponse(uint32_t &msg_id, uint32_t &req_id, std::string &msg) +int RpcQueueService::getResponse(uint32_t &chan_id, uint32_t &msg_id, uint32_t &req_id, std::string &msg) { RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/ @@ -202,6 +250,7 @@ int RpcQueueService::getResponse(uint32_t &msg_id, uint32_t &req_id, std::string return 0; } + chan_id = it->second.mChanId; msg_id = it->second.mMsgId; msg = it->second.mMsg; @@ -210,11 +259,12 @@ int RpcQueueService::getResponse(uint32_t &msg_id, uint32_t &req_id, std::string return 1; } -int RpcQueueService::queueResponse(uint32_t msg_id, uint32_t req_id, const std::string &msg) +int RpcQueueService::queueResponse(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) { RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/ RpcQueuedMsg qmsg; + qmsg.mChanId = chan_id; qmsg.mMsgId = msg_id; qmsg.mReqId = req_id; qmsg.mMsg = msg; @@ -225,6 +275,210 @@ int RpcQueueService::queueResponse(uint32_t msg_id, uint32_t req_id, const std:: } + + +/********* Events & Registration ******/ + + +int RpcQueueService::getEvents(std::list &events) +{ + RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/ + + std::map >::iterator it; + for(it = mEventRegister.begin(); it != mEventRegister.end(); it++) + { + locked_checkForEvents(it->first, it->second, events); + } + + if (events.empty()) + { + return 0; + } + return 1; +} + +int RpcQueueService::locked_checkForEvents(uint32_t event, const std::list ®istered, std::list &events) +{ + std::cerr << "RpcQueueService::locked_checkForEvents() NOT IMPLEMENTED"; + std::cerr << std::endl; + std::cerr << "\tRegistered for Event Type: " << event; + std::cerr << std::endl; + + std::list::const_iterator it; + for(it = registered.begin(); it != registered.end(); it++) + { + std::cerr << "\t\t Channel ID: " << it->mChanId; + std::cerr << " Request ID: " << it->mReqId; + std::cerr << std::endl; + } + + return 1; +} + +int RpcQueueService::registerForEvents(uint32_t chan_id, uint32_t req_id, uint32_t event_id) +{ + std::cerr << "RpcQueueService::registerForEvents(ChanId: " << chan_id; + std::cerr << ", ReqId: " << req_id; + std::cerr << ", EventId: " << event_id; + std::cerr << ")"; + std::cerr << std::endl; + + RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/ + + std::map >::iterator mit; + mit = mEventRegister.find(event_id); + if (mit == mEventRegister.end()) + { + std::list emptyList; + mEventRegister[event_id] = emptyList; + + mit = mEventRegister.find(event_id); + } + + RpcEventRegister reg; + reg.mChanId = chan_id; + reg.mReqId = req_id; + reg.mEventId = event_id; + + mit->second.push_back(reg); + + return 1; +} + +int RpcQueueService::deregisterForEvents(uint32_t chan_id, uint32_t req_id, uint32_t event_id) +{ + std::cerr << "RpcQueueService::deregisterForEvents(ChanId: " << chan_id; + std::cerr << ", ReqId: " << req_id; + std::cerr << ", EventId: " << event_id; + std::cerr << ")"; + std::cerr << std::endl; + + RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/ + + std::map >::iterator mit; + mit = mEventRegister.find(event_id); + if (mit == mEventRegister.end()) + { + std::cerr << "RpcQueueService::deregisterForEvents() "; + std::cerr << "ERROR no EventId: " << event_id; + std::cerr << std::endl; + + return 0; + } + + bool removed = false; + std::list::iterator lit; + for(lit = mit->second.begin(); lit != mit->second.end();) + { + /* remove all matches */ + if ((lit->mReqId == req_id) && (lit->mChanId == chan_id)) + { + lit = mit->second.erase(lit); + if (removed == true) + { + std::cerr << "RpcQueueService::deregisterForEvents() "; + std::cerr << "WARNING REMOVING MULTIPLE MATCHES"; + std::cerr << std::endl; + } + removed = true; + } + else + { + lit++; + } + } + + if (mit->second.empty()) + { + std::cerr << "RpcQueueService::deregisterForEvents() "; + std::cerr << " Last Registrant for Event, removing List from Map"; + std::cerr << std::endl; + + mEventRegister.erase(mit); + } + + return 1; +} + + +int RpcQueueService::clearEventsForChannel(uint32_t chan_id) +{ + std::cerr << "RpcQueueService::clearEventsForChannel(" << chan_id; + std::cerr << ")"; + std::cerr << std::endl; + + RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/ + + std::list toMapRemove; + + std::map >::iterator mit; + for(mit = mEventRegister.begin(); mit != mEventRegister.end(); mit++) + { + std::list::iterator lit; + for(lit = mit->second.begin(); lit != mit->second.end();) + { + /* remove all matches */ + if (lit->mChanId == chan_id) + { + std::cerr << "RpcQueueService::clearEventsForChannel() "; + std::cerr << " Removing ReqId: " << lit->mReqId; + std::cerr << " for EventId: " << lit->mEventId; + std::cerr << std::endl; + + lit = mit->second.erase(lit); + } + else + { + lit++; + } + } + if (mit->second.empty()) + { + toMapRemove.push_back(mit->first); + } + } + + /* remove any empty lists now */ + std::list::iterator rit; + for(rit = toMapRemove.begin(); rit != toMapRemove.end(); rit++) + { + mit = mEventRegister.find(*rit); + mEventRegister.erase(mit); + } + + return 1; +} + + +int RpcQueueService::printEventRegister(std::ostream &out) +{ + out << "RpcQueueService::printEventRegister()"; + out << std::endl; + + RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/ + + std::list toMapRemove; + + std::map >::iterator mit; + for(mit = mEventRegister.begin(); mit != mEventRegister.end(); mit++) + { + out << "Event: " << mit->first; + out << std::endl; + + std::list::iterator lit; + for(lit = mit->second.begin(); lit != mit->second.end(); lit++) + { + out << "\tRegistrant: ReqId: " << lit->mReqId; + out << "\t ChanId: " << lit->mChanId; + out << std::endl; + } + } + return 1; +} + + + + // Lower 8 bits. uint8_t getRpcMsgIdSubMsg(uint32_t msg_id) { diff --git a/retroshare-nogui/src/rpc/rpcserver.h b/retroshare-nogui/src/rpc/rpcserver.h index 4d506ef36..f92b4b5aa 100644 --- a/retroshare-nogui/src/rpc/rpcserver.h +++ b/retroshare-nogui/src/rpc/rpcserver.h @@ -48,6 +48,7 @@ uint32_t constructMsgId(uint8_t ext, uint16_t service, uint8_t submsg, bool is_r class RpcQueuedMsg { public: + uint32_t mChanId; uint32_t mMsgId; uint32_t mReqId; std::string mMsg; @@ -58,26 +59,47 @@ class RpcService { public: RpcService(uint32_t /* serviceId */ ) { return; } - virtual void reset() { return; } + virtual void reset(uint32_t /* chan_id */) { return; } virtual int msgsAccepted(std::list & /* msgIds */) { return 0; } /* not used at the moment */ - virtual int processMsg(uint32_t msgId, uint32_t req_id, const std::string &msg) = 0; /* returns 0 - not handled, > 0, accepted */ - virtual int getResponse(uint32_t &msgId, uint32_t &req_id, std::string &msg) = 0; /* 0 - not ready, > 0 heres the response */ + virtual int processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) = 0; /* returns 0 - not handled, > 0, accepted */ + virtual int getResponse(uint32_t &chan_id, uint32_t &msgId, uint32_t &req_id, std::string &msg) = 0; /* 0 - not ready, > 0 heres the response */ + + virtual int getEvents(std::list & /* events */) { return 0; } /* 0 = none, optional feature */ }; +class RpcEventRegister +{ + public: + uint32_t mChanId; + uint32_t mReqId; + uint32_t mEventId; // THIS IS A LOCAL PARAMETER, Service Specific +}; /* Implements a Queue for quick implementation of Instant Response Services */ class RpcQueueService: public RpcService { public: RpcQueueService(uint32_t serviceId); -virtual void reset(); -virtual int getResponse(uint32_t &msgId, uint32_t &req_id, std::string &msg); +virtual void reset(uint32_t chan_id); +virtual int getResponse(uint32_t &chan_id, uint32_t &msg_id, uint32_t &req_id, std::string &msg); +virtual int getEvents(std::list &events); protected: - int queueResponse(uint32_t msgId, uint32_t req_id, const std::string &msg); + int queueResponse(uint32_t chan_id, uint32_t msgId, uint32_t req_id, const std::string &msg); + +virtual int locked_checkForEvents(uint32_t event, const std::list ®istered, std::list &events); // Overload for functionality. + + int registerForEvents(uint32_t chan_id, uint32_t req_id, uint32_t event_id); + int deregisterForEvents(uint32_t chan_id, uint32_t req_id, uint32_t event_id); + + int clearEventsForChannel(uint32_t chan_id); + int printEventRegister(std::ostream &out); + private: RsMutex mQueueMtx; + std::map mResponses; + std::map > mEventRegister; }; @@ -98,14 +120,15 @@ class RpcServer public: RpcServer(RpcMediator *med); int addService(RpcService *service); - int processMsg(uint32_t msgId, uint32_t req_id, const std::string &msg); + int processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); bool checkPending(); + bool checkEvents(); - void reset(); + void reset(uint32_t chan_id); private: bool sendQueuedMsgs(std::list &msgs); - int queueRequest_locked(uint32_t msgId, uint32_t req_id, RpcService *service); + int queueRequest_locked(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, RpcService *service); RpcMediator *mMediator; diff --git a/retroshare-nogui/src/rpcsystem.h b/retroshare-nogui/src/rpcsystem.h index db81d5702..dd1ccfc43 100644 --- a/retroshare-nogui/src/rpcsystem.h +++ b/retroshare-nogui/src/rpcsystem.h @@ -25,6 +25,7 @@ #ifndef RS_RPC_SYSTEM_H #define RS_RPC_SYSTEM_H +#include #include #include @@ -32,18 +33,20 @@ class RpcComms { public: virtual int isOkay() = 0; - virtual int error(std::string msg) = 0; + virtual int error(uint32_t chan_id, std::string msg) = 0; - virtual int recv_ready() = 0; - virtual int recv(uint8_t *buffer, int bytes) = 0; - virtual int recv(std::string &buffer, int bytes) = 0; + virtual int active_channels(std::list &chan_ids) = 0; + + virtual int recv_ready(uint32_t chan_id) = 0; + virtual int recv(uint32_t chan_id, uint8_t *buffer, int bytes) = 0; + virtual int recv(uint32_t chan_id, std::string &buffer, int bytes) = 0; // these make it easier... - virtual int recv_blocking(uint8_t *buffer, int bytes) = 0; - virtual int recv_blocking(std::string &buffer, int bytes) = 0; + virtual int recv_blocking(uint32_t chan_id, uint8_t *buffer, int bytes) = 0; + virtual int recv_blocking(uint32_t chan_id, std::string &buffer, int bytes) = 0; - virtual int send(uint8_t *buffer, int bytes) = 0; - virtual int send(const std::string &buffer) = 0; + virtual int send(uint32_t chan_id, uint8_t *buffer, int bytes) = 0; + virtual int send(uint32_t chan_id, const std::string &buffer) = 0; virtual int setSleepPeriods(float /* busy */, float /* idle */) { return 0; } }; @@ -53,7 +56,7 @@ class RpcSystem { public: /* this must be regularly ticked to update the display */ - virtual void reset() = 0; + virtual void reset(uint32_t chan_id) = 0; virtual int tick() = 0; }; diff --git a/retroshare-nogui/src/ssh/rssshd.cc b/retroshare-nogui/src/ssh/rssshd.cc index 94b1baf59..94360baed 100644 --- a/retroshare-nogui/src/ssh/rssshd.cc +++ b/retroshare-nogui/src/ssh/rssshd.cc @@ -471,7 +471,8 @@ int RsSshd::doRpcSystem() return 0; } - mRpcSystem->reset(); // clear everything for new user. + uint32_t dummy_chan_id = 1; + mRpcSystem->reset(dummy_chan_id); // clear everything for new user. bool okay = true; while(okay) @@ -511,8 +512,18 @@ int RsSshd::isOkay() return (mState == RSSSHD_STATE_CONNECTED); } + std::list::iterator it; +int RsSshd::active_channels(std::list &chan_ids) +{ + if (isOkay()) + { + chan_ids.push_back(1); // dummy for now. + } -int RsSshd::error(std::string msg) + return 1; +} + +int RsSshd::error(uint32_t chan_id, std::string msg) { std::cerr << "RsSshd::error(" << msg << ")"; std::cerr << std::endl; @@ -522,14 +533,14 @@ int RsSshd::error(std::string msg) } -int RsSshd::recv_ready() +int RsSshd::recv_ready(uint32_t chan_id) { int bytes = ssh_channel_poll(mChannel, 0); return bytes; } -int RsSshd::recv(uint8_t *buffer, int bytes) +int RsSshd::recv(uint32_t chan_id, uint8_t *buffer, int bytes) { #if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0,5,0) int size = ssh_channel_read_nonblocking(mChannel, buffer, bytes, 0); @@ -540,7 +551,7 @@ int RsSshd::recv(uint8_t *buffer, int bytes) } -int RsSshd::recv(std::string &buffer, int bytes) +int RsSshd::recv(uint32_t chan_id, std::string &buffer, int bytes) { char input[bytes]; #if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0,5,0) @@ -556,7 +567,7 @@ int RsSshd::recv(std::string &buffer, int bytes) } -int RsSshd::recv_blocking(uint8_t *buffer, int bytes) +int RsSshd::recv_blocking(uint32_t chan_id, uint8_t *buffer, int bytes) { #if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0,5,0) int size = ssh_channel_read(mChannel, buffer, bytes, 0); @@ -567,7 +578,7 @@ int RsSshd::recv_blocking(uint8_t *buffer, int bytes) } -int RsSshd::recv_blocking(std::string &buffer, int bytes) +int RsSshd::recv_blocking(uint32_t chan_id, std::string &buffer, int bytes) { char input[bytes]; #if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0,5,0) @@ -582,7 +593,7 @@ int RsSshd::recv_blocking(std::string &buffer, int bytes) return size; } -int RsSshd::send(uint8_t *buffer, int bytes) +int RsSshd::send(uint32_t chan_id, uint8_t *buffer, int bytes) { #if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0,5,0) ssh_channel_write(mChannel, buffer, bytes); @@ -592,7 +603,7 @@ int RsSshd::send(uint8_t *buffer, int bytes) return 1; } -int RsSshd::send(const std::string &buffer) +int RsSshd::send(uint32_t chan_id, const std::string &buffer) { #if LIBSSH_VERSION_INT >= SSH_VERSION_INT(0,5,0) ssh_channel_write(mChannel, buffer.c_str(), buffer.size()); diff --git a/retroshare-nogui/src/ssh/rssshd.h b/retroshare-nogui/src/ssh/rssshd.h index 2f0a87d36..07f289d2a 100644 --- a/retroshare-nogui/src/ssh/rssshd.h +++ b/retroshare-nogui/src/ssh/rssshd.h @@ -83,17 +83,18 @@ int adduserpwdhash(std::string username, std::string hash); // RsComms Interface. virtual int isOkay(); - virtual int error(std::string msg); + virtual int error(uint32_t chan_id, std::string msg); - virtual int recv_ready(); + virtual int active_channels(std::list &chan_ids); + virtual int recv_ready(uint32_t chan_id); - virtual int recv(uint8_t *buffer, int bytes); - virtual int recv(std::string &buffer, int bytes); - virtual int recv_blocking(uint8_t *buffer, int bytes); - virtual int recv_blocking(std::string &buffer, int bytes); + virtual int recv(uint32_t chan_id, uint8_t *buffer, int bytes); + virtual int recv(uint32_t chan_id, std::string &buffer, int bytes); + virtual int recv_blocking(uint32_t chan_id, uint8_t *buffer, int bytes); + virtual int recv_blocking(uint32_t chan_id, std::string &buffer, int bytes); - virtual int send(uint8_t *buffer, int bytes); - virtual int send(const std::string &buffer); + virtual int send(uint32_t chan_id, uint8_t *buffer, int bytes); + virtual int send(uint32_t chan_id, const std::string &buffer); virtual int setSleepPeriods(float busy, float idle);