* Adding Chat RPC support into retroshare-nogui. ( About 90% there, and 40% tested. )

- RPC commands are outlined in the proto file.
	- You can: get listing, send msg, register for recv events, change nickname and join/create lobbiey
	- updated chat & core generated files.

 * Added RpcUniqueId(chan_id, req_id) for identifying requests.
 	- Modified Responses Queues to use new datatype.

 * Fixed reset to occur after the connection has died.




git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-gxs-b1@5508 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2012-09-03 19:18:59 +00:00
parent 9ad322d379
commit 84074fdca1
11 changed files with 9736 additions and 17 deletions

View File

@ -186,18 +186,22 @@ protorpc {
# Proto Services # Proto Services
HEADERS += rpc/proto/rpcprotopeers.h \ HEADERS += rpc/proto/rpcprotopeers.h \
rpc/proto/rpcprotosystem.h \ rpc/proto/rpcprotosystem.h \
rpc/proto/rpcprotochat.h \
SOURCES += rpc/proto/rpcprotopeers.cc \ SOURCES += rpc/proto/rpcprotopeers.cc \
rpc/proto/rpcprotosystem.cc \ rpc/proto/rpcprotosystem.cc \
rpc/proto/rpcprotochat.cc \
# Generated ProtoBuf Code the RPC System # Generated ProtoBuf Code the RPC System
HEADERS += rpc/proto/gencc/core.pb.h \ HEADERS += rpc/proto/gencc/core.pb.h \
rpc/proto/gencc/peers.pb.h \ rpc/proto/gencc/peers.pb.h \
rpc/proto/gencc/system.pb.h \ rpc/proto/gencc/system.pb.h \
rpc/proto/gencc/chat.pb.h \
SOURCES += rpc/proto/gencc/core.pb.cc \ SOURCES += rpc/proto/gencc/core.pb.cc \
rpc/proto/gencc/peers.pb.cc \ rpc/proto/gencc/peers.pb.cc \
rpc/proto/gencc/system.pb.cc \ rpc/proto/gencc/system.pb.cc \
rpc/proto/gencc/chat.pb.cc \
QMAKE_CFLAGS += -pthread QMAKE_CFLAGS += -pthread
QMAKE_CXXFLAGS += -pthread QMAKE_CXXFLAGS += -pthread

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -313,9 +313,9 @@ void protobuf_AddDesc_core_2eproto() {
"RWARD\020\010\"3\n\tBandwidth\022\n\n\002up\030\001 \002(\002\022\014\n\004down" "RWARD\020\010\"3\n\tBandwidth\022\n\n\002up\030\001 \002(\002\022\014\n\004down"
"\030\002 \002(\002\022\014\n\004name\030\003 \001(\t\":\n\014BandwidthSet\022*\n\n" "\030\002 \002(\002\022\014\n\004name\030\003 \001(\t\":\n\014BandwidthSet\022*\n\n"
"bandwidths\030\001 \003(\0132\026.rsctrl.core.Bandwidth" "bandwidths\030\001 \003(\0132\026.rsctrl.core.Bandwidth"
"*\027\n\013ExtensionId\022\010\n\004CORE\020\000*A\n\tPackageId\022\t" "*\027\n\013ExtensionId\022\010\n\004CORE\020\000*6\n\tPackageId\022\t"
"\n\005PEERS\020\001\022\n\n\006SYSTEM\020\002\022\t\n\005FILES\020\003\022\010\n\004MSGS" "\n\005PEERS\020\001\022\n\n\006SYSTEM\020\002\022\010\n\004CHAT\020\003\022\010\n\003GXS\020\350"
"\020\004\022\010\n\003GXS\020\350\007", 1292); "\007", 1281);
::google::protobuf::MessageFactory::InternalRegisterGeneratedFile( ::google::protobuf::MessageFactory::InternalRegisterGeneratedFile(
"core.proto", &protobuf_RegisterTypes); "core.proto", &protobuf_RegisterTypes);
Status::default_instance_ = new Status(); Status::default_instance_ = new Status();
@ -368,7 +368,6 @@ bool PackageId_IsValid(int value) {
case 1: case 1:
case 2: case 2:
case 3: case 3:
case 4:
case 1000: case 1000:
return true; return true;
default: default:

View File

@ -153,8 +153,7 @@ inline bool ExtensionId_Parse(
enum PackageId { enum PackageId {
PEERS = 1, PEERS = 1,
SYSTEM = 2, SYSTEM = 2,
FILES = 3, CHAT = 3,
MSGS = 4,
GXS = 1000 GXS = 1000
}; };
bool PackageId_IsValid(int value); bool PackageId_IsValid(int value);

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,53 @@
/*
* RetroShare External Interface.
*
* Copyright 2012-2012 by Robert Fernie.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2.1 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#ifndef RS_RPC_PROTO_CHAT_H
#define RS_RPC_PROTO_CHAT_H
#include "rpc/rpcserver.h"
// Registrations.
#define REGISTRATION_EVENT_CHAT 1
class RpcProtoChat: public RpcQueueService
{
public:
RpcProtoChat(uint32_t serviceId);
virtual int processMsg(uint32_t chan_id, uint32_t msgId, uint32_t req_id, const std::string &msg);
protected:
int processReqChatLobbies(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
int processReqCreateLobby(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
int processReqJoinOrLeaveLobby(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
int processReqSetLobbyNickname(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
int processReqRegisterEvents(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
int processReqSendMessage(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
// EVENTS.
virtual int locked_checkForEvents(uint32_t event, const std::list<RpcEventRegister> &registered, std::list<RpcQueuedMsg> &events);
};
#endif /* RS_PROTO_CHAT_H */

View File

@ -27,6 +27,16 @@
#include <iostream> #include <iostream>
bool operator<(const RpcUniqueId &a, const RpcUniqueId &b)
{
if (a.mChanId == b.mChanId)
return (a.mReqId < b.mReqId);
return (a.mChanId < b.mChanId);
}
RpcServer::RpcServer(RpcMediator *med) RpcServer::RpcServer(RpcMediator *med)
:mMediator(med), mRpcMtx("RpcMtx") :mMediator(med), mRpcMtx("RpcMtx")
{ {
@ -91,12 +101,13 @@ int RpcServer::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, co
return 0; return 0;
} }
int RpcServer::queueRequest_locked(uint32_t /* chan_id */, uint32_t /* msgId */, uint32_t req_id, RpcService *service) int RpcServer::queueRequest_locked(uint32_t chan_id, uint32_t /* msgId */, uint32_t req_id, RpcService *service)
{ {
std::cerr << "RpcServer::queueRequest_locked() req_id: " << req_id; std::cerr << "RpcServer::queueRequest_locked() req_id: " << req_id;
std::cerr << std::endl; std::cerr << std::endl;
RpcQueuedObj obj; RpcQueuedObj obj;
obj.mChanId = chan_id;
obj.mReqId = req_id; obj.mReqId = req_id;
obj.mService = service; obj.mService = service;
@ -118,7 +129,7 @@ bool RpcServer::checkPending()
std::list<RpcQueuedObj>::iterator it; std::list<RpcQueuedObj>::iterator it;
for(it = mRpcQueue.begin(); it != mRpcQueue.end();) for(it = mRpcQueue.begin(); it != mRpcQueue.end();)
{ {
uint32_t out_chan_id = 0; uint32_t out_chan_id = it->mChanId;
uint32_t out_msg_id = 0; uint32_t out_msg_id = 0;
uint32_t out_req_id = it->mReqId; uint32_t out_req_id = it->mReqId;
std::string out_msg; std::string out_msg;
@ -216,10 +227,10 @@ void RpcQueueService::reset(uint32_t chan_id)
RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/ RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/
std::list<uint32_t> toRemove; std::list<RpcUniqueId> toRemove;
// iterate through and remove only chan_id items. // iterate through and remove only chan_id items.
std::map<uint32_t, RpcQueuedMsg>::iterator mit; std::map<RpcUniqueId, RpcQueuedMsg>::iterator mit;
for(mit = mResponses.begin(); mit != mResponses.end(); mit++) for(mit = mResponses.begin(); mit != mResponses.end(); mit++)
{ {
if (mit->second.mChanId == chan_id) if (mit->second.mChanId == chan_id)
@ -227,7 +238,7 @@ void RpcQueueService::reset(uint32_t chan_id)
} }
/* remove items */ /* remove items */
std::list<uint32_t>::iterator rit; std::list<RpcUniqueId>::iterator rit;
for(rit = toRemove.begin(); rit != toRemove.end(); rit++) for(rit = toRemove.begin(); rit != toRemove.end(); rit++)
{ {
mit = mResponses.find(*rit); mit = mResponses.find(*rit);
@ -242,15 +253,16 @@ int RpcQueueService::getResponse(uint32_t &chan_id, uint32_t &msg_id, uint32_t &
{ {
RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/ RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/
std::map<uint32_t, RpcQueuedMsg>::iterator it; std::map<RpcUniqueId, RpcQueuedMsg>::iterator it;
it = mResponses.find(req_id); RpcUniqueId uid(chan_id, req_id);
it = mResponses.find(uid);
if (it == mResponses.end()) if (it == mResponses.end())
{ {
return 0; return 0;
} }
chan_id = it->second.mChanId; // chan_id & req_id are already set.
msg_id = it->second.mMsgId; msg_id = it->second.mMsgId;
msg = it->second.mMsg; msg = it->second.mMsg;
@ -269,7 +281,8 @@ int RpcQueueService::queueResponse(uint32_t chan_id, uint32_t msg_id, uint32_t r
qmsg.mReqId = req_id; qmsg.mReqId = req_id;
qmsg.mMsg = msg; qmsg.mMsg = msg;
mResponses[req_id] = qmsg; RpcUniqueId uid(chan_id, req_id);
mResponses[uid] = qmsg;
return 1; return 1;
} }
@ -297,7 +310,7 @@ int RpcQueueService::getEvents(std::list<RpcQueuedMsg> &events)
return 1; return 1;
} }
int RpcQueueService::locked_checkForEvents(uint32_t event, const std::list<RpcEventRegister> &registered, std::list<RpcQueuedMsg> &events) int RpcQueueService::locked_checkForEvents(uint32_t event, const std::list<RpcEventRegister> &registered, std::list<RpcQueuedMsg> & /* events */)
{ {
std::cerr << "RpcQueueService::locked_checkForEvents() NOT IMPLEMENTED"; std::cerr << "RpcQueueService::locked_checkForEvents() NOT IMPLEMENTED";
std::cerr << std::endl; std::cerr << std::endl;

View File

@ -45,6 +45,19 @@ uint32_t constructMsgId(uint8_t ext, uint16_t service, uint8_t submsg, bool is_r
* Also allows a natural seperation of the full interface into sections. * Also allows a natural seperation of the full interface into sections.
*/ */
// The Combination of ChanId & ReqId must be unique for each RPC call.
// This is used as an map index, so failure to make it unique, will lead to lost entries.
class RpcUniqueId
{
public:
RpcUniqueId():mChanId(0), mReqId(0) {return;}
RpcUniqueId(uint32_t chan_id, uint32_t req_id):mChanId(chan_id), mReqId(req_id) {return;}
uint32_t mChanId;
uint32_t mReqId;
};
bool operator<(const RpcUniqueId &a, const RpcUniqueId &b);
class RpcQueuedMsg class RpcQueuedMsg
{ {
public: public:
@ -67,6 +80,7 @@ public:
virtual int getEvents(std::list<RpcQueuedMsg> & /* events */) { return 0; } /* 0 = none, optional feature */ virtual int getEvents(std::list<RpcQueuedMsg> & /* events */) { return 0; } /* 0 = none, optional feature */
}; };
class RpcEventRegister class RpcEventRegister
{ {
public: public:
@ -98,7 +112,7 @@ virtual int locked_checkForEvents(uint32_t event, const std::list<RpcEventRegist
private: private:
RsMutex mQueueMtx; RsMutex mQueueMtx;
std::map<uint32_t, RpcQueuedMsg> mResponses; std::map<RpcUniqueId, RpcQueuedMsg> mResponses;
std::map<uint32_t, std::list<RpcEventRegister> > mEventRegister; std::map<uint32_t, std::list<RpcEventRegister> > mEventRegister;
}; };
@ -107,6 +121,7 @@ private:
class RpcQueuedObj class RpcQueuedObj
{ {
public: public:
uint32_t mChanId;
uint32_t mReqId; uint32_t mReqId;
RpcService *mService; RpcService *mService;
}; };

View File

@ -26,6 +26,7 @@
#include "rpc/proto/rpcprotopeers.h" #include "rpc/proto/rpcprotopeers.h"
#include "rpc/proto/rpcprotosystem.h" #include "rpc/proto/rpcprotosystem.h"
#include "rpc/proto/rpcprotochat.h"
#include "rpc/rpcecho.h" #include "rpc/rpcecho.h"
@ -41,6 +42,9 @@ RpcMediator *CreateRpcSystem(RpcComms *comms)
RpcProtoSystem *system = new RpcProtoSystem(1); RpcProtoSystem *system = new RpcProtoSystem(1);
server->addService(system); server->addService(system);
RpcProtoChat *chat = new RpcProtoChat(1);
server->addService(chat);
/* Finally an Echo Service - which will echo back any unprocesses commands. */ /* Finally an Echo Service - which will echo back any unprocesses commands. */
RpcEcho *echo = new RpcEcho(1); RpcEcho *echo = new RpcEcho(1);
server->addService(echo); server->addService(echo);

View File

@ -500,6 +500,8 @@ int RsSshd::doRpcSystem()
} }
} }
mRpcSystem->reset(dummy_chan_id); // cleanup old channel items.
std::cerr << "RsSshd::doRpcSystem() Finished"; std::cerr << "RsSshd::doRpcSystem() Finished";
std::cerr << std::endl; std::cerr << std::endl;