client periodically sends keep alive notification

This commit is contained in:
woodser 2022-01-15 15:43:10 -05:00
parent 7cdcdffb72
commit be86aafeff
4 changed files with 106 additions and 20 deletions

View File

@ -2,7 +2,7 @@
// import haveno types
import {HavenoDaemon} from "./HavenoDaemon";
import {HavenoUtils} from "./HavenoUtils";
import {HavenoUtils} from "./utils/HavenoUtils";
import * as grpcWeb from 'grpc-web';
import {MarketPriceInfo, NotificationMessage, OfferInfo, TradeInfo, XmrBalanceInfo} from './protobuf/grpc_pb'; // TODO (woodser): better names; haveno_grpc_pb, haveno_pb
import {PaymentAccount} from './protobuf/pb_pb';
@ -669,10 +669,14 @@ test("Can complete a trade", async () => {
expect(trade.getPhase()).toEqual("DEPOSIT_PUBLISHED");
console.log("Bob done taking offer in " + (Date.now() - startTime) + " ms");
// alice is notified offer is taken
assert.equal(1, aliceNotifications.length);
assert.equal("Offer Taken", aliceNotifications[0].getTitle());
assert.equal("Your offer " + offer.getId() + " has been accepted", aliceNotifications[0].getMessage());
// alice is notified that offer is taken
let tradeNotifications = getNotifications(aliceNotifications, NotificationMessage.NotificationType.TRADE_UPDATE);
expect(tradeNotifications.length).toBe(1);
expect(tradeNotifications[0].getTrade()!.getPhase()).toEqual("DEPOSIT_PUBLISHED");
expect(tradeNotifications[0].getTitle()).toEqual("Offer Taken");
expect(tradeNotifications[0].getMessage()).toEqual("Your offer " + offer.getId() + " has been accepted");
// alice is notified of balance change
// bob can get trade
let fetchedTrade: TradeInfo = await bob.getTrade(trade.getTradeId());
@ -686,9 +690,6 @@ test("Can complete a trade", async () => {
// bob is notified of balance change
// alice notified of balance changes and that offer is taken
await wait(TestConfig.maxTimePeerNoticeMs);
// alice can get trade
fetchedTrade = await alice.getTrade(trade.getTradeId());
expect(fetchedTrade.getPhase()).toEqual("DEPOSIT_PUBLISHED");
@ -915,7 +916,11 @@ async function waitForUnlockedBalance(amount: bigint, ...wallets: any[]) {
let unlockedBalance = await wallet.getUnlockedBalance();
if (unlockedBalance < amount) miningNeeded = true;
let depositNeeded: bigint = amount - unlockedBalance - await wallet.getLockedBalance();
if (depositNeeded > BigInt("0") && wallet._wallet !== fundingWallet) fundConfig.addDestination(await wallet.getDepositAddress(), depositNeeded * BigInt("10")); // deposit 10 times more than needed
if (depositNeeded > BigInt("0") && wallet._wallet !== fundingWallet) {
for (let i = 0; i < 5; i++) {
fundConfig.addDestination(await wallet.getDepositAddress(), depositNeeded * BigInt("2")); // make several deposits
}
}
}
if (fundConfig.getDestinations()) {
await waitForUnlockedBalance(TestConfig.fundingWallet.minimumFunding, fundingWallet); // TODO (woodser): wait for enough to cover tx amount + fee
@ -977,6 +982,16 @@ async function wait(durationMs: number) {
return new Promise(function(resolve) { setTimeout(resolve, durationMs); });
}
function getNotifications(notifications: NotificationMessage[], notificationType: NotificationMessage.NotificationType) {
let filteredNotifications: NotificationMessage[] = [];
for (let notification of notifications) {
if (notification.getType() === notificationType) {
filteredNotifications.push(notification);
}
}
return filteredNotifications;
}
function testTx(tx: XmrTx, ctx: TxContext) {
assert(tx.getHash());
expect(BigInt(tx.getFee())).toBeLessThan(TestConfig.maxFee);

View File

@ -1,4 +1,5 @@
import {HavenoUtils} from "./HavenoUtils";
import {HavenoUtils} from "./utils/HavenoUtils";
import {TaskLooper} from "./utils/TaskLooper";
import * as grpcWeb from 'grpc-web';
import {DisputeAgentsClient, GetVersionClient, NotificationsClient, PriceClient, WalletsClient, OffersClient, PaymentAccountsClient, TradesClient} from './protobuf/GrpcServiceClientPb';
import {CancelOfferRequest, ConfirmPaymentReceivedRequest, ConfirmPaymentStartedRequest, CreateCryptoCurrencyPaymentAccountReply, CreateCryptoCurrencyPaymentAccountRequest, CreateOfferReply, CreateOfferRequest, CreateXmrTxReply, CreateXmrTxRequest, GetBalancesReply, GetBalancesRequest, GetNewDepositSubaddressReply, GetNewDepositSubaddressRequest, GetOffersReply, GetOffersRequest, GetPaymentAccountsReply, GetPaymentAccountsRequest, GetTradeReply, GetTradeRequest, GetTradesReply, GetTradesRequest, GetVersionReply, GetVersionRequest, GetXmrTxsReply, GetXmrTxsRequest, MarketPriceInfo, MarketPriceReply, MarketPriceRequest, MarketPricesReply, MarketPricesRequest, NotificationMessage, OfferInfo, RegisterDisputeAgentRequest, RegisterNotificationListenerRequest, RelayXmrTxReply, RelayXmrTxRequest, SendNotificationRequest, TakeOfferReply, TakeOfferRequest, TradeInfo, XmrBalanceInfo, XmrDestination, XmrTx} from './protobuf/grpc_pb';
@ -10,13 +11,7 @@ const console = require('console');
*/
class HavenoDaemon {
// instance variables
_url: string;
_password: string;
_process: any;
_processLogging: boolean = false;
_notificationListeners: ((notification: NotificationMessage) => void)[] = [];
_walletRpcPort: number|undefined;
// grpc clients
_getVersionClient: GetVersionClient;
_disputeAgentsClient: DisputeAgentsClient;
_notificationsClient: NotificationsClient;
@ -26,6 +21,15 @@ class HavenoDaemon {
_offersClient: OffersClient;
_tradesClient: TradesClient;
// other instance variables
_url: string;
_password: string;
_process: any;
_processLogging: boolean = false;
_walletRpcPort: number|undefined;
_notificationListeners: ((notification: NotificationMessage) => void)[] = [];
_keepAlivePeriodMs: number = 60000;
/**
* Construct a client connected to a Haveno daemon.
*
@ -510,8 +514,8 @@ class HavenoDaemon {
async takeOffer(offerId: string, paymentAccountId: string): Promise<TradeInfo> {
let that = this;
let request = new TakeOfferRequest()
.setOfferId(offerId)
.setPaymentAccountId(paymentAccountId);
.setOfferId(offerId)
.setPaymentAccountId(paymentAccountId);
return new Promise(function(resolve, reject) {
that._tradesClient.takeOffer(request, {password: that._password}, function(err: grpcWeb.RpcError, response: TakeOfferReply) {
if (err) reject(err);
@ -592,13 +596,30 @@ class HavenoDaemon {
async _registerNotificationListener(): Promise<void> {
let that = this;
return new Promise(function(resolve) {
// send request to register client listener
that._notificationsClient.registerNotificationListener(new RegisterNotificationListenerRequest(), {password: that._password})
.on("data", (data) => {
if (data instanceof NotificationMessage) {
for (let listener of that._notificationListeners) listener(data);
}
});
setTimeout(function() { resolve(); }, 1000); // TODO: call returns before listener registered
// periodically send keep alive requests // TODO (woodser): better way to keep notification stream alive?
let firstRequest = true;
let taskLooper = new TaskLooper(async function() {
if (firstRequest) {
firstRequest = false;
return;
}
await that._sendNotification(new NotificationMessage()
.setType(NotificationMessage.NotificationType.KEEP_ALIVE)
.setTimestamp(Date.now()));
});
taskLooper.start(that._keepAlivePeriodMs);
// TODO: call returns before listener registered
setTimeout(function() { resolve(); }, 1000);
});
}

50
src/utils/TaskLooper.ts Normal file
View File

@ -0,0 +1,50 @@
/**
* Run a task in a fixed period loop.
*/
class TaskLooper {
_fn: () => Promise<void>;
_isStarted: boolean;
_isLooping: boolean;
/**
* Build the looper with a function to invoke on a fixed period loop.
*
* @param {function} fn - the async function to invoke
*/
constructor(fn: () => Promise<void>) {
this._fn = fn;
this._isStarted = false;
this._isLooping = false;
}
/**
* Start the task loop.
*
* @param {int} periodInMs the loop period in milliseconds
*/
start(periodInMs: number) {
if (this._isStarted) return;
this._isStarted = true;
this._runLoop(periodInMs);
}
/**
* Stop the task loop.
*/
stop() {
this._isStarted = false;
}
async _runLoop(periodInMs: number) {
this._isLooping = true;
while (this._isStarted) {
let startTime = Date.now();
await this._fn();
if (this._isStarted) await new Promise(function(resolve) { setTimeout(resolve, periodInMs - (Date.now() - startTime)); });
}
this._isLooping = false;
}
}
export {TaskLooper};