Clean up in-memory state of deleted kinesis stream in MultiStreamMode

This commit is contained in:
Abhit Sawwalakhe 2023-01-13 17:07:33 -08:00
parent 0e86089123
commit 7a36f486fc
15 changed files with 272 additions and 57 deletions

View file

@ -1,5 +1,8 @@
# Changelog
### Release 2.4.6 (January 13, 2023)
* [#1022](https://github.com/awslabs/amazon-kinesis-client/pull/1022) Clean up in-memory state of deleted kinesis stream in MultiStreamMode
### Release 2.4.5 (January 04, 2023)
* [#1014](https://github.com/awslabs/amazon-kinesis-client/pull/1014) Use AFTER_SEQUENCE_NUMBER iterator type for expired iterator request

View file

@ -66,6 +66,9 @@ The recommended way to use the KCL for Java is to consume it from Maven.
## Release Notes
### Release 2.4.6 (January 13, 2023)
* [#1022](https://github.com/awslabs/amazon-kinesis-client/pull/1022) Clean up in-memory state of deleted kinesis stream in MultiStreamMode
### Release 2.4.5 (January 04, 2023)
* [#1014](https://github.com/awslabs/amazon-kinesis-client/pull/1014) Use AFTER_SEQUENCE_NUMBER iterator type for expired iterator request

View file

@ -21,7 +21,7 @@
<parent>
<artifactId>amazon-kinesis-client-pom</artifactId>
<groupId>software.amazon.kinesis</groupId>
<version>2.4.5</version>
<version>2.4.6</version>
</parent>
<modelVersion>4.0.0</modelVersion>

View file

@ -22,7 +22,7 @@
<parent>
<groupId>software.amazon.kinesis</groupId>
<artifactId>amazon-kinesis-client-pom</artifactId>
<version>2.4.5</version>
<version>2.4.6</version>
</parent>
<artifactId>amazon-kinesis-client</artifactId>

View file

@ -0,0 +1,38 @@
package software.amazon.kinesis.coordinator;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.common.StreamIdentifier;
/**
 * In-memory registry of streams that no longer exist (were deleted) and whose state still has to be
 * cleaned up from KCL's in-memory bookkeeping. The registry is backed by a concurrent key set.
 */
@Slf4j
public class DeletedStreamListProvider {

    /** Streams reported as deleted and not yet handed out by {@link #purgeAllDeletedStream()}. */
    private final Set<StreamIdentifier> deletedStreams = ConcurrentHashMap.newKeySet();

    public DeletedStreamListProvider() {
        // Nothing to do: the backing set is created by the field initializer.
    }

    /**
     * Records a stream as deleted. Recording the same stream more than once has no additional effect
     * on the registry contents, since the backing collection is a set.
     *
     * @param streamIdentifier identifier of the stream to record
     */
    public void add(StreamIdentifier streamIdentifier) {
        log.info("Added {}", streamIdentifier);
        deletedStreams.add(streamIdentifier);
    }

    /**
     * Hands out every stream currently recorded and removes exactly those streams from the registry.
     * A stream added after the snapshot is taken stays in the registry until the next purge.
     *
     * @return snapshot of the deleted streams that were removed from the registry
     */
    public Set<StreamIdentifier> purgeAllDeletedStream() {
        final Set<StreamIdentifier> snapshot = new HashSet<>(deletedStreams);
        // Remove only what was captured, so concurrent additions are not lost.
        deletedStreams.removeAll(snapshot);
        return snapshot;
    }
}

View file

@ -41,7 +41,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -54,7 +53,6 @@ import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.utils.Validate;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
@ -75,7 +73,6 @@ import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseSerializer;
import software.amazon.kinesis.leases.dynamodb.DynamoDBMultiStreamLeaseSerializer;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.LifecycleConfig;
import software.amazon.kinesis.lifecycle.ShardConsumer;
@ -121,6 +118,7 @@ public class Scheduler implements Runnable {
private static final String ACTIVE_STREAMS_COUNT = "ActiveStreams.Count";
private static final String PENDING_STREAMS_DELETION_COUNT = "StreamsPendingDeletion.Count";
private static final String DELETED_STREAMS_COUNT = "DeletedStreams.Count";
private static final String NON_EXISTING_STREAM_DELETE_COUNT = "NonExistingStreamDelete.Count";
private SchedulerLog slog = new SchedulerLog();
@ -173,6 +171,8 @@ public class Scheduler implements Runnable {
private final LeaseCleanupManager leaseCleanupManager;
private final SchemaRegistryDecoder schemaRegistryDecoder;
private final DeletedStreamListProvider deletedStreamListProvider;
// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = new ConcurrentHashMap<>();
@ -217,6 +217,7 @@ public class Scheduler implements Runnable {
@NonNull final ProcessorConfig processorConfig,
@NonNull final RetrievalConfig retrievalConfig,
@NonNull final DiagnosticEventFactory diagnosticEventFactory) {
log.info("Scheduler invoked for version 2.4.6, V1");
this.checkpointConfig = checkpointConfig;
this.coordinatorConfig = coordinatorConfig;
this.leaseManagementConfig = leaseManagementConfig;
@ -263,9 +264,10 @@ public class Scheduler implements Runnable {
this.executorService = this.coordinatorConfig.coordinatorFactory().createExecutorService();
this.diagnosticEventFactory = diagnosticEventFactory;
this.diagnosticEventHandler = new DiagnosticEventLogger();
this.deletedStreamListProvider = new DeletedStreamListProvider();
this.shardSyncTaskManagerProvider = streamConfig -> this.leaseManagementConfig
.leaseManagementFactory(leaseSerializer, isMultiStreamMode)
.createShardSyncTaskManager(this.metricsFactory, streamConfig);
.createShardSyncTaskManager(this.metricsFactory, streamConfig, this.deletedStreamListProvider);
this.shardPrioritization = this.coordinatorConfig.shardPrioritization();
this.cleanupLeasesUponShardCompletion = this.leaseManagementConfig.cleanupLeasesUponShardCompletion();
this.skipShardSyncAtWorkerInitializationIfLeasesExist =
@ -558,6 +560,14 @@ public class Scheduler implements Runnable {
.partitioningBy(streamIdentifier -> newStreamConfigMap.containsKey(streamIdentifier), Collectors.toSet()));
final Set<StreamIdentifier> staleStreamIdsToBeDeleted = staleStreamIdDeletionDecisionMap.get(false).stream().filter(streamIdentifier ->
Duration.between(staleStreamDeletionMap.get(streamIdentifier), Instant.now()).toMillis() >= waitPeriodToDeleteOldStreams.toMillis()).collect(Collectors.toSet());
// Streams that have been deleted in Kinesis, i.e. for which the shardSyncTask encountered a
// ResourceNotFoundException. This applies to MultiStreamMode only; in SingleStreamMode the
// provider never holds any data.
final Set<StreamIdentifier> deletedStreamSet = this.deletedStreamListProvider.purgeAllDeletedStream();
if (deletedStreamSet.size() > 0) {
log.info("Stale streams to delete: {}", deletedStreamSet);
staleStreamIdsToBeDeleted.addAll(deletedStreamSet);
}
final Set<StreamIdentifier> deletedStreamsLeases = deleteMultiStreamLeases(staleStreamIdsToBeDeleted);
streamsSynced.addAll(deletedStreamsLeases);
@ -577,6 +587,8 @@ public class Scheduler implements Runnable {
MetricsUtil.addCount(metricsScope, ACTIVE_STREAMS_COUNT, newStreamConfigMap.size(), MetricsLevel.SUMMARY);
MetricsUtil.addCount(metricsScope, PENDING_STREAMS_DELETION_COUNT, staleStreamDeletionMap.size(),
MetricsLevel.SUMMARY);
MetricsUtil.addCount(metricsScope, NON_EXISTING_STREAM_DELETE_COUNT, deletedStreamSet.size(),
MetricsLevel.SUMMARY);
MetricsUtil.addCount(metricsScope, DELETED_STREAMS_COUNT, deletedStreamsLeases.size(), MetricsLevel.SUMMARY);
} finally {
MetricsUtil.endScope(metricsScope);
@ -614,6 +626,7 @@ public class Scheduler implements Runnable {
private Set<StreamIdentifier> deleteMultiStreamLeases(Set<StreamIdentifier> streamIdentifiers)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
log.info("Deleting streams: {}", streamIdentifiers);
final Set<StreamIdentifier> streamsSynced = new HashSet<>();
List<MultiStreamLease> leases = null;
Map<String, List<MultiStreamLease>> streamIdToShardsMap = null;

View file

@ -17,6 +17,7 @@ package software.amazon.kinesis.leases;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@ -39,6 +40,7 @@ import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
@ -47,6 +49,7 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.coordinator.DeletedStreamListProvider;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
@ -56,6 +59,7 @@ import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import static java.util.Objects.nonNull;
import static software.amazon.kinesis.common.HashKeyRangeForLease.fromHashKeyRange;
/**
@ -72,6 +76,8 @@ public class HierarchicalShardSyncer {
private final String streamIdentifier;
private final DeletedStreamListProvider deletedStreamListProvider;
private static final String MIN_HASH_KEY = BigInteger.ZERO.toString();
private static final String MAX_HASH_KEY = new BigInteger("2").pow(128).subtract(BigInteger.ONE).toString();
private static final int retriesForCompleteHashRange = 3;
@ -79,13 +85,17 @@ public class HierarchicalShardSyncer {
private static final long DELAY_BETWEEN_LIST_SHARDS_MILLIS = 1000;
public HierarchicalShardSyncer() {
isMultiStreamMode = false;
streamIdentifier = "SingleStreamMode";
this(false, "SingleStreamMode");
}
/**
* Creates a syncer without a DeletedStreamListProvider (delegates with a null provider).
*
* @param isMultiStreamMode whether the syncer operates in multi-stream mode
* @param streamIdentifier identifier of the stream this syncer works on
*/
public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier) {
this(isMultiStreamMode, streamIdentifier, null);
}
/**
* Creates a syncer that stores the given mode, stream identifier and deleted-stream provider.
*
* @param isMultiStreamMode whether the syncer operates in multi-stream mode
* @param streamIdentifier identifier of the stream this syncer works on
* @param deletedStreamListProvider provider that collects streams found to be deleted; may be null
*                                  (the two-argument constructor passes null)
*/
public HierarchicalShardSyncer(final boolean isMultiStreamMode, final String streamIdentifier, final DeletedStreamListProvider deletedStreamListProvider) {
this.isMultiStreamMode = isMultiStreamMode;
this.streamIdentifier = streamIdentifier;
this.deletedStreamListProvider = deletedStreamListProvider;
}
private static final BiFunction<Lease, MultiStreamArgs, String> shardIdFromLeaseDeducer =
@ -306,8 +316,17 @@ public class HierarchicalShardSyncer {
+ retriesForCompleteHashRange + " retries.");
}
private static List<Shard> getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
final Optional<List<Shard>> shards = Optional.of(shardDetector.listShards());
private List<Shard> getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
// Fallback to existing behavior for backward compatibility
List<Shard> shardList = Collections.emptyList();
try {
shardList = shardDetector.listShardsWithoutConsumingResourceNotFoundException();
} catch (ResourceNotFoundException e) {
if (nonNull(this.deletedStreamListProvider) && isMultiStreamMode) {
deletedStreamListProvider.add(StreamIdentifier.multiStreamInstance(streamIdentifier));
}
}
final Optional<List<Shard>> shards = Optional.of(shardList);
return shards.orElseThrow(() -> new KinesisClientLibIOException("Stream " + shardDetector.streamIdentifier().streamName() +
" is not in ACTIVE OR UPDATING state - will retry getting the shard list."));

View file

@ -80,6 +80,8 @@ public class KinesisShardDetector implements ShardDetector {
@Getter(AccessLevel.PACKAGE)
private AtomicInteger cacheMisses = new AtomicInteger(0);
private static final Boolean THROW_RESOURCE_NOT_FOUND_EXCEPTION = true;
@Deprecated
public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload,
@ -159,15 +161,26 @@ public class KinesisShardDetector implements ShardDetector {
return listShardsWithFilter(null);
}
/**
* Lists shards without a shard filter, asking the internal listing to propagate
* ResourceNotFoundException to the caller instead of converting it into an empty result.
*
* @return Shards
*/
@Override
@Synchronized
public List<Shard> listShardsWithoutConsumingResourceNotFoundException() {
return listShardsWithFilterInternal(null, THROW_RESOURCE_NOT_FOUND_EXCEPTION);
}
/**
* Lists shards using the given shard filter. ResourceNotFoundException propagation is disabled here,
* so the internal listing handles that exception itself rather than rethrowing it.
*
* @param shardFilter filter passed through to the internal listing
* @return Shards
*/
@Override
@Synchronized
public List<Shard> listShardsWithFilter(ShardFilter shardFilter) {
return listShardsWithFilterInternal(shardFilter, !THROW_RESOURCE_NOT_FOUND_EXCEPTION);
}
private List<Shard> listShardsWithFilterInternal(ShardFilter shardFilter,
boolean shouldPropagateResourceNotFoundException) {
final List<Shard> shards = new ArrayList<>();
ListShardsResponse result;
String nextToken = null;
do {
result = listShards(shardFilter, nextToken);
result = listShards(shardFilter, nextToken, shouldPropagateResourceNotFoundException);
if (result == null) {
/*
@ -185,7 +198,12 @@ public class KinesisShardDetector implements ShardDetector {
return shards;
}
private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken) {
/**
* @param shouldPropagateResourceNotFoundException : used to determine if ResourceNotFoundException should be
* handled by method and return Empty list or propagate the exception.
*/
private ListShardsResponse listShards(ShardFilter shardFilter, final String nextToken,
final boolean shouldPropagateResourceNotFoundException) {
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
exceptionManager.add(ResourceNotFoundException.class, t -> t);
exceptionManager.add(LimitExceededException.class, t -> t);
@ -233,9 +251,14 @@ public class KinesisShardDetector implements ShardDetector {
} catch (ResourceNotFoundException e) {
log.warn("Got ResourceNotFoundException when fetching shard list for {}. Stream no longer exists.",
streamIdentifier.streamName());
return ListShardsResponse.builder().shards(Collections.emptyList())
.nextToken(null)
.build();
if (shouldPropagateResourceNotFoundException) {
throw e;
}
return ListShardsResponse.builder()
.shards(Collections.emptyList())
.nextToken(null)
.build();
} catch (TimeoutException te) {
throw new RuntimeException(te);
}

View file

@ -16,6 +16,7 @@
package software.amazon.kinesis.leases;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.coordinator.DeletedStreamListProvider;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
import software.amazon.kinesis.metrics.MetricsFactory;
@ -31,6 +32,11 @@ public interface LeaseManagementFactory {
throw new UnsupportedOperationException();
}
/**
* Creates a ShardSyncTaskManager for the given stream config, additionally accepting a
* DeletedStreamListProvider. This default implementation always throws
* UnsupportedOperationException; factories that support this overload must override it.
*
* @param metricsFactory factory to get metrics object
* @param streamConfig streamConfig for which ShardSyncTaskManager needs to be created
* @param deletedStreamListProvider provider that collects streams found to be deleted
* @return ShardSyncTaskManager
* @throws UnsupportedOperationException if the factory does not override this method
*/
default ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig,
DeletedStreamListProvider deletedStreamListProvider) {
throw new UnsupportedOperationException("createShardSyncTaskManager method not implemented");
}
DynamoDBLeaseRefresher createLeaseRefresher();
ShardDetector createShardDetector();

View file

@ -46,6 +46,16 @@ public interface ShardDetector {
*/
List<Shard> listShards();
/**
* Lists the shards of the stream like {@link #listShards()}, except that a ResourceNotFoundException
* is propagated to the caller instead of being consumed and turned into an empty list.
*
* This default implementation always throws UnsupportedOperationException; implementations that
* support this behavior must override it.
*
* @return Shards
* @throws UnsupportedOperationException if the implementation does not override this method
*/
default List<Shard> listShardsWithoutConsumingResourceNotFoundException() {
throw new UnsupportedOperationException("listShardsWithoutConsumingResourceNotFoundException not implemented");
}
/**
* List shards with shard filter.
*

View file

@ -29,6 +29,7 @@ import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.LeaseCleanupConfig;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.coordinator.DeletedStreamListProvider;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.KinesisShardDetector;
import software.amazon.kinesis.leases.LeaseCleanupManager;
@ -515,6 +516,29 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
metricsFactory);
}
/**
* Create ShardSyncTaskManager from the streamConfig passed. The HierarchicalShardSyncer used by the
* created manager is built with the given DeletedStreamListProvider.
*
* @param metricsFactory - factory to get metrics object
* @param streamConfig - streamConfig for which ShardSyncTaskManager needs to be created
* @param deletedStreamListProvider - provider handed to the HierarchicalShardSyncer to collect
*                                    streams found to be deleted
* @return ShardSyncTaskManager
*/
@Override
public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory, StreamConfig streamConfig,
DeletedStreamListProvider deletedStreamListProvider) {
return new ShardSyncTaskManager(this.createShardDetector(streamConfig),
this.createLeaseRefresher(),
streamConfig.initialPositionInStreamExtended(),
cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards,
shardSyncIntervalMillis,
executorService,
new HierarchicalShardSyncer(isMultiStreamMode, streamConfig.streamIdentifier().toString(),
deletedStreamListProvider),
metricsFactory);
}
@Override
public DynamoDBLeaseRefresher createLeaseRefresher() {
return new DynamoDBLeaseRefresher(tableName, dynamoDBClient, leaseSerializer, consistentReads,

View file

@ -46,7 +46,7 @@ public class RetrievalConfig {
*/
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java";
public static final String KINESIS_CLIENT_LIB_USER_AGENT_VERSION = "2.4.5";
public static final String KINESIS_CLIENT_LIB_USER_AGENT_VERSION = "2.4.6";
/**
* Client used to make calls to Kinesis for records retrieval

View file

@ -39,6 +39,7 @@ import static software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrat
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@ -728,16 +729,8 @@ public class SchedulerTest {
private final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreNotDeletedImmediately(boolean expectPendingStreamsForDeletion,
boolean onlyStreamsNoLeasesDeletion)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
List<StreamConfig> streamConfigList1 = IntStream.range(1, 5).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
List<StreamConfig> streamConfigList2 = IntStream.range(3, 7).mapToObj(streamId -> new StreamConfig(
StreamIdentifier.multiStreamInstance(
Joiner.on(":").join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345)),
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)))
.collect(Collectors.toCollection(LinkedList::new));
List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1,5);
List<StreamConfig> streamConfigList2 = createDummyStreamConfigList(3,7);
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
@ -784,6 +777,55 @@ public class SchedulerTest {
scheduler.staleStreamDeletionMap().keySet());
}
/**
* Verifies that a stream reported as deleted through the DeletedStreamListProvider is removed from the
* scheduler's in-memory state on the next sync, even though the configured deferred-deletion wait
* period (one day) has not elapsed.
*/
@Test
public void testKinesisStaleDeletedStreamCleanup() throws ProvisionedThroughputException, InvalidStateException, DependencyException {
// First tracker answer: streams 1..5; second answer: streams 1..3 (streams 4 and 5 become stale).
List<StreamConfig> streamConfigList1 = createDummyStreamConfigList(1,6);
List<StreamConfig> streamConfigList2 = createDummyStreamConfigList(1,4);
when(multiStreamTracker.streamConfigList()).thenReturn(streamConfigList1, streamConfigList2);
// Long wait period so that stale streams are not deleted by the deferred-deletion path during the test.
when(multiStreamTracker.formerStreamsLeasesDeletionStrategy()).thenReturn(new AutoDetectionAndDeferredDeletionStrategy() {
@Override public Duration waitPeriodToDeleteFormerStreams() {
return Duration.ofDays(1);
}
});
retrievalConfig = new RetrievalConfig(kinesisClient, multiStreamTracker, applicationName)
.retrievalFactory(retrievalFactory);
scheduler = spy(new Scheduler(checkpointConfig, coordinatorConfig, leaseManagementConfig, lifecycleConfig,
metricsConfig, processorConfig, retrievalConfig));
when(scheduler.shouldSyncStreamsNow()).thenReturn(true);
// When KCL starts, it tracks 5 streams and none of them is pending deletion.
assertEquals(5, scheduler.currentStreamConfigMap().size());
assertEquals(0, scheduler.staleStreamDeletionMap().size());
// 2 streams no longer need to be consumed: they stay tracked but are marked as stale.
Set<StreamIdentifier> syncedStreams2 = scheduler.checkAndSyncStreamShardsAndLeases();
assertEquals(5, scheduler.currentStreamConfigMap().size());
assertEquals(2, scheduler.staleStreamDeletionMap().size());
// One of the stale streams (stream 5) is deleted on the Kinesis side.
scheduler.deletedStreamListProvider().add(createDummyStreamConfig(5).streamIdentifier());
Set<StreamIdentifier> syncedStreams3 = scheduler.checkAndSyncStreamShardsAndLeases();
// Assert that the stream deleted in Kinesis is cleaned up from KCL's in-memory state.
assertEquals(4, scheduler.currentStreamConfigMap().size());
assertEquals(1, scheduler.staleStreamDeletionMap().size());
verify(multiStreamTracker, times(3)).streamConfigList();
}
// Builds (upperBound - lowerBound) dummy StreamConfig entries, one per stream id in
// [lowerBound, upperBound), in ascending id order.
private List<StreamConfig> createDummyStreamConfigList(int lowerBound, int upperBound) {
    final List<StreamConfig> dummyConfigs = new LinkedList<>();
    for (int streamId = lowerBound; streamId < upperBound; streamId++) {
        dummyConfigs.add(createDummyStreamConfig(streamId));
    }
    return dummyConfigs;
}
// Builds a dummy multi-stream StreamConfig whose serialized identifier is derived from streamId and
// whose initial position is LATEST.
private StreamConfig createDummyStreamConfig(int streamId){
    final String serializedIdentifier = Joiner.on(":")
            .join(streamId * 111111111, "multiStreamTest-" + streamId, streamId * 12345);
    final StreamIdentifier dummyIdentifier = StreamIdentifier.multiStreamInstance(serializedIdentifier);
    final InitialPositionInStreamExtended startPosition =
            InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST);
    return new StreamConfig(dummyIdentifier, startPosition);
}
@Test
public final void testMultiStreamNewStreamsAreSyncedAndStaleStreamsAreDeletedAfterDefermentPeriod()
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
@ -1116,7 +1158,7 @@ public class SchedulerTest {
@Override
public ShardSyncTaskManager createShardSyncTaskManager(MetricsFactory metricsFactory,
StreamConfig streamConfig) {
StreamConfig streamConfig, DeletedStreamListProvider deletedStreamListProvider) {
if(shouldReturnDefaultShardSyncTaskmanager) {
return shardSyncTaskManager;
}

View file

@ -19,6 +19,7 @@ package software.amazon.kinesis.leases;
//
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.atLeast;
@ -55,6 +56,7 @@ import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
@ -63,9 +65,12 @@ import software.amazon.kinesis.common.HashKeyRangeForLease;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.coordinator.DeletedStreamListProvider;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseRefresher;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.NullMetricsScope;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@ -294,7 +299,7 @@ public class HierarchicalShardSyncerTest {
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
@ -317,7 +322,8 @@ public class HierarchicalShardSyncerTest {
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector).listShards();
verify(shardDetector, never()).listShards();
verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
@ -329,7 +335,7 @@ public class HierarchicalShardSyncerTest {
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
setupMultiStream();
@ -352,7 +358,8 @@ public class HierarchicalShardSyncerTest {
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector).listShards();
verify(shardDetector, never()).listShards();
verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
@ -365,7 +372,7 @@ public class HierarchicalShardSyncerTest {
/**
* Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards. In this scenario, shardDetector.listShards()
* should never be called.
* or shardDetector.listShardsWithoutConsumingResourceNotFoundException() should never be called.
*/
@Test
public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception {
@ -398,13 +405,14 @@ public class HierarchicalShardSyncerTest {
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector, never()).listShards();
verify(shardDetector, never()).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
/**
* Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards. In this scenario, shardDetector.listShards()
* should never be called.
* or shardDetector.listShardsWithoutConsumingResourceNotFoundException() should never be called.
*/
@Test
public void testCheckAndCreateLeasesForShardsWithShardListMultiStream() throws Exception {
@ -435,13 +443,14 @@ public class HierarchicalShardSyncerTest {
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector, never()).listShards();
verify(shardDetector, never()).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
/**
* Test checkAndCreateLeaseForNewShards with an empty list of shards. In this scenario, shardDetector.listShards()
* should never be called.
* or shardDetector.listShardsWithoutConsumingResourceNotFoundException() should never be called.
*/
@Test
public void testCheckAndCreateLeasesForShardsWithEmptyShardList() throws Exception {
@ -468,6 +477,7 @@ public class HierarchicalShardSyncerTest {
assertThat(extendedSequenceNumbers.size(), equalTo(0));
verify(shardDetector, never()).listShards();
verify(shardDetector, never()).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
@ -657,13 +667,13 @@ public class HierarchicalShardSyncerTest {
shards.remove(3);
shards.add(3, shard);
when(shardDetector.listShards()).thenReturn(shards);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, SCOPE, false, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector).listShards();
verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, never()).listLeases();
}
}
@ -677,14 +687,14 @@ public class HierarchicalShardSyncerTest {
shards.remove(3);
shards.add(3, shard);
when(shardDetector.listShards()).thenReturn(shards);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
setupMultiStream();
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, SCOPE, false,
dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector).listShards();
verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, never()).listLeases();
}
}
@ -711,7 +721,7 @@ public class HierarchicalShardSyncerTest {
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
@ -732,7 +742,8 @@ public class HierarchicalShardSyncerTest {
leaseSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector).listShards();
verify(shardDetector, never()).listShards();
verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
@ -756,7 +767,7 @@ public class HierarchicalShardSyncerTest {
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
setupMultiStream();
@ -777,7 +788,7 @@ public class HierarchicalShardSyncerTest {
leaseSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector).listShards();
verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
@ -811,7 +822,7 @@ public class HierarchicalShardSyncerTest {
final ArgumentCaptor<Lease> leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class);
final ArgumentCaptor<Lease> leaseDeleteCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()).thenReturn(leases);
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCreateCaptor.capture())).thenReturn(true);
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseDeleteCaptor.capture());
@ -826,7 +837,7 @@ public class HierarchicalShardSyncerTest {
assertThat(createLeases, equalTo(expectedCreateLeases));
verify(shardDetector, times(1)).listShards();
verify(shardDetector, times(1)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(1)).listLeases();
verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
@ -841,7 +852,7 @@ public class HierarchicalShardSyncerTest {
assertThat(deleteLeases.size(), equalTo(0));
verify(shardDetector, times(2)).listShards();
verify(shardDetector, times(2)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, times(2)).listLeases();
}
@ -877,7 +888,7 @@ public class HierarchicalShardSyncerTest {
final ArgumentCaptor<Lease> leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases())
.thenThrow(new DependencyException(new Throwable("Throw for ListLeases")))
.thenReturn(Collections.emptyList()).thenReturn(leases);
@ -889,7 +900,7 @@ public class HierarchicalShardSyncerTest {
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector, times(1)).listShards();
verify(shardDetector, times(1)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(1)).listLeases();
verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
@ -904,7 +915,7 @@ public class HierarchicalShardSyncerTest {
assertThat(createLeases, equalTo(expectedCreateLeases));
verify(shardDetector, times(2)).listShards();
verify(shardDetector, times(2)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(2)).listLeases();
verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
@ -919,13 +930,36 @@ public class HierarchicalShardSyncerTest {
final Set<ExtendedSequenceNumber> expectedSequenceNumbers = new HashSet<>(
Collections.singletonList(ExtendedSequenceNumber.SHARD_END));
verify(shardDetector, times(3)).listShards();
verify(shardDetector, times(3)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(expectedCreateLeases.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, times(3)).listLeases();
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
}
/**
 * Checks how the syncer reacts when shard listing fails with {@link ResourceNotFoundException}.
 *
 * <p>The shard detector mock is stubbed so that
 * {@code listShardsWithoutConsumingResourceNotFoundException()} throws. The test then expects
 * {@code checkAndCreateLeaseForNewShards} to return {@code false}, the supplied
 * {@link DeletedStreamListProvider} to hand back exactly one stream whose string form equals
 * {@code STREAM_IDENTIFIER}, and the plain {@code listShards()} call to never be used.
 */
@Test
public void testDeletedStreamListProviderUpdateOnResourceNotFound()
        throws ProvisionedThroughputException, InvalidStateException, DependencyException, InterruptedException {
    // Arrange: a syncer wired to a provider instance this test can inspect afterwards.
    final DeletedStreamListProvider deletedStreams = new DeletedStreamListProvider();
    hierarchicalShardSyncer =
            new HierarchicalShardSyncer(MULTISTREAM_MODE_ON, STREAM_IDENTIFIER, deletedStreams);
    when(shardDetector.listShardsWithoutConsumingResourceNotFoundException())
            .thenThrow(ResourceNotFoundException.builder().build());
    when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false);

    // Act: run one sync attempt against the stubbed collaborators.
    final boolean leaseTableEmpty = dynamoDBLeaseRefresher.isLeaseTableEmpty();
    final boolean syncResult = hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(
            shardDetector,
            dynamoDBLeaseRefresher,
            INITIAL_POSITION_TRIM_HORIZON,
            SCOPE,
            ignoreUnexpectedChildShards,
            leaseTableEmpty);

    // Assert: the sync reports failure and the stream shows up in the purged set.
    assertFalse(syncResult);
    final Set<StreamIdentifier> purgedStreams = deletedStreams.purgeAllDeletedStream();
    assertThat(purgedStreams.size(), equalTo(1));
    final StreamIdentifier onlyPurged = purgedStreams.iterator().next();
    assertThat(onlyPurged.toString(), equalTo(STREAM_IDENTIFIER));

    // Only the exception-propagating listing variant may have been invoked.
    verify(shardDetector, never()).listShards();
    verify(shardDetector).listShardsWithoutConsumingResourceNotFoundException();
}
@Test(expected = DependencyException.class)
public void testCheckAndCreateLeasesForNewShardsAtTrimHorizonAndClosedShardWithCreateLeaseExceptions()
throws Exception {
@ -957,7 +991,7 @@ public class HierarchicalShardSyncerTest {
final ArgumentCaptor<Lease> leaseCreateCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList())
.thenReturn(Collections.emptyList()).thenReturn(leases);
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCreateCaptor.capture()))
@ -969,7 +1003,7 @@ public class HierarchicalShardSyncerTest {
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
} finally {
verify(shardDetector, times(1)).listShards();
verify(shardDetector, times(1)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(1)).listLeases();
verify(dynamoDBLeaseRefresher, times(1)).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
@ -983,7 +1017,7 @@ public class HierarchicalShardSyncerTest {
final Set<Lease> expectedCreateLeases = getExpectedLeasesForGraphA(shards, sequenceNumber, position);
assertThat(createLeases, equalTo(expectedCreateLeases));
verify(shardDetector, times(2)).listShards();
verify(shardDetector, times(2)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(2)).listLeases();
verify(dynamoDBLeaseRefresher, times(1 + expectedCreateLeases.size()))
.createLeaseIfNotExists(any(Lease.class));
@ -994,7 +1028,7 @@ public class HierarchicalShardSyncerTest {
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
verify(shardDetector, times(3)).listShards();
verify(shardDetector, times(3)).listShardsWithoutConsumingResourceNotFoundException();
verify(dynamoDBLeaseRefresher, times(1 + expectedCreateLeases.size()))
.createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, times(3)).listLeases();
@ -1077,7 +1111,7 @@ public class HierarchicalShardSyncerTest {
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shards);
when(shardDetector.listShardsWithFilter(any())).thenReturn(getFilteredShards(shards, initialPosition));
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(existingLeases);
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(existingLeases.isEmpty());
@ -2296,14 +2330,14 @@ public class HierarchicalShardSyncerTest {
final List<Lease> currentLeases = createLeasesFromShards(shardsWithLeases, ExtendedSequenceNumber.LATEST, "foo");
when(dynamoDBLeaseRefresher.isLeaseTableEmpty()).thenReturn(false);
when(shardDetector.listShards()).thenReturn(shardsWithoutLeases);
when(shardDetector.listShardsWithoutConsumingResourceNotFoundException()).thenReturn(shardsWithoutLeases);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(currentLeases);
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
SCOPE, ignoreUnexpectedChildShards, dynamoDBLeaseRefresher.isLeaseTableEmpty());
verify(shardDetector, atLeast(1)).listShards();
verify(shardDetector, atLeast(1)).listShardsWithoutConsumingResourceNotFoundException();
}
/**

View file

@ -22,7 +22,7 @@
<artifactId>amazon-kinesis-client-pom</artifactId>
<packaging>pom</packaging>
<name>Amazon Kinesis Client Library</name>
<version>2.4.5</version>
<version>2.4.6</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.
</description>