From 03d32f4a7c227d9a435144d837dab96348b756a5 Mon Sep 17 00:00:00 2001 From: XMRZombie Date: Tue, 29 Apr 2025 00:54:34 +0000 Subject: [PATCH 1/3] Update ThreadUtils.java Updating threading 'engine' for virtual threading --- .../main/java/haveno/common/ThreadUtils.java | 186 ++++++++++++------ 1 file changed, 128 insertions(+), 58 deletions(-) diff --git a/common/src/main/java/haveno/common/ThreadUtils.java b/common/src/main/java/haveno/common/ThreadUtils.java index e463e83154..f5defe1119 100644 --- a/common/src/main/java/haveno/common/ThreadUtils.java +++ b/common/src/main/java/haveno/common/ThreadUtils.java @@ -16,41 +16,44 @@ */ package haveno.common; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; public class ThreadUtils { - - private static final Map EXECUTORS = new HashMap<>(); - private static final Map THREADS = new HashMap<>(); - private static final int POOL_SIZE = 10; - private static final ExecutorService POOL = Executors.newFixedThreadPool(POOL_SIZE); + + private static final Map VIRTUAL_THREADS = new ConcurrentHashMap<>(); /** - * Execute the given command in a thread with the given id. - * + * Execute the given command in a virtual thread with the given id. + * * @param command the command to execute * @param threadId the thread id */ public static Future execute(Runnable command, String threadId) { - synchronized (EXECUTORS) { - if (!EXECUTORS.containsKey(threadId)) EXECUTORS.put(threadId, Executors.newFixedThreadPool(1)); - return EXECUTORS.get(threadId).submit(() -> { - synchronized (THREADS) { - THREADS.put(threadId, Thread.currentThread()); - } - Thread.currentThread().setName(threadId); + CompletableFuture future = new CompletableFuture<>(); + Thread virtualThread = Thread.ofVirtual().name(threadId).start(() -> { + try { command.run(); - }); - } + future.complete(null); + } catch (Exception e) { + future.completeExceptionally(e); + } finally { + VIRTUAL_THREADS.remove(threadId); + } + }); + VIRTUAL_THREADS.put(threadId, virtualThread); + return future; } /** @@ -67,85 +70,152 @@ public class ThreadUtils { } } + /** + * Shuts down the virtual thread with the given id. + * + * @param threadId the thread id + */ public static void shutDown(String threadId) { shutDown(threadId, null); } + /** + * Shuts down the virtual thread with the given id and an optional timeout. + * + * @param threadId the thread id + * @param timeoutMs the timeout in milliseconds + */ public static void shutDown(String threadId, Long timeoutMs) { - if (timeoutMs == null) timeoutMs = Long.MAX_VALUE; - ExecutorService pool = null; - synchronized (EXECUTORS) { - pool = EXECUTORS.get(threadId); - } - if (pool == null) return; // thread not found - pool.shutdown(); - try { - if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) pool.shutdownNow(); - } catch (InterruptedException e) { - pool.shutdownNow(); - throw new RuntimeException(e); - } finally { - remove(threadId); + Thread thread = VIRTUAL_THREADS.get(threadId); + if (thread == null) return; // thread not found + thread.interrupt(); + if (timeoutMs != null) { + try { + thread.join(timeoutMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } } } + /** + * Removes the virtual thread with the given id from the tracking map. + * + * @param threadId the thread id + */ public static void remove(String threadId) { - synchronized (EXECUTORS) { - EXECUTORS.remove(threadId); - } - synchronized (THREADS) { - THREADS.remove(threadId); - } + VIRTUAL_THREADS.remove(threadId); } - // TODO: consolidate and cleanup apis - + /** + * Submit a single task to be executed in a virtual thread. + * + * @param task the task to execute + * @return a Future representing the pending completion of the task + */ public static Future submitToPool(Runnable task) { return submitToPool(Arrays.asList(task)).get(0); } + /** + * Submit a single callable task to be executed in a virtual thread. + * + * @param task the task to execute + * @return a Future representing the pending completion of the task + */ + public static Future submitToPool(Callable task) { + CompletableFuture future = new CompletableFuture<>(); + execute(() -> { + try { + T result = task.call(); + future.complete(result); + } catch (Exception e) { + future.completeExceptionally(e); + } + }, "pool-task-" + task.hashCode()); + return future; + } + + /** + * Submit a list of tasks to be executed in virtual threads. + * + * @param tasks the tasks to execute + * @return a list of Futures representing the pending completion of the tasks + */ public static List> submitToPool(List tasks) { List> futures = new ArrayList<>(); - for (Runnable task : tasks) futures.add(POOL.submit(task)); + for (Runnable task : tasks) { + futures.add(execute(task, "pool-task-" + task.hashCode())); + } return futures; } + /** + * Await the completion of a single task. + * + * @param task the task to execute + * @return a Future representing the pending completion of the task + */ public static Future awaitTask(Runnable task) { return awaitTask(task, null); } + /** + * Await the completion of a single task with an optional timeout. + * + * @param task the task to execute + * @param timeoutMs the timeout in milliseconds + * @return a Future representing the pending completion of the task + */ public static Future awaitTask(Runnable task, Long timeoutMs) { return awaitTasks(Arrays.asList(task), 1, timeoutMs).get(0); } + /** + * Await the completion of a collection of tasks. + * + * @param tasks the tasks to execute + * @return a list of Futures representing the pending completion of the tasks + */ public static List> awaitTasks(Collection tasks) { return awaitTasks(tasks, tasks.size()); } + /** + * Await the completion of a collection of tasks with a specified maximum concurrency. + * + * @param tasks the tasks to execute + * @param maxConcurrency the maximum number of concurrent tasks + * @return a list of Futures representing the pending completion of the tasks + */ public static List> awaitTasks(Collection tasks, int maxConcurrency) { return awaitTasks(tasks, maxConcurrency, null); } + /** + * Await the completion of a collection of tasks with a specified maximum concurrency and optional timeout. + * + * @param tasks the tasks to execute + * @param maxConcurrency the maximum number of concurrent tasks + * @param timeoutMs the timeout in milliseconds + * @return a list of Futures representing the pending completion of the tasks + */ public static List> awaitTasks(Collection tasks, int maxConcurrency, Long timeoutMs) { if (timeoutMs == null) timeoutMs = Long.MAX_VALUE; if (tasks.isEmpty()) return new ArrayList<>(); - ExecutorService executorService = Executors.newFixedThreadPool(tasks.size()); - try { - List> futures = new ArrayList<>(); - for (Runnable task : tasks) futures.add(executorService.submit(task, null)); - for (Future future : futures) future.get(timeoutMs, TimeUnit.MILLISECONDS); - return futures; - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - executorService.shutdownNow(); - } - } - private static boolean isCurrentThread(Thread thread, String threadId) { - synchronized (THREADS) { - if (!THREADS.containsKey(threadId)) return false; - return thread == THREADS.get(threadId); + List> futures = new ArrayList<>(); + for (Runnable task : tasks) { + futures.add(execute(task, "await-task-" + task.hashCode())); } + for (Future future : futures) { + try { + future.get(timeoutMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + } + return futures; } } From e16e2ebd032c3bcbb758f0daa8f77cffa681818e Mon Sep 17 00:00:00 2001 From: XMRZombie Date: Tue, 29 Apr 2025 00:56:01 +0000 Subject: [PATCH 2/3] Update SingleThreadExecutorUtils.java Trying to apply virtual threading properties to singlethreadexecutor --- .../util/SingleThreadExecutorUtils.java | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/common/src/main/java/haveno/common/util/SingleThreadExecutorUtils.java b/common/src/main/java/haveno/common/util/SingleThreadExecutorUtils.java index d9af624c67..77c194063c 100644 --- a/common/src/main/java/haveno/common/util/SingleThreadExecutorUtils.java +++ b/common/src/main/java/haveno/common/util/SingleThreadExecutorUtils.java @@ -19,14 +19,13 @@ package haveno.common.util; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; /** - * Utility class for creating single-threaded executors. + * Utility class for creating single-threaded executors with virtual threading properties. */ public class SingleThreadExecutorUtils { @@ -41,17 +40,17 @@ public class SingleThreadExecutorUtils { public static ExecutorService getNonDaemonSingleThreadExecutor(Class aClass) { validateClass(aClass); - return getSingleThreadExecutor(aClass.getSimpleName(), false); + return getSingleThreadExecutor(aClass.getSimpleName()); } public static ExecutorService getSingleThreadExecutor(String name) { validateName(name); - return getSingleThreadExecutor(name, true); + return createSingleThreadExecutor(name); } public static ListeningExecutorService getSingleThreadListeningExecutor(String name) { validateName(name); - return MoreExecutors.listeningDecorator(getSingleThreadExecutor(name)); + return MoreExecutors.listeningDecorator(createSingleThreadExecutor(name)); } public static ExecutorService getSingleThreadExecutor(ThreadFactory threadFactory) { @@ -59,16 +58,16 @@ public class SingleThreadExecutorUtils { return Executors.newSingleThreadExecutor(threadFactory); } - private static ExecutorService getSingleThreadExecutor(String name, boolean isDaemonThread) { - ThreadFactory threadFactory = getThreadFactory(name, isDaemonThread); + private static ExecutorService createSingleThreadExecutor(String name) { + ThreadFactory threadFactory = getThreadFactory(name); return Executors.newSingleThreadExecutor(threadFactory); } - private static ThreadFactory getThreadFactory(String name, boolean isDaemonThread) { - return new ThreadFactoryBuilder() - .setNameFormat(name + "-%d") - .setDaemon(isDaemonThread) - .build(); + private static ThreadFactory getThreadFactory(String name) { + // Virtual threads do not support the daemon property, so we omit it. + return Thread.ofVirtual() + .name(name + "-%d") + .factory(); } private static void validateClass(Class aClass) { From 6d746c05de339454e6482be52b21b883d11d47d7 Mon Sep 17 00:00:00 2001 From: XMRZombie Date: Fri, 2 May 2025 22:44:54 +0000 Subject: [PATCH 3/3] Update ThreadUtils.java - Utilizes Java's virtual threads (Thread.ofVirtual()), which are lightweight and designed for handling a large number of concurrent tasks with minimal overhead - Names virtual threads directly during creation using Thread.ofVirtual().name(threadId). - Manages virtual threads using a ConcurrentHashMap for better concurrency and thread-safety. - Shuts down virtual threads by interrupting them and optionally waiting for them to join with a timeout. Removes threads from the ConcurrentHashMap. - Controls concurrency by submitting tasks to virtual threads in batches, with a configurable maximum concurrency level. - AtomicInteger and AtomicReference for thread-safe counter increments and task management, reducing the need for synchronized blocks --- .../main/java/haveno/common/ThreadUtils.java | 154 ++++++++---------- 1 file changed, 64 insertions(+), 90 deletions(-) diff --git a/common/src/main/java/haveno/common/ThreadUtils.java b/common/src/main/java/haveno/common/ThreadUtils.java index f5defe1119..a945014bc1 100644 --- a/common/src/main/java/haveno/common/ThreadUtils.java +++ b/common/src/main/java/haveno/common/ThreadUtils.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -29,10 +28,17 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; - +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class ThreadUtils { - - private static final Map VIRTUAL_THREADS = new ConcurrentHashMap<>(); + + private static final Logger logger = LoggerFactory.getLogger(ThreadUtils.class); + private static final ConcurrentHashMap VIRTUAL_THREADS = new ConcurrentHashMap<>(); + private static final AtomicInteger THREAD_COUNTER = new AtomicInteger(0); + private static final long DEFAULT_TIMEOUT_MS = 5000; // Default timeout for operations /** * Execute the given command in a virtual thread with the given id. @@ -47,11 +53,11 @@ public class ThreadUtils { command.run(); future.complete(null); } catch (Exception e) { + logger.error("Exception in thread: {} - {}", threadId, e.getMessage(), e); future.completeExceptionally(e); - } finally { - VIRTUAL_THREADS.remove(threadId); } }); + VIRTUAL_THREADS.put(threadId, virtualThread); return future; } @@ -65,65 +71,51 @@ public class ThreadUtils { public static void await(Runnable command, String threadId) { try { execute(command, threadId).get(); - } catch (Exception e) { - throw new RuntimeException(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Interrupted while awaiting command execution in thread: {} - {}", threadId, e.getMessage(), e); + throw new RuntimeException("Interrupted while awaiting command execution", e); + } catch (ExecutionException e) { + logger.error("Execution exception while awaiting command execution in thread: {} - {}", threadId, e.getMessage(), e); + throw new RuntimeException("Execution exception while awaiting command execution", e); } } - /** - * Shuts down the virtual thread with the given id. - * - * @param threadId the thread id - */ + public static void shutDown(String threadId) { shutDown(threadId, null); } - /** - * Shuts down the virtual thread with the given id and an optional timeout. - * - * @param threadId the thread id - * @param timeoutMs the timeout in milliseconds - */ public static void shutDown(String threadId, Long timeoutMs) { - Thread thread = VIRTUAL_THREADS.get(threadId); - if (thread == null) return; // thread not found + Thread thread = VIRTUAL_THREADS.remove(threadId); + if (thread == null) { + logger.warn("Thread not found: {}", threadId); + return; // thread not found + } thread.interrupt(); if (timeoutMs != null) { try { thread.join(timeoutMs); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - throw new RuntimeException(e); + logger.error("Interrupted while waiting for thread to shut down: {} - {}", threadId, e.getMessage(), e); + throw new RuntimeException("Interrupted while waiting for thread to shut down", e); } } + logger.info("Shut down thread: {}", threadId); } - /** - * Removes the virtual thread with the given id from the tracking map. - * - * @param threadId the thread id - */ + public static void remove(String threadId) { VIRTUAL_THREADS.remove(threadId); } - /** - * Submit a single task to be executed in a virtual thread. - * - * @param task the task to execute - * @return a Future representing the pending completion of the task - */ + // TODO: consolidate and cleanup apis + public static Future submitToPool(Runnable task) { return submitToPool(Arrays.asList(task)).get(0); } - /** - * Submit a single callable task to be executed in a virtual thread. - * - * @param task the task to execute - * @return a Future representing the pending completion of the task - */ public static Future submitToPool(Callable task) { CompletableFuture future = new CompletableFuture<>(); execute(() -> { @@ -131,91 +123,73 @@ public class ThreadUtils { T result = task.call(); future.complete(result); } catch (Exception e) { + logger.error("Exception in callable task - {}", e.getMessage(), e); future.completeExceptionally(e); } - }, "pool-task-" + task.hashCode()); + }, "pool-task-" + THREAD_COUNTER.incrementAndGet()); return future; } - /** - * Submit a list of tasks to be executed in virtual threads. - * - * @param tasks the tasks to execute - * @return a list of Futures representing the pending completion of the tasks - */ public static List> submitToPool(List tasks) { List> futures = new ArrayList<>(); for (Runnable task : tasks) { - futures.add(execute(task, "pool-task-" + task.hashCode())); + futures.add(execute(task, "pool-task-" + THREAD_COUNTER.incrementAndGet())); } return futures; } - /** - * Await the completion of a single task. - * - * @param task the task to execute - * @return a Future representing the pending completion of the task - */ public static Future awaitTask(Runnable task) { return awaitTask(task, null); } - /** - * Await the completion of a single task with an optional timeout. - * - * @param task the task to execute - * @param timeoutMs the timeout in milliseconds - * @return a Future representing the pending completion of the task - */ public static Future awaitTask(Runnable task, Long timeoutMs) { return awaitTasks(Arrays.asList(task), 1, timeoutMs).get(0); } - /** - * Await the completion of a collection of tasks. - * - * @param tasks the tasks to execute - * @return a list of Futures representing the pending completion of the tasks - */ public static List> awaitTasks(Collection tasks) { return awaitTasks(tasks, tasks.size()); } - /** - * Await the completion of a collection of tasks with a specified maximum concurrency. - * - * @param tasks the tasks to execute - * @param maxConcurrency the maximum number of concurrent tasks - * @return a list of Futures representing the pending completion of the tasks - */ public static List> awaitTasks(Collection tasks, int maxConcurrency) { return awaitTasks(tasks, maxConcurrency, null); } - /** - * Await the completion of a collection of tasks with a specified maximum concurrency and optional timeout. - * - * @param tasks the tasks to execute - * @param maxConcurrency the maximum number of concurrent tasks - * @param timeoutMs the timeout in milliseconds - * @return a list of Futures representing the pending completion of the tasks - */ public static List> awaitTasks(Collection tasks, int maxConcurrency, Long timeoutMs) { - if (timeoutMs == null) timeoutMs = Long.MAX_VALUE; + if (timeoutMs == null) timeoutMs = DEFAULT_TIMEOUT_MS; if (tasks.isEmpty()) return new ArrayList<>(); List> futures = new ArrayList<>(); - for (Runnable task : tasks) { - futures.add(execute(task, "await-task-" + task.hashCode())); - } - for (Future future : futures) { - try { - future.get(timeoutMs, TimeUnit.MILLISECONDS); - } catch (InterruptedException | ExecutionException | TimeoutException e) { - throw new RuntimeException(e); + AtomicReference> remainingTasks = new AtomicReference<>(new ArrayList<>(tasks)); + + while (true) { + List batchTasks = remainingTasks.get().subList(0, Math.min(maxConcurrency, remainingTasks.get().size())); + if (batchTasks.isEmpty()) break; + + List> batchFutures = new ArrayList<>(); + for (Runnable task : batchTasks) { + batchFutures.add(execute(task, "await-task-" + THREAD_COUNTER.incrementAndGet())); } + + for (Future future : batchFutures) { + try { + future.get(timeoutMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Interrupted while awaiting task completion - {}", e.getMessage(), e); + throw new RuntimeException("Interrupted while awaiting task completion", e); + } catch (ExecutionException e) { + logger.error("Execution exception while awaiting task completion - {}", e.getMessage(), e); + throw new RuntimeException("Execution exception while awaiting task completion", e); + } catch (TimeoutException e) { + logger.error("Timeout while awaiting task completion - {}", e.getMessage(), e); + throw new RuntimeException("Timeout while awaiting task completion", e); + } + } + + futures.addAll(batchFutures); + remainingTasks.set(remainingTasks.get().subList(Math.min(maxConcurrency, remainingTasks.get().size()), remainingTasks.get().size())); } + return futures; } }