Clean up in-memory state of deleted kinesis stream in MultiStreamMode (#1056)

Co-authored-by: Abhit Sawwalakhe <sawwa@amazon.com>
This commit is contained in:
Abhit Sawwalakhe 2023-03-08 13:39:35 -08:00 committed by GitHub
parent 43d43653d0
commit 27b166c5aa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 296 additions and 52 deletions

View file

@ -0,0 +1,38 @@
package software.amazon.kinesis.coordinator;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.common.StreamIdentifier;
/**
 * In-memory holder for the identifiers of streams that no longer exist (were deleted) and therefore
 * need to be cleaned up from KCL's in-memory state.
 *
 * <p>Identifiers are kept in a key set backed by a {@link ConcurrentHashMap}.
 */
@Slf4j
public class DeletedStreamListProvider {

    /** Identifiers recorded via {@link #add(StreamIdentifier)} and not yet purged. */
    private final Set<StreamIdentifier> deletedStreams = ConcurrentHashMap.newKeySet();

    /** Creates a provider with no recorded streams. */
    public DeletedStreamListProvider() {
        // The backing set is initialised at its declaration.
    }

    /**
     * Records a stream as deleted.
     *
     * @param streamIdentifier identifier of the deleted stream
     */
    public void add(StreamIdentifier streamIdentifier) {
        log.info("Added {}", streamIdentifier);
        deletedStreams.add(streamIdentifier);
    }

    /**
     * Returns the currently recorded streams and removes them from this provider.
     *
     * <p>A snapshot copy is taken first and only the snapshotted identifiers are removed, so an
     * identifier added after the snapshot stays recorded for the next call.
     *
     * @return set of deleted streams captured by the snapshot
     */
    public Set<StreamIdentifier> purgeAllDeletedStream() {
        final Set<StreamIdentifier> drained = new HashSet<>(deletedStreams);
        drained.forEach(deletedStreams::remove);
        return drained;
    }
}

View file

@ -116,6 +116,7 @@ public class Scheduler implements Runnable {
private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count";
private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count";
private static final String DELETED_STREAMS_COUNT = "DeletedStreams.Count";
private static final String NON_EXISTING_STREAM_DELETE_COUNT = "NonExistingStreamDelete.Count";
private final SchedulerLog slog = new SchedulerLog();
@ -166,6 +167,8 @@ public class Scheduler implements Runnable {
private final LeaseCleanupManager leaseCleanupManager;
private final SchemaRegistryDecoder schemaRegistryDecoder;
private final DeletedStreamListProvider deletedStreamListProvider;
// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
private final ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = new ConcurrentHashMap<>();
@ -251,9 +254,10 @@ public class Scheduler implements Runnable {
this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService();
this.diagnosticEventFactory = diagnosticEventFactory;
this.diagnosticEventHandler = new DiagnosticEventLogger();
this.deletedStreamListProvider = new DeletedStreamListProvider();
this.shardSyncTaskManagerProvider = streamConfig -> this.leaseManagementConfig
.leaseManagementFactory(leaseSerializer, isMultiStreamMode)
.createShardSyncTaskManager(this.metricsFactory, streamConfig);
.createShardSyncTaskManager(this.metricsFactory, streamConfig, this.deletedStreamListProvider);
this.shardPrioritization = this.coordinatorConfig.shardPrioritization();
this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
this.skipShardSyncAtWorkerInitializationIfLeasesExist =
@ -535,6 +539,19 @@ public class Scheduler implements Runnable {
.partitioningBy(newStreamConfigMap::containsKey, Collectors.toSet()));
final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier ->
Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis()).collect(Collectors.toSet());
// These are the streams that have been deleted in Kinesis, detected when shardSyncTask encounters a
// ResourceNotFoundException. This applies to MultiStreamMode only; in SingleStreamMode the store will
// not have any data.
// Filter streams based on newStreamConfigMap so that we never override the input given to KCL.
final Set<StreamIdentifier> deletedStreamSet = this.deletedStreamListProvider
.purgeAllDeletedStream()
.stream()
.filter(streamIdentifier -> !newStreamConfigMap.containsKey(streamIdentifier))
.collect(Collectors.toSet());
if (deletedStreamSet.size() > 0) {
log.info("Stale streams to delete: {}", deletedStreamSet);
staleStreamIdsToBeDeleted.addAll(deletedStreamSet);
}
final Set<StreamIdentifier> deletedStreamsLeases = deleteMultiStreamLeases(staleStreamIdsToBeDeleted);
streamsSynced.addAll(deletedStreamsLeases);
@ -554,6 +571,8 @@ public class Scheduler implements Runnable {
MetricsUtil.addCount(metricsScope, ACTIVE_STREAMS_COUNT, newStreamConfigMap.size(), MetricsLevel.SUMMARY);
MetricsUtil.addCount(metricsScope, PENDING_STREAMS_DELETION_COUNT, staleStreamDeletionMap.size(),
MetricsLevel.SUMMARY);
MetricsUtil.addCount(metricsScope, NON_EXISTING_STREAM_DELETE_COUNT, deletedStreamSet.size(),
MetricsLevel.SUMMARY);
MetricsUtil.addCount(metricsScope, DELETED_STREAMS_COUNT, deletedStreamsLeases.size(), MetricsLevel.SUMMARY);
} finally {
MetricsUtil.endScope(metricsScope);
@ -594,7 +613,7 @@ public class Scheduler implements Runnable {
if (streamIdentifiers.isEmpty()) {
return Collections.emptySet();
}
log.info("Deleting streams: {}", streamIdentifiers);
final Set<StreamIdentifier> streamsSynced = new HashSet<>();
final List<MultiStreamLease> leases = fetchMultiStreamLeases();
final Map<String, List<MultiStreamLease>> streamIdToShardsMap = leases.stream().collect(

View file

@ -17,6 +17,7 @@ package software.amazon.kinesis.leases;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@ -39,6 +40,7 @@ import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
@ -47,6 +49,7 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.coordinator.DeletedStreamListProvider;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
@ -56,6 +59,7 @@ import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import static java.util.Objects.nonNull;
import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRange;
/**
@ -72,6 +76,8 @@ public class HierarchicalShardSyncer {
private final String streamIdentifier;
private final DeletedStreamListProvider deletedStreamListProvider;
private static final String MIN_HASH_KEY = BigInteger.ZERO.toString();
private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString();
private static final int retriesForCompleteHashRange = 3;
@ -79,13 +85,17 @@ public class HierarchicalShardSyncer {
private static final long DELAY_BETWEEN_LIST_SHARDS_MILLIS = 1000;
public HierarchicalShardSyncer() {
isMultiStreamMode = false;
streamIdentifier = "SingleStreamMode";
this(false, "SingleStreamMode");
}
/**
 * Creates a syncer that has no store for deleted streams: delegates to the three-argument
 * constructor, passing {@code null} as the {@code DeletedStreamListProvider}.
 *
 * @param isMultiStreamMode whether the syncer operates in multi-stream mode
 * @param streamIdentifier string form of the identifier of the stream being synced
 */
public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier) {
this(isMultiStreamMode, streamIdentifier, null);
}
/**
 * Creates a syncer and stores the given arguments as-is.
 *
 * @param isMultiStreamMode whether the syncer operates in multi-stream mode
 * @param streamIdentifier string form of the identifier of the stream being synced
 * @param deletedStreamListProvider store for streams found to be deleted; may be {@code null}
 *        (the two-argument constructor passes {@code null})
 */
public HierarchicalShardSyncer(
        final boolean isMultiStreamMode,
        final String streamIdentifier,
        final DeletedStreamListProvider deletedStreamListProvider) {
    this.deletedStreamListProvider = deletedStreamListProvider;
    this.streamIdentifier = streamIdentifier;
    this.isMultiStreamMode = isMultiStreamMode;
}
private static final BiFunction<Lease, MultiStreamArgs, String> shardIdFromLeaseDeducer =
@ -279,8 +289,17 @@ public class HierarchicalShardSyncer {
+ retriesForCompleteHashRange + " retries.");
}
private static List<Shard> getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
final Optional<List<Shard>> shards = Optional.of(shardDetector.listShards());
private List<Shard> getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
// Fallback to existing behavior for backward compatibility
List<Shard> shardList = Collections.emptyList();
try {
shardList = shardDetector.listShardsWithoutConsumingResourceNotFoundException();
} catch (ResourceNotFoundException e) {
if (nonNull(this.deletedStreamListProvider) && isMultiStreamMode) {
deletedStreamListProvider.add(StreamIdentifier.multiStreamInstance(streamIdentifier));
}
}
final Optional<List<Shard>> shards = Optional.of(shardList);
return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() +
" is not in ACTIVE OR UPDATING state - will retry getting the shard list."));

View file

@ -96,6 +96,8 @@ public class KinesisShardDetector implements ShardDetector {
@Getter(AccessLevel.PACKAGE)
private final AtomicInteger cacheMisses = new AtomicInteger(0);
private static final Boolean THROW_RESOURCE_NOT_FOUND_EXCEPTION = true;
@Deprecated
public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload,
@ -175,15 +177,26 @@ public class KinesisShardDetector implements ShardDetector {
return listShardsWithFilter(null);
}
/**
 * Lists shards without a shard filter, asking the internal listing routine to propagate
 * {@code ResourceNotFoundException} rather than handle it.
 */
@Override
@Synchronized
public List<Shard> listShardsWithoutConsumingResourceNotFoundException() {
    final ShardFilter noFilter = null;
    return listShardsWithFilterInternal(noFilter, THROW_RESOURCE_NOT_FOUND_EXCEPTION);
}
/**
 * Lists shards using the given filter, asking the internal listing routine NOT to propagate
 * {@code ResourceNotFoundException}.
 *
 * @param shardFilter filter forwarded to the internal listing routine
 */
@Override
@Synchronized
public List<Shard> listShardsWithFilter(ShardFilter shardFilter) {
    final boolean propagateResourceNotFound = !THROW_RESOURCE_NOT_FOUND_EXCEPTION;
    return listShardsWithFilterInternal(shardFilter, propagateResourceNotFound);
}
private List<Shard> listShardsWithFilterInternal(ShardFilter shardFilter,
boolean shouldPropagateResourceNotFoundException) {
final List<Shard> shards = new ArrayList<>();
ListShardsResponse result;
String nextToken = null;
do {
result = listShards(shardFilter, nextToken);
result = listShards(shardFilter, nextToken, shouldPropagateResourceNotFoundException);
if (result == null) {
/*
@ -201,7 +214,12 @@ public class KinesisShardDetector implements ShardDetector {
return shards;
}
private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken) {
/**
* @param shouldPropagateResourceNotFoundException whether a ResourceNotFoundException should be propagated to
* the caller ({@code true}) or handled by this method by returning an empty shard list ({@code false}).
*/
private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken,
final boolean shouldPropagateResourceNotFoundException) {
ListShardsRequest.Builder builder = KinesisRequestsBuilder.listShardsRequestBuilder();
if (StringUtils.isEmpty(nextToken)) {
builder = builder.streamName(streamIdentifier.streamName()).shardFilter(shardFilter);
@ -243,9 +261,14 @@ public class KinesisShardDetector implements ShardDetector {
} catch (ResourceNotFoundException e) {
log.warn("Got ResourceNotFoundException when fetching shard list for {}. Stream no longer exists.",
streamIdentifier.streamName());
return ListShardsResponse.builder().shards(Collections.emptyList())
.nextToken(null)
.build();
if (shouldPropagateResourceNotFoundException) {
throw e;
}
return ListShardsResponse.builder()
.shards(Collections.emptyList())
.nextToken(null)
.build();
} catch (TimeoutException te) {
throw new RuntimeException(te);
}

View file

@ -16,6 +16,7 @@
package software.amazon.kinesis.leases;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.coordinator.DeletedStreamListProvider;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
import software.amazon.kinesis.metrics.MetricsFactory;
@ -31,6 +32,11 @@ public interface LeaseManagementFactory {
throw new UnsupportedOperationException();
}
/**
 * Creates a ShardSyncTaskManager for the given stream, optionally wired to a store that captures
 * streams found to be deleted.
 *
 * <p>The default implementation ignores {@code deletedStreamListProvider} and delegates to
 * {@link #createShardSyncTaskManager(MetricsFactory, StreamConfig)}. This keeps factories that
 * implement only the two-argument overload working when callers invoke this overload, instead
 * of failing them unconditionally with an {@link UnsupportedOperationException}. Implementations
 * that support deleted-stream tracking should override this method.
 *
 * @param metricsFactory factory to get metrics objects
 * @param streamConfig stream configuration for which the ShardSyncTaskManager is created
 * @param deletedStreamListProvider store for streams found to be deleted; unused by this default
 * @return ShardSyncTaskManager produced by the two-argument overload
 */
default ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig,
        DeletedStreamListProvider deletedStreamListProvider) {
    return createShardSyncTaskManager(metricsFactory, streamConfig);
}
DynamoDBLeaseRefresher createLeaseRefresher();
ShardDetector createShardDetector();

View file

@ -46,6 +46,16 @@ public interface ShardDetector {
*/
List<Shard> listShards();
/**
 * Behaves like {@link #listShards()}, except that a ResourceNotFoundException is not consumed:
 * it is thrown to the caller instead of being turned into an empty list.
 *
 * <p>The default implementation is not supported and always throws.
 *
 * @return Shards
 * @throws UnsupportedOperationException if the implementation does not override this method
 */
default List<Shard> listShardsWithoutConsumingResourceNotFoundException() {
    final String message = "listShardsWithoutConsumingResourceNotFoundException not implemented";
    throw new UnsupportedOperationException(message);
}
/**
* List shards with shard filter.
*

View file

@ -29,6 +29,7 @@ import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.LeaseCleanupConfig;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.coordinator.DeletedStreamListProvider;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.KinesisShardDetector;
import software.amazon.kinesis.leases.LeaseCleanupManager;
@ -504,6 +505,20 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
*/
@Override
public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig) {
    // No deleted-stream store is available through this overload, so pass none along.
    final DeletedStreamListProvider noDeletedStreamListProvider = null;
    return createShardSyncTaskManager(metricsFactory, streamConfig, noDeletedStreamListProvider);
}
/**
* Create ShardSyncTaskManager from the streamConfig passed
*
* @param metricsFactory - factory to get metrics object
* @param streamConfig - streamConfig for which ShardSyncTaskManager needs to be created
* @param deletedStreamListProvider - store for capturing the streams which are deleted in kinesis
* @return ShardSyncTaskManager
*/
@Override
public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig,
DeletedStreamListProvider deletedStreamListProvider) {
return new ShardSyncTaskManager(this.createShardDetector(streamConfig),
this.createLeaseRefresher(),
streamConfig.initialPositionInStreamExtended(),
@ -511,10 +526,12 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
ignoreUnexpectedChildShards,
shardSyncIntervalMillis,
executorService,
new HierarchicalShardSyncer(isMultiStreamMode, streamConfig.streamIdentifier().toString()),
new HierarchicalShardSyncer(isMultiStreamMode, streamConfig.streamIdentifier().toString(),
deletedStreamListProvider),
metricsFactory);
}
@Override
public DynamoDBLeaseRefresher createLeaseRefresher() {
return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, leaseSerializer, consistentReads,

View file

@ -39,6 +39,7 @@ import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrat
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@ -726,16 +727,8 @@ public class SchedulerTest {
boolean expectPendingStreamsForDeletion,
boolean onlyStreamsNoLeasesDeletion)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
List<StreamConfig> streamConfigList2 = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1,5);
List<StreamConfig> streamConfigList2 = createDummyStreamConfigList(3,7);
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
@ -782,6 +775,91 @@ public class SchedulerTest {
scheduler.staleStreamDeletionMap().keySet());
}
/**
 * Verifies that a stream reported through the scheduler's DeletedStreamListProvider is removed from
 * the scheduler's in-memory state on the next sync, while a stream that is merely no longer tracked
 * stays pending in the stale-stream deletion map.
 */
@Test
public void testKinesisStaleDeletedStreamCleanup() throws ProvisionedThroughputException, InvalidStateException, DependencyException {
// The tracker is stubbed to return streams 1-5 on its first call and streams 1-3 on later calls.
List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1,6);
List<StreamConfig> streamConfigList2 = createDummyStreamConfigList(1,4);
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
prepareForStaleDeletedStreamCleanupTests();
// When KCL starts, it is tracking 5 streams and nothing is pending deletion.
assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
assertEquals(0, scheduler.staleStreamDeletionMap().size());
// 2 streams (4 and 5) no longer need to be consumed: they become stale but are not deleted yet.
Set<StreamIdentifier> syncedStreams1 = scheduler.checkAndSyncStreamShardsAndLeases();
assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
assertEquals(createDummyStreamConfigList(4, 6).stream()
.map(StreamConfig::streamIdentifier)
.collect(Collectors.toSet()), scheduler.staleStreamDeletionMap()
.keySet());
assertEquals(0, syncedStreams1.size());
StreamConfig deletedStreamConfig = createDummyStreamConfig(5);
// One stream (5) is deleted on the Kinesis side.
scheduler.deletedStreamListProvider().add(deletedStreamConfig.streamIdentifier());
Set<StreamIdentifier> syncedStreams2 = scheduler.checkAndSyncStreamShardsAndLeases();
Set<StreamConfig> expectedCurrentStreamConfigs = Sets.newHashSet(streamConfigList1);
expectedCurrentStreamConfigs.remove(deletedStreamConfig);
// Assert that the stream deleted in Kinesis is cleaned up from KCL's in-memory state,
// while stream 4 is still only pending deletion.
assertEquals(expectedCurrentStreamConfigs, Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
assertEquals(Sets.newHashSet(createDummyStreamConfig(4).streamIdentifier()),
Sets.newHashSet(scheduler.staleStreamDeletionMap().keySet()));
assertEquals(1, syncedStreams2.size());
// The provider must have been drained by the sync.
assertEquals(0, scheduler.deletedStreamListProvider().purgeAllDeletedStream().size());
// NOTE(review): presumably one call at Scheduler construction plus one per sync - confirm.
verify(multiStreamTracker, times(3)).streamConfigList();
}
/**
 * Shared setup for the deleted-stream cleanup tests: stubs the tracker's deletion strategy with a
 * one-day wait period, rebuilds {@code retrievalConfig}, and replaces {@code scheduler} with a spy
 * whose {@code shouldSyncStreamsNow()} always returns true.
 */
private void prepareForStaleDeletedStreamCleanupTests() {
// One-day deferral, so streams that are merely stale are not deleted during the test.
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() {
@Override public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ofDays(1);
}
});
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
// Spy on the scheduler so that shouldSyncStreamsNow() can be stubbed below.
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig));
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
}
// Validates that a stream reported as deleted is NOT cleaned up while it is still tracked by
// multiStreamTracker.
@Test
public void testKinesisStaleDeletedStreamNoCleanUpForTrackedStream()
throws ProvisionedThroughputException, InvalidStateException, DependencyException {
// The tracker keeps returning streams 1-5 on every call.
List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1,6);
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1);
prepareForStaleDeletedStreamCleanupTests();
// Report stream 3 as deleted even though it is still tracked.
scheduler.deletedStreamListProvider().add(createDummyStreamConfig(3).streamIdentifier());
Set<StreamIdentifier> syncedStreams = scheduler.checkAndSyncStreamShardsAndLeases();
// Nothing is synced, nothing becomes stale, and the tracked set is unchanged.
assertEquals(0, syncedStreams.size());
assertEquals(0, scheduler.staleStreamDeletionMap().size());
assertEquals(Sets.newHashSet(streamConfigList1), Sets.newHashSet(scheduler.currentStreamConfigMap().values()));
}
/**
 * Creates a list of (upperBound - lowerBound) dummy StreamConfigs, one for each stream id in
 * the range [lowerBound, upperBound).
 */
private List<StreamConfig> createDummyStreamConfigList(int lowerBound, int upperBound) {
    final List<StreamConfig> dummyConfigs = new LinkedList<>();
    for (int streamId = lowerBound; streamId < upperBound; streamId++) {
        dummyConfigs.add(createDummyStreamConfig(streamId));
    }
    return dummyConfigs;
}
/**
 * Creates a dummy StreamConfig at LATEST whose multi-stream identifier is the ":"-joined triple
 * (streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345).
 */
private StreamConfig createDummyStreamConfig(int streamId) {
    final int leadingPart = streamId * 111111111;
    final String namePart = "multiStreamTest-" + streamId;
    final int trailingPart = streamId * 12345;
    final String serializedIdentifier = Joiner.on(":").join(leadingPart, namePart, trailingPart);

    final StreamIdentifier identifier = StreamIdentifier.multiStreamInstance(serializedIdentifier);
    final InitialPositionInStreamExtended latestPosition =
            InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
    return new StreamConfig(identifier, latestPosition);
}
@Test
public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreDeletedAfterDefermentPeriod()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
@ -1114,7 +1192,7 @@ public class SchedulerTest {
@Override
public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory,
StreamConfig streamConfig) {
StreamConfig streamConfig, DeletedStreamListProvider deletedStreamListProvider) {
if(shouldReturnDefaultShardSyncTaskmanager) {
return shardSyncTaskManager;
}

View file

@ -19,6 +19,7 @@ package software.amazon.kinesis.leases;
//
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeast;
@ -54,6 +55,7 @@ import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
@ -62,9 +64,12 @@ import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.coordinator.DeletedStreamListProvider;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.NullMetricsScope;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@ -292,7 +297,7 @@ public class HierarchicalShardSyncerTest {
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
@ -315,7 +320,8 @@ public class HierarchicalShardSyncerTest {
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector).listShards();
verify(shardDetector, never()).listShards();
verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
@ -326,7 +332,7 @@ public class HierarchicalShardSyncerTest {
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
setupMultiStream();
@ -349,7 +355,8 @@ public class HierarchicalShardSyncerTest {
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector).listShards();
verify(shardDetector, never()).listShards();
verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
@ -361,7 +368,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards. In this scenario, shardDetector.listShards()
* should never be called.
* or shardDetector.listShardsWithoutConsumingResourceNotFoundException() should never be called.
*/
@Test
public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception {
@ -394,13 +401,14 @@ public class HierarchicalShardSyncerTest {
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector, never()).listShards();
verify(shardDetector, never()).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
/**
* Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards. In this scenario, shardDetector.listShards()
* should never be called.
* or shardDetector.listShardsWithoutConsumingResourceNotFoundException() should never be called.
*/
@Test
public void testCheckAndCreateLeasesForShardsWithShardListMultiStream() throws Exception {
@ -431,13 +439,14 @@ public class HierarchicalShardSyncerTest {
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector, never()).listShards();
verify(shardDetector, never()).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
/**
* Test checkAndCreateLeaseForNewShards with an empty list of shards. In this scenario, shardDetector.listShards()
* should never be called.
* or shardDetector.listShardsWithoutConsumingResourceNotFoundException() should never be called.
*/
@Test
public void testCheckAndCreateLeasesForShardsWithEmptyShardList() throws Exception {
@ -463,6 +472,7 @@ public class HierarchicalShardSyncerTest {
assertThat(extendedSequenceNumbers.size(), equalTo(0));
verify(shardDetector, never()).listShards();
verify(shardDetector, never()).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
@ -668,13 +678,13 @@ public class HierarchicalShardSyncerTest {
shards.remove(3);
shards.add(3, shard);
when(shardDetector.listShards()).thenReturn(shards);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector).listShards();
verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, never()).listLeases();
}
}
@ -688,14 +698,14 @@ public class HierarchicalShardSyncerTest {
shards.remove(3);
shards.add(3, shard);
when(shardDetector.listShards()).thenReturn(shards);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
setupMultiStream();
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, SCOPE, false,
dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector).listShards();
verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, never()).listLeases();
}
}
@ -722,7 +732,7 @@ public class HierarchicalShardSyncerTest {
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
@ -743,7 +753,8 @@ public class HierarchicalShardSyncerTest {
leaseSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector).listShards();
verify(shardDetector, never()).listShards();
verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
@ -767,7 +778,7 @@ public class HierarchicalShardSyncerTest {
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
setupMultiStream();
@ -788,7 +799,7 @@ public class HierarchicalShardSyncerTest {
leaseSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector).listShards();
verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
@ -822,7 +833,7 @@ public class HierarchicalShardSyncerTest {
final ArgumentCaptor<Lease> leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class);
final ArgumentCaptor<Lease> leaseDeleteCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()).thenReturn(leases);
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCreateCaptor.capture())).thenReturn(true);
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseDeleteCaptor.capture());
@ -837,7 +848,7 @@ public class HierarchicalShardSyncerTest {
assertThat(createLeases, equalTo(expectedCreateLeases));
verify(shardDetector, times(1)).listShards();
verify(shardDetector, times(1)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(1)).listLeases();
verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
@ -849,7 +860,7 @@ public class HierarchicalShardSyncerTest {
assertThat(deleteLeases.size(), equalTo(0));
verify(shardDetector, times(2)).listShards();
verify(shardDetector, times(2)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, times(2)).listLeases();
}
@ -885,7 +896,7 @@ public class HierarchicalShardSyncerTest {
final ArgumentCaptor<Lease> leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases())
.thenThrow(new DependencyException(new Throwable("Throw for ListLeases")))
.thenReturn(Collections.emptyList()).thenReturn(leases);
@ -897,7 +908,7 @@ public class HierarchicalShardSyncerTest {
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector, times(1)).listShards();
verify(shardDetector, times(1)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(1)).listLeases();
verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
@ -912,7 +923,7 @@ public class HierarchicalShardSyncerTest {
assertThat(createLeases, equalTo(expectedCreateLeases));
verify(shardDetector, times(2)).listShards();
verify(shardDetector, times(2)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(2)).listLeases();
verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
@ -927,13 +938,36 @@ public class HierarchicalShardSyncerTest {
final Set<ExtendedSequenceNumber> expectedSequenceNumbers = new HashSet<>(
Collections.singletonList(ExtendedSequenceNumber.SHARD_END));
verify(shardDetector, times(3)).listShards();
verify(shardDetector, times(3)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, times(3)).listLeases();
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
}
/**
 * Checks how the syncer reacts when {@code listShardsWithoutConsumingResourceNotFoundException()}
 * throws a {@link ResourceNotFoundException}: the sync call must return {@code false}, the
 * {@link DeletedStreamListProvider} passed to the syncer must hold exactly one entry whose string
 * form equals {@code STREAM_IDENTIFIER}, and {@code listShards()} must never be called.
 */
@Test
public void testDeletedStreamListProviderUpdateOnResourceNotFound()
        throws ProvisionedThroughputException, InvalidStateException, DependencyException, InterruptedException {
    // Arrange: a syncer wired to a fresh provider, and a shard detector that reports the stream as missing.
    final DeletedStreamListProvider deletedStreamListProvider = new DeletedStreamListProvider();
    hierarchicalShardSyncer =
            new HierarchicalShardSyncer(MULTISTREAM_MODE_ON, STREAM_IDENTIFIER, deletedStreamListProvider);

    final ResourceNotFoundException streamNotFound = ResourceNotFoundException.builder().build();
    when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false);
    when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenThrow(streamNotFound);

    // Act: run one shard sync against the stubbed collaborators.
    final boolean leaseTableEmpty = dynamoDBLeaseRefresher.isLeaseTableEmpty();
    final boolean synced = hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(
            shardDetector,
            dynamoDBLeaseRefresher,
            INITIAL_POSITION_TRIM_HORIZON,
            SCOPE,
            ignoreUnexpectedChildShards,
            leaseTableEmpty);

    // Assert: the sync reports failure and the stream was recorded as deleted.
    assertFalse(synced);
    // purgeAllDeletedStream() hands back the recorded streams and empties the provider.
    final Set<StreamIdentifier> purgedStreams = deletedStreamListProvider.purgeAllDeletedStream();
    assertThat(purgedStreams.size(), equalTo(1));
    final StreamIdentifier recordedStream = purgedStreams.iterator().next();
    assertThat(recordedStream.toString(), equalTo(STREAM_IDENTIFIER));

    // Only the exception-propagating listing call is expected on the detector.
    verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
    verify(shardDetector, never()).listShards();
}
@Test(expected = DependencyException.class)
public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardWithCreateLeaseExceptions()
throws Exception {
@ -965,7 +999,7 @@ public class HierarchicalShardSyncerTest {
final ArgumentCaptor<Lease> leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList())
.thenReturn(Collections.emptyList()).thenReturn(leases);
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCreateCaptor.capture()))
@ -977,7 +1011,7 @@ public class HierarchicalShardSyncerTest {
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector, times(1)).listShards();
verify(shardDetector, times(1)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(1)).listLeases();
verify(dynamoDBLeaseRefresher, times(1)).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
@ -991,7 +1025,7 @@ public class HierarchicalShardSyncerTest {
final Set<Lease> expectedCreateLeases = getExpectedLeasesForGraphA(shards, sequenceNumber, position);
assertThat(createLeases, equalTo(expectedCreateLeases));
verify(shardDetector, times(2)).listShards();
verify(shardDetector, times(2)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(2)).listLeases();
verify(dynamoDBLeaseRefresher, times(1 + expectedCreateLeases.size()))
.createLeaseIfNotExists(any(Lease.class));
@ -1002,7 +1036,7 @@ public class HierarchicalShardSyncerTest {
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
verify(shardDetector, times(3)).listShards();
verify(shardDetector, times(3)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(1 + expectedCreateLeases.size()))
.createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, times(3)).listLeases();
@ -1084,7 +1118,7 @@ public class HierarchicalShardSyncerTest {
final List<Lease> existingLeases) throws Exception {
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(shardDetector.listShardsWithFilter(any())).thenReturn(getFilteredShards(shards, initialPosition));
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(existingLeases);
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(existingLeases.isEmpty());
@ -2381,14 +2415,14 @@ public class HierarchicalShardSyncerTest {
final List<Lease> currentLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo");
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false);
when(shardDetector.listShards()).thenReturn(shardsWithoutLeases);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shardsWithoutLeases);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(currentLeases);
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
verify(shardDetector, atLeast(1)).listShards();
verify(shardDetector, atLeast(1)).listShardsWithoutConsumingResourceNotFoundException();
}
/**