add ui checks for un-authenticated node, fix broadcast issue

This commit is contained in:
Manfred Karrer 2015-10-30 00:42:20 +01:00
parent 88ab1419fa
commit 232c5b46ff
25 changed files with 329 additions and 219 deletions

View file

@ -17,7 +17,7 @@ import io.bitsquare.p2p.routing.Neighbor;
import io.bitsquare.p2p.routing.Routing;
import io.bitsquare.p2p.routing.RoutingListener;
import io.bitsquare.p2p.seed.SeedNodesRepository;
import io.bitsquare.p2p.storage.HashSetChangedListener;
import io.bitsquare.p2p.storage.HashMapChangedListener;
import io.bitsquare.p2p.storage.ProtectedExpirableDataStorage;
import io.bitsquare.p2p.storage.data.ExpirableMailboxPayload;
import io.bitsquare.p2p.storage.data.ExpirablePayload;
@ -48,6 +48,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
public class P2PService {
private static final Logger log = LoggerFactory.getLogger(P2PService.class);
@Nullable
private final EncryptionService encryptionService;
private final SetupListener setupListener;
private KeyRing keyRing;
@ -84,7 +85,7 @@ public class P2PService {
@Named(ProgramArguments.PORT_KEY) int port,
@Named(ProgramArguments.TOR_DIR) File torDir,
@Named(ProgramArguments.USE_LOCALHOST) boolean useLocalhost,
EncryptionService encryptionService,
@Nullable EncryptionService encryptionService,
KeyRing keyRing,
@Named("storage.dir") File storageDir) {
this.encryptionService = encryptionService;
@ -142,7 +143,27 @@ public class P2PService {
@Override
public void onPeerAddressAuthenticated(Address peerAddress, Connection connection) {
authenticatedPeerAddresses.add(peerAddress);
authenticatedToFirstPeer = true;
if (!authenticatedToFirstPeer) {
authenticatedToFirstPeer = true;
Address address = connection.getPeerAddress();
SettableFuture<Connection> future = sendMessage(address,
new GetDataSetMessage(addToListAndGetNonce()));
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.info("onPeerAddressAuthenticated Send GetAllDataMessage to " + address + " succeeded.");
connectedSeedNodes.add(address);
}
@Override
public void onFailure(Throwable throwable) {
log.warn("onPeerAddressAuthenticated Send GetAllDataMessage to " + address + " failed. " +
"Exception:" + throwable.getMessage());
}
});
}
P2PService.this.authenticated = true;
dataStorage.setAuthenticated(true);
@ -182,15 +203,18 @@ public class P2PService {
HashSet<ProtectedData> set = ((DataSetMessage) message).set;
set.stream().forEach(e -> dataStorage.add(e, connection.getPeerAddress()));
set.stream().filter(e -> e instanceof ProtectedMailboxData).forEach(e -> tryDecryptMailboxData((ProtectedMailboxData) e));
// TODO done in addHashSetChangedListener
// set.stream().filter(e -> e instanceof ProtectedMailboxData).forEach(e -> tryDecryptMailboxData((ProtectedMailboxData) e));
dataReceived();
} else if (message instanceof SealedAndSignedMessage) {
try {
DecryptedMessageWithPubKey decryptedMessageWithPubKey = encryptionService.decryptAndVerifyMessage((SealedAndSignedMessage) message);
UserThread.execute(() -> decryptedMailListeners.stream().forEach(e -> e.onMailMessage(decryptedMessageWithPubKey, connection.getPeerAddress())));
} catch (CryptoException e) {
log.info("Decryption of SealedAndSignedMessage failed. That is expected if the message is not intended for us.");
if (encryptionService != null) {
try {
DecryptedMessageWithPubKey decryptedMessageWithPubKey = encryptionService.decryptAndVerifyMessage((SealedAndSignedMessage) message);
UserThread.execute(() -> decryptedMailListeners.stream().forEach(e -> e.onMailMessage(decryptedMessageWithPubKey, connection.getPeerAddress())));
} catch (CryptoException e) {
log.info("Decryption of SealedAndSignedMessage failed. That is expected if the message is not intended for us.");
}
}
}
});
@ -217,7 +241,7 @@ public class P2PService {
}
});
dataStorage.addHashSetChangedListener(new HashSetChangedListener() {
dataStorage.addHashMapChangedListener(new HashMapChangedListener() {
@Override
public void onAdded(ProtectedData entry) {
if (entry instanceof ProtectedMailboxData)
@ -317,24 +341,26 @@ public class P2PService {
}
private void doSendEncryptedMailMessage(Address peerAddress, PubKeyRing pubKeyRing, MailMessage message, SendMailMessageListener sendMailMessageListener) {
try {
SealedAndSignedMessage sealedAndSignedMessage = encryptionService.encryptAndSignMessage(pubKeyRing, message);
SettableFuture<Connection> future = sendMessage(peerAddress, sealedAndSignedMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
UserThread.execute(() -> sendMailMessageListener.onArrived());
}
if (encryptionService != null) {
try {
SealedAndSignedMessage sealedAndSignedMessage = encryptionService.encryptAndSignMessage(pubKeyRing, message);
SettableFuture<Connection> future = sendMessage(peerAddress, sealedAndSignedMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
UserThread.execute(() -> sendMailMessageListener.onArrived());
}
@Override
public void onFailure(Throwable throwable) {
throwable.printStackTrace();
UserThread.execute(() -> sendMailMessageListener.onFault());
}
});
} catch (CryptoException e) {
e.printStackTrace();
UserThread.execute(() -> sendMailMessageListener.onFault());
@Override
public void onFailure(Throwable throwable) {
throwable.printStackTrace();
UserThread.execute(() -> sendMailMessageListener.onFault());
}
});
} catch (CryptoException e) {
e.printStackTrace();
UserThread.execute(() -> sendMailMessageListener.onFault());
}
}
}
@ -358,34 +384,36 @@ public class P2PService {
}
private void trySendEncryptedMailboxMessage(Address peerAddress, PubKeyRing peersPubKeyRing, MailboxMessage message, SendMailboxMessageListener sendMailboxMessageListener) {
try {
SealedAndSignedMessage sealedAndSignedMessage = encryptionService.encryptAndSignMessage(peersPubKeyRing, message);
SettableFuture<Connection> future = sendMessage(peerAddress, sealedAndSignedMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.trace("SendEncryptedMailboxMessage onSuccess");
UserThread.execute(() -> sendMailboxMessageListener.onArrived());
}
if (encryptionService != null) {
try {
SealedAndSignedMessage sealedAndSignedMessage = encryptionService.encryptAndSignMessage(peersPubKeyRing, message);
SettableFuture<Connection> future = sendMessage(peerAddress, sealedAndSignedMessage);
Futures.addCallback(future, new FutureCallback<Connection>() {
@Override
public void onSuccess(@Nullable Connection connection) {
log.trace("SendEncryptedMailboxMessage onSuccess");
UserThread.execute(() -> sendMailboxMessageListener.onArrived());
}
@Override
public void onFailure(Throwable throwable) {
log.trace("SendEncryptedMailboxMessage onFailure");
log.debug(throwable.toString());
log.info("We cannot send message to peer. Peer might be offline. We will store message in mailbox.");
log.trace("create MailboxEntry with peerAddress " + peerAddress);
PublicKey receiverStoragePublicKey = peersPubKeyRing.getStorageSignaturePubKey();
addMailboxData(new ExpirableMailboxPayload(sealedAndSignedMessage,
keyRing.getStorageSignatureKeyPair().getPublic(),
receiverStoragePublicKey),
receiverStoragePublicKey);
UserThread.execute(() -> sendMailboxMessageListener.onStoredInMailbox());
}
});
} catch (CryptoException e) {
e.printStackTrace();
log.error("sendEncryptedMessage failed");
UserThread.execute(() -> sendMailboxMessageListener.onFault());
@Override
public void onFailure(Throwable throwable) {
log.trace("SendEncryptedMailboxMessage onFailure");
log.debug(throwable.toString());
log.info("We cannot send message to peer. Peer might be offline. We will store message in mailbox.");
log.trace("create MailboxEntry with peerAddress " + peerAddress);
PublicKey receiverStoragePublicKey = peersPubKeyRing.getStorageSignaturePubKey();
addMailboxData(new ExpirableMailboxPayload(sealedAndSignedMessage,
keyRing.getStorageSignatureKeyPair().getPublic(),
receiverStoragePublicKey),
receiverStoragePublicKey);
UserThread.execute(() -> sendMailboxMessageListener.onStoredInMailbox());
}
});
} catch (CryptoException e) {
e.printStackTrace();
log.error("sendEncryptedMessage failed");
UserThread.execute(() -> sendMailboxMessageListener.onFault());
}
}
}
@ -481,8 +509,8 @@ public class P2PService {
p2pServiceListeners.remove(listener);
}
public void addHashSetChangedListener(HashSetChangedListener hashSetChangedListener) {
dataStorage.addHashSetChangedListener(hashSetChangedListener);
public void addHashSetChangedListener(HashMapChangedListener hashMapChangedListener) {
dataStorage.addHashMapChangedListener(hashMapChangedListener);
}
///////////////////////////////////////////////////////////////////////////////////////////
@ -598,29 +626,31 @@ public class P2PService {
}
private void tryDecryptMailboxData(ProtectedMailboxData mailboxData) {
ExpirablePayload data = mailboxData.expirablePayload;
if (data instanceof ExpirableMailboxPayload) {
ExpirableMailboxPayload mailboxEntry = (ExpirableMailboxPayload) data;
SealedAndSignedMessage sealedAndSignedMessage = mailboxEntry.sealedAndSignedMessage;
try {
DecryptedMessageWithPubKey decryptedMessageWithPubKey = encryptionService.decryptAndVerifyMessage(sealedAndSignedMessage);
if (decryptedMessageWithPubKey.message instanceof MailboxMessage) {
MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.message;
Address senderAddress = mailboxMessage.getSenderAddress();
checkNotNull(senderAddress, "senderAddress must not be null for mailbox messages");
if (encryptionService != null) {
ExpirablePayload data = mailboxData.expirablePayload;
if (data instanceof ExpirableMailboxPayload) {
ExpirableMailboxPayload mailboxEntry = (ExpirableMailboxPayload) data;
SealedAndSignedMessage sealedAndSignedMessage = mailboxEntry.sealedAndSignedMessage;
try {
DecryptedMessageWithPubKey decryptedMessageWithPubKey = encryptionService.decryptAndVerifyMessage(sealedAndSignedMessage);
if (decryptedMessageWithPubKey.message instanceof MailboxMessage) {
MailboxMessage mailboxMessage = (MailboxMessage) decryptedMessageWithPubKey.message;
Address senderAddress = mailboxMessage.getSenderAddress();
checkNotNull(senderAddress, "senderAddress must not be null for mailbox messages");
log.trace("mailboxData.publicKey " + mailboxData.ownerStoragePubKey.hashCode());
log.trace("keyRing.getStorageSignatureKeyPair().getPublic() " + keyRing.getStorageSignatureKeyPair().getPublic().hashCode());
log.trace("keyRing.getMsgSignatureKeyPair().getPublic() " + keyRing.getMsgSignatureKeyPair().getPublic().hashCode());
log.trace("keyRing.getMsgEncryptionKeyPair().getPublic() " + keyRing.getMsgEncryptionKeyPair().getPublic().hashCode());
log.trace("mailboxData.publicKey " + mailboxData.ownerStoragePubKey.hashCode());
log.trace("keyRing.getStorageSignatureKeyPair().getPublic() " + keyRing.getStorageSignatureKeyPair().getPublic().hashCode());
log.trace("keyRing.getMsgSignatureKeyPair().getPublic() " + keyRing.getMsgSignatureKeyPair().getPublic().hashCode());
log.trace("keyRing.getMsgEncryptionKeyPair().getPublic() " + keyRing.getMsgEncryptionKeyPair().getPublic().hashCode());
mailboxMap.put(decryptedMessageWithPubKey, mailboxData);
log.trace("Decryption of SealedAndSignedMessage succeeded. senderAddress=" + senderAddress + " / my address=" + getAddress());
UserThread.execute(() -> decryptedMailboxListeners.stream().forEach(e -> e.onMailboxMessageAdded(decryptedMessageWithPubKey, senderAddress)));
mailboxMap.put(decryptedMessageWithPubKey, mailboxData);
log.trace("Decryption of SealedAndSignedMessage succeeded. senderAddress=" + senderAddress + " / my address=" + getAddress());
UserThread.execute(() -> decryptedMailboxListeners.stream().forEach(e -> e.onMailboxMessageAdded(decryptedMessageWithPubKey, senderAddress)));
}
} catch (CryptoException e) {
log.trace("Decryption of SealedAndSignedMessage failed. That is expected if the message is not intended for us.");
}
} catch (CryptoException e) {
log.trace("Decryption of SealedAndSignedMessage failed. That is expected if the message is not intended for us.");
}
}
}

View file

@ -2,7 +2,7 @@ package io.bitsquare.p2p.storage;
import io.bitsquare.p2p.storage.data.ProtectedData;
public interface HashSetChangedListener {
public interface HashMapChangedListener {
void onAdded(ProtectedData entry);
void onRemoved(ProtectedData entry);

View file

@ -31,7 +31,7 @@ public class ProtectedExpirableDataStorage {
private final Routing routing;
private final Map<BigInteger, ProtectedData> map = new ConcurrentHashMap<>();
private final List<HashSetChangedListener> hashSetChangedListeners = new CopyOnWriteArrayList<>();
private final List<HashMapChangedListener> hashMapChangedListeners = new CopyOnWriteArrayList<>();
private ConcurrentHashMap<BigInteger, Integer> sequenceNumberMap = new ConcurrentHashMap<>();
private final Storage<ConcurrentHashMap> storage;
private boolean authenticated;
@ -104,12 +104,26 @@ public class ProtectedExpirableDataStorage {
BigInteger hashOfPayload = getHashAsBigInteger(protectedData.expirablePayload);
boolean containsKey = map.containsKey(hashOfPayload);
boolean result = checkPublicKeys(protectedData, true)
&& isSequenceNrValid(protectedData, hashOfPayload)
&& checkSignature(protectedData)
&& (!containsKey || checkIfStoredDataMatchesNewData(protectedData, hashOfPayload))
&& doAddProtectedExpirableData(protectedData, hashOfPayload, sender);
&& checkSignature(protectedData);
if (containsKey) {
result &= checkIfStoredDataMatchesNewData(protectedData, hashOfPayload)
&& isSequenceNrValid(protectedData, hashOfPayload);
}
if (result) {
map.put(hashOfPayload, protectedData);
log.trace("Data added to our map and it will be broadcasted to our neighbors.");
UserThread.execute(() -> hashMapChangedListeners.stream().forEach(e -> e.onAdded(protectedData)));
StringBuilder sb = new StringBuilder("\n\nSet after addProtectedExpirableData:\n");
map.values().stream().forEach(e -> sb.append(e.toString() + "\n\n"));
sb.append("\n\n");
log.trace(sb.toString());
if (!containsKey)
broadcast(new AddDataMessage(protectedData), sender);
sequenceNumberMap.put(hashOfPayload, protectedData.sequenceNumber);
storage.queueUpForSave();
} else {
@ -126,10 +140,14 @@ public class ProtectedExpirableDataStorage {
&& checkPublicKeys(protectedData, false)
&& isSequenceNrValid(protectedData, hashOfPayload)
&& checkSignature(protectedData)
&& checkIfStoredDataMatchesNewData(protectedData, hashOfPayload)
&& doRemoveProtectedExpirableData(protectedData, hashOfPayload, sender);
&& checkIfStoredDataMatchesNewData(protectedData, hashOfPayload);
if (result) {
doRemoveProtectedExpirableData(protectedData, hashOfPayload);
broadcast(new RemoveDataMessage(protectedData), sender);
sequenceNumberMap.put(hashOfPayload, protectedData.sequenceNumber);
storage.queueUpForSave();
} else {
@ -147,10 +165,13 @@ public class ProtectedExpirableDataStorage {
&& isSequenceNrValid(protectedMailboxData, hashOfData)
&& protectedMailboxData.receiversPubKey.equals(protectedMailboxData.ownerStoragePubKey) // at remove both keys are the same (only receiver is able to remove data)
&& checkSignature(protectedMailboxData)
&& checkIfStoredMailboxDataMatchesNewMailboxData(protectedMailboxData, hashOfData)
&& doRemoveProtectedExpirableData(protectedMailboxData, hashOfData, sender);
&& checkIfStoredMailboxDataMatchesNewMailboxData(protectedMailboxData, hashOfData);
if (result) {
doRemoveProtectedExpirableData(protectedMailboxData, hashOfData);
broadcast(new RemoveMailboxDataMessage(protectedMailboxData), sender);
sequenceNumberMap.put(hashOfData, protectedMailboxData.sequenceNumber);
storage.queueUpForSave();
} else {
@ -190,8 +211,8 @@ public class ProtectedExpirableDataStorage {
return new ProtectedMailboxData(expirableMailboxPayload, expirableMailboxPayload.getTTL(), storageSignaturePubKey.getPublic(), sequenceNumber, signature, receiversPublicKey);
}
public void addHashSetChangedListener(HashSetChangedListener hashSetChangedListener) {
hashSetChangedListeners.add(hashSetChangedListener);
public void addHashMapChangedListener(HashMapChangedListener hashMapChangedListener) {
hashMapChangedListeners.add(hashMapChangedListener);
}
public void addMessageListener(MessageListener messageListener) {
@ -203,11 +224,22 @@ public class ProtectedExpirableDataStorage {
// Private
///////////////////////////////////////////////////////////////////////////////////////////
private void doRemoveProtectedExpirableData(ProtectedData protectedData, BigInteger hashOfPayload) {
map.remove(hashOfPayload);
log.trace("Data removed from our map. We broadcast the message to our neighbors.");
UserThread.execute(() -> hashMapChangedListeners.stream().forEach(e -> e.onRemoved(protectedData)));
StringBuilder sb = new StringBuilder("\n\nSet after removeProtectedExpirableData:\n");
map.values().stream().forEach(e -> sb.append(e.toString() + "\n\n"));
sb.append("\n\n");
log.trace(sb.toString());
}
private boolean isSequenceNrValid(ProtectedData data, BigInteger hashOfData) {
int newSequenceNumber = data.sequenceNumber;
Integer storedSequenceNumber = sequenceNumberMap.get(hashOfData);
if (sequenceNumberMap.containsKey(hashOfData) && newSequenceNumber <= storedSequenceNumber) {
log.warn("Sequence number is invalid. That might happen in rare cases. newSequenceNumber="
log.warn("Sequence number is invalid. newSequenceNumber="
+ newSequenceNumber + " / storedSequenceNumber=" + storedSequenceNumber);
return false;
} else {
@ -274,34 +306,6 @@ public class ProtectedExpirableDataStorage {
}
}
private boolean doAddProtectedExpirableData(ProtectedData data, BigInteger hashOfData, Address sender) {
map.put(hashOfData, data);
log.trace("Data added to our map and it will be broadcasted to our neighbors.");
UserThread.execute(() -> hashSetChangedListeners.stream().forEach(e -> e.onAdded(data)));
broadcast(new AddDataMessage(data), sender);
StringBuilder sb = new StringBuilder("\n\nSet after addProtectedExpirableData:\n");
map.values().stream().forEach(e -> sb.append(e.toString() + "\n\n"));
sb.append("\n\n");
log.trace(sb.toString());
return true;
}
private boolean doRemoveProtectedExpirableData(ProtectedData data, BigInteger hashOfData, Address sender) {
map.remove(hashOfData);
log.trace("Data removed from our map. We broadcast the message to our neighbors.");
UserThread.execute(() -> hashSetChangedListeners.stream().forEach(e -> e.onRemoved(data)));
if (data instanceof ProtectedMailboxData)
broadcast(new RemoveMailboxDataMessage((ProtectedMailboxData) data), sender);
else
broadcast(new RemoveDataMessage(data), sender);
StringBuilder sb = new StringBuilder("\n\nSet after removeProtectedExpirableData:\n");
map.values().stream().forEach(e -> sb.append(e.toString() + "\n\n"));
sb.append("\n\n");
log.trace(sb.toString());
return true;
}
private void broadcast(BroadcastMessage message, Address sender) {
if (authenticated) {

View file

@ -19,7 +19,7 @@ public class ProtectedData implements Serializable {
public final int sequenceNumber;
public final byte[] signature;
@VisibleForTesting
public Date date;
transient public Date date;
public ProtectedData(ExpirablePayload expirablePayload, long ttl, PublicKey ownerStoragePubKey, int sequenceNumber, byte[] signature) {
this.expirablePayload = expirablePayload;
@ -34,15 +34,6 @@ public class ProtectedData implements Serializable {
try {
in.defaultReadObject();
ttl = expirablePayload.getTTL();
// in case the reported creation date is in the future
// we reset the date to the current time
if (date.getTime() > new Date().getTime()) {
log.warn("Date of object is in future. " +
"That might be ok as clocks are not synced but could be also a spam attack. " +
"date=" + date + " / now=" + new Date());
date = new Date();
}
date = new Date();
} catch (Throwable t) {