adding thread_group for managing async tasks

This commit is contained in:
Lee Clagett 2016-11-02 16:41:43 -04:00
parent d51f1af75f
commit 64094e5f4e
4 changed files with 352 additions and 101 deletions

View file

@ -28,9 +28,9 @@
// STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
// THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <boost/asio.hpp>
#include "misc_log_ex.h"
#include "common/perf_timer.h"
#include "common/thread_group.h"
#include "common/util.h"
#include "rctSigs.h"
#include "cryptonote_core/cryptonote_format_utils.h"
@ -38,17 +38,22 @@
using namespace crypto;
using namespace std;
#define KILL_IOSERVICE() \
if(ioservice_active) \
{ \
work.reset(); \
while (!ioservice.stopped()) ioservice.poll(); \
threadpool.join_all(); \
ioservice.stop(); \
ioservice_active = false; \
}
namespace rct {
namespace {
struct verRangeWrapper_ {
void operator()(const key & C, const rangeSig & as, bool &result) const {
result = verRange(C, as);
}
};
constexpr const verRangeWrapper_ verRangeWrapper{};
struct verRctMGSimpleWrapper_ {
void operator()(const key &message, const mgSig &mg, const ctkeyV & pubs, const key & C, bool &result) const {
result = verRctMGSimple(message, mg, pubs, C);
}
};
constexpr const verRctMGSimpleWrapper_ verRctMGSimpleWrapper{};
}
//Schnorr Non-linkable
//Gen Gives a signature (L1, s1, s2) proving that the sender knows "x" such that xG = one of P1 or P2
@ -360,10 +365,6 @@ namespace rct {
return true;
}
void verRangeWrapper(const key & C, const rangeSig & as, bool &result) {
result = verRange(C, as);
}
key get_pre_mlsag_hash(const rctSig &rv)
{
keyV hashes;
@ -544,9 +545,6 @@ namespace rct {
return MLSAG_Ver(message, M, mg, rows);
}
void verRctMGSimpleWrapper(const key &message, const mgSig &mg, const ctkeyV & pubs, const key & C, bool &result) {
result = verRctMGSimple(message, mg, pubs, C);
}
//These functions get keys from blockchain
//replace these when connecting blockchain
@ -767,38 +765,20 @@ namespace rct {
// some rct ops can throw
try
{
boost::asio::io_service ioservice;
boost::thread_group threadpool;
std::unique_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(ioservice));
size_t threads = tools::get_max_concurrency();
threads = std::min(threads, rv.outPk.size());
for (size_t i = 0; i < threads; ++i)
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice));
bool ioservice_active = true;
std::deque<bool> results(rv.outPk.size(), false);
epee::misc_utils::auto_scope_leave_caller ioservice_killer = epee::misc_utils::create_scope_leave_handler([&]() { KILL_IOSERVICE(); });
tools::thread_group threadpool(rv.outPk.size()); // this must destruct before results
DP("range proofs verified?");
for (size_t i = 0; i < rv.outPk.size(); i++) {
if (threads > 1) {
ioservice.dispatch(boost::bind(&verRangeWrapper, std::cref(rv.outPk[i].mask), std::cref(rv.p.rangeSigs[i]), std::ref(results[i])));
}
else {
bool tmp = verRange(rv.outPk[i].mask, rv.p.rangeSigs[i]);
DP(tmp);
if (!tmp) {
LOG_ERROR("Range proof verification failed for input " << i);
return false;
}
}
threadpool.dispatch(
std::bind(verRangeWrapper, std::cref(rv.outPk[i].mask), std::cref(rv.p.rangeSigs[i]), std::ref(results[i]))
);
}
KILL_IOSERVICE();
if (threads > 1) {
for (size_t i = 0; i < rv.outPk.size(); ++i) {
if (!results[i]) {
LOG_ERROR("Range proof verified failed for input " << i);
return false;
}
threadpool.sync();
for (size_t i = 0; i < rv.outPk.size(); ++i) {
if (!results[i]) {
LOG_ERROR("Range proof verified failed for input " << i);
return false;
}
}
@ -832,34 +812,23 @@ namespace rct {
CHECK_AND_ASSERT_MES(rv.pseudoOuts.size() == rv.p.MGs.size(), false, "Mismatched sizes of rv.pseudoOuts and rv.p.MGs");
CHECK_AND_ASSERT_MES(rv.pseudoOuts.size() == rv.mixRing.size(), false, "Mismatched sizes of rv.pseudoOuts and mixRing");
const size_t threads = std::max(rv.outPk.size(), rv.mixRing.size());
tools::thread_group threadpool(threads);
{
boost::asio::io_service ioservice;
boost::thread_group threadpool;
std::unique_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(ioservice));
size_t threads = tools::get_max_concurrency();
threads = std::min(threads, rv.outPk.size());
for (size_t i = 0; i < threads; ++i)
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice));
bool ioservice_active = true;
std::deque<bool> results(rv.outPk.size(), false);
epee::misc_utils::auto_scope_leave_caller ioservice_killer = epee::misc_utils::create_scope_leave_handler([&]() { KILL_IOSERVICE(); });
for (i = 0; i < rv.outPk.size(); i++) {
if (threads > 1) {
ioservice.dispatch(boost::bind(&verRangeWrapper, std::cref(rv.outPk[i].mask), std::cref(rv.p.rangeSigs[i]), std::ref(results[i])));
{
const std::unique_ptr<tools::thread_group, tools::thread_group::lazy_sync>
sync(std::addressof(threadpool));
for (i = 0; i < rv.outPk.size(); i++) {
threadpool.dispatch(
std::bind(verRangeWrapper, std::cref(rv.outPk[i].mask), std::cref(rv.p.rangeSigs[i]), std::ref(results[i]))
);
}
else if (!verRange(rv.outPk[i].mask, rv.p.rangeSigs[i])) {
LOG_ERROR("Range proof verified failed for input " << i);
return false;
}
}
KILL_IOSERVICE();
if (threads > 1) {
for (size_t i = 0; i < rv.outPk.size(); ++i) {
if (!results[i]) {
LOG_ERROR("Range proof verified failed for input " << i);
return false;
}
} // threadpool.sync();
for (size_t i = 0; i < rv.outPk.size(); ++i) {
if (!results[i]) {
LOG_ERROR("Range proof verified failed for input " << i);
return false;
}
}
}
@ -875,37 +844,20 @@ namespace rct {
key message = get_pre_mlsag_hash(rv);
{
boost::asio::io_service ioservice;
boost::thread_group threadpool;
std::unique_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(ioservice));
size_t threads = tools::get_max_concurrency();
threads = std::min(threads, rv.mixRing.size());
for (size_t i = 0; i < threads; ++i)
threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &ioservice));
bool ioservice_active = true;
std::deque<bool> results(rv.mixRing.size(), false);
epee::misc_utils::auto_scope_leave_caller ioservice_killer = epee::misc_utils::create_scope_leave_handler([&]() { KILL_IOSERVICE(); });
for (i = 0 ; i < rv.mixRing.size() ; i++) {
if (threads > 1) {
ioservice.dispatch(boost::bind(&verRctMGSimpleWrapper, std::cref(message), std::cref(rv.p.MGs[i]), std::cref(rv.mixRing[i]), std::cref(rv.pseudoOuts[i]), std::ref(results[i])));
{
const std::unique_ptr<tools::thread_group, tools::thread_group::lazy_sync>
sync(std::addressof(threadpool));
for (i = 0 ; i < rv.mixRing.size() ; i++) {
threadpool.dispatch(
std::bind(verRctMGSimpleWrapper, std::cref(message), std::cref(rv.p.MGs[i]), std::cref(rv.mixRing[i]), std::cref(rv.pseudoOuts[i]), std::ref(results[i]))
);
}
else {
bool tmpb = verRctMGSimple(message, rv.p.MGs[i], rv.mixRing[i], rv.pseudoOuts[i]);
DP(tmpb);
if (!tmpb) {
LOG_ERROR("verRctMGSimple failed for input " << i);
return false;
}
}
}
KILL_IOSERVICE();
if (threads > 1) {
for (size_t i = 0; i < results.size(); ++i) {
if (!results[i]) {
LOG_ERROR("verRctMGSimple failed for input " << i);
return false;
}
} // threadpool.sync();
for (size_t i = 0; i < results.size(); ++i) {
if (!results[i]) {
LOG_ERROR("verRctMGSimple failed for input " << i);
return false;
}
}
}