diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index eaeb5a1c..2380eed5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -36,6 +36,7 @@ import lombok.NoArgsConstructor; import lombok.NonNull; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -120,6 +121,8 @@ public class Scheduler implements Runnable { private final HierarchicalShardSyncer hierarchicalShardSyncer; private final long schedulerInitializationBackoffTimeMillis; + private final ShardFilter bootstrapShardFilter; + // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap<>(); @@ -222,6 +225,7 @@ public class Scheduler implements Runnable { this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil(); this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer(); this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis(); + this.bootstrapShardFilter = leaseManagementConfig.bootstrapShardFilter(); } /** @@ -267,9 +271,10 @@ public class Scheduler implements Runnable { TaskResult result = null; if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) { log.info("Syncing Kinesis shard info"); + ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition, cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer, - metricsFactory); + metricsFactory, bootstrapShardFilter); result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); } else { log.info("Skipping shard sync per configuration setting (and lease table is not empty)"); @@ -611,7 +616,8 @@ public class Scheduler implements Runnable { shardDetector, aggregatorUtil, hierarchicalShardSyncer, - metricsFactory); + metricsFactory, + bootstrapShardFilter); return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(), argument, lifecycleConfig.taskExecutionListener(),lifecycleConfig.readTimeoutsToIgnoreBeforeWarning()); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 578af465..8b7c7fe3 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -34,6 +35,7 @@ import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStream; @@ -76,9 +78,9 @@ public class HierarchicalShardSyncer { public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, - final MetricsScope scope) throws DependencyException, InvalidStateException, + final MetricsScope scope, final ShardFilter shardFilter) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - final List latestShards = getShardList(shardDetector); + final List latestShards = getShardList(shardDetector, shardFilter); checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards); } @@ -253,7 +255,8 @@ public class HierarchicalShardSyncer { return shardIdToChildShardIdsMap; } - private static List getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException { + private static List getShardList(@NonNull final ShardDetector shardDetector, @NonNull final ShardFilter shardFilter) + throws KinesisClientLibIOException { final List shards = shardDetector.listShards(); if (shards == null) { throw new KinesisClientLibIOException( @@ -530,7 +533,10 @@ public class HierarchicalShardSyncer { if (!CollectionUtils.isNullOrEmpty(garbageLeases)) { log.info("Found {} candidate leases for cleanup. Refreshing list of" + " Kinesis shards to pick up recent/latest shards", garbageLeases.size()); - final Set currentKinesisShardIds = getShardList(shardDetector).stream().map(Shard::shardId) + + final ShardFilter allShardsFilter = ShardFilter.builder().build(); + + final Set currentKinesisShardIds = getShardList(shardDetector, allShardsFilter).stream().map(Shard::shardId) .collect(Collectors.toSet()); for (Lease lease : garbageLeases) { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java index ba136f0a..5d01e9dd 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java @@ -43,6 +43,7 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.FutureUtils; @@ -72,19 +73,21 @@ public class KinesisShardDetector implements ShardDetector { private volatile Instant lastCacheUpdateTime; @Getter(AccessLevel.PACKAGE) private AtomicInteger cacheMisses = new AtomicInteger(0); + @NonNull + private final ShardFilter shardFilter; @Deprecated public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis, int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload, - int cacheMissWarningModulus) { + int cacheMissWarningModulus, ShardFilter shardFilter) { this(kinesisClient, streamName, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, cacheMissWarningModulus, - LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT); + LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, shardFilter); } public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis, int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload, - int cacheMissWarningModulus, Duration kinesisRequestTimeout) { + int cacheMissWarningModulus, Duration kinesisRequestTimeout, ShardFilter shardFilter) { this.kinesisClient = kinesisClient; this.streamName = streamName; this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis; @@ -93,6 +96,7 @@ public class KinesisShardDetector implements ShardDetector { this.maxCacheMissesBeforeReload = maxCacheMissesBeforeReload; this.cacheMissWarningModulus = cacheMissWarningModulus; this.kinesisRequestTimeout = kinesisRequestTimeout; + this.shardFilter = shardFilter; } @Override @@ -154,7 +158,7 @@ public class KinesisShardDetector implements ShardDetector { String nextToken = null; do { - result = listShards(nextToken); + result = listShards(nextToken, shardFilter); if (result == null) { /* @@ -172,7 +176,7 @@ public class KinesisShardDetector implements ShardDetector { return shards; } - private ListShardsResponse listShards(final String nextToken) { + private ListShardsResponse listShards(final String nextToken, final ShardFilter shardFilter) { final AWSExceptionManager exceptionManager = new AWSExceptionManager(); exceptionManager.add(LimitExceededException.class, t -> t); exceptionManager.add(ResourceInUseException.class, t -> t); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 20e0aa8f..96bc1fd7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -30,6 +30,8 @@ import lombok.experimental.Accessors; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; +import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory; @@ -176,6 +178,17 @@ public class LeaseManagementConfig { private InitialPositionInStreamExtended initialPositionInStream = InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); + /** + * Shard filter ot use for bootstrap. This needs to align with initialPositionInStreamExtended. i.e + * + * InitialPositionInStream.LATEST => ShardFilterType.AT_LATEST + * InitialPositionInStream.TRIM_HORIZON => ShardFilterType.FROM_TRIM_HORIZON + * InitialPositionInStream.AT_TIMESTAMP => ShardFilterType.FROM_TIMESTAMP + * + */ + + private ShardFilter bootstrapShardFilter = ShardFilter.builder().type(ShardFilterType.FROM_TRIM_HORIZON).build(); + private int maxCacheMissesBeforeReload = 1000; private long listShardsCacheAllowedAgeInSeconds = 30; private int cacheMissWarningModulus = 250; @@ -270,7 +283,8 @@ public class LeaseManagementConfig { initialLeaseTableReadCapacity(), initialLeaseTableWriteCapacity(), hierarchicalShardSyncer(), - tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode()); + tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode(), + bootstrapShardFilter); } return leaseManagementFactory; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java index cf3a1a78..96e71ef7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java @@ -16,8 +16,10 @@ package software.amazon.kinesis.leases; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; import java.util.List; +import java.util.Optional; /** * @@ -26,5 +28,4 @@ public interface ShardDetector { Shard shard(String shardId); List listShards(); - } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java index c59608b2..b58d7269 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTask.java @@ -17,6 +17,8 @@ package software.amazon.kinesis.leases; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; +import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.lifecycle.ConsumerTask; @@ -48,11 +50,13 @@ public class ShardSyncTask implements ConsumerTask { private final boolean ignoreUnexpectedChildShards; private final long shardSyncTaskIdleTimeMillis; @NonNull - private final HierarchicalShardSyncer hierarchicalShardSyncer; + private final HierarchicalShardSyncer hierarchicalShardSyncerShardSyncTask; @NonNull private final MetricsFactory metricsFactory; private final TaskType taskType = TaskType.SHARDSYNC; + @NonNull + private final ShardFilter shardFilter; /* * (non-Javadoc) @@ -64,8 +68,8 @@ public class ShardSyncTask implements ConsumerTask { final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHARD_SYNC_TASK_OPERATION); try { - hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, - cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, scope); + hierarchicalShardSyncerShardSyncTask.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, + cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, scope, shardFilter); if (shardSyncTaskIdleTimeMillis > 0) { Thread.sleep(shardSyncTaskIdleTimeMillis); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java index f6db72e3..ca4990ac 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardSyncTaskManager.java @@ -18,6 +18,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import lombok.Data; @@ -53,6 +54,8 @@ public class ShardSyncTaskManager { private final HierarchicalShardSyncer hierarchicalShardSyncer; @NonNull private final MetricsFactory metricsFactory; + @NonNull + private final ShardFilter shardFilter; /** * Constructor. @@ -72,7 +75,7 @@ public class ShardSyncTaskManager { public ShardSyncTaskManager(ShardDetector shardDetector, LeaseRefresher leaseRefresher, InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards, long shardSyncIdleTimeMillis, ExecutorService executorService, - MetricsFactory metricsFactory) { + MetricsFactory metricsFactory, ShardFilter shardFilter) { this.shardDetector = shardDetector; this.leaseRefresher = leaseRefresher; this.initialPositionInStream = initialPositionInStream; @@ -82,6 +85,7 @@ public class ShardSyncTaskManager { this.executorService = executorService; this.hierarchicalShardSyncer = new HierarchicalShardSyncer(); this.metricsFactory = metricsFactory; + this.shardFilter = shardFilter; } /** @@ -100,7 +104,7 @@ public class ShardSyncTaskManager { public ShardSyncTaskManager(ShardDetector shardDetector, LeaseRefresher leaseRefresher, InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesUponShardCompletion, boolean ignoreUnexpectedChildShards, long shardSyncIdleTimeMillis, ExecutorService executorService, - HierarchicalShardSyncer hierarchicalShardSyncer, MetricsFactory metricsFactory) { + HierarchicalShardSyncer hierarchicalShardSyncer, MetricsFactory metricsFactory, ShardFilter shardFilter) { this.shardDetector = shardDetector; this.leaseRefresher = leaseRefresher; this.initialPositionInStream = initialPositionInStream; @@ -110,6 +114,7 @@ public class ShardSyncTaskManager { this.executorService = executorService; this.hierarchicalShardSyncer = hierarchicalShardSyncer; this.metricsFactory = metricsFactory; + this.shardFilter = shardFilter; } private ConsumerTask currentTask; @@ -143,7 +148,8 @@ public class ShardSyncTaskManager { ignoreUnexpectedChildShards, shardSyncIdleTimeMillis, hierarchicalShardSyncer, - metricsFactory), + metricsFactory, + shardFilter), metricsFactory); future = executorService.submit(currentTask); submittedNewTask = true; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index c2ade429..127131fc 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -23,6 +23,7 @@ import lombok.NonNull; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.leases.HierarchicalShardSyncer; @@ -57,6 +58,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final InitialPositionInStreamExtended initialPositionInStream; @NonNull private final HierarchicalShardSyncer hierarchicalShardSyncer; + @NonNull + private final ShardFilter shardFilter; private final long failoverTimeMillis; private final long epsilonMillis; @@ -114,14 +117,14 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, - final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus) { + final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final ShardFilter shardFilter) { this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, cacheMissWarningModulus, TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, - TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY); + TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, shardFilter); } /** @@ -165,7 +168,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, - final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity) { + final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, final ShardFilter shardFilter) { this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, @@ -173,7 +176,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, new HierarchicalShardSyncer(), TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK, - LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT); + LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, shardFilter); } /** @@ -216,19 +219,20 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, - final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) { + final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, + ShardFilter shardFilter) { this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, - hierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT); + hierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, shardFilter); } /** * Constructor. - * + * * @param kinesisClient * @param streamName * @param dynamoDBClient @@ -268,14 +272,14 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, - Duration dynamoDbRequestTimeout) { + Duration dynamoDbRequestTimeout, ShardFilter shardFilter) { this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, - hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED); + hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED, shardFilter); } /** @@ -320,7 +324,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, - Duration dynamoDbRequestTimeout, BillingMode billingMode) { + Duration dynamoDbRequestTimeout, BillingMode billingMode, ShardFilter shardFilter) { this.kinesisClient = kinesisClient; this.streamName = streamName; this.dynamoDBClient = dynamoDBClient; @@ -348,6 +352,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { this.tableCreatorCallback = tableCreatorCallback; this.dynamoDbRequestTimeout = dynamoDbRequestTimeout; this.billingMode = billingMode; + this.shardFilter = shardFilter; } @Override @@ -374,7 +379,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { shardSyncIntervalMillis, executorService, hierarchicalShardSyncer, - metricsFactory); + metricsFactory, + shardFilter); } @Override @@ -387,6 +393,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { public ShardDetector createShardDetector() { return new KinesisShardDetector(kinesisClient, streamName, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, - cacheMissWarningModulus, dynamoDbRequestTimeout); + cacheMissWarningModulus, dynamoDbRequestTimeout, shardFilter); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java index 4e9245f6..809e933d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/ShardSyncer.java @@ -1,6 +1,7 @@ package software.amazon.kinesis.leases.exceptions; import lombok.NonNull; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.HierarchicalShardSyncer; @@ -38,9 +39,9 @@ public class ShardSyncer { public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector, final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, - final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException, + final MetricsScope scope, final ShardFilter shardFilter) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope); + cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, shardFilter); } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java index bb1788b2..ac2f449e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java @@ -269,7 +269,8 @@ class ConsumerStates { input, argument.shouldCallProcessRecordsEvenForEmptyRecordList(), argument.idleTimeInMilliseconds(), - argument.aggregatorUtil(), argument.metricsFactory() + argument.aggregatorUtil(), argument.metricsFactory(), + argument.shardFilter() ); } @@ -496,7 +497,8 @@ class ConsumerStates { argument.taskBackoffTimeMillis(), argument.recordsPublisher(), argument.hierarchicalShardSyncer(), - argument.metricsFactory()); + argument.metricsFactory(), + argument.shardFilter()); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index 6c223650..25d43218 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -21,6 +21,7 @@ import lombok.NonNull; import lombok.extern.slf4j.Slf4j; import software.amazon.awssdk.services.cloudwatch.model.StandardUnit; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.leases.ShardDetector; @@ -60,6 +61,7 @@ public class ProcessTask implements ConsumerTask { private final ProcessRecordsInput processRecordsInput; private final MetricsFactory metricsFactory; private final AggregatorUtil aggregatorUtil; + private final ShardFilter shardFilter; public ProcessTask(@NonNull ShardInfo shardInfo, @NonNull ShardRecordProcessor shardRecordProcessor, @@ -72,7 +74,8 @@ public class ProcessTask implements ConsumerTask { boolean shouldCallProcessRecordsEvenForEmptyRecordList, long idleTimeInMilliseconds, @NonNull AggregatorUtil aggregatorUtil, - @NonNull MetricsFactory metricsFactory) { + @NonNull MetricsFactory metricsFactory, + @NonNull ShardFilter shardFilter) { this.shardInfo = shardInfo; this.shardRecordProcessor = shardRecordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; @@ -82,6 +85,7 @@ public class ProcessTask implements ConsumerTask { this.shouldCallProcessRecordsEvenForEmptyRecordList = shouldCallProcessRecordsEvenForEmptyRecordList; this.idleTimeInMilliseconds = idleTimeInMilliseconds; this.metricsFactory = metricsFactory; + this.shardFilter = shardFilter; if (!skipShardSyncAtWorkerInitializationIfLeasesExist) { this.shard = shardDetector.shard(shardInfo.shardId()); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java index 4f1db733..11bbc40f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java @@ -18,6 +18,7 @@ package software.amazon.kinesis.lifecycle; import lombok.Data; import lombok.NonNull; import lombok.experimental.Accessors; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -71,4 +72,6 @@ public class ShardConsumerArgument { private final HierarchicalShardSyncer hierarchicalShardSyncer; @NonNull private final MetricsFactory metricsFactory; + @NonNull + private final ShardFilter shardFilter; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 2bfcd358..901cb992 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; @@ -79,6 +80,8 @@ public class ShutdownTask implements ConsumerTask { private final HierarchicalShardSyncer hierarchicalShardSyncer; @NonNull private final MetricsFactory metricsFactory; + @NonNull + private final ShardFilter shardFilter; private final TaskType taskType = TaskType.SHUTDOWN; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java index ca0e4edf..092a0db9 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/RetrievalConfig.java @@ -19,6 +19,8 @@ import lombok.Data; import lombok.NonNull; import lombok.experimental.Accessors; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; +import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.retrieval.fanout.FanOutConfig; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 23d2e423..fb6b2669 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -53,6 +53,8 @@ import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.kinesis.model.HashKeyRange; import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; +import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; @@ -77,6 +79,7 @@ public class HierarchicalShardSyncerTest { private final boolean cleanupLeasesOfCompletedShards = true; private final boolean ignoreUnexpectedChildShards = false; + private final ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_LATEST).build(); private HierarchicalShardSyncer hierarchicalShardSyncer; @@ -186,7 +189,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - cleanupLeasesOfCompletedShards, false, SCOPE); + cleanupLeasesOfCompletedShards, false, SCOPE, shardFilter); final Set expectedShardIds = new HashSet<>( Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); @@ -299,7 +302,7 @@ public class HierarchicalShardSyncerTest { try { hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, - INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, false, SCOPE); + INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, false, SCOPE, shardFilter); } finally { verify(shardDetector).listShards(); verify(dynamoDBLeaseRefresher, never()).listLeases(); @@ -334,7 +337,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, - cleanupLeasesOfCompletedShards, true, SCOPE); + cleanupLeasesOfCompletedShards, true, SCOPE, shardFilter); final List leases = leaseCaptor.getAllValues(); final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -390,7 +393,7 @@ public class HierarchicalShardSyncerTest { // Initial call: No leases present, create leases. hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); @@ -405,7 +408,7 @@ public class HierarchicalShardSyncerTest { // Second call: Leases present, with shardId-0 being at ShardEnd causing cleanup. hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter); final List deleteLeases = leaseDeleteCaptor.getAllValues(); final Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); final Set sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint) @@ -465,7 +468,7 @@ public class HierarchicalShardSyncerTest { // Initial call: Call to create leases. hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); @@ -482,7 +485,7 @@ public class HierarchicalShardSyncerTest { // Second call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete fails. hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter); } finally { List deleteLeases = leaseDeleteCaptor.getAllValues(); Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -506,7 +509,7 @@ public class HierarchicalShardSyncerTest { // Final call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete passes. hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter); deleteLeases = leaseDeleteCaptor.getAllValues(); @@ -567,7 +570,7 @@ public class HierarchicalShardSyncerTest { // Initial call: Call to create leases. Fails on ListLeases hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter); } finally { verify(shardDetector, times(1)).listShards(); verify(dynamoDBLeaseRefresher, times(1)).listLeases(); @@ -577,7 +580,7 @@ public class HierarchicalShardSyncerTest { // Second call: Leases not present, leases will be created. hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null)); @@ -592,7 +595,7 @@ public class HierarchicalShardSyncerTest { // Final call: Leases present, belongs to TestOwner, shardId-0 is at ShardEnd should be cleaned up. hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter); final List deleteLeases = leaseDeleteCaptor.getAllValues(); final Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -658,7 +661,7 @@ public class HierarchicalShardSyncerTest { // Initial call: No leases present, create leases. Create lease Fails hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter); } finally { verify(shardDetector, times(1)).listShards(); verify(dynamoDBLeaseRefresher, times(1)).listLeases(); @@ -667,7 +670,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter); final Set createLeases = new HashSet<>(leaseCreateCaptor.getAllValues()); final Set expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null)); @@ -682,7 +685,7 @@ public class HierarchicalShardSyncerTest { // Final call: Leases are present, shardId-0 is at ShardEnd needs to be cleaned up. hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position, - cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter); final List deleteLeases = leaseDeleteCaptor.getAllValues(); final Set shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); @@ -744,7 +747,7 @@ public class HierarchicalShardSyncerTest { doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture()); hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, - INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE); + INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter); assertThat(leaseCaptor.getAllValues().size(), equalTo(1)); assertThat(leaseCaptor.getValue(), equalTo(garbageLease)); @@ -776,7 +779,7 @@ public class HierarchicalShardSyncerTest { hierarchicalShardSyncer .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition, - cleanupLeasesOfCompletedShards, false, SCOPE); + cleanupLeasesOfCompletedShards, false, SCOPE, shardFilter); final List leases = leaseCaptor.getAllValues(); final Set leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java index 1a37f614..38b339c4 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java @@ -51,6 +51,8 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; +import software.amazon.awssdk.services.kinesis.model.ShardFilterType; /** * @@ -67,6 +69,7 @@ public class KinesisShardDetectorTest { private static final String SHARD_ID = "shardId-%012d"; private KinesisShardDetector shardDetector; + private ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_LATEST).build(); @Rule public ExpectedException expectedExceptionRule = ExpectedException.none(); @@ -80,7 +83,7 @@ public class KinesisShardDetectorTest { public void setup() { shardDetector = new KinesisShardDetector(client, STREAM_NAME, LIST_SHARDS_BACKOFF_TIME_IN_MILLIS, MAX_LIST_SHARDS_RETRY_ATTEMPTS, LIST_SHARDS_CACHE_ALLOWED_AGE_IN_SECONDS, - MAX_CACHE_MISSES_BEFORE_RELOAD, CACHE_MISS_WARNING_MODULUS); + MAX_CACHE_MISSES_BEFORE_RELOAD, CACHE_MISS_WARNING_MODULUS, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, shardFilter); } @Test diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java index 42b826d1..67dfc4c0 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/ShardSyncTaskIntegrationTest.java @@ -35,6 +35,8 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; +import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.awssdk.services.kinesis.model.StreamStatus; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -60,6 +62,7 @@ public class ShardSyncTaskIntegrationTest { private static final int CACHE_MISS_WARNING_MODULUS = 250; private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory(); private static KinesisAsyncClient kinesisClient; + private static ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_LATEST).build(); private LeaseRefresher leaseRefresher; private ShardDetector shardDetector; @@ -97,7 +100,7 @@ public class ShardSyncTaskIntegrationTest { USE_CONSISTENT_READS, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK); shardDetector = new KinesisShardDetector(kinesisClient, STREAM_NAME, 500L, 50, - LIST_SHARDS_CACHE_ALLOWED_AGE_IN_SECONDS, MAX_CACHE_MISSES_BEFORE_RELOAD, CACHE_MISS_WARNING_MODULUS); + LIST_SHARDS_CACHE_ALLOWED_AGE_IN_SECONDS, MAX_CACHE_MISSES_BEFORE_RELOAD, CACHE_MISS_WARNING_MODULUS, shardFilter); hierarchicalShardSyncer = new HierarchicalShardSyncer(); } @@ -119,7 +122,7 @@ public class ShardSyncTaskIntegrationTest { Set shardIds = shardDetector.listShards().stream().map(Shard::shardId).collect(Collectors.toSet()); ShardSyncTask syncTask = new ShardSyncTask(shardDetector, leaseRefresher, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST), false, false, 0L, - hierarchicalShardSyncer, NULL_METRICS_FACTORY); + hierarchicalShardSyncer, NULL_METRICS_FACTORY, shardFilter); syncTask.call(); List leases = leaseRefresher.listLeases(); Set leaseKeys = new HashSet<>(); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index 16f5e9a4..33df3610 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -40,6 +40,8 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; +import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -66,6 +68,7 @@ public class ConsumerStatesTest { private ShardConsumer consumer; private ShardConsumerArgument argument; + private ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_LATEST).build(); @Mock private ShardRecordProcessor shardRecordProcessor; @@ -119,7 +122,7 @@ public class ConsumerStatesTest { taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, INITIAL_POSITION_IN_STREAM, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector, - new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory); + new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory, shardFilter); consumer = spy(new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, argument, taskExecutionListener, 0)); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java index 16e6426a..345762db 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ProcessTaskTest.java @@ -62,6 +62,8 @@ import lombok.Data; import lombok.Getter; import software.amazon.awssdk.services.kinesis.model.HashKeyRange; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; +import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; @@ -83,6 +85,7 @@ public class ProcessTaskTest { private boolean shouldCallProcessRecordsEvenForEmptyRecordList = true; private boolean skipShardSyncAtWorkerInitializationIfLeasesExist = true; private ShardInfo shardInfo; + private ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_LATEST).build(); @Mock private ProcessRecordsInput processRecordsInput; @@ -122,7 +125,7 @@ public class ProcessTaskTest { return new ProcessTask(shardInfo, shardRecordProcessor, checkpointer, taskBackoffTimeMillis, skipShardSync, shardDetector, throttlingReporter, processRecordsInput, shouldCallProcessRecordsEvenForEmptyRecordList, IDLE_TIME_IN_MILLISECONDS, - aggregatorUtil, new NullMetricsFactory()); + aggregatorUtil, new NullMetricsFactory(), shardFilter); } @Test diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 6af62edb..2f8860d7 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -37,6 +37,8 @@ import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.services.kinesis.model.ShardFilter; +import software.amazon.awssdk.services.kinesis.model.ShardFilterType; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -75,6 +77,7 @@ public class ShutdownTaskTest { private boolean ignoreUnexpectedChildShards = false; private ShardInfo shardInfo; private ShutdownTask task; + private ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_LATEST).build(); @Mock private RecordsPublisher recordsPublisher; @@ -104,7 +107,7 @@ public class ShutdownTaskTest { task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, - hierarchicalShardSyncer, NULL_METRICS_FACTORY); + hierarchicalShardSyncer, NULL_METRICS_FACTORY, shardFilter); } /** @@ -158,7 +161,7 @@ public class ShutdownTaskTest { task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, - hierarchicalShardSyncer, NULL_METRICS_FACTORY); + hierarchicalShardSyncer, NULL_METRICS_FACTORY, shardFilter); when(shardDetector.listShards()).thenReturn(constructShardListGraphA()); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); @@ -183,7 +186,7 @@ public class ShutdownTaskTest { task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, - hierarchicalShardSyncer, NULL_METRICS_FACTORY); + hierarchicalShardSyncer, NULL_METRICS_FACTORY, shardFilter); when(shardDetector.listShards()).thenReturn(constructShardListGraphA()); @@ -207,7 +210,7 @@ public class ShutdownTaskTest { task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, LEASE_LOST_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, - hierarchicalShardSyncer, NULL_METRICS_FACTORY); + hierarchicalShardSyncer, NULL_METRICS_FACTORY, shardFilter); when(shardDetector.listShards()).thenReturn(constructShardListGraphA());