diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index c67b3d80..b3cfdb56 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -125,6 +125,8 @@ public class HierarchicalShardSyncer { List latestShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { + //TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191 + if (!CollectionUtils.isNullOrEmpty(latestShards)) { log.debug("{} - Num shards: {}", streamIdentifier, latestShards.size()); } else { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index 8dc46364..b7f38a4e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -217,19 +217,6 @@ public interface LeaseRefresher { */ boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException; - /** - * Check (synchronously) if there are any leases in the lease table for a given stream identifier. - * - * @param streamIdentifier for multi-stream mode. Can be null. - * @return true if there are no leases in the lease table - * - * @throws DependencyException if DynamoDB scan fails in an unexpected way - * @throws InvalidStateException if lease table does not exist - * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity - */ - boolean isLeaseTableEmptyForStreamIdentifier(StreamIdentifier streamIdentifier) throws DependencyException, - InvalidStateException, ProvisionedThroughputException; - /** * Gets the current checkpoint of the shard. This is useful in the resharding use case * where we will wait for the parent shard to complete before starting on the records from a child shard. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java index 12e05550..820d4528 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java @@ -69,7 +69,7 @@ public class ShardSyncTask implements ConsumerTask { try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, scope, ignoreUnexpectedChildShards, - leaseRefresher.isLeaseTableEmptyForStreamIdentifier(shardDetector.streamIdentifier())); + leaseRefresher.isLeaseTableEmpty()); if (shardSyncTaskIdleTimeMillis > 0) { Thread.sleep(shardSyncTaskIdleTimeMillis); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 913c7331..df5746a2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -292,13 +292,7 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { @Override public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException { - return isLeaseTableEmptyForStreamIdentifier(null); - } - - @Override - public boolean isLeaseTableEmptyForStreamIdentifier(StreamIdentifier streamIdentifier) - throws DependencyException, ProvisionedThroughputException, InvalidStateException { - return list(1, 1, streamIdentifier).isEmpty(); + return list(1, 1, null).isEmpty(); } /** diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java index 557e2b38..81a49839 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java @@ -215,11 +215,6 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher { return false; } - @Override - public boolean isLeaseTableEmptyForStreamIdentifier(StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException, ProvisionedThroughputException { - return false; - } - @Override public ExtendedSequenceNumber getCheckpoint(final String leaseKey) throws ProvisionedThroughputException, InvalidStateException, DependencyException { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index f4c1dd2c..c390987c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -335,8 +335,7 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmptyForStreamIdentifier( - StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER))); + SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set expectedShardIds = new HashSet<>( toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"))); @@ -418,8 +417,8 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - latestShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmptyForStreamIdentifier( - StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER))); + latestShards, false, SCOPE, + dynamoDBLeaseRefresher.isLeaseTableEmpty()); final Set expectedShardIds = new HashSet<>( toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"))); @@ -683,7 +682,7 @@ public class HierarchicalShardSyncerTest { try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_TRIM_HORIZON, SCOPE, false, - dynamoDBLeaseRefresher.isLeaseTableEmptyForStreamIdentifier(StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER))); + dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { verify(shardDetector).listShards(); verify(dynamoDBLeaseRefresher, never()).listLeases(); @@ -763,8 +762,7 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - SCOPE, true, dynamoDBLeaseRefresher.isLeaseTableEmptyForStreamIdentifier( - StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER))); + SCOPE, true, dynamoDBLeaseRefresher.isLeaseTableEmpty()); final List leases = leaseCaptor.getAllValues(); final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());