Add param for using mailbox if msg send failes

This commit is contained in:
Manfred Karrer 2015-05-20 00:38:52 +02:00
parent f971a787ba
commit 5cc6c5c552
12 changed files with 31 additions and 5 deletions

View file

@ -23,7 +23,7 @@ import io.bitsquare.p2p.listener.SendMessageListener;
public interface MessageService extends P2PService { public interface MessageService extends P2PService {
void sendEncryptedMessage(Peer peer, PubKeyRing pubKeyRing, Message message, SendMessageListener listener); void sendEncryptedMessage(Peer peer, PubKeyRing pubKeyRing, Message message, boolean useMailboxIfUnreachable, SendMessageListener listener);
void addMessageHandler(MessageHandler listener); void addMessageHandler(MessageHandler listener);

View file

@ -200,7 +200,10 @@ public class BootstrappedPeerBuilder {
peer.peerBean().peerMap().addPeerMapChangeListener(new PeerMapChangeListener() { peer.peerBean().peerMap().addPeerMapChangeListener(new PeerMapChangeListener() {
@Override @Override
public void peerInserted(PeerAddress peerAddress, boolean verified) { public void peerInserted(PeerAddress peerAddress, boolean verified) {
if (verified)
log.debug("Peer inserted: peerAddress=" + peerAddress + ", verified=" + verified); log.debug("Peer inserted: peerAddress=" + peerAddress + ", verified=" + verified);
else
log.trace("Peer inserted: peerAddress=" + peerAddress + ", verified=" + verified);
} }
@Override @Override

View file

@ -72,7 +72,7 @@ public class TomP2PMessageService extends TomP2PService implements MessageServic
} }
@Override @Override
public void sendEncryptedMessage(Peer peer, PubKeyRing pubKeyRing, Message message, SendMessageListener listener) { public void sendEncryptedMessage(Peer peer, PubKeyRing pubKeyRing, Message message, boolean useMailboxIfUnreachable, SendMessageListener listener) {
assert pubKeyRing != null; assert pubKeyRing != null;
log.debug("sendMessage called"); log.debug("sendMessage called");
@ -97,16 +97,30 @@ public class TomP2PMessageService extends TomP2PService implements MessageServic
else { else {
log.info("sendMessage failed. We will try to send the message to the mailbox. Fault reason: " + log.info("sendMessage failed. We will try to send the message to the mailbox. Fault reason: " +
futureDirect.failedReason()); futureDirect.failedReason());
if (useMailboxIfUnreachable) {
sendMailboxMessage(pubKeyRing, (SealedAndSignedMessage) encryptedMessage, listener); sendMailboxMessage(pubKeyRing, (SealedAndSignedMessage) encryptedMessage, listener);
} }
else {
openRequestsDown();
log.error("Send message was not successful");
executor.execute(listener::handleFault);
}
}
} }
@Override @Override
public void exceptionCaught(Throwable t) throws Exception { public void exceptionCaught(Throwable t) throws Exception {
log.info("sendMessage failed with exception. We will try to send the message to the mailbox. Exception: " log.info("sendMessage failed with exception. We will try to send the message to the mailbox. Exception: "
+ t.getMessage()); + t.getMessage());
if (useMailboxIfUnreachable) {
sendMailboxMessage(pubKeyRing, (SealedAndSignedMessage) encryptedMessage, listener); sendMailboxMessage(pubKeyRing, (SealedAndSignedMessage) encryptedMessage, listener);
} }
else {
openRequestsDown();
log.error("Send message was not successful");
executor.execute(listener::handleFault);
}
}
} }
); );
} catch (Throwable t) { } catch (Throwable t) {

View file

@ -268,6 +268,7 @@ public class OpenOfferManager {
messageService.sendEncryptedMessage(sender, messageService.sendEncryptedMessage(sender,
message.getPubKeyRing(), message.getPubKeyRing(),
offerAvailabilityResponse, offerAvailabilityResponse,
false,
new SendMessageListener() { new SendMessageListener() {
@Override @Override
public void handleResult() { public void handleResult() {

View file

@ -42,6 +42,7 @@ public class SendOfferAvailabilityRequest extends Task<OfferAvailabilityModel> {
model.messageService.sendEncryptedMessage(model.getPeer(), model.messageService.sendEncryptedMessage(model.getPeer(),
model.offer.getPubKeyRing(), model.offer.getPubKeyRing(),
message, message,
false,
new SendMessageListener() { new SendMessageListener() {
@Override @Override
public void handleResult() { public void handleResult() {

View file

@ -44,6 +44,7 @@ public class SendDepositTxPublishedMessage extends TradeTask {
trade.getTradingPeer(), trade.getTradingPeer(),
processModel.tradingPeer.getPubKeyRing(), processModel.tradingPeer.getPubKeyRing(),
tradeMessage, tradeMessage,
true,
new SendMessageListener() { new SendMessageListener() {
@Override @Override
public void handleResult() { public void handleResult() {

View file

@ -46,6 +46,7 @@ public class SendFiatTransferStartedMessage extends TradeTask {
trade.getTradingPeer(), trade.getTradingPeer(),
processModel.tradingPeer.getPubKeyRing(), processModel.tradingPeer.getPubKeyRing(),
tradeMessage, tradeMessage,
true,
new SendMessageListener() { new SendMessageListener() {
@Override @Override
public void handleResult() { public void handleResult() {

View file

@ -55,6 +55,7 @@ public class SendPayDepositRequest extends TradeTask {
trade.getTradingPeer(), trade.getTradingPeer(),
processModel.tradingPeer.getPubKeyRing(), processModel.tradingPeer.getPubKeyRing(),
tradeMessage, tradeMessage,
false,
new SendMessageListener() { new SendMessageListener() {
@Override @Override
public void handleResult() { public void handleResult() {

View file

@ -43,6 +43,7 @@ public class SendPayoutTxFinalizedMessage extends TradeTask {
trade.getTradingPeer(), trade.getTradingPeer(),
processModel.tradingPeer.getPubKeyRing(), processModel.tradingPeer.getPubKeyRing(),
tradeMessage, tradeMessage,
true,
new SendMessageListener() { new SendMessageListener() {
@Override @Override
public void handleResult() { public void handleResult() {

View file

@ -53,6 +53,7 @@ public class SendDepositTxInputsRequest extends TradeTask {
trade.getTradingPeer(), trade.getTradingPeer(),
processModel.tradingPeer.getPubKeyRing(), processModel.tradingPeer.getPubKeyRing(),
message, message,
false,
new SendMessageListener() { new SendMessageListener() {
@Override @Override
public void handleResult() { public void handleResult() {

View file

@ -49,6 +49,7 @@ public class SendFinalizePayoutTxRequest extends TradeTask {
trade.getTradingPeer(), trade.getTradingPeer(),
processModel.tradingPeer.getPubKeyRing(), processModel.tradingPeer.getPubKeyRing(),
message, message,
true,
new SendMessageListener() { new SendMessageListener() {
@Override @Override
public void handleResult() { public void handleResult() {

View file

@ -53,6 +53,7 @@ public class SendPublishDepositTxRequest extends TradeTask {
trade.getTradingPeer(), trade.getTradingPeer(),
processModel.tradingPeer.getPubKeyRing(), processModel.tradingPeer.getPubKeyRing(),
tradeMessage, tradeMessage,
false,
new SendMessageListener() { new SendMessageListener() {
@Override @Override
public void handleResult() { public void handleResult() {