from retroshare/code/branches/v0.5-rpc-b1

--- Merging r6107 through r6332 into the rest of retroshare.

Updated version of RPC protocol with fancy new features like:
 - streaming.
 - add / remove friends.
 - file listings.



git-svn-id: http://svn.code.sf.net/p/retroshare/code/trunk@6335 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2013-04-23 21:10:34 +00:00
commit f02383f5d6
25 changed files with 2234 additions and 209 deletions

View File

@ -144,11 +144,13 @@ introserver {
sshserver {
LIBSSH_DIR = ../../../../libssh-0.5.4
# This Requires libssh-0.5.* to compile.
# Modify path below to point at it.
# Probably will only work on Linux for the moment.
# Please use this path below.
# (You can modify it locally if required - but dont commit it!)
LIBSSH_DIR = ../../../lib/libssh-0.5.4
#
# Use the following commend to generate a Server RSA Key.
# Key should be in current directory - when run/
@ -164,7 +166,6 @@ sshserver {
win32 {
DEFINES *= LIBSSH_STATIC
LIBSSH_DIR = ../../../libssh-0.5.4
}
INCLUDEPATH += $$LIBSSH_DIR/include/
@ -243,12 +244,16 @@ protorpc {
rpc/proto/rpcprotochat.h \
rpc/proto/rpcprotosearch.h \
rpc/proto/rpcprotofiles.h \
rpc/proto/rpcprotostream.h \
rpc/proto/rpcprotoutils.h \
SOURCES += rpc/proto/rpcprotopeers.cc \
rpc/proto/rpcprotosystem.cc \
rpc/proto/rpcprotochat.cc \
rpc/proto/rpcprotosearch.cc \
rpc/proto/rpcprotofiles.cc \
rpc/proto/rpcprotostream.cc \
rpc/proto/rpcprotoutils.cc \
# Offical Generated Code (protobuf 2.4.1)
HEADERS += rpc/proto/gencc/core.pb.h \
@ -257,6 +262,7 @@ protorpc {
rpc/proto/gencc/chat.pb.h \
rpc/proto/gencc/search.pb.h \
rpc/proto/gencc/files.pb.h \
rpc/proto/gencc/stream.pb.h \
SOURCES += rpc/proto/gencc/core.pb.cc \
rpc/proto/gencc/peers.pb.cc \
@ -264,6 +270,7 @@ protorpc {
rpc/proto/gencc/chat.pb.cc \
rpc/proto/gencc/search.pb.cc \
rpc/proto/gencc/files.pb.cc \
rpc/proto/gencc/stream.pb.cc \
# Generated ProtoBuf Code the RPC System
# If you are developing, or have a different version of protobuf
@ -274,6 +281,7 @@ protorpc {
# ../../rsctrl/src/gencc/chat.pb.h \
# ../../rsctrl/src/gencc/search.pb.h \
# ../../rsctrl/src/gencc/files.pb.h \
# ../../rsctrl/src/gencc/stream.pb.h \
#SOURCES += ../../rsctrl/src/gencc/core.pb.cc \
# ../../rsctrl/src/gencc/peers.pb.cc \
@ -281,6 +289,7 @@ protorpc {
# ../../rsctrl/src/gencc/chat.pb.cc \
# ../../rsctrl/src/gencc/search.pb.cc \
# ../../rsctrl/src/gencc/files.pb.cc \
# ../../rsctrl/src/gencc/stream.pb.cc \
INCLUDEPATH *= rpc/proto/gencc

View File

@ -26,6 +26,7 @@
#include <retroshare/rsmsgs.h>
#include <retroshare/rspeers.h>
#include <retroshare/rshistory.h>
#include "util/rsstring.h"
@ -48,6 +49,8 @@ bool fillLobbyInfoFromChatLobbyInfo(const ChatLobbyInfo &cfi, rsctrl::chat::Chat
bool fillLobbyInfoFromVisibleChatLobbyRecord(const VisibleChatLobbyRecord &pclr, rsctrl::chat::ChatLobbyInfo *lobby);
bool fillLobbyInfoFromChatLobbyInvite(const ChatLobbyInvite &cli, rsctrl::chat::ChatLobbyInfo *lobby);
bool fillChatMessageFromHistoryMsg(const HistoryMsg &histmsg, rsctrl::chat::ChatMessage *rpcmsg);
bool createQueuedEventSendMsg(const ChatInfo &chatinfo, rsctrl::chat::ChatType ctype,
std::string chat_id, const RpcEventRegister &ereg, RpcQueuedMsg &qmsg);
@ -127,6 +130,10 @@ int RpcProtoChat::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id,
processReqSendMessage(chan_id, msg_id, req_id, msg);
break;
case rsctrl::chat::MsgId_RequestChatHistory:
processReqChatHistory(chan_id, msg_id, req_id, msg);
break;
default:
std::cerr << "RpcProtoChat::processMsg() ERROR should never get here";
@ -349,7 +356,7 @@ int RpcProtoChat::processReqCreateLobby(uint32_t chan_id, uint32_t /*msg_id*/, u
std::string lobby_name = req.lobby_name();
std::string lobby_topic = req.lobby_topic();
std::list<std::string> invited_friends;
uint32_t lobby_privacy_type;
uint32_t lobby_privacy_type = 0;
switch(req.privacy_level())
{
@ -806,6 +813,172 @@ int RpcProtoChat::processReqRegisterEvents(uint32_t chan_id, uint32_t /*msg_id*/
int RpcProtoChat::processReqChatHistory(uint32_t chan_id, uint32_t /*msg_id*/, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoChat::processReqChatHistory()";
std::cerr << std::endl;
// parse msg.
rsctrl::chat::RequestChatHistory req;
if (!req.ParseFromString(msg))
{
std::cerr << "RpcProtoChat::processReqChatHistory() ERROR ParseFromString()";
std::cerr << std::endl;
return 0;
}
// response.
rsctrl::chat::ResponseChatHistory resp;
bool success = true;
std::string errorMsg;
// Get the Chat History for specified IDs....
/* switch depending on type */
bool private_chat = false;
bool lobby_chat = false;
std::string chat_id;
// copy the ID over.
rsctrl::chat::ChatId *id = resp.mutable_id();
*id = req.id();
switch(req.id().chat_type())
{
case rsctrl::chat::TYPE_PRIVATE:
{
// easy one.
chat_id = req.id().chat_id();
private_chat = true;
std::cerr << "RpcProtoChat::processReqChatHistory() Getting Private Chat History for: ";
std::cerr << chat_id;
std::cerr << std::endl;
break;
}
case rsctrl::chat::TYPE_LOBBY:
{
std::cerr << "RpcProtoChat::processReqChatHistory() Lobby Chat History NOT IMPLEMENTED YET";
std::cerr << std::endl;
success = false;
lobby_chat = true;
errorMsg = "Lobby Chat History Not Implemented";
#if 0
/* convert string->ChatLobbyId */
ChatLobbyId lobby_id;
if (!convertStringToLobbyId(req.msg().id().chat_id(), lobby_id))
{
std::cerr << "ERROR Failed conversion of Lobby Id";
std::cerr << std::endl;
success = false;
errorMsg = "Failed Conversion of Lobby Id";
}
/* convert lobby id to virtual peer id */
else if (!rsMsgs->getVirtualPeerId(lobby_id, chat_id))
{
std::cerr << "ERROR Invalid Lobby Id";
std::cerr << std::endl;
success = false;
errorMsg = "Invalid Lobby Id";
}
lobby_chat = true;
std::cerr << "RpcProtoChat::processReqChatHistory() Getting Lobby Chat History for: ";
std::cerr << chat_id;
std::cerr << std::endl;
#endif
break;
}
case rsctrl::chat::TYPE_GROUP:
std::cerr << "RpcProtoChat::processReqChatHistory() Group Chat History NOT IMPLEMENTED YET";
std::cerr << std::endl;
success = false;
errorMsg = "Group Chat History Not Implemented";
break;
default:
std::cerr << "ERROR Chat Type invalid";
std::cerr << std::endl;
success = false;
errorMsg = "Invalid Chat Type";
break;
}
// Should be able to reply using the existing message types.
if (success)
{
if (private_chat)
{
/* extract the history */
std::list<HistoryMsg> msgs;
std::list<HistoryMsg>::iterator it;
rsHistory->getMessages(chat_id, msgs, 0);
//rsctrl::chat::ChatId *id = resp.mutable_id();
//id->set_chat_type(rsctrl::chat::TYPE_PRIVATE);
//id->set_chat_id(chat_id);
for(it = msgs.begin(); it != msgs.end(); it++)
{
rsctrl::chat::ChatMessage *msg = resp.add_msgs();
fillChatMessageFromHistoryMsg(*it, msg);
std::cerr << "\t Message: " << it->message;
std::cerr << std::endl;
}
}
#if 0
else if (lobby_chat)
{
}
#endif
}
/* DONE - Generate Reply */
if (success)
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::SUCCESS);
}
else
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::FAILED);
status->set_msg(errorMsg);
}
std::string outmsg;
if (!resp.SerializeToString(&outmsg))
{
std::cerr << "RpcProtoChat::processReqChatHistory() ERROR SerialiseToString()";
std::cerr << std::endl;
return 0;
}
// Correctly Name Message.
uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::CHAT,
rsctrl::chat::MsgId_ResponseChatHistory, true);
// queue it.
queueResponse(chan_id, out_msg_id, req_id, outmsg);
return 1;
}
int RpcProtoChat::processReqSendMessage(uint32_t chan_id, uint32_t /*msg_id*/, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoChat::processReqSendMessage()";
@ -939,8 +1112,8 @@ int RpcProtoChat::processReqSendMessage(uint32_t chan_id, uint32_t /*msg_id*/, u
int RpcProtoChat::locked_checkForEvents(uint32_t event, const std::list<RpcEventRegister> &registered, std::list<RpcQueuedMsg> &events)
{
/* Wow - here already! */
std::cerr << "locked_checkForEvents()";
std::cerr << std::endl;
//std::cerr << "locked_checkForEvents()";
//std::cerr << std::endl;
/* only one event type for now */
if (event != REGISTRATION_EVENT_CHAT)
@ -1220,6 +1393,25 @@ bool fillLobbyInfoFromChatLobbyInvite(const ChatLobbyInvite &cli, rsctrl::chat::
}
bool fillChatMessageFromHistoryMsg(const HistoryMsg &histmsg, rsctrl::chat::ChatMessage *rpcmsg)
{
rsctrl::chat::ChatId *id = rpcmsg->mutable_id();
id->set_chat_type(rsctrl::chat::TYPE_PRIVATE);
id->set_chat_id(histmsg.chatPeerId);
rpcmsg->set_msg(histmsg.message);
rpcmsg->set_peer_nickname(histmsg.peerName);
rpcmsg->set_chat_flags(0);
rpcmsg->set_send_time(histmsg.sendTime);
rpcmsg->set_recv_time(histmsg.recvTime);
return true;
}
bool createQueuedEventSendMsg(const ChatInfo &chatinfo, rsctrl::chat::ChatType ctype,
std::string chat_id, const RpcEventRegister &ereg, RpcQueuedMsg &qmsg)
{
@ -1266,6 +1458,9 @@ bool createQueuedEventSendMsg(const ChatInfo &chatinfo, rsctrl::chat::ChatType c
return true;
}
bool convertUTF8toWString(const std::string &msg_utf8, std::wstring &msg_wstr)
{
return librs::util::ConvertUtf8ToUtf16(msg_utf8, msg_wstr);

View File

@ -45,6 +45,9 @@ protected:
int processReqRegisterEvents(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
int processReqSendMessage(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
int processReqChatHistory(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
// EVENTS.
virtual int locked_checkForEvents(uint32_t event, const std::list<RpcEventRegister> &registered, std::list<RpcQueuedMsg> &events);
};

View File

@ -25,8 +25,10 @@
#include "rpc/proto/gencc/files.pb.h"
#include <retroshare/rsfiles.h>
#include <retroshare/rspeers.h>
#include "util/rsstring.h"
#include "util/rsdir.h"
#include <stdio.h>
@ -36,6 +38,9 @@
#include <set>
bool fill_file_from_details(rsctrl::core::File *file, DirDetails &details);
bool fill_file_as_dir(rsctrl::core::File *file, const std::string &dir_name);
RpcProtoFiles::RpcProtoFiles(uint32_t serviceId)
:RpcQueueService(serviceId)
{
@ -93,6 +98,10 @@ int RpcProtoFiles::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id
processReqControlDownload(chan_id, msg_id, req_id, msg);
break;
case rsctrl::files::MsgId_RequestShareDirList:
processReqShareDirList(chan_id, msg_id, req_id, msg);
break;
default:
std::cerr << "RpcProtoFiles::processMsg() ERROR should never get here";
std::cerr << std::endl;
@ -105,7 +114,7 @@ int RpcProtoFiles::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id
int RpcProtoFiles::processReqTransferList(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
int RpcProtoFiles::processReqTransferList(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoFiles::processReqTransferList()";
std::cerr << std::endl;
@ -171,6 +180,35 @@ int RpcProtoFiles::processReqTransferList(uint32_t chan_id, uint32_t msg_id, uin
transfer->set_fraction( (float) info.transfered / info.size );
transfer->set_rate_kbs( info.tfRate );
switch(info.downloadStatus)
{
case FT_STATE_FAILED:
transfer->set_state(rsctrl::files::TRANSFER_FAILED);
break;
default:
case FT_STATE_OKAY:
transfer->set_state(rsctrl::files::TRANSFER_OKAY);
break;
case FT_STATE_PAUSED:
transfer->set_state(rsctrl::files::TRANSFER_PAUSED);
break;
case FT_STATE_QUEUED:
transfer->set_state(rsctrl::files::TRANSFER_QUEUED);
break;
case FT_STATE_WAITING:
transfer->set_state(rsctrl::files::TRANSFER_WAITING);
break;
case FT_STATE_DOWNLOADING:
transfer->set_state(rsctrl::files::TRANSFER_DOWNLOADING);
break;
case FT_STATE_CHECKING_HASH:
transfer->set_state(rsctrl::files::TRANSFER_CHECKING_HASH);
break;
case FT_STATE_COMPLETE:
transfer->set_state(rsctrl::files::TRANSFER_COMPLETE);
break;
}
}
/* DONE - Generate Reply */
@ -204,7 +242,7 @@ int RpcProtoFiles::processReqTransferList(uint32_t chan_id, uint32_t msg_id, uin
return 1;
}
int RpcProtoFiles::processReqControlDownload(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
int RpcProtoFiles::processReqControlDownload(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoFiles::processReqControlDownload()";
std::cerr << std::endl;
@ -340,5 +378,253 @@ int RpcProtoFiles::processReqControlDownload(uint32_t chan_id, uint32_t msg_id,
}
int RpcProtoFiles::processReqShareDirList(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoFiles::processReqShareDirList()";
std::cerr << std::endl;
// parse msg.
rsctrl::files::RequestShareDirList req;
if (!req.ParseFromString(msg))
{
std::cerr << "RpcProtoFiles::processReqShareDirList() ERROR ParseFromString()";
std::cerr << std::endl;
return 0;
}
// response.
rsctrl::files::ResponseShareDirList resp;
bool success = true;
std::string errorMsg;
std::string uid = req.ssl_id();
std::string path = req.path();
DirDetails details;
if (uid.empty())
{
uid = rsPeers->getOwnId();
}
std::cerr << "RpcProtoFiles::processReqShareDirList() For uid: " << uid << " & path: " << path;
std::cerr << std::endl;
if (path.empty())
{
/* we have to do a nasty hack to get anything useful.
* we do a ref=NULL to get the pointers to People,
* then use the correct one to get root directories
*/
std::cerr << "RpcProtoFiles::processReqShareDirList() Hack to get root Dirs!";
std::cerr << std::endl;
FileSearchFlags flags;
if (uid == rsPeers->getOwnId())
{
flags |= RS_FILE_HINTS_LOCAL;
}
DirDetails root_details;
if (!rsFiles->RequestDirDetails(NULL, root_details, flags))
{
std::cerr << "RpcProtoFiles::processReqShareDirList() ref=NULL Hack failed";
std::cerr << std::endl;
success = false;
errorMsg = "Root Directory Request Failed.";
}
else
{
void *person_ref = NULL;
std::list<DirStub>::iterator sit;
for(sit = root_details.children.begin(); sit != root_details.children.end(); sit++)
{
//std::cerr << "RpcProtoFiles::processReqShareDirList() Root.child->name : " << sit->name;
if (sit->name == uid)
{
person_ref = sit->ref;
break;
}
}
if (!person_ref)
{
std::cerr << "RpcProtoFiles::processReqShareDirList() Person match failed";
std::cerr << std::endl;
success = false;
errorMsg = "Missing Person Root Directory.";
}
else
{
// Doing the REAL request!
if (!rsFiles->RequestDirDetails(person_ref, details, flags))
{
std::cerr << "RpcProtoFiles::processReqShareDirList() Personal Shared Dir Hack failed";
std::cerr << std::endl;
success = false;
errorMsg = "Missing Person Shared Directories";
}
}
}
}
else
{
// Path must begin with / for proper matching.
if (path[0] != '/')
{
path = '/' + path;
}
if (!rsFiles->RequestDirDetails(uid, path, details))
{
std::cerr << "RpcProtoFiles::processReqShareDirList() ERROR Unknown Dir";
std::cerr << std::endl;
success = false;
errorMsg = "Directory Request Failed.";
}
}
if (success)
{
// setup basics of response.
resp.set_ssl_id(uid);
resp.set_path(path);
switch(details.type)
{
case DIR_TYPE_ROOT:
{
std::cerr << "RpcProtoFiles::processReqShareDirList() Details.type == ROOT ??";
std::cerr << std::endl;
resp.set_list_type(rsctrl::files::ResponseShareDirList::DIRQUERY_ROOT);
rsctrl::core::File *file = resp.add_files();
fill_file_as_dir(file, details.name);
}
break;
case DIR_TYPE_FILE:
{
std::cerr << "RpcProtoFiles::processReqShareDirList() Details.type == FILE";
std::cerr << std::endl;
resp.set_list_type(rsctrl::files::ResponseShareDirList::DIRQUERY_FILE);
rsctrl::core::File *file = resp.add_files();
fill_file_from_details(file, details);
}
break;
default:
std::cerr << "RpcProtoFiles::processReqShareDirList() Details.type == UNKNOWN => default to DIR";
std::cerr << std::endl;
case DIR_TYPE_PERSON:
case DIR_TYPE_DIR:
{
std::cerr << "RpcProtoFiles::processReqShareDirList() Details.type == DIR or PERSON";
std::cerr << std::endl;
resp.set_list_type(rsctrl::files::ResponseShareDirList::DIRQUERY_DIR);
//std::string dir_path = RsDirUtil::makePath(details.path, details.name);
std::string dir_path = details.path;
std::cerr << "RpcProtoFiles::processReqShareDirList() details.path: " << details.path;
std::cerr << " details.name: " << details.name;
std::cerr << std::endl;
std::list<DirStub>::iterator sit;
for(sit = details.children.begin(); sit != details.children.end(); sit++)
{
std::cerr << "RpcProtoFiles::processReqShareDirList() checking child: " << sit->name;
std::cerr << std::endl;
if (sit->type == DIR_TYPE_FILE)
{
std::cerr << "RpcProtoFiles::processReqShareDirList() is FILE, fetching details.";
std::cerr << std::endl;
DirDetails child_details;
std::string child_path = RsDirUtil::makePath(dir_path, sit->name);
if (rsFiles->RequestDirDetails(uid, child_path, child_details))
{
rsctrl::core::File *file = resp.add_files();
fill_file_from_details(file, child_details);
}
else
{
std::cerr << "RpcProtoFiles::processReqShareDirList() RequestDirDetails(" << child_path << ") Failed!!!";
std::cerr << std::endl;
}
}
else
{
std::cerr << "RpcProtoFiles::processReqShareDirList() is DIR";
std::cerr << std::endl;
rsctrl::core::File *file = resp.add_files();
fill_file_as_dir(file, sit->name);
}
}
}
break;
}
}
/* DONE - Generate Reply */
if (success)
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::SUCCESS);
}
else
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::FAILED);
status->set_msg(errorMsg);
}
std::string outmsg;
if (!resp.SerializeToString(&outmsg))
{
std::cerr << "RpcProtoFiles::processReqTransferList() ERROR SerialiseToString()";
std::cerr << std::endl;
return 0;
}
// Correctly Name Message.
uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::FILES,
rsctrl::files::MsgId_ResponseShareDirList, true);
// queue it.
queueResponse(chan_id, out_msg_id, req_id, outmsg);
return 1;
}
/***** HELPER FUNCTIONS *****/
bool fill_file_from_details(rsctrl::core::File *file, DirDetails &details)
{
file->set_hash(details.hash);
file->set_name(details.name);
file->set_size(details.count);
return true;
}
bool fill_file_as_dir(rsctrl::core::File *file, const std::string &dir_name)
{
file->set_hash("");
file->set_name(dir_name);
file->set_size(0);
return true;
}

View File

@ -38,6 +38,7 @@ protected:
int processReqTransferList(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
int processReqControlDownload(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
int processReqShareDirList(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg);
};

View File

@ -30,6 +30,9 @@
#include <iostream>
#include <algorithm>
bool load_person_details(std::string pgp_id, rsctrl::core::Person *person,
bool getLocations, bool onlyConnected);
RpcProtoPeers::RpcProtoPeers(uint32_t serviceId)
:RpcQueueService(serviceId)
{
@ -88,9 +91,12 @@ int RpcProtoPeers::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id
case rsctrl::peers::MsgId_RequestAddPeer:
processAddPeer(chan_id, msg_id, req_id, msg);
break;
case rsctrl::peers::MsgId_RequestModifyPeer:
processModifyPeer(chan_id, msg_id, req_id, msg);
case rsctrl::peers::MsgId_RequestExaminePeer:
processExaminePeer(chan_id, msg_id, req_id, msg);
break;
//case rsctrl::peers::MsgId_RequestModifyPeer:
// processModifyPeer(chan_id, msg_id, req_id, msg);
// break;
default:
std::cerr << "RpcProtoPeers::processMsg() ERROR should never get here";
std::cerr << std::endl;
@ -102,15 +108,135 @@ int RpcProtoPeers::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id
}
int RpcProtoPeers::processAddPeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
int RpcProtoPeers::processAddPeer(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoPeers::processAddPeer() NOT FINISHED";
std::cerr << "RpcProtoPeers::processAddPeer()";
std::cerr << std::endl;
// parse msg.
rsctrl::peers::RequestAddPeer req;
if (!req.ParseFromString(msg))
{
std::cerr << "RpcProtoPeers::processAddPeer() ERROR ParseFromString()";
std::cerr << std::endl;
return 0;
}
// response.
rsctrl::peers::ResponseAddPeer resp;
rsctrl::peers::ResponsePeerList resp;
bool success = true;
std::string errorMsg;
/* check if the gpg_id is valid */
std::string pgp_id = req.pgp_id();
std::string ssl_id;
if (req.has_ssl_id())
{
ssl_id = req.ssl_id();
}
RsPeerDetails details;
if (!rsPeers->getGPGDetails(pgp_id, details))
{
success = false;
errorMsg = "Invalid PGP ID";
}
else
{
switch(req.cmd())
{
default:
success = false;
errorMsg = "Invalid AddCmd";
break;
case rsctrl::peers::RequestAddPeer::ADD:
// TODO. NEED TO HANDLE SERVICE PERMISSION FLAGS.
success = rsPeers->addFriend(ssl_id,pgp_id, RS_SERVICE_PERM_ALL);
break;
case rsctrl::peers::RequestAddPeer::REMOVE:
success = rsPeers->removeFriend(pgp_id);
break;
}
if (success)
{
rsctrl::core::Person *person = resp.add_peers();
load_person_details(pgp_id, person, true, false);
}
}
if (success)
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::SUCCESS);
}
else
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::NO_IMPL_YET);
}
std::string outmsg;
if (!resp.SerializeToString(&outmsg))
{
std::cerr << "RpcProtoPeers::processAddPeer() ERROR SerialiseToString()";
std::cerr << std::endl;
return 0;
}
// Correctly Name Message.
uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::PEERS,
rsctrl::peers::MsgId_ResponsePeerList, true);
// queue it.
queueResponse(chan_id, out_msg_id, req_id, outmsg);
return 1;
}
int RpcProtoPeers::processExaminePeer(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoPeers::processExaminePeer() NOT FINISHED";
std::cerr << std::endl;
// parse msg.
rsctrl::peers::RequestExaminePeer req;
if (!req.ParseFromString(msg))
{
std::cerr << "RpcProtoPeers::processExaminePeer() ERROR ParseFromString()";
std::cerr << std::endl;
return 0;
}
// response.
rsctrl::peers::ResponsePeerList resp;
bool success = false;
if (success)
{
switch(req.cmd())
{
default:
success = false;
break;
case rsctrl::peers::RequestExaminePeer::IMPORT:
break;
case rsctrl::peers::RequestExaminePeer::EXAMINE:
// Gets the GPG details, but does not add the key to the keyring.
//virtual bool loadDetailsFromStringCert(const std::string& certGPG, RsPeerDetails &pd,uint32_t& error_code) = 0;
break;
}
}
if (success)
{
rsctrl::core::Status *status = resp.mutable_status();
@ -133,7 +259,7 @@ int RpcProtoPeers::processAddPeer(uint32_t chan_id, uint32_t msg_id, uint32_t re
// Correctly Name Message.
uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::PEERS,
rsctrl::peers::MsgId_ResponseAddPeer, true);
rsctrl::peers::MsgId_ResponsePeerList, true);
// queue it.
queueResponse(chan_id, out_msg_id, req_id, outmsg);
@ -142,14 +268,24 @@ int RpcProtoPeers::processAddPeer(uint32_t chan_id, uint32_t msg_id, uint32_t re
}
int RpcProtoPeers::processModifyPeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
int RpcProtoPeers::processModifyPeer(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoPeers::processModifyPeer() NOT FINISHED";
std::cerr << std::endl;
// parse msg.
rsctrl::peers::RequestModifyPeer req;
if (!req.ParseFromString(msg))
{
std::cerr << "RpcProtoPeers::processModifyPeer() ERROR ParseFromString()";
std::cerr << std::endl;
return 0;
}
// response.
rsctrl::peers::ResponseModifyPeer resp;
rsctrl::peers::ResponsePeerList resp;
bool success = false;
if (success)
@ -174,7 +310,7 @@ int RpcProtoPeers::processModifyPeer(uint32_t chan_id, uint32_t msg_id, uint32_t
// Correctly Name Message.
uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::PEERS,
rsctrl::peers::MsgId_ResponseModifyPeer, true);
rsctrl::peers::MsgId_ResponsePeerList, true);
// queue it.
queueResponse(chan_id, out_msg_id, req_id, outmsg);
@ -184,7 +320,7 @@ int RpcProtoPeers::processModifyPeer(uint32_t chan_id, uint32_t msg_id, uint32_t
int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoPeers::processRequestPeers()";
std::cerr << std::endl;
@ -198,10 +334,14 @@ int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32
return 0;
}
// response.
rsctrl::peers::ResponsePeerList respp;
bool success = true;
std::string errorMsg;
// Get the list of gpg_id to generate data for.
std::list<std::string> ids;
bool onlyConnected = false;
bool success = true;
switch(reqp.set())
{
case rsctrl::peers::RequestPeers::OWNID:
@ -216,9 +356,14 @@ int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32
{
std::cerr << "RpcProtoPeers::processRequestPeers() LISTED";
std::cerr << std::endl;
/* extract ids from request (TODO) */
std::string own_id = rsPeers->getGPGOwnId();
ids.push_back(own_id);
int no_pgp_ids = reqp.pgp_ids_size();
for (int i = 0; i < no_pgp_ids; i++)
{
std::string listed_id = reqp.pgp_ids(i);
std::cerr << "RpcProtoPeers::processRequestPeers() Adding Id: " << listed_id;
std::cerr << std::endl;
ids.push_back(listed_id);
}
break;
}
@ -281,117 +426,21 @@ int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32
break;
}
// response.
rsctrl::peers::ResponsePeerList respp;
/* now iterate through the peers and fill in the response. */
std::list<std::string>::const_iterator git;
for(git = ids.begin(); git != ids.end(); git++)
{
RsPeerDetails details;
if (!rsPeers->getGPGDetails(*git, details))
{
continue; /* uhm.. */
}
rsctrl::core::Person *person = respp.add_peers();
/* fill in key gpg details */
person->set_gpg_id(*git);
person->set_name(details.name);
std::cerr << "RpcProtoPeers::processRequestPeers() Adding GPGID: ";
std::cerr << *git << " name: " << details.name;
std::cerr << std::endl;
if (details.state & RS_PEER_STATE_FRIEND)
if (!load_person_details(*git, person, getLocations, onlyConnected))
{
person->set_relation(rsctrl::core::Person::FRIEND);
}
else
{
std::list<std::string> common_friends;
rsDisc->getDiscGPGFriends(*git, common_friends);
int size = common_friends.size();
if (size)
{
if (size > 2)
{
person->set_relation(rsctrl::core::Person::FRIEND_OF_MANY_FRIENDS);
}
else
{
person->set_relation(rsctrl::core::Person::FRIEND_OF_FRIENDS);
}
}
else
{
person->set_relation(rsctrl::core::Person::UNKNOWN);
}
}
if (getLocations)
{
std::list<std::string> ssl_ids;
std::list<std::string>::const_iterator sit;
std::cerr << "RpcProtoPeers::processRequestPeers() ERROR Finding GPGID: ";
std::cerr << *git;
std::cerr << std::endl;
if (!rsPeers->getAssociatedSSLIds(*git, ssl_ids))
{
continue; /* end of this peer */
}
for(sit = ssl_ids.begin(); sit != ssl_ids.end(); sit++)
{
RsPeerDetails ssldetails;
if (!rsPeers->getPeerDetails(*sit, ssldetails))
{
continue; /* uhm.. */
}
if ((onlyConnected) &&
(!(ssldetails.state & RS_PEER_STATE_CONNECTED)))
{
continue;
}
rsctrl::core::Location *loc = person->add_locations();
std::cerr << "RpcProtoPeers::processRequestPeers() \t Adding Location: ";
std::cerr << *sit << " loc: " << ssldetails.location;
std::cerr << std::endl;
/* fill in ssl details */
loc->set_ssl_id(*sit);
loc->set_location(ssldetails.location);
/* set addresses */
rsctrl::core::IpAddr *laddr = loc->mutable_localaddr();
laddr->set_addr(ssldetails.localAddr);
laddr->set_port(ssldetails.localPort);
rsctrl::core::IpAddr *eaddr = loc->mutable_extaddr();
eaddr->set_addr(ssldetails.extAddr);
eaddr->set_port(ssldetails.extPort);
/* translate status */
uint32_t loc_state = 0;
//dont think this state should be here.
//if (ssldetails.state & RS_PEER_STATE_FRIEND)
if (ssldetails.state & RS_PEER_STATE_ONLINE)
{
loc_state |= (uint32_t) rsctrl::core::Location::ONLINE;
}
if (ssldetails.state & RS_PEER_STATE_CONNECTED)
{
loc_state |= (uint32_t) rsctrl::core::Location::CONNECTED;
}
if (ssldetails.state & RS_PEER_STATE_UNREACHABLE)
{
loc_state |= (uint32_t) rsctrl::core::Location::UNREACHABLE;
}
loc->set_state(loc_state);
}
/* cleanup peers */
success = false;
errorMsg = "Error Loading PeerID";
}
}
@ -404,7 +453,7 @@ int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32
{
rsctrl::core::Status *status = respp.mutable_status();
status->set_code(rsctrl::core::Status::FAILED);
status->set_msg("Unknown ERROR");
status->set_msg(errorMsg);
}
@ -427,3 +476,135 @@ int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32
}
bool load_person_details(std::string pgp_id, rsctrl::core::Person *person,
bool getLocations, bool onlyConnected)
{
RsPeerDetails details;
if (!rsPeers->getGPGDetails(pgp_id, details))
{
std::cerr << "RpcProtoPeers::processRequestPeers() ERROR Finding GPGID: ";
std::cerr << pgp_id;
std::cerr << std::endl;
return false;
}
/* fill in key gpg details */
person->set_gpg_id(pgp_id);
person->set_name(details.name);
std::cerr << "RpcProtoPeers::processRequestPeers() Adding GPGID: ";
std::cerr << pgp_id << " name: " << details.name;
std::cerr << std::endl;
//if (details.state & RS_PEER_STATE_FRIEND)
if (pgp_id == rsPeers->getGPGOwnId())
{
std::cerr << "RpcProtoPeers::processRequestPeers() Relation YOURSELF";
std::cerr << std::endl;
person->set_relation(rsctrl::core::Person::YOURSELF);
}
else if (rsPeers->isGPGAccepted(pgp_id))
{
std::cerr << "RpcProtoPeers::processRequestPeers() Relation FRIEND";
std::cerr << std::endl;
person->set_relation(rsctrl::core::Person::FRIEND);
}
else
{
std::list<std::string> common_friends;
rsDisc->getDiscGPGFriends(pgp_id, common_friends);
int size = common_friends.size();
if (size)
{
if (size > 2)
{
std::cerr << "RpcProtoPeers::processRequestPeers() Relation FRIEND_OF_MANY_FRIENDS";
std::cerr << std::endl;
person->set_relation(rsctrl::core::Person::FRIEND_OF_MANY_FRIENDS);
}
else
{
std::cerr << "RpcProtoPeers::processRequestPeers() Relation FRIEND_OF_FRIENDS";
std::cerr << std::endl;
person->set_relation(rsctrl::core::Person::FRIEND_OF_FRIENDS);
}
}
else
{
std::cerr << "RpcProtoPeers::processRequestPeers() Relation UNKNOWN";
std::cerr << std::endl;
person->set_relation(rsctrl::core::Person::UNKNOWN);
}
}
if (getLocations)
{
std::list<std::string> ssl_ids;
std::list<std::string>::const_iterator sit;
if (!rsPeers->getAssociatedSSLIds(pgp_id, ssl_ids))
{
std::cerr << "RpcProtoPeers::processRequestPeers() No Locations";
std::cerr << std::endl;
return true; /* end of this peer */
}
for(sit = ssl_ids.begin(); sit != ssl_ids.end(); sit++)
{
RsPeerDetails ssldetails;
if (!rsPeers->getPeerDetails(*sit, ssldetails))
{
continue; /* uhm.. */
}
if ((onlyConnected) &&
(!(ssldetails.state & RS_PEER_STATE_CONNECTED)))
{
continue;
}
rsctrl::core::Location *loc = person->add_locations();
std::cerr << "RpcProtoPeers::processRequestPeers() \t Adding Location: ";
std::cerr << *sit << " loc: " << ssldetails.location;
std::cerr << std::endl;
/* fill in ssl details */
loc->set_ssl_id(*sit);
loc->set_location(ssldetails.location);
/* set addresses */
rsctrl::core::IpAddr *laddr = loc->mutable_localaddr();
laddr->set_addr(ssldetails.localAddr);
laddr->set_port(ssldetails.localPort);
rsctrl::core::IpAddr *eaddr = loc->mutable_extaddr();
eaddr->set_addr(ssldetails.extAddr);
eaddr->set_port(ssldetails.extPort);
/* translate status */
uint32_t loc_state = 0;
//dont think this state should be here.
//if (ssldetails.state & RS_PEER_STATE_FRIEND)
if (ssldetails.state & RS_PEER_STATE_ONLINE)
{
loc_state |= (uint32_t) rsctrl::core::Location::ONLINE;
}
if (ssldetails.state & RS_PEER_STATE_CONNECTED)
{
loc_state |= (uint32_t) rsctrl::core::Location::CONNECTED;
}
if (ssldetails.state & RS_PEER_STATE_UNREACHABLE)
{
loc_state |= (uint32_t) rsctrl::core::Location::UNREACHABLE;
}
loc->set_state(loc_state);
}
}
return true; /* end of this peer */
}

View File

@ -36,6 +36,9 @@ public:
virtual int processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
virtual int processAddPeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
// these aren't implemented yet.
virtual int processExaminePeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
virtual int processModifyPeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
};

View File

@ -38,6 +38,9 @@
#include <set>
bool condenseSearchResults(const std::list<TurtleFileInfo> &searchResults, uint32_t limit,
rsctrl::search::SearchSet *result_set);
RpcProtoSearch::RpcProtoSearch(uint32_t serviceId, NotifyTxt *notify)
:RpcQueueService(serviceId), mNotify(notify), searchMtx("RpcProtoSearch")
@ -420,21 +423,7 @@ int RpcProtoSearch::processReqSearchResults(uint32_t chan_id, uint32_t /* msg_id
std::list<TurtleFileInfo> searchResults;
mNotify->getSearchResults(*rit, searchResults);
/* convert into useful list */
for(it = searchResults.begin(); it != searchResults.end(); it++)
{
/* add to answer */
rsctrl::search::SearchHit *hit = set->add_hits();
rsctrl::core::File *file = hit->mutable_file();
file->set_hash(it->hash);
file->set_name(it->name);
file->set_size(it->size);
// Uhm not provided for now. default to NETWORK
hit->set_loc(rsctrl::search::SearchHit::NETWORK);
hit->set_no_hits(1); // No aggregation yet.
}
condenseSearchResults(searchResults, req.result_limit(), set);
}
/* DONE - Generate Reply */
@ -591,3 +580,99 @@ int RpcProtoSearch::clear_searches(uint32_t chan_id)
return 1;
}
class RpcSearchInfo
{
public:
std::string hash;
std::string name;
uint64_t size;
std::map<std::string, uint32_t> name_map;
uint32_t hits;
};
bool condenseSearchResults(const std::list<TurtleFileInfo> &searchResults, uint32_t limit,
rsctrl::search::SearchSet *result_set)
{
std::map<std::string, RpcSearchInfo> searchMap;
std::map<std::string, RpcSearchInfo>::iterator mit;
std::list<TurtleFileInfo>::const_iterator it;
for(it = searchResults.begin(); it != searchResults.end(); it++)
{
mit = searchMap.find(it->hash);
if (mit != searchMap.end())
{
mit->second.hits++;
if (mit->second.name_map.find(it->name) == mit->second.name_map.end())
{
mit->second.name_map[it->name] = 1;
}
else
{
mit->second.name_map[it->name]++;
}
if (it->size != mit->second.size)
{
// ERROR.
}
}
else
{
RpcSearchInfo info;
info.hash = it->hash;
info.size = it->size;
info.name_map[it->name] = 1;
info.hits = 1;
searchMap[it->hash] = info;
}
}
unsigned int i = 0;
for(mit = searchMap.begin(); (mit != searchMap.end()) && (i < limit); mit++, i++)
{
std::map<std::string, uint32_t>::reverse_iterator nit;
nit = mit->second.name_map.rbegin();
/* add to answer */
rsctrl::search::SearchHit *hit = result_set->add_hits();
rsctrl::core::File *file = hit->mutable_file();
file->set_hash(mit->second.hash);
file->set_name(nit->first);
file->set_size(mit->second.size);
// Uhm not provided for now. default to NETWORK
hit->set_loc(rsctrl::search::SearchHit::NETWORK);
hit->set_no_hits(mit->second.hits); // No aggregation yet.
// guarenteed to have one item here.
for(nit++; nit != mit->second.name_map.rend(); nit++)
{
hit->add_alt_names(nit->first);
}
}
return true;
}

View File

@ -0,0 +1,824 @@
/*
* RetroShare External Interface.
*
* Copyright 2012-2012 by Robert Fernie.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2.1 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#include "rpc/proto/rpcprotostream.h"
#include "rpc/proto/rpcprotoutils.h"
#include "rpc/proto/gencc/stream.pb.h"
#include "rpc/proto/gencc/core.pb.h"
#include <retroshare/rsexpr.h>
#include <retroshare/rsfiles.h>
// from libretroshare
#include "util/rsdir.h"
//#include <retroshare/rsmsgs.h>
//#include <retroshare/rspeers.h>
//#include <retroshare/rshistory.h>
#include "util/rsstring.h"
#include <stdio.h>
#include <iostream>
#include <algorithm>
#include <set>
#define MAX_DESIRED_RATE 1000.0 // 1Mb/s
#define MIN_STREAM_CHUNK_SIZE 10
#define MAX_STREAM_CHUNK_SIZE 100000
#define STREAM_STANDARD_MIN_DT 0.1
#define STREAM_BACKGROUND_MIN_DT 0.5
bool fill_stream_details(rsctrl::stream::ResponseStreamDetail &resp,
const std::list<RpcStream> &streams);
bool fill_stream_desc(rsctrl::stream::StreamDesc &desc,
const RpcStream &stream);
bool fill_stream_data(rsctrl::stream::StreamData &data, const RpcStream &stream);
bool createQueuedStreamMsg(const RpcStream &stream, rsctrl::stream::ResponseStreamData &resp, RpcQueuedMsg &qmsg);
RpcProtoStream::RpcProtoStream(uint32_t serviceId)
:RpcQueueService(serviceId)
{
mNextStreamId = 1;
return;
}
void RpcProtoStream::reset(uint32_t chan_id)
{
// We should be using a mutex for all stream operations!!!!
// TODO
//RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/
std::list<uint32_t> toRemove;
std::map<uint32_t, RpcStream>::iterator it;
for(it = mStreams.begin(); it != mStreams.end(); it++)
{
if (it->second.chan_id == chan_id)
{
toRemove.push_back(it->first);
}
}
std::list<uint32_t>::iterator rit;
for(rit = toRemove.begin(); rit != toRemove.end(); rit++)
{
it = mStreams.find(*rit);
if (it != mStreams.end())
{
mStreams.erase(it);
}
}
// Call the rest of reset.
RpcQueueService::reset(chan_id);
}
//RpcProtoStream::msgsAccepted(std::list<uint32_t> &msgIds); /* not used at the moment */
int RpcProtoStream::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
{
/* check the msgId */
uint8_t topbyte = getRpcMsgIdExtension(msg_id);
uint16_t service = getRpcMsgIdService(msg_id);
uint8_t submsg = getRpcMsgIdSubMsg(msg_id);
bool isResponse = isRpcMsgIdResponse(msg_id);
std::cerr << "RpcProtoStream::processMsg() topbyte: " << (int32_t) topbyte;
std::cerr << " service: " << (int32_t) service << " submsg: " << (int32_t) submsg;
std::cerr << std::endl;
if (isResponse)
{
std::cerr << "RpcProtoStream::processMsg() isResponse() - not processing";
std::cerr << std::endl;
return 0;
}
if (topbyte != (uint8_t) rsctrl::core::CORE)
{
std::cerr << "RpcProtoStream::processMsg() Extension Mismatch - not processing";
std::cerr << std::endl;
return 0;
}
if (service != (uint16_t) rsctrl::core::STREAM)
{
std::cerr << "RpcProtoStream::processMsg() Service Mismatch - not processing";
std::cerr << std::endl;
return 0;
}
if (!rsctrl::stream::RequestMsgIds_IsValid(submsg))
{
std::cerr << "RpcProtoStream::processMsg() SubMsg Mismatch - not processing";
std::cerr << std::endl;
return 0;
}
switch(submsg)
{
case rsctrl::stream::MsgId_RequestStartFileStream:
processReqStartFileStream(chan_id, msg_id, req_id, msg);
break;
case rsctrl::stream::MsgId_RequestControlStream:
processReqControlStream(chan_id, msg_id, req_id, msg);
break;
case rsctrl::stream::MsgId_RequestListStreams:
processReqListStreams(chan_id, msg_id, req_id, msg);
break;
// case rsctrl::stream::MsgId_RequestRegisterStreams:
// processReqRegisterStreams(chan_id, msg_id, req_id, msg);
// break;
default:
std::cerr << "RpcProtoStream::processMsg() ERROR should never get here";
std::cerr << std::endl;
return 0;
}
/* must have matched id to get here */
return 1;
}
int RpcProtoStream::processReqStartFileStream(uint32_t chan_id, uint32_t /*msg_id*/, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoStream::processReqStartFileStream()";
std::cerr << std::endl;
// parse msg.
rsctrl::stream::RequestStartFileStream req;
if (!req.ParseFromString(msg))
{
std::cerr << "RpcProtoStream::processReqStartFileStream() ERROR ParseFromString()";
std::cerr << std::endl;
return 0;
}
// response.
rsctrl::stream::ResponseStreamDetail resp;
bool success = true;
std::string errorMsg;
// SETUP STREAM.
// FIND the FILE.
std::list<std::string> hashes;
hashes.push_back(req.file().hash());
//HashExpression exp(StringOperator::EqualsString, hashes);
HashExpression exp(EqualsString, hashes);
std::list<DirDetails> results;
FileSearchFlags flags = RS_FILE_HINTS_LOCAL;
int ans = rsFiles->SearchBoolExp(&exp, results, flags);
// CREATE A STREAM OBJECT.
if (results.size() < 1)
{
success = false;
errorMsg = "No Matching File";
}
else
{
DirDetails &dirdetail = results.front();
RpcStream stream;
stream.chan_id = chan_id;
stream.req_id = req_id;
stream.stream_id = getNextStreamId();
stream.state = RpcStream::RUNNING;
// Convert to Full local path.
std::string virtual_path = RsDirUtil::makePath(dirdetail.path, dirdetail.name);
if (!rsFiles->ConvertSharedFilePath(virtual_path, stream.path))
{
success = false;
errorMsg = "Cannot Match to Shared Directory";
}
stream.length = dirdetail.count;
stream.hash = dirdetail.hash;
stream.name = dirdetail.name;
stream.offset = 0;
stream.start_byte = 0;
stream.end_byte = stream.length;
stream.desired_rate = req.rate_kbs();
if (stream.desired_rate > MAX_DESIRED_RATE)
{
stream.desired_rate = MAX_DESIRED_RATE;
}
// make response
rsctrl::stream::StreamDesc *desc = resp.add_streams();
if (!fill_stream_desc(*desc, stream))
{
success = false;
errorMsg = "Failed to Invalid Action";
}
else
{
// insert.
mStreams[stream.stream_id] = stream;
// register the stream too.
std::cerr << "RpcProtoStream::processReqStartFileStream() Registering the stream event.";
std::cerr << std::endl;
registerForEvents(chan_id, req_id, REGISTRATION_STREAMS);
}
std::cerr << "RpcProtoStream::processReqStartFileStream() List of Registered Streams:";
std::cerr << std::endl;
printEventRegister(std::cerr);
}
/* DONE - Generate Reply */
if (success)
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::SUCCESS);
}
else
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::FAILED);
status->set_msg(errorMsg);
}
std::string outmsg;
if (!resp.SerializeToString(&outmsg))
{
std::cerr << "RpcProtoStream::processReqStartFileStream() ERROR SerialiseToString()";
std::cerr << std::endl;
return 0;
}
// Correctly Name Message.
uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::STREAM,
rsctrl::stream::MsgId_ResponseStreamDetail, true);
// queue it.
queueResponse(chan_id, out_msg_id, req_id, outmsg);
return 1;
}
int RpcProtoStream::processReqControlStream(uint32_t chan_id, uint32_t /*msg_id*/, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoStream::processReqControlStream()";
std::cerr << std::endl;
// parse msg.
rsctrl::stream::RequestControlStream req;
if (!req.ParseFromString(msg))
{
std::cerr << "RpcProtoStream::processReqControlStream() ERROR ParseFromString()";
std::cerr << std::endl;
return 0;
}
// response.
rsctrl::stream::ResponseStreamDetail resp;
bool success = true;
std::string errorMsg;
// FIND MATCHING STREAM.
std::map<uint32_t, RpcStream>::iterator it;
it = mStreams.find(req.stream_id());
if (it != mStreams.end())
{
// TWEAK
if (it->second.state == RpcStream::STREAMERR)
{
if (req.action() == rsctrl::stream::RequestControlStream::STREAM_STOP)
{
it->second.state = RpcStream::FINISHED;
}
else
{
success = false;
errorMsg = "Stream Error";
}
}
else
{
switch(req.action())
{
case rsctrl::stream::RequestControlStream::STREAM_START:
if (it->second.state == RpcStream::PAUSED)
{
it->second.state = RpcStream::RUNNING;
}
break;
case rsctrl::stream::RequestControlStream::STREAM_STOP:
it->second.state = RpcStream::FINISHED;
break;
case rsctrl::stream::RequestControlStream::STREAM_PAUSE:
if (it->second.state == RpcStream::RUNNING)
{
it->second.state = RpcStream::PAUSED;
it->second.transfer_time = 0; // reset timings.
}
break;
case rsctrl::stream::RequestControlStream::STREAM_CHANGE_RATE:
it->second.desired_rate = req.rate_kbs();
break;
case rsctrl::stream::RequestControlStream::STREAM_SEEK:
if (req.seek_byte() < it->second.end_byte)
{
it->second.offset = req.seek_byte();
}
break;
default:
success = false;
errorMsg = "Invalid Action";
}
}
// FILL IN REPLY.
if (success)
{
rsctrl::stream::StreamDesc *desc = resp.add_streams();
if (!fill_stream_desc(*desc, it->second))
{
success = false;
errorMsg = "Invalid Action";
}
}
// Cleanup - TODO, this is explicit at the moment. - should be automatic after finish.
if (it->second.state == RpcStream::FINISHED)
{
deregisterForEvents(it->second.chan_id, it->second.req_id, REGISTRATION_STREAMS);
mStreams.erase(it);
}
}
else
{
success = false;
errorMsg = "No Matching Stream";
}
/* DONE - Generate Reply */
if (success)
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::SUCCESS);
}
else
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::FAILED);
status->set_msg(errorMsg);
}
std::string outmsg;
if (!resp.SerializeToString(&outmsg))
{
std::cerr << "RpcProtoStream::processReqControlStream() ERROR SerialiseToString()";
std::cerr << std::endl;
return 0;
}
// Correctly Name Message.
uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::STREAM,
rsctrl::stream::MsgId_ResponseStreamDetail, true);
// queue it.
queueResponse(chan_id, out_msg_id, req_id, outmsg);
return 1;
}
int RpcProtoStream::processReqListStreams(uint32_t chan_id, uint32_t /*msg_id*/, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoStream::processReqListStreams()";
std::cerr << std::endl;
// parse msg.
rsctrl::stream::RequestListStreams req;
if (!req.ParseFromString(msg))
{
std::cerr << "RpcProtoStream::processReqListStreams() ERROR ParseFromString()";
std::cerr << std::endl;
return 0;
}
// response.
rsctrl::stream::ResponseStreamDetail resp;
bool success = false;
std::string errorMsg;
std::map<uint32_t, RpcStream>::iterator it;
for(it = mStreams.begin(); it != mStreams.end(); it++)
{
bool match = true;
// TODO fill in match!
/* check that it matches */
if (! match)
{
continue;
}
rsctrl::stream::StreamDesc *desc = resp.add_streams();
if (!fill_stream_desc(*desc, it->second))
{
success = false;
errorMsg = "Some Details Failed to Fill";
}
}
/* DONE - Generate Reply */
if (success)
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::SUCCESS);
}
else
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::FAILED);
status->set_msg(errorMsg);
}
std::string outmsg;
if (!resp.SerializeToString(&outmsg))
{
std::cerr << "RpcProtoStream::processReqListStreams() ERROR SerialiseToString()";
std::cerr << std::endl;
return 0;
}
// Correctly Name Message.
uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::STREAM,
rsctrl::stream::MsgId_ResponseStreamDetail, true);
// queue it.
queueResponse(chan_id, out_msg_id, req_id, outmsg);
return 1;
}
// EVENTS. (STREAMS)
int RpcProtoStream::locked_checkForEvents(uint32_t event, const std::list<RpcEventRegister> & /* registered */, std::list<RpcQueuedMsg> &stream_msgs)
{
/* only one event type for now */
if (event != REGISTRATION_STREAMS)
{
std::cerr << "ERROR Invalid Stream Event Type";
std::cerr << std::endl;
/* error */
return 0;
}
/* iterate through streams, and get next chunk of data.
* package up and send it.
* NOTE we'll have to something more complex for VoIP!
*/
double ts = getTimeStamp();
double dt = ts - mStreamRates.last_ts;
uint32_t data_sent = 0;
#define FILTER_K (0.75)
if (dt < 5.0) //mStreamRates.last_ts != 0)
{
mStreamRates.avg_dt = FILTER_K * mStreamRates.avg_dt
+ (1.0 - FILTER_K) * dt;
}
else
{
std::cerr << "RpcProtoStream::locked_checkForEvents() Large dT - resetting avg";
std::cerr << std::endl;
mStreamRates.avg_dt = 0.0;
}
mStreamRates.last_ts = ts;
std::map<uint32_t, RpcStream>::iterator it;
for(it = mStreams.begin(); it != mStreams.end(); it++)
{
RpcStream &stream = it->second;
if (!(stream.state == RpcStream::RUNNING))
{
continue;
}
double stream_dt = ts - stream.transfer_time;
switch(stream.transfer_type)
{
case RpcStream::REALTIME:
// let it go through always.
break;
case RpcStream::STANDARD:
if (stream_dt < STREAM_STANDARD_MIN_DT)
{
continue;
}
break;
case RpcStream::BACKGROUND:
if (stream_dt < STREAM_BACKGROUND_MIN_DT)
{
continue;
}
break;
}
if (!stream.transfer_time)
{
std::cerr << "RpcProtoStream::locked_checkForEvents() Null stream.transfer_time .. resetting";
std::cerr << std::endl;
stream.transfer_avg_dt = STREAM_STANDARD_MIN_DT;
}
else
{
std::cerr << "RpcProtoStream::locked_checkForEvents() stream.transfer_avg_dt: " << stream.transfer_avg_dt;
std::cerr << " stream_dt: " << stream_dt;
std::cerr << std::endl;
stream.transfer_avg_dt = FILTER_K * stream.transfer_avg_dt
+ (1.0 - FILTER_K) * stream_dt;
std::cerr << "RpcProtoStream::locked_checkForEvents() ==> stream.transfer_avg_dt: " << stream.transfer_avg_dt;
std::cerr << std::endl;
}
uint32_t size = stream.desired_rate * 1000.0 * stream.transfer_avg_dt;
stream.transfer_time = ts;
if (size < MIN_STREAM_CHUNK_SIZE)
{
size = MIN_STREAM_CHUNK_SIZE;
}
if (size > MAX_STREAM_CHUNK_SIZE)
{
size = MAX_STREAM_CHUNK_SIZE;
}
/* get data */
uint64_t remaining = stream.end_byte - stream.offset;
if (remaining < size)
{
size = remaining;
stream.state = RpcStream::FINISHED;
std::cerr << "RpcProtoStream::locked_checkForEvents() Sending Remaining: " << size;
std::cerr << std::endl;
}
std::cerr << "RpcProtoStream::locked_checkForEvents() Handling Stream: " << stream.stream_id << " state: " << stream.state;
std::cerr << std::endl;
std::cerr << "path: " << stream.path;
std::cerr << std::endl;
std::cerr << "offset: " << stream.offset;
std::cerr << " avg_dt: " << stream.transfer_avg_dt;
std::cerr << " x desired_rate: " << stream.desired_rate;
std::cerr << " => chunk_size: " << size;
std::cerr << std::endl;
/* fill in the answer */
rsctrl::stream::ResponseStreamData resp;
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::SUCCESS);
rsctrl::stream::StreamData *data = resp.mutable_data();
data->set_stream_id(stream.stream_id);
// convert state.
switch(stream.state)
{
case RpcStream::RUNNING:
data->set_stream_state(rsctrl::stream::STREAM_STATE_RUN);
break;
// This case cannot happen.
case RpcStream::PAUSED:
data->set_stream_state(rsctrl::stream::STREAM_STATE_PAUSED);
break;
// This can only happen at last chunk.
default:
case RpcStream::FINISHED:
data->set_stream_state(rsctrl::stream::STREAM_STATE_FINISHED);
break;
}
rsctrl::core::Timestamp *ts = data->mutable_send_time();
setTimeStamp(ts);
data->set_offset(stream.offset);
data->set_size(size);
if (fill_stream_data(*data, stream))
{
/* increment seek_location - for next request */
stream.offset += size;
RpcQueuedMsg qmsg;
if (createQueuedStreamMsg(stream, resp, qmsg))
{
std::cerr << "Created Stream Msg.";
std::cerr << std::endl;
stream_msgs.push_back(qmsg);
}
else
{
std::cerr << "ERROR Creating Stream Msg";
std::cerr << std::endl;
}
}
else
{
stream.state = RpcStream::STREAMERR;
std::cerr << "ERROR Filling Stream Data";
std::cerr << std::endl;
}
}
return 1;
}
// TODO
int RpcProtoStream::cleanup_checkForEvents(uint32_t /* event */, const std::list<RpcEventRegister> & /* registered */)
{
std::list<uint32_t> to_remove;
std::list<uint32_t>::iterator rit;
for(rit = to_remove.begin(); rit != to_remove.end(); rit++)
{
/* kill the stream! */
std::map<uint32_t, RpcStream>::iterator it;
it = mStreams.find(*rit);
if (it != mStreams.end())
{
mStreams.erase(it);
}
}
return 1;
}
/***** HELPER FUNCTIONS *****/
bool createQueuedStreamMsg(const RpcStream &stream, rsctrl::stream::ResponseStreamData &resp, RpcQueuedMsg &qmsg)
{
std::string outmsg;
if (!resp.SerializeToString(&outmsg))
{
std::cerr << "RpcProtoStream::createQueuedEventSendMsg() ERROR SerialiseToString()";
std::cerr << std::endl;
return false;
}
// Correctly Name Message.
qmsg.mMsgId = constructMsgId(rsctrl::core::CORE, rsctrl::core::STREAM,
rsctrl::stream::MsgId_ResponseStreamData, true);
qmsg.mChanId = stream.chan_id;
qmsg.mReqId = stream.req_id;
qmsg.mMsg = outmsg;
return true;
}
/****************** NEW HELPER FNS ******************/
bool fill_stream_details(rsctrl::stream::ResponseStreamDetail &resp,
const std::list<RpcStream> &streams)
{
std::cerr << "fill_stream_details()";
std::cerr << std::endl;
bool val = true;
std::list<RpcStream>::const_iterator it;
for (it = streams.begin(); it != streams.end(); it++)
{
rsctrl::stream::StreamDesc *desc = resp.add_streams();
val &= fill_stream_desc(*desc, *it);
}
return val;
}
bool fill_stream_desc(rsctrl::stream::StreamDesc &desc, const RpcStream &stream)
{
std::cerr << "fill_stream_desc()";
std::cerr << std::endl;
return true;
}
bool fill_stream_data(rsctrl::stream::StreamData &data, const RpcStream &stream)
{
/* fill the StreamData from stream */
/* open file */
FILE *fd = RsDirUtil::rs_fopen(stream.path.c_str(), "rb");
if (!fd)
{
std::cerr << "fill_stream_data() Failed to open file: " << stream.path;
std::cerr << std::endl;
return false;
}
uint32_t data_size = data.size();
uint64_t base_loc = data.offset();
void *buffer = malloc(data_size);
/* seek to correct spot */
fseeko64(fd, base_loc, SEEK_SET);
/* copy data into bytes */
if (1 != fread(buffer, data_size, 1, fd))
{
std::cerr << "fill_stream_data() Failed to get data. data_size=" << data_size << ", base_loc=" << base_loc << " !";
std::cerr << std::endl;
return false;
}
data.set_stream_data(buffer, data_size);
free(buffer);
fclose(fd);
return true;
}
uint32_t RpcProtoStream::getNextStreamId()
{
return mNextStreamId++;
}

View File

@ -0,0 +1,124 @@
/*
* RetroShare External Interface.
*
* Copyright 2012-2012 by Robert Fernie.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2.1 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#ifndef RS_RPC_PROTO_STREAM_H
#define RS_RPC_PROTO_STREAM_H
#include "rpc/rpcserver.h"
// Registrations.
#define REGISTRATION_STREAMS 1
class RpcStream
{
public:
RpcStream(): chan_id(0), req_id(0), stream_id(0), state(0),
offset(0), length(0), start_byte(0), end_byte(0), desired_rate(0),
transfer_type(0), transfer_time(0), transfer_avg_dt(0)
{ return; }
static const uint32_t STREAMERR = 0x00000;
static const uint32_t RUNNING = 0x00001;
static const uint32_t PAUSED = 0x00002;
static const uint32_t FINISHED = 0x00003;
uint32_t chan_id;
uint32_t req_id;
uint32_t stream_id;
uint32_t state;
std::string name;
std::string hash;
std::string path;
uint64_t offset; // where we currently are.
uint64_t length; // filesize.
uint64_t start_byte;
uint64_t end_byte;
float desired_rate; // Kb/s
// Transfer Type
static const uint32_t STANDARD = 0x00000;
static const uint32_t REALTIME = 0x00001;
static const uint32_t BACKGROUND = 0x00002;
uint32_t transfer_type;
double transfer_time;
double transfer_avg_dt;
};
class RpcStreamRates
{
public:
RpcStreamRates(): avg_data_rate(0), avg_dt(1), last_data_rate(0), last_ts(0) { return; }
double avg_data_rate;
double avg_dt;
double last_data_rate;
double last_ts;
};
class RpcProtoStream: public RpcQueueService
{
public:
RpcProtoStream(uint32_t serviceId);
virtual int processMsg(uint32_t chan_id, uint32_t msgId, uint32_t req_id, const std::string &msg);
virtual void reset(uint32_t chan_id);
uint32_t getNextStreamId();
protected:
int processReqStartFileStream(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
int processReqControlStream(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
int processReqListStreams(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
int processReqRegisterStreams(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
uint32_t mNextStreamId;
RpcStreamRates mStreamRates;
std::map<uint32_t, RpcStream> mStreams;
// EVENTS.
virtual int locked_checkForEvents(uint32_t event, const std::list<RpcEventRegister> &registered, std::list<RpcQueuedMsg> &events);
// Not actually used yet.
int cleanup_checkForEvents(uint32_t event, const std::list<RpcEventRegister> &registered);
};
#endif /* RS_PROTO_STREAM_H */

View File

@ -109,7 +109,7 @@ int RpcProtoSystem::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_i
}
int RpcProtoSystem::processSystemStatus(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
int RpcProtoSystem::processSystemStatus(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoSystem::processSystemStatus()";
std::cerr << std::endl;
@ -224,7 +224,7 @@ int RpcProtoSystem::processSystemStatus(uint32_t chan_id, uint32_t msg_id, uint3
int RpcProtoSystem::processSystemQuit(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
int RpcProtoSystem::processSystemQuit(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoSystem::processSystemQuit()";
std::cerr << std::endl;
@ -291,7 +291,7 @@ int RpcProtoSystem::processSystemQuit(uint32_t chan_id, uint32_t msg_id, uint32_
}
int RpcProtoSystem::processSystemExternalAccess(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
int RpcProtoSystem::processSystemExternalAccess(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoSystem::processSystemExternalAccess()";
std::cerr << std::endl;

View File

@ -0,0 +1,59 @@
/*
* RetroShare External Interface.
*
* Copyright 2012-2012 by Robert Fernie.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2.1 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#include "rpc/proto/rpcprotoutils.h"
#include <sys/time.h>
double getTimeStamp()
{
struct timeval tv;
double ts = 0;
if (0 == gettimeofday(&tv, NULL))
{
ts = tv.tv_sec + (tv.tv_usec / 1000000.0);
}
return ts;
}
bool setTimeStamp(rsctrl::core::Timestamp *ts)
{
struct timeval tv;
if (0 != gettimeofday(&tv, NULL))
{
ts->set_secs(tv.tv_sec);
ts->set_microsecs(tv.tv_usec);
return true;
}
else
{
ts->set_secs(0);
ts->set_microsecs(0);
return false;
}
return false;
}

View File

@ -0,0 +1,35 @@
/*
* RetroShare External Interface.
*
* Copyright 2012-2012 by Robert Fernie.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2.1 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#ifndef RS_RPC_PROTO_UTILS_H
#define RS_RPC_PROTO_UTILS_H
#include "rpc/proto/gencc/core.pb.h"
double getTimeStamp();
bool setTimeStamp(rsctrl::core::Timestamp *ts);
#endif /* RS_RPC_PROTO_UTILS_H */

View File

@ -29,6 +29,7 @@
#include "rpc/proto/rpcprotochat.h"
#include "rpc/proto/rpcprotosearch.h"
#include "rpc/proto/rpcprotofiles.h"
#include "rpc/proto/rpcprotostream.h"
#include "rpc/rpcecho.h"
@ -53,6 +54,9 @@ RpcMediator *CreateRpcSystem(RpcComms *comms, NotifyTxt *notify)
RpcProtoFiles *files = new RpcProtoFiles(1);
server->addService(files);
RpcProtoStream *streamer = new RpcProtoStream(1);
server->addService(streamer);
/* Finally an Echo Service - which will echo back any unprocesses commands. */
RpcEcho *echo = new RpcEcho(1);
server->addService(echo);

View File

@ -1,7 +1,7 @@
EXEC = protoc
#PROTO = core.proto peers.proto system.proto chat.proto search.proto files.proto gxs.proto msgs.proto
PROTO = core.proto peers.proto system.proto chat.proto search.proto files.proto
#PROTO = core.proto peers.proto system.proto chat.proto search.proto files.proto stream.proto gxs.proto msgs.proto
PROTO = core.proto peers.proto system.proto chat.proto search.proto files.proto stream.proto
PROTOPATH = ./definition
#CDESTPATH = ./gencc
@ -15,7 +15,7 @@ HCODE = $(patsubst %.proto,$(CDESTPATH)/%.pb.h, $(PROTO))
PYCODE = $(patsubst %.proto,$(PYDESTPATH)/%_pb2.py, $(PROTO))
all: allc
all: allc python_proto
allc: $(CCODE)
echo $(CCODE)

View File

@ -19,6 +19,7 @@ enum RequestMsgIds {
MsgId_RequestSetLobbyNickname = 4;
MsgId_RequestRegisterEvents = 5;
MsgId_RequestSendMessage = 6;
MsgId_RequestChatHistory = 7;
}
enum ResponseMsgIds {
@ -27,6 +28,7 @@ enum ResponseMsgIds {
MsgId_ResponseSetLobbyNickname = 4;
MsgId_ResponseRegisterEvents = 5;
MsgId_ResponseSendMessage = 6;
MsgId_ResponseChatHistory = 7;
// EVENTS
MsgId_EventLobbyInvite = 101;
@ -223,21 +225,22 @@ message ResponseSendMessage {
}
///////////////////////////////////////////////////////////////
//// Chat History.
//// THIS IS NOT IMPLEMENTED YET, DONT USE.
//
//// REQUEST: RequestChatHistory
//message RequestChatHistory {
// required ChatId id = 1; /* lobby or chat, group id */
// required bool incoming = 2;
//}
//
//// RESPONSE: ResponseChatHistory
//message ResponseChatHistory {
// required rsctrl.core.Status status = 1;
// repeated ChatMessage msg = 2;
//}
//
// Chat History.
// INITIALLY THIS WILL ONLY WORK WITH PRIVATE CHATS.
// NEED TO EXTEND INTERNALS TO HANDLE LOBBIES.
// REQUEST: RequestChatHistory
message RequestChatHistory {
required ChatId id = 1; /* lobby or chat, group id */
}
// RESPONSE: ResponseChatHistory
message ResponseChatHistory {
required rsctrl.core.Status status = 1;
required ChatId id = 2; /* lobby or chat, group id */
repeated ChatMessage msgs = 3;
}
///////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////

View File

@ -17,9 +17,10 @@ enum PackageId {
CHAT = 3;
SEARCH = 4;
FILES = 5;
STREAM = 6;
// BELOW HERE IS STILL BEING DESIGNED.
//MSGS = 5;
//TRANSFER = 6;
//MSGS = 7;
// THEORETICAL ONES.
GXS = 1000;
@ -74,10 +75,11 @@ message Location {
message Person {
enum Relationship {
FRIEND = 1;
FRIEND_OF_MANY_FRIENDS = 2; // 3+ at the moment.
FRIEND_OF_FRIENDS = 3; // 1 or 2.
UNKNOWN = 4;
YOURSELF = 1;
FRIEND = 2;
FRIEND_OF_MANY_FRIENDS = 3; // 3+ at the moment.
FRIEND_OF_FRIENDS = 4; // 1 or 2.
UNKNOWN = 5;
}
required string gpg_id = 1;
@ -110,6 +112,14 @@ message Dir {
}
///////////////////////////////////////////////////////////////
message Timestamp {
required uint64 secs = 1;
required uint32 microsecs = 2;
}
///////////////////////////////////////////////////////////////
// System Status

View File

@ -13,11 +13,13 @@ import "core.proto";
enum RequestMsgIds {
MsgId_RequestTransferList = 1;
MsgId_RequestControlDownload = 2;
MsgId_RequestShareDirList = 3;
}
enum ResponseMsgIds {
MsgId_ResponseTransferList = 1;
MsgId_ResponseControlDownload = 2;
MsgId_ResponseShareDirList = 3;
}
///////////////////////////////////////////////////////////////
@ -30,12 +32,26 @@ enum Direction {
DIRECTION_DOWNLOAD = 2;
}
enum TransferState {
TRANSFER_FAILED = 1;
TRANSFER_OKAY = 2;
TRANSFER_PAUSED = 3;
TRANSFER_QUEUED = 4;
TRANSFER_WAITING = 5;
TRANSFER_DOWNLOADING = 6;
TRANSFER_CHECKING_HASH = 7;
TRANSFER_COMPLETE = 8;
}
message FileTransfer {
required rsctrl.core.File file = 1;
required Direction direction = 2;
required float fraction = 3;
required float rate_kBs = 4;
required TransferState state = 5;
}
///////////////////////////////////////////////////////////////
@ -86,22 +102,32 @@ message ResponseControlDownload {
///////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////
// SHARED FILES
// THIS STUFF IS NOT FINISHED YET!
//
//// REQUEST: RequestListShares
//message RequestListShares {
//
// required uint32 depth = 1; // HOW Many Directories to drill down.
// repeated string ShareLocation = 2;
//}
//
//message ShareLocation {
// required string ssl_id = 1;
// required string path = 2;
//}
//
//
//
// REQUEST: RequestShareDirList
message RequestShareDirList {
required string ssl_id = 1;
required string path = 2;
}
// RESPONSE: ResponseShareDirList
message ResponseShareDirList {
required rsctrl.core.Status status = 1;
enum ListType {
DIRQUERY_ROOT = 1; // the query refers to root.
DIRQUERY_PERSON = 2; // the query refers to person
DIRQUERY_FILE = 3; // the query refers to a file.
DIRQUERY_DIR = 4; // move to top of queue.
}
required string ssl_id = 2;
required string path = 3;
required ListType list_type = 4;
repeated rsctrl.core.File files = 5;
}
//// REQUEST: RequestChangeShares
//
//// REQUEST: RequestLiCloseSearch

View File

@ -9,13 +9,12 @@ import "core.proto";
enum RequestMsgIds {
MsgId_RequestPeers = 1;
MsgId_RequestAddPeer = 2;
MsgId_RequestModifyPeer = 3;
MsgId_RequestExaminePeer = 3;
MsgId_RequestModifyPeer = 4;
}
enum ResponseMsgIds {
MsgId_ResponsePeerList = 1;
MsgId_ResponseAddPeer = 2;
MsgId_ResponseModifyPeer = 3;
}
///////////////////////////////////////////////////////////////
@ -44,7 +43,7 @@ message RequestPeers {
required SetOption set = 1;
required InfoOption info = 2;
repeated string gpg_ids = 3;
repeated string pgp_ids = 3;
}
@ -60,27 +59,37 @@ message ResponsePeerList {
message RequestAddPeer {
enum AddCmd {
NOOP = 0; // No op.
ADD = 1; // Add existing from gpg_id.
REMOVE = 2; // Remove existing from gpg_id.
IMPORT = 3; // Import from cert, with gpg_id.
ADD = 1; // Add existing from gpg_id.
REMOVE = 2; // Remove existing from gpg_id.
}
required AddCmd cmd = 1;
required string pgp_id = 2;
optional string ssl_id = 3;
}
///////////////////////////////////////////////////////////////
// REQUEST: RequestExaminePeer
message RequestExaminePeer {
enum ExamineCmd {
IMPORT = 3; // Import from cert, with gpg_id.
EXAMINE = 4; // Examine cert, but no action.
}
required string gpg_id = 1;
required AddCmd cmd = 2;
optional string cert = 3;
}
// Must have GPG ID to import. Proves you've looked at it.
required string pgp_id = 1;
required ExamineCmd cmd = 2;
required string cert = 3;
// RESPONSE: ResponseAddPeer
message ResponseAddPeer {
required rsctrl.core.Status status = 1;
repeated rsctrl.core.Person peers = 2;
}
///////////////////////////////////////////////////////////////
// REQUEST: RequestModifyPeer
// THIS IS INCOMPLETE... DON'T USE.
message RequestModifyPeer {
enum ModCmd {
@ -98,11 +107,5 @@ message RequestModifyPeer {
repeated rsctrl.core.Person peers = 2;
}
// RESPONSE: ResponseModifyPeer
message ResponseModifyPeer {
required rsctrl.core.Status status = 1;
repeated rsctrl.core.Person peers = 2;
}
///////////////////////////////////////////////////////////////

View File

@ -33,7 +33,7 @@ message SearchHit {
required rsctrl.core.File file = 1;
required uint32 loc = 2; // OR of LocFlag so uint field
required uint32 no_hits = 3; // NOT USED YET.
repeated string alt_names = 4;
}
message SearchSet {
@ -100,8 +100,8 @@ message RequestListSearches {
// Empty search_ids => all results.
message RequestSearchResults {
optional uint32 result_limit = 1;
repeated uint32 search_ids = 2;
}
// RESPONSE: ResponseSearchResults

View File

@ -0,0 +1,152 @@
package rsctrl.stream;
import "core.proto";
///////////////////////////////////////////////////////////////
// This protocol defines how to stream data from retroshare.
// It can be used for VoIP or Files, or whatever.
//
// There are two parts.
// Control and actual streaming.
//
///////////////////////////////////////////////////////////////
enum RequestMsgIds {
MsgId_RequestStartFileStream = 1;
MsgId_RequestControlStream = 2;
MsgId_RequestListStreams = 3;
}
enum ResponseMsgIds {
MsgId_ResponseStreamDetail = 1; // RESPONSE to List and Control.
MsgId_ResponseStreamData = 101;
}
///////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////
// Building Blocks
enum StreamType {
STREAM_TYPE_ALL = 1; // all streams
STREAM_TYPE_FILES = 2; // files stream
STREAM_TYPE_VOIP = 3; // VoIP stream
STREAM_TYPE_OTHER = 4; // Who knows what else.
}
enum StreamState {
STREAM_STATE_ERROR = 0;
STREAM_STATE_RUN = 1;
STREAM_STATE_PAUSED = 2;
STREAM_STATE_FINISHED = 3;
}
message StreamFileDetail {
required rsctrl.core.File file = 1;
required uint64 offset = 5;
}
message StreamVoipDetail {
// THIS NEEDS MORE DEFINITION.
required string peer_id = 1;
required uint64 duration = 2;
required uint64 offset = 3;
}
message StreamDesc {
required uint32 stream_id = 1;
required StreamType stream_type = 2;
required StreamState stream_state = 3;
required float rate_kbs = 4;
optional StreamFileDetail file = 5;
optional StreamVoipDetail voip = 6;
}
message StreamData {
required uint32 stream_id = 1;
required StreamState stream_state = 2;
required rsctrl.core.Timestamp send_time = 3;
required uint64 offset = 4;
required uint32 size = 5;
required bytes stream_data = 6;
}
///////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////
// REQUEST: RequestStartFileStream
message RequestStartFileStream {
required rsctrl.core.File file = 1;
required float rate_kbs = 2;
// byte range. allows to restart transfer!
optional uint64 start_byte = 3;
optional uint64 end_byte = 4;
}
// RESPONSE: ResponseStreamDetail
message ResponseStreamDetail {
required rsctrl.core.Status status = 1;
repeated StreamDesc streams = 2;
}
///////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////
// REQUEST: RequestControlStream
message RequestControlStream {
enum StreamAction {
STREAM_START = 1; // start stream
STREAM_STOP = 2; // stop stream
STREAM_PAUSE = 3; // pause stream
STREAM_CHANGE_RATE = 4; // rate of the stream
STREAM_SEEK = 5; // move streaming position.
}
required uint32 stream_id = 1;
required StreamAction action = 2;
optional float rate_kbs = 3;
optional uint64 seek_byte = 4;
}
///////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////
// REQUEST: RequestListStreams
message RequestListStreams {
required StreamType request_type = 1;
}
///////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////
// RESPONSE: ResponseStreamData
message ResponseStreamData {
required rsctrl.core.Status status = 1;
required StreamData data = 2;
}
///////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////

View File

@ -10,12 +10,14 @@ enum RequestMsgIds {
MsgId_RequestSystemStatus = 1;
MsgId_RequestSystemQuit = 2;
MsgId_RequestSystemExternalAccess = 3;
MsgId_RequestSystemAccount = 4;
}
enum ResponseMsgIds {
MsgId_ResponseSystemStatus = 1;
MsgId_ResponseSystemQuit = 2;
MsgId_ResponseSystemExternalAccess = 3;
MsgId_ResponseSystemAccount = 4;
}
///////////////////////////////////////////////////////////////
@ -93,6 +95,26 @@ message ResponseSystemExternalAccess {
required string dht_key = 3;
}
///////////////////////////////////////////////////////////////
// REQUEST: RequestSystemAccount
message RequestSystemAccount {
// Nothing here?
}
// RESPONSE: ResponseSystemAccount
message ResponseSystemAccount {
// Status of response.
required rsctrl.core.Status status = 1;
required string pgp_name = 2;
required string location = 3;
required string pgp_id = 4;
required string ssl_id = 5;
}
///////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////