diff --git a/retroshare-nogui/src/retroshare-nogui.pro b/retroshare-nogui/src/retroshare-nogui.pro index 88747fa76..9b13daa88 100644 --- a/retroshare-nogui/src/retroshare-nogui.pro +++ b/retroshare-nogui/src/retroshare-nogui.pro @@ -144,11 +144,13 @@ introserver { sshserver { - LIBSSH_DIR = ../../../../libssh-0.5.4 # This Requires libssh-0.5.* to compile. - # Modify path below to point at it. - # Probably will only work on Linux for the moment. + # Please use this path below. + # (You can modify it locally if required - but dont commit it!) + + LIBSSH_DIR = ../../../lib/libssh-0.5.4 + # # Use the following commend to generate a Server RSA Key. # Key should be in current directory - when run/ @@ -164,7 +166,6 @@ sshserver { win32 { DEFINES *= LIBSSH_STATIC - LIBSSH_DIR = ../../../libssh-0.5.4 } INCLUDEPATH += $$LIBSSH_DIR/include/ @@ -243,12 +244,16 @@ protorpc { rpc/proto/rpcprotochat.h \ rpc/proto/rpcprotosearch.h \ rpc/proto/rpcprotofiles.h \ + rpc/proto/rpcprotostream.h \ + rpc/proto/rpcprotoutils.h \ SOURCES += rpc/proto/rpcprotopeers.cc \ rpc/proto/rpcprotosystem.cc \ rpc/proto/rpcprotochat.cc \ rpc/proto/rpcprotosearch.cc \ rpc/proto/rpcprotofiles.cc \ + rpc/proto/rpcprotostream.cc \ + rpc/proto/rpcprotoutils.cc \ # Offical Generated Code (protobuf 2.4.1) HEADERS += rpc/proto/gencc/core.pb.h \ @@ -257,6 +262,7 @@ protorpc { rpc/proto/gencc/chat.pb.h \ rpc/proto/gencc/search.pb.h \ rpc/proto/gencc/files.pb.h \ + rpc/proto/gencc/stream.pb.h \ SOURCES += rpc/proto/gencc/core.pb.cc \ rpc/proto/gencc/peers.pb.cc \ @@ -264,6 +270,7 @@ protorpc { rpc/proto/gencc/chat.pb.cc \ rpc/proto/gencc/search.pb.cc \ rpc/proto/gencc/files.pb.cc \ + rpc/proto/gencc/stream.pb.cc \ # Generated ProtoBuf Code the RPC System # If you are developing, or have a different version of protobuf @@ -274,6 +281,7 @@ protorpc { # ../../rsctrl/src/gencc/chat.pb.h \ # ../../rsctrl/src/gencc/search.pb.h \ # ../../rsctrl/src/gencc/files.pb.h \ + # ../../rsctrl/src/gencc/stream.pb.h \ #SOURCES += ../../rsctrl/src/gencc/core.pb.cc \ # ../../rsctrl/src/gencc/peers.pb.cc \ @@ -281,6 +289,7 @@ protorpc { # ../../rsctrl/src/gencc/chat.pb.cc \ # ../../rsctrl/src/gencc/search.pb.cc \ # ../../rsctrl/src/gencc/files.pb.cc \ + # ../../rsctrl/src/gencc/stream.pb.cc \ INCLUDEPATH *= rpc/proto/gencc diff --git a/retroshare-nogui/src/rpc/proto/gencc/core.pb.cc b/retroshare-nogui/src/rpc/proto/gencc/core.pb.cc deleted file mode 100644 index e69de29bb..000000000 diff --git a/retroshare-nogui/src/rpc/proto/gencc/files.pb.cc b/retroshare-nogui/src/rpc/proto/gencc/files.pb.cc deleted file mode 100644 index e69de29bb..000000000 diff --git a/retroshare-nogui/src/rpc/proto/gencc/search.pb.cc b/retroshare-nogui/src/rpc/proto/gencc/search.pb.cc deleted file mode 100644 index e69de29bb..000000000 diff --git a/retroshare-nogui/src/rpc/proto/rpcprotochat.cc b/retroshare-nogui/src/rpc/proto/rpcprotochat.cc index 3f489ad86..1dfe166eb 100644 --- a/retroshare-nogui/src/rpc/proto/rpcprotochat.cc +++ b/retroshare-nogui/src/rpc/proto/rpcprotochat.cc @@ -26,6 +26,7 @@ #include #include +#include #include "util/rsstring.h" @@ -48,6 +49,8 @@ bool fillLobbyInfoFromChatLobbyInfo(const ChatLobbyInfo &cfi, rsctrl::chat::Chat bool fillLobbyInfoFromVisibleChatLobbyRecord(const VisibleChatLobbyRecord &pclr, rsctrl::chat::ChatLobbyInfo *lobby); bool fillLobbyInfoFromChatLobbyInvite(const ChatLobbyInvite &cli, rsctrl::chat::ChatLobbyInfo *lobby); +bool fillChatMessageFromHistoryMsg(const HistoryMsg &histmsg, rsctrl::chat::ChatMessage *rpcmsg); + bool createQueuedEventSendMsg(const ChatInfo &chatinfo, rsctrl::chat::ChatType ctype, std::string chat_id, const RpcEventRegister &ereg, RpcQueuedMsg &qmsg); @@ -127,6 +130,10 @@ int RpcProtoChat::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, processReqSendMessage(chan_id, msg_id, req_id, msg); break; + case rsctrl::chat::MsgId_RequestChatHistory: + processReqChatHistory(chan_id, msg_id, req_id, msg); + break; + default: std::cerr << "RpcProtoChat::processMsg() ERROR should never get here"; @@ -349,7 +356,7 @@ int RpcProtoChat::processReqCreateLobby(uint32_t chan_id, uint32_t /*msg_id*/, u std::string lobby_name = req.lobby_name(); std::string lobby_topic = req.lobby_topic(); std::list invited_friends; - uint32_t lobby_privacy_type; + uint32_t lobby_privacy_type = 0; switch(req.privacy_level()) { @@ -806,6 +813,172 @@ int RpcProtoChat::processReqRegisterEvents(uint32_t chan_id, uint32_t /*msg_id*/ + +int RpcProtoChat::processReqChatHistory(uint32_t chan_id, uint32_t /*msg_id*/, uint32_t req_id, const std::string &msg) +{ + std::cerr << "RpcProtoChat::processReqChatHistory()"; + std::cerr << std::endl; + + // parse msg. + rsctrl::chat::RequestChatHistory req; + if (!req.ParseFromString(msg)) + { + std::cerr << "RpcProtoChat::processReqChatHistory() ERROR ParseFromString()"; + std::cerr << std::endl; + return 0; + } + + // response. + rsctrl::chat::ResponseChatHistory resp; + bool success = true; + std::string errorMsg; + + // Get the Chat History for specified IDs.... + + /* switch depending on type */ + bool private_chat = false; + bool lobby_chat = false; + std::string chat_id; + + // copy the ID over. + rsctrl::chat::ChatId *id = resp.mutable_id(); + *id = req.id(); + + switch(req.id().chat_type()) + { + case rsctrl::chat::TYPE_PRIVATE: + { + // easy one. + chat_id = req.id().chat_id(); + private_chat = true; + + std::cerr << "RpcProtoChat::processReqChatHistory() Getting Private Chat History for: "; + std::cerr << chat_id; + std::cerr << std::endl; + + break; + } + case rsctrl::chat::TYPE_LOBBY: + { + std::cerr << "RpcProtoChat::processReqChatHistory() Lobby Chat History NOT IMPLEMENTED YET"; + std::cerr << std::endl; + success = false; + lobby_chat = true; + errorMsg = "Lobby Chat History Not Implemented"; + +#if 0 + /* convert string->ChatLobbyId */ + ChatLobbyId lobby_id; + if (!convertStringToLobbyId(req.msg().id().chat_id(), lobby_id)) + { + std::cerr << "ERROR Failed conversion of Lobby Id"; + std::cerr << std::endl; + + success = false; + errorMsg = "Failed Conversion of Lobby Id"; + } + /* convert lobby id to virtual peer id */ + else if (!rsMsgs->getVirtualPeerId(lobby_id, chat_id)) + { + std::cerr << "ERROR Invalid Lobby Id"; + std::cerr << std::endl; + + success = false; + errorMsg = "Invalid Lobby Id"; + } + lobby_chat = true; + std::cerr << "RpcProtoChat::processReqChatHistory() Getting Lobby Chat History for: "; + std::cerr << chat_id; + std::cerr << std::endl; +#endif + + break; + } + case rsctrl::chat::TYPE_GROUP: + + std::cerr << "RpcProtoChat::processReqChatHistory() Group Chat History NOT IMPLEMENTED YET"; + std::cerr << std::endl; + success = false; + errorMsg = "Group Chat History Not Implemented"; + + break; + default: + + std::cerr << "ERROR Chat Type invalid"; + std::cerr << std::endl; + + success = false; + errorMsg = "Invalid Chat Type"; + break; + } + + // Should be able to reply using the existing message types. + if (success) + { + if (private_chat) + { + /* extract the history */ + std::list msgs; + std::list::iterator it; + rsHistory->getMessages(chat_id, msgs, 0); + + //rsctrl::chat::ChatId *id = resp.mutable_id(); + //id->set_chat_type(rsctrl::chat::TYPE_PRIVATE); + //id->set_chat_id(chat_id); + + for(it = msgs.begin(); it != msgs.end(); it++) + { + rsctrl::chat::ChatMessage *msg = resp.add_msgs(); + fillChatMessageFromHistoryMsg(*it, msg); + + std::cerr << "\t Message: " << it->message; + std::cerr << std::endl; + } + } +#if 0 + else if (lobby_chat) + { + + + } +#endif + } + + + /* DONE - Generate Reply */ + if (success) + { + rsctrl::core::Status *status = resp.mutable_status(); + status->set_code(rsctrl::core::Status::SUCCESS); + } + else + { + rsctrl::core::Status *status = resp.mutable_status(); + status->set_code(rsctrl::core::Status::FAILED); + status->set_msg(errorMsg); + } + + std::string outmsg; + if (!resp.SerializeToString(&outmsg)) + { + std::cerr << "RpcProtoChat::processReqChatHistory() ERROR SerialiseToString()"; + std::cerr << std::endl; + return 0; + } + + // Correctly Name Message. + uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::CHAT, + rsctrl::chat::MsgId_ResponseChatHistory, true); + + // queue it. + queueResponse(chan_id, out_msg_id, req_id, outmsg); + + return 1; +} + + + + int RpcProtoChat::processReqSendMessage(uint32_t chan_id, uint32_t /*msg_id*/, uint32_t req_id, const std::string &msg) { std::cerr << "RpcProtoChat::processReqSendMessage()"; @@ -939,8 +1112,8 @@ int RpcProtoChat::processReqSendMessage(uint32_t chan_id, uint32_t /*msg_id*/, u int RpcProtoChat::locked_checkForEvents(uint32_t event, const std::list ®istered, std::list &events) { /* Wow - here already! */ - std::cerr << "locked_checkForEvents()"; - std::cerr << std::endl; + //std::cerr << "locked_checkForEvents()"; + //std::cerr << std::endl; /* only one event type for now */ if (event != REGISTRATION_EVENT_CHAT) @@ -1220,6 +1393,25 @@ bool fillLobbyInfoFromChatLobbyInvite(const ChatLobbyInvite &cli, rsctrl::chat:: } +bool fillChatMessageFromHistoryMsg(const HistoryMsg &histmsg, rsctrl::chat::ChatMessage *rpcmsg) +{ + rsctrl::chat::ChatId *id = rpcmsg->mutable_id(); + + id->set_chat_type(rsctrl::chat::TYPE_PRIVATE); + id->set_chat_id(histmsg.chatPeerId); + + rpcmsg->set_msg(histmsg.message); + + rpcmsg->set_peer_nickname(histmsg.peerName); + rpcmsg->set_chat_flags(0); + + rpcmsg->set_send_time(histmsg.sendTime); + rpcmsg->set_recv_time(histmsg.recvTime); + + return true; +} + + bool createQueuedEventSendMsg(const ChatInfo &chatinfo, rsctrl::chat::ChatType ctype, std::string chat_id, const RpcEventRegister &ereg, RpcQueuedMsg &qmsg) { @@ -1266,6 +1458,9 @@ bool createQueuedEventSendMsg(const ChatInfo &chatinfo, rsctrl::chat::ChatType c return true; } + + + bool convertUTF8toWString(const std::string &msg_utf8, std::wstring &msg_wstr) { return librs::util::ConvertUtf8ToUtf16(msg_utf8, msg_wstr); diff --git a/retroshare-nogui/src/rpc/proto/rpcprotochat.h b/retroshare-nogui/src/rpc/proto/rpcprotochat.h index d42ab37de..08595bf3e 100644 --- a/retroshare-nogui/src/rpc/proto/rpcprotochat.h +++ b/retroshare-nogui/src/rpc/proto/rpcprotochat.h @@ -45,6 +45,9 @@ protected: int processReqRegisterEvents(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); int processReqSendMessage(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); + int processReqChatHistory(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); + + // EVENTS. virtual int locked_checkForEvents(uint32_t event, const std::list ®istered, std::list &events); }; diff --git a/retroshare-nogui/src/rpc/proto/rpcprotofiles.cc b/retroshare-nogui/src/rpc/proto/rpcprotofiles.cc index 6a523f899..c5838ec50 100644 --- a/retroshare-nogui/src/rpc/proto/rpcprotofiles.cc +++ b/retroshare-nogui/src/rpc/proto/rpcprotofiles.cc @@ -25,8 +25,10 @@ #include "rpc/proto/gencc/files.pb.h" #include +#include #include "util/rsstring.h" +#include "util/rsdir.h" #include @@ -36,6 +38,9 @@ #include +bool fill_file_from_details(rsctrl::core::File *file, DirDetails &details); +bool fill_file_as_dir(rsctrl::core::File *file, const std::string &dir_name); + RpcProtoFiles::RpcProtoFiles(uint32_t serviceId) :RpcQueueService(serviceId) { @@ -93,6 +98,10 @@ int RpcProtoFiles::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id processReqControlDownload(chan_id, msg_id, req_id, msg); break; + case rsctrl::files::MsgId_RequestShareDirList: + processReqShareDirList(chan_id, msg_id, req_id, msg); + break; + default: std::cerr << "RpcProtoFiles::processMsg() ERROR should never get here"; std::cerr << std::endl; @@ -105,7 +114,7 @@ int RpcProtoFiles::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id -int RpcProtoFiles::processReqTransferList(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) +int RpcProtoFiles::processReqTransferList(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg) { std::cerr << "RpcProtoFiles::processReqTransferList()"; std::cerr << std::endl; @@ -171,6 +180,35 @@ int RpcProtoFiles::processReqTransferList(uint32_t chan_id, uint32_t msg_id, uin transfer->set_fraction( (float) info.transfered / info.size ); transfer->set_rate_kbs( info.tfRate ); + + switch(info.downloadStatus) + { + case FT_STATE_FAILED: + transfer->set_state(rsctrl::files::TRANSFER_FAILED); + break; + default: + case FT_STATE_OKAY: + transfer->set_state(rsctrl::files::TRANSFER_OKAY); + break; + case FT_STATE_PAUSED: + transfer->set_state(rsctrl::files::TRANSFER_PAUSED); + break; + case FT_STATE_QUEUED: + transfer->set_state(rsctrl::files::TRANSFER_QUEUED); + break; + case FT_STATE_WAITING: + transfer->set_state(rsctrl::files::TRANSFER_WAITING); + break; + case FT_STATE_DOWNLOADING: + transfer->set_state(rsctrl::files::TRANSFER_DOWNLOADING); + break; + case FT_STATE_CHECKING_HASH: + transfer->set_state(rsctrl::files::TRANSFER_CHECKING_HASH); + break; + case FT_STATE_COMPLETE: + transfer->set_state(rsctrl::files::TRANSFER_COMPLETE); + break; + } } /* DONE - Generate Reply */ @@ -204,7 +242,7 @@ int RpcProtoFiles::processReqTransferList(uint32_t chan_id, uint32_t msg_id, uin return 1; } -int RpcProtoFiles::processReqControlDownload(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) +int RpcProtoFiles::processReqControlDownload(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg) { std::cerr << "RpcProtoFiles::processReqControlDownload()"; std::cerr << std::endl; @@ -340,5 +378,253 @@ int RpcProtoFiles::processReqControlDownload(uint32_t chan_id, uint32_t msg_id, } +int RpcProtoFiles::processReqShareDirList(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg) +{ + std::cerr << "RpcProtoFiles::processReqShareDirList()"; + std::cerr << std::endl; + + // parse msg. + rsctrl::files::RequestShareDirList req; + if (!req.ParseFromString(msg)) + { + std::cerr << "RpcProtoFiles::processReqShareDirList() ERROR ParseFromString()"; + std::cerr << std::endl; + return 0; + } + + // response. + rsctrl::files::ResponseShareDirList resp; + bool success = true; + std::string errorMsg; + + + std::string uid = req.ssl_id(); + std::string path = req.path(); + DirDetails details; + + if (uid.empty()) + { + uid = rsPeers->getOwnId(); + } + + std::cerr << "RpcProtoFiles::processReqShareDirList() For uid: " << uid << " & path: " << path; + std::cerr << std::endl; + + if (path.empty()) + { + /* we have to do a nasty hack to get anything useful. + * we do a ref=NULL to get the pointers to People, + * then use the correct one to get root directories + */ + std::cerr << "RpcProtoFiles::processReqShareDirList() Hack to get root Dirs!"; + std::cerr << std::endl; + + FileSearchFlags flags; + if (uid == rsPeers->getOwnId()) + { + flags |= RS_FILE_HINTS_LOCAL; + } + + DirDetails root_details; + if (!rsFiles->RequestDirDetails(NULL, root_details, flags)) + { + std::cerr << "RpcProtoFiles::processReqShareDirList() ref=NULL Hack failed"; + std::cerr << std::endl; + success = false; + errorMsg = "Root Directory Request Failed."; + } + else + { + void *person_ref = NULL; + std::list::iterator sit; + for(sit = root_details.children.begin(); sit != root_details.children.end(); sit++) + { + //std::cerr << "RpcProtoFiles::processReqShareDirList() Root.child->name : " << sit->name; + if (sit->name == uid) + { + person_ref = sit->ref; + break; + } + } + + if (!person_ref) + { + std::cerr << "RpcProtoFiles::processReqShareDirList() Person match failed"; + std::cerr << std::endl; + success = false; + errorMsg = "Missing Person Root Directory."; + } + else + { + // Doing the REAL request! + if (!rsFiles->RequestDirDetails(person_ref, details, flags)) + { + std::cerr << "RpcProtoFiles::processReqShareDirList() Personal Shared Dir Hack failed"; + std::cerr << std::endl; + success = false; + errorMsg = "Missing Person Shared Directories"; + } + } + } + + } + else + { + // Path must begin with / for proper matching. + if (path[0] != '/') + { + path = '/' + path; + } + + if (!rsFiles->RequestDirDetails(uid, path, details)) + { + std::cerr << "RpcProtoFiles::processReqShareDirList() ERROR Unknown Dir"; + std::cerr << std::endl; + success = false; + errorMsg = "Directory Request Failed."; + } + } + + + + + + if (success) + { + // setup basics of response. + resp.set_ssl_id(uid); + resp.set_path(path); + + switch(details.type) + { + case DIR_TYPE_ROOT: + { + std::cerr << "RpcProtoFiles::processReqShareDirList() Details.type == ROOT ??"; + std::cerr << std::endl; + resp.set_list_type(rsctrl::files::ResponseShareDirList::DIRQUERY_ROOT); + rsctrl::core::File *file = resp.add_files(); + fill_file_as_dir(file, details.name); + + } + break; + + case DIR_TYPE_FILE: + { + std::cerr << "RpcProtoFiles::processReqShareDirList() Details.type == FILE"; + std::cerr << std::endl; + resp.set_list_type(rsctrl::files::ResponseShareDirList::DIRQUERY_FILE); + rsctrl::core::File *file = resp.add_files(); + fill_file_from_details(file, details); + } + break; + + default: + std::cerr << "RpcProtoFiles::processReqShareDirList() Details.type == UNKNOWN => default to DIR"; + std::cerr << std::endl; + + case DIR_TYPE_PERSON: + case DIR_TYPE_DIR: + { + std::cerr << "RpcProtoFiles::processReqShareDirList() Details.type == DIR or PERSON"; + std::cerr << std::endl; + + resp.set_list_type(rsctrl::files::ResponseShareDirList::DIRQUERY_DIR); + + //std::string dir_path = RsDirUtil::makePath(details.path, details.name); + std::string dir_path = details.path; + + std::cerr << "RpcProtoFiles::processReqShareDirList() details.path: " << details.path; + std::cerr << " details.name: " << details.name; + std::cerr << std::endl; + + std::list::iterator sit; + for(sit = details.children.begin(); sit != details.children.end(); sit++) + { + std::cerr << "RpcProtoFiles::processReqShareDirList() checking child: " << sit->name; + std::cerr << std::endl; + if (sit->type == DIR_TYPE_FILE) + { + std::cerr << "RpcProtoFiles::processReqShareDirList() is FILE, fetching details."; + std::cerr << std::endl; + + DirDetails child_details; + std::string child_path = RsDirUtil::makePath(dir_path, sit->name); + if (rsFiles->RequestDirDetails(uid, child_path, child_details)) + { + rsctrl::core::File *file = resp.add_files(); + fill_file_from_details(file, child_details); + } + else + { + std::cerr << "RpcProtoFiles::processReqShareDirList() RequestDirDetails(" << child_path << ") Failed!!!"; + std::cerr << std::endl; + } + } + else + { + std::cerr << "RpcProtoFiles::processReqShareDirList() is DIR"; + std::cerr << std::endl; + + rsctrl::core::File *file = resp.add_files(); + fill_file_as_dir(file, sit->name); + } + } + } + break; + } + } + + /* DONE - Generate Reply */ + if (success) + { + rsctrl::core::Status *status = resp.mutable_status(); + status->set_code(rsctrl::core::Status::SUCCESS); + } + else + { + rsctrl::core::Status *status = resp.mutable_status(); + status->set_code(rsctrl::core::Status::FAILED); + status->set_msg(errorMsg); + } + + std::string outmsg; + if (!resp.SerializeToString(&outmsg)) + { + std::cerr << "RpcProtoFiles::processReqTransferList() ERROR SerialiseToString()"; + std::cerr << std::endl; + return 0; + } + + // Correctly Name Message. + uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::FILES, + rsctrl::files::MsgId_ResponseShareDirList, true); + + // queue it. + queueResponse(chan_id, out_msg_id, req_id, outmsg); + + return 1; +} + + /***** HELPER FUNCTIONS *****/ + +bool fill_file_from_details(rsctrl::core::File *file, DirDetails &details) +{ + file->set_hash(details.hash); + file->set_name(details.name); + file->set_size(details.count); + + return true; +} + + +bool fill_file_as_dir(rsctrl::core::File *file, const std::string &dir_name) +{ + file->set_hash(""); + file->set_name(dir_name); + file->set_size(0); + + return true; +} + diff --git a/retroshare-nogui/src/rpc/proto/rpcprotofiles.h b/retroshare-nogui/src/rpc/proto/rpcprotofiles.h index 89f8d17f9..7f0af0c9e 100644 --- a/retroshare-nogui/src/rpc/proto/rpcprotofiles.h +++ b/retroshare-nogui/src/rpc/proto/rpcprotofiles.h @@ -38,6 +38,7 @@ protected: int processReqTransferList(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); int processReqControlDownload(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); + int processReqShareDirList(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg); }; diff --git a/retroshare-nogui/src/rpc/proto/rpcprotopeers.cc b/retroshare-nogui/src/rpc/proto/rpcprotopeers.cc index aefe0312d..f1cc7f10d 100644 --- a/retroshare-nogui/src/rpc/proto/rpcprotopeers.cc +++ b/retroshare-nogui/src/rpc/proto/rpcprotopeers.cc @@ -30,6 +30,9 @@ #include #include +bool load_person_details(std::string pgp_id, rsctrl::core::Person *person, + bool getLocations, bool onlyConnected); + RpcProtoPeers::RpcProtoPeers(uint32_t serviceId) :RpcQueueService(serviceId) { @@ -88,9 +91,12 @@ int RpcProtoPeers::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id case rsctrl::peers::MsgId_RequestAddPeer: processAddPeer(chan_id, msg_id, req_id, msg); break; - case rsctrl::peers::MsgId_RequestModifyPeer: - processModifyPeer(chan_id, msg_id, req_id, msg); + case rsctrl::peers::MsgId_RequestExaminePeer: + processExaminePeer(chan_id, msg_id, req_id, msg); break; + //case rsctrl::peers::MsgId_RequestModifyPeer: + // processModifyPeer(chan_id, msg_id, req_id, msg); + // break; default: std::cerr << "RpcProtoPeers::processMsg() ERROR should never get here"; std::cerr << std::endl; @@ -102,15 +108,135 @@ int RpcProtoPeers::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id } -int RpcProtoPeers::processAddPeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) +int RpcProtoPeers::processAddPeer(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg) { - std::cerr << "RpcProtoPeers::processAddPeer() NOT FINISHED"; + std::cerr << "RpcProtoPeers::processAddPeer()"; std::cerr << std::endl; + + // parse msg. + rsctrl::peers::RequestAddPeer req; + if (!req.ParseFromString(msg)) + { + std::cerr << "RpcProtoPeers::processAddPeer() ERROR ParseFromString()"; + std::cerr << std::endl; + return 0; + } + // response. - rsctrl::peers::ResponseAddPeer resp; + rsctrl::peers::ResponsePeerList resp; + bool success = true; + std::string errorMsg; + + /* check if the gpg_id is valid */ + std::string pgp_id = req.pgp_id(); + std::string ssl_id; + if (req.has_ssl_id()) + { + ssl_id = req.ssl_id(); + } + + RsPeerDetails details; + if (!rsPeers->getGPGDetails(pgp_id, details)) + { + success = false; + errorMsg = "Invalid PGP ID"; + } + else + { + switch(req.cmd()) + { + default: + success = false; + errorMsg = "Invalid AddCmd"; + break; + case rsctrl::peers::RequestAddPeer::ADD: + + // TODO. NEED TO HANDLE SERVICE PERMISSION FLAGS. + success = rsPeers->addFriend(ssl_id,pgp_id, RS_SERVICE_PERM_ALL); + + break; + case rsctrl::peers::RequestAddPeer::REMOVE: + + success = rsPeers->removeFriend(pgp_id); + break; + } + + if (success) + { + rsctrl::core::Person *person = resp.add_peers(); + load_person_details(pgp_id, person, true, false); + } + } + + if (success) + { + rsctrl::core::Status *status = resp.mutable_status(); + status->set_code(rsctrl::core::Status::SUCCESS); + } + else + { + rsctrl::core::Status *status = resp.mutable_status(); + status->set_code(rsctrl::core::Status::NO_IMPL_YET); + } + + + std::string outmsg; + if (!resp.SerializeToString(&outmsg)) + { + std::cerr << "RpcProtoPeers::processAddPeer() ERROR SerialiseToString()"; + std::cerr << std::endl; + return 0; + } + + // Correctly Name Message. + uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::PEERS, + rsctrl::peers::MsgId_ResponsePeerList, true); + + // queue it. + queueResponse(chan_id, out_msg_id, req_id, outmsg); + + return 1; +} + + +int RpcProtoPeers::processExaminePeer(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg) +{ + std::cerr << "RpcProtoPeers::processExaminePeer() NOT FINISHED"; + std::cerr << std::endl; + + + // parse msg. + rsctrl::peers::RequestExaminePeer req; + if (!req.ParseFromString(msg)) + { + std::cerr << "RpcProtoPeers::processExaminePeer() ERROR ParseFromString()"; + std::cerr << std::endl; + return 0; + } + + // response. + rsctrl::peers::ResponsePeerList resp; bool success = false; + if (success) + { + switch(req.cmd()) + { + default: + success = false; + break; + case rsctrl::peers::RequestExaminePeer::IMPORT: + break; + case rsctrl::peers::RequestExaminePeer::EXAMINE: + + // Gets the GPG details, but does not add the key to the keyring. + //virtual bool loadDetailsFromStringCert(const std::string& certGPG, RsPeerDetails &pd,uint32_t& error_code) = 0; + + break; + } + } + if (success) { rsctrl::core::Status *status = resp.mutable_status(); @@ -133,7 +259,7 @@ int RpcProtoPeers::processAddPeer(uint32_t chan_id, uint32_t msg_id, uint32_t re // Correctly Name Message. uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::PEERS, - rsctrl::peers::MsgId_ResponseAddPeer, true); + rsctrl::peers::MsgId_ResponsePeerList, true); // queue it. queueResponse(chan_id, out_msg_id, req_id, outmsg); @@ -142,14 +268,24 @@ int RpcProtoPeers::processAddPeer(uint32_t chan_id, uint32_t msg_id, uint32_t re } -int RpcProtoPeers::processModifyPeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) +int RpcProtoPeers::processModifyPeer(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg) { std::cerr << "RpcProtoPeers::processModifyPeer() NOT FINISHED"; std::cerr << std::endl; + // parse msg. + rsctrl::peers::RequestModifyPeer req; + if (!req.ParseFromString(msg)) + { + std::cerr << "RpcProtoPeers::processModifyPeer() ERROR ParseFromString()"; + std::cerr << std::endl; + return 0; + } + + // response. - rsctrl::peers::ResponseModifyPeer resp; + rsctrl::peers::ResponsePeerList resp; bool success = false; if (success) @@ -174,7 +310,7 @@ int RpcProtoPeers::processModifyPeer(uint32_t chan_id, uint32_t msg_id, uint32_t // Correctly Name Message. uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::PEERS, - rsctrl::peers::MsgId_ResponseModifyPeer, true); + rsctrl::peers::MsgId_ResponsePeerList, true); // queue it. queueResponse(chan_id, out_msg_id, req_id, outmsg); @@ -184,7 +320,7 @@ int RpcProtoPeers::processModifyPeer(uint32_t chan_id, uint32_t msg_id, uint32_t -int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) +int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg) { std::cerr << "RpcProtoPeers::processRequestPeers()"; std::cerr << std::endl; @@ -198,10 +334,14 @@ int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32 return 0; } + // response. + rsctrl::peers::ResponsePeerList respp; + bool success = true; + std::string errorMsg; + // Get the list of gpg_id to generate data for. std::list ids; bool onlyConnected = false; - bool success = true; switch(reqp.set()) { case rsctrl::peers::RequestPeers::OWNID: @@ -216,9 +356,14 @@ int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32 { std::cerr << "RpcProtoPeers::processRequestPeers() LISTED"; std::cerr << std::endl; - /* extract ids from request (TODO) */ - std::string own_id = rsPeers->getGPGOwnId(); - ids.push_back(own_id); + int no_pgp_ids = reqp.pgp_ids_size(); + for (int i = 0; i < no_pgp_ids; i++) + { + std::string listed_id = reqp.pgp_ids(i); + std::cerr << "RpcProtoPeers::processRequestPeers() Adding Id: " << listed_id; + std::cerr << std::endl; + ids.push_back(listed_id); + } break; } @@ -281,117 +426,21 @@ int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32 break; } - // response. - rsctrl::peers::ResponsePeerList respp; /* now iterate through the peers and fill in the response. */ std::list::const_iterator git; for(git = ids.begin(); git != ids.end(); git++) { - - RsPeerDetails details; - if (!rsPeers->getGPGDetails(*git, details)) - { - continue; /* uhm.. */ - } - rsctrl::core::Person *person = respp.add_peers(); - - /* fill in key gpg details */ - person->set_gpg_id(*git); - person->set_name(details.name); - - std::cerr << "RpcProtoPeers::processRequestPeers() Adding GPGID: "; - std::cerr << *git << " name: " << details.name; - std::cerr << std::endl; - - if (details.state & RS_PEER_STATE_FRIEND) + if (!load_person_details(*git, person, getLocations, onlyConnected)) { - person->set_relation(rsctrl::core::Person::FRIEND); - } - else - { - std::list common_friends; - rsDisc->getDiscGPGFriends(*git, common_friends); - int size = common_friends.size(); - if (size) - { - if (size > 2) - { - person->set_relation(rsctrl::core::Person::FRIEND_OF_MANY_FRIENDS); - } - else - { - person->set_relation(rsctrl::core::Person::FRIEND_OF_FRIENDS); - } - } - else - { - person->set_relation(rsctrl::core::Person::UNKNOWN); - } - } - - if (getLocations) - { - std::list ssl_ids; - std::list::const_iterator sit; + std::cerr << "RpcProtoPeers::processRequestPeers() ERROR Finding GPGID: "; + std::cerr << *git; + std::cerr << std::endl; - if (!rsPeers->getAssociatedSSLIds(*git, ssl_ids)) - { - continue; /* end of this peer */ - } - - for(sit = ssl_ids.begin(); sit != ssl_ids.end(); sit++) - { - RsPeerDetails ssldetails; - if (!rsPeers->getPeerDetails(*sit, ssldetails)) - { - continue; /* uhm.. */ - } - if ((onlyConnected) && - (!(ssldetails.state & RS_PEER_STATE_CONNECTED))) - { - continue; - } - - rsctrl::core::Location *loc = person->add_locations(); - - std::cerr << "RpcProtoPeers::processRequestPeers() \t Adding Location: "; - std::cerr << *sit << " loc: " << ssldetails.location; - std::cerr << std::endl; - - /* fill in ssl details */ - loc->set_ssl_id(*sit); - loc->set_location(ssldetails.location); - - /* set addresses */ - rsctrl::core::IpAddr *laddr = loc->mutable_localaddr(); - laddr->set_addr(ssldetails.localAddr); - laddr->set_port(ssldetails.localPort); - - rsctrl::core::IpAddr *eaddr = loc->mutable_extaddr(); - eaddr->set_addr(ssldetails.extAddr); - eaddr->set_port(ssldetails.extPort); - - /* translate status */ - uint32_t loc_state = 0; - //dont think this state should be here. - //if (ssldetails.state & RS_PEER_STATE_FRIEND) - if (ssldetails.state & RS_PEER_STATE_ONLINE) - { - loc_state |= (uint32_t) rsctrl::core::Location::ONLINE; - } - if (ssldetails.state & RS_PEER_STATE_CONNECTED) - { - loc_state |= (uint32_t) rsctrl::core::Location::CONNECTED; - } - if (ssldetails.state & RS_PEER_STATE_UNREACHABLE) - { - loc_state |= (uint32_t) rsctrl::core::Location::UNREACHABLE; - } - - loc->set_state(loc_state); - } + /* cleanup peers */ + success = false; + errorMsg = "Error Loading PeerID"; } } @@ -404,7 +453,7 @@ int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32 { rsctrl::core::Status *status = respp.mutable_status(); status->set_code(rsctrl::core::Status::FAILED); - status->set_msg("Unknown ERROR"); + status->set_msg(errorMsg); } @@ -427,3 +476,135 @@ int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32 } + + + +bool load_person_details(std::string pgp_id, rsctrl::core::Person *person, + bool getLocations, bool onlyConnected) +{ + RsPeerDetails details; + if (!rsPeers->getGPGDetails(pgp_id, details)) + { + std::cerr << "RpcProtoPeers::processRequestPeers() ERROR Finding GPGID: "; + std::cerr << pgp_id; + std::cerr << std::endl; + return false; + } + + /* fill in key gpg details */ + person->set_gpg_id(pgp_id); + person->set_name(details.name); + + std::cerr << "RpcProtoPeers::processRequestPeers() Adding GPGID: "; + std::cerr << pgp_id << " name: " << details.name; + std::cerr << std::endl; + + //if (details.state & RS_PEER_STATE_FRIEND) + if (pgp_id == rsPeers->getGPGOwnId()) + { + std::cerr << "RpcProtoPeers::processRequestPeers() Relation YOURSELF"; + std::cerr << std::endl; + person->set_relation(rsctrl::core::Person::YOURSELF); + } + else if (rsPeers->isGPGAccepted(pgp_id)) + { + std::cerr << "RpcProtoPeers::processRequestPeers() Relation FRIEND"; + std::cerr << std::endl; + person->set_relation(rsctrl::core::Person::FRIEND); + } + else + { + std::list common_friends; + rsDisc->getDiscGPGFriends(pgp_id, common_friends); + int size = common_friends.size(); + if (size) + { + if (size > 2) + { + std::cerr << "RpcProtoPeers::processRequestPeers() Relation FRIEND_OF_MANY_FRIENDS"; + std::cerr << std::endl; + person->set_relation(rsctrl::core::Person::FRIEND_OF_MANY_FRIENDS); + } + else + { + std::cerr << "RpcProtoPeers::processRequestPeers() Relation FRIEND_OF_FRIENDS"; + std::cerr << std::endl; + person->set_relation(rsctrl::core::Person::FRIEND_OF_FRIENDS); + } + } + else + { + std::cerr << "RpcProtoPeers::processRequestPeers() Relation UNKNOWN"; + std::cerr << std::endl; + person->set_relation(rsctrl::core::Person::UNKNOWN); + } + } + + if (getLocations) + { + std::list ssl_ids; + std::list::const_iterator sit; + + if (!rsPeers->getAssociatedSSLIds(pgp_id, ssl_ids)) + { + std::cerr << "RpcProtoPeers::processRequestPeers() No Locations"; + std::cerr << std::endl; + return true; /* end of this peer */ + } + + for(sit = ssl_ids.begin(); sit != ssl_ids.end(); sit++) + { + RsPeerDetails ssldetails; + if (!rsPeers->getPeerDetails(*sit, ssldetails)) + { + continue; /* uhm.. */ + } + if ((onlyConnected) && + (!(ssldetails.state & RS_PEER_STATE_CONNECTED))) + { + continue; + } + + rsctrl::core::Location *loc = person->add_locations(); + + std::cerr << "RpcProtoPeers::processRequestPeers() \t Adding Location: "; + std::cerr << *sit << " loc: " << ssldetails.location; + std::cerr << std::endl; + + /* fill in ssl details */ + loc->set_ssl_id(*sit); + loc->set_location(ssldetails.location); + + /* set addresses */ + rsctrl::core::IpAddr *laddr = loc->mutable_localaddr(); + laddr->set_addr(ssldetails.localAddr); + laddr->set_port(ssldetails.localPort); + + rsctrl::core::IpAddr *eaddr = loc->mutable_extaddr(); + eaddr->set_addr(ssldetails.extAddr); + eaddr->set_port(ssldetails.extPort); + + /* translate status */ + uint32_t loc_state = 0; + //dont think this state should be here. + //if (ssldetails.state & RS_PEER_STATE_FRIEND) + if (ssldetails.state & RS_PEER_STATE_ONLINE) + { + loc_state |= (uint32_t) rsctrl::core::Location::ONLINE; + } + if (ssldetails.state & RS_PEER_STATE_CONNECTED) + { + loc_state |= (uint32_t) rsctrl::core::Location::CONNECTED; + } + if (ssldetails.state & RS_PEER_STATE_UNREACHABLE) + { + loc_state |= (uint32_t) rsctrl::core::Location::UNREACHABLE; + } + + loc->set_state(loc_state); + } + } + return true; /* end of this peer */ +} + + diff --git a/retroshare-nogui/src/rpc/proto/rpcprotopeers.h b/retroshare-nogui/src/rpc/proto/rpcprotopeers.h index 92e91d88a..7e15e0b07 100644 --- a/retroshare-nogui/src/rpc/proto/rpcprotopeers.h +++ b/retroshare-nogui/src/rpc/proto/rpcprotopeers.h @@ -36,6 +36,9 @@ public: virtual int processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); virtual int processAddPeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); + + // these aren't implemented yet. + virtual int processExaminePeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); virtual int processModifyPeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); }; diff --git a/retroshare-nogui/src/rpc/proto/rpcprotosearch.cc b/retroshare-nogui/src/rpc/proto/rpcprotosearch.cc index 31f97cd72..ba15dd78d 100644 --- a/retroshare-nogui/src/rpc/proto/rpcprotosearch.cc +++ b/retroshare-nogui/src/rpc/proto/rpcprotosearch.cc @@ -38,6 +38,9 @@ #include +bool condenseSearchResults(const std::list &searchResults, uint32_t limit, + rsctrl::search::SearchSet *result_set); + RpcProtoSearch::RpcProtoSearch(uint32_t serviceId, NotifyTxt *notify) :RpcQueueService(serviceId), mNotify(notify), searchMtx("RpcProtoSearch") @@ -420,21 +423,7 @@ int RpcProtoSearch::processReqSearchResults(uint32_t chan_id, uint32_t /* msg_id std::list searchResults; mNotify->getSearchResults(*rit, searchResults); - /* convert into useful list */ - for(it = searchResults.begin(); it != searchResults.end(); it++) - { - /* add to answer */ - rsctrl::search::SearchHit *hit = set->add_hits(); - rsctrl::core::File *file = hit->mutable_file(); - - file->set_hash(it->hash); - file->set_name(it->name); - file->set_size(it->size); - - // Uhm not provided for now. default to NETWORK - hit->set_loc(rsctrl::search::SearchHit::NETWORK); - hit->set_no_hits(1); // No aggregation yet. - } + condenseSearchResults(searchResults, req.result_limit(), set); } /* DONE - Generate Reply */ @@ -591,3 +580,99 @@ int RpcProtoSearch::clear_searches(uint32_t chan_id) return 1; } + + +class RpcSearchInfo +{ + public: + std::string hash; + std::string name; + uint64_t size; + std::map name_map; + uint32_t hits; +}; + + +bool condenseSearchResults(const std::list &searchResults, uint32_t limit, + rsctrl::search::SearchSet *result_set) +{ + std::map searchMap; + std::map::iterator mit; + + std::list::const_iterator it; + for(it = searchResults.begin(); it != searchResults.end(); it++) + { + mit = searchMap.find(it->hash); + if (mit != searchMap.end()) + { + mit->second.hits++; + + if (mit->second.name_map.find(it->name) == mit->second.name_map.end()) + { + mit->second.name_map[it->name] = 1; + } + else + { + mit->second.name_map[it->name]++; + } + + if (it->size != mit->second.size) + { + // ERROR. + } + } + else + { + RpcSearchInfo info; + info.hash = it->hash; + info.size = it->size; + info.name_map[it->name] = 1; + info.hits = 1; + + searchMap[it->hash] = info; + } + } + + unsigned int i = 0; + for(mit = searchMap.begin(); (mit != searchMap.end()) && (i < limit); mit++, i++) + { + std::map::reverse_iterator nit; + nit = mit->second.name_map.rbegin(); + + /* add to answer */ + rsctrl::search::SearchHit *hit = result_set->add_hits(); + rsctrl::core::File *file = hit->mutable_file(); + + file->set_hash(mit->second.hash); + file->set_name(nit->first); + file->set_size(mit->second.size); + + // Uhm not provided for now. default to NETWORK + hit->set_loc(rsctrl::search::SearchHit::NETWORK); + hit->set_no_hits(mit->second.hits); // No aggregation yet. + + // guarenteed to have one item here. + for(nit++; nit != mit->second.name_map.rend(); nit++) + { + hit->add_alt_names(nit->first); + } + } + + return true; +} + + + + + + + + + + + + + + + + diff --git a/retroshare-nogui/src/rpc/proto/rpcprotostream.cc b/retroshare-nogui/src/rpc/proto/rpcprotostream.cc new file mode 100644 index 000000000..445d5136a --- /dev/null +++ b/retroshare-nogui/src/rpc/proto/rpcprotostream.cc @@ -0,0 +1,824 @@ +/* + * RetroShare External Interface. + * + * Copyright 2012-2012 by Robert Fernie. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License Version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + * USA. + * + * Please report all bugs and problems to "retroshare@lunamutt.com". + * + */ + +#include "rpc/proto/rpcprotostream.h" +#include "rpc/proto/rpcprotoutils.h" + +#include "rpc/proto/gencc/stream.pb.h" +#include "rpc/proto/gencc/core.pb.h" + +#include +#include + +// from libretroshare +#include "util/rsdir.h" + +//#include +//#include +//#include + +#include "util/rsstring.h" + +#include + +#include +#include + +#include + +#define MAX_DESIRED_RATE 1000.0 // 1Mb/s + +#define MIN_STREAM_CHUNK_SIZE 10 +#define MAX_STREAM_CHUNK_SIZE 100000 + +#define STREAM_STANDARD_MIN_DT 0.1 +#define STREAM_BACKGROUND_MIN_DT 0.5 + + +bool fill_stream_details(rsctrl::stream::ResponseStreamDetail &resp, + const std::list &streams); +bool fill_stream_desc(rsctrl::stream::StreamDesc &desc, + const RpcStream &stream); + +bool fill_stream_data(rsctrl::stream::StreamData &data, const RpcStream &stream); + +bool createQueuedStreamMsg(const RpcStream &stream, rsctrl::stream::ResponseStreamData &resp, RpcQueuedMsg &qmsg); + + +RpcProtoStream::RpcProtoStream(uint32_t serviceId) + :RpcQueueService(serviceId) +{ + mNextStreamId = 1; + + return; +} + + +void RpcProtoStream::reset(uint32_t chan_id) +{ + // We should be using a mutex for all stream operations!!!! + // TODO + //RsStackMutex stack(mQueueMtx); /********** LOCKED MUTEX ***************/ + + std::list toRemove; + std::map::iterator it; + for(it = mStreams.begin(); it != mStreams.end(); it++) + { + if (it->second.chan_id == chan_id) + { + toRemove.push_back(it->first); + } + } + + std::list::iterator rit; + for(rit = toRemove.begin(); rit != toRemove.end(); rit++) + { + it = mStreams.find(*rit); + if (it != mStreams.end()) + { + mStreams.erase(it); + } + } + + // Call the rest of reset. + RpcQueueService::reset(chan_id); +} + + + + + +//RpcProtoStream::msgsAccepted(std::list &msgIds); /* not used at the moment */ + +int RpcProtoStream::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) +{ + /* check the msgId */ + uint8_t topbyte = getRpcMsgIdExtension(msg_id); + uint16_t service = getRpcMsgIdService(msg_id); + uint8_t submsg = getRpcMsgIdSubMsg(msg_id); + bool isResponse = isRpcMsgIdResponse(msg_id); + + + std::cerr << "RpcProtoStream::processMsg() topbyte: " << (int32_t) topbyte; + std::cerr << " service: " << (int32_t) service << " submsg: " << (int32_t) submsg; + std::cerr << std::endl; + + if (isResponse) + { + std::cerr << "RpcProtoStream::processMsg() isResponse() - not processing"; + std::cerr << std::endl; + return 0; + } + + if (topbyte != (uint8_t) rsctrl::core::CORE) + { + std::cerr << "RpcProtoStream::processMsg() Extension Mismatch - not processing"; + std::cerr << std::endl; + return 0; + } + + if (service != (uint16_t) rsctrl::core::STREAM) + { + std::cerr << "RpcProtoStream::processMsg() Service Mismatch - not processing"; + std::cerr << std::endl; + return 0; + } + + if (!rsctrl::stream::RequestMsgIds_IsValid(submsg)) + { + std::cerr << "RpcProtoStream::processMsg() SubMsg Mismatch - not processing"; + std::cerr << std::endl; + return 0; + } + + switch(submsg) + { + + case rsctrl::stream::MsgId_RequestStartFileStream: + processReqStartFileStream(chan_id, msg_id, req_id, msg); + break; + + case rsctrl::stream::MsgId_RequestControlStream: + processReqControlStream(chan_id, msg_id, req_id, msg); + break; + + case rsctrl::stream::MsgId_RequestListStreams: + processReqListStreams(chan_id, msg_id, req_id, msg); + break; + +// case rsctrl::stream::MsgId_RequestRegisterStreams: +// processReqRegisterStreams(chan_id, msg_id, req_id, msg); +// break; + + default: + std::cerr << "RpcProtoStream::processMsg() ERROR should never get here"; + std::cerr << std::endl; + return 0; + } + + /* must have matched id to get here */ + return 1; +} + + + +int RpcProtoStream::processReqStartFileStream(uint32_t chan_id, uint32_t /*msg_id*/, uint32_t req_id, const std::string &msg) +{ + std::cerr << "RpcProtoStream::processReqStartFileStream()"; + std::cerr << std::endl; + + // parse msg. + rsctrl::stream::RequestStartFileStream req; + if (!req.ParseFromString(msg)) + { + std::cerr << "RpcProtoStream::processReqStartFileStream() ERROR ParseFromString()"; + std::cerr << std::endl; + return 0; + } + + // response. + rsctrl::stream::ResponseStreamDetail resp; + bool success = true; + std::string errorMsg; + + + // SETUP STREAM. + + // FIND the FILE. + std::list hashes; + hashes.push_back(req.file().hash()); + + //HashExpression exp(StringOperator::EqualsString, hashes); + HashExpression exp(EqualsString, hashes); + std::list results; + + FileSearchFlags flags = RS_FILE_HINTS_LOCAL; + int ans = rsFiles->SearchBoolExp(&exp, results, flags); + + // CREATE A STREAM OBJECT. + if (results.size() < 1) + { + success = false; + errorMsg = "No Matching File"; + } + else + { + DirDetails &dirdetail = results.front(); + + RpcStream stream; + stream.chan_id = chan_id; + stream.req_id = req_id; + stream.stream_id = getNextStreamId(); + stream.state = RpcStream::RUNNING; + + // Convert to Full local path. + std::string virtual_path = RsDirUtil::makePath(dirdetail.path, dirdetail.name); + if (!rsFiles->ConvertSharedFilePath(virtual_path, stream.path)) + { + success = false; + errorMsg = "Cannot Match to Shared Directory"; + } + + stream.length = dirdetail.count; + stream.hash = dirdetail.hash; + stream.name = dirdetail.name; + + stream.offset = 0; + stream.start_byte = 0; + stream.end_byte = stream.length; + stream.desired_rate = req.rate_kbs(); + + if (stream.desired_rate > MAX_DESIRED_RATE) + { + stream.desired_rate = MAX_DESIRED_RATE; + } + + // make response + rsctrl::stream::StreamDesc *desc = resp.add_streams(); + if (!fill_stream_desc(*desc, stream)) + { + success = false; + errorMsg = "Failed to Invalid Action"; + } + else + { + // insert. + mStreams[stream.stream_id] = stream; + + // register the stream too. + std::cerr << "RpcProtoStream::processReqStartFileStream() Registering the stream event."; + std::cerr << std::endl; + registerForEvents(chan_id, req_id, REGISTRATION_STREAMS); + } + + std::cerr << "RpcProtoStream::processReqStartFileStream() List of Registered Streams:"; + std::cerr << std::endl; + printEventRegister(std::cerr); + } + + /* DONE - Generate Reply */ + if (success) + { + rsctrl::core::Status *status = resp.mutable_status(); + status->set_code(rsctrl::core::Status::SUCCESS); + } + else + { + rsctrl::core::Status *status = resp.mutable_status(); + status->set_code(rsctrl::core::Status::FAILED); + status->set_msg(errorMsg); + } + + std::string outmsg; + if (!resp.SerializeToString(&outmsg)) + { + std::cerr << "RpcProtoStream::processReqStartFileStream() ERROR SerialiseToString()"; + std::cerr << std::endl; + return 0; + } + + // Correctly Name Message. + uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::STREAM, + rsctrl::stream::MsgId_ResponseStreamDetail, true); + + // queue it. + queueResponse(chan_id, out_msg_id, req_id, outmsg); + return 1; +} + + + +int RpcProtoStream::processReqControlStream(uint32_t chan_id, uint32_t /*msg_id*/, uint32_t req_id, const std::string &msg) +{ + std::cerr << "RpcProtoStream::processReqControlStream()"; + std::cerr << std::endl; + + // parse msg. + rsctrl::stream::RequestControlStream req; + if (!req.ParseFromString(msg)) + { + std::cerr << "RpcProtoStream::processReqControlStream() ERROR ParseFromString()"; + std::cerr << std::endl; + return 0; + } + + // response. + rsctrl::stream::ResponseStreamDetail resp; + bool success = true; + std::string errorMsg; + + // FIND MATCHING STREAM. + std::map::iterator it; + it = mStreams.find(req.stream_id()); + if (it != mStreams.end()) + { + // TWEAK + + if (it->second.state == RpcStream::STREAMERR) + { + if (req.action() == rsctrl::stream::RequestControlStream::STREAM_STOP) + { + it->second.state = RpcStream::FINISHED; + } + else + { + success = false; + errorMsg = "Stream Error"; + } + } + else + { + switch(req.action()) + { + case rsctrl::stream::RequestControlStream::STREAM_START: + if (it->second.state == RpcStream::PAUSED) + { + it->second.state = RpcStream::RUNNING; + } + break; + + case rsctrl::stream::RequestControlStream::STREAM_STOP: + it->second.state = RpcStream::FINISHED; + break; + + case rsctrl::stream::RequestControlStream::STREAM_PAUSE: + if (it->second.state == RpcStream::RUNNING) + { + it->second.state = RpcStream::PAUSED; + it->second.transfer_time = 0; // reset timings. + } + break; + + case rsctrl::stream::RequestControlStream::STREAM_CHANGE_RATE: + it->second.desired_rate = req.rate_kbs(); + break; + + case rsctrl::stream::RequestControlStream::STREAM_SEEK: + if (req.seek_byte() < it->second.end_byte) + { + it->second.offset = req.seek_byte(); + } + break; + default: + success = false; + errorMsg = "Invalid Action"; + } + } + + // FILL IN REPLY. + if (success) + { + rsctrl::stream::StreamDesc *desc = resp.add_streams(); + if (!fill_stream_desc(*desc, it->second)) + { + success = false; + errorMsg = "Invalid Action"; + } + } + + // Cleanup - TODO, this is explicit at the moment. - should be automatic after finish. + if (it->second.state == RpcStream::FINISHED) + { + deregisterForEvents(it->second.chan_id, it->second.req_id, REGISTRATION_STREAMS); + mStreams.erase(it); + } + } + else + { + success = false; + errorMsg = "No Matching Stream"; + } + + /* DONE - Generate Reply */ + if (success) + { + rsctrl::core::Status *status = resp.mutable_status(); + status->set_code(rsctrl::core::Status::SUCCESS); + } + else + { + rsctrl::core::Status *status = resp.mutable_status(); + status->set_code(rsctrl::core::Status::FAILED); + status->set_msg(errorMsg); + } + + std::string outmsg; + if (!resp.SerializeToString(&outmsg)) + { + std::cerr << "RpcProtoStream::processReqControlStream() ERROR SerialiseToString()"; + std::cerr << std::endl; + return 0; + } + + // Correctly Name Message. + uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::STREAM, + rsctrl::stream::MsgId_ResponseStreamDetail, true); + + // queue it. + queueResponse(chan_id, out_msg_id, req_id, outmsg); + + return 1; +} + + + +int RpcProtoStream::processReqListStreams(uint32_t chan_id, uint32_t /*msg_id*/, uint32_t req_id, const std::string &msg) +{ + std::cerr << "RpcProtoStream::processReqListStreams()"; + std::cerr << std::endl; + + // parse msg. + rsctrl::stream::RequestListStreams req; + if (!req.ParseFromString(msg)) + { + std::cerr << "RpcProtoStream::processReqListStreams() ERROR ParseFromString()"; + std::cerr << std::endl; + return 0; + } + + // response. + rsctrl::stream::ResponseStreamDetail resp; + bool success = false; + std::string errorMsg; + + + + std::map::iterator it; + for(it = mStreams.begin(); it != mStreams.end(); it++) + { + bool match = true; + + // TODO fill in match! + /* check that it matches */ + if (! match) + { + continue; + } + + rsctrl::stream::StreamDesc *desc = resp.add_streams(); + if (!fill_stream_desc(*desc, it->second)) + { + success = false; + errorMsg = "Some Details Failed to Fill"; + } + } + + /* DONE - Generate Reply */ + if (success) + { + rsctrl::core::Status *status = resp.mutable_status(); + status->set_code(rsctrl::core::Status::SUCCESS); + } + else + { + rsctrl::core::Status *status = resp.mutable_status(); + status->set_code(rsctrl::core::Status::FAILED); + status->set_msg(errorMsg); + } + + std::string outmsg; + if (!resp.SerializeToString(&outmsg)) + { + std::cerr << "RpcProtoStream::processReqListStreams() ERROR SerialiseToString()"; + std::cerr << std::endl; + return 0; + } + + // Correctly Name Message. + uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::STREAM, + rsctrl::stream::MsgId_ResponseStreamDetail, true); + + // queue it. + queueResponse(chan_id, out_msg_id, req_id, outmsg); + return 1; +} + + + // EVENTS. (STREAMS) +int RpcProtoStream::locked_checkForEvents(uint32_t event, const std::list & /* registered */, std::list &stream_msgs) +{ + /* only one event type for now */ + if (event != REGISTRATION_STREAMS) + { + std::cerr << "ERROR Invalid Stream Event Type"; + std::cerr << std::endl; + + /* error */ + return 0; + } + + /* iterate through streams, and get next chunk of data. + * package up and send it. + * NOTE we'll have to something more complex for VoIP! + */ + + double ts = getTimeStamp(); + double dt = ts - mStreamRates.last_ts; + uint32_t data_sent = 0; + +#define FILTER_K (0.75) + + if (dt < 5.0) //mStreamRates.last_ts != 0) + { + mStreamRates.avg_dt = FILTER_K * mStreamRates.avg_dt + + (1.0 - FILTER_K) * dt; + } + else + { + std::cerr << "RpcProtoStream::locked_checkForEvents() Large dT - resetting avg"; + std::cerr << std::endl; + mStreamRates.avg_dt = 0.0; + } + + mStreamRates.last_ts = ts; + + + std::map::iterator it; + for(it = mStreams.begin(); it != mStreams.end(); it++) + { + RpcStream &stream = it->second; + + if (!(stream.state == RpcStream::RUNNING)) + { + continue; + } + + double stream_dt = ts - stream.transfer_time; + + switch(stream.transfer_type) + { + case RpcStream::REALTIME: + // let it go through always. + break; + + case RpcStream::STANDARD: + if (stream_dt < STREAM_STANDARD_MIN_DT) + { + continue; + } + break; + + + case RpcStream::BACKGROUND: + if (stream_dt < STREAM_BACKGROUND_MIN_DT) + { + continue; + } + break; + } + + if (!stream.transfer_time) + { + std::cerr << "RpcProtoStream::locked_checkForEvents() Null stream.transfer_time .. resetting"; + std::cerr << std::endl; + stream.transfer_avg_dt = STREAM_STANDARD_MIN_DT; + } + else + { + std::cerr << "RpcProtoStream::locked_checkForEvents() stream.transfer_avg_dt: " << stream.transfer_avg_dt; + std::cerr << " stream_dt: " << stream_dt; + std::cerr << std::endl; + stream.transfer_avg_dt = FILTER_K * stream.transfer_avg_dt + + (1.0 - FILTER_K) * stream_dt; + + std::cerr << "RpcProtoStream::locked_checkForEvents() ==> stream.transfer_avg_dt: " << stream.transfer_avg_dt; + std::cerr << std::endl; + } + + uint32_t size = stream.desired_rate * 1000.0 * stream.transfer_avg_dt; + stream.transfer_time = ts; + + + if (size < MIN_STREAM_CHUNK_SIZE) + { + size = MIN_STREAM_CHUNK_SIZE; + } + if (size > MAX_STREAM_CHUNK_SIZE) + { + size = MAX_STREAM_CHUNK_SIZE; + } + + + /* get data */ + uint64_t remaining = stream.end_byte - stream.offset; + if (remaining < size) + { + size = remaining; + stream.state = RpcStream::FINISHED; + std::cerr << "RpcProtoStream::locked_checkForEvents() Sending Remaining: " << size; + std::cerr << std::endl; + } + + std::cerr << "RpcProtoStream::locked_checkForEvents() Handling Stream: " << stream.stream_id << " state: " << stream.state; + std::cerr << std::endl; + std::cerr << "path: " << stream.path; + std::cerr << std::endl; + std::cerr << "offset: " << stream.offset; + std::cerr << " avg_dt: " << stream.transfer_avg_dt; + std::cerr << " x desired_rate: " << stream.desired_rate; + std::cerr << " => chunk_size: " << size; + std::cerr << std::endl; + + /* fill in the answer */ + + rsctrl::stream::ResponseStreamData resp; + rsctrl::core::Status *status = resp.mutable_status(); + status->set_code(rsctrl::core::Status::SUCCESS); + + rsctrl::stream::StreamData *data = resp.mutable_data(); + data->set_stream_id(stream.stream_id); + + // convert state. + switch(stream.state) + { + case RpcStream::RUNNING: + data->set_stream_state(rsctrl::stream::STREAM_STATE_RUN); + break; + // This case cannot happen. + case RpcStream::PAUSED: + data->set_stream_state(rsctrl::stream::STREAM_STATE_PAUSED); + break; + // This can only happen at last chunk. + default: + case RpcStream::FINISHED: + data->set_stream_state(rsctrl::stream::STREAM_STATE_FINISHED); + break; + } + + + rsctrl::core::Timestamp *ts = data->mutable_send_time(); + setTimeStamp(ts); + + data->set_offset(stream.offset); + data->set_size(size); + + if (fill_stream_data(*data, stream)) + { + + /* increment seek_location - for next request */ + stream.offset += size; + + RpcQueuedMsg qmsg; + if (createQueuedStreamMsg(stream, resp, qmsg)) + { + std::cerr << "Created Stream Msg."; + std::cerr << std::endl; + + stream_msgs.push_back(qmsg); + } + else + { + std::cerr << "ERROR Creating Stream Msg"; + std::cerr << std::endl; + } + } + else + { + stream.state = RpcStream::STREAMERR; + std::cerr << "ERROR Filling Stream Data"; + std::cerr << std::endl; + } + + } + return 1; +} + + + +// TODO +int RpcProtoStream::cleanup_checkForEvents(uint32_t /* event */, const std::list & /* registered */) +{ + std::list to_remove; + std::list::iterator rit; + for(rit = to_remove.begin(); rit != to_remove.end(); rit++) + { + /* kill the stream! */ + std::map::iterator it; + it = mStreams.find(*rit); + if (it != mStreams.end()) + { + mStreams.erase(it); + } + } + return 1; +} + + +/***** HELPER FUNCTIONS *****/ + +bool createQueuedStreamMsg(const RpcStream &stream, rsctrl::stream::ResponseStreamData &resp, RpcQueuedMsg &qmsg) +{ + std::string outmsg; + if (!resp.SerializeToString(&outmsg)) + { + std::cerr << "RpcProtoStream::createQueuedEventSendMsg() ERROR SerialiseToString()"; + std::cerr << std::endl; + return false; + } + + // Correctly Name Message. + qmsg.mMsgId = constructMsgId(rsctrl::core::CORE, rsctrl::core::STREAM, + rsctrl::stream::MsgId_ResponseStreamData, true); + + qmsg.mChanId = stream.chan_id; + qmsg.mReqId = stream.req_id; + qmsg.mMsg = outmsg; + + return true; +} + + + + +/****************** NEW HELPER FNS ******************/ + +bool fill_stream_details(rsctrl::stream::ResponseStreamDetail &resp, + const std::list &streams) +{ + std::cerr << "fill_stream_details()"; + std::cerr << std::endl; + + bool val = true; + std::list::const_iterator it; + for (it = streams.begin(); it != streams.end(); it++) + { + rsctrl::stream::StreamDesc *desc = resp.add_streams(); + val &= fill_stream_desc(*desc, *it); + } + + return val; +} + +bool fill_stream_desc(rsctrl::stream::StreamDesc &desc, const RpcStream &stream) +{ + std::cerr << "fill_stream_desc()"; + std::cerr << std::endl; + + return true; +} + +bool fill_stream_data(rsctrl::stream::StreamData &data, const RpcStream &stream) +{ + /* fill the StreamData from stream */ + + /* open file */ + FILE *fd = RsDirUtil::rs_fopen(stream.path.c_str(), "rb"); + if (!fd) + { + std::cerr << "fill_stream_data() Failed to open file: " << stream.path; + std::cerr << std::endl; + return false; + } + + uint32_t data_size = data.size(); + uint64_t base_loc = data.offset(); + void *buffer = malloc(data_size); + + /* seek to correct spot */ + fseeko64(fd, base_loc, SEEK_SET); + + /* copy data into bytes */ + if (1 != fread(buffer, data_size, 1, fd)) + { + std::cerr << "fill_stream_data() Failed to get data. data_size=" << data_size << ", base_loc=" << base_loc << " !"; + std::cerr << std::endl; + return false; + } + + data.set_stream_data(buffer, data_size); + free(buffer); + + fclose(fd); + + return true; +} + + + + +uint32_t RpcProtoStream::getNextStreamId() +{ + return mNextStreamId++; +} + + diff --git a/retroshare-nogui/src/rpc/proto/rpcprotostream.h b/retroshare-nogui/src/rpc/proto/rpcprotostream.h new file mode 100644 index 000000000..698e9a52f --- /dev/null +++ b/retroshare-nogui/src/rpc/proto/rpcprotostream.h @@ -0,0 +1,124 @@ +/* + * RetroShare External Interface. + * + * Copyright 2012-2012 by Robert Fernie. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License Version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + * USA. + * + * Please report all bugs and problems to "retroshare@lunamutt.com". + * + */ + + +#ifndef RS_RPC_PROTO_STREAM_H +#define RS_RPC_PROTO_STREAM_H + +#include "rpc/rpcserver.h" + +// Registrations. +#define REGISTRATION_STREAMS 1 + +class RpcStream +{ + public: + RpcStream(): chan_id(0), req_id(0), stream_id(0), state(0), + offset(0), length(0), start_byte(0), end_byte(0), desired_rate(0), + transfer_type(0), transfer_time(0), transfer_avg_dt(0) + { return; } + +static const uint32_t STREAMERR = 0x00000; +static const uint32_t RUNNING = 0x00001; +static const uint32_t PAUSED = 0x00002; +static const uint32_t FINISHED = 0x00003; + + uint32_t chan_id; + uint32_t req_id; + uint32_t stream_id; + uint32_t state; + + std::string name; + std::string hash; + std::string path; + + uint64_t offset; // where we currently are. + uint64_t length; // filesize. + + uint64_t start_byte; + uint64_t end_byte; + + float desired_rate; // Kb/s + + + // Transfer Type +static const uint32_t STANDARD = 0x00000; +static const uint32_t REALTIME = 0x00001; +static const uint32_t BACKGROUND = 0x00002; + + uint32_t transfer_type; + double transfer_time; + double transfer_avg_dt; + + +}; + + +class RpcStreamRates +{ + public: + RpcStreamRates(): avg_data_rate(0), avg_dt(1), last_data_rate(0), last_ts(0) { return; } + + double avg_data_rate; + double avg_dt; + + double last_data_rate; + double last_ts; +}; + + + + +class RpcProtoStream: public RpcQueueService +{ +public: + RpcProtoStream(uint32_t serviceId); + virtual int processMsg(uint32_t chan_id, uint32_t msgId, uint32_t req_id, const std::string &msg); + virtual void reset(uint32_t chan_id); + + uint32_t getNextStreamId(); + +protected: + + + int processReqStartFileStream(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); + int processReqControlStream(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); + int processReqListStreams(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); + int processReqRegisterStreams(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg); + + + uint32_t mNextStreamId; + + RpcStreamRates mStreamRates; + std::map mStreams; + + // EVENTS. + virtual int locked_checkForEvents(uint32_t event, const std::list ®istered, std::list &events); + + // Not actually used yet. + int cleanup_checkForEvents(uint32_t event, const std::list ®istered); + +}; + + +#endif /* RS_PROTO_STREAM_H */ diff --git a/retroshare-nogui/src/rpc/proto/rpcprotosystem.cc b/retroshare-nogui/src/rpc/proto/rpcprotosystem.cc index 265840725..084ca9502 100644 --- a/retroshare-nogui/src/rpc/proto/rpcprotosystem.cc +++ b/retroshare-nogui/src/rpc/proto/rpcprotosystem.cc @@ -109,7 +109,7 @@ int RpcProtoSystem::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_i } -int RpcProtoSystem::processSystemStatus(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) +int RpcProtoSystem::processSystemStatus(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg) { std::cerr << "RpcProtoSystem::processSystemStatus()"; std::cerr << std::endl; @@ -224,7 +224,7 @@ int RpcProtoSystem::processSystemStatus(uint32_t chan_id, uint32_t msg_id, uint3 -int RpcProtoSystem::processSystemQuit(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) +int RpcProtoSystem::processSystemQuit(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg) { std::cerr << "RpcProtoSystem::processSystemQuit()"; std::cerr << std::endl; @@ -291,7 +291,7 @@ int RpcProtoSystem::processSystemQuit(uint32_t chan_id, uint32_t msg_id, uint32_ } -int RpcProtoSystem::processSystemExternalAccess(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg) +int RpcProtoSystem::processSystemExternalAccess(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg) { std::cerr << "RpcProtoSystem::processSystemExternalAccess()"; std::cerr << std::endl; diff --git a/retroshare-nogui/src/rpc/proto/rpcprotoutils.cc b/retroshare-nogui/src/rpc/proto/rpcprotoutils.cc new file mode 100644 index 000000000..4568560dd --- /dev/null +++ b/retroshare-nogui/src/rpc/proto/rpcprotoutils.cc @@ -0,0 +1,59 @@ +/* + * RetroShare External Interface. + * + * Copyright 2012-2012 by Robert Fernie. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License Version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + * USA. + * + * Please report all bugs and problems to "retroshare@lunamutt.com". + * + */ + +#include "rpc/proto/rpcprotoutils.h" +#include + + +double getTimeStamp() +{ + struct timeval tv; + double ts = 0; + if (0 == gettimeofday(&tv, NULL)) + { + ts = tv.tv_sec + (tv.tv_usec / 1000000.0); + } + return ts; +} + +bool setTimeStamp(rsctrl::core::Timestamp *ts) +{ + struct timeval tv; + if (0 != gettimeofday(&tv, NULL)) + { + ts->set_secs(tv.tv_sec); + ts->set_microsecs(tv.tv_usec); + return true; + } + else + { + ts->set_secs(0); + ts->set_microsecs(0); + return false; + } + return false; +} + + + + diff --git a/retroshare-nogui/src/rpc/proto/rpcprotoutils.h b/retroshare-nogui/src/rpc/proto/rpcprotoutils.h new file mode 100644 index 000000000..9c9ecf647 --- /dev/null +++ b/retroshare-nogui/src/rpc/proto/rpcprotoutils.h @@ -0,0 +1,35 @@ +/* + * RetroShare External Interface. + * + * Copyright 2012-2012 by Robert Fernie. + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License Version 2.1 as published by the Free Software Foundation. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 + * USA. + * + * Please report all bugs and problems to "retroshare@lunamutt.com". + * + */ + + +#ifndef RS_RPC_PROTO_UTILS_H +#define RS_RPC_PROTO_UTILS_H + + +#include "rpc/proto/gencc/core.pb.h" + +double getTimeStamp(); +bool setTimeStamp(rsctrl::core::Timestamp *ts); + + +#endif /* RS_RPC_PROTO_UTILS_H */ diff --git a/retroshare-nogui/src/rpc/rpcsetup.cc b/retroshare-nogui/src/rpc/rpcsetup.cc index 015d72142..0db9a786d 100644 --- a/retroshare-nogui/src/rpc/rpcsetup.cc +++ b/retroshare-nogui/src/rpc/rpcsetup.cc @@ -29,6 +29,7 @@ #include "rpc/proto/rpcprotochat.h" #include "rpc/proto/rpcprotosearch.h" #include "rpc/proto/rpcprotofiles.h" +#include "rpc/proto/rpcprotostream.h" #include "rpc/rpcecho.h" @@ -53,6 +54,9 @@ RpcMediator *CreateRpcSystem(RpcComms *comms, NotifyTxt *notify) RpcProtoFiles *files = new RpcProtoFiles(1); server->addService(files); + RpcProtoStream *streamer = new RpcProtoStream(1); + server->addService(streamer); + /* Finally an Echo Service - which will echo back any unprocesses commands. */ RpcEcho *echo = new RpcEcho(1); server->addService(echo); diff --git a/rsctrl/src/Makefile b/rsctrl/src/Makefile index b6305a374..a4c556640 100644 --- a/rsctrl/src/Makefile +++ b/rsctrl/src/Makefile @@ -1,7 +1,7 @@ EXEC = protoc -#PROTO = core.proto peers.proto system.proto chat.proto search.proto files.proto gxs.proto msgs.proto -PROTO = core.proto peers.proto system.proto chat.proto search.proto files.proto +#PROTO = core.proto peers.proto system.proto chat.proto search.proto files.proto stream.proto gxs.proto msgs.proto +PROTO = core.proto peers.proto system.proto chat.proto search.proto files.proto stream.proto PROTOPATH = ./definition #CDESTPATH = ./gencc @@ -15,7 +15,7 @@ HCODE = $(patsubst %.proto,$(CDESTPATH)/%.pb.h, $(PROTO)) PYCODE = $(patsubst %.proto,$(PYDESTPATH)/%_pb2.py, $(PROTO)) -all: allc +all: allc python_proto allc: $(CCODE) echo $(CCODE) diff --git a/rsctrl/src/definition/chat.proto b/rsctrl/src/definition/chat.proto index 4b6b2eb5c..0c0b8465b 100644 --- a/rsctrl/src/definition/chat.proto +++ b/rsctrl/src/definition/chat.proto @@ -19,6 +19,7 @@ enum RequestMsgIds { MsgId_RequestSetLobbyNickname = 4; MsgId_RequestRegisterEvents = 5; MsgId_RequestSendMessage = 6; + MsgId_RequestChatHistory = 7; } enum ResponseMsgIds { @@ -27,6 +28,7 @@ enum ResponseMsgIds { MsgId_ResponseSetLobbyNickname = 4; MsgId_ResponseRegisterEvents = 5; MsgId_ResponseSendMessage = 6; + MsgId_ResponseChatHistory = 7; // EVENTS MsgId_EventLobbyInvite = 101; @@ -223,21 +225,22 @@ message ResponseSendMessage { } /////////////////////////////////////////////////////////////// -//// Chat History. -//// THIS IS NOT IMPLEMENTED YET, DONT USE. -// -//// REQUEST: RequestChatHistory -//message RequestChatHistory { -// required ChatId id = 1; /* lobby or chat, group id */ -// required bool incoming = 2; -//} -// -//// RESPONSE: ResponseChatHistory -//message ResponseChatHistory { -// required rsctrl.core.Status status = 1; -// repeated ChatMessage msg = 2; -//} -// +// Chat History. +// INITIALLY THIS WILL ONLY WORK WITH PRIVATE CHATS. +// NEED TO EXTEND INTERNALS TO HANDLE LOBBIES. + +// REQUEST: RequestChatHistory +message RequestChatHistory { + required ChatId id = 1; /* lobby or chat, group id */ +} + +// RESPONSE: ResponseChatHistory +message ResponseChatHistory { + required rsctrl.core.Status status = 1; + required ChatId id = 2; /* lobby or chat, group id */ + repeated ChatMessage msgs = 3; +} + /////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////// diff --git a/rsctrl/src/definition/core.proto b/rsctrl/src/definition/core.proto index 69443af5d..ffa051124 100644 --- a/rsctrl/src/definition/core.proto +++ b/rsctrl/src/definition/core.proto @@ -17,9 +17,10 @@ enum PackageId { CHAT = 3; SEARCH = 4; FILES = 5; + STREAM = 6; + // BELOW HERE IS STILL BEING DESIGNED. - //MSGS = 5; - //TRANSFER = 6; + //MSGS = 7; // THEORETICAL ONES. GXS = 1000; @@ -74,10 +75,11 @@ message Location { message Person { enum Relationship { - FRIEND = 1; - FRIEND_OF_MANY_FRIENDS = 2; // 3+ at the moment. - FRIEND_OF_FRIENDS = 3; // 1 or 2. - UNKNOWN = 4; + YOURSELF = 1; + FRIEND = 2; + FRIEND_OF_MANY_FRIENDS = 3; // 3+ at the moment. + FRIEND_OF_FRIENDS = 4; // 1 or 2. + UNKNOWN = 5; } required string gpg_id = 1; @@ -110,6 +112,14 @@ message Dir { } +/////////////////////////////////////////////////////////////// + +message Timestamp { + required uint64 secs = 1; + required uint32 microsecs = 2; +} + + /////////////////////////////////////////////////////////////// // System Status diff --git a/rsctrl/src/definition/files.proto b/rsctrl/src/definition/files.proto index 5546c2efc..4a12be782 100644 --- a/rsctrl/src/definition/files.proto +++ b/rsctrl/src/definition/files.proto @@ -13,11 +13,13 @@ import "core.proto"; enum RequestMsgIds { MsgId_RequestTransferList = 1; MsgId_RequestControlDownload = 2; + MsgId_RequestShareDirList = 3; } enum ResponseMsgIds { MsgId_ResponseTransferList = 1; MsgId_ResponseControlDownload = 2; + MsgId_ResponseShareDirList = 3; } /////////////////////////////////////////////////////////////// @@ -30,12 +32,26 @@ enum Direction { DIRECTION_DOWNLOAD = 2; } + +enum TransferState { + TRANSFER_FAILED = 1; + TRANSFER_OKAY = 2; + TRANSFER_PAUSED = 3; + TRANSFER_QUEUED = 4; + TRANSFER_WAITING = 5; + TRANSFER_DOWNLOADING = 6; + TRANSFER_CHECKING_HASH = 7; + TRANSFER_COMPLETE = 8; +} + + message FileTransfer { required rsctrl.core.File file = 1; required Direction direction = 2; required float fraction = 3; required float rate_kBs = 4; + required TransferState state = 5; } /////////////////////////////////////////////////////////////// @@ -86,22 +102,32 @@ message ResponseControlDownload { /////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////// // SHARED FILES -// THIS STUFF IS NOT FINISHED YET! -// -//// REQUEST: RequestListShares -//message RequestListShares { -// -// required uint32 depth = 1; // HOW Many Directories to drill down. -// repeated string ShareLocation = 2; -//} -// -//message ShareLocation { -// required string ssl_id = 1; -// required string path = 2; -//} -// -// -// + +// REQUEST: RequestShareDirList +message RequestShareDirList { + required string ssl_id = 1; + required string path = 2; +} + + +// RESPONSE: ResponseShareDirList +message ResponseShareDirList { + required rsctrl.core.Status status = 1; + + enum ListType { + DIRQUERY_ROOT = 1; // the query refers to root. + DIRQUERY_PERSON = 2; // the query refers to person + DIRQUERY_FILE = 3; // the query refers to a file. + DIRQUERY_DIR = 4; // move to top of queue. + } + + required string ssl_id = 2; + required string path = 3; + required ListType list_type = 4; + repeated rsctrl.core.File files = 5; +} + + //// REQUEST: RequestChangeShares // //// REQUEST: RequestLiCloseSearch diff --git a/rsctrl/src/definition/peers.proto b/rsctrl/src/definition/peers.proto index 8e27ae8f8..5cc38be27 100644 --- a/rsctrl/src/definition/peers.proto +++ b/rsctrl/src/definition/peers.proto @@ -9,13 +9,12 @@ import "core.proto"; enum RequestMsgIds { MsgId_RequestPeers = 1; MsgId_RequestAddPeer = 2; - MsgId_RequestModifyPeer = 3; + MsgId_RequestExaminePeer = 3; + MsgId_RequestModifyPeer = 4; } enum ResponseMsgIds { MsgId_ResponsePeerList = 1; - MsgId_ResponseAddPeer = 2; - MsgId_ResponseModifyPeer = 3; } /////////////////////////////////////////////////////////////// @@ -44,7 +43,7 @@ message RequestPeers { required SetOption set = 1; required InfoOption info = 2; - repeated string gpg_ids = 3; + repeated string pgp_ids = 3; } @@ -60,27 +59,37 @@ message ResponsePeerList { message RequestAddPeer { enum AddCmd { - NOOP = 0; // No op. - ADD = 1; // Add existing from gpg_id. - REMOVE = 2; // Remove existing from gpg_id. - IMPORT = 3; // Import from cert, with gpg_id. + ADD = 1; // Add existing from gpg_id. + REMOVE = 2; // Remove existing from gpg_id. + } + + required AddCmd cmd = 1; + required string pgp_id = 2; + optional string ssl_id = 3; + +} + +/////////////////////////////////////////////////////////////// + +// REQUEST: RequestExaminePeer +message RequestExaminePeer { + + enum ExamineCmd { + IMPORT = 3; // Import from cert, with gpg_id. EXAMINE = 4; // Examine cert, but no action. } - required string gpg_id = 1; - required AddCmd cmd = 2; - optional string cert = 3; -} + // Must have GPG ID to import. Proves you've looked at it. + required string pgp_id = 1; + required ExamineCmd cmd = 2; + required string cert = 3; -// RESPONSE: ResponseAddPeer -message ResponseAddPeer { - required rsctrl.core.Status status = 1; - repeated rsctrl.core.Person peers = 2; } /////////////////////////////////////////////////////////////// // REQUEST: RequestModifyPeer +// THIS IS INCOMPLETE... DON'T USE. message RequestModifyPeer { enum ModCmd { @@ -98,11 +107,5 @@ message RequestModifyPeer { repeated rsctrl.core.Person peers = 2; } -// RESPONSE: ResponseModifyPeer -message ResponseModifyPeer { - required rsctrl.core.Status status = 1; - repeated rsctrl.core.Person peers = 2; -} - /////////////////////////////////////////////////////////////// diff --git a/rsctrl/src/definition/search.proto b/rsctrl/src/definition/search.proto index 6aecdb49f..d058a6195 100644 --- a/rsctrl/src/definition/search.proto +++ b/rsctrl/src/definition/search.proto @@ -33,7 +33,7 @@ message SearchHit { required rsctrl.core.File file = 1; required uint32 loc = 2; // OR of LocFlag so uint field required uint32 no_hits = 3; // NOT USED YET. - + repeated string alt_names = 4; } message SearchSet { @@ -100,8 +100,8 @@ message RequestListSearches { // Empty search_ids => all results. message RequestSearchResults { + optional uint32 result_limit = 1; repeated uint32 search_ids = 2; - } // RESPONSE: ResponseSearchResults diff --git a/rsctrl/src/definition/stream.proto b/rsctrl/src/definition/stream.proto new file mode 100644 index 000000000..9fcbf8636 --- /dev/null +++ b/rsctrl/src/definition/stream.proto @@ -0,0 +1,152 @@ +package rsctrl.stream; + +import "core.proto"; + +/////////////////////////////////////////////////////////////// +// This protocol defines how to stream data from retroshare. +// It can be used for VoIP or Files, or whatever. +// +// There are two parts. +// Control and actual streaming. +// +/////////////////////////////////////////////////////////////// + +enum RequestMsgIds { + MsgId_RequestStartFileStream = 1; + MsgId_RequestControlStream = 2; + MsgId_RequestListStreams = 3; +} + +enum ResponseMsgIds { + MsgId_ResponseStreamDetail = 1; // RESPONSE to List and Control. + MsgId_ResponseStreamData = 101; +} + +/////////////////////////////////////////////////////////////// +/////////////////////////////////////////////////////////////// + +// Building Blocks + +enum StreamType { + STREAM_TYPE_ALL = 1; // all streams + STREAM_TYPE_FILES = 2; // files stream + STREAM_TYPE_VOIP = 3; // VoIP stream + STREAM_TYPE_OTHER = 4; // Who knows what else. +} + +enum StreamState { + STREAM_STATE_ERROR = 0; + STREAM_STATE_RUN = 1; + STREAM_STATE_PAUSED = 2; + STREAM_STATE_FINISHED = 3; +} + +message StreamFileDetail { + + required rsctrl.core.File file = 1; + required uint64 offset = 5; + +} + + +message StreamVoipDetail { + + // THIS NEEDS MORE DEFINITION. + required string peer_id = 1; + required uint64 duration = 2; + required uint64 offset = 3; + +} + + +message StreamDesc { + + required uint32 stream_id = 1; + required StreamType stream_type = 2; + required StreamState stream_state = 3; + required float rate_kbs = 4; + optional StreamFileDetail file = 5; + optional StreamVoipDetail voip = 6; + +} + + +message StreamData { + + required uint32 stream_id = 1; + required StreamState stream_state = 2; + required rsctrl.core.Timestamp send_time = 3; + required uint64 offset = 4; + required uint32 size = 5; + required bytes stream_data = 6; + +} + +/////////////////////////////////////////////////////////////// +/////////////////////////////////////////////////////////////// + + +// REQUEST: RequestStartFileStream +message RequestStartFileStream { + + required rsctrl.core.File file = 1; + required float rate_kbs = 2; + + // byte range. allows to restart transfer! + optional uint64 start_byte = 3; + optional uint64 end_byte = 4; + +} + +// RESPONSE: ResponseStreamDetail +message ResponseStreamDetail { + + required rsctrl.core.Status status = 1; + repeated StreamDesc streams = 2; + +} + + +/////////////////////////////////////////////////////////////// +/////////////////////////////////////////////////////////////// + +// REQUEST: RequestControlStream +message RequestControlStream { + + enum StreamAction { + STREAM_START = 1; // start stream + STREAM_STOP = 2; // stop stream + STREAM_PAUSE = 3; // pause stream + STREAM_CHANGE_RATE = 4; // rate of the stream + STREAM_SEEK = 5; // move streaming position. + } + + required uint32 stream_id = 1; + required StreamAction action = 2; + optional float rate_kbs = 3; + optional uint64 seek_byte = 4; +} + +/////////////////////////////////////////////////////////////// +/////////////////////////////////////////////////////////////// + +// REQUEST: RequestListStreams +message RequestListStreams { + + required StreamType request_type = 1; + +} + +/////////////////////////////////////////////////////////////// +/////////////////////////////////////////////////////////////// + +// RESPONSE: ResponseStreamData +message ResponseStreamData { + required rsctrl.core.Status status = 1; + required StreamData data = 2; +} + + +/////////////////////////////////////////////////////////////// +/////////////////////////////////////////////////////////////// + diff --git a/rsctrl/src/definition/system.proto b/rsctrl/src/definition/system.proto index b4cb3517d..d979b2696 100644 --- a/rsctrl/src/definition/system.proto +++ b/rsctrl/src/definition/system.proto @@ -10,12 +10,14 @@ enum RequestMsgIds { MsgId_RequestSystemStatus = 1; MsgId_RequestSystemQuit = 2; MsgId_RequestSystemExternalAccess = 3; + MsgId_RequestSystemAccount = 4; } enum ResponseMsgIds { MsgId_ResponseSystemStatus = 1; MsgId_ResponseSystemQuit = 2; MsgId_ResponseSystemExternalAccess = 3; + MsgId_ResponseSystemAccount = 4; } /////////////////////////////////////////////////////////////// @@ -93,6 +95,26 @@ message ResponseSystemExternalAccess { required string dht_key = 3; } + +/////////////////////////////////////////////////////////////// + +// REQUEST: RequestSystemAccount +message RequestSystemAccount { + // Nothing here? +} + +// RESPONSE: ResponseSystemAccount +message ResponseSystemAccount { + + // Status of response. + required rsctrl.core.Status status = 1; + + required string pgp_name = 2; + required string location = 3; + required string pgp_id = 4; + required string ssl_id = 5; +} + /////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////