From 0246e1e852540ef77b6b623e1db209d90ad29561 Mon Sep 17 00:00:00 2001 From: Ashwin Giridharan Date: Mon, 22 Jun 2020 14:30:28 -0700 Subject: [PATCH] Only making leader to do the stream sync --- .../software/amazon/kinesis/coordinator/Scheduler.java | 8 +++++++- .../software/amazon/kinesis/lifecycle/ShutdownTask.java | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 6a87db23..b13fc6b1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -423,7 +423,9 @@ public class Scheduler implements Runnable { cleanupShardConsumers(assignedShards); // check for new streams and sync with the scheduler state - checkAndSyncStreamShardsAndLeases(); + if (isLeader()) { + checkAndSyncStreamShardsAndLeases(); + } logExecutorState(); slog.info("Sleeping ..."); @@ -440,6 +442,10 @@ public class Scheduler implements Runnable { slog.resetInfoLogging(); } + private boolean isLeader() { + return leaderDecider.isLeader(leaseManagementConfig.workerIdentifier()); + } + /** * Note: This method has package level access solely for testing purposes. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index d8496a91..5e11fddf 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -113,8 +113,8 @@ public class ShutdownTask implements ConsumerTask { try { try { - log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}", - leaseKeyProvider.apply(shardInfo), shardInfo.concurrencyToken(), reason); + log.debug("Invoking shutdown() for shard {} with child shards {} , concurrencyToken {}. Shutdown reason: {}", + leaseKeyProvider.apply(shardInfo), childShards, shardInfo.concurrencyToken(), reason); final long startTime = System.currentTimeMillis(); if (reason == ShutdownReason.SHARD_END) {