diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 5cd6f472..debb89bb 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -142,8 +142,7 @@ public class HierarchicalShardSyncer { assertAllParentShardsAreClosed(inconsistentShardIds); } final List currentLeases = isMultiStreamMode ? - getLeasesForStream(shardDetector.streamIdentifier(), leaseRefresher) : - leaseRefresher.listLeases(); + leaseRefresher.listLeasesForStream(shardDetector.streamIdentifier()) : leaseRefresher.listLeases(); final MultiStreamArgs multiStreamArgs = new MultiStreamArgs(isMultiStreamMode, shardDetector.streamIdentifier()); final LeaseSynchronizer leaseSynchronizer = isLeaseTableEmpty ? new EmptyLeaseTableSynchronizer() : new NonEmptyLeaseTableSynchronizer(shardDetector, shardIdToShardMap, shardIdToChildShardIdsMap); @@ -171,29 +170,6 @@ public class HierarchicalShardSyncer { } } - // CHECKSTYLE:ON CyclomaticComplexity - - /** Note: This method has package level access solely for testing purposes. - * - * @param streamIdentifier We'll use this stream identifier to filter leases - * @param leaseRefresher Used to fetch leases - * @return Return list of leases (corresponding to shards) of the specified stream. - * @throws DependencyException - * @throws InvalidStateException - * @throws ProvisionedThroughputException - */ - static List getLeasesForStream(StreamIdentifier streamIdentifier, - LeaseRefresher leaseRefresher) - throws DependencyException, ProvisionedThroughputException, InvalidStateException { - List streamLeases = new ArrayList<>(); - for (Lease lease : leaseRefresher.listLeases()) { - if (streamIdentifier.serialize().equals(((MultiStreamLease)lease).streamIdentifier())) { - streamLeases.add(lease); - } - } - return streamLeases; - } - /** Helper method to detect a race condition between fetching the shards via paginated DescribeStream calls * and a reshard operation. * @param inconsistentShardIds diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java index 3ba22c2b..f45c4cc2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseRefresher.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.leases; import java.util.List; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; @@ -60,6 +61,18 @@ public interface LeaseRefresher { */ boolean waitUntilLeaseTableExists(long secondsBetweenPolls, long timeoutSeconds) throws DependencyException; + /** + * List all leases for a given stream synchronously. + * + * @throws DependencyException if DynamoDB scan fails in an unexpected way + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity + * + * @return list of leases + */ + List listLeasesForStream(StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException, + ProvisionedThroughputException; + /** * List all objects in table synchronously. * diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java index 1c464afe..c5bb1f66 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresher.java @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import com.google.common.collect.ImmutableMap; import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; @@ -29,6 +30,7 @@ import software.amazon.awssdk.services.dynamodb.model.*; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.FutureUtils; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseRefresher; @@ -58,6 +60,9 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { private boolean newTableCreated = false; + private static final String STREAM_NAME = "streamName"; + private static final String DDB_STREAM_NAME = ":streamName"; + /** * Constructor. * @@ -263,12 +268,21 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { return System.currentTimeMillis() - startTime; } + /** + * {@inheritDoc} + */ + @Override + public List listLeasesForStream(StreamIdentifier streamIdentifier) throws DependencyException, + InvalidStateException, ProvisionedThroughputException { + return list( null, streamIdentifier); + } + /** * {@inheritDoc} */ @Override public List listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException { - return list(null); + return list(null, null); } /** @@ -277,22 +291,34 @@ public class DynamoDBLeaseRefresher implements LeaseRefresher { @Override public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException { - return list(1).isEmpty(); + return list(1, null).isEmpty(); } /** * List with the given page size. Package access for integration testing. * * @param limit number of items to consider at a time - used by integration tests to force paging. + * @param streamIdentifier streamIdentifier for multi-stream mode. Can be null. * @return list of leases * @throws InvalidStateException if table does not exist * @throws DependencyException if DynamoDB scan fail in an unexpected way * @throws ProvisionedThroughputException if DynamoDB scan fail due to exceeded capacity */ - List list(Integer limit) throws DependencyException, InvalidStateException, ProvisionedThroughputException { + List list(Integer limit, StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException, + ProvisionedThroughputException { + log.debug("Listing leases from table {}", table); ScanRequest.Builder scanRequestBuilder = ScanRequest.builder().tableName(table); + + if (streamIdentifier != null) { + final Map expressionAttributeValues = ImmutableMap.of( + DDB_STREAM_NAME, AttributeValue.builder().s(streamIdentifier.serialize()).build() + ); + scanRequestBuilder = scanRequestBuilder.filterExpression(STREAM_NAME + " = " + DDB_STREAM_NAME) + .expressionAttributeValues(expressionAttributeValues); + } + if (limit != null) { scanRequestBuilder = scanRequestBuilder.limit(limit); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java index 233ce724..d9d7d01e 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ExceptionThrowingLeaseRefresher.java @@ -19,6 +19,7 @@ import java.util.List; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; @@ -54,6 +55,7 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher { DELETELEASE(9), DELETEALL(10), UPDATELEASE(11), + LISTLEASESFORSTREAM(12), NONE(Integer.MIN_VALUE); private Integer index; @@ -129,6 +131,13 @@ public class ExceptionThrowingLeaseRefresher implements LeaseRefresher { return leaseRefresher.waitUntilLeaseTableExists(secondsBetweenPolls, timeoutSeconds); } + @Override + public List listLeasesForStream(StreamIdentifier streamIdentifier) throws DependencyException, InvalidStateException, ProvisionedThroughputException { + throwExceptions("listLeasesForStream", ExceptionThrowingLeaseRefresherMethods.LISTLEASESFORSTREAM); + + return leaseRefresher.listLeasesForStream(streamIdentifier); + } + @Override public List listLeases() throws DependencyException, InvalidStateException, ProvisionedThroughputException { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 374da4cd..44f6acf4 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -1039,7 +1039,7 @@ public class HierarchicalShardSyncerTest { final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); when(shardDetector.listShards()).thenReturn(shards); - when(dynamoDBLeaseRefresher.listLeases()).thenReturn(leases); + when(dynamoDBLeaseRefresher.listLeasesForStream(any(StreamIdentifier.class))).thenReturn(leases); doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture()); setupMultiStream(); hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, @@ -1049,7 +1049,7 @@ public class HierarchicalShardSyncerTest { assertThat(leaseCaptor.getValue(), equalTo(garbageLease)); verify(shardDetector, times(2)).listShards(); - verify(dynamoDBLeaseRefresher).listLeases(); + verify(dynamoDBLeaseRefresher).listLeasesForStream(any(StreamIdentifier.class)); verify(dynamoDBLeaseRefresher).deleteLease(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java index 414f7975..c9df9106 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseRefresherIntegrationTest.java @@ -71,7 +71,7 @@ public class DynamoDBLeaseRefresherIntegrationTest extends LeaseIntegrationTest Collection expected = builder.build().values(); // The / 3 here ensures that we will test Dynamo's paging mechanics. - List actual = leaseRefresher.list(numRecordsToPut / 3); + List actual = leaseRefresher.list(numRecordsToPut / 3, null); for (Lease lease : actual) { assertNotNull(expected.remove(lease));