Fix bug with missing broadcast

This commit is contained in:
Manfred Karrer 2016-02-27 00:44:33 +01:00
parent 891799b09c
commit b511267340
14 changed files with 44 additions and 55 deletions

View file

@ -63,7 +63,7 @@ public class Log {
logbackLogger = loggerContext.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME); logbackLogger = loggerContext.getLogger(org.slf4j.Logger.ROOT_LOGGER_NAME);
//TODO for now use always trace //TODO for now use always trace
logbackLogger.setLevel(useDetailedLogging ? Level.TRACE : Level.INFO); logbackLogger.setLevel(useDetailedLogging ? Level.INFO : Level.INFO);
// logbackLogger.setLevel(useDetailedLogging ? Level.TRACE : Level.DEBUG); // logbackLogger.setLevel(useDetailedLogging ? Level.TRACE : Level.DEBUG);
logbackLogger.addAppender(appender); logbackLogger.addAppender(appender);
} }

View file

@ -50,7 +50,7 @@ public class AlertService {
} }
public void addAlertMessage(Alert alert, @Nullable ResultHandler resultHandler, @Nullable ErrorMessageHandler errorMessageHandler) { public void addAlertMessage(Alert alert, @Nullable ResultHandler resultHandler, @Nullable ErrorMessageHandler errorMessageHandler) {
boolean result = p2PService.addData(alert, true, true); boolean result = p2PService.addData(alert, true);
if (result) { if (result) {
log.trace("Add alertMessage to network was successful. AlertMessage = " + alert); log.trace("Add alertMessage to network was successful. AlertMessage = " + alert);
if (resultHandler != null) resultHandler.handleResult(); if (resultHandler != null) resultHandler.handleResult();

View file

@ -59,7 +59,7 @@ public class ArbitratorService {
public void addArbitrator(Arbitrator arbitrator, final ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { public void addArbitrator(Arbitrator arbitrator, final ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
log.debug("addArbitrator arbitrator.hashCode() " + arbitrator.hashCode()); log.debug("addArbitrator arbitrator.hashCode() " + arbitrator.hashCode());
boolean result = p2PService.addData(arbitrator, true, true); boolean result = p2PService.addData(arbitrator, true);
if (result) { if (result) {
log.trace("Add arbitrator to network was successful. Arbitrator.hashCode() = " + arbitrator.hashCode()); log.trace("Add arbitrator to network was successful. Arbitrator.hashCode() = " + arbitrator.hashCode());
resultHandler.handleResult(); resultHandler.handleResult();

View file

@ -84,17 +84,8 @@ public class OfferBookService {
// API // API
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public void republishOffers(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
doAddOffer(offer, resultHandler, errorMessageHandler, true);
}
public void addOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) { public void addOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler) {
doAddOffer(offer, resultHandler, errorMessageHandler, false); boolean result = p2PService.addData(offer, true);
}
private void doAddOffer(Offer offer, ResultHandler resultHandler, ErrorMessageHandler errorMessageHandler, boolean forceBroadcast) {
boolean result = p2PService.addData(offer, forceBroadcast, true);
if (result) { if (result) {
log.trace("Add offer to network was successful. Offer ID = " + offer.getId()); log.trace("Add offer to network was successful. Offer ID = " + offer.getId());
resultHandler.handleResult(); resultHandler.handleResult();

View file

@ -370,7 +370,7 @@ public class OpenOfferManager implements PeerManager.Listener, DecryptedDirectMe
} }
private void republishOffer(OpenOffer openOffer) { private void republishOffer(OpenOffer openOffer) {
offerBookService.republishOffers(openOffer.getOffer(), offerBookService.addOffer(openOffer.getOffer(),
() -> { () -> {
if (!stopped) { if (!stopped) {
log.debug("Successful added offer to P2P network"); log.debug("Successful added offer to P2P network");

View file

@ -84,12 +84,12 @@
<PropertyValueFactory property="creationDate"/> <PropertyValueFactory property="creationDate"/>
</cellValueFactory> </cellValueFactory>
</TableColumn> </TableColumn>
<TableColumn text="Last activity" fx:id="lastActivityColumn" minWidth="100" maxWidth="120"> <!-- <TableColumn text="Last activity" fx:id="lastActivityColumn" minWidth="100" maxWidth="120">
<cellValueFactory> <cellValueFactory>
<PropertyValueFactory property="lastActivity"/> <PropertyValueFactory property="lastActivity"/>
</cellValueFactory> </cellValueFactory>
</TableColumn> </TableColumn>-->
<TableColumn text="RTT" fx:id="roundTripTimeColumn" minWidth="80" maxWidth="80"> <TableColumn text="Roundtrip" fx:id="roundTripTimeColumn" minWidth="80" maxWidth="80">
<cellValueFactory> <cellValueFactory>
<PropertyValueFactory property="roundTripTime"/> <PropertyValueFactory property="roundTripTime"/>
</cellValueFactory> </cellValueFactory>
@ -117,8 +117,8 @@
focusTraversable="false"/> focusTraversable="false"/>
<columnConstraints> <columnConstraints>
<ColumnConstraints hgrow="NEVER" halignment="RIGHT"/> <ColumnConstraints hgrow="ALWAYS" halignment="RIGHT"/>
<ColumnConstraints hgrow="ALWAYS"/> <ColumnConstraints hgrow="SOMETIMES"/>
</columnConstraints> </columnConstraints>
</GridPane> </GridPane>

View file

@ -71,7 +71,7 @@ public class NetworkSettingsView extends ActivatableViewAndModel<GridPane, Activ
TableView<P2pNetworkListItem> p2PPeerTable; TableView<P2pNetworkListItem> p2PPeerTable;
@FXML @FXML
TableColumn<P2pNetworkListItem, String> onionAddressColumn, connectionTypeColumn, creationDateColumn, TableColumn<P2pNetworkListItem, String> onionAddressColumn, connectionTypeColumn, creationDateColumn,
lastActivityColumn, roundTripTimeColumn, sentBytesColumn, receivedBytesColumn, peerTypeColumn; /*lastActivityColumn,*/ roundTripTimeColumn, sentBytesColumn, receivedBytesColumn, peerTypeColumn;
private Subscription numP2PPeersSubscription; private Subscription numP2PPeersSubscription;
private Subscription bitcoinPeersSubscription; private Subscription bitcoinPeersSubscription;
private Subscription nodeAddressSubscription; private Subscription nodeAddressSubscription;

View file

@ -556,7 +556,7 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
public void onBroadcastFailed(String errorMessage) { public void onBroadcastFailed(String errorMessage) {
} }
}; };
boolean result = p2PDataStorage.add(protectedMailboxStorageEntry, networkNode.getNodeAddress(), listener, true, true); boolean result = p2PDataStorage.add(protectedMailboxStorageEntry, networkNode.getNodeAddress(), listener, true);
if (!result) { if (!result) {
//TODO remove and add again with a delay to ensure the data will be broadcasted //TODO remove and add again with a delay to ensure the data will be broadcasted
sendMailboxMessageListener.onFault("Data already exists in our local database"); sendMailboxMessageListener.onFault("Data already exists in our local database");
@ -616,13 +616,13 @@ public class P2PService implements SetupListener, MessageListener, ConnectionLis
// Data storage // Data storage
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public boolean addData(StoragePayload storagePayload, boolean forceBroadcast, boolean isDataOwner) { public boolean addData(StoragePayload storagePayload, boolean isDataOwner) {
Log.traceCall(); Log.traceCall();
checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen."); checkArgument(optionalKeyRing.isPresent(), "keyRing not set. Seems that is called on a seed node which must not happen.");
if (isBootstrapped()) { if (isBootstrapped()) {
try { try {
ProtectedStorageEntry protectedStorageEntry = p2PDataStorage.getProtectedData(storagePayload, optionalKeyRing.get().getSignatureKeyPair()); ProtectedStorageEntry protectedStorageEntry = p2PDataStorage.getProtectedData(storagePayload, optionalKeyRing.get().getSignatureKeyPair());
return p2PDataStorage.add(protectedStorageEntry, networkNode.getNodeAddress(), null, forceBroadcast, isDataOwner); return p2PDataStorage.add(protectedStorageEntry, networkNode.getNodeAddress(), null, isDataOwner);
} catch (CryptoException e) { } catch (CryptoException e) {
log.error("Signing at getDataWithSignedSeqNr failed. That should never happen."); log.error("Signing at getDataWithSignedSeqNr failed. That should never happen.");
return false; return false;

View file

@ -2,19 +2,19 @@ package io.bitsquare.p2p.network;
public enum CloseConnectionReason { public enum CloseConnectionReason {
// First block are from different exceptions // First block are from different exceptions
SOCKET_CLOSED(false), SOCKET_CLOSED(false, false),
RESET(false), RESET(false, false),
SOCKET_TIMEOUT(false), SOCKET_TIMEOUT(false, false),
TERMINATED(false), // EOFException TERMINATED(false, false), // EOFException
UNKNOWN_EXCEPTION(false), UNKNOWN_EXCEPTION(false, false),
// Planned // Planned
APP_SHUT_DOWN(true, true), APP_SHUT_DOWN(true, true),
CLOSE_REQUESTED_BY_PEER(false, true), CLOSE_REQUESTED_BY_PEER(false, true),
// send msg // send msg
SEND_MSG_FAILURE(false), SEND_MSG_FAILURE(false, false),
SEND_MSG_TIMEOUT(false), SEND_MSG_TIMEOUT(false, false),
// maintenance // maintenance
TOO_MANY_CONNECTIONS_OPEN(true, true), TOO_MANY_CONNECTIONS_OPEN(true, true),
@ -27,10 +27,6 @@ public enum CloseConnectionReason {
public final boolean sendCloseMessage; public final boolean sendCloseMessage;
public boolean isIntended; public boolean isIntended;
CloseConnectionReason(boolean sendCloseMessage) {
this(sendCloseMessage, true);
}
CloseConnectionReason(boolean sendCloseMessage, boolean isIntended) { CloseConnectionReason(boolean sendCloseMessage, boolean isIntended) {
this.sendCloseMessage = sendCloseMessage; this.sendCloseMessage = sendCloseMessage;
this.isIntended = isIntended; this.isIntended = isIntended;

View file

@ -1,5 +1,6 @@
package io.bitsquare.p2p.network; package io.bitsquare.p2p.network;
import com.google.common.util.concurrent.CycleDetectingLockFactory;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles; import com.google.common.util.concurrent.Uninterruptibles;
import io.bitsquare.app.Log; import io.bitsquare.app.Log;
@ -68,6 +69,8 @@ public class Connection implements MessageListener {
return MAX_MSG_SIZE; return MAX_MSG_SIZE;
} }
private static final CycleDetectingLockFactory cycleDetectingLockFactory = CycleDetectingLockFactory.newInstance(CycleDetectingLockFactory.Policies.THROW);
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
// Class fields // Class fields
@ -79,7 +82,7 @@ public class Connection implements MessageListener {
private final String portInfo; private final String portInfo;
private final String uid; private final String uid;
private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor(); private final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
private final ReentrantLock objectOutputStreamLock = new ReentrantLock(true); private final ReentrantLock objectOutputStreamLock = cycleDetectingLockFactory.newReentrantLock("objectOutputStreamLock");
// holder of state shared between InputHandler and Connection // holder of state shared between InputHandler and Connection
private final SharedModel sharedModel; private final SharedModel sharedModel;
private final Statistic statistic; private final Statistic statistic;
@ -104,7 +107,6 @@ public class Connection implements MessageListener {
Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener, Connection(Socket socket, MessageListener messageListener, ConnectionListener connectionListener,
@Nullable NodeAddress peersNodeAddress) { @Nullable NodeAddress peersNodeAddress) {
this.socket = socket; this.socket = socket;
//this.messageListener = messageListener;
this.connectionListener = connectionListener; this.connectionListener = connectionListener;
uid = UUID.randomUUID().toString(); uid = UUID.randomUUID().toString();
statistic = new Statistic(); statistic = new Statistic();

View file

@ -47,7 +47,7 @@ public class PeerManager implements ConnectionListener {
} }
static { static {
setMaxConnections(10); setMaxConnections(12);
} }
private static final int MAX_REPORTED_PEERS = 1000; private static final int MAX_REPORTED_PEERS = 1000;

View file

@ -157,7 +157,7 @@ public class RequestDataHandler implements MessageListener {
"at that moment"); "at that moment");
((GetDataResponse) message).dataSet.stream() ((GetDataResponse) message).dataSet.stream()
.forEach(protectedData -> dataStorage.add(protectedData, .forEach(protectedData -> dataStorage.add(protectedData,
connection.getPeersNodeAddressOptional().get(), null, false, false)); connection.getPeersNodeAddressOptional().get(), null, false));
cleanup(); cleanup();
listener.onComplete(); listener.onComplete();

View file

@ -113,7 +113,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
Log.traceCall(StringUtils.abbreviate(message.toString(), 100) + "\n\tconnection=" + connection); Log.traceCall(StringUtils.abbreviate(message.toString(), 100) + "\n\tconnection=" + connection);
connection.getPeersNodeAddressOptional().ifPresent(peersNodeAddress -> { connection.getPeersNodeAddressOptional().ifPresent(peersNodeAddress -> {
if (message instanceof AddDataMessage) { if (message instanceof AddDataMessage) {
add(((AddDataMessage) message).protectedStorageEntry, peersNodeAddress, null, false, false); add(((AddDataMessage) message).protectedStorageEntry, peersNodeAddress, null, false);
} else if (message instanceof RemoveDataMessage) { } else if (message instanceof RemoveDataMessage) {
remove(((RemoveDataMessage) message).protectedStorageEntry, peersNodeAddress, false); remove(((RemoveDataMessage) message).protectedStorageEntry, peersNodeAddress, false);
} else if (message instanceof RemoveMailboxDataMessage) { } else if (message instanceof RemoveMailboxDataMessage) {
@ -136,7 +136,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
@Override @Override
public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) { public void onDisconnect(CloseConnectionReason closeConnectionReason, Connection connection) {
if (connection.getPeersNodeAddressOptional().isPresent() && !closeConnectionReason.isIntended) { if (connection.hasPeersNodeAddress() && !closeConnectionReason.isIntended) {
map.values().stream() map.values().stream()
.forEach(protectedData -> { .forEach(protectedData -> {
ExpirablePayload expirablePayload = protectedData.getStoragePayload(); ExpirablePayload expirablePayload = protectedData.getStoragePayload();
@ -151,6 +151,8 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
ByteArray hashOfPayload = getHashAsByteArray(expirablePayload); ByteArray hashOfPayload = getHashAsByteArray(expirablePayload);
boolean containsKey = map.containsKey(hashOfPayload); boolean containsKey = map.containsKey(hashOfPayload);
if (containsKey) { if (containsKey) {
log.info("We remove the data as the data owner got disconnected with " +
"closeConnectionReason=" + closeConnectionReason);
doRemoveProtectedExpirableData(protectedData, hashOfPayload); doRemoveProtectedExpirableData(protectedData, hashOfPayload);
} else { } else {
log.debug("Remove data ignored as we don't have an entry for that data."); log.debug("Remove data ignored as we don't have an entry for that data.");
@ -172,13 +174,14 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
/////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////
public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender, public boolean add(ProtectedStorageEntry protectedStorageEntry, @Nullable NodeAddress sender,
@Nullable BroadcastHandler.Listener listener, boolean forceBroadcast, boolean isDataOwner) { @Nullable BroadcastHandler.Listener listener, boolean isDataOwner) {
Log.traceCall(); Log.traceCall();
ByteArray hashOfPayload = getHashAsByteArray(protectedStorageEntry.getStoragePayload()); ByteArray hashOfPayload = getHashAsByteArray(protectedStorageEntry.getStoragePayload());
boolean sequenceNrValid = isSequenceNrValid(protectedStorageEntry.sequenceNumber, hashOfPayload);
boolean result = checkPublicKeys(protectedStorageEntry, true) boolean result = checkPublicKeys(protectedStorageEntry, true)
&& checkSignature(protectedStorageEntry) && checkSignature(protectedStorageEntry)
&& isSequenceNrValid(protectedStorageEntry.sequenceNumber, hashOfPayload); && sequenceNrValid;
boolean containsKey = map.containsKey(hashOfPayload); boolean containsKey = map.containsKey(hashOfPayload);
if (containsKey) if (containsKey)
@ -197,10 +200,7 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
log.trace(sb.toString()); log.trace(sb.toString());
log.info("Data set after doAdd: size=" + map.values().size()); log.info("Data set after doAdd: size=" + map.values().size());
if (!containsKey || forceBroadcast) broadcast(new AddDataMessage(protectedStorageEntry), sender, listener, isDataOwner);
broadcast(new AddDataMessage(protectedStorageEntry), sender, listener, isDataOwner);
else
log.trace("Not broadcasting data as we had it already in our map.");
hashMapChangedListeners.stream().forEach(e -> e.onAdded(protectedStorageEntry)); hashMapChangedListeners.stream().forEach(e -> e.onAdded(protectedStorageEntry));
} else { } else {
@ -383,13 +383,13 @@ public class P2PDataStorage implements MessageListener, ConnectionListener {
private boolean isSequenceNrValid(int newSequenceNumber, ByteArray hashOfData) { private boolean isSequenceNrValid(int newSequenceNumber, ByteArray hashOfData) {
if (sequenceNumberMap.containsKey(hashOfData)) { if (sequenceNumberMap.containsKey(hashOfData)) {
Integer storedSequenceNumber = sequenceNumberMap.get(hashOfData).sequenceNr; Integer storedSequenceNumber = sequenceNumberMap.get(hashOfData).sequenceNr;
if (newSequenceNumber < storedSequenceNumber) { if (newSequenceNumber > storedSequenceNumber) {
return true;
} else {
log.info("Sequence number is invalid. sequenceNumber = " log.info("Sequence number is invalid. sequenceNumber = "
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber + "\n" + + newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber + "\n" +
"That can happen if the data owner gets an old delayed data storage message."); "That can happen if the data owner gets an old delayed data storage message.");
return false; return false;
} else {
return true;
} }
} else { } else {
return true; return true;

View file

@ -99,7 +99,7 @@ public class ProtectedDataStorageTest {
//@Test //@Test
public void testAddAndRemove() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException { public void testAddAndRemove() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException {
ProtectedStorageEntry data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1); ProtectedStorageEntry data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1);
Assert.assertTrue(dataStorage1.add(data, null, null, true, true)); Assert.assertTrue(dataStorage1.add(data, null, null, true));
Assert.assertEquals(1, dataStorage1.getMap().size()); Assert.assertEquals(1, dataStorage1.getMap().size());
int newSequenceNumber = data.sequenceNumber + 1; int newSequenceNumber = data.sequenceNumber + 1;
@ -115,7 +115,7 @@ public class ProtectedDataStorageTest {
mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_SEC * 1.5); mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_SEC * 1.5);
ProtectedStorageEntry data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1); ProtectedStorageEntry data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1);
log.debug("data.date " + data.timeStamp); log.debug("data.date " + data.timeStamp);
Assert.assertTrue(dataStorage1.add(data, null, null, true, true)); Assert.assertTrue(dataStorage1.add(data, null, null, true));
log.debug("test 1"); log.debug("test 1");
Assert.assertEquals(1, dataStorage1.getMap().size()); Assert.assertEquals(1, dataStorage1.getMap().size());
@ -163,7 +163,7 @@ public class ProtectedDataStorageTest {
public void testRefreshTTL() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException { public void testRefreshTTL() throws InterruptedException, NoSuchAlgorithmException, CertificateException, KeyStoreException, IOException, CryptoException, SignatureException, InvalidKeyException, NoSuchProviderException {
mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_SEC * 1.5); mockData.ttl = (int) (P2PDataStorage.CHECK_TTL_INTERVAL_SEC * 1.5);
ProtectedStorageEntry data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1); ProtectedStorageEntry data = dataStorage1.getProtectedData(mockData, storageSignatureKeyPair1);
Assert.assertTrue(dataStorage1.add(data, null, null, true, true)); Assert.assertTrue(dataStorage1.add(data, null, null, true));
Assert.assertEquals(1, dataStorage1.getMap().size()); Assert.assertEquals(1, dataStorage1.getMap().size());
Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC); Thread.sleep(P2PDataStorage.CHECK_TTL_INTERVAL_SEC);
log.debug("test 1"); log.debug("test 1");