Safer RsGxsChannel API

Deprecated old method which exposed interna async mechanism to the API
  users, making their and out life difficult
Things that really need to be async like turtle search/requests now accept
  callbacks, so the caller can be notified everytime some result is got
  back
Implement RsThread::async commodity wrapper to execute blocking API
  calls without blocking the caller, this could be optimized
  trasparently using a thread pool if necessary
Added hints into some retroshare-gui files on how to use RsThread::async
  thoghether with QMetaObject::invokeMethod and blocking RetroShare API
  to simplyfy interaction between GUI and libretroshare
This commit is contained in:
Gioacchino Mazzurco 2018-11-01 07:04:01 +01:00
parent 8fd22c8fd1
commit ea86fe2615
No known key found for this signature in database
GPG Key ID: A1FBCA3872E87051
8 changed files with 507 additions and 236 deletions

View File

@ -302,13 +302,10 @@ void RsGxsDataAccess::setReq(GxsRequest* req, uint32_t token, uint32_t ansType,
}
void RsGxsDataAccess::storeRequest(GxsRequest* req)
{
RsStackMutex stack(mDataMutex); /****** LOCKED *****/
RS_STACK_MUTEX(mDataMutex);
req->status = PENDING;
req->reqTime = time(NULL);
mRequests[req->token] = req;
return;
}
RsTokenService::GxsRequestStatus RsGxsDataAccess::requestStatus(uint32_t token)

View File

@ -5291,6 +5291,7 @@ void RsGxsNetService::receiveTurtleSearchResults(TurtleRequestId req,const unsig
GXSNETDEBUG___ << " passing the grp data to observer." << std::endl;
#endif
mObserver->receiveNewGroups(new_grps);
mObserver->receiveDistantSearchResults(req, grpId);
}
bool RsGxsNetService::search( const std::string& substring,

View File

@ -32,6 +32,7 @@
#include "retroshare/rsgxscommon.h"
#include "serialiser/rsserializable.h"
#include "retroshare/rsturtle.h"
#include "util/rsdeprecate.h"
class RsGxsChannels;
@ -100,10 +101,70 @@ std::ostream &operator<<(std::ostream& out, const RsGxsChannelPost& post);
class RsGxsChannels: public RsGxsIfaceHelper, public RsGxsCommentService
{
public:
explicit RsGxsChannels(RsGxsIface& gxs) : RsGxsIfaceHelper(gxs) {}
virtual ~RsGxsChannels() {}
/**
* @brief Create channel. Blocking API.
* @jsonapi{development}
* @param[inout] channel Channel data (name, description...)
* @return false on error, true otherwise
*/
virtual bool createChannel(RsGxsChannelGroup& channel) = 0;
/**
* @brief Create channel post. Blocking API.
* @jsonapi{development}
* @param[inout] post
* @return false on error, true otherwise
*/
virtual bool createPost(RsGxsChannelPost& post) = 0;
/**
* @brief Edit channel details.
* @jsonapi{development}
* @param[in] channel Channel data (name, description...) with modifications
* @return false on error, true otherwise
*/
virtual bool editChannel(RsGxsChannelGroup& channel) = 0;
/**
* @brief Share extra file
* Can be used to share extra file attached to a channel post
* @jsonapi{development}
* @param[in] path file path
* @return false on error, true otherwise
*/
virtual bool ExtraFileHash(const std::string& path) = 0;
/**
* @brief Remove extra file from shared files
* @jsonapi{development}
* @param[in] hash hash of the file to remove
* @return false on error, true otherwise
*/
virtual bool ExtraFileRemove(const RsFileHash& hash) = 0;
/**
* @brief Get auto-download option value for given channel
* @jsonapi{development}
* @param[in] channelId channel id
* @param[out] enabled storage for the auto-download option value
* @return false if something failed, true otherwhise
*/
virtual bool getChannelAutoDownload(
const RsGxsGroupId& channelId, bool& enabled ) = 0;
/**
* @brief Get download directory for the given channel
* @jsonapi{development}
* @param[in] channelId id of the channel
* @param[out] directory reference to string where to store the path
* @return false on error, true otherwise
*/
virtual bool getChannelDownloadDirectory( const RsGxsGroupId& channelId,
std::string& directory ) = 0;
/**
* @brief Get channels summaries list. Blocking API.
* @jsonapi{development}
@ -138,20 +199,44 @@ public:
std::vector<RsGxsComment>& comments ) = 0;
/**
* @brief Create channel. Blocking API.
* @brief Toggle post read status. Blocking API.
* @jsonapi{development}
* @param[inout] channel Channel data (name, description...)
* @param[in] postId post identifier
* @param[in] read true to mark as read, false to mark as unread
* @return false on error, true otherwise
*/
virtual bool createChannel(RsGxsChannelGroup& channel) = 0;
virtual bool markRead(const RsGxsGrpMsgIdPair& postId, bool read) = 0;
/**
* @brief Create channel post. Blocking API.
* @brief Enable or disable auto-download for given channel. Blocking API
* @jsonapi{development}
* @param[inout] post
* @param[in] channelId channel id
* @param[in] enable true to enable, false to disable
* @return false if something failed, true otherwhise
*/
virtual bool setChannelAutoDownload(
const RsGxsGroupId& channelId, bool enable ) = 0;
/**
* @brief Share channel publishing key
* This can be used to authorize other peers to post on the channel
* @jsonapi{development}
* @param[in] channelId id of the channel
* @param[in] peers peers to share the key with
* @return false on error, true otherwise
*/
virtual bool createPost(RsGxsChannelPost& post) = 0;
virtual bool shareChannelKeys(
const RsGxsGroupId& channelId, const std::set<RsPeerId>& peers ) = 0;
/**
* @brief Set download directory for the given channel. Blocking API.
* @jsonapi{development}
* @param[in] channelId id of the channel
* @param[in] directory path
* @return false on error, true otherwise
*/
virtual bool setChannelDownloadDirectory(
const RsGxsGroupId& channelId, const std::string& directory) = 0;
/**
* @brief Subscrbe to a channel. Blocking API
@ -163,141 +248,6 @@ public:
virtual bool subscribeToChannel( const RsGxsGroupId &channelId,
bool subscribe ) = 0;
/* Specific Service Data
* TODO: change the orrible const uint32_t &token to uint32_t token
* TODO: create a new typedef for token so code is easier to read
*/
virtual bool getGroupData(const uint32_t &token, std::vector<RsGxsChannelGroup> &groups) = 0;
virtual bool getPostData(const uint32_t &token, std::vector<RsGxsChannelPost> &posts, std::vector<RsGxsComment> &cmts) = 0;
virtual bool getPostData(const uint32_t &token, std::vector<RsGxsChannelPost> &posts) = 0;
/**
* @brief toggle message read status
* @jsonapi{development}
* @param[out] token GXS token queue token
* @param[in] msgId
* @param[in] read
*/
virtual void setMessageReadStatus(
uint32_t& token, const RsGxsGrpMsgIdPair& msgId, bool read) = 0;
/**
* @brief Enable or disable auto-download for given channel. Blocking API
* @jsonapi{development}
* @param[in] channelId channel id
* @param[in] enable true to enable, false to disable
* @return false if something failed, true otherwhise
*/
virtual bool setChannelAutoDownload(
const RsGxsGroupId& channelId, bool enable) = 0;
/**
* @brief Get auto-download option value for given channel
* @jsonapi{development}
* @param[in] channelId channel id
* @param[out] enabled storage for the auto-download option value
* @return false if something failed, true otherwhise
*/
virtual bool getChannelAutoDownload(
const RsGxsGroupId& channelId, bool& enabled) = 0;
/**
* @brief Set download directory for the given channel
* @jsonapi{development}
* @param[in] channelId id of the channel
* @param[in] directory path
* @return false on error, true otherwise
*/
virtual bool setChannelDownloadDirectory(
const RsGxsGroupId& channelId, const std::string& directory) = 0;
/**
* @brief Get download directory for the given channel
* @jsonapi{development}
* @param[in] channelId id of the channel
* @param[out] directory reference to string where to store the path
* @return false on error, true otherwise
*/
virtual bool getChannelDownloadDirectory( const RsGxsGroupId& channelId,
std::string& directory ) = 0;
/**
* @brief Share channel publishing key
* This can be used to authorize other peers to post on the channel
* @jsonapi{development}
* @param[in] groupId Channel id
* @param[in] peers peers to which share the key
* @return false on error, true otherwise
*/
virtual bool groupShareKeys(
const RsGxsGroupId& groupId, const std::set<RsPeerId>& peers ) = 0;
/**
* @brief Request subscription to a group.
* The action is performed asyncronously, so it could fail in a subsequent
* phase even after returning true.
* @param[out] token Storage for RsTokenService token to track request
* status.
* @param[in] groupId Channel id
* @param[in] subscribe
* @return false on error, true otherwise
*/
virtual bool subscribeToGroup( uint32_t& token, const RsGxsGroupId &groupId,
bool subscribe ) = 0;
/**
* @brief Request channel creation.
* The action is performed asyncronously, so it could fail in a subsequent
* phase even after returning true.
* @param[out] token Storage for RsTokenService token to track request
* status.
* @param[in] group Channel data (name, description...)
* @return false on error, true otherwise
*/
virtual bool createGroup(uint32_t& token, RsGxsChannelGroup& group) = 0;
/**
* @brief Request post creation.
* The action is performed asyncronously, so it could fail in a subsequent
* phase even after returning true.
* @param[out] token Storage for RsTokenService token to track request
* status.
* @param[in] post
* @return false on error, true otherwise
*/
virtual bool createPost(uint32_t& token, RsGxsChannelPost& post) = 0;
/**
* @brief Request channel change.
* The action is performed asyncronously, so it could fail in a subsequent
* phase even after returning true.
* @jsonapi{development}
* @param[out] token Storage for RsTokenService token to track request
* status.
* @param[in] group Channel data (name, description...) with modifications
* @return false on error, true otherwise
*/
virtual bool updateGroup(uint32_t& token, RsGxsChannelGroup& group) = 0;
/**
* @brief Share extra file
* Can be used to share extra file attached to a channel post
* @jsonapi{development}
* @param[in] path file path
* @return false on error, true otherwise
*/
virtual bool ExtraFileHash(const std::string& path) = 0;
/**
* @brief Remove extra file from shared files
* @jsonapi{development}
* @param[in] hash hash of the file to remove
* @return false on error, true otherwise
*/
virtual bool ExtraFileRemove(const RsFileHash& hash) = 0;
/**
* @brief Request remote channels search
* @jsonapi{development}
@ -312,15 +262,123 @@ public:
const std::function<void (const RsGxsGroupSummary& result)>& multiCallback,
rstime_t maxWait = 300 ) = 0;
/**
* @brief Request remote channel
* @jsonapi{development}
* @param[in] channelId id of the channel to request to distants peers
* @param multiCallback function that will be called each time a result is
* received
* @param[in] maxWait maximum wait time in seconds for search results
* @return false on error, true otherwise
*/
virtual bool turtleChannelRequest(
const RsGxsGroupId& channelId,
const std::function<void (const RsGxsChannelGroup& result)>& multiCallback,
rstime_t maxWait = 300 ) = 0;
/* Following functions are deprecated as they expose internal functioning
* semantic, instead of a safe to use API */
RS_DEPRECATED_FOR(getChannelsInfo)
virtual bool getGroupData(const uint32_t &token, std::vector<RsGxsChannelGroup> &groups) = 0;
RS_DEPRECATED_FOR(getChannelsContent)
virtual bool getPostData(const uint32_t &token, std::vector<RsGxsChannelPost> &posts, std::vector<RsGxsComment> &cmts) = 0;
RS_DEPRECATED_FOR(getChannelsContent)
virtual bool getPostData(const uint32_t &token, std::vector<RsGxsChannelPost> &posts) = 0;
/**
* @brief toggle message read status
* @deprecated
* @param[out] token GXS token queue token
* @param[in] msgId
* @param[in] read
*/
RS_DEPRECATED_FOR(markRead)
virtual void setMessageReadStatus(
uint32_t& token, const RsGxsGrpMsgIdPair& msgId, bool read) = 0;
/**
* @brief Share channel publishing key
* This can be used to authorize other peers to post on the channel
* @deprecated
* @param[in] groupId Channel id
* @param[in] peers peers to which share the key
* @return false on error, true otherwise
*/
RS_DEPRECATED_FOR(shareChannelKeys)
virtual bool groupShareKeys(
const RsGxsGroupId& groupId, const std::set<RsPeerId>& peers ) = 0;
/**
* @brief Request subscription to a group.
* The action is performed asyncronously, so it could fail in a subsequent
* phase even after returning true.
* @deprecated
* @param[out] token Storage for RsTokenService token to track request
* status.
* @param[in] groupId Channel id
* @param[in] subscribe
* @return false on error, true otherwise
*/
RS_DEPRECATED_FOR(subscribeToChannel)
virtual bool subscribeToGroup( uint32_t& token, const RsGxsGroupId &groupId,
bool subscribe ) = 0;
/**
* @brief Request channel creation.
* The action is performed asyncronously, so it could fail in a subsequent
* phase even after returning true.
* @deprecated
* @param[out] token Storage for RsTokenService token to track request
* status.
* @param[in] group Channel data (name, description...)
* @return false on error, true otherwise
*/
RS_DEPRECATED_FOR(createChannel)
virtual bool createGroup(uint32_t& token, RsGxsChannelGroup& group) = 0;
/**
* @brief Request post creation.
* The action is performed asyncronously, so it could fail in a subsequent
* phase even after returning true.
* @deprecated
* @param[out] token Storage for RsTokenService token to track request
* status.
* @param[in] post
* @return false on error, true otherwise
*/
RS_DEPRECATED
virtual bool createPost(uint32_t& token, RsGxsChannelPost& post) = 0;
/**
* @brief Request channel change.
* The action is performed asyncronously, so it could fail in a subsequent
* phase even after returning true.
* @deprecated
* @param[out] token Storage for RsTokenService token to track request
* status.
* @param[in] group Channel data (name, description...) with modifications
* @return false on error, true otherwise
*/
RS_DEPRECATED_FOR(editChannel)
virtual bool updateGroup(uint32_t& token, RsGxsChannelGroup& group) = 0;
//////////////////////////////////////////////////////////////////////////////
/// Distant synchronisation methods ///
//////////////////////////////////////////////////////////////////////////////
///
RS_DEPRECATED_FOR(turtleChannelRequest)
virtual TurtleRequestId turtleGroupRequest(const RsGxsGroupId& group_id)=0;
RS_DEPRECATED
virtual TurtleRequestId turtleSearchRequest(const std::string& match_string)=0;
RS_DEPRECATED_FOR(turtleSearchRequest)
virtual bool retrieveDistantSearchResults(TurtleRequestId req, std::map<RsGxsGroupId, RsGxsGroupSummary> &results) =0;
RS_DEPRECATED
virtual bool clearDistantSearchResults(TurtleRequestId req)=0;
RS_DEPRECATED_FOR(turtleChannelRequest)
virtual bool retrieveDistantGroup(const RsGxsGroupId& group_id,RsGxsChannelGroup& distant_group)=0;
//////////////////////////////////////////////////////////////////////////////
};

View File

@ -78,7 +78,8 @@ p3GxsChannels::p3GxsChannels(
RsGxsChannels(static_cast<RsGxsIface&>(*this)), GxsTokenQueue(this),
mSubscribedGroupsMutex("GXS channels subscribed groups cache"),
mKnownChannelsMutex("GXS channels known channels timestamp cache"),
mSearchCallbacksMapMutex("GXS channels search")
mSearchCallbacksMapMutex("GXS channels search callbacks map"),
mDistantChannelsCallbacksMapMutex("GXS channels distant channels callbacks map")
{
// For Dummy Msgs.
mGenActive = false;
@ -574,10 +575,12 @@ bool p3GxsChannels::getChannelAutoDownload(const RsGxsGroupId &groupId, bool& en
return autoDownloadEnabled(groupId,enabled);
}
bool p3GxsChannels::setChannelDownloadDirectory(const RsGxsGroupId &groupId, const std::string& directory)
bool p3GxsChannels::setChannelDownloadDirectory(
const RsGxsGroupId &groupId, const std::string& directory )
{
#ifdef GXSCHANNELS_DEBUG
std::cerr << "p3GxsChannels::setDownloadDirectory() id: " << groupId << " to: " << directory << std::endl;
std::cerr << __PRETTY_FUNCTION__ << " id: " << groupId << " to: "
<< directory << std::endl;
#endif
RS_STACK_MUTEX(mSubscribedGroupsMutex);
@ -586,9 +589,8 @@ bool p3GxsChannels::setChannelDownloadDirectory(const RsGxsGroupId &groupId, con
it = mSubscribedGroups.find(groupId);
if (it == mSubscribedGroups.end())
{
#ifdef GXSCHANNELS_DEBUG
std::cerr << "p3GxsChannels::setAutoDownload() Missing Group" << std::endl;
#endif
std::cerr << __PRETTY_FUNCTION__ << " Error! Unknown groupId: "
<< groupId.toStdString() << std::endl;
return false;
}
@ -598,13 +600,12 @@ bool p3GxsChannels::setChannelDownloadDirectory(const RsGxsGroupId &groupId, con
if (directory == ss.mDownloadDirectory)
{
#ifdef GXSCHANNELS_DEBUG
std::cerr << "p3GxsChannels::setDownloadDirectory() WARNING setting looks okay already" << std::endl;
#endif
std::cerr << __PRETTY_FUNCTION__ << " Warning! groupId: " << groupId
<< " Was already configured to download into: " << directory
<< std::endl;
return false;
}
/* we are just going to set it anyway. */
ss.mDownloadDirectory = directory;
std::string serviceString = ss.save();
uint32_t token;
@ -612,6 +613,13 @@ bool p3GxsChannels::setChannelDownloadDirectory(const RsGxsGroupId &groupId, con
it->second.mServiceString = serviceString; // update Local Cache.
RsGenExchange::setGroupServiceString(token, groupId, serviceString); // update dbase.
if(waitToken(token) != RsTokenService::COMPLETE)
{
std::cerr << __PRETTY_FUNCTION__ << " Error! Feiled setting group "
<< " service string" << std::endl;
return false;
}
/* now reload it */
std::list<RsGxsGroupId> groups;
groups.push_back(groupId);
@ -632,13 +640,10 @@ bool p3GxsChannels::getChannelDownloadDirectory(const RsGxsGroupId & groupId,std
std::map<RsGxsGroupId, RsGroupMetaData>::iterator it;
it = mSubscribedGroups.find(groupId);
if (it == mSubscribedGroups.end())
{
#ifdef GXSCHANNELS_DEBUG
std::cerr << "p3GxsChannels::getChannelDownloadDirectory() No Entry" << std::endl;
#endif
std::cerr << __PRETTY_FUNCTION__ << " Error! Unknown groupId: "
<< groupId.toStdString() << std::endl;
return false;
}
@ -647,7 +652,7 @@ bool p3GxsChannels::getChannelDownloadDirectory(const RsGxsGroupId & groupId,std
ss.load(it->second.mServiceString);
directory = ss.mDownloadDirectory;
return true ;
return true;
}
void p3GxsChannels::request_AllSubscribedGroups()
@ -1053,20 +1058,63 @@ bool p3GxsChannels::getChannelsContent(
bool p3GxsChannels::createChannel(RsGxsChannelGroup& channel)
{
uint32_t token;
if( !createGroup(token, channel)
|| waitToken(token) != RsTokenService::COMPLETE )
return false;
if(RsGenExchange::getPublishedGroupMeta(token, channel.mMeta))
if(!createGroup(token, channel))
{
std::cerr << __PRETTY_FUNCTION__ << "Error! Failed updating group."
<< std::endl;
return false;
}
if(waitToken(token) != RsTokenService::COMPLETE)
{
std::cerr << __PRETTY_FUNCTION__ << "Error! GXS operation failed."
<< std::endl;
return false;
}
if(!RsGenExchange::getPublishedGroupMeta(token, channel.mMeta))
{
std::cerr << __PRETTY_FUNCTION__ << "Error! Failure getting updated "
<< " group data." << std::endl;
return false;
}
#ifdef RS_DEEP_SEARCH
DeepSearch::indexChannelGroup(channel);
#endif // RS_DEEP_SEARCH
return true;
}
bool p3GxsChannels::editChannel(RsGxsChannelGroup& channel)
{
uint32_t token;
if(!updateGroup(token, channel))
{
std::cerr << __PRETTY_FUNCTION__ << "Error! Failed updating group."
<< std::endl;
return false;
}
if(waitToken(token) != RsTokenService::COMPLETE)
{
std::cerr << __PRETTY_FUNCTION__ << "Error! GXS operation failed."
<< std::endl;
return false;
}
if(!RsGenExchange::getPublishedGroupMeta(token, channel.mMeta))
{
std::cerr << __PRETTY_FUNCTION__ << "Error! Failure getting updated "
<< " group data." << std::endl;
return false;
}
#ifdef RS_DEEP_SEARCH
DeepSearch::indexChannelGroup(channel);
#endif // RS_DEEP_SEARCH
return true;
}
bool p3GxsChannels::createPost(RsGxsChannelPost& post)
@ -1096,6 +1144,20 @@ bool p3GxsChannels::subscribeToChannel(
return true;
}
bool p3GxsChannels::markRead(const RsGxsGrpMsgIdPair& msgId, bool read)
{
uint32_t token;
setMessageReadStatus(token, msgId, read);
if(waitToken(token) != RsTokenService::COMPLETE ) return false;
return true;
}
bool p3GxsChannels::shareChannelKeys(
const RsGxsGroupId& channelId, const std::set<RsPeerId>& peers)
{
return groupShareKeys(channelId, peers);
}
////////////////////////////////////////////////////////////////////////////////
/// Blocking API implementation end
@ -1609,7 +1671,7 @@ void p3GxsChannels::dummy_tick()
}
cleanTimedOutSearches();
cleanTimedOutCallbacks();
}
@ -1781,7 +1843,7 @@ TurtleRequestId p3GxsChannels::turtleGroupRequest(const RsGxsGroupId& group_id)
}
TurtleRequestId p3GxsChannels::turtleSearchRequest(const std::string& match_string)
{
return netService()->turtleSearchRequest(match_string) ;
return netService()->turtleSearchRequest(match_string);
}
bool p3GxsChannels::clearDistantSearchResults(TurtleRequestId req)
@ -1839,6 +1901,7 @@ bool p3GxsChannels::turtleSearchRequest(
TurtleRequestId sId = turtleSearchRequest(matchString);
{
RS_STACK_MUTEX(mSearchCallbacksMapMutex);
mSearchCallbacksMap.emplace(
sId,
@ -1846,6 +1909,35 @@ bool p3GxsChannels::turtleSearchRequest(
multiCallback,
std::chrono::system_clock::now() +
std::chrono::seconds(maxWait) ) );
}
return true;
}
/// @see RsGxsChannels::turtleChannelRequest
bool p3GxsChannels::turtleChannelRequest(
const RsGxsGroupId& channelId,
const std::function<void (const RsGxsChannelGroup& result)>& multiCallback,
rstime_t maxWait)
{
if(channelId.isNull())
{
std::cerr << __PRETTY_FUNCTION__ << "Error! channelId can't be null!"
<< std::endl;
return false;
}
TurtleRequestId sId = turtleGroupRequest(channelId);
{
RS_STACK_MUTEX(mDistantChannelsCallbacksMapMutex);
mDistantChannelsCallbacksMap.emplace(
sId,
std::make_pair(
multiCallback,
std::chrono::system_clock::now() +
std::chrono::seconds(maxWait) ) );
}
return true;
}
@ -1856,6 +1948,7 @@ void p3GxsChannels::receiveDistantSearchResults(
std::cerr << __PRETTY_FUNCTION__ << "(" << id << ", " << grpId << ")"
<< std::endl;
{
RsGenExchange::receiveDistantSearchResults(id, grpId);
RsGxsGroupSummary gs;
gs.mGroupId = grpId;
@ -1865,14 +1958,48 @@ void p3GxsChannels::receiveDistantSearchResults(
RS_STACK_MUTEX(mSearchCallbacksMapMutex);
auto cbpt = mSearchCallbacksMap.find(id);
if(cbpt != mSearchCallbacksMap.end())
{
cbpt->second.first(gs);
return;
}
} // end RS_STACK_MUTEX(mSearchCallbacksMapMutex);
}
{
RS_STACK_MUTEX(mDistantChannelsCallbacksMapMutex);
auto cbpt = mDistantChannelsCallbacksMap.find(id);
if(cbpt != mDistantChannelsCallbacksMap.end())
{
std::function<void (const RsGxsChannelGroup&)> callback =
cbpt->second.first;
RsThread::async([this, callback, grpId]()
{
std::list<RsGxsGroupId> chanIds({grpId});
std::vector<RsGxsChannelGroup> channelsInfo;
if(!getChannelsInfo(chanIds, channelsInfo))
{
std::cerr << __PRETTY_FUNCTION__ << " Error! Received "
<< "distant channel result grpId: " << grpId
<< " but failed getting channel info"
<< std::endl;
return;
}
for(const RsGxsChannelGroup& chan : channelsInfo)
callback(chan);
} );
return;
}
} // RS_STACK_MUTEX(mDistantChannelsCallbacksMapMutex);
}
void p3GxsChannels::cleanTimedOutSearches()
void p3GxsChannels::cleanTimedOutCallbacks()
{
RS_STACK_MUTEX(mSearchCallbacksMapMutex);
auto now = std::chrono::system_clock::now();
{
RS_STACK_MUTEX(mSearchCallbacksMapMutex);
for( auto cbpt = mSearchCallbacksMap.begin();
cbpt != mSearchCallbacksMap.end(); )
if(cbpt->second.second <= now)
@ -1881,4 +2008,17 @@ void p3GxsChannels::cleanTimedOutSearches()
cbpt = mSearchCallbacksMap.erase(cbpt);
}
else ++cbpt;
} // RS_STACK_MUTEX(mSearchCallbacksMapMutex);
{
RS_STACK_MUTEX(mDistantChannelsCallbacksMapMutex);
for( auto cbpt = mDistantChannelsCallbacksMap.begin();
cbpt != mDistantChannelsCallbacksMap.end(); )
if(cbpt->second.second <= now)
{
clearDistantSearchResults(cbpt->first);
cbpt = mDistantChannelsCallbacksMap.erase(cbpt);
}
else ++cbpt;
} // RS_STACK_MUTEX(mDistantChannelsCallbacksMapMutex)
}

View File

@ -112,6 +112,12 @@ virtual bool getChannelDownloadDirectory(const RsGxsGroupId &groupId, std::strin
const std::function<void (const RsGxsGroupSummary&)>& multiCallback,
rstime_t maxWait = 300 );
/// @see RsGxsChannels::turtleChannelRequest
virtual bool turtleChannelRequest(
const RsGxsGroupId& channelId,
const std::function<void (const RsGxsChannelGroup& result)>& multiCallback,
rstime_t maxWait = 300 );
/**
* Receive results from turtle search @see RsGenExchange @see RsNxsObserver
* @see RsGxsNetService::receiveTurtleSearchResults
@ -183,6 +189,9 @@ virtual bool ExtraFileRemove(const RsFileHash &hash);
/// Implementation of @see RsGxsChannels::createChannel
virtual bool createChannel(RsGxsChannelGroup& channel);
/// Implementation of @see RsGxsChannels::editChannel
virtual bool editChannel(RsGxsChannelGroup& channel);
/// Implementation of @see RsGxsChannels::createPost
virtual bool createPost(RsGxsChannelPost& post);
@ -190,6 +199,12 @@ virtual bool ExtraFileRemove(const RsFileHash &hash);
virtual bool subscribeToChannel( const RsGxsGroupId &groupId,
bool subscribe );
/// Implementation of @see RsGxsChannels::setPostRead
virtual bool markRead(const RsGxsGrpMsgIdPair& msgId, bool read);
virtual bool shareChannelKeys(
const RsGxsGroupId& channelId, const std::set<RsPeerId>& peers );
protected:
// Overloaded from GxsTokenQueue for Request callbacks.
virtual void handleResponse(uint32_t token, uint32_t req_type);
@ -270,8 +285,17 @@ bool generateGroup(uint32_t &token, std::string groupName);
> mSearchCallbacksMap;
RsMutex mSearchCallbacksMapMutex;
/// Cleanup mSearchCallbacksMap
void cleanTimedOutSearches();
/** Store distant channels requests callbacks with timeout*/
std::map<
TurtleRequestId,
std::pair<
std::function<void (const RsGxsChannelGroup&)>,
std::chrono::system_clock::time_point >
> mDistantChannelsCallbacksMap;
RsMutex mDistantChannelsCallbacksMapMutex;
/// Cleanup mSearchCallbacksMap and mDistantChannelsCallbacksMap
void cleanTimedOutCallbacks();
};
#endif

View File

@ -27,6 +27,8 @@
#include <iostream>
#include <unistd.h>
#include <semaphore.h>
#include <thread>
#include <functional>
#include <util/rsmemory.h>
#include "util/rstime.h"
@ -239,7 +241,7 @@ pthread_t createThread(RsThread &thread);
class RsThread
{
public:
public:
RsThread();
virtual ~RsThread() {}
@ -259,6 +261,17 @@ class RsThread
void ask_for_stop();
/**
* Execute given function on another thread without blocking the caller
* execution.
* This can be generalized with variadic template, ATM it is enough to wrap
* any kind of function call or job into a lamba which get no paramethers
* and return nothing but can capture
* This can be easly optimized later by using a thread pool
*/
static void async(const std::function<void()>& fn)
{ std::thread(fn).detach(); }
protected:
virtual void runloop() =0; /* called once the thread is started. Should be overloaded by subclasses. */
void go() ; // this one calls runloop and also sets the flags correctly when the thread is finished running.

View File

@ -21,6 +21,7 @@
#include <QMenu>
#include <QFileDialog>
#include <QMetaObject>
#include <retroshare/rsfiles.h>
@ -276,17 +277,34 @@ QWidget *GxsChannelDialog::createCommentHeaderWidget(const RsGxsGroupId &grpId,
void GxsChannelDialog::toggleAutoDownload()
{
RsGxsGroupId grpId = groupId();
if (grpId.isNull()) {
if (grpId.isNull()) return;
bool autoDownload;
if(!rsGxsChannels->getChannelAutoDownload(grpId, autoDownload))
{
std::cerr << __PRETTY_FUNCTION__ << " failed to get autodownload value "
<< "for channel: " << grpId.toStdString() << std::endl;
return;
}
bool autoDownload ;
if(!rsGxsChannels->getChannelAutoDownload(grpId,autoDownload) || !rsGxsChannels->setChannelAutoDownload(grpId, !autoDownload))
RsThread::async([this, grpId, autoDownload]()
{
std::cerr << "GxsChannelDialog::toggleAutoDownload() Auto Download failed to set";
std::cerr << std::endl;
if(!rsGxsChannels->setChannelAutoDownload(grpId, !autoDownload))
{
std::cerr << __PRETTY_FUNCTION__ << " failed to set autodownload "
<< "for channel: " << grpId << std::endl;
return;
}
QMetaObject::invokeMethod(this, [=]()
{
/* Here it goes any code you want to be executed on the Qt Gui
* thread, for example to update the data model with new information
* after a blocking call to RetroShare API complete, note that
* Qt::QueuedConnection is important!
*/
}, Qt::QueuedConnection);
});
}
void GxsChannelDialog::loadGroupSummaryToken(const uint32_t &token, std::list<RsGroupMetaData> &groupInfo, RsUserdata *&userdata)

View File

@ -622,13 +622,13 @@ bool GxsChannelPostsWidget::navigatePostItem(const RsGxsMessageId &msgId)
void GxsChannelPostsWidget::subscribeGroup(bool subscribe)
{
if (groupId().isNull()) {
return;
}
RsGxsGroupId grpId(groupId());
if (grpId.isNull()) return;
uint32_t token;
rsGxsChannels->subscribeToGroup(token, groupId(), subscribe);
// mChannelQueue->queueRequest(token, 0, RS_TOKREQ_ANSTYPE_ACK, TOKEN_TYPE_SUBSCRIBE_CHANGE);
RsThread::async([=]()
{
rsGxsChannels->subscribeToChannel(grpId, subscribe);
} );
}
void GxsChannelPostsWidget::setAutoDownload(bool autoDl)
@ -644,12 +644,32 @@ void GxsChannelPostsWidget::toggleAutoDownload()
return;
}
bool autoDownload ;
if(!rsGxsChannels->getChannelAutoDownload(grpId,autoDownload) || !rsGxsChannels->setChannelAutoDownload(grpId, !autoDownload))
bool autoDownload;
if(!rsGxsChannels->getChannelAutoDownload(grpId, autoDownload))
{
std::cerr << "GxsChannelDialog::toggleAutoDownload() Auto Download failed to set";
std::cerr << std::endl;
std::cerr << __PRETTY_FUNCTION__ << " failed to get autodownload value "
<< "for channel: " << grpId.toStdString() << std::endl;
return;
}
RsThread::async([this, grpId, autoDownload]()
{
if(!rsGxsChannels->setChannelAutoDownload(grpId, !autoDownload))
{
std::cerr << __PRETTY_FUNCTION__ << " failed to set autodownload "
<< "for channel: " << grpId.toStdString() << std::endl;
return;
}
QMetaObject::invokeMethod(this, [=]()
{
/* Here it goes any code you want to be executed on the Qt Gui
* thread, for example to update the data model with new information
* after a blocking call to RetroShare API complete, note that
* Qt::QueuedConnection is important!
*/
}, Qt::QueuedConnection);
});
}
bool GxsChannelPostsWidget::insertGroupData(const uint32_t &token, RsGroupMetaData &metaData)