update chat views from upstream, support sending logs

Co-authored-by: jmacxx <47253594+jmacxx@users.noreply.github.com>
This commit is contained in:
woodser 2024-03-20 18:20:24 -04:00
parent 833cdb3b84
commit 1647a582f5
23 changed files with 1691 additions and 206 deletions

View file

@ -60,7 +60,7 @@ public class CoreNotificationService {
sendNotification(NotificationMessage.newBuilder()
.setType(NotificationType.CHAT_MESSAGE)
.setTimestamp(System.currentTimeMillis())
.setChatMessage(chatMessage.toProtoChatMessageBuilder())
.setChatMessage(chatMessage.toProtoNetworkEnvelope().getChatMessage())
.build());
}

View file

@ -55,6 +55,7 @@ import haveno.core.trade.messages.SignContractResponse;
import haveno.network.p2p.AckMessage;
import haveno.network.p2p.BundleOfEnvelopes;
import haveno.network.p2p.CloseConnectionMessage;
import haveno.network.p2p.FileTransferPart;
import haveno.network.p2p.PrefixedSealedAndSignedMessage;
import haveno.network.p2p.peers.getdata.messages.GetDataResponse;
import haveno.network.p2p.peers.getdata.messages.GetUpdatedDataRequest;
@ -178,6 +179,9 @@ public class CoreNetworkProtoResolver extends CoreProtoResolver implements Netwo
case GET_INVENTORY_RESPONSE:
return GetInventoryResponse.fromProto(proto.getGetInventoryResponse(), messageVersion);
case FILE_TRANSFER_PART:
return FileTransferPart.fromProto(proto.getFileTransferPart(), messageVersion);
default:
throw new ProtobufferException("Unknown proto message case (PB.NetworkEnvelope). messageCase=" +
proto.getMessageCase() + "; proto raw data=" + proto.toString());

View file

@ -30,8 +30,13 @@ import haveno.core.locale.Res;
import haveno.core.payment.payload.PaymentAccountPayload;
import haveno.core.proto.CoreProtoResolver;
import haveno.core.support.SupportType;
import haveno.core.support.dispute.mediation.FileTransferReceiver;
import haveno.core.support.dispute.mediation.FileTransferSender;
import haveno.core.support.dispute.mediation.FileTransferSession;
import haveno.core.support.messages.ChatMessage;
import haveno.core.trade.Contract;
import haveno.network.p2p.NodeAddress;
import haveno.network.p2p.network.NetworkNode;
import javafx.beans.property.BooleanProperty;
import javafx.beans.property.IntegerProperty;
import javafx.beans.property.ObjectProperty;
@ -49,6 +54,8 @@ import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
@ -151,6 +158,25 @@ public final class Dispute implements NetworkPayload, PersistablePayload {
private transient final BooleanProperty isClosedProperty = new SimpleBooleanProperty();
private transient final IntegerProperty badgeCountProperty = new SimpleIntegerProperty();
private transient FileTransferReceiver fileTransferSession = null;
public FileTransferReceiver createOrGetFileTransferReceiver(NetworkNode networkNode,
NodeAddress peerNodeAddress,
FileTransferSession.FtpCallback callback) throws IOException {
// the receiver stores its state temporarily here in the dispute
// this method gets called to retrieve the session each time a part of the log files is received
if (fileTransferSession == null) {
fileTransferSession = new FileTransferReceiver(networkNode, peerNodeAddress, this.tradeId, this.traderId, this.getRoleStringForLogFile(), callback);
}
return fileTransferSession;
}
public FileTransferSender createFileTransferSender(NetworkNode networkNode,
NodeAddress peerNodeAddress,
FileTransferSession.FtpCallback callback) {
return new FileTransferSender(networkNode, peerNodeAddress, this.tradeId, this.traderId, this.getRoleStringForLogFile(), false, callback);
}
///////////////////////////////////////////////////////////////////////////////////////////
// Constructor
@ -478,6 +504,11 @@ public final class Dispute implements NetworkPayload, PersistablePayload {
}
}
public String getRoleStringForLogFile() {
return (disputeOpenerIsBuyer ? "BUYER" : "SELLER") + "_"
+ (disputeOpenerIsMaker ? "MAKER" : "TAKER");
}
@Nullable
public PaymentAccountPayload getBuyerPaymentAccountPayload() {
return contract.isBuyerMakerAndSellerTaker() ? makerPaymentAccountPayload : takerPaymentAccountPayload;

View file

@ -940,7 +940,7 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
return new Tuple2<>(peerNodeAddress, receiverPubKeyRing);
}
private boolean isAgent(Dispute dispute) {
public boolean isAgent(Dispute dispute) {
return keyRing.getPubKeyRing().equals(dispute.getAgentPubKeyRing());
}
@ -1038,6 +1038,20 @@ public abstract class DisputeManager<T extends DisputeList<Dispute>> extends Sup
requestPersistence();
}
protected void addMediationLogsReceivedMessage(Dispute dispute, String logsIdentifier) {
String logsReceivedMessage = Res.get("support.mediatorReceivedLogs", logsIdentifier);
ChatMessage chatMessage = new ChatMessage(
getSupportType(),
dispute.getTradeId(),
keyRing.hashCode(),
false,
logsReceivedMessage,
p2PService.getAddress());
chatMessage.setSystemMessage(true);
dispute.addAndPersistChatMessage(chatMessage);
requestPersistence();
}
// If price was going down between take offer time and open dispute time the buyer has an incentive to
// not send the payment but to try to make a new trade with the better price. We risks to lose part of the
// security deposit (in mediation we will always get back 0.003 BTC to keep some incentive to accept mediated

View file

@ -43,6 +43,7 @@ import haveno.common.UserThread;
import haveno.common.app.Version;
import haveno.common.config.Config;
import haveno.common.crypto.KeyRing;
import haveno.common.proto.network.NetworkEnvelope;
import haveno.core.api.XmrConnectionService;
import haveno.core.api.CoreNotificationService;
import haveno.core.locale.Res;
@ -55,6 +56,9 @@ import haveno.core.support.dispute.DisputeResult;
import haveno.core.support.dispute.DisputeResult.Winner;
import haveno.core.support.dispute.DisputeSummaryVerification;
import haveno.core.support.dispute.arbitration.arbitrator.ArbitratorManager;
import haveno.core.support.dispute.mediation.FileTransferReceiver;
import haveno.core.support.dispute.mediation.FileTransferSender;
import haveno.core.support.dispute.mediation.FileTransferSession;
import haveno.core.support.dispute.messages.DisputeClosedMessage;
import haveno.core.support.dispute.messages.DisputeOpenedMessage;
import haveno.core.support.messages.ChatMessage;
@ -67,8 +71,11 @@ import haveno.core.trade.TradeManager;
import haveno.core.xmr.wallet.TradeWalletService;
import haveno.core.xmr.wallet.XmrWalletService;
import haveno.network.p2p.AckMessageSourceType;
import haveno.network.p2p.FileTransferPart;
import haveno.network.p2p.NodeAddress;
import haveno.network.p2p.P2PService;
import haveno.network.p2p.network.Connection;
import haveno.network.p2p.network.MessageListener;
import lombok.extern.slf4j.Slf4j;
import monero.wallet.MoneroWallet;
import monero.wallet.model.MoneroDestination;
@ -76,6 +83,7 @@ import monero.wallet.model.MoneroMultisigSignResult;
import monero.wallet.model.MoneroTxSet;
import monero.wallet.model.MoneroTxWallet;
import java.io.IOException;
import java.math.BigInteger;
import java.util.HashMap;
import java.util.HashSet;
@ -88,7 +96,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
@Slf4j
@Singleton
public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeList> {
public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeList> implements MessageListener, FileTransferSession.FtpCallback {
private final ArbitratorManager arbitratorManager;
@ -116,6 +124,7 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
openOfferManager, keyRing, arbitrationDisputeListService, config, priceFeedService);
this.arbitratorManager = arbitratorManager;
HavenoUtils.arbitrationManager = this; // TODO: storing static reference, better way?
p2PService.getNetworkNode().addMessageListener(this); // listening for FileTransferPart message
}
@ -497,4 +506,60 @@ public final class ArbitrationManager extends DisputeManager<ArbitrationDisputeL
}
}
}
public FileTransferSender initLogUpload(FileTransferSession.FtpCallback callback,
String tradeId,
int traderId) throws IOException {
Dispute dispute = findDispute(tradeId, traderId)
.orElseThrow(() -> new IOException("could not locate Dispute for tradeId/traderId"));
return dispute.createFileTransferSender(p2PService.getNetworkNode(),
dispute.getContract().getArbitratorNodeAddress(), callback);
}
private void processFilePartReceived(FileTransferPart ftp) {
if (!ftp.isInitialRequest()) {
return; // existing sessions are processed by FileTransferSession object directly
}
// we create a new session which is related to an open dispute from our list
Optional<Dispute> dispute = findDispute(ftp.getTradeId(), ftp.getTraderId());
if (dispute.isEmpty()) {
log.error("Received log upload request for unknown TradeId/TraderId {}/{}", ftp.getTradeId(), ftp.getTraderId());
return;
}
if (dispute.get().isClosed()) {
log.error("Received a file transfer request for closed dispute {}", ftp.getTradeId());
return;
}
try {
FileTransferReceiver session = dispute.get().createOrGetFileTransferReceiver(
p2PService.getNetworkNode(), ftp.getSenderNodeAddress(), this);
session.processFilePartReceived(ftp);
} catch (IOException e) {
log.error("Unable to process a received file message" + e);
}
}
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof FileTransferPart) { // mediator receiving log file data
FileTransferPart ftp = (FileTransferPart) networkEnvelope;
processFilePartReceived(ftp);
}
}
@Override
public void onFtpProgress(double progressPct) {
log.trace("ftp progress: {}", progressPct);
}
@Override
public void onFtpComplete(FileTransferSession session) {
Optional<Dispute> dispute = findDispute(session.getFullTradeId(), session.getTraderId());
dispute.ifPresent(d -> addMediationLogsReceivedMessage(d, session.getZipId()));
}
@Override
public void onFtpTimeout(String statusMsg, FileTransferSession session) {
session.resetSession();
}
}

View file

@ -0,0 +1,126 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package haveno.core.support.dispute.mediation;
import haveno.network.p2p.AckMessage;
import haveno.network.p2p.AckMessageSourceType;
import haveno.network.p2p.FileTransferPart;
import haveno.network.p2p.NodeAddress;
import haveno.network.p2p.network.NetworkNode;
import haveno.common.UserThread;
import haveno.common.config.Config;
import haveno.common.util.Utilities;
import java.nio.file.FileSystems;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
@Slf4j
public class FileTransferReceiver extends FileTransferSession {
protected final String zipFilePath;
public FileTransferReceiver(NetworkNode networkNode,
NodeAddress peerNodeAddress,
String tradeId,
int traderId,
String traderRole,
@Nullable FileTransferSession.FtpCallback callback) throws IOException {
super(networkNode, peerNodeAddress, tradeId, traderId, traderRole, callback);
zipFilePath = ensureReceivingDirectoryExists().getAbsolutePath() + FileSystems.getDefault().getSeparator() + zipId + ".zip";
}
public void processFilePartReceived(FileTransferPart ftp) {
checkpointLastActivity();
// check that the supplied sequence number is in line with what we are expecting
if (currentBlockSeqNum < 0) {
// we have not yet started receiving a file, validate this ftp packet as the initiation request
initReceiveSession(ftp.uid, ftp.seqNumOrFileLength);
} else if (currentBlockSeqNum == ftp.seqNumOrFileLength) {
// we are in the middle of receiving a file; add the block of data to the file
processReceivedBlock(ftp, networkNode, peerNodeAddress);
} else {
log.error("ftp sequence num mismatch, expected {} received {}", currentBlockSeqNum, ftp.seqNumOrFileLength);
resetSession(); // aborts the file transfer
}
}
public void initReceiveSession(String uid, long expectedFileBytes) {
networkNode.addMessageListener(this);
this.expectedFileLength = expectedFileBytes;
fileOffsetBytes = 0;
currentBlockSeqNum = 0;
initSessionTimer();
log.info("Received a start file transfer request, tradeId={}, traderId={}, size={}", fullTradeId, traderId, expectedFileBytes);
log.info("New file will be written to {}", zipFilePath);
UserThread.execute(() -> ackReceivedPart(uid, networkNode, peerNodeAddress));
}
private void processReceivedBlock(FileTransferPart ftp, NetworkNode networkNode, NodeAddress peerNodeAddress) {
try {
RandomAccessFile file = new RandomAccessFile(zipFilePath, "rwd");
file.seek(fileOffsetBytes);
file.write(ftp.messageData.toByteArray(), 0, ftp.messageData.size());
fileOffsetBytes = fileOffsetBytes + ftp.messageData.size();
log.info("Sequence number {} for {}, received data {} / {}",
ftp.seqNumOrFileLength, Utilities.getShortId(ftp.tradeId), fileOffsetBytes, expectedFileLength);
currentBlockSeqNum++;
UserThread.runAfter(() -> {
ackReceivedPart(ftp.uid, networkNode, peerNodeAddress);
if (fileOffsetBytes >= expectedFileLength) {
log.info("Success! We have reached the EOF, received {} expected {}", fileOffsetBytes, expectedFileLength);
ftpCallback.ifPresent(c -> c.onFtpComplete(this));
resetSession();
}
}, 100, TimeUnit.MILLISECONDS);
} catch (IOException e) {
log.error(e.toString());
e.printStackTrace();
}
}
private void ackReceivedPart(String uid, NetworkNode networkNode, NodeAddress peerNodeAddress) {
AckMessage ackMessage = new AckMessage(peerNodeAddress,
AckMessageSourceType.LOG_TRANSFER,
FileTransferPart.class.getSimpleName(),
uid,
Utilities.getShortId(fullTradeId),
true, // result
null); // errorMessage
log.info("Send AckMessage for {} to peer {}. id={}, uid={}",
ackMessage.getSourceMsgClassName(), peerNodeAddress, ackMessage.getSourceId(), ackMessage.getSourceUid());
sendMessage(ackMessage, networkNode, peerNodeAddress);
}
private static File ensureReceivingDirectoryExists() throws IOException {
File directory = new File(Config.appDataDir() + "/clientLogs");
if (!directory.exists() && !directory.mkdirs()) {
log.error("Could not create directory {}", directory.getAbsolutePath());
throw new IOException("Could not create directory: " + directory.getAbsolutePath());
}
return directory;
}
}

View file

@ -0,0 +1,198 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package haveno.core.support.dispute.mediation;
import haveno.network.p2p.FileTransferPart;
import haveno.network.p2p.NodeAddress;
import haveno.network.p2p.network.NetworkNode;
import haveno.common.UserThread;
import haveno.common.config.Config;
import haveno.common.util.Utilities;
import com.google.protobuf.ByteString;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
import static haveno.common.file.FileUtil.doesFileContainKeyword;
@Slf4j
public class FileTransferSender extends FileTransferSession {
protected final String zipFilePath;
private final boolean isTest;
public FileTransferSender(NetworkNode networkNode,
NodeAddress peerNodeAddress,
String tradeId,
int traderId,
String traderRole,
boolean isTest,
@Nullable FileTransferSession.FtpCallback callback) {
super(networkNode, peerNodeAddress, tradeId, traderId, traderRole, callback);
zipFilePath = Utilities.getUserDataDir() + FileSystems.getDefault().getSeparator() + zipId + ".zip";
this.isTest = isTest;
updateProgress();
}
public void createZipFileToSend() {
createZipFileOfLogs(zipFilePath, zipId, fullTradeId);
}
public static void createZipFileOfLogs(String zipFilePath, String zipId, String fullTradeId) {
try {
Map<String, String> env = new HashMap<>();
env.put("create", "true");
URI uri = URI.create("jar:file:///" + zipFilePath
.replace('\\', '/')
.replaceAll(" ", "%20"));
FileSystem zipfs = FileSystems.newFileSystem(uri, env);
Files.createDirectory(zipfs.getPath(zipId)); // store logfiles in a usefully-named subdir
Stream<Path> paths = Files.walk(Paths.get(Config.appDataDir().toString()), 1);
paths.filter(Files::isRegularFile).forEach(externalTxtFile -> {
try {
// always include haveno.log; and other .log files if they contain the TradeId
if (externalTxtFile.getFileName().toString().equals("haveno.log") ||
(fullTradeId == null && externalTxtFile.getFileName().toString().matches(".*.log")) ||
(externalTxtFile.getFileName().toString().matches(".*.log") &&
doesFileContainKeyword(externalTxtFile.toFile(), fullTradeId))) {
Path pathInZipfile = zipfs.getPath(zipId + "/" + externalTxtFile.getFileName().toString());
log.info("adding {} to zip file {}", pathInZipfile, zipfs);
Files.copy(externalTxtFile, pathInZipfile, StandardCopyOption.REPLACE_EXISTING);
}
} catch (IOException e) {
log.error(e.toString());
e.printStackTrace();
}
});
zipfs.close();
} catch (IOException | IllegalArgumentException ex) {
log.error(ex.toString());
ex.printStackTrace();
}
}
public void initSend() throws IOException {
initSessionTimer();
networkNode.addMessageListener(this);
RandomAccessFile file = new RandomAccessFile(zipFilePath, "r");
expectedFileLength = file.length();
file.close();
// an empty block is sent as request to initiate file transfer, peer must ACK for transfer to continue
dataAwaitingAck = Optional.of(new FileTransferPart(networkNode.getNodeAddress(), fullTradeId, traderId, UUID.randomUUID().toString(), expectedFileLength, ByteString.EMPTY));
uploadData();
}
public void sendNextBlock() throws IOException, IllegalStateException {
if (dataAwaitingAck.isPresent()) {
log.warn("prepNextBlockToSend invoked, but we are still waiting for a previous ACK");
throw new IllegalStateException("prepNextBlockToSend invoked, but we are still waiting for a previous ACK");
}
RandomAccessFile file = new RandomAccessFile(zipFilePath, "r");
file.seek(fileOffsetBytes);
byte[] buff = new byte[FILE_BLOCK_SIZE];
int nBytesRead = file.read(buff, 0, FILE_BLOCK_SIZE);
file.close();
if (nBytesRead < 0) {
log.info("Success! We have reached the EOF, {} bytes sent. Removing zip file {}", fileOffsetBytes, zipFilePath);
Files.delete(Paths.get(zipFilePath));
ftpCallback.ifPresent(c -> c.onFtpComplete(this));
UserThread.runAfter(this::resetSession, 1);
return;
}
dataAwaitingAck = Optional.of(new FileTransferPart(networkNode.getNodeAddress(), fullTradeId, traderId, UUID.randomUUID().toString(), currentBlockSeqNum, ByteString.copyFrom(buff, 0, nBytesRead)));
uploadData();
}
public void retrySend() {
if (transferIsInProgress()) {
log.info("Retry send of current block");
initSessionTimer();
uploadData();
} else {
UserThread.runAfter(() -> ftpCallback.ifPresent((f) -> f.onFtpTimeout("Could not re-send", this)), 1);
}
}
protected void uploadData() {
if (dataAwaitingAck.isEmpty()) {
return;
}
FileTransferPart ftp = dataAwaitingAck.get();
log.info("Send FileTransferPart seq {} length {} to peer {}, UID={}",
ftp.seqNumOrFileLength, ftp.messageData.size(), peerNodeAddress, ftp.uid);
sendMessage(ftp, networkNode, peerNodeAddress);
}
public boolean processAckForFilePart(String ackUid) {
if (dataAwaitingAck.isEmpty()) {
log.warn("We received an ACK we were not expecting. {}", ackUid);
return false;
}
if (!dataAwaitingAck.get().uid.equals(ackUid)) {
log.warn("We received an ACK that has a different UID to what we were expecting. We ignore and wait for the correct ACK");
log.info("Received {} expecting {}", ackUid, dataAwaitingAck.get().uid);
return false;
}
// fileOffsetBytes gets incremented by the size of the block that was ack'd
fileOffsetBytes += dataAwaitingAck.get().messageData.size();
currentBlockSeqNum++;
dataAwaitingAck = Optional.empty();
checkpointLastActivity();
updateProgress();
if (isTest) {
return true;
}
UserThread.runAfter(() -> { // to trigger continuing the file transfer
try {
sendNextBlock();
} catch (IOException e) {
log.error(e.toString());
e.printStackTrace();
}
}, 100, TimeUnit.MILLISECONDS);
return true;
}
public void updateProgress() {
double progressPct = expectedFileLength > 0 ?
((double) fileOffsetBytes / expectedFileLength) : 0.0;
ftpCallback.ifPresent(c -> c.onFtpProgress(progressPct));
log.info("ftp progress: {}", String.format("%.0f%%", progressPct * 100));
}
}

View file

@ -0,0 +1,174 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/
package haveno.core.support.dispute.mediation;
import haveno.network.p2p.AckMessage;
import haveno.network.p2p.AckMessageSourceType;
import haveno.network.p2p.FileTransferPart;
import haveno.network.p2p.NodeAddress;
import haveno.network.p2p.network.Connection;
import haveno.network.p2p.network.MessageListener;
import haveno.network.p2p.network.NetworkNode;
import haveno.common.UserThread;
import haveno.common.proto.network.NetworkEnvelope;
import haveno.common.util.Utilities;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import javax.annotation.Nullable;
import static haveno.network.p2p.network.Connection.getPermittedMessageSize;
@Slf4j
public abstract class FileTransferSession implements MessageListener {
protected static final int FTP_SESSION_TIMEOUT_MILLIS = (int) TimeUnit.SECONDS.toMillis(60);
protected static final int FILE_BLOCK_SIZE = getPermittedMessageSize() - 1024; // allowing space for protobuf
public interface FtpCallback {
void onFtpProgress(double progressPct);
void onFtpComplete(FileTransferSession session);
void onFtpTimeout(String statusMsg, FileTransferSession session);
}
@Getter
protected final String fullTradeId;
@Getter
protected final int traderId;
@Getter
protected final String zipId;
protected final Optional<FtpCallback> ftpCallback;
protected final NetworkNode networkNode; // for sending network messages
protected final NodeAddress peerNodeAddress;
protected Optional<FileTransferPart> dataAwaitingAck;
protected long fileOffsetBytes;
protected long currentBlockSeqNum;
protected long expectedFileLength;
protected long lastActivityTime;
public FileTransferSession(NetworkNode networkNode,
NodeAddress peerNodeAddress,
String tradeId,
int traderId,
String traderRole,
@Nullable FileTransferSession.FtpCallback callback) {
this.networkNode = networkNode;
this.peerNodeAddress = peerNodeAddress;
this.fullTradeId = tradeId;
this.traderId = traderId;
this.ftpCallback = Optional.ofNullable(callback);
this.zipId = Utilities.getShortId(fullTradeId) + "_" + traderRole.toUpperCase() + "_"
+ new SimpleDateFormat("yyyyMMdd_HHmmss").format(new Date());
resetSession();
}
public void resetSession() {
lastActivityTime = 0;
currentBlockSeqNum = -1;
fileOffsetBytes = 0;
expectedFileLength = 0;
dataAwaitingAck = Optional.empty();
networkNode.removeMessageListener(this);
log.info("Ftp session parameters have been reset.");
}
@Override
public void onMessage(NetworkEnvelope networkEnvelope, Connection connection) {
if (networkEnvelope instanceof FileTransferPart) {
// mediator receiving log file data
FileTransferPart ftp = (FileTransferPart) networkEnvelope;
if (this instanceof FileTransferReceiver) {
((FileTransferReceiver) this).processFilePartReceived(ftp);
}
} else if (networkEnvelope instanceof AckMessage) {
AckMessage ackMessage = (AckMessage) networkEnvelope;
if (ackMessage.getSourceType() == AckMessageSourceType.LOG_TRANSFER) {
if (ackMessage.isSuccess()) {
log.info("Received AckMessage for {} with id {} and uid {}",
ackMessage.getSourceMsgClassName(), ackMessage.getSourceId(), ackMessage.getSourceUid());
if (this instanceof FileTransferSender) {
((FileTransferSender) this).processAckForFilePart(ackMessage.getSourceUid());
}
} else {
log.warn("Received AckMessage with error state for {} with id {} and errorMessage={}",
ackMessage.getSourceMsgClassName(), ackMessage.getSourceId(), ackMessage.getErrorMessage());
}
}
}
}
protected void checkpointLastActivity() {
lastActivityTime = System.currentTimeMillis();
}
protected void initSessionTimer() {
UserThread.runAfter(() -> {
if (!transferIsInProgress()) // transfer may have finished before this timer executes
return;
if (System.currentTimeMillis() - lastActivityTime < FTP_SESSION_TIMEOUT_MILLIS) {
log.info("Last activity was {}, we have not yet timed out.", new Date(lastActivityTime));
initSessionTimer();
} else {
log.warn("File transfer session timed out. expected: {} received: {}", expectedFileLength, fileOffsetBytes);
ftpCallback.ifPresent((e) -> e.onFtpTimeout("Timed out during send", this));
}
}, FTP_SESSION_TIMEOUT_MILLIS / 4, TimeUnit.MILLISECONDS); // check more frequently than the timeout
}
protected boolean transferIsInProgress() {
return fileOffsetBytes != expectedFileLength;
}
protected void sendMessage(NetworkEnvelope message, NetworkNode networkNode, NodeAddress nodeAddress) {
SettableFuture<Connection> future = networkNode.sendMessage(nodeAddress, message);
if (future != null) { // is null when testing with Mockito
Futures.addCallback(future, new FutureCallback<>() {
@Override
public void onSuccess(Connection connection) {
}
@Override
public void onFailure(@NotNull Throwable throwable) {
String errorSend = "Sending " + message.getClass().getSimpleName() +
" to " + nodeAddress.getFullAddress() +
" failed. That is expected if the peer is offline.\n\t" +
".\n\tException=" + throwable.getMessage();
log.warn(errorSend);
ftpCallback.ifPresent((f) -> f.onFtpTimeout("Peer offline", FileTransferSession.this));
resetSession();
}
}, MoreExecutors.directExecutor());
}
}
}

View file

@ -17,25 +17,25 @@
package haveno.core.support.messages;
import haveno.common.app.Version;
import haveno.common.util.Utilities;
import haveno.core.locale.Res;
import haveno.core.support.SupportType;
import haveno.core.support.dispute.Attachment;
import haveno.core.support.dispute.Dispute;
import haveno.core.support.dispute.DisputeResult;
import haveno.network.p2p.NodeAddress;
import haveno.common.UserThread;
import haveno.common.app.Version;
import haveno.common.util.Utilities;
import javafx.beans.property.BooleanProperty;
import javafx.beans.property.ReadOnlyBooleanProperty;
import javafx.beans.property.ReadOnlyStringProperty;
import javafx.beans.property.SimpleBooleanProperty;
import javafx.beans.property.SimpleStringProperty;
import javafx.beans.property.StringProperty;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@ -44,13 +44,22 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.lang.ref.WeakReference;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nullable;
/* Message for direct communication between two nodes. Originally built for trader to
* arbitrator communication as no other direct communication was allowed. Arbitrator is
* considered as the server and trader as the client in arbitration chats
*
* For trader to trader communication the maker is considered to be the server
* and the taker is considered as the client.
* */
*/
@EqualsAndHashCode(callSuper = true) // listener is transient and therefore excluded anyway
@Getter
@Slf4j
@ -84,14 +93,14 @@ public final class ChatMessage extends SupportMessage {
private final StringProperty sendMessageErrorProperty;
private final StringProperty ackErrorProperty;
transient private Listener listener;
transient private WeakReference<Listener> listener;
public ChatMessage(SupportType supportType,
String tradeId,
int traderId,
boolean senderIsTrader,
String message,
NodeAddress senderNodeAddress) {
String tradeId,
int traderId,
boolean senderIsTrader,
String message,
NodeAddress senderNodeAddress) {
this(supportType,
tradeId,
traderId,
@ -111,12 +120,12 @@ public final class ChatMessage extends SupportMessage {
}
public ChatMessage(SupportType supportType,
String tradeId,
int traderId,
boolean senderIsTrader,
String message,
NodeAddress senderNodeAddress,
ArrayList<Attachment> attachments) {
String tradeId,
int traderId,
boolean senderIsTrader,
String message,
NodeAddress senderNodeAddress,
ArrayList<Attachment> attachments) {
this(supportType,
tradeId,
traderId,
@ -136,12 +145,12 @@ public final class ChatMessage extends SupportMessage {
}
public ChatMessage(SupportType supportType,
String tradeId,
int traderId,
boolean senderIsTrader,
String message,
NodeAddress senderNodeAddress,
long date) {
String tradeId,
int traderId,
boolean senderIsTrader,
String message,
NodeAddress senderNodeAddress,
long date) {
this(supportType,
tradeId,
traderId,
@ -198,7 +207,9 @@ public final class ChatMessage extends SupportMessage {
notifyChangeListener();
}
public protobuf.ChatMessage.Builder toProtoChatMessageBuilder() {
// We cannot rename protobuf definition because it would break backward compatibility
@Override
public protobuf.NetworkEnvelope toProtoNetworkEnvelope() {
protobuf.ChatMessage.Builder builder = protobuf.ChatMessage.newBuilder()
.setType(SupportType.toProtoMessage(supportType))
.setTradeId(tradeId)
@ -216,14 +227,6 @@ public final class ChatMessage extends SupportMessage {
.setWasDisplayed(wasDisplayed);
Optional.ofNullable(sendMessageErrorProperty.get()).ifPresent(builder::setSendMessageError);
Optional.ofNullable(ackErrorProperty.get()).ifPresent(builder::setAckError);
return builder;
}
// We cannot rename protobuf definition because it would break backward compatibility
@Override
public protobuf.NetworkEnvelope toProtoNetworkEnvelope() {
protobuf.ChatMessage.Builder builder = toProtoChatMessageBuilder();
return getNetworkEnvelopeBuilder()
.setChatMessage(builder)
.build();
@ -296,6 +299,16 @@ public final class ChatMessage extends SupportMessage {
notifyChangeListener();
}
// each chat message notifies the user if an ACK is not received in time
public void startAckTimer() {
UserThread.runAfter(() -> {
if (!this.getAcknowledgedProperty().get() && !this.getStoredInMailboxProperty().get()) {
this.setArrived(false);
this.setAckError(Res.get("support.errorTimeout"));
}
}, 60, TimeUnit.SECONDS);
}
public ReadOnlyBooleanProperty acknowledgedProperty() {
return acknowledgedProperty;
}
@ -327,12 +340,8 @@ public final class ChatMessage extends SupportMessage {
return Utilities.getShortId(tradeId);
}
public void addChangeListener(Listener listener) {
this.listener = listener;
}
public void removeChangeListener() {
this.listener = null;
public void addWeakMessageStateListener(Listener listener) {
this.listener = new WeakReference<>(listener);
}
public boolean isResultMessage(Dispute dispute) {
@ -352,7 +361,10 @@ public final class ChatMessage extends SupportMessage {
private void notifyChangeListener() {
if (listener != null) {
listener.onMessageStateChanged();
Listener listener = this.listener.get();
if (listener != null) {
listener.onMessageStateChanged();
}
}
}
@ -375,4 +387,4 @@ public final class ChatMessage extends SupportMessage {
",\n ackErrorProperty=" + ackErrorProperty +
"\n} " + super.toString();
}
}
}