Add callback for broadcaster when sending mailbox msg

This commit is contained in:
Manfred Karrer 2016-01-27 00:04:03 +01:00
parent 9bb4683379
commit 602c503cea
22 changed files with 231 additions and 112 deletions

View File

@ -87,8 +87,7 @@ public class TaskRunner<T extends Model> {
}
void handleErrorMessage(String errorMessage) {
log.error("Task failed: " + currentTask.getSimpleName());
log.error("errorMessage: " + errorMessage);
log.error("Task failed: " + currentTask.getSimpleName() + " / errorMessage: " + errorMessage);
failed = true;
errorMessageHandler.handleErrorMessage(errorMessage);
}

View File

@ -196,7 +196,7 @@ public class DisputeManager {
}
@Override
public void onFault() {
public void onFault(String errorMessage) {
log.error("sendEncryptedMessage failed");
}
}
@ -263,7 +263,7 @@ public class DisputeManager {
}
@Override
public void onFault() {
public void onFault(String errorMessage) {
log.error("sendEncryptedMessage failed");
}
}
@ -313,7 +313,7 @@ public class DisputeManager {
}
@Override
public void onFault() {
public void onFault(String errorMessage) {
log.error("sendEncryptedMessage failed");
}
}
@ -354,7 +354,7 @@ public class DisputeManager {
}
@Override
public void onFault() {
public void onFault(String errorMessage) {
log.error("sendEncryptedMessage failed");
}
}
@ -381,7 +381,7 @@ public class DisputeManager {
}
@Override
public void onFault() {
public void onFault(String errorMessage) {
log.error("sendEncryptedMessage failed");
}
}

View File

@ -61,7 +61,7 @@ public class PaymentMethod implements Serializable, Comparable {
public static final List<PaymentMethod> ALL_VALUES = new ArrayList<>(Arrays.asList(
OK_PAY = new PaymentMethod(OK_PAY_ID, 0, DAY), // tx instant so min. wait time
PERFECT_MONEY = new PaymentMethod(PERFECT_MONEY_ID, 0, DAY),
SEPA = new PaymentMethod(SEPA_ID, 0, 8 * DAY), // sepa takes 1-3 business days. We use 8 days to include safety for holidays
SEPA = new PaymentMethod(SEPA_ID, 0, 7 * DAY), // sepa takes 1-3 business days. We use 7 days to include safety for holidays
SWISH = new PaymentMethod(SWISH_ID, 0, DAY),
ALI_PAY = new PaymentMethod(ALI_PAY_ID, 0, DAY),
/* FED_WIRE = new PaymentMethod(FED_WIRE_ID, 0, DAY),*/

View File

@ -18,6 +18,8 @@
package io.bitsquare.trade;
import io.bitsquare.app.Version;
import io.bitsquare.common.handlers.ErrorMessageHandler;
import io.bitsquare.common.handlers.ResultHandler;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.storage.Storage;
import io.bitsquare.trade.offer.Offer;
@ -50,9 +52,9 @@ public abstract class BuyerTrade extends Trade implements Serializable {
state = State.PREPARATION;
}
public void onFiatPaymentStarted() {
checkArgument(tradeProtocol instanceof BuyerProtocol, "tradeProtocol NOT instanceof BuyerProtocol");
((BuyerProtocol) tradeProtocol).onFiatPaymentStarted();
public void onFiatPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
checkArgument(tradeProtocol instanceof BuyerProtocol, "Check failed: tradeProtocol instanceof BuyerProtocol");
((BuyerProtocol) tradeProtocol).onFiatPaymentStarted(resultHandler, errorMessageHandler);
}

View File

@ -17,6 +17,8 @@
package io.bitsquare.trade.protocol.trade;
import io.bitsquare.common.handlers.ErrorMessageHandler;
import io.bitsquare.common.handlers.ResultHandler;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.messaging.MailboxMessage;
@ -145,12 +147,18 @@ public class BuyerAsOffererProtocol extends TradeProtocol implements BuyerProtoc
// User clicked the "bank transfer started" button
@Override
public void onFiatPaymentStarted() {
public void onFiatPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
buyerAsOffererTrade.setState(Trade.State.FIAT_PAYMENT_STARTED);
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsOffererTrade,
() -> handleTaskRunnerSuccess("onFiatPaymentStarted"),
this::handleTaskRunnerFault);
() -> {
resultHandler.handleResult();
handleTaskRunnerSuccess("onFiatPaymentStarted");
},
(errorMessage) -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(errorMessage);
});
taskRunner.addTasks(
VerifyTakeOfferFeePayment.class,
SendFiatTransferStartedMessage.class

View File

@ -18,6 +18,8 @@
package io.bitsquare.trade.protocol.trade;
import io.bitsquare.common.handlers.ErrorMessageHandler;
import io.bitsquare.common.handlers.ResultHandler;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.messaging.MailboxMessage;
@ -131,12 +133,18 @@ public class BuyerAsTakerProtocol extends TradeProtocol implements BuyerProtocol
// User clicked the "bank transfer started" button
@Override
public void onFiatPaymentStarted() {
public void onFiatPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
buyerAsTakerTrade.setState(Trade.State.FIAT_PAYMENT_STARTED);
TradeTaskRunner taskRunner = new TradeTaskRunner(buyerAsTakerTrade,
() -> handleTaskRunnerSuccess("onFiatPaymentStarted"),
this::handleTaskRunnerFault);
() -> {
resultHandler.handleResult();
handleTaskRunnerSuccess("onFiatPaymentStarted");
},
(errorMessage) -> {
errorMessageHandler.handleErrorMessage(errorMessage);
handleTaskRunnerFault(errorMessage);
});
taskRunner.addTasks(
VerifyOfferFeePayment.class,
SendFiatTransferStartedMessage.class

View File

@ -17,6 +17,9 @@
package io.bitsquare.trade.protocol.trade;
import io.bitsquare.common.handlers.ErrorMessageHandler;
import io.bitsquare.common.handlers.ResultHandler;
public interface BuyerProtocol {
void onFiatPaymentStarted();
void onFiatPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler);
}

View File

@ -36,7 +36,6 @@ public class SendFiatTransferStartedMessage extends TradeTask {
protected void run() {
try {
runInterceptHook();
processModel.getP2PService().sendEncryptedMailboxMessage(
trade.getTradingPeerNodeAddress(),
processModel.tradingPeer.getPubKeyRing(),
@ -48,22 +47,22 @@ public class SendFiatTransferStartedMessage extends TradeTask {
new SendMailboxMessageListener() {
@Override
public void onArrived() {
log.trace("Message arrived at peer.");
log.info("Message arrived at peer.");
trade.setState(Trade.State.FIAT_PAYMENT_STARTED_MSG_SENT);
complete();
}
@Override
public void onStoredInMailbox() {
log.trace("Message stored in mailbox.");
log.info("Message stored in mailbox.");
trade.setState(Trade.State.FIAT_PAYMENT_STARTED_MSG_SENT);
complete();
}
@Override
public void onFault() {
public void onFault(String errorMessage) {
appendToErrorMessage("FiatTransferStartedMessage sending failed");
failed();
failed(errorMessage);
}
}
);

View File

@ -61,7 +61,7 @@ public class SendPayoutTxFinalizedMessage extends TradeTask {
}
@Override
public void onFault() {
public void onFault(String errorMessage) {
appendToErrorMessage("PayoutTxFinalizedMessage sending failed");
failed();
}

View File

@ -65,7 +65,7 @@ public class SendFinalizePayoutTxRequest extends TradeTask {
}
@Override
public void onFault() {
public void onFault(String errorMessage) {
appendToErrorMessage("FinalizePayoutTxRequest sending failed");
failed();
}

View File

@ -61,7 +61,7 @@ public class SendDepositTxPublishedMessage extends TradeTask {
}
@Override
public void onFault() {
public void onFault(String errorMessage) {
appendToErrorMessage("DepositTxPublishedMessage sending failed");
failed();
}

View File

@ -72,7 +72,7 @@ public class SendPayDepositRequest extends TradeTask {
}
@Override
public void onFault() {
public void onFault(String errorMessage) {
appendToErrorMessage("PayDepositRequest sending failed");
failed();
}

View File

@ -39,8 +39,11 @@ import io.bitsquare.gui.popups.DisplayAlertMessagePopup;
import io.bitsquare.gui.popups.Popup;
import io.bitsquare.gui.popups.WalletPasswordPopup;
import io.bitsquare.gui.util.BSFormatter;
import io.bitsquare.locale.CountryUtil;
import io.bitsquare.locale.CurrencyUtil;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.P2PServiceListener;
import io.bitsquare.payment.OKPayAccount;
import io.bitsquare.trade.Trade;
import io.bitsquare.trade.TradeManager;
import io.bitsquare.trade.offer.OpenOffer;
@ -411,6 +414,15 @@ public class MainViewModel implements ViewModel {
// now show app
showAppScreen.set(true);
if (BitsquareApp.DEV_MODE && user.getPaymentAccounts().isEmpty()) {
OKPayAccount okPayAccount = new OKPayAccount();
okPayAccount.setAccountNr("dummy");
okPayAccount.setAccountName("OKPay dummy");
okPayAccount.setSelectedTradeCurrency(CurrencyUtil.getDefaultTradeCurrency());
okPayAccount.setCountry(CountryUtil.getDefaultCountry());
user.addPaymentAccount(okPayAccount);
}
}
private void checkPeriodicallyForBtcSyncState() {

View File

@ -26,6 +26,8 @@ import io.bitsquare.btc.TradeWalletService;
import io.bitsquare.btc.WalletService;
import io.bitsquare.common.UserThread;
import io.bitsquare.common.crypto.KeyRing;
import io.bitsquare.common.handlers.ErrorMessageHandler;
import io.bitsquare.common.handlers.ResultHandler;
import io.bitsquare.gui.Navigation;
import io.bitsquare.gui.common.model.ActivatableDataModel;
import io.bitsquare.gui.main.MainView;
@ -54,6 +56,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
public class PendingTradesDataModel extends ActivatableDataModel {
@ -152,10 +155,11 @@ public class PendingTradesDataModel extends ActivatableDataModel {
}
}
void onFiatPaymentStarted() {
void onFiatPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
checkNotNull(trade, "trade must not be null");
if (trade instanceof BuyerTrade && trade.getDisputeState() == Trade.DisputeState.NONE)
((BuyerTrade) trade).onFiatPaymentStarted();
checkArgument(trade instanceof BuyerTrade, "Check failed: trade instanceof BuyerTrade");
checkArgument(trade.getDisputeState() == Trade.DisputeState.NONE, "Check failed: trade.getDisputeState() == Trade.DisputeState.NONE");
((BuyerTrade) trade).onFiatPaymentStarted(resultHandler, errorMessageHandler);
}
void onFiatPaymentReceived() {

View File

@ -131,7 +131,7 @@ public class PendingTradesView extends ActivatableViewAndModel<VBox, PendingTrad
appFocusProperty = scene.getWindow().focusedProperty();
appFocusProperty.addListener(appFocusChangeListener);
model.currentTrade().addListener(currentTradeChangeListener);
setNewSubView(model.currentTrade().get());
//setNewSubView(model.currentTrade().get());
table.setItems(model.getList());
table.getSelectionModel().selectedItemProperty().addListener(selectedItemChangeListener);
PendingTradesListItem selectedItem = model.getSelectedItem();

View File

@ -19,6 +19,8 @@ package io.bitsquare.gui.main.portfolio.pendingtrades;
import com.google.inject.Inject;
import io.bitsquare.btc.FeePolicy;
import io.bitsquare.common.handlers.ErrorMessageHandler;
import io.bitsquare.common.handlers.ResultHandler;
import io.bitsquare.gui.common.model.ActivatableWithDataModel;
import io.bitsquare.gui.common.model.ViewModel;
import io.bitsquare.gui.util.BSFormatter;
@ -179,8 +181,8 @@ public class PendingTradesViewModel extends ActivatableWithDataModel<PendingTrad
return dataModel.getTradeProperty();
}
public void fiatPaymentStarted() {
dataModel.onFiatPaymentStarted();
public void fiatPaymentStarted(ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
dataModel.onFiatPaymentStarted(resultHandler, errorMessageHandler);
}
public void fiatPaymentReceived() {
@ -362,13 +364,10 @@ public class PendingTradesViewModel extends ActivatableWithDataModel<PendingTrad
break;
case DEPOSIT_CONFIRMED:
case FIAT_PAYMENT_STARTED:
sellerState.set(WAIT_FOR_FIAT_PAYMENT_STARTED);
buyerState.set(PendingTradesViewModel.BuyerState.REQUEST_START_FIAT_PAYMENT);
break;
case FIAT_PAYMENT_STARTED:
break;
case FIAT_PAYMENT_STARTED_MSG_SENT:
buyerState.set(PendingTradesViewModel.BuyerState.WAIT_FOR_FIAT_PAYMENT_RECEIPT);
break;

View File

@ -86,7 +86,7 @@ public class StartPaymentView extends TradeStepDetailsView {
model.getTxId().removeListener(txIdChangeListener);
txIdTextField.cleanup();
statusProgressIndicator.setProgress(0);
removeStatusProgressIndicator();
}
@ -161,12 +161,40 @@ public class StartPaymentView extends TradeStepDetailsView {
private void confirmPaymentStarted() {
paymentStartedButton.setDisable(true);
paymentStartedButton.setMinWidth(130);
statusProgressIndicator.setVisible(true);
statusProgressIndicator.setManaged(true);
statusProgressIndicator.setProgress(-1);
statusLabel.setText("Sending message to trading partner...");
model.fiatPaymentStarted();
statusLabel.setWrapText(true);
statusLabel.setPrefWidth(220);
statusLabel.setText("Sending message to your trading partner.\n" +
"Please wait until you get the confirmation that the message has arrived.");
model.fiatPaymentStarted(() -> {
// We would not really need an update as the success triggers a screen change
removeStatusProgressIndicator();
statusLabel.setText("");
// In case the first send failed we got the support button displayed.
// If it succeeds at a second try we remove the support button again.
if (openSupportTicketButton != null) {
gridPane.getChildren().remove(openSupportTicketButton);
openSupportTicketButton = null;
}
}, errorMessage -> {
removeStatusProgressIndicator();
statusLabel.setText("Sending message to your trading partner failed.\n" +
"Please try again and if it continue to fail report a bug.");
paymentStartedButton.setDisable(false);
});
}
private void removeStatusProgressIndicator() {
statusProgressIndicator.setVisible(false);
statusProgressIndicator.setProgress(0);
statusProgressIndicator.setManaged(false);
}

View File

@ -22,6 +22,7 @@ import io.bitsquare.gui.components.TitledGroupBg;
import io.bitsquare.gui.main.help.Help;
import io.bitsquare.gui.main.help.HelpId;
import io.bitsquare.gui.main.portfolio.pendingtrades.PendingTradesViewModel;
import io.bitsquare.gui.popups.Popup;
import io.bitsquare.gui.util.Layout;
import io.bitsquare.trade.Trade;
import javafx.geometry.HPos;
@ -52,7 +53,7 @@ public abstract class TradeStepDetailsView extends AnchorPane {
protected Label infoLabel;
protected TitledGroupBg infoTitledGroupBg;
protected Button openDisputeButton;
private Button openSupportTicketButton;
protected Button openSupportTicketButton;
private Trade trade;
private Subscription errorMessageSubscription;
@ -181,7 +182,12 @@ public abstract class TradeStepDetailsView extends AnchorPane {
}
private void addErrorLabel() {
if (infoLabel == null) {
new Popup().warning(trade.errorMessageProperty().getValue()
+ "\n\nPlease report the problem to your arbitrator. He will forward it to the developers to investigate the problem.\n" +
"After the problem has be analysed you will get back all the funds you paid in.\n" +
"There will be no arbitration fee charged if it was a technical error.").show();
/*if (infoLabel == null) {
infoTitledGroupBg = addTitledGroupBg(gridPane, ++gridRow, 1, "Error", Layout.GROUP_DISTANCE);
infoLabel = addMultilineLabel(gridPane, gridRow, Layout.FIRST_ROW_AND_GROUP_DISTANCE);
}
@ -190,11 +196,10 @@ public abstract class TradeStepDetailsView extends AnchorPane {
+ "\n\nPlease report the problem to your arbitrator. He will forward it to the developers to investigate the problem.\n" +
"After the problem has be analysed you will get back all the funds you paid in.\n" +
"There will be no arbitration fee charged if it was a technical error.");
infoLabel.setStyle(" -fx-text-fill: -bs-error-red;");
infoLabel.setStyle(" -fx-text-fill: -bs-error-red;");*/
if (openSupportTicketButton == null) {
openSupportTicketButton = addButtonAfterGroup(gridPane, ++gridRow, "Request support");
GridPane.setColumnIndex(openSupportTicketButton, 0);
openSupportTicketButton = addButton(gridPane, ++gridRow, "Request support");
GridPane.setHalignment(openSupportTicketButton, HPos.LEFT);
openSupportTicketButton.setOnAction(e -> model.dataModel.onOpenSupportTicket());
}

View File

@ -27,6 +27,7 @@ import io.bitsquare.p2p.storage.data.ExpirableMailboxPayload;
import io.bitsquare.p2p.storage.data.ExpirablePayload;
import io.bitsquare.p2p.storage.data.ProtectedData;
import io.bitsquare.p2p.storage.data.ProtectedMailboxData;
import io.bitsquare.p2p.storage.messages.AddDataMessage;
import javafx.beans.property.*;
import javafx.beans.value.ChangeListener;
import org.fxmisc.easybind.EasyBind;
@ -45,7 +46,8 @@ import java.util.concurrent.CopyOnWriteArraySet;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
public class P2PService implements SetupListener, MessageListener, ConnectionListener, RequestDataManager.Listener, HashMapChangedListener {
public class P2PService implements SetupListener, MessageListener, ConnectionListener, RequestDataManager.Listener,
HashMapChangedListener {
private static final Logger log = LoggerFactory.getLogger(P2PService.class);
private final SeedNodesRepository seedNodesRepository;
@ -56,6 +58,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
// set in init
private NetworkNode networkNode;
private Broadcaster broadcaster;
private P2PDataStorage p2PDataStorage;
private PeerManager peerManager;
private RequestDataManager requestDataManager;
@ -118,7 +121,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
networkNode.addConnectionListener(this);
networkNode.addMessageListener(this);
Broadcaster broadcaster = new Broadcaster(networkNode);
broadcaster = new Broadcaster(networkNode);
p2PDataStorage = new P2PDataStorage(broadcaster, networkNode, storageDir);
p2PDataStorage.addHashMapChangedListener(this);
@ -455,77 +458,110 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
}
}
public void sendEncryptedMailboxMessage(NodeAddress peerNodeAddress, PubKeyRing peersPubKeyRing,
MailboxMessage message, SendMailboxMessageListener sendMailboxMessageListener) {
public void sendEncryptedMailboxMessage(NodeAddress peersNodeAddress, PubKeyRing peersPubKeyRing,
MailboxMessage message,
SendMailboxMessageListener sendMailboxMessageListener) {
Log.traceCall("message " + message);
checkNotNull(peerNodeAddress, "PeerAddress must not be null (sendEncryptedMailboxMessage)");
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
checkArgument(!optionalKeyRing.get().getPubKeyRing().equals(peersPubKeyRing), "We got own keyring instead of that from peer");
checkNotNull(peersNodeAddress,
"PeerAddress must not be null (sendEncryptedMailboxMessage)");
checkNotNull(networkNode.getNodeAddress(),
"My node address must not be null at sendEncryptedMailboxMessage");
checkArgument(optionalKeyRing.isPresent(),
"keyRing not set. Seems that is called on a seed node which must not happen.");
checkArgument(!optionalKeyRing.get().getPubKeyRing().equals(peersPubKeyRing),
"We got own keyring instead of that from peer");
checkArgument(optionalEncryptionService.isPresent(),
"EncryptionService not set. Seems that is called on a seed node which must not happen.");
if (isNetworkReady()) {
trySendEncryptedMailboxMessage(peerNodeAddress, peersPubKeyRing, message, sendMailboxMessageListener);
if (!networkNode.getAllConnections().isEmpty()) {
try {
log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Encrypt message:\nmessage={}"
+ "\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", message);
DirectMessage directMessage = new DirectMessage(
networkNode.getNodeAddress(),
optionalEncryptionService.get().encryptAndSign(peersPubKeyRing, message),
peersNodeAddress.getAddressPrefixHash());
SettableFuture<Connection> future = networkNode.sendMessage(peersNodeAddress, directMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.trace("SendEncryptedMailboxMessage onSuccess");
sendMailboxMessageListener.onArrived();
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.trace("SendEncryptedMailboxMessage onFailure");
log.debug(throwable.toString());
log.info("We cannot send message to peer. Peer might be offline. We will store message in mailbox.");
log.trace("create MailboxEntry with peerAddress " + peersNodeAddress);
PublicKey receiverStoragePublicKey = peersPubKeyRing.getSignaturePubKey();
addMailboxData(new ExpirableMailboxPayload(directMessage,
optionalKeyRing.get().getSignatureKeyPair().getPublic(),
receiverStoragePublicKey),
receiverStoragePublicKey,
sendMailboxMessageListener);
}
});
} catch (CryptoException e) {
log.error("sendEncryptedMessage failed");
e.printStackTrace();
sendMailboxMessageListener.onFault("Data already exist in our local database");
}
} else {
sendMailboxMessageListener.onFault("There are no P2P network nodes connected. " +
"Please check your internet connection.");
}
} else {
throw new NetworkNotReadyException();
}
}
// send message and if it fails (peer offline) we store the data to the network
private void trySendEncryptedMailboxMessage(NodeAddress peersNodeAddress, PubKeyRing peersPubKeyRing,
MailboxMessage message, SendMailboxMessageListener sendMailboxMessageListener) {
Log.traceCall();
checkNotNull(networkNode.getNodeAddress(), "My node address must not be null at trySendEncryptedMailboxMessage");
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
checkArgument(optionalEncryptionService.isPresent(), "EncryptionService not set. Seems that is called on a seed node which must not happen.");
checkNotNull(networkNode.getNodeAddress(), "networkNode.getNodeAddress() must not be null.");
try {
log.info("\n\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n" +
"Encrypt message:\nmessage={}"
+ "\n>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>\n", message);
DirectMessage directMessage = new DirectMessage(
networkNode.getNodeAddress(),
optionalEncryptionService.get().encryptAndSign(peersPubKeyRing, message),
peersNodeAddress.getAddressPrefixHash());
SettableFuture<Connection> future = networkNode.sendMessage(peersNodeAddress, directMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.trace("SendEncryptedMailboxMessage onSuccess");
sendMailboxMessageListener.onArrived();
}
@Override
public void onFailure(@NotNull Throwable throwable) {
log.trace("SendEncryptedMailboxMessage onFailure");
log.debug(throwable.toString());
log.info("We cannot send message to peer. Peer might be offline. We will store message in mailbox.");
log.trace("create MailboxEntry with peerAddress " + peersNodeAddress);
PublicKey receiverStoragePublicKey = peersPubKeyRing.getSignaturePubKey();
addMailboxData(new ExpirableMailboxPayload(directMessage,
optionalKeyRing.get().getSignatureKeyPair().getPublic(),
receiverStoragePublicKey),
receiverStoragePublicKey);
sendMailboxMessageListener.onStoredInMailbox();
}
});
} catch (CryptoException e) {
log.error("sendEncryptedMessage failed");
e.printStackTrace();
sendMailboxMessageListener.onFault();
}
}
private void addMailboxData(ExpirableMailboxPayload expirableMailboxPayload, PublicKey receiversPublicKey) {
private void addMailboxData(ExpirableMailboxPayload expirableMailboxPayload,
PublicKey receiversPublicKey,
SendMailboxMessageListener sendMailboxMessageListener) {
Log.traceCall();
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
checkArgument(optionalKeyRing.isPresent(),
"keyRing not set. Seems that is called on a seed node which must not happen.");
if (isNetworkReady()) {
try {
ProtectedMailboxData protectedMailboxData = p2PDataStorage.getMailboxDataWithSignedSeqNr(
expirableMailboxPayload,
optionalKeyRing.get().getSignatureKeyPair(),
receiversPublicKey);
p2PDataStorage.add(protectedMailboxData, networkNode.getNodeAddress());
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
if (!networkNode.getAllConnections().isEmpty()) {
try {
ProtectedMailboxData protectedMailboxData = p2PDataStorage.getMailboxDataWithSignedSeqNr(
expirableMailboxPayload,
optionalKeyRing.get().getSignatureKeyPair(),
receiversPublicKey);
Timer sendMailboxMessageTimeoutTimer = UserThread.runAfter(() -> {
boolean result = p2PDataStorage.remove(protectedMailboxData, networkNode.getNodeAddress());
log.debug("remove result=" + result);
sendMailboxMessageListener.onFault("A timeout occurred when trying to broadcast mailbox data.");
}, 30);
broadcaster.addOneTimeListener(message -> {
if (message instanceof AddDataMessage &&
((AddDataMessage) message).data.equals(protectedMailboxData)) {
sendMailboxMessageListener.onStoredInMailbox();
sendMailboxMessageTimeoutTimer.cancel();
}
});
boolean result = p2PDataStorage.add(protectedMailboxData, networkNode.getNodeAddress());
if (!result) {
sendMailboxMessageTimeoutTimer.cancel();
sendMailboxMessageListener.onFault("Data already exists in our local database");
boolean result2 = p2PDataStorage.remove(protectedMailboxData, networkNode.getNodeAddress());
log.debug("remove result=" + result2);
}
} catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
}
} else {
sendMailboxMessageListener.onFault("There are no P2P network nodes connected. " +
"Please check your internet connection.");
}
} else {
throw new NetworkNotReadyException();
@ -651,8 +687,6 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
///////////////////////////////////////////////////////////////////////////////////////////
public boolean isNetworkReady() {
log.debug("###### isNetworkReady networkReadyBinding " + networkReadyBinding.get());
log.debug("###### isNetworkReady hiddenServicePublished.get() && preliminaryDataReceived.get() " + (hiddenServicePublished.get() && preliminaryDataReceived.get()));
return hiddenServicePublished.get() && preliminaryDataReceived.get();
}

View File

@ -5,5 +5,5 @@ public interface SendMailboxMessageListener {
void onStoredInMailbox();
void onFault();
void onFault(String errorMessage);
}

View File

@ -14,10 +14,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
public class Broadcaster {
private static final Logger log = LoggerFactory.getLogger(Broadcaster.class);
public interface Listener {
void onBroadcasted(DataBroadcastMessage message);
}
private final NetworkNode networkNode;
private final Set<Listener> listeners = new CopyOnWriteArraySet<>();
public Broadcaster(NetworkNode networkNode) {
this.networkNode = networkNode;
@ -38,6 +46,10 @@ public class Broadcaster {
@Override
public void onSuccess(Connection connection) {
log.trace("Broadcast from " + networkNode.getNodeAddress() + " to " + connection + " succeeded.");
listeners.stream().forEach(listener -> {
listener.onBroadcasted(message);
listeners.remove(listener);
});
}
@Override
@ -51,4 +63,10 @@ public class Broadcaster {
"message = {}", message);
}
}
// That listener gets immediately removed after the handler is called
public void addOneTimeListener(Listener listener) {
listeners.add(listener);
}
}

View File

@ -317,7 +317,7 @@ public class P2PServiceTest {
}
@Override
public void onFault() {
public void onFault(String errorMessage) {
log.error("onFault");
}
}
@ -353,7 +353,7 @@ public class P2PServiceTest {
}
@Override
public void onFault() {
public void onFault(String errorMessage) {
log.error("onFault");
}
}