This commit is contained in:
XMRZombie 2025-06-19 20:16:22 +07:00 committed by GitHub
commit e667dbdc94
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 115 additions and 72 deletions

View file

@ -16,41 +16,50 @@
*/ */
package haveno.common; package haveno.common;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ThreadUtils { public class ThreadUtils {
private static final Map<String, ExecutorService> EXECUTORS = new HashMap<>(); private static final Logger logger = LoggerFactory.getLogger(ThreadUtils.class);
private static final Map<String, Thread> THREADS = new HashMap<>(); private static final ConcurrentHashMap<String, Thread> VIRTUAL_THREADS = new ConcurrentHashMap<>();
private static final int POOL_SIZE = 10; private static final AtomicInteger THREAD_COUNTER = new AtomicInteger(0);
private static final ExecutorService POOL = Executors.newFixedThreadPool(POOL_SIZE); private static final long DEFAULT_TIMEOUT_MS = 5000; // Default timeout for operations
/** /**
* Execute the given command in a thread with the given id. * Execute the given command in a virtual thread with the given id.
* *
* @param command the command to execute * @param command the command to execute
* @param threadId the thread id * @param threadId the thread id
*/ */
public static Future<?> execute(Runnable command, String threadId) { public static Future<?> execute(Runnable command, String threadId) {
synchronized (EXECUTORS) { CompletableFuture<Void> future = new CompletableFuture<>();
if (!EXECUTORS.containsKey(threadId)) EXECUTORS.put(threadId, Executors.newFixedThreadPool(1)); Thread virtualThread = Thread.ofVirtual().name(threadId).start(() -> {
return EXECUTORS.get(threadId).submit(() -> { try {
synchronized (THREADS) {
THREADS.put(threadId, Thread.currentThread());
}
Thread.currentThread().setName(threadId);
command.run(); command.run();
}); future.complete(null);
} } catch (Exception e) {
logger.error("Exception in thread: {} - {}", threadId, e.getMessage(), e);
future.completeExceptionally(e);
}
});
VIRTUAL_THREADS.put(threadId, virtualThread);
return future;
} }
/** /**
@ -62,40 +71,43 @@ public class ThreadUtils {
public static void await(Runnable command, String threadId) { public static void await(Runnable command, String threadId) {
try { try {
execute(command, threadId).get(); execute(command, threadId).get();
} catch (Exception e) { } catch (InterruptedException e) {
throw new RuntimeException(e); Thread.currentThread().interrupt();
logger.error("Interrupted while awaiting command execution in thread: {} - {}", threadId, e.getMessage(), e);
throw new RuntimeException("Interrupted while awaiting command execution", e);
} catch (ExecutionException e) {
logger.error("Execution exception while awaiting command execution in thread: {} - {}", threadId, e.getMessage(), e);
throw new RuntimeException("Execution exception while awaiting command execution", e);
} }
} }
public static void shutDown(String threadId) { public static void shutDown(String threadId) {
shutDown(threadId, null); shutDown(threadId, null);
} }
public static void shutDown(String threadId, Long timeoutMs) { public static void shutDown(String threadId, Long timeoutMs) {
if (timeoutMs == null) timeoutMs = Long.MAX_VALUE; Thread thread = VIRTUAL_THREADS.remove(threadId);
ExecutorService pool = null; if (thread == null) {
synchronized (EXECUTORS) { logger.warn("Thread not found: {}", threadId);
pool = EXECUTORS.get(threadId); return; // thread not found
} }
if (pool == null) return; // thread not found thread.interrupt();
pool.shutdown(); if (timeoutMs != null) {
try { try {
if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) pool.shutdownNow(); thread.join(timeoutMs);
} catch (InterruptedException e) { } catch (InterruptedException e) {
pool.shutdownNow(); Thread.currentThread().interrupt();
throw new RuntimeException(e); logger.error("Interrupted while waiting for thread to shut down: {} - {}", threadId, e.getMessage(), e);
} finally { throw new RuntimeException("Interrupted while waiting for thread to shut down", e);
remove(threadId); }
} }
logger.info("Shut down thread: {}", threadId);
} }
public static void remove(String threadId) { public static void remove(String threadId) {
synchronized (EXECUTORS) { VIRTUAL_THREADS.remove(threadId);
EXECUTORS.remove(threadId);
}
synchronized (THREADS) {
THREADS.remove(threadId);
}
} }
// TODO: consolidate and cleanup apis // TODO: consolidate and cleanup apis
@ -104,9 +116,25 @@ public class ThreadUtils {
return submitToPool(Arrays.asList(task)).get(0); return submitToPool(Arrays.asList(task)).get(0);
} }
public static <T> Future<T> submitToPool(Callable<T> task) {
CompletableFuture<T> future = new CompletableFuture<>();
execute(() -> {
try {
T result = task.call();
future.complete(result);
} catch (Exception e) {
logger.error("Exception in callable task - {}", e.getMessage(), e);
future.completeExceptionally(e);
}
}, "pool-task-" + THREAD_COUNTER.incrementAndGet());
return future;
}
public static List<Future<?>> submitToPool(List<Runnable> tasks) { public static List<Future<?>> submitToPool(List<Runnable> tasks) {
List<Future<?>> futures = new ArrayList<>(); List<Future<?>> futures = new ArrayList<>();
for (Runnable task : tasks) futures.add(POOL.submit(task)); for (Runnable task : tasks) {
futures.add(execute(task, "pool-task-" + THREAD_COUNTER.incrementAndGet()));
}
return futures; return futures;
} }
@ -127,25 +155,41 @@ public class ThreadUtils {
} }
public static List<Future<?>> awaitTasks(Collection<Runnable> tasks, int maxConcurrency, Long timeoutMs) { public static List<Future<?>> awaitTasks(Collection<Runnable> tasks, int maxConcurrency, Long timeoutMs) {
if (timeoutMs == null) timeoutMs = Long.MAX_VALUE; if (timeoutMs == null) timeoutMs = DEFAULT_TIMEOUT_MS;
if (tasks.isEmpty()) return new ArrayList<>(); if (tasks.isEmpty()) return new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(tasks.size());
try {
List<Future<?>> futures = new ArrayList<>();
for (Runnable task : tasks) futures.add(executorService.submit(task, null));
for (Future<?> future : futures) future.get(timeoutMs, TimeUnit.MILLISECONDS);
return futures;
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
executorService.shutdownNow();
}
}
private static boolean isCurrentThread(Thread thread, String threadId) { List<Future<?>> futures = new ArrayList<>();
synchronized (THREADS) { AtomicReference<List<Runnable>> remainingTasks = new AtomicReference<>(new ArrayList<>(tasks));
if (!THREADS.containsKey(threadId)) return false;
return thread == THREADS.get(threadId); while (true) {
List<Runnable> batchTasks = remainingTasks.get().subList(0, Math.min(maxConcurrency, remainingTasks.get().size()));
if (batchTasks.isEmpty()) break;
List<Future<?>> batchFutures = new ArrayList<>();
for (Runnable task : batchTasks) {
batchFutures.add(execute(task, "await-task-" + THREAD_COUNTER.incrementAndGet()));
}
for (Future<?> future : batchFutures) {
try {
future.get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.error("Interrupted while awaiting task completion - {}", e.getMessage(), e);
throw new RuntimeException("Interrupted while awaiting task completion", e);
} catch (ExecutionException e) {
logger.error("Execution exception while awaiting task completion - {}", e.getMessage(), e);
throw new RuntimeException("Execution exception while awaiting task completion", e);
} catch (TimeoutException e) {
logger.error("Timeout while awaiting task completion - {}", e.getMessage(), e);
throw new RuntimeException("Timeout while awaiting task completion", e);
}
}
futures.addAll(batchFutures);
remainingTasks.set(remainingTasks.get().subList(Math.min(maxConcurrency, remainingTasks.get().size()), remainingTasks.get().size()));
} }
return futures;
} }
} }

View file

@ -19,14 +19,13 @@ package haveno.common.util;
import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
/** /**
* Utility class for creating single-threaded executors. * Utility class for creating single-threaded executors with virtual threading properties.
*/ */
public class SingleThreadExecutorUtils { public class SingleThreadExecutorUtils {
@ -41,17 +40,17 @@ public class SingleThreadExecutorUtils {
public static ExecutorService getNonDaemonSingleThreadExecutor(Class<?> aClass) { public static ExecutorService getNonDaemonSingleThreadExecutor(Class<?> aClass) {
validateClass(aClass); validateClass(aClass);
return getSingleThreadExecutor(aClass.getSimpleName(), false); return getSingleThreadExecutor(aClass.getSimpleName());
} }
public static ExecutorService getSingleThreadExecutor(String name) { public static ExecutorService getSingleThreadExecutor(String name) {
validateName(name); validateName(name);
return getSingleThreadExecutor(name, true); return createSingleThreadExecutor(name);
} }
public static ListeningExecutorService getSingleThreadListeningExecutor(String name) { public static ListeningExecutorService getSingleThreadListeningExecutor(String name) {
validateName(name); validateName(name);
return MoreExecutors.listeningDecorator(getSingleThreadExecutor(name)); return MoreExecutors.listeningDecorator(createSingleThreadExecutor(name));
} }
public static ExecutorService getSingleThreadExecutor(ThreadFactory threadFactory) { public static ExecutorService getSingleThreadExecutor(ThreadFactory threadFactory) {
@ -59,16 +58,16 @@ public class SingleThreadExecutorUtils {
return Executors.newSingleThreadExecutor(threadFactory); return Executors.newSingleThreadExecutor(threadFactory);
} }
private static ExecutorService getSingleThreadExecutor(String name, boolean isDaemonThread) { private static ExecutorService createSingleThreadExecutor(String name) {
ThreadFactory threadFactory = getThreadFactory(name, isDaemonThread); ThreadFactory threadFactory = getThreadFactory(name);
return Executors.newSingleThreadExecutor(threadFactory); return Executors.newSingleThreadExecutor(threadFactory);
} }
private static ThreadFactory getThreadFactory(String name, boolean isDaemonThread) { private static ThreadFactory getThreadFactory(String name) {
return new ThreadFactoryBuilder() // Virtual threads do not support the daemon property, so we omit it.
.setNameFormat(name + "-%d") return Thread.ofVirtual()
.setDaemon(isDaemonThread) .name(name + "-%d")
.build(); .factory();
} }
private static void validateClass(Class<?> aClass) { private static void validateClass(Class<?> aClass) {