diff --git a/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.java b/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.java index f271491e4b..b0f8bd2838 100644 --- a/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.java +++ b/gui/src/main/java/io/bitsquare/gui/main/settings/network/NetworkSettingsView.java @@ -26,13 +26,11 @@ import io.bitsquare.gui.common.view.ActivatableViewAndModel; import io.bitsquare.gui.common.view.FxmlView; import io.bitsquare.gui.main.popups.Popup; import io.bitsquare.gui.util.BSFormatter; -import io.bitsquare.p2p.NodeAddress; import io.bitsquare.p2p.P2PService; -import io.bitsquare.p2p.P2PServiceListener; import io.bitsquare.p2p.network.Statistic; import io.bitsquare.user.Preferences; -import javafx.beans.value.ChangeListener; import javafx.collections.FXCollections; +import javafx.collections.ObservableList; import javafx.fxml.FXML; import javafx.geometry.Insets; import javafx.geometry.VPos; @@ -41,6 +39,7 @@ import javafx.scene.layout.GridPane; import javafx.util.StringConverter; import org.bitcoinj.core.Peer; import org.fxmisc.easybind.EasyBind; +import org.fxmisc.easybind.Subscription; import org.reactfx.util.FxTimer; import javax.inject.Inject; @@ -69,16 +68,17 @@ public class NetworkSettingsView extends ActivatableViewAndModel p2PPeerTable; + TableView p2PPeerTable; @FXML - TableColumn onionAddressColumn, connectionTypeColumn, creationDateColumn, + TableColumn onionAddressColumn, connectionTypeColumn, creationDateColumn, lastActivityColumn, sentBytesColumn, receivedBytesColumn, peerTypeColumn; /* TableColumn onionAddressColumn, connectionTypeColumn, creationDateColumn, lastActivityColumn, sentBytesColumn, receivedBytesColumn, peerTypeColumn; */ - private P2PServiceListener p2PServiceListener; - private ChangeListener numP2PPeersChangeListener; - private ChangeListener> bitcoinPeersChangeListener; + private Subscription numP2PPeersSubscription; + private Subscription bitcoinPeersSubscription; + private Subscription nodeAddressSubscription; + private ObservableList networkListItems = FXCollections.observableArrayList(); @Inject public NetworkSettingsView(WalletService walletService, P2PService p2PService, Preferences preferences, Clock clock, @@ -111,36 +111,6 @@ public class NetworkSettingsView extends ActivatableViewAndModel ((Integer) o1.statistic.getReceivedBytes()).compareTo(((Integer) o2.statistic.getReceivedBytes())));*/ - } @Override @@ -175,54 +144,43 @@ public class NetworkSettingsView extends ActivatableViewAndModel useTorCheckBox.setSelected(!selected)) .show(); - } }); + bitcoinPeersSubscription = EasyBind.subscribe(walletService.connectedPeersProperty(), connectedPeers -> updateBitcoinPeersTextArea()); - NodeAddress nodeAddress = p2PService.getAddress(); - if (nodeAddress == null) { - p2PService.addP2PServiceListener(p2PServiceListener); - } else { - onionAddress.setText(nodeAddress.getFullAddress()); - } - - bitcoinPeersChangeListener = (observable, oldValue, newValue) -> updateBitcoinPeersTextArea(); - walletService.connectedPeersProperty().addListener(bitcoinPeersChangeListener); - updateBitcoinPeersTextArea(); - - numP2PPeersChangeListener = (observable, oldValue, newValue) -> updateP2PStatistics(); - p2PService.getNumConnectedPeers().addListener(numP2PPeersChangeListener); - updateP2PStatistics(); - + nodeAddressSubscription = EasyBind.subscribe(p2PService.getNetworkNode().nodeAddressProperty(), + nodeAddress -> onionAddress.setText(nodeAddress == null ? "Not known yet..." : p2PService.getAddress().getFullAddress())); + numP2PPeersSubscription = EasyBind.subscribe(p2PService.getNumConnectedPeers(), numPeers -> updateP2PTable()); totalTraffic.textProperty().bind(EasyBind.combine(Statistic.totalSentBytesProperty(), Statistic.totalReceivedBytesProperty(), (sent, received) -> "Sent: " + formatter.formatBytes((int) sent) + ", received: " + formatter.formatBytes((int) received))); + + p2PPeerTable.setItems(networkListItems); + p2PPeerTable.sort(); } @Override public void deactivate() { useTorCheckBox.setOnAction(null); - if (p2PServiceListener != null) - p2PService.removeP2PServiceListener(p2PServiceListener); + if (nodeAddressSubscription != null) + nodeAddressSubscription.unsubscribe(); - if (bitcoinPeersChangeListener != null) - walletService.connectedPeersProperty().removeListener(bitcoinPeersChangeListener); + if (bitcoinPeersSubscription != null) + bitcoinPeersSubscription.unsubscribe(); - if (numP2PPeersChangeListener != null) - p2PService.getNumConnectedPeers().removeListener(numP2PPeersChangeListener); + if (numP2PPeersSubscription != null) + numP2PPeersSubscription.unsubscribe(); - p2PPeerTable.getItems().forEach(NetworkStatisticListItem::cleanup); + p2PPeerTable.getItems().forEach(P2pNetworkListItem::cleanup); totalTraffic.textProperty().unbind(); } - private void updateP2PStatistics() { - p2PPeerTable.getItems().forEach(NetworkStatisticListItem::cleanup); - - List list = p2PService.getNetworkNode().getConfirmedConnections().stream() - .map(connection -> new NetworkStatisticListItem(connection, clock, formatter)) - .collect(Collectors.toList()); - p2PPeerTable.setItems(FXCollections.observableArrayList(list)); - p2PPeerTable.sort(); + private void updateP2PTable() { + p2PPeerTable.getItems().forEach(P2pNetworkListItem::cleanup); + networkListItems.clear(); + networkListItems.setAll(p2PService.getNetworkNode().getAllConnections().stream() + .map(connection -> new P2pNetworkListItem(connection, clock, formatter)) + .collect(Collectors.toList())); } private void updateBitcoinPeersTextArea() { diff --git a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java index 0d66a6c51e..dafcd47e4a 100644 --- a/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java +++ b/network/src/main/java/io/bitsquare/p2p/network/NetworkNode.java @@ -6,6 +6,9 @@ import io.bitsquare.common.UserThread; import io.bitsquare.common.util.Utilities; import io.bitsquare.p2p.Message; import io.bitsquare.p2p.NodeAddress; +import javafx.beans.property.ObjectProperty; +import javafx.beans.property.ReadOnlyObjectProperty; +import javafx.beans.property.SimpleObjectProperty; import org.apache.commons.lang3.StringUtils; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -26,7 +29,7 @@ import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkNotNull; // Run in UserThread -public abstract class NetworkNode implements MessageListener, ConnectionListener { +public abstract class NetworkNode implements MessageListener { private static final Logger log = LoggerFactory.getLogger(NetworkNode.class); private static final int CREATE_SOCKET_TIMEOUT_MILLIS = 5000; @@ -42,6 +45,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener private volatile boolean shutDownInProgress; // accessed from different threads private final CopyOnWriteArraySet outBoundConnections = new CopyOnWriteArraySet<>(); + protected final ObjectProperty nodeAddressProperty = new SimpleObjectProperty<>(); /////////////////////////////////////////////////////////////////////////////////////////// @@ -108,8 +112,30 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener existingConnection.sendMessage(message); return existingConnection; } else { - outboundConnection = new OutboundConnection(socket, NetworkNode.this, NetworkNode.this, peersNodeAddress); - outBoundConnections.add(outboundConnection); + outboundConnection = new OutboundConnection(socket, + NetworkNode.this, + new ConnectionListener() { + @Override + public void onConnection(Connection connection) { + outBoundConnections.add((OutboundConnection) connection); + printOutBoundConnections(); + connectionListeners.stream().forEach(e -> e.onConnection(connection)); + } + + @Override + public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { + log.trace("onDisconnect connectionListener\n\tconnection={}" + connection); + printOutBoundConnections(); + outBoundConnections.remove(connection); + + connectionListeners.stream().forEach(e -> e.onDisconnect(closeConnectionReason, connection)); + } + + @Override + public void onError(Throwable throwable) { + connectionListeners.stream().forEach(e -> e.onError(throwable)); + } + }, peersNodeAddress); log.info("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%\n" + "NetworkNode created new outbound connection:" @@ -204,6 +230,10 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener return resultFuture; } + public ReadOnlyObjectProperty nodeAddressProperty() { + return nodeAddressProperty; + } + public Set getAllConnections() { // Can contain inbound and outbound connections with the same peer node address, // as connection hashcode is using uid and port info @@ -255,33 +285,6 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener } - /////////////////////////////////////////////////////////////////////////////////////////// - // ConnectionListener implementation - /////////////////////////////////////////////////////////////////////////////////////////// - - @Override - public void onConnection(Connection connection) { - connectionListeners.stream().forEach(e -> e.onConnection(connection)); - } - - @Override - public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { - log.trace("onDisconnect connectionListener\n\tconnection={}" + connection); - if (inBoundConnections.contains(connection)) - log.warn("We have the connection in our inBoundConnections. That must not happen as it should be called " + - "from the server listener and get removed from there."); - printOutBoundConnections(); - outBoundConnections.remove(connection); - // inbound connections are removed in the listener of the server - connectionListeners.stream().forEach(e -> e.onDisconnect(closeConnectionReason, connection)); - } - - @Override - public void onError(Throwable throwable) { - connectionListeners.stream().forEach(e -> e.onError(throwable)); - } - - /////////////////////////////////////////////////////////////////////////////////////////// // MessageListener implementation /////////////////////////////////////////////////////////////////////////////////////////// @@ -329,33 +332,33 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener /////////////////////////////////////////////////////////////////////////////////////////// void createExecutorService() { - executorService = Utilities.getListeningExecutorService("NetworkNode-" + servicePort, 20, 50, 120L); + executorService = Utilities.getListeningExecutorService("NetworkNode-" + servicePort, 50, 100, 2 * 60); } void startServer(ServerSocket serverSocket) { - ConnectionListener connectionListener = new ConnectionListener() { - @Override - public void onConnection(Connection connection) { - inBoundConnections.add((InboundConnection) connection); - NetworkNode.this.onConnection(connection); - } - - @Override - public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { - log.trace("onDisconnect at server socket connectionListener\n\tconnection={}" + connection); - inBoundConnections.remove(connection); - printInboundConnections(); - NetworkNode.this.onDisconnect(closeConnectionReason, connection); - } - - @Override - public void onError(Throwable throwable) { - NetworkNode.this.onError(throwable); - } - }; server = new Server(serverSocket, NetworkNode.this, - connectionListener); + new ConnectionListener() { + @Override + public void onConnection(Connection connection) { + inBoundConnections.add((InboundConnection) connection); + printInboundConnections(); + connectionListeners.stream().forEach(e -> e.onConnection(connection)); + } + + @Override + public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { + log.trace("onDisconnect at server socket connectionListener\n\tconnection={}" + connection); + inBoundConnections.remove(connection); + printInboundConnections(); + connectionListeners.stream().forEach(e -> e.onDisconnect(closeConnectionReason, connection)); + } + + @Override + public void onError(Throwable throwable) { + connectionListeners.stream().forEach(e -> e.onError(throwable)); + } + }); executorService.submit(server); } @@ -392,5 +395,7 @@ public abstract class NetworkNode implements MessageListener, ConnectionListener abstract protected Socket createSocket(NodeAddress peersNodeAddress) throws IOException; @Nullable - abstract public NodeAddress getNodeAddress(); + public NodeAddress getNodeAddress() { + return nodeAddressProperty.get(); + } }