Merge pull request #31 from ashwing/ShutdownTaskCleanup

* Adding LeaseCleanupManager
* Added lease cleanup unit tests
* Adding lease cleanup path to shard consumer shutdown
This commit is contained in:
Joshua Kim 2020-06-11 19:48:58 -04:00 committed by GitHub
commit 2e2211a9b7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 974 additions and 94 deletions

View file

@ -0,0 +1,41 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.common;
import lombok.Builder;
import lombok.Getter;
import lombok.experimental.Accessors;
/**
* Configuration for lease cleanup.
*/
@Builder
@Getter
@Accessors(fluent=true)
public class LeaseCleanupConfig {
/**
* Interval at which to run lease cleanup thread.
*/
private final long leaseCleanupIntervalMillis;
/**
* Interval at which to check if a lease is completed or not.
*/
private final long completedLeaseCleanupIntervalMillis;
/**
* Interval at which to check if a lease is garbage (i.e trimmed past the stream's retention period) or not.
*/
private final long garbageLeaseCleanupIntervalMillis;
}

View file

@ -58,6 +58,7 @@ import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCleanupManager;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseRefresher;
@ -164,6 +165,7 @@ public class Scheduler implements Runnable {
private final long schedulerInitializationBackoffTimeMillis;
private final LeaderDecider leaderDecider;
private final Map<StreamIdentifier, Instant> staleStreamDeletionMap = new HashMap<>();
private final LeaseCleanupManager leaseCleanupManager;
// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
@ -289,6 +291,8 @@ public class Scheduler implements Runnable {
this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(
leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap,
shardSyncTaskManagerProvider, isMultiStreamMode);
this.leaseCleanupManager = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer, isMultiStreamMode)
.createLeaseCleanupManager(metricsFactory);
}
/**
@ -341,6 +345,13 @@ public class Scheduler implements Runnable {
log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
}
if (!leaseCleanupManager.isRunning()) {
log.info("Starting LeaseCleanupManager.");
leaseCleanupManager.start();
} else {
log.info("LeaseCleanupManager is already running. No need to start it");
}
// If we reach this point, then we either skipped the lease sync or did not have any exception
// for any of the shard sync in the previous attempt.
if (!leaseCoordinator.isRunning()) {
@ -397,29 +408,14 @@ public class Scheduler implements Runnable {
void runProcessLoop() {
try {
Set<ShardInfo> assignedShards = new HashSet<>();
final Set<ShardInfo> completedShards = new HashSet<>();
for (ShardInfo shardInfo : getShardInfoForAssignments()) {
ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo,
processorConfig.shardRecordProcessorFactory());
processorConfig.shardRecordProcessorFactory(), leaseCleanupManager);
if (shardConsumer.isShutdown() && shardConsumer.shutdownReason().equals(ShutdownReason.SHARD_END)) {
completedShards.add(shardInfo);
} else {
shardConsumer.executeLifecycle();
}
shardConsumer.executeLifecycle();
assignedShards.add(shardInfo);
}
for (ShardInfo completedShard : completedShards) {
final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifierSerOpt());
final StreamConfig streamConfig = currentStreamConfigMap
.getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier));
if (createOrGetShardSyncTaskManager(streamConfig).submitShardSyncTask()) {
log.info("{} : Found completed shard, initiated new ShardSyncTak for {} ",
streamIdentifier.serialize(), completedShard.toString());
}
}
// clean up shard consumers for unassigned shards
cleanupShardConsumers(assignedShards);
@ -868,7 +864,8 @@ public class Scheduler implements Runnable {
* @return ShardConsumer for the shard
*/
ShardConsumer createOrGetShardConsumer(@NonNull final ShardInfo shardInfo,
@NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) {
@NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory,
@NonNull final LeaseCleanupManager leaseCleanupManager) {
ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
// Instantiate a new consumer if we don't have one, or the one we
// had was from an earlier
@ -877,7 +874,7 @@ public class Scheduler implements Runnable {
// completely processed (shutdown reason terminate).
if ((consumer == null)
|| (consumer.isShutdown() && consumer.shutdownReason().equals(ShutdownReason.LEASE_LOST))) {
consumer = buildConsumer(shardInfo, shardRecordProcessorFactory);
consumer = buildConsumer(shardInfo, shardRecordProcessorFactory, leaseCleanupManager);
shardInfoShardConsumerMap.put(shardInfo, consumer);
slog.infoForce("Created new shardConsumer for : " + shardInfo);
}
@ -889,12 +886,14 @@ public class Scheduler implements Runnable {
}
protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo,
@NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) {
@NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory,
@NonNull final LeaseCleanupManager leaseCleanupManager) {
ShardRecordProcessorCheckpointer checkpointer = coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo,
checkpoint);
// The only case where streamName is not available will be when multistreamtracker not set. In this case,
// get the default stream name for the single stream application.
final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifierSerOpt());
// Irrespective of single stream app or multi stream app, streamConfig should always be available.
// If we have a shardInfo, that is not present in currentStreamConfigMap for whatever reason, then return default stream config
// to gracefully complete the reading.
@ -922,7 +921,8 @@ public class Scheduler implements Runnable {
shardDetectorProvider.apply(streamConfig),
aggregatorUtil,
hierarchicalShardSyncerProvider.apply(streamConfig),
metricsFactory);
metricsFactory,
leaseCleanupManager);
return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(),
argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());
}

View file

@ -0,0 +1,343 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.leases;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.AWSExceptionManager;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.time.Duration;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
/**
* Helper class to cleanup of any expired/closed shard leases. It will cleanup leases periodically as defined by
* {@link LeaseManagementConfig#leaseCleanupConfig()} asynchronously.
*/
@Accessors(fluent=true)
@Slf4j
@RequiredArgsConstructor
@EqualsAndHashCode
public class LeaseCleanupManager {
@NonNull
private final LeaseCoordinator leaseCoordinator;
@NonNull
private final KinesisAsyncClient kinesisClient;
@NonNull
private final MetricsFactory metricsFactory;
@NonNull
private final Duration maxFutureWait;
@NonNull
private final ScheduledExecutorService deletionThreadPool;
private final boolean cleanupLeasesUponShardCompletion;
private final long leaseCleanupIntervalMillis;
private final long completedLeaseCleanupIntervalMillis;
private final long garbageLeaseCleanupIntervalMillis;
private final Stopwatch completedLeaseStopwatch = Stopwatch.createUnstarted();
private final Stopwatch garbageLeaseStopwatch = Stopwatch.createUnstarted();
private final Queue<LeasePendingDeletion> deletionQueue = new ConcurrentLinkedQueue<>();
private static final int MAX_RECORDS = 1;
private static final long INITIAL_DELAY = 0L;
@Getter
private volatile boolean isRunning = false;
/**
* Starts the lease cleanup thread, which is scheduled periodically as specified by
* {@link LeaseCleanupManager#leaseCleanupIntervalMillis}
*/
public void start() {
log.debug("Starting lease cleanup thread.");
completedLeaseStopwatch.start();
garbageLeaseStopwatch.start();
deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis,
TimeUnit.MILLISECONDS);
}
/**
* Enqueues a lease for deletion.
* @param leasePendingDeletion
*/
public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) {
final Lease lease = leasePendingDeletion.lease();
if (lease == null) {
log.warn("Cannot enqueue lease {} for deferred deletion - instance doesn't hold the lease for that shard.",
lease.leaseKey());
} else {
//TODO: Optimize verifying duplicate entries https://sim.amazon.com/issues/KinesisLTR-597.
if (!deletionQueue.contains(leasePendingDeletion)) {
log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey());
deletionQueue.add(leasePendingDeletion);
} else {
log.warn("Lease {} is already pending deletion, not enqueueing for deletion.", lease.leaseKey());
}
}
}
/**
* Returns how many leases are currently waiting in the queue pending deletion.
* @return number of leases pending deletion.
*/
public int leasesPendingDeletion() {
return deletionQueue.size();
}
private boolean timeToCheckForCompletedShard() {
return completedLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= completedLeaseCleanupIntervalMillis;
}
private boolean timeToCheckForGarbageShard() {
return garbageLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= garbageLeaseCleanupIntervalMillis;
}
private LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion) throws TimeoutException,
InterruptedException, DependencyException, ProvisionedThroughputException, InvalidStateException {
final Lease lease = leasePendingDeletion.lease();
final ShardInfo shardInfo = leasePendingDeletion.shardInfo();
final StreamIdentifier streamIdentifier = leasePendingDeletion.streamIdentifier();
final AWSExceptionManager exceptionManager = createExceptionManager();
boolean cleanedUpCompletedLease = false;
boolean cleanedUpGarbageLease = false;
boolean alreadyCheckedForGarbageCollection = false;
try {
if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard()) {
Set<String> childShardKeys = leaseCoordinator.leaseRefresher().getLease(lease.leaseKey()).childShardIds();
if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
try {
childShardKeys = getChildShardsFromService(shardInfo, streamIdentifier);
if (childShardKeys == null) {
log.error("No child shards returned from service for shard {} for {}.", shardInfo.shardId(), streamIdentifier.streamName());
} else {
updateLeaseWithChildShards(leasePendingDeletion, childShardKeys);
}
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
} finally {
alreadyCheckedForGarbageCollection = true;
}
}
cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, shardInfo, childShardKeys);
}
if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard()) {
try {
getChildShardsFromService(shardInfo, streamIdentifier);
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
}
}
} catch (ResourceNotFoundException e) {
cleanedUpGarbageLease = cleanupLeaseForGarbageShard(lease);
}
return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease);
}
private Set<String> getChildShardsFromService(ShardInfo shardInfo, StreamIdentifier streamIdentifier)
throws InterruptedException, ExecutionException, TimeoutException {
final GetShardIteratorRequest getShardIteratorRequest = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
.streamName(streamIdentifier.streamName())
.shardIteratorType(ShardIteratorType.LATEST)
.shardId(shardInfo.shardId())
.build();
final GetShardIteratorResponse getShardIteratorResponse =
FutureUtils.resolveOrCancelFuture(kinesisClient.getShardIterator(getShardIteratorRequest), maxFutureWait);
final GetRecordsRequest getRecordsRequest = KinesisRequestsBuilder.getRecordsRequestBuilder()
.shardIterator(getShardIteratorResponse.shardIterator())
.limit(MAX_RECORDS)
.build();
final GetRecordsResponse getRecordsResponse =
FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(getRecordsRequest), maxFutureWait);
return getRecordsResponse.childShards().stream().map(c -> c.shardId()).collect(Collectors.toSet());
}
// A lease that ended with SHARD_END from ResourceNotFoundException is safe to delete if it no longer exists in the
// stream (known explicitly from ResourceNotFound being thrown when processing this shard),
private boolean cleanupLeaseForGarbageShard(Lease lease) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
log.info("Deleting lease {} as it is not present in the stream.", lease);
leaseCoordinator.leaseRefresher().deleteLease(lease);
return true;
}
private boolean allParentShardLeasesDeleted(Lease lease, ShardInfo shardInfo) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
for (String parentShard : lease.parentShardIds()) {
final Lease parentLease = leaseCoordinator.leaseRefresher().getLease(ShardInfo.getLeaseKey(shardInfo, parentShard));
if (parentLease != null) {
log.warn("Lease {} has a parent lease {} which is still present in the lease table, skipping deletion " +
"for this lease.", lease, parentLease);
return false;
}
}
return true;
}
// We should only be deleting the current shard's lease if
// 1. All of its children are currently being processed, i.e their checkpoint is not TRIM_HORIZON or AT_TIMESTAMP.
// 2. Its parent shard lease(s) have already been deleted.
private boolean cleanupLeaseForCompletedShard(Lease lease, ShardInfo shardInfo, Set<String> childShardKeys)
throws DependencyException, ProvisionedThroughputException, InvalidStateException, IllegalStateException {
final Set<String> processedChildShardLeases = new HashSet<>();
for (String childShardKey : childShardKeys) {
final Lease childShardLease = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(childShardKey)).orElseThrow(
() -> new IllegalStateException("Child lease " + childShardKey + " for completed shard not found in " +
"lease table - not cleaning up lease " + lease));
if (!childShardLease.checkpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON)
&& !childShardLease.checkpoint().equals(ExtendedSequenceNumber.AT_TIMESTAMP)) {
processedChildShardLeases.add(childShardLease.leaseKey());
}
}
if (!allParentShardLeasesDeleted(lease, shardInfo) || !Objects.equals(childShardKeys, processedChildShardLeases)) {
return false;
}
log.info("Deleting lease {} as it has been completely processed and processing of child shard(s) has begun.",
lease);
leaseCoordinator.leaseRefresher().deleteLease(lease);
return true;
}
private void updateLeaseWithChildShards(LeasePendingDeletion leasePendingDeletion, Set<String> childShardKeys)
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
final Lease updatedLease = leasePendingDeletion.lease();
updatedLease.childShardIds(childShardKeys);
leaseCoordinator.leaseRefresher().updateLeaseWithMetaInfo(updatedLease, UpdateField.CHILD_SHARDS);
}
private AWSExceptionManager createExceptionManager() {
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
exceptionManager.add(ResourceNotFoundException.class, t -> t);
return exceptionManager;
}
@VisibleForTesting
void cleanupLeases() {
if (deletionQueue.isEmpty()) {
log.debug("No leases pending deletion.");
} else if (timeToCheckForCompletedShard() | timeToCheckForGarbageShard()) {
final Queue<LeasePendingDeletion> failedDeletions = new ConcurrentLinkedQueue<>();
boolean completedLeaseCleanedUp = false;
boolean garbageLeaseCleanedUp = false;
log.debug("Attempting to clean up {} lease(s).", deletionQueue.size());
while (!deletionQueue.isEmpty()) {
final LeasePendingDeletion leasePendingDeletion = deletionQueue.poll();
final String leaseKey = leasePendingDeletion.lease().leaseKey();
final StreamIdentifier streamIdentifier = leasePendingDeletion.streamIdentifier();
boolean deletionFailed = true;
try {
final LeaseCleanupResult leaseCleanupResult = cleanupLease(leasePendingDeletion);
completedLeaseCleanedUp |= leaseCleanupResult.cleanedUpCompletedLease();
garbageLeaseCleanedUp |= leaseCleanupResult.cleanedUpGarbageLease();
if (leaseCleanupResult.leaseCleanedUp()) {
log.debug("Successfully cleaned up lease {} for {}", leaseKey, streamIdentifier);
deletionFailed = false;
}
} catch (Exception e) {
log.error("Failed to cleanup lease {} for {}. Will re-enqueue for deletion and retry on next " +
"scheduled execution.", leaseKey, streamIdentifier, e);
}
if (deletionFailed) {
log.debug("Did not cleanup lease {} for {}. Re-enqueueing for deletion.", leaseKey, streamIdentifier);
failedDeletions.add(leasePendingDeletion);
}
}
if (completedLeaseCleanedUp) {
log.debug("At least one completed lease was cleaned up - restarting interval");
completedLeaseStopwatch.reset().start();
}
if (garbageLeaseCleanedUp) {
log.debug("At least one garbage lease was cleaned up - restarting interval");
garbageLeaseStopwatch.reset().start();
}
deletionQueue.addAll(failedDeletions);
}
}
private class LeaseCleanupThread implements Runnable {
@Override
public void run() {
cleanupLeases();
}
}
@Value
private class LeaseCleanupResult {
boolean cleanedUpCompletedLease;
boolean cleanedUpGarbageLease;
public boolean leaseCleanedUp() {
return cleanedUpCompletedLease | cleanedUpGarbageLease;
}
}
}

View file

@ -33,6 +33,7 @@ import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.LeaseCleanupConfig;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory;
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
@ -48,6 +49,16 @@ public class LeaseManagementConfig {
public static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofMinutes(1);
public static final long DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(1).toMillis();
public static final long DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(5).toMillis();
public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(30).toMillis();
public static final LeaseCleanupConfig DEFAULT_LEASE_CLEANUP_CONFIG = LeaseCleanupConfig.builder()
.leaseCleanupIntervalMillis(DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS)
.completedLeaseCleanupIntervalMillis(DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS)
.garbageLeaseCleanupIntervalMillis(DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS)
.build();
/**
* Name of the table to use in DynamoDB
*
@ -108,6 +119,15 @@ public class LeaseManagementConfig {
*/
private boolean cleanupLeasesUponShardCompletion = true;
/**
* Configuration for lease cleanup in {@link LeaseCleanupManager}.
*
* <p>Default lease cleanup interval value: 1 minute.</p>
* <p>Default completed lease cleanup threshold: 5 minute.</p>
* <p>Default garbage lease cleanup threshold: 30 minute.</p>
*/
private final LeaseCleanupConfig leaseCleanupConfig = DEFAULT_LEASE_CLEANUP_CONFIG;
/**
* The max number of leases (shards) this worker should process.
* This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints
@ -344,7 +364,8 @@ public class LeaseManagementConfig {
billingMode(),
leaseSerializer,
customShardDetectorProvider(),
isMultiStreamingMode);
isMultiStreamingMode,
leaseCleanupConfig());
}
return leaseManagementFactory;
}

View file

@ -39,4 +39,6 @@ public interface LeaseManagementFactory {
throw new UnsupportedOperationException();
}
LeaseCleanupManager createLeaseCleanupManager(MetricsFactory metricsFactory);
}

View file

@ -17,6 +17,7 @@ package software.amazon.kinesis.leases.dynamodb;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import lombok.Data;
import lombok.NonNull;
@ -25,10 +26,12 @@ import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.LeaseCleanupConfig;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.KinesisShardDetector;
import software.amazon.kinesis.leases.LeaseCleanupManager;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig;
import software.amazon.kinesis.leases.LeaseManagementFactory;
@ -83,6 +86,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final Duration dynamoDbRequestTimeout;
private final BillingMode billingMode;
private final boolean isMultiStreamMode;
private final LeaseCleanupConfig leaseCleanupConfig;
/**
* Constructor.
@ -208,7 +212,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param deprecatedHierarchicalShardSyncer
* @param hierarchicalShardSyncer
* @param tableCreatorCallback
*/
@Deprecated
@ -222,14 +226,14 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) {
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
deprecatedHierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
hierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
}
/**
@ -258,7 +262,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param deprecatedHierarchicalShardSyncer
* @param hierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
*/
@ -273,7 +277,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
@ -281,7 +285,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED);
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED);
}
/**
@ -310,7 +314,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param cacheMissWarningModulus
* @param initialLeaseTableReadCapacity
* @param initialLeaseTableWriteCapacity
* @param deprecatedHierarchicalShardSyncer
* @param hierarchicalShardSyncer
* @param tableCreatorCallback
* @param dynamoDbRequestTimeout
* @param billingMode
@ -326,7 +330,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode) {
this(kinesisClient, new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream), dynamoDBClient, tableName,
@ -335,7 +339,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer());
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer());
}
/**
@ -386,7 +390,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer,
null, false);
null, false, LeaseManagementConfig.DEFAULT_LEASE_CLEANUP_CONFIG);
this.streamConfig = streamConfig;
}
@ -420,6 +424,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
* @param leaseSerializer
* @param customShardDetectorProvider
* @param isMultiStreamMode
* @param leaseCleanupConfig
*/
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
@ -432,7 +437,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer,
Function<StreamConfig, ShardDetector> customShardDetectorProvider, boolean isMultiStreamMode) {
Function<StreamConfig, ShardDetector> customShardDetectorProvider, boolean isMultiStreamMode,
LeaseCleanupConfig leaseCleanupConfig) {
this.kinesisClient = kinesisClient;
this.dynamoDBClient = dynamoDBClient;
this.tableName = tableName;
@ -461,6 +467,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.leaseSerializer = leaseSerializer;
this.customShardDetectorProvider = customShardDetectorProvider;
this.isMultiStreamMode = isMultiStreamMode;
this.leaseCleanupConfig = leaseCleanupConfig;
}
@Override
@ -535,4 +542,19 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload,
cacheMissWarningModulus, dynamoDbRequestTimeout);
}
/**
* LeaseCleanupManager cleans up leases in the lease table for shards which have either expired past the
* stream's retention period or have been completely processed.
* @param metricsFactory
* @return LeaseCleanupManager
*/
@Override
public LeaseCleanupManager createLeaseCleanupManager(MetricsFactory metricsFactory) {
return new LeaseCleanupManager(createLeaseCoordinator(metricsFactory), kinesisClient,
metricsFactory, dynamoDbRequestTimeout, Executors.newSingleThreadScheduledExecutor(),
cleanupLeasesUponShardCompletion, leaseCleanupConfig.leaseCleanupIntervalMillis(),
leaseCleanupConfig.completedLeaseCleanupIntervalMillis(),
leaseCleanupConfig.garbageLeaseCleanupIntervalMillis());
}
}

View file

@ -0,0 +1,35 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.leases.exceptions;
import lombok.EqualsAndHashCode;
import lombok.Value;
import lombok.experimental.Accessors;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.ShardInfo;
/**
* Helper class for cleaning up leases.
*/
@Accessors(fluent = true)
@Value
@EqualsAndHashCode(exclude = {"queueEntryTime"})
public class LeasePendingDeletion {
private final StreamIdentifier streamIdentifier;
private final Lease lease;
private final ShardInfo shardInfo;
}

View file

@ -146,7 +146,7 @@ class ConsumerStates {
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
return ShardConsumerState.SHUTDOWN_COMPLETE.consumerState();
return ShardConsumerState.SHUTTING_DOWN.consumerState();
}
@Override
@ -497,7 +497,9 @@ class ConsumerStates {
argument.recordsPublisher(),
argument.hierarchicalShardSyncer(),
argument.metricsFactory(),
input == null ? null : input.childShards());
input == null ? null : input.childShards(),
argument.streamIdentifier(),
argument.leaseCleanupManager());
}
@Override

View file

@ -22,6 +22,7 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.LeaseCleanupManager;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
@ -71,4 +72,5 @@ public class ShardConsumerArgument {
private final HierarchicalShardSyncer hierarchicalShardSyncer;
@NonNull
private final MetricsFactory metricsFactory;
private final LeaseCleanupManager leaseCleanupManager;
}

View file

@ -15,6 +15,10 @@
package software.amazon.kinesis.lifecycle;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.Optional;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -23,13 +27,16 @@ import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCleanupManager;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.exceptions.CustomerApplicationException;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
@ -42,7 +49,6 @@ import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
@ -85,6 +91,10 @@ public class ShutdownTask implements ConsumerTask {
private final TaskType taskType = TaskType.SHUTDOWN;
private final List<ChildShard> childShards;
@NonNull
private final StreamIdentifier streamIdentifier;
@NonNull
private final LeaseCleanupManager leaseCleanupManager;
private static final Function<ShardInfo, String> leaseKeyProvider = shardInfo -> ShardInfo.getLeaseKey(shardInfo);
@ -113,19 +123,26 @@ public class ShutdownTask implements ConsumerTask {
// This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception.
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
if (!CollectionUtils.isNullOrEmpty(childShards)) {
createLeasesForChildShardsIfNotExist();
updateLeaseWithChildShards();
} else {
log.warn("Shard {} no longer exists. Shutting down consumer with SHARD_END reason without creating leases for child shards.", leaseKeyProvider.apply(shardInfo));
updateLeaseWithChildShards(currentShardLease);
}
recordProcessorCheckpointer
.sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue());
recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
// Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number.
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded.
throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime);
final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo)))
.orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist."));
if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
recordProcessorCheckpointer
.sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue());
recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
// Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number.
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded.
throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime);
}
final LeasePendingDeletion garbageLease = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo);
leaseCleanupManager.enqueueForDeletion(garbageLease);
} else {
throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime);
}
@ -162,8 +179,8 @@ public class ShutdownTask implements ConsumerTask {
if (lastCheckpointValue == null
|| !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
+ leaseKeyProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " +
"See ShardRecordProcessor.shardEnded javadocs for more information.");
+ leaseKeyProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " +
"See ShardRecordProcessor.shardEnded javadocs for more information.");
}
}
@ -189,9 +206,8 @@ public class ShutdownTask implements ConsumerTask {
}
}
private void updateLeaseWithChildShards()
private void updateLeaseWithChildShards(Lease currentLease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
final Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
Set<String> childShardIds = childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet());
final Lease updatedLease = currentLease.copy();

View file

@ -73,6 +73,7 @@ import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.KinesisClientLibException;
import software.amazon.kinesis.exceptions.KinesisClientLibNonRetryableException;
import software.amazon.kinesis.leases.LeaseCleanupManager;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseManagementConfig;
@ -153,6 +154,8 @@ public class SchedulerTest {
private WorkerStateChangeListener workerStateChangeListener;
@Mock
private MultiStreamTracker multiStreamTracker;
@Mock
private LeaseCleanupManager leaseCleanupManager;
private Map<StreamIdentifier, ShardSyncTaskManager> shardSyncTaskManagerMap;
private Map<StreamIdentifier, ShardDetector> shardDetectorMap;
@ -219,9 +222,9 @@ public class SchedulerTest {
final String shardId = "shardId-000000000000";
final String concurrencyToken = "concurrencyToken";
final ShardInfo shardInfo = new ShardInfo(shardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
final ShardConsumer shardConsumer1 = scheduler.createOrGetShardConsumer(shardInfo, shardRecordProcessorFactory);
final ShardConsumer shardConsumer1 = scheduler.createOrGetShardConsumer(shardInfo, shardRecordProcessorFactory, leaseCleanupManager);
assertNotNull(shardConsumer1);
final ShardConsumer shardConsumer2 = scheduler.createOrGetShardConsumer(shardInfo, shardRecordProcessorFactory);
final ShardConsumer shardConsumer2 = scheduler.createOrGetShardConsumer(shardInfo, shardRecordProcessorFactory, leaseCleanupManager);
assertNotNull(shardConsumer2);
assertSame(shardConsumer1, shardConsumer2);
@ -229,7 +232,7 @@ public class SchedulerTest {
final String anotherConcurrencyToken = "anotherConcurrencyToken";
final ShardInfo shardInfo2 = new ShardInfo(shardId, anotherConcurrencyToken, null,
ExtendedSequenceNumber.TRIM_HORIZON);
final ShardConsumer shardConsumer3 = scheduler.createOrGetShardConsumer(shardInfo2, shardRecordProcessorFactory);
final ShardConsumer shardConsumer3 = scheduler.createOrGetShardConsumer(shardInfo2, shardRecordProcessorFactory, leaseCleanupManager);
assertNotNull(shardConsumer3);
assertNotSame(shardConsumer1, shardConsumer3);
@ -261,9 +264,9 @@ public class SchedulerTest {
schedulerSpy.runProcessLoop();
schedulerSpy.runProcessLoop();
verify(schedulerSpy).buildConsumer(same(initialShardInfo.get(0)), eq(shardRecordProcessorFactory));
verify(schedulerSpy, never()).buildConsumer(same(firstShardInfo.get(0)), eq(shardRecordProcessorFactory));
verify(schedulerSpy, never()).buildConsumer(same(secondShardInfo.get(0)), eq(shardRecordProcessorFactory));
verify(schedulerSpy).buildConsumer(same(initialShardInfo.get(0)), eq(shardRecordProcessorFactory), eq(leaseCleanupManager));
verify(schedulerSpy, never()).buildConsumer(same(firstShardInfo.get(0)), eq(shardRecordProcessorFactory), eq(leaseCleanupManager));
verify(schedulerSpy, never()).buildConsumer(same(secondShardInfo.get(0)), eq(shardRecordProcessorFactory), eq(leaseCleanupManager));
verify(checkpoint).getCheckpointObject(eq(shardId));
}
@ -279,10 +282,10 @@ public class SchedulerTest {
ExtendedSequenceNumber.TRIM_HORIZON);
final ShardInfo shardInfo1 = new ShardInfo(shard1, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
final ShardConsumer shardConsumer0 = scheduler.createOrGetShardConsumer(shardInfo0, shardRecordProcessorFactory);
final ShardConsumer shardConsumer0 = scheduler.createOrGetShardConsumer(shardInfo0, shardRecordProcessorFactory, leaseCleanupManager);
final ShardConsumer shardConsumer0WithAnotherConcurrencyToken =
scheduler.createOrGetShardConsumer(shardInfo0WithAnotherConcurrencyToken, shardRecordProcessorFactory);
final ShardConsumer shardConsumer1 = scheduler.createOrGetShardConsumer(shardInfo1, shardRecordProcessorFactory);
scheduler.createOrGetShardConsumer(shardInfo0WithAnotherConcurrencyToken, shardRecordProcessorFactory, leaseCleanupManager);
final ShardConsumer shardConsumer1 = scheduler.createOrGetShardConsumer(shardInfo1, shardRecordProcessorFactory, leaseCleanupManager);
Set<ShardInfo> shards = new HashSet<>();
shards.add(shardInfo0);
@ -397,11 +400,11 @@ public class SchedulerTest {
schedulerSpy.runProcessLoop();
initialShardInfo.stream().forEach(
shardInfo -> verify(schedulerSpy).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory)));
shardInfo -> verify(schedulerSpy).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), same(leaseCleanupManager)));
firstShardInfo.stream().forEach(
shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory)));
shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), eq(leaseCleanupManager)));
secondShardInfo.stream().forEach(
shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory)));
shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), eq(leaseCleanupManager)));
}
@ -1071,6 +1074,11 @@ public class SchedulerTest {
public ShardDetector createShardDetector(StreamConfig streamConfig) {
return shardDetectorMap.get(streamConfig.streamIdentifier());
}
@Override
public LeaseCleanupManager createLeaseCleanupManager(MetricsFactory metricsFactory) {
return leaseCleanupManager;
}
}
private class TestKinesisCheckpointFactory implements CheckpointFactory {

View file

@ -0,0 +1,347 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.leases;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion;
import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class LeaseCleanupManagerTest {
private ShardInfo shardInfo;
private StreamIdentifier streamIdentifier;
private String concurrencyToken = "1234";
private String shardId = "shardId";
private String splitParent = "splitParent";
private String mergeParent1 = "mergeParent-1";
private String mergeParent2 = "mergeParent-2";
private Duration maxFutureWait = Duration.ofSeconds(1);
private long leaseCleanupIntervalMillis = Duration.ofSeconds(1).toMillis();
private long completedLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis();
private long garbageLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis();
private boolean cleanupLeasesOfCompletedShards = true;
private LeaseCleanupManager leaseCleanupManager;
private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory();
@Mock
private LeaseRefresher leaseRefresher;
@Mock
private LeaseCoordinator leaseCoordinator;
@Mock
private KinesisAsyncClient kinesis;
@Mock
private ScheduledExecutorService deletionThreadPool;
@Before
public void setUp() throws Exception {
shardInfo = new ShardInfo(shardId, concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
streamIdentifier = StreamIdentifier.singleStreamInstance("streamName");
leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis,
NULL_METRICS_FACTORY, maxFutureWait, deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis,
completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseCoordinator.updateLease(any(Lease.class), any(UUID.class), any(String.class), any(String.class))).thenReturn(true);
}
/**
* Tests that when both child shard leases are present, we are able to delete the parent shard for the completed
* shard case.
*/
@Test
public final void testParentShardLeaseDeletedSplitCase() throws Exception {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(), ExtendedSequenceNumber.LATEST, 1);
}
/**
* Tests that when both child shard leases are present, we are able to delete the parent shard for the completed
* shard case.
*/
@Test
public final void testParentShardLeaseDeletedMergeCase() throws Exception {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), ExtendedSequenceNumber.LATEST, 1);
}
/**
* Tests that if cleanupLeasesOfCompletedShards is not enabled by the customer, then no leases are cleaned up for
* the completed shard case.
*/
@Test
public final void testNoLeasesDeletedWhenNotEnabled() throws Exception {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
cleanupLeasesOfCompletedShards = false;
leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis, NULL_METRICS_FACTORY, maxFutureWait,
deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis,
garbageLeaseCleanupIntervalMillis);
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(), ExtendedSequenceNumber.LATEST, 0);
}
/**
* Tests that if some of the child shard leases are missing, we fail fast and don't delete the parent shard lease
* for the completed shard case.
*/
@Test
public final void testNoCleanupWhenSomeChildShardLeasesAreNotPresent() throws Exception {
List<ChildShard> childShards = childShardsForSplit();
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShards, ExtendedSequenceNumber.LATEST, false, 0);
}
/**
* Tests that if some child shard leases haven't begun processing (at least one lease w/ checkpoint TRIM_HORIZON),
* we don't delete them for the completed shard case.
*/
@Test
public final void testParentShardLeaseNotDeletedWhenChildIsAtTrim() throws Exception {
testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber.TRIM_HORIZON);
}
/**
* Tests that if some child shard leases haven't begun processing (at least one lease w/ checkpoint AT_TIMESTAMP),
* we don't delete them for the completed shard case.
*/
@Test
public final void testParentShardLeaseNotDeletedWhenChildIsAtTimestamp() throws Exception {
testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber.AT_TIMESTAMP);
}
private final void testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber extendedSequenceNumber)
throws Exception {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), extendedSequenceNumber, 0);
}
/**
* Tests that if a lease's parents are still present, we do not delete the lease.
*/
@Test
public final void testLeaseNotDeletedWhenParentsStillPresent() throws Exception {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.singleton("parent"),
ExtendedSequenceNumber.LATEST);
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), ExtendedSequenceNumber.LATEST, 0);
}
/**
* Tests ResourceNotFound case for if a shard expires, that we delete the lease when shardExpired is found.
*/
@Test
public final void testLeaseDeletedWhenShardDoesNotExist() throws Exception {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId"));
testLeaseDeletedWhenShardDoesNotExist(heldLease);
}
/**
* Tests ResourceNotFound case when completed lease cleanup is disabled.
* @throws Exception
*/
@Test
public final void testLeaseDeletedWhenShardDoesNotExistAndCleanupCompletedLeaseDisabled() throws Exception {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId"));
cleanupLeasesOfCompletedShards = false;
leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis, NULL_METRICS_FACTORY, maxFutureWait,
deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis,
garbageLeaseCleanupIntervalMillis);
testLeaseDeletedWhenShardDoesNotExist(heldLease);
}
public void testLeaseDeletedWhenShardDoesNotExist(Lease heldLease) throws Exception {
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease);
when(kinesis.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(ResourceNotFoundException.class);
when(leaseRefresher.getLease(heldLease.leaseKey())).thenReturn(heldLease);
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo));
leaseCleanupManager.cleanupLeases();
verify(leaseRefresher, times(1)).deleteLease(heldLease);
}
/**
* Tests that if a lease deletion fails, it's re-enqueued for deletion.
*/
@Test
public final void testFailedDeletionsReEnqueued() throws Exception {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId"));
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease);
when(kinesis.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(Exception.class);
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo));
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo));
Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion());
}
/**
* Tests duplicate leases are not enqueued for deletion.
*/
@Test
public final void testNoDuplicateLeasesEnqueued() {
// Disable lease cleanup so that the queue isn't drained while the test is running.
cleanupLeasesOfCompletedShards = false;
leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis,
NULL_METRICS_FACTORY, maxFutureWait, deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis,
completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis);
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId"));
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease);
// Enqueue the same lease twice.
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo));
Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion());
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo));
Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion());
}
private final void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List<ChildShard> childShards,
ExtendedSequenceNumber extendedSequenceNumber,
int expectedDeletedLeases) throws Exception {
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShards, extendedSequenceNumber, true, expectedDeletedLeases);
}
private final void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List<ChildShard> childShards,
ExtendedSequenceNumber extendedSequenceNumber,
boolean childShardLeasesPresent,
int expectedDeletedLeases) throws Exception {
final Lease lease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", shardInfo.parentShardIds(),
childShards.stream().map(c -> c.shardId()).collect(Collectors.toSet()));
final List<Lease> childShardLeases = childShards.stream().map(c -> LeaseHelper.createLease(
ShardInfo.getLeaseKey(shardInfo, c.shardId()), "leaseOwner", Collections.singleton(shardInfo.shardId()),
Collections.emptyList(), extendedSequenceNumber)).collect(Collectors.toList());
final List<Lease> parentShardLeases = lease.parentShardIds().stream().map(p ->
LeaseHelper.createLease(ShardInfo.getLeaseKey(shardInfo, p), "leaseOwner", Collections.emptyList(),
Collections.singleton(shardInfo.shardId()), extendedSequenceNumber)).collect(Collectors.toList());
when(leaseRefresher.getLease(lease.leaseKey())).thenReturn(lease);
for (Lease parentShardLease : parentShardLeases) {
when(leaseRefresher.getLease(parentShardLease.leaseKey())).thenReturn(parentShardLease);
}
if (childShardLeasesPresent) {
for (Lease childShardLease : childShardLeases) {
when(leaseRefresher.getLease(childShardLease.leaseKey())).thenReturn(childShardLease);
}
}
GetShardIteratorResponse getShardIteratorResponse = GetShardIteratorResponse.builder()
.shardIterator("123")
.build();
when(kinesis.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(CompletableFuture.completedFuture(getShardIteratorResponse));
GetRecordsResponse getRecordsResponse = GetRecordsResponse.builder()
.records(Collections.emptyList())
.childShards(childShards)
.build();
when(kinesis.getRecords(any(GetRecordsRequest.class))).thenReturn(CompletableFuture.completedFuture(getRecordsResponse));
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, lease, shardInfo));
leaseCleanupManager.cleanupLeases();
verify(leaseRefresher, times(expectedDeletedLeases)).deleteLease(any(Lease.class));
}
private List<ChildShard> childShardsForSplit() {
List<String> parentShards = Arrays.asList(splitParent);
ChildShard leftChild = ChildShard.builder()
.shardId("leftChild")
.parentShards(parentShards)
.hashKeyRange(ShardObjectHelper.newHashKeyRange("0", "49"))
.build();
ChildShard rightChild = ChildShard.builder()
.shardId("rightChild")
.parentShards(parentShards)
.hashKeyRange(ShardObjectHelper.newHashKeyRange("50", "99"))
.build();
return Arrays.asList(leftChild, rightChild);
}
private List<ChildShard> childShardsForMerge() {
List<String> parentShards = Arrays.asList(mergeParent1, mergeParent2);
ChildShard child = ChildShard.builder()
.shardId("onlyChild")
.parentShards(parentShards)
.hashKeyRange(ShardObjectHelper.newHashKeyRange("0", "99"))
.build();
return Collections.singletonList(child);
}
}

View file

@ -0,0 +1,44 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.leases;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.Collection;
import java.util.Collections;
public class LeaseHelper {
public static Lease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds) {
return createLease(leaseKey, leaseOwner, parentShardIds, Collections.emptySet(), ExtendedSequenceNumber.LATEST);
}
public static Lease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds, Collection<String> childShardIds) {
return createLease(leaseKey, leaseOwner, parentShardIds, childShardIds, ExtendedSequenceNumber.LATEST);
}
public static Lease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds,
Collection<String> childShardIds, ExtendedSequenceNumber extendedSequenceNumber) {
Lease lease = new Lease();
lease.leaseKey(leaseKey);
lease.leaseOwner(leaseOwner);
lease.parentShardIds(parentShardIds);
lease.childShardIds(childShardIds);
lease.checkpoint(extendedSequenceNumber);
return lease;
}
}

View file

@ -46,6 +46,7 @@ import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.LeaseCleanupManager;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
@ -102,6 +103,8 @@ public class ConsumerStatesTest {
private ProcessRecordsInput processRecordsInput;
@Mock
private TaskExecutionListener taskExecutionListener;
@Mock
private LeaseCleanupManager leaseCleanupManager;
private long parentShardPollIntervalMillis = 0xCAFE;
private boolean cleanupLeasesOfCompletedShards = true;
@ -122,7 +125,7 @@ public class ConsumerStatesTest {
taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis,
maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis,
INITIAL_POSITION_IN_STREAM, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector,
new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory);
new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory, leaseCleanupManager);
when(shardInfo.shardId()).thenReturn("shardId-000000000000");
when(shardInfo.streamIdentifierSerOpt()).thenReturn(Optional.of(StreamIdentifier.singleStreamInstance(STREAM_NAME).serialize()));
consumer = spy(new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis,
@ -148,7 +151,7 @@ public class ConsumerStatesTest {
assertThat(state.successTransition(), equalTo(ShardConsumerState.INITIALIZING.consumerState()));
for (ShutdownReason shutdownReason : ShutdownReason.values()) {
assertThat(state.shutdownTransition(shutdownReason),
equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.consumerState()));
equalTo(ShardConsumerState.SHUTTING_DOWN.consumerState()));
}
assertThat(state.state(), equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS));

View file

@ -19,7 +19,6 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@ -27,8 +26,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
@ -41,8 +38,6 @@ import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -50,13 +45,16 @@ import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCleanupManager;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseHelper;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardObjectHelper;
import software.amazon.kinesis.leases.exceptions.CustomerApplicationException;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
@ -86,6 +84,7 @@ public class ShutdownTaskTest {
private boolean ignoreUnexpectedChildShards = false;
private ShardInfo shardInfo;
private ShutdownTask task;
private StreamIdentifier streamIdentifier = StreamIdentifier.singleStreamInstance("streamName");
@Mock
private RecordsPublisher recordsPublisher;
@ -103,6 +102,8 @@ public class ShutdownTaskTest {
private HierarchicalShardSyncer hierarchicalShardSyncer;
@Mock
private ShardRecordProcessor shardRecordProcessor;
@Mock
private LeaseCleanupManager leaseCleanupManager;
@Before
public void setUp() throws Exception {
@ -119,7 +120,7 @@ public class ShutdownTaskTest {
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards());
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards(), streamIdentifier, leaseCleanupManager);
}
/**
@ -129,8 +130,9 @@ public class ShutdownTaskTest {
@Test
public final void testCallWhenApplicationDoesNotCheckpoint() throws Exception {
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298"));
Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"));
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"), Collections.emptyList(), ExtendedSequenceNumber.LATEST);
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true);
@ -169,13 +171,14 @@ public class ShutdownTaskTest {
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards());
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards(), streamIdentifier, leaseCleanupManager);
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"));
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"));
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true);
when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease);
final TaskResult result = task.call();
assertNull(result.getException());
@ -185,6 +188,7 @@ public class ShutdownTaskTest {
verify(leaseCoordinator).updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString());
verify(leaseRefresher, times(2)).createLeaseIfNotExists(Matchers.any(Lease.class));
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class));
}
/**
@ -192,23 +196,24 @@ public class ShutdownTaskTest {
* This test is for the scenario that a ShutdownTask is created for detecting a false Shard End.
*/
@Test
public final void testCallWhenShardNotFound() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
public final void testCallWhenShardNotFound() throws Exception {
final Lease heldLease = LeaseHelper.createLease("shardId-4", "leaseOwner", Collections.emptyList());
shardInfo = new ShardInfo("shardId-4", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY, new ArrayList<>());
hierarchicalShardSyncer, NULL_METRICS_FACTORY, new ArrayList<>(), streamIdentifier, leaseCleanupManager);
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseRefresher.getLease("shardId-4")).thenReturn(heldLease);
when(leaseCoordinator.getCurrentlyHeldLease("shardId-4")).thenReturn(heldLease);
final TaskResult result = task.call();
assertNull(result.getException());
verify(recordsPublisher).shutdown();
verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
verify(leaseCoordinator, never()).getCurrentlyHeldLease(shardInfo.shardId());
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
}
@ -220,11 +225,11 @@ public class ShutdownTaskTest {
@Test
public final void testCallWhenLeaseLost() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
shardInfo = new ShardInfo("shardId-4", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
ExtendedSequenceNumber.LATEST);
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
LEASE_LOST_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY, new ArrayList<>());
LEASE_LOST_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY, new ArrayList<>(), streamIdentifier, leaseCleanupManager);
final TaskResult result = task.call();
assertNull(result.getException());
@ -232,10 +237,9 @@ public class ShutdownTaskTest {
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build());
verify(leaseCoordinator, never()).getAssignments();
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
verify(leaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
verify(leaseCoordinator, never()).dropLease(any(Lease.class));
}
/**
* Test method for {@link ShutdownTask#taskType()}.
*/
@ -262,14 +266,4 @@ public class ShutdownTaskTest {
childShards.add(rightChild);
return childShards;
}
private Lease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds) {
Lease lease = new Lease();
lease.leaseKey(leaseKey);
lease.leaseOwner(leaseOwner);
lease.parentShardIds(parentShardIds);
return lease;
}
}