* Added partial implementation of new stream protocol. (UNFINISHED)

* Added ADD/REMOVE Peer command to rs-nogui (Compiles but UNTESTED).
 


git-svn-id: http://svn.code.sf.net/p/retroshare/code/branches/v0.5-rpc-b1@6148 b45a01b8-16f6-495d-af2f-9b41ad6348cc
This commit is contained in:
drbob 2013-02-25 23:33:03 +00:00
parent 021713f548
commit 350ab5fd94
12 changed files with 7673 additions and 1234 deletions

View File

@ -39,6 +39,9 @@ const ::google::protobuf::internal::GeneratedMessageReflection*
const ::google::protobuf::Descriptor* Dir_descriptor_ = NULL;
const ::google::protobuf::internal::GeneratedMessageReflection*
Dir_reflection_ = NULL;
const ::google::protobuf::Descriptor* Timestamp_descriptor_ = NULL;
const ::google::protobuf::internal::GeneratedMessageReflection*
Timestamp_reflection_ = NULL;
const ::google::protobuf::Descriptor* SystemStatus_descriptor_ = NULL;
const ::google::protobuf::internal::GeneratedMessageReflection*
SystemStatus_reflection_ = NULL;
@ -168,7 +171,23 @@ void protobuf_AssignDesc_core_2eproto() {
::google::protobuf::DescriptorPool::generated_pool(),
::google::protobuf::MessageFactory::generated_factory(),
sizeof(Dir));
SystemStatus_descriptor_ = file->message_type(6);
Timestamp_descriptor_ = file->message_type(6);
static const int Timestamp_offsets_[2] = {
GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(Timestamp, secs_),
GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(Timestamp, microsecs_),
};
Timestamp_reflection_ =
new ::google::protobuf::internal::GeneratedMessageReflection(
Timestamp_descriptor_,
Timestamp::default_instance_,
Timestamp_offsets_,
GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(Timestamp, _has_bits_[0]),
GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(Timestamp, _unknown_fields_),
-1,
::google::protobuf::DescriptorPool::generated_pool(),
::google::protobuf::MessageFactory::generated_factory(),
sizeof(Timestamp));
SystemStatus_descriptor_ = file->message_type(7);
static const int SystemStatus_offsets_[2] = {
GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(SystemStatus, net_status_),
GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(SystemStatus, msg_),
@ -185,7 +204,7 @@ void protobuf_AssignDesc_core_2eproto() {
::google::protobuf::MessageFactory::generated_factory(),
sizeof(SystemStatus));
SystemStatus_NetCode_descriptor_ = SystemStatus_descriptor_->enum_type(0);
Bandwidth_descriptor_ = file->message_type(7);
Bandwidth_descriptor_ = file->message_type(8);
static const int Bandwidth_offsets_[3] = {
GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(Bandwidth, up_),
GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(Bandwidth, down_),
@ -202,7 +221,7 @@ void protobuf_AssignDesc_core_2eproto() {
::google::protobuf::DescriptorPool::generated_pool(),
::google::protobuf::MessageFactory::generated_factory(),
sizeof(Bandwidth));
BandwidthSet_descriptor_ = file->message_type(8);
BandwidthSet_descriptor_ = file->message_type(9);
static const int BandwidthSet_offsets_[1] = {
GOOGLE_PROTOBUF_GENERATED_MESSAGE_FIELD_OFFSET(BandwidthSet, bandwidths_),
};
@ -243,6 +262,8 @@ void protobuf_RegisterTypes(const ::std::string&) {
File_descriptor_, &File::default_instance());
::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage(
Dir_descriptor_, &Dir::default_instance());
::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage(
Timestamp_descriptor_, &Timestamp::default_instance());
::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage(
SystemStatus_descriptor_, &SystemStatus::default_instance());
::google::protobuf::MessageFactory::InternalRegisterGeneratedMessage(
@ -266,6 +287,8 @@ void protobuf_ShutdownFile_core_2eproto() {
delete File_reflection_;
delete Dir::default_instance_;
delete Dir_reflection_;
delete Timestamp::default_instance_;
delete Timestamp_reflection_;
delete SystemStatus::default_instance_;
delete SystemStatus_reflection_;
delete Bandwidth::default_instance_;
@ -302,18 +325,20 @@ void protobuf_AddDesc_core_2eproto() {
"(\t\022\014\n\004size\030\003 \002(\004\"f\n\003Dir\022\014\n\004name\030\001 \002(\t\022\014\n"
"\004path\030\002 \002(\t\022!\n\007subdirs\030\003 \003(\0132\020.rsctrl.co"
"re.Dir\022 \n\005files\030\004 \003(\0132\021.rsctrl.core.File"
"\"\372\001\n\014SystemStatus\0225\n\nnet_status\030\001 \002(\0162!."
"rsctrl.core.SystemStatus.NetCode\022\013\n\003msg\030"
"\002 \001(\t\"\245\001\n\007NetCode\022\017\n\013BAD_UNKNOWN\020\000\022\017\n\013BA"
"D_OFFLINE\020\001\022\016\n\nBAD_NATSYM\020\002\022\021\n\rBAD_NODHT"
"_NAT\020\003\022\023\n\017WARNING_RESTART\020\004\022\022\n\016WARNING_N"
"ATTED\020\005\022\021\n\rWARNING_NODHT\020\006\022\010\n\004GOOD\020\007\022\017\n\013"
"ADV_FORWARD\020\010\"3\n\tBandwidth\022\n\n\002up\030\001 \002(\002\022\014"
"\n\004down\030\002 \002(\002\022\014\n\004name\030\003 \001(\t\":\n\014BandwidthS"
"et\022*\n\nbandwidths\030\001 \003(\0132\026.rsctrl.core.Ban"
"dwidth*\027\n\013ExtensionId\022\010\n\004CORE\020\000*M\n\tPacka"
"geId\022\t\n\005PEERS\020\001\022\n\n\006SYSTEM\020\002\022\010\n\004CHAT\020\003\022\n\n"
"\006SEARCH\020\004\022\t\n\005FILES\020\005\022\010\n\003GXS\020\350\007", 1310);
"\",\n\tTimestamp\022\014\n\004secs\030\001 \002(\004\022\021\n\tmicrosecs"
"\030\002 \002(\r\"\372\001\n\014SystemStatus\0225\n\nnet_status\030\001 "
"\002(\0162!.rsctrl.core.SystemStatus.NetCode\022\013"
"\n\003msg\030\002 \001(\t\"\245\001\n\007NetCode\022\017\n\013BAD_UNKNOWN\020\000"
"\022\017\n\013BAD_OFFLINE\020\001\022\016\n\nBAD_NATSYM\020\002\022\021\n\rBAD"
"_NODHT_NAT\020\003\022\023\n\017WARNING_RESTART\020\004\022\022\n\016WAR"
"NING_NATTED\020\005\022\021\n\rWARNING_NODHT\020\006\022\010\n\004GOOD"
"\020\007\022\017\n\013ADV_FORWARD\020\010\"3\n\tBandwidth\022\n\n\002up\030\001"
" \002(\002\022\014\n\004down\030\002 \002(\002\022\014\n\004name\030\003 \001(\t\":\n\014Band"
"widthSet\022*\n\nbandwidths\030\001 \003(\0132\026.rsctrl.co"
"re.Bandwidth*\027\n\013ExtensionId\022\010\n\004CORE\020\000*Y\n"
"\tPackageId\022\t\n\005PEERS\020\001\022\n\n\006SYSTEM\020\002\022\010\n\004CHA"
"T\020\003\022\n\n\006SEARCH\020\004\022\t\n\005FILES\020\005\022\n\n\006STREAM\020\006\022\010"
"\n\003GXS\020\350\007", 1368);
::google::protobuf::MessageFactory::InternalRegisterGeneratedFile(
"core.proto", &protobuf_RegisterTypes);
Status::default_instance_ = new Status();
@ -322,6 +347,7 @@ void protobuf_AddDesc_core_2eproto() {
Person::default_instance_ = new Person();
File::default_instance_ = new File();
Dir::default_instance_ = new Dir();
Timestamp::default_instance_ = new Timestamp();
SystemStatus::default_instance_ = new SystemStatus();
Bandwidth::default_instance_ = new Bandwidth();
BandwidthSet::default_instance_ = new BandwidthSet();
@ -331,6 +357,7 @@ void protobuf_AddDesc_core_2eproto() {
Person::default_instance_->InitAsDefaultInstance();
File::default_instance_->InitAsDefaultInstance();
Dir::default_instance_->InitAsDefaultInstance();
Timestamp::default_instance_->InitAsDefaultInstance();
SystemStatus::default_instance_->InitAsDefaultInstance();
Bandwidth::default_instance_->InitAsDefaultInstance();
BandwidthSet::default_instance_->InitAsDefaultInstance();
@ -368,6 +395,7 @@ bool PackageId_IsValid(int value) {
case 3:
case 4:
case 5:
case 6:
case 1000:
return true;
default:
@ -2470,6 +2498,254 @@ void Dir::Swap(Dir* other) {
}
// ===================================================================
#ifndef _MSC_VER
const int Timestamp::kSecsFieldNumber;
const int Timestamp::kMicrosecsFieldNumber;
#endif // !_MSC_VER
Timestamp::Timestamp()
: ::google::protobuf::Message() {
SharedCtor();
}
void Timestamp::InitAsDefaultInstance() {
}
Timestamp::Timestamp(const Timestamp& from)
: ::google::protobuf::Message() {
SharedCtor();
MergeFrom(from);
}
void Timestamp::SharedCtor() {
_cached_size_ = 0;
secs_ = GOOGLE_ULONGLONG(0);
microsecs_ = 0u;
::memset(_has_bits_, 0, sizeof(_has_bits_));
}
Timestamp::~Timestamp() {
SharedDtor();
}
void Timestamp::SharedDtor() {
if (this != default_instance_) {
}
}
void Timestamp::SetCachedSize(int size) const {
GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN();
_cached_size_ = size;
GOOGLE_SAFE_CONCURRENT_WRITES_END();
}
const ::google::protobuf::Descriptor* Timestamp::descriptor() {
protobuf_AssignDescriptorsOnce();
return Timestamp_descriptor_;
}
const Timestamp& Timestamp::default_instance() {
if (default_instance_ == NULL) protobuf_AddDesc_core_2eproto(); return *default_instance_;
}
Timestamp* Timestamp::default_instance_ = NULL;
Timestamp* Timestamp::New() const {
return new Timestamp;
}
void Timestamp::Clear() {
if (_has_bits_[0 / 32] & (0xffu << (0 % 32))) {
secs_ = GOOGLE_ULONGLONG(0);
microsecs_ = 0u;
}
::memset(_has_bits_, 0, sizeof(_has_bits_));
mutable_unknown_fields()->Clear();
}
bool Timestamp::MergePartialFromCodedStream(
::google::protobuf::io::CodedInputStream* input) {
#define DO_(EXPRESSION) if (!(EXPRESSION)) return false
::google::protobuf::uint32 tag;
while ((tag = input->ReadTag()) != 0) {
switch (::google::protobuf::internal::WireFormatLite::GetTagFieldNumber(tag)) {
// required uint64 secs = 1;
case 1: {
if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
::google::protobuf::internal::WireFormatLite::WIRETYPE_VARINT) {
DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive<
::google::protobuf::uint64, ::google::protobuf::internal::WireFormatLite::TYPE_UINT64>(
input, &secs_)));
set_has_secs();
} else {
goto handle_uninterpreted;
}
if (input->ExpectTag(16)) goto parse_microsecs;
break;
}
// required uint32 microsecs = 2;
case 2: {
if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
::google::protobuf::internal::WireFormatLite::WIRETYPE_VARINT) {
parse_microsecs:
DO_((::google::protobuf::internal::WireFormatLite::ReadPrimitive<
::google::protobuf::uint32, ::google::protobuf::internal::WireFormatLite::TYPE_UINT32>(
input, &microsecs_)));
set_has_microsecs();
} else {
goto handle_uninterpreted;
}
if (input->ExpectAtEnd()) return true;
break;
}
default: {
handle_uninterpreted:
if (::google::protobuf::internal::WireFormatLite::GetTagWireType(tag) ==
::google::protobuf::internal::WireFormatLite::WIRETYPE_END_GROUP) {
return true;
}
DO_(::google::protobuf::internal::WireFormat::SkipField(
input, tag, mutable_unknown_fields()));
break;
}
}
}
return true;
#undef DO_
}
void Timestamp::SerializeWithCachedSizes(
::google::protobuf::io::CodedOutputStream* output) const {
// required uint64 secs = 1;
if (has_secs()) {
::google::protobuf::internal::WireFormatLite::WriteUInt64(1, this->secs(), output);
}
// required uint32 microsecs = 2;
if (has_microsecs()) {
::google::protobuf::internal::WireFormatLite::WriteUInt32(2, this->microsecs(), output);
}
if (!unknown_fields().empty()) {
::google::protobuf::internal::WireFormat::SerializeUnknownFields(
unknown_fields(), output);
}
}
::google::protobuf::uint8* Timestamp::SerializeWithCachedSizesToArray(
::google::protobuf::uint8* target) const {
// required uint64 secs = 1;
if (has_secs()) {
target = ::google::protobuf::internal::WireFormatLite::WriteUInt64ToArray(1, this->secs(), target);
}
// required uint32 microsecs = 2;
if (has_microsecs()) {
target = ::google::protobuf::internal::WireFormatLite::WriteUInt32ToArray(2, this->microsecs(), target);
}
if (!unknown_fields().empty()) {
target = ::google::protobuf::internal::WireFormat::SerializeUnknownFieldsToArray(
unknown_fields(), target);
}
return target;
}
int Timestamp::ByteSize() const {
int total_size = 0;
if (_has_bits_[0 / 32] & (0xffu << (0 % 32))) {
// required uint64 secs = 1;
if (has_secs()) {
total_size += 1 +
::google::protobuf::internal::WireFormatLite::UInt64Size(
this->secs());
}
// required uint32 microsecs = 2;
if (has_microsecs()) {
total_size += 1 +
::google::protobuf::internal::WireFormatLite::UInt32Size(
this->microsecs());
}
}
if (!unknown_fields().empty()) {
total_size +=
::google::protobuf::internal::WireFormat::ComputeUnknownFieldsSize(
unknown_fields());
}
GOOGLE_SAFE_CONCURRENT_WRITES_BEGIN();
_cached_size_ = total_size;
GOOGLE_SAFE_CONCURRENT_WRITES_END();
return total_size;
}
void Timestamp::MergeFrom(const ::google::protobuf::Message& from) {
GOOGLE_CHECK_NE(&from, this);
const Timestamp* source =
::google::protobuf::internal::dynamic_cast_if_available<const Timestamp*>(
&from);
if (source == NULL) {
::google::protobuf::internal::ReflectionOps::Merge(from, this);
} else {
MergeFrom(*source);
}
}
void Timestamp::MergeFrom(const Timestamp& from) {
GOOGLE_CHECK_NE(&from, this);
if (from._has_bits_[0 / 32] & (0xffu << (0 % 32))) {
if (from.has_secs()) {
set_secs(from.secs());
}
if (from.has_microsecs()) {
set_microsecs(from.microsecs());
}
}
mutable_unknown_fields()->MergeFrom(from.unknown_fields());
}
void Timestamp::CopyFrom(const ::google::protobuf::Message& from) {
if (&from == this) return;
Clear();
MergeFrom(from);
}
void Timestamp::CopyFrom(const Timestamp& from) {
if (&from == this) return;
Clear();
MergeFrom(from);
}
bool Timestamp::IsInitialized() const {
if ((_has_bits_[0] & 0x00000003) != 0x00000003) return false;
return true;
}
void Timestamp::Swap(Timestamp* other) {
if (other != this) {
std::swap(secs_, other->secs_);
std::swap(microsecs_, other->microsecs_);
std::swap(_has_bits_[0], other->_has_bits_[0]);
_unknown_fields_.Swap(&other->_unknown_fields_);
std::swap(_cached_size_, other->_cached_size_);
}
}
::google::protobuf::Metadata Timestamp::GetMetadata() const {
protobuf_AssignDescriptorsOnce();
::google::protobuf::Metadata metadata;
metadata.descriptor = Timestamp_descriptor_;
metadata.reflection = Timestamp_reflection_;
return metadata;
}
// ===================================================================
const ::google::protobuf::EnumDescriptor* SystemStatus_NetCode_descriptor() {

View File

@ -39,6 +39,7 @@ class Location;
class Person;
class File;
class Dir;
class Timestamp;
class SystemStatus;
class Bandwidth;
class BandwidthSet;
@ -158,6 +159,7 @@ enum PackageId {
CHAT = 3,
SEARCH = 4,
FILES = 5,
STREAM = 6,
GXS = 1000
};
bool PackageId_IsValid(int value);
@ -940,6 +942,98 @@ class Dir : public ::google::protobuf::Message {
};
// -------------------------------------------------------------------
class Timestamp : public ::google::protobuf::Message {
public:
Timestamp();
virtual ~Timestamp();
Timestamp(const Timestamp& from);
inline Timestamp& operator=(const Timestamp& from) {
CopyFrom(from);
return *this;
}
inline const ::google::protobuf::UnknownFieldSet& unknown_fields() const {
return _unknown_fields_;
}
inline ::google::protobuf::UnknownFieldSet* mutable_unknown_fields() {
return &_unknown_fields_;
}
static const ::google::protobuf::Descriptor* descriptor();
static const Timestamp& default_instance();
void Swap(Timestamp* other);
// implements Message ----------------------------------------------
Timestamp* New() const;
void CopyFrom(const ::google::protobuf::Message& from);
void MergeFrom(const ::google::protobuf::Message& from);
void CopyFrom(const Timestamp& from);
void MergeFrom(const Timestamp& from);
void Clear();
bool IsInitialized() const;
int ByteSize() const;
bool MergePartialFromCodedStream(
::google::protobuf::io::CodedInputStream* input);
void SerializeWithCachedSizes(
::google::protobuf::io::CodedOutputStream* output) const;
::google::protobuf::uint8* SerializeWithCachedSizesToArray(::google::protobuf::uint8* output) const;
int GetCachedSize() const { return _cached_size_; }
private:
void SharedCtor();
void SharedDtor();
void SetCachedSize(int size) const;
public:
::google::protobuf::Metadata GetMetadata() const;
// nested types ----------------------------------------------------
// accessors -------------------------------------------------------
// required uint64 secs = 1;
inline bool has_secs() const;
inline void clear_secs();
static const int kSecsFieldNumber = 1;
inline ::google::protobuf::uint64 secs() const;
inline void set_secs(::google::protobuf::uint64 value);
// required uint32 microsecs = 2;
inline bool has_microsecs() const;
inline void clear_microsecs();
static const int kMicrosecsFieldNumber = 2;
inline ::google::protobuf::uint32 microsecs() const;
inline void set_microsecs(::google::protobuf::uint32 value);
// @@protoc_insertion_point(class_scope:rsctrl.core.Timestamp)
private:
inline void set_has_secs();
inline void clear_has_secs();
inline void set_has_microsecs();
inline void clear_has_microsecs();
::google::protobuf::UnknownFieldSet _unknown_fields_;
::google::protobuf::uint64 secs_;
::google::protobuf::uint32 microsecs_;
mutable int _cached_size_;
::google::protobuf::uint32 _has_bits_[(2 + 31) / 32];
friend void protobuf_AddDesc_core_2eproto();
friend void protobuf_AssignDesc_core_2eproto();
friend void protobuf_ShutdownFile_core_2eproto();
void InitAsDefaultInstance();
static Timestamp* default_instance_;
};
// -------------------------------------------------------------------
class SystemStatus : public ::google::protobuf::Message {
public:
SystemStatus();
@ -2110,6 +2204,54 @@ Dir::mutable_files() {
// -------------------------------------------------------------------
// Timestamp
// required uint64 secs = 1;
inline bool Timestamp::has_secs() const {
return (_has_bits_[0] & 0x00000001u) != 0;
}
inline void Timestamp::set_has_secs() {
_has_bits_[0] |= 0x00000001u;
}
inline void Timestamp::clear_has_secs() {
_has_bits_[0] &= ~0x00000001u;
}
inline void Timestamp::clear_secs() {
secs_ = GOOGLE_ULONGLONG(0);
clear_has_secs();
}
inline ::google::protobuf::uint64 Timestamp::secs() const {
return secs_;
}
inline void Timestamp::set_secs(::google::protobuf::uint64 value) {
set_has_secs();
secs_ = value;
}
// required uint32 microsecs = 2;
inline bool Timestamp::has_microsecs() const {
return (_has_bits_[0] & 0x00000002u) != 0;
}
inline void Timestamp::set_has_microsecs() {
_has_bits_[0] |= 0x00000002u;
}
inline void Timestamp::clear_has_microsecs() {
_has_bits_[0] &= ~0x00000002u;
}
inline void Timestamp::clear_microsecs() {
microsecs_ = 0u;
clear_has_microsecs();
}
inline ::google::protobuf::uint32 Timestamp::microsecs() const {
return microsecs_;
}
inline void Timestamp::set_microsecs(::google::protobuf::uint32 value) {
set_has_microsecs();
microsecs_ = value;
}
// -------------------------------------------------------------------
// SystemStatus
// required .rsctrl.core.SystemStatus.NetCode net_status = 1;

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -105,7 +105,7 @@ int RpcProtoFiles::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id
int RpcProtoFiles::processReqTransferList(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
int RpcProtoFiles::processReqTransferList(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoFiles::processReqTransferList()";
std::cerr << std::endl;
@ -204,7 +204,7 @@ int RpcProtoFiles::processReqTransferList(uint32_t chan_id, uint32_t msg_id, uin
return 1;
}
int RpcProtoFiles::processReqControlDownload(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
int RpcProtoFiles::processReqControlDownload(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoFiles::processReqControlDownload()";
std::cerr << std::endl;

View File

@ -30,6 +30,9 @@
#include <iostream>
#include <algorithm>
bool load_person_details(std::string pgp_id, rsctrl::core::Person *person,
bool getLocations, bool onlyConnected);
RpcProtoPeers::RpcProtoPeers(uint32_t serviceId)
:RpcQueueService(serviceId)
{
@ -88,6 +91,9 @@ int RpcProtoPeers::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id
case rsctrl::peers::MsgId_RequestAddPeer:
processAddPeer(chan_id, msg_id, req_id, msg);
break;
case rsctrl::peers::MsgId_RequestExaminePeer:
processExaminePeer(chan_id, msg_id, req_id, msg);
break;
case rsctrl::peers::MsgId_RequestModifyPeer:
processModifyPeer(chan_id, msg_id, req_id, msg);
break;
@ -102,14 +108,66 @@ int RpcProtoPeers::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id
}
int RpcProtoPeers::processAddPeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
int RpcProtoPeers::processAddPeer(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoPeers::processAddPeer() NOT FINISHED";
std::cerr << "RpcProtoPeers::processAddPeer()";
std::cerr << std::endl;
// parse msg.
rsctrl::peers::RequestAddPeer req;
if (!req.ParseFromString(msg))
{
std::cerr << "RpcProtoPeers::processAddPeer() ERROR ParseFromString()";
std::cerr << std::endl;
return 0;
}
// response.
rsctrl::peers::ResponseAddPeer resp;
bool success = false;
rsctrl::peers::ResponsePeerList resp;
bool success = true;
std::string errorMsg;
/* check if the gpg_id is valid */
std::string pgp_id = req.pgp_id();
std::string ssl_id;
if (req.has_ssl_id())
{
ssl_id = req.ssl_id();
}
RsPeerDetails details;
if (!rsPeers->getGPGDetails(pgp_id, details))
{
success = false;
errorMsg = "Invalid PGP ID";
}
else
{
switch(req.cmd())
{
default:
success = false;
errorMsg = "Invalid AddCmd";
break;
case rsctrl::peers::RequestAddPeer::ADD:
// TODO. NEED TO HANDLE SERVICE PERMISSION FLAGS.
success = rsPeers->addFriend(ssl_id,pgp_id, RS_SERVICE_PERM_ALL);
break;
case rsctrl::peers::RequestAddPeer::REMOVE:
success = rsPeers->removeFriend(pgp_id);
break;
}
if (success)
{
rsctrl::core::Person *person = resp.add_peers();
load_person_details(pgp_id, person, true, false);
}
}
if (success)
{
@ -133,7 +191,7 @@ int RpcProtoPeers::processAddPeer(uint32_t chan_id, uint32_t msg_id, uint32_t re
// Correctly Name Message.
uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::PEERS,
rsctrl::peers::MsgId_ResponseAddPeer, true);
rsctrl::peers::MsgId_ResponsePeerList, true);
// queue it.
queueResponse(chan_id, out_msg_id, req_id, outmsg);
@ -142,14 +200,92 @@ int RpcProtoPeers::processAddPeer(uint32_t chan_id, uint32_t msg_id, uint32_t re
}
int RpcProtoPeers::processModifyPeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
int RpcProtoPeers::processExaminePeer(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoPeers::processExaminePeer() NOT FINISHED";
std::cerr << std::endl;
// parse msg.
rsctrl::peers::RequestExaminePeer req;
if (!req.ParseFromString(msg))
{
std::cerr << "RpcProtoPeers::processExaminePeer() ERROR ParseFromString()";
std::cerr << std::endl;
return 0;
}
// response.
rsctrl::peers::ResponsePeerList resp;
bool success = false;
if (success)
{
switch(req.cmd())
{
default:
success = false;
break;
case rsctrl::peers::RequestExaminePeer::IMPORT:
break;
case rsctrl::peers::RequestExaminePeer::EXAMINE:
// Gets the GPG details, but does not add the key to the keyring.
//virtual bool loadDetailsFromStringCert(const std::string& certGPG, RsPeerDetails &pd,uint32_t& error_code) = 0;
break;
}
}
if (success)
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::SUCCESS);
}
else
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::NO_IMPL_YET);
}
std::string outmsg;
if (!resp.SerializeToString(&outmsg))
{
std::cerr << "RpcProtoPeers::processAddPeer() ERROR SerialiseToString()";
std::cerr << std::endl;
return 0;
}
// Correctly Name Message.
uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::PEERS,
rsctrl::peers::MsgId_ResponsePeerList, true);
// queue it.
queueResponse(chan_id, out_msg_id, req_id, outmsg);
return 1;
}
int RpcProtoPeers::processModifyPeer(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoPeers::processModifyPeer() NOT FINISHED";
std::cerr << std::endl;
// parse msg.
rsctrl::peers::RequestModifyPeer req;
if (!req.ParseFromString(msg))
{
std::cerr << "RpcProtoPeers::processModifyPeer() ERROR ParseFromString()";
std::cerr << std::endl;
return 0;
}
// response.
rsctrl::peers::ResponseModifyPeer resp;
rsctrl::peers::ResponsePeerList resp;
bool success = false;
if (success)
@ -174,7 +310,7 @@ int RpcProtoPeers::processModifyPeer(uint32_t chan_id, uint32_t msg_id, uint32_t
// Correctly Name Message.
uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::PEERS,
rsctrl::peers::MsgId_ResponseModifyPeer, true);
rsctrl::peers::MsgId_ResponsePeerList, true);
// queue it.
queueResponse(chan_id, out_msg_id, req_id, outmsg);
@ -184,7 +320,7 @@ int RpcProtoPeers::processModifyPeer(uint32_t chan_id, uint32_t msg_id, uint32_t
int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoPeers::processRequestPeers()";
std::cerr << std::endl;
@ -198,10 +334,14 @@ int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32
return 0;
}
// response.
rsctrl::peers::ResponsePeerList respp;
bool success = true;
std::string errorMsg;
// Get the list of gpg_id to generate data for.
std::list<std::string> ids;
bool onlyConnected = false;
bool success = true;
switch(reqp.set())
{
case rsctrl::peers::RequestPeers::OWNID:
@ -216,10 +356,10 @@ int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32
{
std::cerr << "RpcProtoPeers::processRequestPeers() LISTED";
std::cerr << std::endl;
int no_gpg_ids = reqp.gpg_ids_size();
for (int i = 0; i < no_gpg_ids; i++)
int no_pgp_ids = reqp.pgp_ids_size();
for (int i = 0; i < no_pgp_ids; i++)
{
std::string listed_id = reqp.gpg_ids(i);
std::string listed_id = reqp.pgp_ids(i);
std::cerr << "RpcProtoPeers::processRequestPeers() Adding Id: " << listed_id;
std::cerr << std::endl;
ids.push_back(listed_id);
@ -286,137 +426,21 @@ int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32
break;
}
// response.
rsctrl::peers::ResponsePeerList respp;
/* now iterate through the peers and fill in the response. */
std::list<std::string>::const_iterator git;
for(git = ids.begin(); git != ids.end(); git++)
{
RsPeerDetails details;
if (!rsPeers->getGPGDetails(*git, details))
rsctrl::core::Person *person = respp.add_peers();
if (!load_person_details(*git, person, getLocations, onlyConnected))
{
std::cerr << "RpcProtoPeers::processRequestPeers() ERROR Finding GPGID: ";
std::cerr << *git;
std::cerr << std::endl;
continue; /* uhm.. */
}
rsctrl::core::Person *person = respp.add_peers();
/* fill in key gpg details */
person->set_gpg_id(*git);
person->set_name(details.name);
std::cerr << "RpcProtoPeers::processRequestPeers() Adding GPGID: ";
std::cerr << *git << " name: " << details.name;
std::cerr << std::endl;
//if (details.state & RS_PEER_STATE_FRIEND)
if (*git == rsPeers->getGPGOwnId())
{
std::cerr << "RpcProtoPeers::processRequestPeers() Relation YOURSELF";
std::cerr << std::endl;
person->set_relation(rsctrl::core::Person::YOURSELF);
}
else if (rsPeers->isGPGAccepted(*git))
{
std::cerr << "RpcProtoPeers::processRequestPeers() Relation FRIEND";
std::cerr << std::endl;
person->set_relation(rsctrl::core::Person::FRIEND);
}
else
{
std::list<std::string> common_friends;
rsDisc->getDiscGPGFriends(*git, common_friends);
int size = common_friends.size();
if (size)
{
if (size > 2)
{
std::cerr << "RpcProtoPeers::processRequestPeers() Relation FRIEND_OF_MANY_FRIENDS";
std::cerr << std::endl;
person->set_relation(rsctrl::core::Person::FRIEND_OF_MANY_FRIENDS);
}
else
{
std::cerr << "RpcProtoPeers::processRequestPeers() Relation FRIEND_OF_FRIENDS";
std::cerr << std::endl;
person->set_relation(rsctrl::core::Person::FRIEND_OF_FRIENDS);
}
}
else
{
std::cerr << "RpcProtoPeers::processRequestPeers() Relation UNKNOWN";
std::cerr << std::endl;
person->set_relation(rsctrl::core::Person::UNKNOWN);
}
}
if (getLocations)
{
std::list<std::string> ssl_ids;
std::list<std::string>::const_iterator sit;
if (!rsPeers->getAssociatedSSLIds(*git, ssl_ids))
{
std::cerr << "RpcProtoPeers::processRequestPeers() No Locations";
std::cerr << std::endl;
continue; /* end of this peer */
}
for(sit = ssl_ids.begin(); sit != ssl_ids.end(); sit++)
{
RsPeerDetails ssldetails;
if (!rsPeers->getPeerDetails(*sit, ssldetails))
{
continue; /* uhm.. */
}
if ((onlyConnected) &&
(!(ssldetails.state & RS_PEER_STATE_CONNECTED)))
{
continue;
}
rsctrl::core::Location *loc = person->add_locations();
std::cerr << "RpcProtoPeers::processRequestPeers() \t Adding Location: ";
std::cerr << *sit << " loc: " << ssldetails.location;
std::cerr << std::endl;
/* fill in ssl details */
loc->set_ssl_id(*sit);
loc->set_location(ssldetails.location);
/* set addresses */
rsctrl::core::IpAddr *laddr = loc->mutable_localaddr();
laddr->set_addr(ssldetails.localAddr);
laddr->set_port(ssldetails.localPort);
rsctrl::core::IpAddr *eaddr = loc->mutable_extaddr();
eaddr->set_addr(ssldetails.extAddr);
eaddr->set_port(ssldetails.extPort);
/* translate status */
uint32_t loc_state = 0;
//dont think this state should be here.
//if (ssldetails.state & RS_PEER_STATE_FRIEND)
if (ssldetails.state & RS_PEER_STATE_ONLINE)
{
loc_state |= (uint32_t) rsctrl::core::Location::ONLINE;
}
if (ssldetails.state & RS_PEER_STATE_CONNECTED)
{
loc_state |= (uint32_t) rsctrl::core::Location::CONNECTED;
}
if (ssldetails.state & RS_PEER_STATE_UNREACHABLE)
{
loc_state |= (uint32_t) rsctrl::core::Location::UNREACHABLE;
}
loc->set_state(loc_state);
}
/* cleanup peers */
success = false;
errorMsg = "Error Loading PeerID";
}
}
@ -429,7 +453,7 @@ int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32
{
rsctrl::core::Status *status = respp.mutable_status();
status->set_code(rsctrl::core::Status::FAILED);
status->set_msg("Unknown ERROR");
status->set_msg(errorMsg);
}
@ -452,3 +476,135 @@ int RpcProtoPeers::processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32
}
bool load_person_details(std::string pgp_id, rsctrl::core::Person *person,
bool getLocations, bool onlyConnected)
{
RsPeerDetails details;
if (!rsPeers->getGPGDetails(pgp_id, details))
{
std::cerr << "RpcProtoPeers::processRequestPeers() ERROR Finding GPGID: ";
std::cerr << pgp_id;
std::cerr << std::endl;
return false;
}
/* fill in key gpg details */
person->set_gpg_id(pgp_id);
person->set_name(details.name);
std::cerr << "RpcProtoPeers::processRequestPeers() Adding GPGID: ";
std::cerr << pgp_id << " name: " << details.name;
std::cerr << std::endl;
//if (details.state & RS_PEER_STATE_FRIEND)
if (pgp_id == rsPeers->getGPGOwnId())
{
std::cerr << "RpcProtoPeers::processRequestPeers() Relation YOURSELF";
std::cerr << std::endl;
person->set_relation(rsctrl::core::Person::YOURSELF);
}
else if (rsPeers->isGPGAccepted(pgp_id))
{
std::cerr << "RpcProtoPeers::processRequestPeers() Relation FRIEND";
std::cerr << std::endl;
person->set_relation(rsctrl::core::Person::FRIEND);
}
else
{
std::list<std::string> common_friends;
rsDisc->getDiscGPGFriends(pgp_id, common_friends);
int size = common_friends.size();
if (size)
{
if (size > 2)
{
std::cerr << "RpcProtoPeers::processRequestPeers() Relation FRIEND_OF_MANY_FRIENDS";
std::cerr << std::endl;
person->set_relation(rsctrl::core::Person::FRIEND_OF_MANY_FRIENDS);
}
else
{
std::cerr << "RpcProtoPeers::processRequestPeers() Relation FRIEND_OF_FRIENDS";
std::cerr << std::endl;
person->set_relation(rsctrl::core::Person::FRIEND_OF_FRIENDS);
}
}
else
{
std::cerr << "RpcProtoPeers::processRequestPeers() Relation UNKNOWN";
std::cerr << std::endl;
person->set_relation(rsctrl::core::Person::UNKNOWN);
}
}
if (getLocations)
{
std::list<std::string> ssl_ids;
std::list<std::string>::const_iterator sit;
if (!rsPeers->getAssociatedSSLIds(pgp_id, ssl_ids))
{
std::cerr << "RpcProtoPeers::processRequestPeers() No Locations";
std::cerr << std::endl;
return true; /* end of this peer */
}
for(sit = ssl_ids.begin(); sit != ssl_ids.end(); sit++)
{
RsPeerDetails ssldetails;
if (!rsPeers->getPeerDetails(*sit, ssldetails))
{
continue; /* uhm.. */
}
if ((onlyConnected) &&
(!(ssldetails.state & RS_PEER_STATE_CONNECTED)))
{
continue;
}
rsctrl::core::Location *loc = person->add_locations();
std::cerr << "RpcProtoPeers::processRequestPeers() \t Adding Location: ";
std::cerr << *sit << " loc: " << ssldetails.location;
std::cerr << std::endl;
/* fill in ssl details */
loc->set_ssl_id(*sit);
loc->set_location(ssldetails.location);
/* set addresses */
rsctrl::core::IpAddr *laddr = loc->mutable_localaddr();
laddr->set_addr(ssldetails.localAddr);
laddr->set_port(ssldetails.localPort);
rsctrl::core::IpAddr *eaddr = loc->mutable_extaddr();
eaddr->set_addr(ssldetails.extAddr);
eaddr->set_port(ssldetails.extPort);
/* translate status */
uint32_t loc_state = 0;
//dont think this state should be here.
//if (ssldetails.state & RS_PEER_STATE_FRIEND)
if (ssldetails.state & RS_PEER_STATE_ONLINE)
{
loc_state |= (uint32_t) rsctrl::core::Location::ONLINE;
}
if (ssldetails.state & RS_PEER_STATE_CONNECTED)
{
loc_state |= (uint32_t) rsctrl::core::Location::CONNECTED;
}
if (ssldetails.state & RS_PEER_STATE_UNREACHABLE)
{
loc_state |= (uint32_t) rsctrl::core::Location::UNREACHABLE;
}
loc->set_state(loc_state);
}
}
return true; /* end of this peer */
}

View File

@ -36,6 +36,9 @@ public:
virtual int processRequestPeers(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
virtual int processAddPeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
// these aren't implemented yet.
virtual int processExaminePeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
virtual int processModifyPeer(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
};

View File

@ -0,0 +1,693 @@
/*
* RetroShare External Interface.
*
* Copyright 2012-2012 by Robert Fernie.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2.1 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#include "rpc/proto/rpcprotochat.h"
#include "rpc/proto/gencc/chat.pb.h"
#include <retroshare/rsmsgs.h>
#include <retroshare/rspeers.h>
#include <retroshare/rshistory.h>
#include "util/rsstring.h"
#include <stdio.h>
#include <iostream>
#include <algorithm>
#include <set>
bool fill_stream_details(rsctrl::stream::ResponseStreamDetail &resp,
const std::list<RpcStream> &streams);
bool fill_stream_detail(rsctrl::stream::StreamDetail &detail,
const RpcStream &stream);
// Helper Functions -> maybe move to libretroshare/utils ??
bool convertUTF8toWString(const std::string &msg_utf8, std::wstring &msg_wstr);
bool convertWStringToUTF8(const std::wstring &msg_wstr, std::string &msg_utf8);
bool convertStringToLobbyId(const std::string &chat_id, ChatLobbyId &lobby_id);
bool convertLobbyIdToString(const ChatLobbyId &lobby_id, std::string &chat_id);
bool fillLobbyInfoFromChatLobbyInfo(const ChatLobbyInfo &cfi, rsctrl::stream::ChatLobbyInfo *lobby);
bool fillLobbyInfoFromVisibleChatLobbyRecord(const VisibleChatLobbyRecord &pclr, rsctrl::stream::ChatLobbyInfo *lobby);
bool fillLobbyInfoFromChatLobbyInvite(const ChatLobbyInvite &cli, rsctrl::stream::ChatLobbyInfo *lobby);
bool fillChatMessageFromHistoryMsg(const HistoryMsg &histmsg, rsctrl::stream::ChatMessage *rpcmsg);
bool createQueuedEventSendMsg(const ChatInfo &chatinfo, rsctrl::stream::ChatType ctype,
std::string chat_id, const RpcEventRegister &ereg, RpcQueuedMsg &qmsg);
RpcProtoStream::RpcProtoStream(uint32_t serviceId)
:RpcQueueService(serviceId)
{
return;
}
//RpcProtoStream::msgsAccepted(std::list<uint32_t> &msgIds); /* not used at the moment */
int RpcProtoStream::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
{
/* check the msgId */
uint8_t topbyte = getRpcMsgIdExtension(msg_id);
uint16_t service = getRpcMsgIdService(msg_id);
uint8_t submsg = getRpcMsgIdSubMsg(msg_id);
bool isResponse = isRpcMsgIdResponse(msg_id);
std::cerr << "RpcProtoStream::processMsg() topbyte: " << (int32_t) topbyte;
std::cerr << " service: " << (int32_t) service << " submsg: " << (int32_t) submsg;
std::cerr << std::endl;
if (isResponse)
{
std::cerr << "RpcProtoStream::processMsg() isResponse() - not processing";
std::cerr << std::endl;
return 0;
}
if (topbyte != (uint8_t) rsctrl::core::CORE)
{
std::cerr << "RpcProtoStream::processMsg() Extension Mismatch - not processing";
std::cerr << std::endl;
return 0;
}
if (service != (uint16_t) rsctrl::core::STREAM)
{
std::cerr << "RpcProtoStream::processMsg() Service Mismatch - not processing";
std::cerr << std::endl;
return 0;
}
if (!rsctrl::stream::RequestMsgIds_IsValid(submsg))
{
std::cerr << "RpcProtoStream::processMsg() SubMsg Mismatch - not processing";
std::cerr << std::endl;
return 0;
}
switch(submsg)
{
case rsctrl::stream::MsgId_RequestStartFileStream:
processReqStartFileStream(chan_id, msg_id, req_id, msg);
break;
case rsctrl::stream::MsgId_RequestCreateLobby:
processReqControlStream(chan_id, msg_id, req_id, msg);
break;
case rsctrl::stream::MsgId_RequestJoinOrLeaveLobby:
processReqListStreams(chan_id, msg_id, req_id, msg);
break;
case rsctrl::stream::MsgId_RequestRegisterStreams:
processReqRegisterStreams(chan_id, msg_id, req_id, msg);
break;
default:
std::cerr << "RpcProtoStream::processMsg() ERROR should never get here";
std::cerr << std::endl;
return 0;
}
/* must have matched id to get here */
return 1;
}
int RpcProtoStream::processReqStartFileStream(uint32_t chan_id, uint32_t /*msg_id*/, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoStream::processReqStartFileStream()";
std::cerr << std::endl;
// parse msg.
rsctrl::stream::RequestStartFileStream req;
if (!req.ParseFromString(msg))
{
std::cerr << "RpcProtoStream::processReqStartFileStream() ERROR ParseFromString()";
std::cerr << std::endl;
return 0;
}
// response.
rsctrl::stream::ResponseStreamDetail resp;
bool success = true;
std::string errorMsg;
// SETUP STREAM.
// FIND the FILE.
Expression * exp;
std::list<DirDetails> results;
FileSearchFlags flags;
int ans = rsFiles->SearchBoolExp(exp, results, flags);
// CREATE A STREAM OBJECT.
if (results.size() < 1)
{
success = false;
errorMsg = "No Matching File";
}
else
{
RpcStream stream;
stream.chan_id = chan_id;
stream.stream_id = getNextStreamId();
stream.state = RUNNING;
stream.path = ... // from results.
stream.offset = 0;
stream.length = file.size;
stream.start_byte = 0;
stream.end_byte = stream.length;
stream.desired_rate = req.rate_kBs;
// make response
rsctrl::stream::StreamDetail &detail = resp.streams_add();
if (!fill_stream_detail(detail, stream))
{
success = false;
errorMsg = "Failed to Invalid Action";
}
else
{
// insert.
mStreams[stream.stream_id] = stream;
}
}
/* DONE - Generate Reply */
if (success)
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::SUCCESS);
}
else
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::FAILED);
status->set_msg(errorMsg);
}
std::string outmsg;
if (!resp.SerializeToString(&outmsg))
{
std::cerr << "RpcProtoStream::processReqStartFileStream() ERROR SerialiseToString()";
std::cerr << std::endl;
return 0;
}
// Correctly Name Message.
uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::STREAM,
rsctrl::stream::MsgId_ResponseStreamDetail, true);
// queue it.
queueResponse(chan_id, out_msg_id, req_id, outmsg);
return 1;
}
int RpcProtoStream::processReqControlStream(uint32_t chan_id, uint32_t /*msg_id*/, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoStream::processReqControlStream()";
std::cerr << std::endl;
// parse msg.
rsctrl::stream::RequestControlStream req;
if (!req.ParseFromString(msg))
{
std::cerr << "RpcProtoStream::processReqControlStream() ERROR ParseFromString()";
std::cerr << std::endl;
return 0;
}
// response.
rsctrl::stream::ResponseStreamDetail resp;
bool success = true;
std::string errorMsg;
// FIND MATCHING STREAM.
std::map<uint32_t, RpcStream>::iterator it;
it = mStreams.find(resp.stream_id);
if (it != mStreams.end())
{
// TWEAK
// FILL IN REPLY.
// DONE.
switch(resp.action)
{
case STREAM_START:
if (it->state == PAUSED)
{
it->state = RUNNING;
}
break;
case STREAM_STOP:
it->state = FINISHED;
break;
case STREAM_PAUSE:
if (it->state == RUNNING)
{
it->state = PAUSED;
}
break;
case STREAM_CHANGE_RATE:
it->desired_rate = req.rate_kBs;
break;
case STREAM_SEEK:
if (req.seek_byte < it-> endByte)
{
it->offset = req.seek_byte;
}
break;
default:
success = false;
errorMsg = "Invalid Action";
}
if (success)
{
rsctrl::stream::StreamDetail &detail = resp.streams_add();
if (!fill_stream_detail(detail, it->second))
{
success = false;
errorMsg = "Invalid Action";
}
}
}
else
{
success = false;
errorMsg = "No Matching Stream";
}
/* DONE - Generate Reply */
if (success)
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::SUCCESS);
}
else
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::FAILED);
status->set_msg(errorMsg);
}
std::string outmsg;
if (!resp.SerializeToString(&outmsg))
{
std::cerr << "RpcProtoStream::processReqControlStream() ERROR SerialiseToString()";
std::cerr << std::endl;
return 0;
}
// Correctly Name Message.
uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::STREAM,
rsctrl::stream::MsgId_ResponseStreamDetail, true);
// queue it.
queueResponse(chan_id, out_msg_id, req_id, outmsg);
return 1;
}
int RpcProtoStream::processReqListStreams(uint32_t chan_id, uint32_t /*msg_id*/, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoStream::processReqListStreams()";
std::cerr << std::endl;
// parse msg.
rsctrl::stream::RequestListStreams req;
if (!req.ParseFromString(msg))
{
std::cerr << "RpcProtoStream::processReqListStreams() ERROR ParseFromString()";
std::cerr << std::endl;
return 0;
}
// response.
rsctrl::stream::ResponseStreamDetail resp;
bool success = false;
std::string errorMsg;
for(it = mStreams.begin(); it != mStreams.end(); it++)
{
/* check that it matches */
if (! match)
{
continue;
}
rsctrl::stream::StreamDetail &detail = resp.streams_add();
if (!fill_stream_detail(detail, it->second))
{
success = false;
errorMsg = "Some Details Failed to Fill";
}
}
/* DONE - Generate Reply */
if (success)
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::SUCCESS);
}
else
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::FAILED);
status->set_msg(errorMsg);
}
std::string outmsg;
if (!resp.SerializeToString(&outmsg))
{
std::cerr << "RpcProtoStream::processReqListStreams() ERROR SerialiseToString()";
std::cerr << std::endl;
return 0;
}
// Correctly Name Message.
uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::STREAM,
rsctrl::stream::MsgId_ResponseStreamDetail, true);
// queue it.
queueResponse(chan_id, out_msg_id, req_id, outmsg);
return 1;
}
int RpcProtoStream::processReqRegisterStreams(uint32_t chan_id, uint32_t /*msg_id*/, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoStream::processReqRegisterStreams()";
std::cerr << std::endl;
// parse msg.
rsctrl::stream::RequestRegisterStreams req;
if (!req.ParseFromString(msg))
{
std::cerr << "RpcProtoStream::processReqRegisterStreams() ERROR ParseFromString()";
std::cerr << std::endl;
return 0;
}
// response.
rsctrl::stream::ResponseRegisterStreams resp;
bool success = true;
bool doregister = false;
std::string errorMsg;
switch(req.action())
{
case rsctrl::stream::RequestRegisterStreams::REGISTER:
doregister = true;
break;
case rsctrl::stream::RequestRegisterStreams::DEREGISTER:
doregister = false;
break;
default:
std::cerr << "ERROR action is invalid";
std::cerr << std::endl;
success = false;
errorMsg = "RegisterStreams.Action is invalid";
break;
}
if (success)
{
if (doregister)
{
std::cerr << "Registering for Streams";
std::cerr << std::endl;
registerForEvents(chan_id, req_id, REGISTRATION_STREAMS);
}
else
{
std::cerr << "Deregistering for Streams";
std::cerr << std::endl;
deregisterForEvents(chan_id, req_id, REGISTRATION_STREAMS);
}
printEventRegister(std::cerr);
}
/* DONE - Generate Reply */
if (success)
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::SUCCESS);
}
else
{
rsctrl::core::Status *status = resp.mutable_status();
status->set_code(rsctrl::core::Status::FAILED);
status->set_msg(errorMsg);
}
std::string outmsg;
if (!resp.SerializeToString(&outmsg))
{
std::cerr << "RpcProtoStream::processReqCreateLobbies() ERROR SerialiseToString()";
std::cerr << std::endl;
return 0;
}
// Correctly Name Message.
uint32_t out_msg_id = constructMsgId(rsctrl::core::CORE, rsctrl::core::STREAM,
rsctrl::stream::MsgId_ResponseRegisterStreams, true);
// queue it.
queueResponse(chan_id, out_msg_id, req_id, outmsg);
return 1;
}
// EVENTS. (STREAMS)
int RpcProtoStream::locked_checkForEvents(uint32_t event, const std::list<RpcEventRegister> &registered, std::list<RpcQueuedMsg> &stream_msgs)
{
/* Wow - here already! */
std::cerr << "locked_checkForEvents()";
std::cerr << std::endl;
/* only one event type for now */
if (event != REGISTRATION_STREAMS)
{
std::cerr << "ERROR Invalid Stream Event Type";
std::cerr << std::endl;
/* error */
return 0;
}
/* iterate through streams, and get next chunk of data.
* package up and send it.
* NOTE we'll have to something more complex for VoIP!
*/
float ts = getTimeStamp();
float dt = ts - mStreamRates.last_ts;
uint32_t data_sent = 0;
#define FILTER_K (0.75)
if (mStreamRates.last_ts != 0)
{
mStreamRates.avg_dt = FILTER_K * mStreamRates.avg_dt
+ (1.0 - FILTER_K) * dt;
}
else
{
mStreamRates.avg_dt = dt;
}
mStreamRates.last_ts = ts;
std::list<uint32_t> to_remove;
std::map<uint32_t, RpcStream>::iterator it;
for(it = mStreams.begin(); it != mStreams.end(); it++)
{
RpcStream &stream = it->second;
if (stream.state == PAUSED)
{
continue;
}
/* we ignore the Events Register... just use stream info, */
int channel_id = stream.chan_id;
uint32_t size = stream.desired_rate * mStreamRates.avg_dt;
/* get data */
uint64_t remaining = stream.end_byte - stream.offset;
if (remaining < size)
{
size = remaining;
stream.state = FINISHED;
to_remove.push_back(it->first);
}
/* fill in the answer */
StreamData data;
data.stream_id = stream.stream_id;
data.stream_state = stream.state;
data.send_time = getTimeStamp();
data.offset = stream.offset;
data.size = size;
fill_stream_data(stream, data);
RpcQueuedMsg qmsg;
rsctrl::stream::ChatType ctype = rsctrl::stream::TYPE_GROUP;
std::string chat_id = ""; // No ID for group.
if (createQueuedEventSendMsg(*it, ctype, chat_id, *rit, qmsg))
{
std::cerr << "Created MsgEvent";
std::cerr << std::endl;
events.push_back(qmsg);
}
else
{
std::cerr << "ERROR Creating MsgEvent";
std::cerr << std::endl;
}
}
std::list<uint32_t>::iterator rit;
for(rit = to_remove.begin(); rit != to_remove.end(); rit++)
{
/* kill the stream! */
it = mStreams.find(*rit);
if (it != mStreams.end())
{
mStreams.erase(it);
}
}
return 1;
}
/***** HELPER FUNCTIONS *****/
bool createQueuedEventSendMsg(const ChatInfo &chatinfo, rsctrl::stream::ChatType ctype,
std::string chat_id, const RpcEventRegister &ereg, RpcQueuedMsg &qmsg)
{
rsctrl::stream::EventChatMessage event;
rsctrl::stream::ChatMessage *msg = event.mutable_msg();
rsctrl::stream::ChatId *id = msg->mutable_id();
id->set_chat_type(ctype);
id->set_chat_id(chat_id);
msg->set_peer_nickname(chatinfo.peer_nickname);
msg->set_chat_flags(chatinfo.chatflags);
msg->set_send_time(chatinfo.sendTime);
msg->set_recv_time(chatinfo.recvTime);
std::string msg_utf8;
if (!convertWStringToUTF8(chatinfo.msg, msg_utf8))
{
std::cerr << "RpcProtoStream::createQueuedEventSendMsg() ERROR Converting Msg";
std::cerr << std::endl;
return false;
}
msg->set_msg(msg_utf8);
/* DONE - Generate Reply */
std::string outmsg;
if (!event.SerializeToString(&outmsg))
{
std::cerr << "RpcProtoStream::createQueuedEventSendMsg() ERROR SerialiseToString()";
std::cerr << std::endl;
return false;
}
// Correctly Name Message.
qmsg.mMsgId = constructMsgId(rsctrl::core::CORE, rsctrl::core::CHAT,
rsctrl::stream::MsgId_EventChatMessage, true);
qmsg.mChanId = ereg.mChanId;
qmsg.mReqId = ereg.mReqId;
qmsg.mMsg = outmsg;
return true;
}
/****************** NEW HELPER FNS ******************/
bool fill_stream_details(rsctrl::stream::ResponseStreamDetail &resp,
const std::list<RpcStream> &streams)
{
std::list<RpcStream>::const_iterator it;
for (it = streams.begin(); it != streams.end(); it++)
{
rsctrl::stream::StreamDetail &detail = resp.streams_add();
fill_stream_detail(detail, *it);
}
return val;
}
bool fill_stream_detail(rsctrl::stream::StreamDetail &detail,
const RpcStream &stream)
{
}

View File

@ -0,0 +1,88 @@
/*
* RetroShare External Interface.
*
* Copyright 2012-2012 by Robert Fernie.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License Version 2.1 as published by the Free Software Foundation.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
* USA.
*
* Please report all bugs and problems to "retroshare@lunamutt.com".
*
*/
#ifndef RS_RPC_PROTO_STREAM_H
#define RS_RPC_PROTO_STREAM_H
#include "rpc/rpcserver.h"
// Registrations.
#define REGISTRATION_STREAMS 1
class RpcStream
{
public:
uint32_t chan_id;
uint32_t stream_id;
uint32_t state;
std::string path;
uint64_t offset; // where we currently are.
uint64_t length; // filesize.
uint64_t start_byte;
uint64_t end_byte;
float desired_rate; // Kb/s
};
class RpcStreamRates
{
public:
float avg_data_rate;
float avg_dt;
float last_data_rate;
float last_ts;
};
class RpcProtoStream: public RpcQueueService
{
public:
RpcProtoStream(uint32_t serviceId);
virtual int processMsg(uint32_t chan_id, uint32_t msgId, uint32_t req_id, const std::string &msg);
protected:
int processReqStartFileStream(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
int processReqControlStream(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
int processReqListStreams(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
int processReqRegisterStreams(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg);
RpcStreamRates mStreamRates;
std::map<uint32_t, RpcStream> mStreams;
// EVENTS.
virtual int locked_checkForEvents(uint32_t event, const std::list<RpcEventRegister> &registered, std::list<RpcQueuedMsg> &events);
};
#endif /* RS_PROTO_STREAM_H */

View File

@ -109,7 +109,7 @@ int RpcProtoSystem::processMsg(uint32_t chan_id, uint32_t msg_id, uint32_t req_i
}
int RpcProtoSystem::processSystemStatus(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
int RpcProtoSystem::processSystemStatus(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoSystem::processSystemStatus()";
std::cerr << std::endl;
@ -224,7 +224,7 @@ int RpcProtoSystem::processSystemStatus(uint32_t chan_id, uint32_t msg_id, uint3
int RpcProtoSystem::processSystemQuit(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
int RpcProtoSystem::processSystemQuit(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoSystem::processSystemQuit()";
std::cerr << std::endl;
@ -291,7 +291,7 @@ int RpcProtoSystem::processSystemQuit(uint32_t chan_id, uint32_t msg_id, uint32_
}
int RpcProtoSystem::processSystemExternalAccess(uint32_t chan_id, uint32_t msg_id, uint32_t req_id, const std::string &msg)
int RpcProtoSystem::processSystemExternalAccess(uint32_t chan_id, uint32_t /* msg_id */, uint32_t req_id, const std::string &msg)
{
std::cerr << "RpcProtoSystem::processSystemExternalAccess()";
std::cerr << std::endl;