Adding logging around the shard sync executor queuesize

This commit is contained in:
Ashwin Giridharan 2020-07-22 11:37:25 -07:00
parent 8aec062e64
commit 75fe6060f6
2 changed files with 10 additions and 4 deletions

View file

@ -28,7 +28,7 @@ import java.util.concurrent.ThreadPoolExecutor;
@ToString(exclude = "isThreadPoolExecutor") @ToString(exclude = "isThreadPoolExecutor")
@Slf4j @Slf4j
@KinesisClientInternalApi @KinesisClientInternalApi
class ExecutorStateEvent implements DiagnosticEvent { public class ExecutorStateEvent implements DiagnosticEvent {
private static final String MESSAGE = "Current thread pool executor state: "; private static final String MESSAGE = "Current thread pool executor state: ";
private boolean isThreadPoolExecutor; private boolean isThreadPoolExecutor;
@ -41,6 +41,11 @@ class ExecutorStateEvent implements DiagnosticEvent {
private int maximumPoolSize; private int maximumPoolSize;
ExecutorStateEvent(ExecutorService executor, LeaseCoordinator leaseCoordinator) { ExecutorStateEvent(ExecutorService executor, LeaseCoordinator leaseCoordinator) {
this(executor);
this.leasesOwned = leaseCoordinator.getAssignments().size();
}
public ExecutorStateEvent(ExecutorService executor) {
if (executor instanceof ThreadPoolExecutor) { if (executor instanceof ThreadPoolExecutor) {
this.isThreadPoolExecutor = true; this.isThreadPoolExecutor = true;
@ -52,8 +57,6 @@ class ExecutorStateEvent implements DiagnosticEvent {
this.largestPoolSize = ex.getLargestPoolSize(); this.largestPoolSize = ex.getLargestPoolSize();
this.maximumPoolSize = ex.getMaximumPoolSize(); this.maximumPoolSize = ex.getMaximumPoolSize();
} }
this.leasesOwned = leaseCoordinator.getAssignments().size();
} }
@Override @Override

View file

@ -17,7 +17,6 @@ package software.amazon.kinesis.leases;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
@ -27,6 +26,7 @@ import lombok.Data;
import lombok.NonNull; import lombok.NonNull;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.coordinator.ExecutorStateEvent;
import software.amazon.kinesis.lifecycle.ConsumerTask; import software.amazon.kinesis.lifecycle.ConsumerTask;
import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.lifecycle.TaskResult;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
@ -186,6 +186,9 @@ public class ShardSyncTaskManager {
metricsFactory); metricsFactory);
future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService) future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService)
.whenComplete((taskResult, exception) -> handlePendingShardSyncs(exception, taskResult)); .whenComplete((taskResult, exception) -> handlePendingShardSyncs(exception, taskResult));
log.info(new ExecutorStateEvent(executorService).message());
submittedNewTask = true; submittedNewTask = true;
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Submitted new {} task.", currentTask.taskType()); log.debug("Submitted new {} task.", currentTask.taskType());