Adding bootstrap sync strategy with list shards with filter.

This commit is contained in:
jushkem 2020-02-14 03:36:27 -08:00
parent b3bcc59697
commit 06c716ccea
20 changed files with 141 additions and 61 deletions

View file

@ -36,6 +36,7 @@ import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -120,6 +121,8 @@ public class Scheduler implements Runnable {
private final HierarchicalShardSyncer hierarchicalShardSyncer;
private final long schedulerInitializationBackoffTimeMillis;
private final ShardFilter bootstrapShardFilter;
// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = new ConcurrentHashMap<>();
@ -222,6 +225,7 @@ public class Scheduler implements Runnable {
this.aggregatorUtil = this.lifecycleConfig.aggregatorUtil();
this.hierarchicalShardSyncer = leaseManagementConfig.hierarchicalShardSyncer();
this.schedulerInitializationBackoffTimeMillis = this.coordinatorConfig.schedulerInitializationBackoffTimeMillis();
this.bootstrapShardFilter = leaseManagementConfig.bootstrapShardFilter();
}
/**
@ -267,9 +271,10 @@ public class Scheduler implements Runnable {
TaskResult result = null;
if (!skipShardSyncAtWorkerInitializationIfLeasesExist || leaseRefresher.isLeaseTableEmpty()) {
log.info("Syncing Kinesis shard info");
ShardSyncTask shardSyncTask = new ShardSyncTask(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpetedChildShards, 0L, hierarchicalShardSyncer,
metricsFactory);
metricsFactory, bootstrapShardFilter);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
} else {
log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
@ -611,7 +616,8 @@ public class Scheduler implements Runnable {
shardDetector,
aggregatorUtil,
hierarchicalShardSyncer,
metricsFactory);
metricsFactory,
bootstrapShardFilter);
return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(),
argument, lifecycleConfig.taskExecutionListener(),lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());
}

View file

@ -24,6 +24,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -34,6 +35,7 @@ import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStream;
@ -76,9 +78,9 @@ public class HierarchicalShardSyncer {
public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final MetricsScope scope) throws DependencyException, InvalidStateException,
final MetricsScope scope, final ShardFilter shardFilter) throws DependencyException, InvalidStateException,
ProvisionedThroughputException, KinesisClientLibIOException {
final List<Shard> latestShards = getShardList(shardDetector);
final List<Shard> latestShards = getShardList(shardDetector, shardFilter);
checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, scope, latestShards);
}
@ -253,7 +255,8 @@ public class HierarchicalShardSyncer {
return shardIdToChildShardIdsMap;
}
private static List<Shard> getShardList(@NonNull final ShardDetector shardDetector) throws KinesisClientLibIOException {
private static List<Shard> getShardList(@NonNull final ShardDetector shardDetector, @NonNull final ShardFilter shardFilter)
throws KinesisClientLibIOException {
final List<Shard> shards = shardDetector.listShards();
if (shards == null) {
throw new KinesisClientLibIOException(
@ -530,7 +533,10 @@ public class HierarchicalShardSyncer {
if (!CollectionUtils.isNullOrEmpty(garbageLeases)) {
log.info("Found {} candidate leases for cleanup. Refreshing list of"
+ " Kinesis shards to pick up recent/latest shards", garbageLeases.size());
final Set<String> currentKinesisShardIds = getShardList(shardDetector).stream().map(Shard::shardId)
final ShardFilter allShardsFilter = ShardFilter.builder().build();
final Set<String> currentKinesisShardIds = getShardList(shardDetector, allShardsFilter).stream().map(Shard::shardId)
.collect(Collectors.toSet());
for (Lease lease : garbageLeases) {

View file

@ -43,6 +43,7 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.FutureUtils;
@ -72,19 +73,21 @@ public class KinesisShardDetector implements ShardDetector {
private volatile Instant lastCacheUpdateTime;
@Getter(AccessLevel.PACKAGE)
private AtomicInteger cacheMisses = new AtomicInteger(0);
@NonNull
private final ShardFilter shardFilter;
@Deprecated
public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload,
int cacheMissWarningModulus) {
int cacheMissWarningModulus, ShardFilter shardFilter) {
this(kinesisClient, streamName, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts,
listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, cacheMissWarningModulus,
LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, shardFilter);
}
public KinesisShardDetector(KinesisAsyncClient kinesisClient, String streamName, long listShardsBackoffTimeInMillis,
int maxListShardsRetryAttempts, long listShardsCacheAllowedAgeInSeconds, int maxCacheMissesBeforeReload,
int cacheMissWarningModulus, Duration kinesisRequestTimeout) {
int cacheMissWarningModulus, Duration kinesisRequestTimeout, ShardFilter shardFilter) {
this.kinesisClient = kinesisClient;
this.streamName = streamName;
this.listShardsBackoffTimeInMillis = listShardsBackoffTimeInMillis;
@ -93,6 +96,7 @@ public class KinesisShardDetector implements ShardDetector {
this.maxCacheMissesBeforeReload = maxCacheMissesBeforeReload;
this.cacheMissWarningModulus = cacheMissWarningModulus;
this.kinesisRequestTimeout = kinesisRequestTimeout;
this.shardFilter = shardFilter;
}
@Override
@ -154,7 +158,7 @@ public class KinesisShardDetector implements ShardDetector {
String nextToken = null;
do {
result = listShards(nextToken);
result = listShards(nextToken, shardFilter);
if (result == null) {
/*
@ -172,7 +176,7 @@ public class KinesisShardDetector implements ShardDetector {
return shards;
}
private ListShardsResponse listShards(final String nextToken) {
private ListShardsResponse listShards(final String nextToken, final ShardFilter shardFilter) {
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
exceptionManager.add(LimitExceededException.class, t -> t);
exceptionManager.add(ResourceInUseException.class, t -> t);

View file

@ -30,6 +30,8 @@ import lombok.experimental.Accessors;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory;
@ -176,6 +178,17 @@ public class LeaseManagementConfig {
private InitialPositionInStreamExtended initialPositionInStream =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
/**
* Shard filter ot use for bootstrap. This needs to align with initialPositionInStreamExtended. i.e
*
* InitialPositionInStream.LATEST => ShardFilterType.AT_LATEST
* InitialPositionInStream.TRIM_HORIZON => ShardFilterType.FROM_TRIM_HORIZON
* InitialPositionInStream.AT_TIMESTAMP => ShardFilterType.FROM_TIMESTAMP
*
*/
private ShardFilter bootstrapShardFilter = ShardFilter.builder().type(ShardFilterType.FROM_TRIM_HORIZON).build();
private int maxCacheMissesBeforeReload = 1000;
private long listShardsCacheAllowedAgeInSeconds = 30;
private int cacheMissWarningModulus = 250;
@ -270,7 +283,8 @@ public class LeaseManagementConfig {
initialLeaseTableReadCapacity(),
initialLeaseTableWriteCapacity(),
hierarchicalShardSyncer(),
tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode());
tableCreatorCallback(), dynamoDbRequestTimeout(), billingMode(),
bootstrapShardFilter);
}
return leaseManagementFactory;
}

View file

@ -16,8 +16,10 @@
package software.amazon.kinesis.leases;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import java.util.List;
import java.util.Optional;
/**
*
@ -26,5 +28,4 @@ public interface ShardDetector {
Shard shard(String shardId);
List<Shard> listShards();
}

View file

@ -17,6 +17,8 @@ package software.amazon.kinesis.leases;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.lifecycle.ConsumerTask;
@ -48,11 +50,13 @@ public class ShardSyncTask implements ConsumerTask {
private final boolean ignoreUnexpectedChildShards;
private final long shardSyncTaskIdleTimeMillis;
@NonNull
private final HierarchicalShardSyncer hierarchicalShardSyncer;
private final HierarchicalShardSyncer hierarchicalShardSyncerShardSyncTask;
@NonNull
private final MetricsFactory metricsFactory;
private final TaskType taskType = TaskType.SHARDSYNC;
@NonNull
private final ShardFilter shardFilter;
/*
* (non-Javadoc)
@ -64,8 +68,8 @@ public class ShardSyncTask implements ConsumerTask {
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHARD_SYNC_TASK_OPERATION);
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, scope);
hierarchicalShardSyncerShardSyncTask.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, scope, shardFilter);
if (shardSyncTaskIdleTimeMillis > 0) {
Thread.sleep(shardSyncTaskIdleTimeMillis);
}

View file

@ -18,6 +18,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import lombok.Data;
@ -53,6 +54,8 @@ public class ShardSyncTaskManager {
private final HierarchicalShardSyncer hierarchicalShardSyncer;
@NonNull
private final MetricsFactory metricsFactory;
@NonNull
private final ShardFilter shardFilter;
/**
* Constructor.
@ -72,7 +75,7 @@ public class ShardSyncTaskManager {
public ShardSyncTaskManager(ShardDetector shardDetector, LeaseRefresher leaseRefresher,
InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesUponShardCompletion,
boolean ignoreUnexpectedChildShards, long shardSyncIdleTimeMillis, ExecutorService executorService,
MetricsFactory metricsFactory) {
MetricsFactory metricsFactory, ShardFilter shardFilter) {
this.shardDetector = shardDetector;
this.leaseRefresher = leaseRefresher;
this.initialPositionInStream = initialPositionInStream;
@ -82,6 +85,7 @@ public class ShardSyncTaskManager {
this.executorService = executorService;
this.hierarchicalShardSyncer = new HierarchicalShardSyncer();
this.metricsFactory = metricsFactory;
this.shardFilter = shardFilter;
}
/**
@ -100,7 +104,7 @@ public class ShardSyncTaskManager {
public ShardSyncTaskManager(ShardDetector shardDetector, LeaseRefresher leaseRefresher,
InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesUponShardCompletion,
boolean ignoreUnexpectedChildShards, long shardSyncIdleTimeMillis, ExecutorService executorService,
HierarchicalShardSyncer hierarchicalShardSyncer, MetricsFactory metricsFactory) {
HierarchicalShardSyncer hierarchicalShardSyncer, MetricsFactory metricsFactory, ShardFilter shardFilter) {
this.shardDetector = shardDetector;
this.leaseRefresher = leaseRefresher;
this.initialPositionInStream = initialPositionInStream;
@ -110,6 +114,7 @@ public class ShardSyncTaskManager {
this.executorService = executorService;
this.hierarchicalShardSyncer = hierarchicalShardSyncer;
this.metricsFactory = metricsFactory;
this.shardFilter = shardFilter;
}
private ConsumerTask currentTask;
@ -143,7 +148,8 @@ public class ShardSyncTaskManager {
ignoreUnexpectedChildShards,
shardSyncIdleTimeMillis,
hierarchicalShardSyncer,
metricsFactory),
metricsFactory,
shardFilter),
metricsFactory);
future = executorService.submit(currentTask);
submittedNewTask = true;

View file

@ -23,6 +23,7 @@ import lombok.NonNull;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
@ -57,6 +58,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final InitialPositionInStreamExtended initialPositionInStream;
@NonNull
private final HierarchicalShardSyncer hierarchicalShardSyncer;
@NonNull
private final ShardFilter shardFilter;
private final long failoverTimeMillis;
private final long epsilonMillis;
@ -114,14 +117,14 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus) {
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final ShardFilter shardFilter) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY);
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, shardFilter);
}
/**
@ -165,7 +168,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity) {
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, final ShardFilter shardFilter) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
@ -173,7 +176,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
new HierarchicalShardSyncer(), TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK,
LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, shardFilter);
}
/**
@ -216,19 +219,20 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) {
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
ShardFilter shardFilter) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
hierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, shardFilter);
}
/**
* Constructor.
*
*
* @param kinesisClient
* @param streamName
* @param dynamoDBClient
@ -268,14 +272,14 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout) {
Duration dynamoDbRequestTimeout, ShardFilter shardFilter) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED);
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED, shardFilter);
}
/**
@ -320,7 +324,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode) {
Duration dynamoDbRequestTimeout, BillingMode billingMode, ShardFilter shardFilter) {
this.kinesisClient = kinesisClient;
this.streamName = streamName;
this.dynamoDBClient = dynamoDBClient;
@ -348,6 +352,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.tableCreatorCallback = tableCreatorCallback;
this.dynamoDbRequestTimeout = dynamoDbRequestTimeout;
this.billingMode = billingMode;
this.shardFilter = shardFilter;
}
@Override
@ -374,7 +379,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
shardSyncIntervalMillis,
executorService,
hierarchicalShardSyncer,
metricsFactory);
metricsFactory,
shardFilter);
}
@Override
@ -387,6 +393,6 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
public ShardDetector createShardDetector() {
return new KinesisShardDetector(kinesisClient, streamName, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload,
cacheMissWarningModulus, dynamoDbRequestTimeout);
cacheMissWarningModulus, dynamoDbRequestTimeout, shardFilter);
}
}

View file

@ -1,6 +1,7 @@
package software.amazon.kinesis.leases.exceptions;
import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
@ -38,9 +39,9 @@ public class ShardSyncer {
public static synchronized void checkAndCreateLeasesForNewShards(@NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition,
final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards,
final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException,
final MetricsScope scope, final ShardFilter shardFilter) throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
HIERARCHICAL_SHARD_SYNCER.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope);
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, shardFilter);
}
}

View file

@ -269,7 +269,8 @@ class ConsumerStates {
input,
argument.shouldCallProcessRecordsEvenForEmptyRecordList(),
argument.idleTimeInMilliseconds(),
argument.aggregatorUtil(), argument.metricsFactory()
argument.aggregatorUtil(), argument.metricsFactory(),
argument.shardFilter()
);
}
@ -496,7 +497,8 @@ class ConsumerStates {
argument.taskBackoffTimeMillis(),
argument.recordsPublisher(),
argument.hierarchicalShardSyncer(),
argument.metricsFactory());
argument.metricsFactory(),
argument.shardFilter());
}
@Override

View file

@ -21,6 +21,7 @@ import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.cloudwatch.model.StandardUnit;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ShardDetector;
@ -60,6 +61,7 @@ public class ProcessTask implements ConsumerTask {
private final ProcessRecordsInput processRecordsInput;
private final MetricsFactory metricsFactory;
private final AggregatorUtil aggregatorUtil;
private final ShardFilter shardFilter;
public ProcessTask(@NonNull ShardInfo shardInfo,
@NonNull ShardRecordProcessor shardRecordProcessor,
@ -72,7 +74,8 @@ public class ProcessTask implements ConsumerTask {
boolean shouldCallProcessRecordsEvenForEmptyRecordList,
long idleTimeInMilliseconds,
@NonNull AggregatorUtil aggregatorUtil,
@NonNull MetricsFactory metricsFactory) {
@NonNull MetricsFactory metricsFactory,
@NonNull ShardFilter shardFilter) {
this.shardInfo = shardInfo;
this.shardRecordProcessor = shardRecordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
@ -82,6 +85,7 @@ public class ProcessTask implements ConsumerTask {
this.shouldCallProcessRecordsEvenForEmptyRecordList = shouldCallProcessRecordsEvenForEmptyRecordList;
this.idleTimeInMilliseconds = idleTimeInMilliseconds;
this.metricsFactory = metricsFactory;
this.shardFilter = shardFilter;
if (!skipShardSyncAtWorkerInitializationIfLeasesExist) {
this.shard = shardDetector.shard(shardInfo.shardId());

View file

@ -18,6 +18,7 @@ package software.amazon.kinesis.lifecycle;
import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -71,4 +72,6 @@ public class ShardConsumerArgument {
private final HierarchicalShardSyncer hierarchicalShardSyncer;
@NonNull
private final MetricsFactory metricsFactory;
@NonNull
private final ShardFilter shardFilter;
}

View file

@ -22,6 +22,7 @@ import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
@ -79,6 +80,8 @@ public class ShutdownTask implements ConsumerTask {
private final HierarchicalShardSyncer hierarchicalShardSyncer;
@NonNull
private final MetricsFactory metricsFactory;
@NonNull
private final ShardFilter shardFilter;
private final TaskType taskType = TaskType.SHUTDOWN;

View file

@ -19,6 +19,8 @@ import lombok.Data;
import lombok.NonNull;
import lombok.experimental.Accessors;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.retrieval.fanout.FanOutConfig;

View file

@ -53,6 +53,8 @@ import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
@ -77,6 +79,7 @@ public class HierarchicalShardSyncerTest {
private final boolean cleanupLeasesOfCompletedShards = true;
private final boolean ignoreUnexpectedChildShards = false;
private final ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_LATEST).build();
private HierarchicalShardSyncer hierarchicalShardSyncer;
@ -186,7 +189,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, SCOPE);
cleanupLeasesOfCompletedShards, false, SCOPE, shardFilter);
final Set<String> expectedShardIds = new HashSet<>(
Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"));
@ -299,7 +302,7 @@ public class HierarchicalShardSyncerTest {
try {
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, false, SCOPE);
INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, false, SCOPE, shardFilter);
} finally {
verify(shardDetector).listShards();
verify(dynamoDBLeaseRefresher, never()).listLeases();
@ -334,7 +337,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, true, SCOPE);
cleanupLeasesOfCompletedShards, true, SCOPE, shardFilter);
final List<Lease> leases = leaseCaptor.getAllValues();
final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -390,7 +393,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: No leases present, create leases.
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter);
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
@ -405,7 +408,7 @@ public class HierarchicalShardSyncerTest {
// Second call: Leases present, with shardId-0 being at ShardEnd causing cleanup.
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter);
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<ExtendedSequenceNumber> sequenceNumbers = deleteLeases.stream().map(Lease::checkpoint)
@ -465,7 +468,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: Call to create leases.
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter);
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
@ -482,7 +485,7 @@ public class HierarchicalShardSyncerTest {
// Second call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete fails.
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter);
} finally {
List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -506,7 +509,7 @@ public class HierarchicalShardSyncerTest {
// Final call: Leases already present. ShardId-0 is at ShardEnd and needs to be cleaned up. Delete passes.
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter);
deleteLeases = leaseDeleteCaptor.getAllValues();
@ -567,7 +570,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: Call to create leases. Fails on ListLeases
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter);
} finally {
verify(shardDetector, times(1)).listShards();
verify(dynamoDBLeaseRefresher, times(1)).listLeases();
@ -577,7 +580,7 @@ public class HierarchicalShardSyncerTest {
// Second call: Leases not present, leases will be created.
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter);
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
final Set<Lease> expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null));
@ -592,7 +595,7 @@ public class HierarchicalShardSyncerTest {
// Final call: Leases present, belongs to TestOwner, shardId-0 is at ShardEnd should be cleaned up.
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter);
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -658,7 +661,7 @@ public class HierarchicalShardSyncerTest {
// Initial call: No leases present, create leases. Create lease Fails
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter);
} finally {
verify(shardDetector, times(1)).listShards();
verify(dynamoDBLeaseRefresher, times(1)).listLeases();
@ -667,7 +670,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter);
final Set<Lease> createLeases = new HashSet<>(leaseCreateCaptor.getAllValues());
final Set<Lease> expectedCreateLeases = new HashSet<>(createLeasesFromShards(shards, sequenceNumber, null));
@ -682,7 +685,7 @@ public class HierarchicalShardSyncerTest {
// Final call: Leases are present, shardId-0 is at ShardEnd needs to be cleaned up.
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, position,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter);
final List<Lease> deleteLeases = leaseDeleteCaptor.getAllValues();
final Set<String> shardIds = deleteLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
@ -744,7 +747,7 @@ public class HierarchicalShardSyncerTest {
doNothing().when(dynamoDBLeaseRefresher).deleteLease(leaseCaptor.capture());
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher,
INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE);
INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, SCOPE, shardFilter);
assertThat(leaseCaptor.getAllValues().size(), equalTo(1));
assertThat(leaseCaptor.getValue(), equalTo(garbageLease));
@ -776,7 +779,7 @@ public class HierarchicalShardSyncerTest {
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, initialPosition,
cleanupLeasesOfCompletedShards, false, SCOPE);
cleanupLeasesOfCompletedShards, false, SCOPE, shardFilter);
final List<Lease> leases = leaseCaptor.getAllValues();
final Set<String> leaseKeys = leases.stream().map(Lease::leaseKey).collect(Collectors.toSet());

View file

@ -51,6 +51,8 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
/**
*
@ -67,6 +69,7 @@ public class KinesisShardDetectorTest {
private static final String SHARD_ID = "shardId-%012d";
private KinesisShardDetector shardDetector;
private ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_LATEST).build();
@Rule
public ExpectedException expectedExceptionRule = ExpectedException.none();
@ -80,7 +83,7 @@ public class KinesisShardDetectorTest {
public void setup() {
shardDetector = new KinesisShardDetector(client, STREAM_NAME, LIST_SHARDS_BACKOFF_TIME_IN_MILLIS,
MAX_LIST_SHARDS_RETRY_ATTEMPTS, LIST_SHARDS_CACHE_ALLOWED_AGE_IN_SECONDS,
MAX_CACHE_MISSES_BEFORE_RELOAD, CACHE_MISS_WARNING_MODULUS);
MAX_CACHE_MISSES_BEFORE_RELOAD, CACHE_MISS_WARNING_MODULUS, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT, shardFilter);
}
@Test

View file

@ -35,6 +35,8 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest;
import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -60,6 +62,7 @@ public class ShardSyncTaskIntegrationTest {
private static final int CACHE_MISS_WARNING_MODULUS = 250;
private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory();
private static KinesisAsyncClient kinesisClient;
private static ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_LATEST).build();
private LeaseRefresher leaseRefresher;
private ShardDetector shardDetector;
@ -97,7 +100,7 @@ public class ShardSyncTaskIntegrationTest {
USE_CONSISTENT_READS, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK);
shardDetector = new KinesisShardDetector(kinesisClient, STREAM_NAME, 500L, 50,
LIST_SHARDS_CACHE_ALLOWED_AGE_IN_SECONDS, MAX_CACHE_MISSES_BEFORE_RELOAD, CACHE_MISS_WARNING_MODULUS);
LIST_SHARDS_CACHE_ALLOWED_AGE_IN_SECONDS, MAX_CACHE_MISSES_BEFORE_RELOAD, CACHE_MISS_WARNING_MODULUS, shardFilter);
hierarchicalShardSyncer = new HierarchicalShardSyncer();
}
@ -119,7 +122,7 @@ public class ShardSyncTaskIntegrationTest {
Set<String> shardIds = shardDetector.listShards().stream().map(Shard::shardId).collect(Collectors.toSet());
ShardSyncTask syncTask = new ShardSyncTask(shardDetector, leaseRefresher,
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST), false, false, 0L,
hierarchicalShardSyncer, NULL_METRICS_FACTORY);
hierarchicalShardSyncer, NULL_METRICS_FACTORY, shardFilter);
syncTask.call();
List<Lease> leases = leaseRefresher.listLeases();
Set<String> leaseKeys = new HashSet<>();

View file

@ -40,6 +40,8 @@ import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -66,6 +68,7 @@ public class ConsumerStatesTest {
private ShardConsumer consumer;
private ShardConsumerArgument argument;
private ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_LATEST).build();
@Mock
private ShardRecordProcessor shardRecordProcessor;
@ -119,7 +122,7 @@ public class ConsumerStatesTest {
taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis,
maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis,
INITIAL_POSITION_IN_STREAM, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector,
new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory);
new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory, shardFilter);
consumer = spy(new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis,
argument, taskExecutionListener, 0));

View file

@ -62,6 +62,8 @@ import lombok.Data;
import lombok.Getter;
import software.amazon.awssdk.services.kinesis.model.HashKeyRange;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
@ -83,6 +85,7 @@ public class ProcessTaskTest {
private boolean shouldCallProcessRecordsEvenForEmptyRecordList = true;
private boolean skipShardSyncAtWorkerInitializationIfLeasesExist = true;
private ShardInfo shardInfo;
private ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_LATEST).build();
@Mock
private ProcessRecordsInput processRecordsInput;
@ -122,7 +125,7 @@ public class ProcessTaskTest {
return new ProcessTask(shardInfo, shardRecordProcessor, checkpointer, taskBackoffTimeMillis,
skipShardSync, shardDetector, throttlingReporter,
processRecordsInput, shouldCallProcessRecordsEvenForEmptyRecordList, IDLE_TIME_IN_MILLISECONDS,
aggregatorUtil, new NullMetricsFactory());
aggregatorUtil, new NullMetricsFactory(), shardFilter);
}
@Test

View file

@ -37,6 +37,8 @@ import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardFilterType;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -75,6 +77,7 @@ public class ShutdownTaskTest {
private boolean ignoreUnexpectedChildShards = false;
private ShardInfo shardInfo;
private ShutdownTask task;
private ShardFilter shardFilter = ShardFilter.builder().type(ShardFilterType.AT_LATEST).build();
@Mock
private RecordsPublisher recordsPublisher;
@ -104,7 +107,7 @@ public class ShutdownTaskTest {
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY);
hierarchicalShardSyncer, NULL_METRICS_FACTORY, shardFilter);
}
/**
@ -158,7 +161,7 @@ public class ShutdownTaskTest {
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY);
hierarchicalShardSyncer, NULL_METRICS_FACTORY, shardFilter);
when(shardDetector.listShards()).thenReturn(constructShardListGraphA());
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
@ -183,7 +186,7 @@ public class ShutdownTaskTest {
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY);
hierarchicalShardSyncer, NULL_METRICS_FACTORY, shardFilter);
when(shardDetector.listShards()).thenReturn(constructShardListGraphA());
@ -207,7 +210,7 @@ public class ShutdownTaskTest {
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
LEASE_LOST_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY);
hierarchicalShardSyncer, NULL_METRICS_FACTORY, shardFilter);
when(shardDetector.listShards()).thenReturn(constructShardListGraphA());