Fix mailbox behaviour, renamings

This commit is contained in:
Manfred Karrer 2016-01-27 01:56:56 +01:00
parent 7b87c39ffd
commit a91822803f
24 changed files with 122 additions and 111 deletions

View file

@ -26,7 +26,7 @@ import io.bitsquare.common.crypto.KeyRing;
import io.bitsquare.common.handlers.ErrorMessageHandler;
import io.bitsquare.common.handlers.ResultHandler;
import io.bitsquare.common.util.Utilities;
import io.bitsquare.p2p.NetWorkReadyListener;
import io.bitsquare.p2p.BootstrapListener;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.storage.HashMapChangedListener;
@ -89,7 +89,7 @@ public class ArbitratorManager {
));
private static final String publicKeyForTesting = "027a381b5333a56e1cc3d90d3a7d07f26509adf7029ed06fc997c656621f8da1ee";
private final boolean isDevTest;
private NetWorkReadyListener netWorkReadyListener;
private BootstrapListener bootstrapListener;
private ScheduledThreadPoolExecutor republishArbitratorExecutor;
@Inject
@ -121,14 +121,14 @@ public class ArbitratorManager {
if (user.getRegisteredArbitrator() != null) {
P2PService p2PService = arbitratorService.getP2PService();
if (!p2PService.isNetworkReady()) {
netWorkReadyListener = new NetWorkReadyListener() {
if (!p2PService.isBootstrapped()) {
bootstrapListener = new BootstrapListener() {
@Override
public void onBootstrapped() {
public void onBootstrapComplete() {
republishArbitrator();
}
};
p2PService.addP2PServiceListener(netWorkReadyListener);
p2PService.addP2PServiceListener(bootstrapListener);
} else {
republishArbitrator();
@ -144,8 +144,8 @@ public class ArbitratorManager {
}
private void republishArbitrator() {
if (netWorkReadyListener != null)
arbitratorService.getP2PService().removeP2PServiceListener(netWorkReadyListener);
if (bootstrapListener != null)
arbitratorService.getP2PService().removeP2PServiceListener(bootstrapListener);
Arbitrator registeredArbitrator = user.getRegisteredArbitrator();
if (registeredArbitrator != null) {

View file

@ -27,8 +27,8 @@ import io.bitsquare.btc.exceptions.TransactionVerificationException;
import io.bitsquare.btc.exceptions.WalletException;
import io.bitsquare.common.crypto.KeyRing;
import io.bitsquare.common.crypto.PubKeyRing;
import io.bitsquare.p2p.BootstrapListener;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NetWorkReadyListener;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.messaging.DecryptedMsgWithPubKey;
@ -68,7 +68,7 @@ public class DisputeManager {
private final DisputeList<Dispute> disputes;
transient private final ObservableList<Dispute> disputesObservableList;
private final String disputeInfo;
private final NetWorkReadyListener netWorkReadyListener;
private final BootstrapListener bootstrapListener;
private final CopyOnWriteArraySet<DecryptedMsgWithPubKey> decryptedMailboxMessageWithPubKeys = new CopyOnWriteArraySet<>();
private final CopyOnWriteArraySet<DecryptedMsgWithPubKey> decryptedDirectMessageWithPubKeys = new CopyOnWriteArraySet<>();
@ -105,24 +105,25 @@ public class DisputeManager {
"Please read more in detail about the dispute process in our wiki:\nhttps://github" +
".com/bitsquare/bitsquare/wiki/Dispute-process";
// We get first the message handler called then the onBootstrapped
p2PService.addDecryptedDirectMessageListener((decryptedMessageWithPubKey, senderAddress) -> {
decryptedDirectMessageWithPubKeys.add(decryptedMessageWithPubKey);
if (p2PService.isNetworkReady())
if (p2PService.isBootstrapped())
applyMessages();
});
p2PService.addDecryptedMailboxListener((decryptedMessageWithPubKey, senderAddress) -> {
decryptedMailboxMessageWithPubKeys.add(decryptedMessageWithPubKey);
if (p2PService.isNetworkReady())
if (p2PService.isBootstrapped())
applyMessages();
});
netWorkReadyListener = new NetWorkReadyListener() {
bootstrapListener = new BootstrapListener() {
@Override
public void onBootstrapped() {
public void onBootstrapComplete() {
applyMessages();
}
};
p2PService.addP2PServiceListener(netWorkReadyListener);
p2PService.addP2PServiceListener(bootstrapListener);
}
private void applyMessages() {
@ -138,13 +139,12 @@ public class DisputeManager {
log.debug("decryptedMessageWithPubKey.message " + message);
if (message instanceof DisputeMessage) {
dispatchMessage((DisputeMessage) message);
//TODO
//p2PService.removeEntryFromMailbox(decryptedMessageWithPubKey);
p2PService.removeEntryFromMailbox(decryptedMessageWithPubKey);
}
});
decryptedMailboxMessageWithPubKeys.clear();
p2PService.removeP2PServiceListener(netWorkReadyListener);
p2PService.removeP2PServiceListener(bootstrapListener);
}

View file

@ -26,8 +26,8 @@ import io.bitsquare.btc.WalletService;
import io.bitsquare.common.crypto.KeyRing;
import io.bitsquare.common.handlers.FaultHandler;
import io.bitsquare.common.handlers.ResultHandler;
import io.bitsquare.p2p.BootstrapListener;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NetWorkReadyListener;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.messaging.DecryptedDirectMessageListener;
@ -80,7 +80,7 @@ public class TradeManager {
private final Storage<TradableList<Trade>> tradableListStorage;
private final TradableList<Trade> trades;
private final BooleanProperty pendingTradesInitialized = new SimpleBooleanProperty();
private final NetWorkReadyListener netWorkReadyListener;
private final BootstrapListener bootstrapListener;
///////////////////////////////////////////////////////////////////////////////////////////
@ -141,21 +141,23 @@ public class TradeManager {
log.trace("Received TradeMessage: " + message);
String tradeId = ((TradeMessage) message).tradeId;
Optional<Trade> tradeOptional = trades.stream().filter(e -> e.getId().equals(tradeId)).findAny();
// The mailbox message will be removed inside the tasks after they are processed successfully
if (tradeOptional.isPresent())
tradeOptional.get().setMailboxMessage(decryptedMsgWithPubKey);
}
}
});
netWorkReadyListener = new NetWorkReadyListener() {
bootstrapListener = new BootstrapListener() {
@Override
public void onBootstrapped() {
public void onBootstrapComplete() {
Log.traceCall("onNetworkReady");
// Get called after onMailboxMessageAdded from initial data request
// The mailbox message will be removed inside the tasks after they are processed successfully
initPendingTrades();
}
};
p2PService.addP2PServiceListener(netWorkReadyListener);
p2PService.addP2PServiceListener(bootstrapListener);
}
@ -165,7 +167,7 @@ public class TradeManager {
private void initPendingTrades() {
Log.traceCall();
if (netWorkReadyListener != null) p2PService.removeP2PServiceListener(netWorkReadyListener);
p2PService.removeP2PServiceListener(bootstrapListener);
//List<Trade> failedTrades = new ArrayList<>();
for (Trade trade : trades) {
@ -180,13 +182,7 @@ public class TradeManager {
trade.updateDepositTxFromWallet(tradeWalletService);
initTrade(trade);
// after network is ready we remove mailbox messages.
DecryptedMsgWithPubKey mailboxMessage = trade.getMailboxMessage();
if (mailboxMessage != null) {
log.trace("initPendingTrades/removeEntryFromMailbox mailboxMessage = " + mailboxMessage);
p2PService.removeEntryFromMailbox(mailboxMessage);
trade.setMailboxMessage(null);
}
// }
}
pendingTradesInitialized.set(true);

View file

@ -24,8 +24,8 @@ import io.bitsquare.common.UserThread;
import io.bitsquare.common.crypto.KeyRing;
import io.bitsquare.common.handlers.ErrorMessageHandler;
import io.bitsquare.common.handlers.ResultHandler;
import io.bitsquare.p2p.BootstrapListener;
import io.bitsquare.p2p.Message;
import io.bitsquare.p2p.NetWorkReadyListener;
import io.bitsquare.p2p.NodeAddress;
import io.bitsquare.p2p.P2PService;
import io.bitsquare.p2p.messaging.SendDirectMessageListener;
@ -67,7 +67,7 @@ public class OpenOfferManager {
private final TradableList<OpenOffer> openOffers;
private final Storage<TradableList<OpenOffer>> openOffersStorage;
private boolean shutDownRequested;
private NetWorkReadyListener netWorkReadyListener;
private BootstrapListener bootstrapListener;
private final Timer timer = new Timer();
///////////////////////////////////////////////////////////////////////////////////////////
@ -126,14 +126,14 @@ public class OpenOfferManager {
// Before the TTL is reached we re-publish our offers
// If offer removal at shutdown fails we don't want to have long term dangling dead offers, so we set
// TTL quite short and use re-publish as strategy. Offerers need to be online anyway.
if (!p2PService.isNetworkReady()) {
netWorkReadyListener = new NetWorkReadyListener() {
if (!p2PService.isBootstrapped()) {
bootstrapListener = new BootstrapListener() {
@Override
public void onBootstrapped() {
public void onBootstrapComplete() {
startRePublishThread();
}
};
p2PService.addP2PServiceListener(netWorkReadyListener);
p2PService.addP2PServiceListener(bootstrapListener);
} else {
startRePublishThread();
@ -141,8 +141,8 @@ public class OpenOfferManager {
}
private void startRePublishThread() {
if (netWorkReadyListener != null)
p2PService.removeP2PServiceListener(netWorkReadyListener);
if (bootstrapListener != null)
p2PService.removeP2PServiceListener(bootstrapListener);
long period = (long) (Offer.TTL * 0.8); // republish sufficiently before offer would expire
TimerTask timerTask = new TimerTask() {