From a134c85f24244e47f17985c302b1b2161a0f7e08 Mon Sep 17 00:00:00 2001 From: Manfred Karrer Date: Thu, 13 Nov 2014 12:12:49 +0100 Subject: [PATCH] Remove double future callback in TomP2PNode, refactoring, cleanup --- .../msg/tomp2p/BootstrappedPeerFactory.java | 9 +- .../msg/tomp2p/TomP2PMessageService.java | 14 +-- .../io/bitsquare/msg/tomp2p/TomP2PNode.java | 119 +++++++++--------- .../offer/tomp2p/TomP2POfferRepository.java | 14 +-- .../bitsquare/util/tomp2p/BaseFutureUtil.java | 28 ----- 5 files changed, 74 insertions(+), 110 deletions(-) delete mode 100644 src/main/java/io/bitsquare/util/tomp2p/BaseFutureUtil.java diff --git a/src/main/java/io/bitsquare/msg/tomp2p/BootstrappedPeerFactory.java b/src/main/java/io/bitsquare/msg/tomp2p/BootstrappedPeerFactory.java index 285350f538..561ecccaa9 100644 --- a/src/main/java/io/bitsquare/msg/tomp2p/BootstrappedPeerFactory.java +++ b/src/main/java/io/bitsquare/msg/tomp2p/BootstrappedPeerFactory.java @@ -21,7 +21,6 @@ import io.bitsquare.network.BootstrapState; import io.bitsquare.network.Node; import io.bitsquare.persistence.Persistence; -import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; import com.google.inject.name.Named; @@ -79,9 +78,9 @@ public class BootstrappedPeerFactory { static final String NETWORK_INTERFACE_UNSPECIFIED = ""; private KeyPair keyPair; - private int port; + private final int port; private final Node bootstrapNode; - private String networkInterface; + private final String networkInterface; private final Persistence persistence; private final SettableFuture settableFuture = SettableFuture.create(); @@ -119,7 +118,7 @@ public class BootstrappedPeerFactory { // Public methods /////////////////////////////////////////////////////////////////////////////////////////// - public ListenableFuture start() { + public SettableFuture start() { try { setState(BootstrapState.PEER_CREATION, "We create a P2P node."); @@ -193,7 +192,7 @@ public class BootstrappedPeerFactory { break; } } catch (IOException e) { - handleError(BootstrapState.PEER_CREATION, "Cannot create peer with port: " + port + ". Exeption: " + e); + handleError(BootstrapState.PEER_CREATION, "Cannot create peer with port: " + port + ". Exception: " + e); } return settableFuture; diff --git a/src/main/java/io/bitsquare/msg/tomp2p/TomP2PMessageService.java b/src/main/java/io/bitsquare/msg/tomp2p/TomP2PMessageService.java index 32406fbeca..f73d6751a9 100644 --- a/src/main/java/io/bitsquare/msg/tomp2p/TomP2PMessageService.java +++ b/src/main/java/io/bitsquare/msg/tomp2p/TomP2PMessageService.java @@ -55,8 +55,6 @@ import net.tomp2p.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static io.bitsquare.util.tomp2p.BaseFutureUtil.isSuccess; - /** * That service delivers direct messaging and DHT functionality from the TomP2P library @@ -95,7 +93,7 @@ class TomP2PMessageService implements MessageService { public void init(BootstrapListener bootstrapListener) { p2pNode.setMessageBroker(this); p2pNode.setKeyPair(user.getMessageKeyPair()); - p2pNode.start(bootstrapListener); + p2pNode.bootstrap(bootstrapListener); } public void shutDown() { @@ -142,7 +140,7 @@ class TomP2PMessageService implements MessageService { futureDirect.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { - if (isSuccess(futureDirect)) { + if (future.isSuccess()) { Platform.runLater(listener::onResult); } else { @@ -185,7 +183,7 @@ class TomP2PMessageService implements MessageService { } })); - if (isSuccess(addFuture)) { + if (future.isSuccess()) { log.trace("Add arbitrator to DHT was successful. Stored data: [key: " + locationKey + ", " + "values: " + arbitratorData + "]"); } @@ -236,7 +234,7 @@ class TomP2PMessageService implements MessageService { FutureGet futureGet = p2pNode.getDataMap(locationKey); futureGet.addListener(new BaseFutureAdapter() { @Override - public void operationComplete(BaseFuture baseFuture) throws Exception { + public void operationComplete(BaseFuture future) throws Exception { Platform.runLater(() -> arbitratorListeners.stream().forEach(listener -> { List arbitrators = new ArrayList<>(); @@ -254,12 +252,12 @@ class TomP2PMessageService implements MessageService { listener.onArbitratorsReceived(arbitrators); })); - if (isSuccess(baseFuture)) { + if (future.isSuccess()) { log.trace("Get arbitrators from DHT was successful. Stored data: [key: " + locationKey + ", " + "values: " + futureGet.dataMap() + "]"); } else { - log.error("Get arbitrators from DHT failed with reason:" + baseFuture.failedReason()); + log.error("Get arbitrators from DHT failed with reason:" + future.failedReason()); } } }); diff --git a/src/main/java/io/bitsquare/msg/tomp2p/TomP2PNode.java b/src/main/java/io/bitsquare/msg/tomp2p/TomP2PNode.java index a36fe73d6f..5723de6eb2 100644 --- a/src/main/java/io/bitsquare/msg/tomp2p/TomP2PNode.java +++ b/src/main/java/io/bitsquare/msg/tomp2p/TomP2PNode.java @@ -17,13 +17,14 @@ package io.bitsquare.msg.tomp2p; +import io.bitsquare.BitsquareException; import io.bitsquare.msg.MessageBroker; import io.bitsquare.msg.listeners.BootstrapListener; import io.bitsquare.network.tomp2p.TomP2PPeer; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import java.io.IOException; @@ -58,7 +59,7 @@ import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static io.bitsquare.util.tomp2p.BaseFutureUtil.isSuccess; +import static com.google.common.base.Preconditions.checkNotNull; /** * The fully bootstrapped P2PNode which is responsible himself for his availability in the messaging system. It saves @@ -110,28 +111,42 @@ public class TomP2PNode { bootstrappedPeerFactory.setKeyPair(keyPair); } - public void start(BootstrapListener bootstrapListener) { - setupTimerForIPCheck(); + public void bootstrap(BootstrapListener bootstrapListener) { + checkNotNull(keyPair, "keyPair must not be null."); + checkNotNull(messageBroker, "messageBroker must not be null."); - ListenableFuture bootstrapComplete = bootstrap(); - Futures.addCallback(bootstrapComplete, new FutureCallback() { + bootstrappedPeerFactory.bootstrapState.addListener((ov, oldValue, newValue) -> + bootstrapListener.onBootstrapStateChanged(newValue)); + + SettableFuture bootstrapFuture = bootstrappedPeerFactory.start(); + Futures.addCallback(bootstrapFuture, new FutureCallback() { @Override - public void onSuccess(@Nullable PeerDHT result) { - log.debug("p2pNode.start success result = " + result); - Platform.runLater(bootstrapListener::onCompleted); + public void onSuccess(@Nullable PeerDHT peerDHT) { + if (peerDHT != null) { + TomP2PNode.this.peerDHT = peerDHT; + setup(); + Platform.runLater(bootstrapListener::onCompleted); + } + else { + log.error("Error at bootstrap: peerDHT = null"); + Platform.runLater(() -> bootstrapListener.onFailed( + new BitsquareException("Error at bootstrap: peerDHT = null"))); + } } @Override public void onFailure(@NotNull Throwable t) { - log.error(t.toString()); + log.error("Exception at bootstrap " + t.getMessage()); Platform.runLater(() -> bootstrapListener.onFailed(t)); } }); - - bootstrappedPeerFactory.bootstrapState.addListener((ov, oldValue, newValue) -> - bootstrapListener.onBootstrapStateChanged(newValue)); } + private void setup() { + setupTimerForIPCheck(); + setupReplyHandler(); + storeAddressAfterBootstrap(); + } public void shutDown() { bootstrappedPeerFactory.shutDown(); @@ -144,6 +159,7 @@ public class TomP2PNode { return peerDHT; } + /////////////////////////////////////////////////////////////////////////////////////////// // Generic DHT methods /////////////////////////////////////////////////////////////////////////////////////////// @@ -194,7 +210,7 @@ public class TomP2PNode { futureDirect.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { - if (isSuccess(futureDirect)) { + if (future.isSuccess()) { log.debug("sendMessage completed"); } else { @@ -291,51 +307,6 @@ public class TomP2PNode { // Private /////////////////////////////////////////////////////////////////////////////////////////// - private ListenableFuture bootstrap() { - ListenableFuture bootstrapComplete = bootstrappedPeerFactory.start(); - Futures.addCallback(bootstrapComplete, new FutureCallback() { - @Override - public void onSuccess(@Nullable PeerDHT peerDHT) { - try { - if (peerDHT != null) { - TomP2PNode.this.peerDHT = peerDHT; - setupReplyHandler(); - FuturePut futurePut = storePeerAddress(); - futurePut.addListener(new BaseFutureListener() { - @Override - public void operationComplete(BaseFuture future) throws Exception { - if (isSuccess(futurePut)) { - storedPeerAddress = peerDHT.peerAddress(); - log.debug("storedPeerAddress = " + storedPeerAddress); - } - else { - log.error("storedPeerAddress not successful"); - } - } - - @Override - public void exceptionCaught(Throwable t) throws Exception { - log.error("Error at storedPeerAddress " + t.toString()); - } - }); - } - else { - log.error("peerDHT is null"); - } - } catch (IOException e) { - e.printStackTrace(); - log.error("Error at storePeerAddress " + e.toString()); - } - } - - @Override - public void onFailure(@NotNull Throwable t) { - log.error("onFailure bootstrap " + t.toString()); - } - }); - return bootstrapComplete; - } - private void setupReplyHandler() { peerDHT.peer().objectDataReply((sender, request) -> { log.debug("handleMessage peerAddress " + sender); @@ -360,7 +331,7 @@ public class TomP2PNode { public void run() { if (peerDHT != null && !storedPeerAddress.equals(peerDHT.peerAddress())) { try { - storePeerAddress(); + storeAddress(); } catch (IOException e) { e.printStackTrace(); log.error(e.toString()); @@ -370,7 +341,33 @@ public class TomP2PNode { }, checkIfIPChangedPeriod, checkIfIPChangedPeriod); } - private FuturePut storePeerAddress() throws IOException { + private void storeAddressAfterBootstrap() { + try { + FuturePut futurePut = storeAddress(); + futurePut.addListener(new BaseFutureListener() { + @Override + public void operationComplete(BaseFuture future) throws Exception { + if (future.isSuccess()) { + storedPeerAddress = peerDHT.peerAddress(); + log.debug("storedPeerAddress = " + storedPeerAddress); + } + else { + log.error("storedPeerAddress not successful"); + } + } + + @Override + public void exceptionCaught(Throwable t) throws Exception { + log.error("Error at storedPeerAddress " + t.toString()); + } + }); + } catch (IOException e) { + e.printStackTrace(); + log.error("Error at storePeerAddress " + e.toString()); + } + } + + private FuturePut storeAddress() throws IOException { Number160 locationKey = Utils.makeSHAHash(keyPair.getPublic().getEncoded()); Data data = new Data(new TomP2PPeer(peerDHT.peerAddress())); log.debug("storePeerAddress " + peerDHT.peerAddress().toString()); diff --git a/src/main/java/io/bitsquare/offer/tomp2p/TomP2POfferRepository.java b/src/main/java/io/bitsquare/offer/tomp2p/TomP2POfferRepository.java index b2962eba86..9fdeb5c52c 100644 --- a/src/main/java/io/bitsquare/offer/tomp2p/TomP2POfferRepository.java +++ b/src/main/java/io/bitsquare/offer/tomp2p/TomP2POfferRepository.java @@ -48,8 +48,6 @@ import net.tomp2p.storage.Data; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static io.bitsquare.util.tomp2p.BaseFutureUtil.isSuccess; - class TomP2POfferRepository implements OfferRepository { private static final Logger log = LoggerFactory.getLogger(TomP2POfferRepository.class); @@ -79,7 +77,7 @@ class TomP2POfferRepository implements OfferRepository { futurePut.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { - if (isSuccess(future)) { + if (future.isSuccess()) { Platform.runLater(() -> { resultHandler.handleResult(); offerRepositoryListeners.stream().forEach(listener -> { @@ -168,8 +166,8 @@ class TomP2POfferRepository implements OfferRepository { FutureGet futureGet = p2pNode.getDataMap(locationKey); futureGet.addListener(new BaseFutureAdapter() { @Override - public void operationComplete(BaseFuture baseFuture) throws Exception { - if (isSuccess(baseFuture)) { + public void operationComplete(BaseFuture future) throws Exception { + if (future.isSuccess()) { final Map dataMap = futureGet.dataMap(); final List offers = new ArrayList<>(); if (dataMap != null) { @@ -199,7 +197,7 @@ class TomP2POfferRepository implements OfferRepository { listener.onOffersReceived(new ArrayList<>()))); } else { - log.error("Get offers from DHT was not successful with reason:" + baseFuture.failedReason()); + log.error("Get offers from DHT was not successful with reason:" + future.failedReason()); } } } @@ -234,7 +232,7 @@ class TomP2POfferRepository implements OfferRepository { putFuture.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { - if (isSuccess(putFuture)) + if (future.isSuccess()) log.trace("Update invalidationTimestamp to DHT was successful. TimeStamp=" + invalidationTimestamp.get()); else @@ -261,7 +259,7 @@ class TomP2POfferRepository implements OfferRepository { getFuture.addListener(new BaseFutureListener() { @Override public void operationComplete(BaseFuture future) throws Exception { - if (isSuccess(getFuture)) { + if (future.isSuccess()) { Data data = getFuture.data(); if (data != null && data.object() instanceof Long) { final Object object = data.object(); diff --git a/src/main/java/io/bitsquare/util/tomp2p/BaseFutureUtil.java b/src/main/java/io/bitsquare/util/tomp2p/BaseFutureUtil.java deleted file mode 100644 index 5da1301594..0000000000 --- a/src/main/java/io/bitsquare/util/tomp2p/BaseFutureUtil.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * This file is part of Bitsquare. - * - * Bitsquare is free software: you can redistribute it and/or modify it - * under the terms of the GNU Affero General Public License as published by - * the Free Software Foundation, either version 3 of the License, or (at - * your option) any later version. - * - * Bitsquare is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public - * License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with Bitsquare. If not, see . - */ - -package io.bitsquare.util.tomp2p; - -import net.tomp2p.futures.BaseFuture; - -public class BaseFutureUtil { - - // Isolate the success handling as there is bug in port forwarding mode - public static boolean isSuccess(BaseFuture baseFuture) { - return baseFuture.isSuccess(); - } -}