diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index b3cfdb56..c67b3d80 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -125,8 +125,6 @@ public class HierarchicalShardSyncer { List latestShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope, final boolean isLeaseTableEmpty) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - //TODO: Need to add multistream support for this https://sim.amazon.com/issues/KinesisLTR-191 - if (!CollectionUtils.isNullOrEmpty(latestShards)) { log.debug("{} - Num shards: {}", streamIdentifier, latestShards.size()); } else { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index b7f38a4e..8dc46364 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -217,6 +217,19 @@ public interface LeaseRefresher { */ boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException; + /** + * Check (synchronously) if there are any leases in the lease table for a given stream identifier. + * + * @param streamIdentifier for multi-stream mode. Can be null. + * @return true if there are no leases in the lease table + * + * @throws DependencyException if DynamoDB scan fails in an unexpected way + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity + */ + boolean isLeaseTableEmptyForStreamIdentifier(StreamIdentifier streamIdentifier) throws DependencyException, + InvalidStateException, ProvisionedThroughputException; + /** * Gets the current checkpoint of the shard. This is useful in the resharding use case * where we will wait for the parent shard to complete before starting on the records from a child shard. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java index 820d4528..12e05550 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java @@ -69,7 +69,7 @@ public class ShardSyncTask implements ConsumerTask { try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, scope, ignoreUnexpectedChildShards, - leaseRefresher.isLeaseTableEmpty()); + leaseRefresher.isLeaseTableEmptyForStreamIdentifier(shardDetector.streamIdentifier())); if (shardSyncTaskIdleTimeMillis > 0) { Thread.sleep(shardSyncTaskIdleTimeMillis); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index df5746a2..913c7331 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -292,7 +292,13 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { @Override public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException { - return list(1, 1, null).isEmpty(); + return isLeaseTableEmptyForStreamIdentifier(null); + } + + @Override + public boolean isLeaseTableEmptyForStreamIdentifier(StreamIdentifier streamIdentifier) + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + return list(1, 1, streamIdentifier).isEmpty(); } /** diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java index 81a49839..557e2b38 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java @@ -215,6 +215,11 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher { return false; } + @Override + public boolean isLeaseTableEmptyForStreamIdentifier(StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException, ProvisionedThroughputException { + return false; + } + @Override public ExtendedSequenceNumber getCheckpoint(final String leaseKey) throws ProvisionedThroughputException, InvalidStateException, DependencyException { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index c390987c..f4c1dd2c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -335,7 +335,8 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmptyForStreamIdentifier( + StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER))); final Set expectedShardIds = new HashSet<>( toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"))); @@ -417,8 +418,8 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - latestShards, false, SCOPE, - dynamoDBLeaseRefresher.isLeaseTableEmpty()); + latestShards, false, SCOPE, dynamoDBLeaseRefresher.isLeaseTableEmptyForStreamIdentifier( + StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER))); final Set expectedShardIds = new HashSet<>( toMultiStreamLeaseList(Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"))); @@ -682,7 +683,7 @@ public class HierarchicalShardSyncerTest { try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_TRIM_HORIZON, SCOPE, false, - dynamoDBLeaseRefresher.isLeaseTableEmpty()); + dynamoDBLeaseRefresher.isLeaseTableEmptyForStreamIdentifier(StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER))); } finally { verify(shardDetector).listShards(); verify(dynamoDBLeaseRefresher, never()).listLeases(); @@ -762,7 +763,8 @@ public class HierarchicalShardSyncerTest { setupMultiStream(); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - SCOPE, true, dynamoDBLeaseRefresher.isLeaseTableEmpty()); + SCOPE, true, dynamoDBLeaseRefresher.isLeaseTableEmptyForStreamIdentifier( + StreamIdentifier.multiStreamInstance(STREAM_IDENTIFIER))); final List leases = leaseCaptor.getAllValues(); final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());