diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java index 3333cc42..33c83a5c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java @@ -28,7 +28,7 @@ import java.util.concurrent.ThreadPoolExecutor; @ToString(exclude = "isThreadPoolExecutor") @Slf4j @KinesisClientInternalApi -class ExecutorStateEvent implements DiagnosticEvent { +public class ExecutorStateEvent implements DiagnosticEvent { private static final String MESSAGE = "Current thread pool executor state: "; private boolean isThreadPoolExecutor; @@ -41,6 +41,11 @@ class ExecutorStateEvent implements DiagnosticEvent { private int maximumPoolSize; ExecutorStateEvent(ExecutorService executor, LeaseCoordinator leaseCoordinator) { + this(executor); + this.leasesOwned = leaseCoordinator.getAssignments().size(); + } + + public ExecutorStateEvent(ExecutorService executor) { if (executor instanceof ThreadPoolExecutor) { this.isThreadPoolExecutor = true; @@ -52,8 +57,6 @@ class ExecutorStateEvent implements DiagnosticEvent { this.largestPoolSize = ex.getLargestPoolSize(); this.maximumPoolSize = ex.getMaximumPoolSize(); } - - this.leasesOwned = leaseCoordinator.getAssignments().size(); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java index 6a1ceff4..e03046a0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java @@ -17,7 +17,6 @@ package software.amazon.kinesis.leases; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; @@ -27,6 +26,7 @@ import lombok.Data; import lombok.NonNull; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.coordinator.ExecutorStateEvent; import software.amazon.kinesis.lifecycle.ConsumerTask; import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.metrics.MetricsFactory; @@ -186,6 +186,9 @@ public class ShardSyncTaskManager { metricsFactory); future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService) .whenComplete((taskResult, exception) -> handlePendingShardSyncs(exception, taskResult)); + + log.info(new ExecutorStateEvent(executorService).message()); + submittedNewTask = true; if (log.isDebugEnabled()) { log.debug("Submitted new {} task.", currentTask.taskType());