diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java index 3333cc42..33c83a5c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/ExecutorStateEvent.java @@ -28,7 +28,7 @@ import java.util.concurrent.ThreadPoolExecutor; @ToString(exclude = "isThreadPoolExecutor") @Slf4j @KinesisClientInternalApi -class ExecutorStateEvent implements DiagnosticEvent { +public class ExecutorStateEvent implements DiagnosticEvent { private static final String MESSAGE = "Current thread pool executor state: "; private boolean isThreadPoolExecutor; @@ -41,6 +41,11 @@ class ExecutorStateEvent implements DiagnosticEvent { private int maximumPoolSize; ExecutorStateEvent(ExecutorService executor, LeaseCoordinator leaseCoordinator) { + this(executor); + this.leasesOwned = leaseCoordinator.getAssignments().size(); + } + + public ExecutorStateEvent(ExecutorService executor) { if (executor instanceof ThreadPoolExecutor) { this.isThreadPoolExecutor = true; @@ -52,8 +57,6 @@ class ExecutorStateEvent implements DiagnosticEvent { this.largestPoolSize = ex.getLargestPoolSize(); this.maximumPoolSize = ex.getMaximumPoolSize(); } - - this.leasesOwned = leaseCoordinator.getAssignments().size(); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index b3cfdb56..4f677524 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -97,6 +97,7 @@ public class HierarchicalShardSyncer { /** * Check and create leases for any new shards (e.g. following a reshard operation). Sync leases with Kinesis shards * (e.g. at startup, or when we reach end of a shard). + * Return true, if shard sync was performed. Return false, if shard sync is skipped. * * @param shardDetector * @param leaseRefresher @@ -109,18 +110,18 @@ public class HierarchicalShardSyncer { * @throws KinesisClientLibIOException */ // CHECKSTYLE:OFF CyclomaticComplexity - public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, + public synchronized boolean checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final MetricsScope scope, final boolean ignoreUnexpectedChildShards, final boolean isLeaseTableEmpty) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException, InterruptedException { final List latestShards = isLeaseTableEmpty ? getShardListAtInitialPosition(shardDetector, initialPosition) : getShardList(shardDetector); - checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, ignoreUnexpectedChildShards, scope, + return checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, latestShards, ignoreUnexpectedChildShards, scope, isLeaseTableEmpty); } //Provide a pre-collcted list of shards to avoid calling ListShards API - public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, + public synchronized boolean checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, List latestShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { @@ -131,7 +132,7 @@ public class HierarchicalShardSyncer { log.debug("{} - Num shards: {}", streamIdentifier, latestShards.size()); } else { log.warn("Skipping shard sync for {} as no shards found from service.", streamIdentifier); - return; + return false; } final Map shardIdToShardMap = constructShardIdToShardMap(latestShards); @@ -161,6 +162,7 @@ public class HierarchicalShardSyncer { } final List trackedLeases = new ArrayList<>(currentLeases); trackedLeases.addAll(newLeasesToCreate); + return true; } /** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java index 820d4528..dd576114 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java @@ -67,11 +67,11 @@ public class ShardSyncTask implements ConsumerTask { boolean shardSyncSuccess = true; try { - hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, + boolean didPerformShardSync = hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, scope, ignoreUnexpectedChildShards, leaseRefresher.isLeaseTableEmpty()); - if (shardSyncTaskIdleTimeMillis > 0) { + if (didPerformShardSync && shardSyncTaskIdleTimeMillis > 0) { Thread.sleep(shardSyncTaskIdleTimeMillis); } } catch (Exception e) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java index 6a1ceff4..e03046a0 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java @@ -17,7 +17,6 @@ package software.amazon.kinesis.leases; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; @@ -27,6 +26,7 @@ import lombok.Data; import lombok.NonNull; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.coordinator.ExecutorStateEvent; import software.amazon.kinesis.lifecycle.ConsumerTask; import software.amazon.kinesis.lifecycle.TaskResult; import software.amazon.kinesis.metrics.MetricsFactory; @@ -186,6 +186,9 @@ public class ShardSyncTaskManager { metricsFactory); future = CompletableFuture.supplyAsync(() -> currentTask.call(), executorService) .whenComplete((taskResult, exception) -> handlePendingShardSyncs(exception, taskResult)); + + log.info(new ExecutorStateEvent(executorService).message()); + submittedNewTask = true; if (log.isDebugEnabled()) { log.debug("Submitted new {} task.", currentTask.taskType());