diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeletedStreamListProvider.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeletedStreamListProvider.java new file mode 100644 index 00000000..d0d332d9 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/DeletedStreamListProvider.java @@ -0,0 +1,38 @@ +package software.amazon.kinesis.coordinator; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import lombok.extern.slf4j.Slf4j; + +import software.amazon.kinesis.common.StreamIdentifier; + +/** + * This class is used for storing in-memory set of streams which are no longer existing (deleted) and needs to be + * cleaned up from KCL's in memory state. + */ +@Slf4j +public class DeletedStreamListProvider { + + private final Set deletedStreams; + + public DeletedStreamListProvider() { + deletedStreams = ConcurrentHashMap.newKeySet(); + } + + public void add(StreamIdentifier streamIdentifier) { + log.info("Added {}", streamIdentifier); + deletedStreams.add(streamIdentifier); + } + + /** + * Method returns and empties the current set of streams + * @return set of deleted Streams + */ + public Set purgeAllDeletedStream() { + final Set response = new HashSet<>(deletedStreams); + deletedStreams.removeAll(response); + return response; + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index e95ddb6f..dacb7ba1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -116,6 +116,7 @@ public class Scheduler implements Runnable { private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count"; private static final String 
PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count"; private static final String DELETED_STREAMS_COUNT = "DeletedStreams.Count"; + private static final String NON_EXISTING_STREAM_DELETE_COUNT = "NonExistingStreamDelete.Count"; private final SchedulerLog slog = new SchedulerLog(); @@ -166,6 +167,8 @@ public class Scheduler implements Runnable { private final LeaseCleanupManager leaseCleanupManager; private final SchemaRegistryDecoder schemaRegistryDecoder; + private final DeletedStreamListProvider deletedStreamListProvider; + // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. private final ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap<>(); @@ -251,9 +254,10 @@ public class Scheduler implements Runnable { this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService(); this.diagnosticEventFactory = diagnosticEventFactory; this.diagnosticEventHandler = new DiagnosticEventLogger(); + this.deletedStreamListProvider = new DeletedStreamListProvider(); this.shardSyncTaskManagerProvider = streamConfig -> this.leaseManagementConfig .leaseManagementFactory(leaseSerializer, isMultiStreamMode) - .createShardSyncTaskManager(this.metricsFactory, streamConfig); + .createShardSyncTaskManager(this.metricsFactory, streamConfig, this.deletedStreamListProvider); this.shardPrioritization = this.coordinatorConfig.shardPrioritization(); this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion(); this.skipShardSyncAtWorkerInitializationIfLeasesExist = @@ -535,6 +539,19 @@ public class Scheduler implements Runnable { .partitioningBy(newStreamConfigMap::containsKey, Collectors.toSet())); final Set staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier -> Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= 
waitPeriodToDeleteOldStreams.toMillis()).collect(Collectors.toSet()); + // These are the streams which are deleted in Kinesis and we encounter resource not found during + // shardSyncTask. This is applicable in MultiStreamMode only, in case of SingleStreamMode, store will + // not have any data. + // Filter streams based on newStreamConfigMap so that we don't override input to KCL in any case. + final Set deletedStreamSet = this.deletedStreamListProvider + .purgeAllDeletedStream() + .stream() + .filter(streamIdentifier -> !newStreamConfigMap.containsKey(streamIdentifier)) + .collect(Collectors.toSet()); + if (deletedStreamSet.size() > 0) { + log.info("Stale streams to delete: {}", deletedStreamSet); + staleStreamIdsToBeDeleted.addAll(deletedStreamSet); + } final Set deletedStreamsLeases = deleteMultiStreamLeases(staleStreamIdsToBeDeleted); streamsSynced.addAll(deletedStreamsLeases); @@ -554,6 +571,8 @@ public class Scheduler implements Runnable { MetricsUtil.addCount(metricsScope, ACTIVE_STREAMS_COUNT, newStreamConfigMap.size(), MetricsLevel.SUMMARY); MetricsUtil.addCount(metricsScope, PENDING_STREAMS_DELETION_COUNT, staleStreamDeletionMap.size(), MetricsLevel.SUMMARY); + MetricsUtil.addCount(metricsScope, NON_EXISTING_STREAM_DELETE_COUNT, deletedStreamSet.size(), + MetricsLevel.SUMMARY); MetricsUtil.addCount(metricsScope, DELETED_STREAMS_COUNT, deletedStreamsLeases.size(), MetricsLevel.SUMMARY); } finally { MetricsUtil.endScope(metricsScope); @@ -594,7 +613,7 @@ public class Scheduler implements Runnable { if (streamIdentifiers.isEmpty()) { return Collections.emptySet(); } - + log.info("Deleting streams: {}", streamIdentifiers); final Set streamsSynced = new HashSet<>(); final List leases = fetchMultiStreamLeases(); final Map> streamIdToShardsMap = leases.stream().collect( diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java 
b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 175d62ec..aafbfcff 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -17,6 +17,7 @@ package software.amazon.kinesis.leases; import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -39,6 +40,7 @@ import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.kinesis.model.ChildShard; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.services.kinesis.model.ShardFilterType; @@ -47,6 +49,7 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.coordinator.DeletedStreamListProvider; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; @@ -56,6 +59,7 @@ import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import static java.util.Objects.nonNull; import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRange; /** @@ -72,6 +76,8 @@ public class HierarchicalShardSyncer { 
private final String streamIdentifier; + private final DeletedStreamListProvider deletedStreamListProvider; + private static final String MIN_HASH_KEY = BigInteger.ZERO.toString(); private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString(); private static final int retriesForCompleteHashRange = 3; @@ -79,13 +85,17 @@ public class HierarchicalShardSyncer { private static final long DELAY_BETWEEN_LIST_SHARDS_MILLIS = 1000; public HierarchicalShardSyncer() { - isMultiStreamMode = false; - streamIdentifier = "SingleStreamMode"; + this(false, "SingleStreamMode"); } public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier) { + this(isMultiStreamMode, streamIdentifier, null); + } + + public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier, final DeletedStreamListProvider deletedStreamListProvider) { this.isMultiStreamMode = isMultiStreamMode; this.streamIdentifier = streamIdentifier; + this.deletedStreamListProvider = deletedStreamListProvider; } private static final BiFunction shardIdFromLeaseDeducer = @@ -279,8 +289,17 @@ public class HierarchicalShardSyncer { + retriesForCompleteHashRange + " retries."); } - private static List getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException { - final Optional> shards = Optional.of(shardDetector.listShards()); + private List getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException { + // Fallback to existing behavior for backward compatibility + List shardList = Collections.emptyList(); + try { + shardList = shardDetector.listShardsWithoutConsumingResourceNotFoundException(); + } catch (ResourceNotFoundException e) { + if (nonNull(this.deletedStreamListProvider) && isMultiStreamMode) { + deletedStreamListProvider.add(StreamIdentifier.multiStreamInstance(streamIdentifier)); + } + } + final Optional> shards = Optional.of(shardList); 
return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() + " is not in ACTIVE OR UPDATING state - will retry getting the shard list.")); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java index 0c3de1bd..9a44a553 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java @@ -96,6 +96,8 @@ public class KinesisShardDetector implements ShardDetector { @Getter(AccessLevel.PACKAGE) private final AtomicInteger cacheMisses = new AtomicInteger(0); + private static final Boolean THROW_RESOURCE_NOT_FOUND_EXCEPTION = true; + @Deprecated public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis, int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload, @@ -175,15 +177,26 @@ public class KinesisShardDetector implements ShardDetector { return listShardsWithFilter(null); } + @Override + @Synchronized + public List listShardsWithoutConsumingResourceNotFoundException() { + return listShardsWithFilterInternal(null, THROW_RESOURCE_NOT_FOUND_EXCEPTION); + } + @Override @Synchronized public List listShardsWithFilter(ShardFilter shardFilter) { + return listShardsWithFilterInternal(shardFilter, !THROW_RESOURCE_NOT_FOUND_EXCEPTION); + } + + private List listShardsWithFilterInternal(ShardFilter shardFilter, + boolean shouldPropagateResourceNotFoundException) { final List shards = new ArrayList<>(); ListShardsResponse result; String nextToken = null; do { - result = listShards(shardFilter, nextToken); + result = listShards(shardFilter, nextToken, shouldPropagateResourceNotFoundException); if (result == null) { /* @@ -201,7 +214,12 @@ 
public class KinesisShardDetector implements ShardDetector { return shards; } - private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken) { + /** + * @param shouldPropagateResourceNotFoundException : used to determine if ResourceNotFoundException should be + * handled by method and return Empty list or propagate the exception. + */ + private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken, + final boolean shouldPropagateResourceNotFoundException) { ListShardsRequest.Builder builder = KinesisRequestsBuilder.listShardsRequestBuilder(); if (StringUtils.isEmpty(nextToken)) { builder = builder.streamName(streamIdentifier.streamName()).shardFilter(shardFilter); @@ -243,9 +261,14 @@ public class KinesisShardDetector implements ShardDetector { } catch (ResourceNotFoundException e) { log.warn("Got ResourceNotFoundException when fetching shard list for {}. Stream no longer exists.", streamIdentifier.streamName()); - return ListShardsResponse.builder().shards(Collections.emptyList()) - .nextToken(null) - .build(); + if (shouldPropagateResourceNotFoundException) { + throw e; + } + return ListShardsResponse.builder() + .shards(Collections.emptyList()) + .nextToken(null) + .build(); + } catch (TimeoutException te) { throw new RuntimeException(te); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java index ecf9b390..9f2e5f94 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java @@ -16,6 +16,7 @@ package software.amazon.kinesis.leases; import software.amazon.kinesis.common.StreamConfig; +import software.amazon.kinesis.coordinator.DeletedStreamListProvider; import 
software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher; import software.amazon.kinesis.metrics.MetricsFactory; @@ -31,6 +32,11 @@ public interface LeaseManagementFactory { throw new UnsupportedOperationException(); } + default ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig, + DeletedStreamListProvider deletedStreamListProvider) { + throw new UnsupportedOperationException("createShardSyncTaskManager method not implemented"); + } + DynamoDBLeaseRefresher createLeaseRefresher(); ShardDetector createShardDetector(); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java index 62b93855..32514eb5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java @@ -46,6 +46,16 @@ public interface ShardDetector { */ List listShards(); + /** + * This method behaves exactly similar to listShards except the fact that this does not consume and throw + * ResourceNotFoundException instead of returning empty list. + * + * @return Shards + */ + default List listShardsWithoutConsumingResourceNotFoundException() { + throw new UnsupportedOperationException("listShardsWithoutConsumingResourceNotFoundException not implemented"); + } + /** * List shards with shard filter. 
* diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index ad1a2300..6bf2ff39 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -29,6 +29,7 @@ import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.LeaseCleanupConfig; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.coordinator.DeletedStreamListProvider; import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.KinesisShardDetector; import software.amazon.kinesis.leases.LeaseCleanupManager; @@ -504,6 +505,20 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { */ @Override public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) { + return createShardSyncTaskManager(metricsFactory, streamConfig, null); + } + + /** + * Create ShardSyncTaskManager from the streamConfig passed + * + * @param metricsFactory - factory to get metrics object + * @param streamConfig - streamConfig for which ShardSyncTaskManager needs to be created + * @param deletedStreamListProvider - store for capturing the streams which are deleted in kinesis + * @return ShardSyncTaskManager + */ + @Override + public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig, + DeletedStreamListProvider deletedStreamListProvider) { return new ShardSyncTaskManager(this.createShardDetector(streamConfig), this.createLeaseRefresher(), 
streamConfig.initialPositionInStreamExtended(), @@ -511,10 +526,12 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { ignoreUnexpectedChildShards, shardSyncIntervalMillis, executorService, - new HierarchicalShardSyncer(isMultiStreamMode, streamConfig.streamIdentifier().toString()), + new HierarchicalShardSyncer(isMultiStreamMode, streamConfig.streamIdentifier().toString(), + deletedStreamListProvider), metricsFactory); } + @Override public DynamoDBLeaseRefresher createLeaseRefresher() { return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, leaseSerializer, consistentReads, diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index 90b63477..af0755a3 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -39,6 +39,7 @@ import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrat import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -726,16 +727,8 @@ public class SchedulerTest { boolean expectPendingStreamsForDeletion, boolean onlyStreamsNoLeasesDeletion) throws DependencyException, ProvisionedThroughputException, InvalidStateException { - List streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig( - StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), - InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) - .collect(Collectors.toCollection(LinkedList::new)); - List streamConfigList2 = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig( - 
StreamIdentifier.multiStreamInstance( - Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), - InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST))) - .collect(Collectors.toCollection(LinkedList::new)); + List streamConfigList1 = createDummyStreamConfigList(1,5); + List streamConfigList2 = createDummyStreamConfigList(3,7); retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) .retrievalFactory(retrievalFactory); when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); @@ -782,6 +775,91 @@ public class SchedulerTest { scheduler.staleStreamDeletionMap().keySet()); } + + @Test + public void testKinesisStaleDeletedStreamCleanup() throws ProvisionedThroughputException, InvalidStateException, DependencyException { + List streamConfigList1 = createDummyStreamConfigList(1,6); + List streamConfigList2 = createDummyStreamConfigList(1,4); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2); + + prepareForStaleDeletedStreamCleanupTests(); + + // when KCL starts it starts with tracking 5 stream + assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + assertEquals(0, scheduler.staleStreamDeletionMap().size()); + + // 2 Streams are no longer needed to be consumed + Set syncedStreams1 = scheduler.checkAndSyncStreamShardsAndLeases(); + assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + assertEquals(createDummyStreamConfigList(4, 6).stream() + .map(StreamConfig::streamIdentifier) + .collect(Collectors.toSet()), scheduler.staleStreamDeletionMap() + .keySet()); + assertEquals(0, syncedStreams1.size()); + + StreamConfig deletedStreamConfig = createDummyStreamConfig(5); + // One stream is deleted from Kinesis side + 
scheduler.deletedStreamListProvider().add(deletedStreamConfig.streamIdentifier()); + + Set syncedStreams2 = scheduler.checkAndSyncStreamShardsAndLeases(); + + Set expectedCurrentStreamConfigs = Sets.newHashSet(streamConfigList1); + expectedCurrentStreamConfigs.remove(deletedStreamConfig); + + //assert kinesis deleted stream is cleaned up from KCL in memory state. + assertEquals(expectedCurrentStreamConfigs, Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + assertEquals(Sets.newHashSet(createDummyStreamConfig(4).streamIdentifier()), + Sets.newHashSet(scheduler.staleStreamDeletionMap().keySet())); + assertEquals(1, syncedStreams2.size()); + assertEquals(0, scheduler.deletedStreamListProvider().purgeAllDeletedStream().size()); + + verify(multiStreamTracker, times(3)).streamConfigList(); + + } + + private void prepareForStaleDeletedStreamCleanupTests() { + + when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() { + @Override public Duration waitPeriodToDeleteFormerStreams() { + return Duration.ofDays(1); + } + }); + + retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName) + .retrievalFactory(retrievalFactory); + scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig, + metricsConfig, processorConfig, retrievalConfig)); + when(scheduler.shouldSyncStreamsNow()).thenReturn(true); + } + // Tests validate that no cleanup of stream is done if its still tracked in multiStreamTracker + @Test + public void testKinesisStaleDeletedStreamNoCleanUpForTrackedStream() + throws ProvisionedThroughputException, InvalidStateException, DependencyException { + List streamConfigList1 = createDummyStreamConfigList(1,6); + when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1); + prepareForStaleDeletedStreamCleanupTests(); + + 
scheduler.deletedStreamListProvider().add(createDummyStreamConfig(3).streamIdentifier()); + + Set syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases(); + + assertEquals(0, syncedStreams.size()); + assertEquals(0, scheduler.staleStreamDeletionMap().size()); + assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values())); + } + + //Creates list of upperBound-lowerBound no of dummy StreamConfig + private List createDummyStreamConfigList(int lowerBound, int upperBound) { + return IntStream.range(lowerBound, upperBound).mapToObj(this::createDummyStreamConfig) + .collect(Collectors.toCollection(LinkedList::new)); + } + private StreamConfig createDummyStreamConfig(int streamId){ + return new StreamConfig( + StreamIdentifier.multiStreamInstance( + Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)), + InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)); + } + @Test public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreDeletedAfterDefermentPeriod() throws DependencyException, ProvisionedThroughputException, InvalidStateException { @@ -1114,7 +1192,7 @@ public class SchedulerTest { @Override public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, - StreamConfig streamConfig) { + StreamConfig streamConfig, DeletedStreamListProvider deletedStreamListProvider) { if(shouldReturnDefaultShardSyncTaskmanager) { return shardSyncTaskManager; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index c8ef05ba..a0dbd1f5 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -19,6 +19,7 @@ 
package software.amazon.kinesis.leases; // import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.mockito.Matchers.any; import static org.mockito.Mockito.atLeast; @@ -54,6 +55,7 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.kinesis.model.HashKeyRange; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardFilter; @@ -62,9 +64,12 @@ import software.amazon.kinesis.common.HashKeyRangeForLease; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.coordinator.DeletedStreamListProvider; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher; import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.InvalidStateException; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.NullMetricsScope; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @@ -292,7 +297,7 @@ public class HierarchicalShardSyncerTest { final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); 
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); @@ -315,7 +320,8 @@ public class HierarchicalShardSyncerTest { extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); - verify(shardDetector).listShards(); + verify(shardDetector, never()).listShards(); + verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } @@ -326,7 +332,7 @@ public class HierarchicalShardSyncerTest { final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); setupMultiStream(); @@ -349,7 +355,8 @@ public class HierarchicalShardSyncerTest { extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); - verify(shardDetector).listShards(); + verify(shardDetector, never()).listShards(); + verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } @@ -361,7 +368,7 @@ public class HierarchicalShardSyncerTest { /** * Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards. In this scenario, shardDetector.listShards() - * should never be called. + * or shardDetector.listShardsWithoutConsumingResourceNotFoundException() should never be called. 
*/ @Test public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception { @@ -394,13 +401,14 @@ public class HierarchicalShardSyncerTest { extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); verify(shardDetector, never()).listShards(); + verify(shardDetector, never()).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } /** * Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards. In this scenario, shardDetector.listShards() - * should never be called. + * or shardDetector.listShardsWithoutConsumingResourceNotFoundException() should never be called. */ @Test public void testCheckAndCreateLeasesForShardsWithShardListMultiStream() throws Exception { @@ -431,13 +439,14 @@ public class HierarchicalShardSyncerTest { extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); verify(shardDetector, never()).listShards(); + verify(shardDetector, never()).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } /** * Test checkAndCreateLeaseForNewShards with an empty list of shards. In this scenario, shardDetector.listShards() - * should never be called. + * or shardDetector.listShardsWithoutConsumingResourceNotFoundException() should never be called. 
*/ @Test public void testCheckAndCreateLeasesForShardsWithEmptyShardList() throws Exception { @@ -463,6 +472,7 @@ public class HierarchicalShardSyncerTest { assertThat(extendedSequenceNumbers.size(), equalTo(0)); verify(shardDetector, never()).listShards(); + verify(shardDetector, never()).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } @@ -668,13 +678,13 @@ public class HierarchicalShardSyncerTest { shards.remove(3); shards.add(3, shard); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_TRIM_HORIZON, SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { - verify(shardDetector).listShards(); + verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, never()).listLeases(); } } @@ -688,14 +698,14 @@ public class HierarchicalShardSyncerTest { shards.remove(3); shards.add(3, shard); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); setupMultiStream(); try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_TRIM_HORIZON, SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { - verify(shardDetector).listShards(); + verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, never()).listLeases(); } } @@ -722,7 +732,7 @@ public class HierarchicalShardSyncerTest { final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); - when(shardDetector.listShards()).thenReturn(shards); + 
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); @@ -743,7 +753,8 @@ public class HierarchicalShardSyncerTest { leaseSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); - verify(shardDetector).listShards(); + verify(shardDetector, never()).listShards(); + verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } @@ -767,7 +778,7 @@ public class HierarchicalShardSyncerTest { final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); setupMultiStream(); @@ -788,7 +799,7 @@ public class HierarchicalShardSyncerTest { leaseSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); - verify(shardDetector).listShards(); + verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } @@ -822,7 +833,7 @@ public class HierarchicalShardSyncerTest { final ArgumentCaptor leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class); final ArgumentCaptor leaseDeleteCaptor = ArgumentCaptor.forClass(Lease.class); - when(shardDetector.listShards()).thenReturn(shards); + 
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()).thenReturn(leases); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCreateCaptor.capture())).thenReturn(true); doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseDeleteCaptor.capture()); @@ -837,7 +848,7 @@ public class HierarchicalShardSyncerTest { assertThat(createLeases, equalTo(expectedCreateLeases)); - verify(shardDetector, times(1)).listShards(); + verify(shardDetector, times(1)).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(1)).listLeases(); verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); @@ -849,7 +860,7 @@ public class HierarchicalShardSyncerTest { assertThat(deleteLeases.size(), equalTo(0)); - verify(shardDetector, times(2)).listShards(); + verify(shardDetector, times(2)).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, times(2)).listLeases(); } @@ -885,7 +896,7 @@ public class HierarchicalShardSyncerTest { final ArgumentCaptor leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); when(dynamoDBLeaseRefresher.listLeases()) .thenThrow(new DependencyException(new Throwable("Throw for ListLeases"))) .thenReturn(Collections.emptyList()).thenReturn(leases); @@ -897,7 +908,7 @@ public class HierarchicalShardSyncerTest { .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { - 
verify(shardDetector, times(1)).listShards(); + verify(shardDetector, times(1)).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(1)).listLeases(); verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); @@ -912,7 +923,7 @@ public class HierarchicalShardSyncerTest { assertThat(createLeases, equalTo(expectedCreateLeases)); - verify(shardDetector, times(2)).listShards(); + verify(shardDetector, times(2)).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(2)).listLeases(); verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); @@ -927,13 +938,36 @@ public class HierarchicalShardSyncerTest { final Set expectedSequenceNumbers = new HashSet<>( Collections.singletonList(ExtendedSequenceNumber.SHARD_END)); - verify(shardDetector, times(3)).listShards(); + verify(shardDetector, times(3)).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, times(3)).listLeases(); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); } } + /** Test checkAndCreateLeaseForNewShards when shardDetector.listShardsWithoutConsumingResourceNotFoundException() throws ResourceNotFoundException. In this scenario, the call should return false and the stream should be recorded in the DeletedStreamListProvider handed to the syncer. */ @Test + public void testDeletedStreamListProviderUpdateOnResourceNotFound() + throws ProvisionedThroughputException, InvalidStateException, DependencyException, InterruptedException { + DeletedStreamListProvider dummyDeletedStreamListProvider = new DeletedStreamListProvider(); + hierarchicalShardSyncer = new HierarchicalShardSyncer(MULTISTREAM_MODE_ON, STREAM_IDENTIFIER, + dummyDeletedStreamListProvider); + when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenThrow( + ResourceNotFoundException.builder() + 
.build()); /* Stub: the shard listing fails with ResourceNotFoundException instead of returning shards. */ + boolean response = hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, + INITIAL_POSITION_TRIM_HORIZON, SCOPE, ignoreUnexpectedChildShards, + dynamoDBLeaseRefresher.isLeaseTableEmpty()); + Set deletedStreamSet = dummyDeletedStreamListProvider.purgeAllDeletedStream(); /* Returns and empties the provider's current set, so this snapshot holds everything recorded during the call above. */ + + assertFalse(response); + assertThat(deletedStreamSet.size(), equalTo(1)); + assertThat(deletedStreamSet.iterator().next().toString(), equalTo(STREAM_IDENTIFIER)); /* Exactly one stream was recorded and its string form matches the identifier given to the syncer. */ + + verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException(); + verify(shardDetector, never()).listShards(); /* Only the listShardsWithoutConsumingResourceNotFoundException() variant is expected to be used on this path. */ + } + @Test(expected = DependencyException.class) public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardWithCreateLeaseExceptions() throws Exception { @@ -965,7 +999,7 @@ public class HierarchicalShardSyncerTest { final ArgumentCaptor leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()) .thenReturn(Collections.emptyList()).thenReturn(leases); when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCreateCaptor.capture())) @@ -977,7 +1011,7 @@ public class HierarchicalShardSyncerTest { .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); } finally { - verify(shardDetector, times(1)).listShards(); + verify(shardDetector, times(1)).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(1)).listLeases(); verify(dynamoDBLeaseRefresher, times(1)).createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); @@ -991,7 +1025,7 @@ public class HierarchicalShardSyncerTest { final Set expectedCreateLeases = 
getExpectedLeasesForGraphA(shards, sequenceNumber, position); assertThat(createLeases, equalTo(expectedCreateLeases)); - verify(shardDetector, times(2)).listShards(); + verify(shardDetector, times(2)).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(2)).listLeases(); verify(dynamoDBLeaseRefresher, times(1 + expectedCreateLeases.size())) .createLeaseIfNotExists(any(Lease.class)); @@ -1002,7 +1036,7 @@ public class HierarchicalShardSyncerTest { .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); - verify(shardDetector, times(3)).listShards(); + verify(shardDetector, times(3)).listShardsWithoutConsumingResourceNotFoundException(); verify(dynamoDBLeaseRefresher, times(1 + expectedCreateLeases.size())) .createLeaseIfNotExists(any(Lease.class)); verify(dynamoDBLeaseRefresher, times(3)).listLeases(); @@ -1084,7 +1118,7 @@ public class HierarchicalShardSyncerTest { final List existingLeases) throws Exception { final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); - when(shardDetector.listShards()).thenReturn(shards); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards); when(shardDetector.listShardsWithFilter(any())).thenReturn(getFilteredShards(shards, initialPosition)); when(dynamoDBLeaseRefresher.listLeases()).thenReturn(existingLeases); when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(existingLeases.isEmpty()); @@ -2381,14 +2415,14 @@ public class HierarchicalShardSyncerTest { final List currentLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo"); when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false); - when(shardDetector.listShards()).thenReturn(shardsWithoutLeases); + when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shardsWithoutLeases); 
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(currentLeases); hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty()); - verify(shardDetector, atLeast(1)).listShards(); + verify(shardDetector, atLeast(1)).listShardsWithoutConsumingResourceNotFoundException(); } /**