diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 6a87db23..b13fc6b1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -423,7 +423,9 @@ public class Scheduler implements Runnable { cleanupShardConsumers(assignedShards); // check for new streams and sync with the scheduler state - checkAndSyncStreamShardsAndLeases(); + if (isLeader()) { + checkAndSyncStreamShardsAndLeases(); + } logExecutorState(); slog.info("Sleeping ..."); @@ -440,6 +442,10 @@ public class Scheduler implements Runnable { slog.resetInfoLogging(); } + private boolean isLeader() { + return leaderDecider.isLeader(leaseManagementConfig.workerIdentifier()); + } + /** * Note: This method has package level access solely for testing purposes. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index d8496a91..5e11fddf 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -113,8 +113,8 @@ public class ShutdownTask implements ConsumerTask { try { try { - log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}", - leaseKeyProvider.apply(shardInfo), shardInfo.concurrencyToken(), reason); + log.debug("Invoking shutdown() for shard {} with child shards {} , concurrencyToken {}. Shutdown reason: {}", + leaseKeyProvider.apply(shardInfo), childShards, shardInfo.concurrencyToken(), reason); final long startTime = System.currentTimeMillis(); if (reason == ShutdownReason.SHARD_END) {