Adding Lease cleanup in shutdown task.
This commit is contained in:
parent
c7cdbd5d8b
commit
2f1838483c
15 changed files with 969 additions and 94 deletions
|
|
@ -0,0 +1,41 @@
|
|||
/*
|
||||
* Copyright 2020 Amazon.com, Inc. or its affiliates.
|
||||
* Licensed under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package software.amazon.kinesis.common;
|
||||
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
/**
|
||||
* Configuration for lease cleanup.
|
||||
*/
|
||||
@Builder
|
||||
@Getter
|
||||
@Accessors(fluent=true)
|
||||
public class LeaseCleanupConfig {
|
||||
/**
|
||||
* Interval at which to run lease cleanup thread.
|
||||
*/
|
||||
private final long leaseCleanupIntervalMillis;
|
||||
/**
|
||||
* Interval at which to check if a lease is completed or not.
|
||||
*/
|
||||
private final long completedLeaseCleanupIntervalMillis;
|
||||
/**
|
||||
* Interval at which to check if a lease is garbage (i.e trimmed past the stream's retention period) or not.
|
||||
*/
|
||||
private final long garbageLeaseCleanupIntervalMillis;
|
||||
}
|
||||
|
|
@ -58,6 +58,7 @@ import software.amazon.kinesis.common.StreamConfig;
|
|||
import software.amazon.kinesis.common.StreamIdentifier;
|
||||
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
|
||||
import software.amazon.kinesis.leases.Lease;
|
||||
import software.amazon.kinesis.leases.LeaseCleanupManager;
|
||||
import software.amazon.kinesis.leases.LeaseCoordinator;
|
||||
import software.amazon.kinesis.leases.LeaseManagementConfig;
|
||||
import software.amazon.kinesis.leases.LeaseRefresher;
|
||||
|
|
@ -164,6 +165,7 @@ public class Scheduler implements Runnable {
|
|||
private final long schedulerInitializationBackoffTimeMillis;
|
||||
private final LeaderDecider leaderDecider;
|
||||
private final Map<StreamIdentifier, Instant> staleStreamDeletionMap = new HashMap<>();
|
||||
private final LeaseCleanupManager leaseCleanupManager;
|
||||
|
||||
// Holds consumers for shards the worker is currently tracking. Key is shard
|
||||
// info, value is ShardConsumer.
|
||||
|
|
@ -289,6 +291,8 @@ public class Scheduler implements Runnable {
|
|||
this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager(
|
||||
leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap,
|
||||
shardSyncTaskManagerProvider, isMultiStreamMode);
|
||||
this.leaseCleanupManager = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer, isMultiStreamMode)
|
||||
.createLeaseCleanupManager(metricsFactory);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -341,6 +345,13 @@ public class Scheduler implements Runnable {
|
|||
log.info("Skipping shard sync per configuration setting (and lease table is not empty)");
|
||||
}
|
||||
|
||||
if (!leaseCleanupManager.isRunning()) {
|
||||
log.info("Starting LeaseCleanupManager.");
|
||||
leaseCleanupManager.start();
|
||||
} else {
|
||||
log.info("LeaseCleanupManager is already running. No need to start it");
|
||||
}
|
||||
|
||||
// If we reach this point, then we either skipped the lease sync or did not have any exception
|
||||
// for any of the shard sync in the previous attempt.
|
||||
if (!leaseCoordinator.isRunning()) {
|
||||
|
|
@ -397,29 +408,14 @@ public class Scheduler implements Runnable {
|
|||
void runProcessLoop() {
|
||||
try {
|
||||
Set<ShardInfo> assignedShards = new HashSet<>();
|
||||
final Set<ShardInfo> completedShards = new HashSet<>();
|
||||
for (ShardInfo shardInfo : getShardInfoForAssignments()) {
|
||||
ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo,
|
||||
processorConfig.shardRecordProcessorFactory());
|
||||
processorConfig.shardRecordProcessorFactory(), leaseCleanupManager);
|
||||
|
||||
if (shardConsumer.isShutdown() && shardConsumer.shutdownReason().equals(ShutdownReason.SHARD_END)) {
|
||||
completedShards.add(shardInfo);
|
||||
} else {
|
||||
shardConsumer.executeLifecycle();
|
||||
}
|
||||
shardConsumer.executeLifecycle();
|
||||
assignedShards.add(shardInfo);
|
||||
}
|
||||
|
||||
for (ShardInfo completedShard : completedShards) {
|
||||
final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifierSerOpt());
|
||||
final StreamConfig streamConfig = currentStreamConfigMap
|
||||
.getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier));
|
||||
if (createOrGetShardSyncTaskManager(streamConfig).submitShardSyncTask()) {
|
||||
log.info("{} : Found completed shard, initiated new ShardSyncTak for {} ",
|
||||
streamIdentifier.serialize(), completedShard.toString());
|
||||
}
|
||||
}
|
||||
|
||||
// clean up shard consumers for unassigned shards
|
||||
cleanupShardConsumers(assignedShards);
|
||||
|
||||
|
|
@ -868,7 +864,8 @@ public class Scheduler implements Runnable {
|
|||
* @return ShardConsumer for the shard
|
||||
*/
|
||||
ShardConsumer createOrGetShardConsumer(@NonNull final ShardInfo shardInfo,
|
||||
@NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) {
|
||||
@NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory,
|
||||
@NonNull final LeaseCleanupManager leaseCleanupManager) {
|
||||
ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
|
||||
// Instantiate a new consumer if we don't have one, or the one we
|
||||
// had was from an earlier
|
||||
|
|
@ -877,7 +874,7 @@ public class Scheduler implements Runnable {
|
|||
// completely processed (shutdown reason terminate).
|
||||
if ((consumer == null)
|
||||
|| (consumer.isShutdown() && consumer.shutdownReason().equals(ShutdownReason.LEASE_LOST))) {
|
||||
consumer = buildConsumer(shardInfo, shardRecordProcessorFactory);
|
||||
consumer = buildConsumer(shardInfo, shardRecordProcessorFactory, leaseCleanupManager);
|
||||
shardInfoShardConsumerMap.put(shardInfo, consumer);
|
||||
slog.infoForce("Created new shardConsumer for : " + shardInfo);
|
||||
}
|
||||
|
|
@ -889,12 +886,14 @@ public class Scheduler implements Runnable {
|
|||
}
|
||||
|
||||
protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo,
|
||||
@NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) {
|
||||
@NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory,
|
||||
@NonNull final LeaseCleanupManager leaseCleanupManager) {
|
||||
ShardRecordProcessorCheckpointer checkpointer = coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo,
|
||||
checkpoint);
|
||||
// The only case where streamName is not available will be when multistreamtracker not set. In this case,
|
||||
// get the default stream name for the single stream application.
|
||||
final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifierSerOpt());
|
||||
|
||||
// Irrespective of single stream app or multi stream app, streamConfig should always be available.
|
||||
// If we have a shardInfo, that is not present in currentStreamConfigMap for whatever reason, then return default stream config
|
||||
// to gracefully complete the reading.
|
||||
|
|
@ -922,7 +921,8 @@ public class Scheduler implements Runnable {
|
|||
shardDetectorProvider.apply(streamConfig),
|
||||
aggregatorUtil,
|
||||
hierarchicalShardSyncerProvider.apply(streamConfig),
|
||||
metricsFactory);
|
||||
metricsFactory,
|
||||
leaseCleanupManager);
|
||||
return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(),
|
||||
argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,338 @@
|
|||
/*
|
||||
* Copyright 2020 Amazon.com, Inc. or its affiliates.
|
||||
* Licensed under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package software.amazon.kinesis.leases;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NonNull;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.Value;
|
||||
import lombok.experimental.Accessors;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
|
||||
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
|
||||
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
|
||||
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
|
||||
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
|
||||
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
|
||||
import software.amazon.awssdk.utils.CollectionUtils;
|
||||
import software.amazon.kinesis.common.FutureUtils;
|
||||
import software.amazon.kinesis.common.KinesisRequestsBuilder;
|
||||
import software.amazon.kinesis.common.StreamIdentifier;
|
||||
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
||||
import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion;
|
||||
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
||||
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||
import software.amazon.kinesis.metrics.MetricsFactory;
|
||||
import software.amazon.kinesis.retrieval.AWSExceptionManager;
|
||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.HashSet;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Helper class to cleanup of any expired/closed shard leases. It will cleanup leases periodically as defined by
|
||||
* {@link LeaseManagementConfig#leaseCleanupConfig()} asynchronously.
|
||||
*/
|
||||
@Accessors(fluent=true)
|
||||
@Slf4j
|
||||
@RequiredArgsConstructor
|
||||
@EqualsAndHashCode
|
||||
public class LeaseCleanupManager {
|
||||
@NonNull
|
||||
private final LeaseCoordinator leaseCoordinator;
|
||||
@NonNull
|
||||
private final KinesisAsyncClient kinesisClient;
|
||||
@NonNull
|
||||
private final MetricsFactory metricsFactory;
|
||||
@NonNull
|
||||
private final Duration maxFutureWait;
|
||||
@NonNull
|
||||
private final ScheduledExecutorService deletionThreadPool;
|
||||
private final boolean cleanupLeasesUponShardCompletion;
|
||||
private final long leaseCleanupIntervalMillis;
|
||||
private final long completedLeaseCleanupIntervalMillis;
|
||||
private final long garbageLeaseCleanupIntervalMillis;
|
||||
private final Stopwatch completedLeaseStopwatch = Stopwatch.createUnstarted();
|
||||
private final Stopwatch garbageLeaseStopwatch = Stopwatch.createUnstarted();
|
||||
|
||||
private final Queue<LeasePendingDeletion> deletionQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
private static final int MAX_RECORDS = 1;
|
||||
private static final long INITIAL_DELAY = 0L;
|
||||
|
||||
@Getter
|
||||
private volatile boolean isRunning = false;
|
||||
|
||||
/**
|
||||
* Starts the lease cleanup thread, which is scheduled periodically as specified by
|
||||
* {@link LeaseCleanupManager#leaseCleanupIntervalMillis}
|
||||
*/
|
||||
public void start() {
|
||||
log.debug("Starting lease cleanup thread.");
|
||||
completedLeaseStopwatch.start();
|
||||
garbageLeaseStopwatch.start();
|
||||
|
||||
deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis,
|
||||
TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Enqueues a lease for deletion.
|
||||
* @param leasePendingDeletion
|
||||
*/
|
||||
public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) {
|
||||
final Lease lease = leasePendingDeletion.lease();
|
||||
if (lease == null) {
|
||||
log.warn("Cannot enqueue lease {} for deferred deletion - instance doesn't hold the lease for that shard.",
|
||||
lease.leaseKey());
|
||||
} else {
|
||||
//TODO: Optimize verifying duplicate entries https://sim.amazon.com/issues/KinesisLTR-597.
|
||||
if (!deletionQueue.contains(leasePendingDeletion)) {
|
||||
log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey());
|
||||
deletionQueue.add(leasePendingDeletion);
|
||||
} else {
|
||||
log.warn("Lease {} is already pending deletion, not enqueueing for deletion.", lease.leaseKey());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns how many leases are currently waiting in the queue pending deletion.
|
||||
* @return number of leases pending deletion.
|
||||
*/
|
||||
public int leasesPendingDeletion() {
|
||||
return deletionQueue.size();
|
||||
}
|
||||
|
||||
private boolean timeToCheckForCompletedShard() {
|
||||
return completedLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= completedLeaseCleanupIntervalMillis;
|
||||
}
|
||||
|
||||
private boolean timeToCheckForGarbageShard() {
|
||||
return garbageLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= garbageLeaseCleanupIntervalMillis;
|
||||
}
|
||||
|
||||
private LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion) throws TimeoutException,
|
||||
InterruptedException, DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
final Lease lease = leasePendingDeletion.lease();
|
||||
final ShardInfo shardInfo = leasePendingDeletion.shardInfo();
|
||||
final StreamIdentifier streamIdentifier = leasePendingDeletion.streamIdentifier();
|
||||
|
||||
final AWSExceptionManager exceptionManager = createExceptionManager();
|
||||
|
||||
boolean cleanedUpCompletedLease = false;
|
||||
boolean cleanedUpGarbageLease = false;
|
||||
boolean alreadyCheckedForGarbageCollection = false;
|
||||
|
||||
try {
|
||||
if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard()) {
|
||||
Set<String> childShardKeys = leaseCoordinator.leaseRefresher().getLease(lease.leaseKey()).childShardIds();
|
||||
if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
|
||||
try {
|
||||
childShardKeys = getChildShardsFromService(shardInfo, streamIdentifier);
|
||||
updateLeaseWithChildShards(leasePendingDeletion, childShardKeys);
|
||||
} catch (ExecutionException e) {
|
||||
throw exceptionManager.apply(e.getCause());
|
||||
} finally {
|
||||
alreadyCheckedForGarbageCollection = true;
|
||||
}
|
||||
}
|
||||
cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, shardInfo, childShardKeys);
|
||||
}
|
||||
|
||||
if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard()) {
|
||||
try {
|
||||
getChildShardsFromService(shardInfo, streamIdentifier);
|
||||
} catch (ExecutionException e) {
|
||||
throw exceptionManager.apply(e.getCause());
|
||||
}
|
||||
}
|
||||
} catch (ResourceNotFoundException e) {
|
||||
cleanedUpGarbageLease = cleanupLeaseForGarbageShard(lease);
|
||||
}
|
||||
|
||||
return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease);
|
||||
}
|
||||
|
||||
private Set<String> getChildShardsFromService(ShardInfo shardInfo, StreamIdentifier streamIdentifier)
|
||||
throws InterruptedException, ExecutionException, TimeoutException {
|
||||
final GetShardIteratorRequest getShardIteratorRequest = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
|
||||
.streamName(streamIdentifier.streamName())
|
||||
.shardIteratorType(ShardIteratorType.LATEST)
|
||||
.shardId(shardInfo.shardId())
|
||||
.build();
|
||||
|
||||
final GetShardIteratorResponse getShardIteratorResponse =
|
||||
FutureUtils.resolveOrCancelFuture(kinesisClient.getShardIterator(getShardIteratorRequest), maxFutureWait);
|
||||
|
||||
final GetRecordsRequest getRecordsRequest = KinesisRequestsBuilder.getRecordsRequestBuilder()
|
||||
.shardIterator(getShardIteratorResponse.shardIterator())
|
||||
.limit(MAX_RECORDS)
|
||||
.build();
|
||||
|
||||
final GetRecordsResponse getRecordsResponse =
|
||||
FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(getRecordsRequest), maxFutureWait);
|
||||
|
||||
return getRecordsResponse.childShards().stream().map(c -> c.shardId()).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
|
||||
// A lease that ended with SHARD_END from ResourceNotFoundException is safe to delete if it no longer exists in the
|
||||
// stream (known explicitly from ResourceNotFound being thrown when processing this shard),
|
||||
private boolean cleanupLeaseForGarbageShard(Lease lease) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
log.info("Deleting lease {} as it is not present in the stream.", lease);
|
||||
leaseCoordinator.leaseRefresher().deleteLease(lease);
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean allParentShardLeasesDeleted(Lease lease, ShardInfo shardInfo) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
for (String parentShard : lease.parentShardIds()) {
|
||||
final Lease parentLease = leaseCoordinator.leaseRefresher().getLease(ShardInfo.getLeaseKey(shardInfo, parentShard));
|
||||
|
||||
if (parentLease != null) {
|
||||
log.warn("Lease {} has a parent lease {} which is still present in the lease table, skipping deletion " +
|
||||
"for this lease.", lease, parentLease);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// We should only be deleting the current shard's lease if
|
||||
// 1. All of its children are currently being processed, i.e their checkpoint is not TRIM_HORIZON or AT_TIMESTAMP.
|
||||
// 2. Its parent shard lease(s) have already been deleted.
|
||||
private boolean cleanupLeaseForCompletedShard(Lease lease, ShardInfo shardInfo, Set<String> childShardKeys)
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException, IllegalStateException {
|
||||
final Set<String> processedChildShardLeases = new HashSet<>();
|
||||
|
||||
for (String childShardKey : childShardKeys) {
|
||||
final Lease childShardLease = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(childShardKey)).orElseThrow(
|
||||
() -> new IllegalStateException("Child lease " + childShardKey + " for completed shard not found in " +
|
||||
"lease table - not cleaning up lease " + lease));
|
||||
|
||||
if (!childShardLease.checkpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON)
|
||||
&& !childShardLease.checkpoint().equals(ExtendedSequenceNumber.AT_TIMESTAMP)) {
|
||||
processedChildShardLeases.add(childShardLease.leaseKey());
|
||||
}
|
||||
}
|
||||
|
||||
if (!allParentShardLeasesDeleted(lease, shardInfo) || !Objects.equals(childShardKeys, processedChildShardLeases)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
log.info("Deleting lease {} as it has been completely processed and processing of child shard(s) has begun.",
|
||||
lease);
|
||||
leaseCoordinator.leaseRefresher().deleteLease(lease);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private void updateLeaseWithChildShards(LeasePendingDeletion leasePendingDeletion, Set<String> childShardKeys)
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
final Lease updatedLease = leasePendingDeletion.lease();
|
||||
updatedLease.childShardIds(childShardKeys);
|
||||
|
||||
leaseCoordinator.leaseRefresher().updateLeaseWithMetaInfo(updatedLease, UpdateField.CHILD_SHARDS);
|
||||
}
|
||||
|
||||
private AWSExceptionManager createExceptionManager() {
|
||||
final AWSExceptionManager exceptionManager = new AWSExceptionManager();
|
||||
exceptionManager.add(ResourceNotFoundException.class, t -> t);
|
||||
|
||||
return exceptionManager;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void cleanupLeases() {
|
||||
if (deletionQueue.isEmpty()) {
|
||||
log.debug("No leases pending deletion.");
|
||||
} else if (timeToCheckForCompletedShard() | timeToCheckForGarbageShard()) {
|
||||
final Queue<LeasePendingDeletion> failedDeletions = new ConcurrentLinkedQueue<>();
|
||||
boolean completedLeaseCleanedUp = false;
|
||||
boolean garbageLeaseCleanedUp = false;
|
||||
|
||||
log.debug("Attempting to clean up {} lease(s).", deletionQueue.size());
|
||||
|
||||
while (!deletionQueue.isEmpty()) {
|
||||
final LeasePendingDeletion leasePendingDeletion = deletionQueue.poll();
|
||||
final String leaseKey = leasePendingDeletion.lease().leaseKey();
|
||||
final StreamIdentifier streamIdentifier = leasePendingDeletion.streamIdentifier();
|
||||
boolean deletionFailed = true;
|
||||
try {
|
||||
final LeaseCleanupResult leaseCleanupResult = cleanupLease(leasePendingDeletion);
|
||||
completedLeaseCleanedUp |= leaseCleanupResult.cleanedUpCompletedLease();
|
||||
garbageLeaseCleanedUp |= leaseCleanupResult.cleanedUpGarbageLease();
|
||||
|
||||
if (leaseCleanupResult.leaseCleanedUp()) {
|
||||
log.debug("Successfully cleaned up lease {} for {}", leaseKey, streamIdentifier);
|
||||
deletionFailed = false;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("Failed to cleanup lease {} for {}. Will re-enqueue for deletion and retry on next " +
|
||||
"scheduled execution.", leaseKey, streamIdentifier, e);
|
||||
}
|
||||
|
||||
if (deletionFailed) {
|
||||
log.debug("Did not cleanup lease {} for {}. Re-enqueueing for deletion.", leaseKey, streamIdentifier);
|
||||
failedDeletions.add(leasePendingDeletion);
|
||||
}
|
||||
}
|
||||
|
||||
if (completedLeaseCleanedUp) {
|
||||
log.debug("At least one completed lease was cleaned up - restarting interval");
|
||||
completedLeaseStopwatch.reset().start();
|
||||
}
|
||||
|
||||
if (garbageLeaseCleanedUp) {
|
||||
log.debug("At least one garbage lease was cleaned up - restarting interval");
|
||||
garbageLeaseStopwatch.reset().start();
|
||||
}
|
||||
|
||||
deletionQueue.addAll(failedDeletions);
|
||||
}
|
||||
}
|
||||
|
||||
private class LeaseCleanupThread implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
cleanupLeases();
|
||||
}
|
||||
}
|
||||
|
||||
@Value
|
||||
private class LeaseCleanupResult {
|
||||
boolean cleanedUpCompletedLease;
|
||||
boolean cleanedUpGarbageLease;
|
||||
|
||||
public boolean leaseCleanedUp() {
|
||||
return cleanedUpCompletedLease | cleanedUpGarbageLease;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -33,6 +33,7 @@ import software.amazon.awssdk.services.dynamodb.model.BillingMode;
|
|||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||
import software.amazon.kinesis.common.InitialPositionInStream;
|
||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||
import software.amazon.kinesis.common.LeaseCleanupConfig;
|
||||
import software.amazon.kinesis.common.StreamConfig;
|
||||
import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory;
|
||||
import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback;
|
||||
|
|
@ -48,6 +49,16 @@ public class LeaseManagementConfig {
|
|||
|
||||
public static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofMinutes(1);
|
||||
|
||||
public static final long DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(1).toMillis();
|
||||
public static final long DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(5).toMillis();
|
||||
public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(30).toMillis();
|
||||
|
||||
public static final LeaseCleanupConfig DEFAULT_LEASE_CLEANUP_CONFIG = LeaseCleanupConfig.builder()
|
||||
.leaseCleanupIntervalMillis(DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS)
|
||||
.completedLeaseCleanupIntervalMillis(DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS)
|
||||
.garbageLeaseCleanupIntervalMillis(DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS)
|
||||
.build();
|
||||
|
||||
/**
|
||||
* Name of the table to use in DynamoDB
|
||||
*
|
||||
|
|
@ -108,6 +119,15 @@ public class LeaseManagementConfig {
|
|||
*/
|
||||
private boolean cleanupLeasesUponShardCompletion = true;
|
||||
|
||||
/**
|
||||
* Configuration for lease cleanup in {@link LeaseCleanupManager}.
|
||||
*
|
||||
* <p>Default lease cleanup interval value: 1 minute.</p>
|
||||
* <p>Default completed lease cleanup threshold: 5 minute.</p>
|
||||
* <p>Default garbage lease cleanup threshold: 30 minute.</p>
|
||||
*/
|
||||
private final LeaseCleanupConfig leaseCleanupConfig = DEFAULT_LEASE_CLEANUP_CONFIG;
|
||||
|
||||
/**
|
||||
* The max number of leases (shards) this worker should process.
|
||||
* This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints
|
||||
|
|
@ -344,7 +364,8 @@ public class LeaseManagementConfig {
|
|||
billingMode(),
|
||||
leaseSerializer,
|
||||
customShardDetectorProvider(),
|
||||
isMultiStreamingMode);
|
||||
isMultiStreamingMode,
|
||||
leaseCleanupConfig());
|
||||
}
|
||||
return leaseManagementFactory;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,4 +39,6 @@ public interface LeaseManagementFactory {
|
|||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
LeaseCleanupManager createLeaseCleanupManager(MetricsFactory metricsFactory);
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ package software.amazon.kinesis.leases.dynamodb;
|
|||
|
||||
import java.time.Duration;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.function.Function;
|
||||
import lombok.Data;
|
||||
import lombok.NonNull;
|
||||
|
|
@ -25,10 +26,12 @@ import software.amazon.awssdk.services.dynamodb.model.BillingMode;
|
|||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||
import software.amazon.kinesis.common.LeaseCleanupConfig;
|
||||
import software.amazon.kinesis.common.StreamConfig;
|
||||
import software.amazon.kinesis.common.StreamIdentifier;
|
||||
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
|
||||
import software.amazon.kinesis.leases.KinesisShardDetector;
|
||||
import software.amazon.kinesis.leases.LeaseCleanupManager;
|
||||
import software.amazon.kinesis.leases.LeaseCoordinator;
|
||||
import software.amazon.kinesis.leases.LeaseManagementConfig;
|
||||
import software.amazon.kinesis.leases.LeaseManagementFactory;
|
||||
|
|
@ -83,6 +86,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
|||
private final Duration dynamoDbRequestTimeout;
|
||||
private final BillingMode billingMode;
|
||||
private final boolean isMultiStreamMode;
|
||||
private final LeaseCleanupConfig leaseCleanupConfig;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
|
|
@ -208,7 +212,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
|||
* @param cacheMissWarningModulus
|
||||
* @param initialLeaseTableReadCapacity
|
||||
* @param initialLeaseTableWriteCapacity
|
||||
* @param deprecatedHierarchicalShardSyncer
|
||||
* @param hierarchicalShardSyncer
|
||||
* @param tableCreatorCallback
|
||||
*/
|
||||
@Deprecated
|
||||
|
|
@ -222,14 +226,14 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
|||
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
|
||||
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
|
||||
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
|
||||
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) {
|
||||
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) {
|
||||
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
|
||||
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
|
||||
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
|
||||
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
|
||||
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
|
||||
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
|
||||
deprecatedHierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
|
||||
hierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -258,7 +262,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
|||
* @param cacheMissWarningModulus
|
||||
* @param initialLeaseTableReadCapacity
|
||||
* @param initialLeaseTableWriteCapacity
|
||||
* @param deprecatedHierarchicalShardSyncer
|
||||
* @param hierarchicalShardSyncer
|
||||
* @param tableCreatorCallback
|
||||
* @param dynamoDbRequestTimeout
|
||||
*/
|
||||
|
|
@ -273,7 +277,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
|||
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
|
||||
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
|
||||
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
|
||||
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
|
||||
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
|
||||
Duration dynamoDbRequestTimeout) {
|
||||
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
|
||||
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
|
||||
|
|
@ -281,7 +285,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
|||
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
|
||||
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
|
||||
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
|
||||
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED);
|
||||
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -310,7 +314,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
|||
* @param cacheMissWarningModulus
|
||||
* @param initialLeaseTableReadCapacity
|
||||
* @param initialLeaseTableWriteCapacity
|
||||
* @param deprecatedHierarchicalShardSyncer
|
||||
* @param hierarchicalShardSyncer
|
||||
* @param tableCreatorCallback
|
||||
* @param dynamoDbRequestTimeout
|
||||
* @param billingMode
|
||||
|
|
@ -326,7 +330,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
|||
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
|
||||
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
|
||||
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
|
||||
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
|
||||
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
|
||||
Duration dynamoDbRequestTimeout, BillingMode billingMode) {
|
||||
|
||||
this(kinesisClient, new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream), dynamoDBClient, tableName,
|
||||
|
|
@ -335,7 +339,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
|||
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
|
||||
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
|
||||
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
|
||||
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer());
|
||||
hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -386,7 +390,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
|||
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
|
||||
cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity,
|
||||
deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer,
|
||||
null, false);
|
||||
null, false, LeaseManagementConfig.DEFAULT_LEASE_CLEANUP_CONFIG);
|
||||
this.streamConfig = streamConfig;
|
||||
}
|
||||
|
||||
|
|
@ -420,6 +424,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
|||
* @param leaseSerializer
|
||||
* @param customShardDetectorProvider
|
||||
* @param isMultiStreamMode
|
||||
* @param leaseCleanupConfig
|
||||
*/
|
||||
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
|
||||
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
|
||||
|
|
@ -432,7 +437,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
|||
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
|
||||
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
|
||||
Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer,
|
||||
Function<StreamConfig, ShardDetector> customShardDetectorProvider, boolean isMultiStreamMode) {
|
||||
Function<StreamConfig, ShardDetector> customShardDetectorProvider, boolean isMultiStreamMode,
|
||||
LeaseCleanupConfig leaseCleanupConfig) {
|
||||
this.kinesisClient = kinesisClient;
|
||||
this.dynamoDBClient = dynamoDBClient;
|
||||
this.tableName = tableName;
|
||||
|
|
@ -461,6 +467,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
|||
this.leaseSerializer = leaseSerializer;
|
||||
this.customShardDetectorProvider = customShardDetectorProvider;
|
||||
this.isMultiStreamMode = isMultiStreamMode;
|
||||
this.leaseCleanupConfig = leaseCleanupConfig;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -535,4 +542,19 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
|
|||
maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload,
|
||||
cacheMissWarningModulus, dynamoDbRequestTimeout);
|
||||
}
|
||||
|
||||
/**
|
||||
* LeaseCleanupManager cleans up leases in the lease table for shards which have either expired past the
|
||||
* stream's retention period or have been completely processed.
|
||||
* @param metricsFactory
|
||||
* @return LeaseCleanupManager
|
||||
*/
|
||||
@Override
|
||||
public LeaseCleanupManager createLeaseCleanupManager(MetricsFactory metricsFactory) {
|
||||
return new LeaseCleanupManager(createLeaseCoordinator(metricsFactory), kinesisClient,
|
||||
metricsFactory, dynamoDbRequestTimeout, Executors.newSingleThreadScheduledExecutor(),
|
||||
cleanupLeasesUponShardCompletion, leaseCleanupConfig.leaseCleanupIntervalMillis(),
|
||||
leaseCleanupConfig.completedLeaseCleanupIntervalMillis(),
|
||||
leaseCleanupConfig.garbageLeaseCleanupIntervalMillis());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Copyright 2020 Amazon.com, Inc. or its affiliates.
|
||||
* Licensed under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package software.amazon.kinesis.leases.exceptions;
|
||||
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Value;
|
||||
import lombok.experimental.Accessors;
|
||||
import software.amazon.kinesis.common.StreamIdentifier;
|
||||
import software.amazon.kinesis.leases.Lease;
|
||||
import software.amazon.kinesis.leases.ShardInfo;
|
||||
|
||||
/**
|
||||
* Helper class for cleaning up leases.
|
||||
*/
|
||||
@Accessors(fluent = true)
|
||||
@Value
|
||||
@EqualsAndHashCode(exclude = {"queueEntryTime"})
|
||||
public class LeasePendingDeletion {
|
||||
private final StreamIdentifier streamIdentifier;
|
||||
private final Lease lease;
|
||||
private final ShardInfo shardInfo;
|
||||
}
|
||||
|
|
@ -146,7 +146,7 @@ class ConsumerStates {
|
|||
|
||||
@Override
|
||||
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
|
||||
return ShardConsumerState.SHUTDOWN_COMPLETE.consumerState();
|
||||
return ShardConsumerState.SHUTTING_DOWN.consumerState();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -497,7 +497,9 @@ class ConsumerStates {
|
|||
argument.recordsPublisher(),
|
||||
argument.hierarchicalShardSyncer(),
|
||||
argument.metricsFactory(),
|
||||
input == null ? null : input.childShards());
|
||||
input == null ? null : input.childShards(),
|
||||
argument.streamIdentifier(),
|
||||
argument.leaseCleanupManager());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
|||
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
|
||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||
import software.amazon.kinesis.common.StreamIdentifier;
|
||||
import software.amazon.kinesis.leases.LeaseCleanupManager;
|
||||
import software.amazon.kinesis.leases.LeaseCoordinator;
|
||||
import software.amazon.kinesis.leases.ShardDetector;
|
||||
import software.amazon.kinesis.leases.ShardInfo;
|
||||
|
|
@ -71,4 +72,5 @@ public class ShardConsumerArgument {
|
|||
private final HierarchicalShardSyncer hierarchicalShardSyncer;
|
||||
@NonNull
|
||||
private final MetricsFactory metricsFactory;
|
||||
private final LeaseCleanupManager leaseCleanupManager;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,6 +15,10 @@
|
|||
package software.amazon.kinesis.lifecycle;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import lombok.NonNull;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
|
@ -23,13 +27,16 @@ import software.amazon.awssdk.utils.CollectionUtils;
|
|||
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
|
||||
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
|
||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||
import software.amazon.kinesis.common.StreamIdentifier;
|
||||
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
|
||||
import software.amazon.kinesis.leases.Lease;
|
||||
import software.amazon.kinesis.leases.LeaseCleanupManager;
|
||||
import software.amazon.kinesis.leases.LeaseCoordinator;
|
||||
import software.amazon.kinesis.leases.ShardDetector;
|
||||
import software.amazon.kinesis.leases.ShardInfo;
|
||||
import software.amazon.kinesis.leases.exceptions.CustomerApplicationException;
|
||||
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
||||
import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion;
|
||||
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
||||
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
|
||||
|
|
@ -42,7 +49,6 @@ import software.amazon.kinesis.processor.ShardRecordProcessor;
|
|||
import software.amazon.kinesis.retrieval.RecordsPublisher;
|
||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.function.Function;
|
||||
|
|
@ -85,6 +91,10 @@ public class ShutdownTask implements ConsumerTask {
|
|||
private final TaskType taskType = TaskType.SHUTDOWN;
|
||||
|
||||
private final List<ChildShard> childShards;
|
||||
@NonNull
|
||||
private final StreamIdentifier streamIdentifier;
|
||||
@NonNull
|
||||
private final LeaseCleanupManager leaseCleanupManager;
|
||||
|
||||
private static final Function<ShardInfo, String> leaseKeyProvider = shardInfo -> ShardInfo.getLeaseKey(shardInfo);
|
||||
|
||||
|
|
@ -113,19 +123,26 @@ public class ShutdownTask implements ConsumerTask {
|
|||
// This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception.
|
||||
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
|
||||
// This scenario could happen when customer deletes the stream while leaving the KCL application running.
|
||||
final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
|
||||
|
||||
if (!CollectionUtils.isNullOrEmpty(childShards)) {
|
||||
createLeasesForChildShardsIfNotExist();
|
||||
updateLeaseWithChildShards();
|
||||
} else {
|
||||
log.warn("Shard {} no longer exists. Shutting down consumer with SHARD_END reason without creating leases for child shards.", leaseKeyProvider.apply(shardInfo));
|
||||
updateLeaseWithChildShards(currentShardLease);
|
||||
}
|
||||
|
||||
recordProcessorCheckpointer
|
||||
.sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue());
|
||||
recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
|
||||
// Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number.
|
||||
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded.
|
||||
throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime);
|
||||
final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo)))
|
||||
.orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist."));
|
||||
if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
|
||||
recordProcessorCheckpointer
|
||||
.sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue());
|
||||
recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
|
||||
// Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number.
|
||||
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded.
|
||||
throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime);
|
||||
}
|
||||
|
||||
final LeasePendingDeletion garbageLease = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo);
|
||||
leaseCleanupManager.enqueueForDeletion(garbageLease);
|
||||
} else {
|
||||
throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime);
|
||||
}
|
||||
|
|
@ -162,8 +179,8 @@ public class ShutdownTask implements ConsumerTask {
|
|||
if (lastCheckpointValue == null
|
||||
|| !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) {
|
||||
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
|
||||
+ leaseKeyProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " +
|
||||
"See ShardRecordProcessor.shardEnded javadocs for more information.");
|
||||
+ leaseKeyProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " +
|
||||
"See ShardRecordProcessor.shardEnded javadocs for more information.");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -189,9 +206,8 @@ public class ShutdownTask implements ConsumerTask {
|
|||
}
|
||||
}
|
||||
|
||||
private void updateLeaseWithChildShards()
|
||||
private void updateLeaseWithChildShards(Lease currentLease)
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||
final Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo));
|
||||
Set<String> childShardIds = childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet());
|
||||
|
||||
final Lease updatedLease = currentLease.copy();
|
||||
|
|
@ -206,7 +222,7 @@ public class ShutdownTask implements ConsumerTask {
|
|||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
*
|
||||
*
|
||||
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerTask#taskType()
|
||||
*/
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -73,6 +73,7 @@ import software.amazon.kinesis.common.StreamConfig;
|
|||
import software.amazon.kinesis.common.StreamIdentifier;
|
||||
import software.amazon.kinesis.exceptions.KinesisClientLibException;
|
||||
import software.amazon.kinesis.exceptions.KinesisClientLibNonRetryableException;
|
||||
import software.amazon.kinesis.leases.LeaseCleanupManager;
|
||||
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
|
||||
import software.amazon.kinesis.leases.LeaseCoordinator;
|
||||
import software.amazon.kinesis.leases.LeaseManagementConfig;
|
||||
|
|
@ -153,6 +154,8 @@ public class SchedulerTest {
|
|||
private WorkerStateChangeListener workerStateChangeListener;
|
||||
@Mock
|
||||
private MultiStreamTracker multiStreamTracker;
|
||||
@Mock
|
||||
private LeaseCleanupManager leaseCleanupManager;
|
||||
|
||||
private Map<StreamIdentifier, ShardSyncTaskManager> shardSyncTaskManagerMap;
|
||||
private Map<StreamIdentifier, ShardDetector> shardDetectorMap;
|
||||
|
|
@ -219,9 +222,9 @@ public class SchedulerTest {
|
|||
final String shardId = "shardId-000000000000";
|
||||
final String concurrencyToken = "concurrencyToken";
|
||||
final ShardInfo shardInfo = new ShardInfo(shardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
final ShardConsumer shardConsumer1 = scheduler.createOrGetShardConsumer(shardInfo, shardRecordProcessorFactory);
|
||||
final ShardConsumer shardConsumer1 = scheduler.createOrGetShardConsumer(shardInfo, shardRecordProcessorFactory, leaseCleanupManager);
|
||||
assertNotNull(shardConsumer1);
|
||||
final ShardConsumer shardConsumer2 = scheduler.createOrGetShardConsumer(shardInfo, shardRecordProcessorFactory);
|
||||
final ShardConsumer shardConsumer2 = scheduler.createOrGetShardConsumer(shardInfo, shardRecordProcessorFactory, leaseCleanupManager);
|
||||
assertNotNull(shardConsumer2);
|
||||
|
||||
assertSame(shardConsumer1, shardConsumer2);
|
||||
|
|
@ -229,7 +232,7 @@ public class SchedulerTest {
|
|||
final String anotherConcurrencyToken = "anotherConcurrencyToken";
|
||||
final ShardInfo shardInfo2 = new ShardInfo(shardId, anotherConcurrencyToken, null,
|
||||
ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
final ShardConsumer shardConsumer3 = scheduler.createOrGetShardConsumer(shardInfo2, shardRecordProcessorFactory);
|
||||
final ShardConsumer shardConsumer3 = scheduler.createOrGetShardConsumer(shardInfo2, shardRecordProcessorFactory, leaseCleanupManager);
|
||||
assertNotNull(shardConsumer3);
|
||||
|
||||
assertNotSame(shardConsumer1, shardConsumer3);
|
||||
|
|
@ -261,9 +264,9 @@ public class SchedulerTest {
|
|||
schedulerSpy.runProcessLoop();
|
||||
schedulerSpy.runProcessLoop();
|
||||
|
||||
verify(schedulerSpy).buildConsumer(same(initialShardInfo.get(0)), eq(shardRecordProcessorFactory));
|
||||
verify(schedulerSpy, never()).buildConsumer(same(firstShardInfo.get(0)), eq(shardRecordProcessorFactory));
|
||||
verify(schedulerSpy, never()).buildConsumer(same(secondShardInfo.get(0)), eq(shardRecordProcessorFactory));
|
||||
verify(schedulerSpy).buildConsumer(same(initialShardInfo.get(0)), eq(shardRecordProcessorFactory), eq(leaseCleanupManager));
|
||||
verify(schedulerSpy, never()).buildConsumer(same(firstShardInfo.get(0)), eq(shardRecordProcessorFactory), eq(leaseCleanupManager));
|
||||
verify(schedulerSpy, never()).buildConsumer(same(secondShardInfo.get(0)), eq(shardRecordProcessorFactory), eq(leaseCleanupManager));
|
||||
verify(checkpoint).getCheckpointObject(eq(shardId));
|
||||
}
|
||||
|
||||
|
|
@ -279,10 +282,10 @@ public class SchedulerTest {
|
|||
ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
final ShardInfo shardInfo1 = new ShardInfo(shard1, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
|
||||
final ShardConsumer shardConsumer0 = scheduler.createOrGetShardConsumer(shardInfo0, shardRecordProcessorFactory);
|
||||
final ShardConsumer shardConsumer0 = scheduler.createOrGetShardConsumer(shardInfo0, shardRecordProcessorFactory, leaseCleanupManager);
|
||||
final ShardConsumer shardConsumer0WithAnotherConcurrencyToken =
|
||||
scheduler.createOrGetShardConsumer(shardInfo0WithAnotherConcurrencyToken, shardRecordProcessorFactory);
|
||||
final ShardConsumer shardConsumer1 = scheduler.createOrGetShardConsumer(shardInfo1, shardRecordProcessorFactory);
|
||||
scheduler.createOrGetShardConsumer(shardInfo0WithAnotherConcurrencyToken, shardRecordProcessorFactory, leaseCleanupManager);
|
||||
final ShardConsumer shardConsumer1 = scheduler.createOrGetShardConsumer(shardInfo1, shardRecordProcessorFactory, leaseCleanupManager);
|
||||
|
||||
Set<ShardInfo> shards = new HashSet<>();
|
||||
shards.add(shardInfo0);
|
||||
|
|
@ -397,11 +400,11 @@ public class SchedulerTest {
|
|||
schedulerSpy.runProcessLoop();
|
||||
|
||||
initialShardInfo.stream().forEach(
|
||||
shardInfo -> verify(schedulerSpy).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory)));
|
||||
shardInfo -> verify(schedulerSpy).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), same(leaseCleanupManager)));
|
||||
firstShardInfo.stream().forEach(
|
||||
shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory)));
|
||||
shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), eq(leaseCleanupManager)));
|
||||
secondShardInfo.stream().forEach(
|
||||
shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory)));
|
||||
shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), eq(leaseCleanupManager)));
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -1071,6 +1074,11 @@ public class SchedulerTest {
|
|||
public ShardDetector createShardDetector(StreamConfig streamConfig) {
|
||||
return shardDetectorMap.get(streamConfig.streamIdentifier());
|
||||
}
|
||||
|
||||
@Override
|
||||
public LeaseCleanupManager createLeaseCleanupManager(MetricsFactory metricsFactory) {
|
||||
return leaseCleanupManager;
|
||||
}
|
||||
}
|
||||
|
||||
private class TestKinesisCheckpointFactory implements CheckpointFactory {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,347 @@
|
|||
/*
|
||||
* Copyright 2020 Amazon.com, Inc. or its affiliates.
|
||||
* Licensed under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package software.amazon.kinesis.leases;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
|
||||
import software.amazon.awssdk.services.kinesis.model.ChildShard;
|
||||
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
|
||||
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
|
||||
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
|
||||
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
|
||||
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
|
||||
import software.amazon.kinesis.common.StreamIdentifier;
|
||||
import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion;
|
||||
import software.amazon.kinesis.metrics.MetricsFactory;
|
||||
import software.amazon.kinesis.metrics.NullMetricsFactory;
|
||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class LeaseCleanupManagerTest {
|
||||
|
||||
private ShardInfo shardInfo;
|
||||
private StreamIdentifier streamIdentifier;
|
||||
private String concurrencyToken = "1234";
|
||||
|
||||
private String shardId = "shardId";
|
||||
private String splitParent = "splitParent";
|
||||
private String mergeParent1 = "mergeParent-1";
|
||||
private String mergeParent2 = "mergeParent-2";
|
||||
|
||||
private Duration maxFutureWait = Duration.ofSeconds(1);
|
||||
private long leaseCleanupIntervalMillis = Duration.ofSeconds(1).toMillis();
|
||||
private long completedLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis();
|
||||
private long garbageLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis();
|
||||
private boolean cleanupLeasesOfCompletedShards = true;
|
||||
private LeaseCleanupManager leaseCleanupManager;
|
||||
private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory();
|
||||
|
||||
@Mock
|
||||
private LeaseRefresher leaseRefresher;
|
||||
@Mock
|
||||
private LeaseCoordinator leaseCoordinator;
|
||||
@Mock
|
||||
private KinesisAsyncClient kinesis;
|
||||
@Mock
|
||||
private ScheduledExecutorService deletionThreadPool;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
shardInfo = new ShardInfo(shardId, concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
streamIdentifier = StreamIdentifier.singleStreamInstance("streamName");
|
||||
leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis,
|
||||
NULL_METRICS_FACTORY, maxFutureWait, deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis,
|
||||
completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis);
|
||||
|
||||
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
||||
when(leaseCoordinator.updateLease(any(Lease.class), any(UUID.class), any(String.class), any(String.class))).thenReturn(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that when both child shard leases are present, we are able to delete the parent shard for the completed
|
||||
* shard case.
|
||||
*/
|
||||
@Test
|
||||
public final void testParentShardLeaseDeletedSplitCase() throws Exception {
|
||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
|
||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(), ExtendedSequenceNumber.LATEST, 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that when both child shard leases are present, we are able to delete the parent shard for the completed
|
||||
* shard case.
|
||||
*/
|
||||
@Test
|
||||
public final void testParentShardLeaseDeletedMergeCase() throws Exception {
|
||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
|
||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), ExtendedSequenceNumber.LATEST, 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that if cleanupLeasesOfCompletedShards is not enabled by the customer, then no leases are cleaned up for
|
||||
* the completed shard case.
|
||||
*/
|
||||
@Test
|
||||
public final void testNoLeasesDeletedWhenNotEnabled() throws Exception {
|
||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
cleanupLeasesOfCompletedShards = false;
|
||||
|
||||
leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis, NULL_METRICS_FACTORY, maxFutureWait,
|
||||
deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis,
|
||||
garbageLeaseCleanupIntervalMillis);
|
||||
|
||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(), ExtendedSequenceNumber.LATEST, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that if some of the child shard leases are missing, we fail fast and don't delete the parent shard lease
|
||||
* for the completed shard case.
|
||||
*/
|
||||
@Test
|
||||
public final void testNoCleanupWhenSomeChildShardLeasesAreNotPresent() throws Exception {
|
||||
List<ChildShard> childShards = childShardsForSplit();
|
||||
|
||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
|
||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShards, ExtendedSequenceNumber.LATEST, false, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that if some child shard leases haven't begun processing (at least one lease w/ checkpoint TRIM_HORIZON),
|
||||
* we don't delete them for the completed shard case.
|
||||
*/
|
||||
@Test
|
||||
public final void testParentShardLeaseNotDeletedWhenChildIsAtTrim() throws Exception {
|
||||
testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that if some child shard leases haven't begun processing (at least one lease w/ checkpoint AT_TIMESTAMP),
|
||||
* we don't delete them for the completed shard case.
|
||||
*/
|
||||
@Test
|
||||
public final void testParentShardLeaseNotDeletedWhenChildIsAtTimestamp() throws Exception {
|
||||
testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber.AT_TIMESTAMP);
|
||||
}
|
||||
|
||||
private final void testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber extendedSequenceNumber)
|
||||
throws Exception {
|
||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
|
||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), extendedSequenceNumber, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that if a lease's parents are still present, we do not delete the lease.
|
||||
*/
|
||||
@Test
|
||||
public final void testLeaseNotDeletedWhenParentsStillPresent() throws Exception {
|
||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.singleton("parent"),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
|
||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), ExtendedSequenceNumber.LATEST, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests ResourceNotFound case for if a shard expires, that we delete the lease when shardExpired is found.
|
||||
*/
|
||||
@Test
|
||||
public final void testLeaseDeletedWhenShardDoesNotExist() throws Exception {
|
||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId"));
|
||||
|
||||
testLeaseDeletedWhenShardDoesNotExist(heldLease);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests ResourceNotFound case when completed lease cleanup is disabled.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public final void testLeaseDeletedWhenShardDoesNotExistAndCleanupCompletedLeaseDisabled() throws Exception {
|
||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId"));
|
||||
|
||||
cleanupLeasesOfCompletedShards = false;
|
||||
|
||||
leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis, NULL_METRICS_FACTORY, maxFutureWait,
|
||||
deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis,
|
||||
garbageLeaseCleanupIntervalMillis);
|
||||
|
||||
testLeaseDeletedWhenShardDoesNotExist(heldLease);
|
||||
}
|
||||
|
||||
public void testLeaseDeletedWhenShardDoesNotExist(Lease heldLease) throws Exception {
|
||||
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
||||
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease);
|
||||
when(kinesis.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(ResourceNotFoundException.class);
|
||||
when(leaseRefresher.getLease(heldLease.leaseKey())).thenReturn(heldLease);
|
||||
|
||||
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo));
|
||||
leaseCleanupManager.cleanupLeases();
|
||||
|
||||
verify(leaseRefresher, times(1)).deleteLease(heldLease);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that if a lease deletion fails, it's re-enqueued for deletion.
|
||||
*/
|
||||
@Test
|
||||
public final void testFailedDeletionsReEnqueued() throws Exception {
|
||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
|
||||
final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId"));
|
||||
|
||||
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
||||
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease);
|
||||
when(kinesis.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(Exception.class);
|
||||
|
||||
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo));
|
||||
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo));
|
||||
Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests duplicate leases are not enqueued for deletion.
|
||||
*/
|
||||
@Test
|
||||
public final void testNoDuplicateLeasesEnqueued() {
|
||||
// Disable lease cleanup so that the queue isn't drained while the test is running.
|
||||
cleanupLeasesOfCompletedShards = false;
|
||||
leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis,
|
||||
NULL_METRICS_FACTORY, maxFutureWait, deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis,
|
||||
completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis);
|
||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId"));
|
||||
|
||||
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
||||
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease);
|
||||
|
||||
// Enqueue the same lease twice.
|
||||
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo));
|
||||
Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion());
|
||||
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo));
|
||||
Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion());
|
||||
}
|
||||
|
||||
private final void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List<ChildShard> childShards,
|
||||
ExtendedSequenceNumber extendedSequenceNumber,
|
||||
int expectedDeletedLeases) throws Exception {
|
||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShards, extendedSequenceNumber, true, expectedDeletedLeases);
|
||||
}
|
||||
|
||||
private final void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List<ChildShard> childShards,
|
||||
ExtendedSequenceNumber extendedSequenceNumber,
|
||||
boolean childShardLeasesPresent,
|
||||
int expectedDeletedLeases) throws Exception {
|
||||
|
||||
final Lease lease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", shardInfo.parentShardIds(),
|
||||
childShards.stream().map(c -> c.shardId()).collect(Collectors.toSet()));
|
||||
final List<Lease> childShardLeases = childShards.stream().map(c -> LeaseHelper.createLease(
|
||||
ShardInfo.getLeaseKey(shardInfo, c.shardId()), "leaseOwner", Collections.singleton(shardInfo.shardId()),
|
||||
Collections.emptyList(), extendedSequenceNumber)).collect(Collectors.toList());
|
||||
|
||||
final List<Lease> parentShardLeases = lease.parentShardIds().stream().map(p ->
|
||||
LeaseHelper.createLease(ShardInfo.getLeaseKey(shardInfo, p), "leaseOwner", Collections.emptyList(),
|
||||
Collections.singleton(shardInfo.shardId()), extendedSequenceNumber)).collect(Collectors.toList());
|
||||
|
||||
when(leaseRefresher.getLease(lease.leaseKey())).thenReturn(lease);
|
||||
for (Lease parentShardLease : parentShardLeases) {
|
||||
when(leaseRefresher.getLease(parentShardLease.leaseKey())).thenReturn(parentShardLease);
|
||||
}
|
||||
if (childShardLeasesPresent) {
|
||||
for (Lease childShardLease : childShardLeases) {
|
||||
when(leaseRefresher.getLease(childShardLease.leaseKey())).thenReturn(childShardLease);
|
||||
}
|
||||
}
|
||||
|
||||
GetShardIteratorResponse getShardIteratorResponse = GetShardIteratorResponse.builder()
|
||||
.shardIterator("123")
|
||||
.build();
|
||||
when(kinesis.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(CompletableFuture.completedFuture(getShardIteratorResponse));
|
||||
|
||||
GetRecordsResponse getRecordsResponse = GetRecordsResponse.builder()
|
||||
.records(Collections.emptyList())
|
||||
.childShards(childShards)
|
||||
.build();
|
||||
when(kinesis.getRecords(any(GetRecordsRequest.class))).thenReturn(CompletableFuture.completedFuture(getRecordsResponse));
|
||||
|
||||
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, lease, shardInfo));
|
||||
leaseCleanupManager.cleanupLeases();
|
||||
|
||||
verify(leaseRefresher, times(expectedDeletedLeases)).deleteLease(any(Lease.class));
|
||||
}
|
||||
|
||||
private List<ChildShard> childShardsForSplit() {
|
||||
List<String> parentShards = Arrays.asList(splitParent);
|
||||
|
||||
ChildShard leftChild = ChildShard.builder()
|
||||
.shardId("leftChild")
|
||||
.parentShards(parentShards)
|
||||
.hashKeyRange(ShardObjectHelper.newHashKeyRange("0", "49"))
|
||||
.build();
|
||||
ChildShard rightChild = ChildShard.builder()
|
||||
.shardId("rightChild")
|
||||
.parentShards(parentShards)
|
||||
.hashKeyRange(ShardObjectHelper.newHashKeyRange("50", "99"))
|
||||
.build();
|
||||
|
||||
return Arrays.asList(leftChild, rightChild);
|
||||
}
|
||||
|
||||
private List<ChildShard> childShardsForMerge() {
|
||||
List<String> parentShards = Arrays.asList(mergeParent1, mergeParent2);
|
||||
|
||||
ChildShard child = ChildShard.builder()
|
||||
.shardId("onlyChild")
|
||||
.parentShards(parentShards)
|
||||
.hashKeyRange(ShardObjectHelper.newHashKeyRange("0", "99"))
|
||||
.build();
|
||||
|
||||
return Collections.singletonList(child);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Copyright 2020 Amazon.com, Inc. or its affiliates.
|
||||
* Licensed under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package software.amazon.kinesis.leases;
|
||||
|
||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
||||
public class LeaseHelper {
|
||||
|
||||
public static Lease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds) {
|
||||
return createLease(leaseKey, leaseOwner, parentShardIds, Collections.emptySet(), ExtendedSequenceNumber.LATEST);
|
||||
}
|
||||
|
||||
public static Lease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds, Collection<String> childShardIds) {
|
||||
return createLease(leaseKey, leaseOwner, parentShardIds, childShardIds, ExtendedSequenceNumber.LATEST);
|
||||
}
|
||||
|
||||
public static Lease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds,
|
||||
Collection<String> childShardIds, ExtendedSequenceNumber extendedSequenceNumber) {
|
||||
Lease lease = new Lease();
|
||||
lease.leaseKey(leaseKey);
|
||||
lease.leaseOwner(leaseOwner);
|
||||
lease.parentShardIds(parentShardIds);
|
||||
lease.childShardIds(childShardIds);
|
||||
lease.checkpoint(extendedSequenceNumber);
|
||||
|
||||
return lease;
|
||||
}
|
||||
}
|
||||
|
|
@ -46,6 +46,7 @@ import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
|
|||
import software.amazon.kinesis.common.InitialPositionInStream;
|
||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||
import software.amazon.kinesis.common.StreamIdentifier;
|
||||
import software.amazon.kinesis.leases.LeaseCleanupManager;
|
||||
import software.amazon.kinesis.leases.LeaseCoordinator;
|
||||
import software.amazon.kinesis.leases.LeaseRefresher;
|
||||
import software.amazon.kinesis.leases.ShardDetector;
|
||||
|
|
@ -102,6 +103,8 @@ public class ConsumerStatesTest {
|
|||
private ProcessRecordsInput processRecordsInput;
|
||||
@Mock
|
||||
private TaskExecutionListener taskExecutionListener;
|
||||
@Mock
|
||||
private LeaseCleanupManager leaseCleanupManager;
|
||||
|
||||
private long parentShardPollIntervalMillis = 0xCAFE;
|
||||
private boolean cleanupLeasesOfCompletedShards = true;
|
||||
|
|
@ -122,7 +125,7 @@ public class ConsumerStatesTest {
|
|||
taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis,
|
||||
maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis,
|
||||
INITIAL_POSITION_IN_STREAM, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector,
|
||||
new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory);
|
||||
new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory, leaseCleanupManager);
|
||||
when(shardInfo.shardId()).thenReturn("shardId-000000000000");
|
||||
when(shardInfo.streamIdentifierSerOpt()).thenReturn(Optional.of(StreamIdentifier.singleStreamInstance(STREAM_NAME).serialize()));
|
||||
consumer = spy(new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis,
|
||||
|
|
@ -148,7 +151,7 @@ public class ConsumerStatesTest {
|
|||
assertThat(state.successTransition(), equalTo(ShardConsumerState.INITIALIZING.consumerState()));
|
||||
for (ShutdownReason shutdownReason : ShutdownReason.values()) {
|
||||
assertThat(state.shutdownTransition(shutdownReason),
|
||||
equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.consumerState()));
|
||||
equalTo(ShardConsumerState.SHUTTING_DOWN.consumerState()));
|
||||
}
|
||||
|
||||
assertThat(state.state(), equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS));
|
||||
|
|
|
|||
|
|
@ -19,7 +19,6 @@ import static org.junit.Assert.assertNotNull;
|
|||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.times;
|
||||
|
|
@ -27,8 +26,6 @@ import static org.mockito.Mockito.verify;
|
|||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
|
@ -41,8 +38,6 @@ import org.mockito.Mock;
|
|||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
|
||||
import software.amazon.awssdk.services.kinesis.model.ChildShard;
|
||||
import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
|
||||
import software.amazon.awssdk.services.kinesis.model.Shard;
|
||||
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
|
||||
import software.amazon.kinesis.common.InitialPositionInStream;
|
||||
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
|
||||
|
|
@ -50,13 +45,16 @@ import software.amazon.kinesis.common.StreamIdentifier;
|
|||
import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException;
|
||||
import software.amazon.kinesis.leases.HierarchicalShardSyncer;
|
||||
import software.amazon.kinesis.leases.Lease;
|
||||
import software.amazon.kinesis.leases.LeaseCleanupManager;
|
||||
import software.amazon.kinesis.leases.LeaseCoordinator;
|
||||
import software.amazon.kinesis.leases.LeaseHelper;
|
||||
import software.amazon.kinesis.leases.LeaseRefresher;
|
||||
import software.amazon.kinesis.leases.ShardDetector;
|
||||
import software.amazon.kinesis.leases.ShardInfo;
|
||||
import software.amazon.kinesis.leases.ShardObjectHelper;
|
||||
import software.amazon.kinesis.leases.exceptions.CustomerApplicationException;
|
||||
import software.amazon.kinesis.leases.exceptions.DependencyException;
|
||||
import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion;
|
||||
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
|
||||
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
|
||||
|
|
@ -86,6 +84,7 @@ public class ShutdownTaskTest {
|
|||
private boolean ignoreUnexpectedChildShards = false;
|
||||
private ShardInfo shardInfo;
|
||||
private ShutdownTask task;
|
||||
private StreamIdentifier streamIdentifier = StreamIdentifier.singleStreamInstance("streamName");
|
||||
|
||||
@Mock
|
||||
private RecordsPublisher recordsPublisher;
|
||||
|
|
@ -103,6 +102,8 @@ public class ShutdownTaskTest {
|
|||
private HierarchicalShardSyncer hierarchicalShardSyncer;
|
||||
@Mock
|
||||
private ShardRecordProcessor shardRecordProcessor;
|
||||
@Mock
|
||||
private LeaseCleanupManager leaseCleanupManager;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
|
@ -119,7 +120,7 @@ public class ShutdownTaskTest {
|
|||
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
|
||||
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
|
||||
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
|
||||
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards());
|
||||
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards(), streamIdentifier, leaseCleanupManager);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -129,8 +130,9 @@ public class ShutdownTaskTest {
|
|||
@Test
|
||||
public final void testCallWhenApplicationDoesNotCheckpoint() throws Exception {
|
||||
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298"));
|
||||
Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"));
|
||||
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"), Collections.emptyList(), ExtendedSequenceNumber.LATEST);
|
||||
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
|
||||
when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease);
|
||||
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
||||
when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true);
|
||||
|
||||
|
|
@ -169,13 +171,14 @@ public class ShutdownTaskTest {
|
|||
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
|
||||
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
|
||||
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
|
||||
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards());
|
||||
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards(), streamIdentifier, leaseCleanupManager);
|
||||
|
||||
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
|
||||
Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"));
|
||||
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"));
|
||||
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
|
||||
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
||||
when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true);
|
||||
when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease);
|
||||
|
||||
final TaskResult result = task.call();
|
||||
assertNull(result.getException());
|
||||
|
|
@ -185,6 +188,7 @@ public class ShutdownTaskTest {
|
|||
verify(leaseCoordinator).updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString());
|
||||
verify(leaseRefresher, times(2)).createLeaseIfNotExists(Matchers.any(Lease.class));
|
||||
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
|
||||
verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -192,23 +196,24 @@ public class ShutdownTaskTest {
|
|||
* This test is for the scenario that a ShutdownTask is created for detecting a false Shard End.
|
||||
*/
|
||||
@Test
|
||||
public final void testCallWhenShardNotFound() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||
public final void testCallWhenShardNotFound() throws Exception {
|
||||
final Lease heldLease = LeaseHelper.createLease("shardId-4", "leaseOwner", Collections.emptyList());
|
||||
shardInfo = new ShardInfo("shardId-4", concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
|
||||
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
|
||||
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
|
||||
hierarchicalShardSyncer, NULL_METRICS_FACTORY, new ArrayList<>());
|
||||
hierarchicalShardSyncer, NULL_METRICS_FACTORY, new ArrayList<>(), streamIdentifier, leaseCleanupManager);
|
||||
|
||||
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
|
||||
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
|
||||
when(leaseRefresher.getLease("shardId-4")).thenReturn(heldLease);
|
||||
when(leaseCoordinator.getCurrentlyHeldLease("shardId-4")).thenReturn(heldLease);
|
||||
|
||||
final TaskResult result = task.call();
|
||||
assertNull(result.getException());
|
||||
verify(recordsPublisher).shutdown();
|
||||
verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
|
||||
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
|
||||
verify(leaseCoordinator, never()).getCurrentlyHeldLease(shardInfo.shardId());
|
||||
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
|
||||
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
|
||||
}
|
||||
|
|
@ -220,11 +225,11 @@ public class ShutdownTaskTest {
|
|||
@Test
|
||||
public final void testCallWhenLeaseLost() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||
shardInfo = new ShardInfo("shardId-4", concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
|
||||
LEASE_LOST_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
|
||||
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
|
||||
hierarchicalShardSyncer, NULL_METRICS_FACTORY, new ArrayList<>());
|
||||
LEASE_LOST_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
|
||||
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
|
||||
hierarchicalShardSyncer, NULL_METRICS_FACTORY, new ArrayList<>(), streamIdentifier, leaseCleanupManager);
|
||||
|
||||
final TaskResult result = task.call();
|
||||
assertNull(result.getException());
|
||||
|
|
@ -232,10 +237,9 @@ public class ShutdownTaskTest {
|
|||
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
|
||||
verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build());
|
||||
verify(leaseCoordinator, never()).getAssignments();
|
||||
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
|
||||
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
|
||||
verify(leaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
|
||||
verify(leaseCoordinator, never()).dropLease(any(Lease.class));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test method for {@link ShutdownTask#taskType()}.
|
||||
*/
|
||||
|
|
@ -262,14 +266,4 @@ public class ShutdownTaskTest {
|
|||
childShards.add(rightChild);
|
||||
return childShards;
|
||||
}
|
||||
|
||||
private Lease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds) {
|
||||
Lease lease = new Lease();
|
||||
lease.leaseKey(leaseKey);
|
||||
lease.leaseOwner(leaseOwner);
|
||||
lease.parentShardIds(parentShardIds);
|
||||
|
||||
return lease;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue