Lease cleanup v1.x (#73)

* Moving lease cleanup to ShutdownTask.
* Introducing LeaseCleanupManager with relevant configs.
This commit is contained in:
Joshua Kim 2020-07-27 16:08:52 -04:00 committed by GitHub
parent 089c6ab18e
commit 6f16b168a4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 1104 additions and 372 deletions

View file

@ -199,7 +199,7 @@ class ConsumerStates {
@Override
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState();
return ShardConsumerState.SHUTTING_DOWN.getConsumerState();
}
@Override
@ -531,7 +531,8 @@ class ConsumerStates {
consumer.getLeaseCoordinator(),
consumer.getTaskBackoffTimeMillis(),
consumer.getGetRecordsCache(), consumer.getShardSyncer(),
consumer.getShardSyncStrategy(), consumer.getChildShards());
consumer.getShardSyncStrategy(), consumer.getChildShards(),
consumer.getLeaseCleanupManager());
}
@Override

View file

@ -14,6 +14,7 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.time.Duration;
import java.util.Date;
import java.util.Optional;
import java.util.Set;
@ -89,6 +90,23 @@ public class KinesisClientLibConfiguration {
*/
public static final boolean DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION = true;
/**
* Default interval, in milliseconds, at which to run the lease cleanup thread in
* {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager}. The value is one hour.
*/
private static final long DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofHours(1).toMillis();
/**
* Default threshold, in milliseconds, at which to check if there are any completed leases (leases for shards which
* have been closed as a result of a resharding operation) that need to be cleaned up. The value is five minutes.
*/
private static final long DEFAULT_COMPLETED_LEASE_CLEANUP_THRESHOLD_MILLIS = Duration.ofMinutes(5).toMillis();
/**
* Default threshold, in milliseconds, at which to check if there are any garbage leases (leases for shards which
* no longer exist in the stream) that need to be cleaned up. The value is thirty minutes.
*/
private static final long DEFAULT_GARBAGE_LEASE_CLEANUP_THRESHOLD_MILLIS = Duration.ofMinutes(30).toMillis();
/**
* Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
*/
@ -246,6 +264,9 @@ public class KinesisClientLibConfiguration {
private ShardPrioritization shardPrioritization;
private long shutdownGraceMillis;
private ShardSyncStrategyType shardSyncStrategyType;
private long leaseCleanupIntervalMillis;
private long completedLeaseCleanupThresholdMillis;
private long garbageLeaseCleanupThresholdMillis;
@Getter
private Optional<Integer> timeoutInSeconds = Optional.empty();
@ -284,6 +305,7 @@ public class KinesisClientLibConfiguration {
* @param credentialsProvider Provides credentials used to sign AWS requests
* @param workerId Used to distinguish different workers/processes of a Kinesis application
*/
@Deprecated
public KinesisClientLibConfiguration(String applicationName,
String streamName,
AWSCredentialsProvider credentialsProvider,
@ -303,6 +325,7 @@ public class KinesisClientLibConfiguration {
* @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch
* @param workerId Used to distinguish different workers/processes of a Kinesis application
*/
@Deprecated
public KinesisClientLibConfiguration(String applicationName,
String streamName,
AWSCredentialsProvider kinesisCredentialsProvider,
@ -373,6 +396,7 @@ public class KinesisClientLibConfiguration {
*/
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
@Deprecated
public KinesisClientLibConfiguration(String applicationName,
String streamName,
String kinesisEndpoint,
@ -444,6 +468,7 @@ public class KinesisClientLibConfiguration {
*/
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
@Deprecated
public KinesisClientLibConfiguration(String applicationName,
String streamName,
String kinesisEndpoint,
@ -470,54 +495,14 @@ public class KinesisClientLibConfiguration {
String regionName,
long shutdownGraceMillis,
BillingMode billingMode) {
// Check following values are greater than zero
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis);
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
checkIsValuePositive("ParentShardPollIntervalMillis", parentShardPollIntervalMillis);
checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis);
checkIsValuePositive("MaxRecords", (long) maxRecords);
checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis);
checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis);
checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize);
checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis);
this.applicationName = applicationName;
this.tableName = applicationName;
this.streamName = streamName;
this.kinesisEndpoint = kinesisEndpoint;
this.dynamoDBEndpoint = dynamoDBEndpoint;
this.initialPositionInStream = initialPositionInStream;
this.kinesisCredentialsProvider = kinesisCredentialsProvider;
this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider;
this.cloudWatchCredentialsProvider = cloudWatchCredentialsProvider;
this.failoverTimeMillis = failoverTimeMillis;
this.maxRecords = maxRecords;
this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis;
this.callProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList;
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.shardSyncIntervalMillis = shardSyncIntervalMillis;
this.cleanupLeasesUponShardCompletion = cleanupTerminatedShardsBeforeExpiry;
this.workerIdentifier = workerId;
this.kinesisClientConfig = checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig);
this.dynamoDBClientConfig = checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig);
this.cloudWatchClientConfig = checkAndAppendKinesisClientLibUserAgent(cloudWatchClientConfig);
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
this.metricsBufferTimeMillis = metricsBufferTimeMillis;
this.metricsMaxQueueSize = metricsMaxQueueSize;
this.metricsLevel = DEFAULT_METRICS_LEVEL;
this.metricsEnabledDimensions = DEFAULT_METRICS_ENABLED_DIMENSIONS;
this.validateSequenceNumberBeforeCheckpointing = validateSequenceNumberBeforeCheckpointing;
this.regionName = regionName;
this.maxLeasesForWorker = DEFAULT_MAX_LEASES_FOR_WORKER;
this.maxLeasesToStealAtOneTime = DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME;
this.initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY;
this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY;
this.initialPositionInStreamExtended =
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
this.shardSyncStrategyType = DEFAULT_SHARD_SYNC_STRATEGY_TYPE;
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
this.recordsFetcherFactory = new SimpleRecordsFetcherFactory();
this.billingMode = billingMode;
this(applicationName, streamName, kinesisEndpoint, dynamoDBEndpoint, initialPositionInStream, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, maxRecords, idleTimeBetweenReadsInMillis,
callProcessRecordsEvenForEmptyRecordList, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry,
kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, taskBackoffTimeMillis, metricsBufferTimeMillis,
metricsMaxQueueSize, validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis, billingMode,
new SimpleRecordsFetcherFactory(), DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS, DEFAULT_COMPLETED_LEASE_CLEANUP_THRESHOLD_MILLIS,
DEFAULT_GARBAGE_LEASE_CLEANUP_THRESHOLD_MILLIS);
}
/**
@ -556,6 +541,7 @@ public class KinesisClientLibConfiguration {
*/
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
@Deprecated
public KinesisClientLibConfiguration(String applicationName,
String streamName,
String kinesisEndpoint,
@ -581,6 +567,91 @@ public class KinesisClientLibConfiguration {
boolean validateSequenceNumberBeforeCheckpointing,
String regionName,
RecordsFetcherFactory recordsFetcherFactory) {
this(applicationName, streamName, kinesisEndpoint, dynamoDBEndpoint, initialPositionInStream, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, maxRecords, idleTimeBetweenReadsInMillis,
callProcessRecordsEvenForEmptyRecordList, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry,
kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, taskBackoffTimeMillis, metricsBufferTimeMillis,
metricsMaxQueueSize, validateSequenceNumberBeforeCheckpointing, regionName, 0, DEFAULT_DDB_BILLING_MODE,
recordsFetcherFactory, DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS, DEFAULT_COMPLETED_LEASE_CLEANUP_THRESHOLD_MILLIS,
DEFAULT_GARBAGE_LEASE_CLEANUP_THRESHOLD_MILLIS);
}
/**
* @param applicationName Name of the Kinesis application
* By default the application name is included in the user agent string used to make AWS requests. This
* can assist with troubleshooting (e.g. distinguish requests made by separate applications).
* @param streamName Name of the Kinesis stream
* @param kinesisEndpoint Kinesis endpoint
* @param dynamoDBEndpoint DynamoDB endpoint
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching
* records from that location in the stream when an application starts up for the first time and there
* are no checkpoints. If there are checkpoints, then we start from the checkpoint position.
* @param kinesisCredentialsProvider Provides credentials used to access Kinesis
* @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB
* @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch
* @param failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others)
* @param workerId Used to distinguish different workers/processes of a Kinesis application
* @param maxRecords Max records to read per Kinesis getRecords() call
* @param idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis
* @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if
* GetRecords returned an empty record list.
* @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done
* @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
* @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration
* in Kinesis)
* @param kinesisClientConfig Client Configuration used by Kinesis client
* @param dynamoDBClientConfig Client Configuration used by DynamoDB client
* @param cloudWatchClientConfig Client Configuration used by CloudWatch client
* @param taskBackoffTimeMillis Backoff period when tasks encounter an exception
* @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch
* @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch
* @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers
* with a call to Amazon Kinesis before checkpointing for calls to
* {@link RecordProcessorCheckpointer#checkpoint(String)}
* @param regionName The region name for the service
* @param shutdownGraceMillis Time to wait for a graceful shutdown before it is forcefully terminated
* @param billingMode The DDB Billing mode to set for lease table creation.
* @param recordsFetcherFactory Factory to create the records fetcher to retrieve data from Kinesis for a given shard.
* @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in
* {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager}
* @param completedLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any completed leases
* (leases for shards which have been closed as a result of a resharding operation) that need to be cleaned up.
* @param garbageLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any garbage leases
* (leases for shards which no longer exist in the stream) that need to be cleaned up.
*/
public KinesisClientLibConfiguration(String applicationName,
String streamName,
String kinesisEndpoint,
String dynamoDBEndpoint,
InitialPositionInStream initialPositionInStream,
AWSCredentialsProvider kinesisCredentialsProvider,
AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider,
long failoverTimeMillis,
String workerId,
int maxRecords,
long idleTimeBetweenReadsInMillis,
boolean callProcessRecordsEvenForEmptyRecordList,
long parentShardPollIntervalMillis,
long shardSyncIntervalMillis,
boolean cleanupTerminatedShardsBeforeExpiry,
ClientConfiguration kinesisClientConfig,
ClientConfiguration dynamoDBClientConfig,
ClientConfiguration cloudWatchClientConfig,
long taskBackoffTimeMillis,
long metricsBufferTimeMillis,
int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing,
String regionName,
long shutdownGraceMillis,
BillingMode billingMode,
RecordsFetcherFactory recordsFetcherFactory,
long leaseCleanupIntervalMillis,
long completedLeaseCleanupThresholdMillis,
long garbageLeaseCleanupThresholdMillis) {
// Check following values are greater than zero
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis);
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
@ -627,7 +698,12 @@ public class KinesisClientLibConfiguration {
this.shardSyncStrategyType = DEFAULT_SHARD_SYNC_STRATEGY_TYPE;
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
this.recordsFetcherFactory = recordsFetcherFactory;
this.leaseCleanupIntervalMillis = leaseCleanupIntervalMillis;
this.completedLeaseCleanupThresholdMillis = completedLeaseCleanupThresholdMillis;
this.garbageLeaseCleanupThresholdMillis = garbageLeaseCleanupThresholdMillis;
this.shutdownGraceMillis = shutdownGraceMillis;
this.billingMode = billingMode;
}
// Check if value is positive, otherwise throw an exception
@ -836,6 +912,29 @@ public class KinesisClientLibConfiguration {
return cleanupLeasesUponShardCompletion;
}
/**
 * Returns the configured lease cleanup interval.
 *
 * @return interval, in milliseconds, at which the lease cleanup thread in
 *         {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager} is run
 */
public long leaseCleanupIntervalMillis() {
    return this.leaseCleanupIntervalMillis;
}
/**
 * Returns the configured cleanup threshold for completed leases.
 *
 * @return threshold, in milliseconds, at which to check whether any completed leases (leases for shards
 *         which have been closed as a result of a resharding operation) need to be cleaned up
 */
public long completedLeaseCleanupThresholdMillis() {
    return this.completedLeaseCleanupThresholdMillis;
}
/**
 * Returns the configured cleanup threshold for garbage leases.
 *
 * @return threshold, in milliseconds, at which to check whether any garbage leases (leases for shards
 *         which no longer exist in the stream) need to be cleaned up
 */
public long garbageLeaseCleanupThresholdMillis() {
    return this.garbageLeaseCleanupThresholdMillis;
}
/**
* @return true if we should ignore child shards which have open parents
*/
@ -1476,4 +1575,39 @@ public class KinesisClientLibConfiguration {
this.maxInitializationAttempts = maxInitializationAttempts;
return this;
}
/**
* Sets the interval at which the lease cleanup thread is run.
*
* @param leaseCleanupIntervalMillis Rate, in milliseconds, at which to run lease cleanup thread in
* {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager}. The value is passed through
* {@code checkIsValuePositive} before it is stored.
* @return this KinesisClientLibConfiguration instance, so that calls can be chained
*/
public KinesisClientLibConfiguration withLeaseCleanupIntervalMillis(long leaseCleanupIntervalMillis) {
checkIsValuePositive("leaseCleanupIntervalMillis", leaseCleanupIntervalMillis);
this.leaseCleanupIntervalMillis = leaseCleanupIntervalMillis;
return this;
}
/**
* Sets the threshold in millis at which to check if there are any completed leases (leases for shards which have
* been closed as a result of a resharding operation) that need to be cleaned up.
*
* @param completedLeaseCleanupThresholdMillis Threshold in milliseconds. The value is passed through
* {@code checkIsValuePositive} before it is stored.
* @return this KinesisClientLibConfiguration instance, so that calls can be chained
*/
public KinesisClientLibConfiguration withCompletedLeaseCleanupThresholdMillis(long completedLeaseCleanupThresholdMillis) {
checkIsValuePositive("completedLeaseCleanupThresholdMillis", completedLeaseCleanupThresholdMillis);
this.completedLeaseCleanupThresholdMillis = completedLeaseCleanupThresholdMillis;
return this;
}
/**
* Sets the threshold in millis at which to check if there are any garbage leases (leases for shards which no longer
* exist in the stream) that need to be cleaned up.
*
* @param garbageLeaseCleanupThresholdMillis Threshold in milliseconds. The value is passed through
* {@code checkIsValuePositive} before it is stored.
* @return this KinesisClientLibConfiguration instance, so that calls can be chained
*/
public KinesisClientLibConfiguration withGarbageLeaseCleanupThresholdMillis(long garbageLeaseCleanupThresholdMillis) {
checkIsValuePositive("garbageLeaseCleanupThresholdMillis", garbageLeaseCleanupThresholdMillis);
this.garbageLeaseCleanupThresholdMillis = garbageLeaseCleanupThresholdMillis;
return this;
}
}

View file

@ -17,10 +17,8 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.model.ChildShard;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;

View file

@ -10,6 +10,7 @@ import java.util.Set;
/**
* Represents the class that decides if a lease is eligible for cleanup.
*/
@Deprecated
class KinesisLeaseCleanupValidator implements LeaseCleanupValidator {
private static final Log LOG = LogFactory.getLog(KinesisLeaseCleanupValidator.class);

View file

@ -17,8 +17,6 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@ -26,8 +24,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import com.amazonaws.services.kinesis.leases.impl.Lease;
import com.amazonaws.services.kinesis.leases.impl.LeaseManager;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.services.kinesis.model.ShardFilter;
import com.amazonaws.services.kinesis.model.ShardFilterType;
@ -49,8 +45,6 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.amazonaws.services.kinesis.model.Shard;
import javax.annotation.Nullable;
/**
* Helper class to sync leases with shards of the Kinesis stream.
* It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding).
@ -67,11 +61,10 @@ class KinesisShardSyncer implements ShardSyncer {
}
synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy, ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards,
InitialPositionInStreamExtended initialPositionInStream,
boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream,
ignoreUnexpectedChildShards);
}
@ -94,7 +87,7 @@ class KinesisShardSyncer implements ShardSyncer {
boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards);
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, ignoreUnexpectedChildShards);
}
/**
@ -117,7 +110,7 @@ class KinesisShardSyncer implements ShardSyncer {
boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, List<Shard> latestShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards,
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream,
ignoreUnexpectedChildShards, latestShards, leaseManager.isLeaseTableEmpty());
}
@ -127,7 +120,6 @@ class KinesisShardSyncer implements ShardSyncer {
* @param kinesisProxy
* @param leaseManager
* @param initialPosition
* @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards
* @throws DependencyException
* @throws InvalidStateException
@ -135,10 +127,10 @@ class KinesisShardSyncer implements ShardSyncer {
* @throws KinesisClientLibIOException
*/
private synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager, InitialPositionInStreamExtended initialPosition,
boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
KinesisClientLibIOException {
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPosition,
boolean ignoreUnexpectedChildShards)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
// In the case where the lease table is empty, we want to synchronize the minimal amount of shards possible
// based on the given initial position.
@ -148,7 +140,7 @@ class KinesisShardSyncer implements ShardSyncer {
? getShardListAtInitialPosition(kinesisProxy, initialPosition)
: getCompleteShardList(kinesisProxy);
syncShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards,
syncShardLeases(kinesisProxy, leaseManager, initialPosition,
ignoreUnexpectedChildShards, latestShards, isLeaseTableEmpty);
}
@ -158,7 +150,6 @@ class KinesisShardSyncer implements ShardSyncer {
* @param kinesisProxy
* @param leaseManager
* @param initialPosition
* @param cleanupLeasesOfCompletedShards
* @param ignoreUnexpectedChildShards
* @param latestShards latest snapshot of shards to reuse
* @throws DependencyException
@ -170,7 +161,6 @@ class KinesisShardSyncer implements ShardSyncer {
private synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
ILeaseManager<KinesisClientLease> leaseManager,
InitialPositionInStreamExtended initialPosition,
boolean cleanupLeasesOfCompletedShards,
boolean ignoreUnexpectedChildShards,
List<Shard> latestShards,
boolean isLeaseTableEmpty)
@ -218,11 +208,6 @@ class KinesisShardSyncer implements ShardSyncer {
trackedLeases.addAll(currentLeases);
}
trackedLeases.addAll(newLeasesToCreate);
cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager);
if (cleanupLeasesOfCompletedShards) {
cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases,
leaseManager);
}
}
// CHECKSTYLE:ON CyclomaticComplexity
@ -613,150 +598,6 @@ class KinesisShardSyncer implements ShardSyncer {
return parentShardIds;
}
/**
 * Deletes leases whose shards are no longer part of the stream.
 * A lease is deleted only when {@code leaseCleanupValidator} flags it as a cleanup candidate twice:
 * first against the supplied shard snapshot, and then again against a freshly fetched shard list, so that
 * recently created shards are taken into account before anything is removed.
 *
 * @param shards List of Kinesis shards used for the first candidate check.
 * @param trackedLeases Leases to evaluate as cleanup candidates.
 * @param kinesisProxy Kinesis proxy (used to fetch the fresh shard list for the second check).
 * @param leaseManager Lease manager used to delete the confirmed garbage leases.
 * @throws KinesisClientLibIOException
 * @throws ProvisionedThroughputException
 * @throws InvalidStateException
 * @throws DependencyException
 */
private void cleanupGarbageLeases(List<Shard> shards, List<KinesisClientLease> trackedLeases,
        IKinesisProxy kinesisProxy, ILeaseManager<KinesisClientLease> leaseManager)
        throws KinesisClientLibIOException, DependencyException, InvalidStateException,
        ProvisionedThroughputException {
    // First pass: evaluate every tracked lease against the shard snapshot we were handed.
    final Set<String> snapshotShardIds = new HashSet<>();
    for (Shard snapshotShard : shards) {
        snapshotShardIds.add(snapshotShard.getShardId());
    }

    final List<KinesisClientLease> candidates = new ArrayList<>();
    for (KinesisClientLease tracked : trackedLeases) {
        if (leaseCleanupValidator.isCandidateForCleanup(tracked, snapshotShardIds)) {
            candidates.add(tracked);
        }
    }

    if (candidates.isEmpty()) {
        return;
    }

    LOG.info("Found " + candidates.size() + " candidate leases for cleanup. Refreshing list of"
            + " Kinesis shards to pick up recent/latest shards");

    // Second pass: re-validate the candidates against a fresh shard list before deleting.
    final Set<String> freshShardIds = new HashSet<>();
    for (Shard freshShard : getCompleteShardList(kinesisProxy)) {
        freshShardIds.add(freshShard.getShardId());
    }

    for (KinesisClientLease candidate : candidates) {
        if (leaseCleanupValidator.isCandidateForCleanup(candidate, freshShardIds)) {
            LOG.info("Deleting lease for shard " + candidate.getLeaseKey()
                    + " as it is not present in Kinesis stream.");
            leaseManager.deleteLease(candidate);
        }
    }
}
/**
 * Private helper method.
 * Collects the current leases whose checkpoint is SHARD_END, asserts that those closed shards are covered
 * or absent, orders them with {@code StartingSequenceNumberAndShardIdBasedComparator}, and hands every closed
 * shard that has a non-empty set of child shard ids to
 * {@code cleanupLeaseForClosedShard}, which decides whether the lease is actually deleted.
 *
 * @param currentLeases List of leases we evaluate for clean up
 * @param shardIdToShardMap Map of shardId->Shard
 * @param shardIdToChildShardIdsMap Map of shardId->childShardIds
 * @param trackedLeases List of all leases we are tracking.
 * @param leaseManager Lease manager (will be used to delete leases)
 * @throws DependencyException
 * @throws InvalidStateException
 * @throws ProvisionedThroughputException
 * @throws KinesisClientLibIOException
 */
private synchronized void cleanupLeasesOfFinishedShards(Collection<KinesisClientLease> currentLeases,
        Map<String, Shard> shardIdToShardMap, Map<String, Set<String>> shardIdToChildShardIdsMap,
        List<KinesisClientLease> trackedLeases, ILeaseManager<KinesisClientLease> leaseManager)
        throws DependencyException, InvalidStateException, ProvisionedThroughputException,
        KinesisClientLibIOException {
    final Set<String> closedShardIds = new HashSet<>();
    final List<KinesisClientLease> closedShardLeases = new ArrayList<>();
    for (KinesisClientLease current : currentLeases) {
        if (current.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
            closedShardIds.add(current.getLeaseKey());
            closedShardLeases.add(current);
        }
    }

    // Nothing has reached SHARD_END, so there is nothing to evaluate.
    if (closedShardLeases.isEmpty()) {
        return;
    }

    assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, closedShardIds);
    Collections.sort(closedShardLeases,
            new StartingSequenceNumberAndShardIdBasedComparator(shardIdToShardMap));
    final Map<String, KinesisClientLease> trackedLeasesByShardId = constructShardIdToKCLLeaseMap(trackedLeases);

    for (KinesisClientLease closedLease : closedShardLeases) {
        final String closedShardId = closedLease.getLeaseKey();
        final Set<String> childShardIds = shardIdToChildShardIdsMap.get(closedShardId);
        final boolean hasKnownChildren =
                (closedShardId != null) && (childShardIds != null) && (!childShardIds.isEmpty());
        if (hasKnownChildren) {
            cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeasesByShardId, leaseManager);
        }
    }
}
/**
 * Deletes the lease of a closed shard when all of the following hold:
 * a/ a lease for the closed shard is tracked and its checkpoint is SHARD_END,
 * b/ a lease is tracked for every one of the childShardIds, and
 * c/ none of those child leases has a checkpoint of TRIM_HORIZON.
 * Note: This method has package level access solely for testing purposes.
 *
 * @param closedShardId Identifies the closed shard
 * @param childShardIds ShardIds of children of the closed shard
 * @param trackedLeases shardId->KinesisClientLease map with all leases we are tracking (should not be null)
 * @param leaseManager Lease manager used to delete the lease
 * @throws ProvisionedThroughputException
 * @throws InvalidStateException
 * @throws DependencyException
 */
synchronized void cleanupLeaseForClosedShard(String closedShardId, Set<String> childShardIds,
        Map<String, KinesisClientLease> trackedLeases, ILeaseManager<KinesisClientLease> leaseManager)
        throws DependencyException, InvalidStateException, ProvisionedThroughputException {
    final KinesisClientLease closedShardLease = trackedLeases.get(closedShardId);

    // Gather the child leases we actually track; missing ones are detected via the size check below.
    final List<KinesisClientLease> trackedChildLeases = new ArrayList<>();
    for (String childId : childShardIds) {
        final KinesisClientLease trackedChild = trackedLeases.get(childId);
        if (trackedChild != null) {
            trackedChildLeases.add(trackedChild);
        }
    }

    if (closedShardLease == null) {
        return;
    }
    if (!closedShardLease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
        return;
    }
    if (trackedChildLeases.size() != childShardIds.size()) {
        return;
    }

    // A child still at TRIM_HORIZON blocks the deletion.
    for (KinesisClientLease childLease : trackedChildLeases) {
        if (childLease.getCheckpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON)) {
            return;
        }
    }

    LOG.info("Deleting lease for shard " + closedShardLease.getLeaseKey()
            + " as it has been completely processed and processing of child shards has begun.");
    leaseManager.deleteLease(closedShardLease);
}
/**
* Helper method to create a new KinesisClientLease POJO for a shard.
* Note: Package level access only for testing purposes

View file

@ -8,6 +8,7 @@ import java.util.Set;
/**
* Represents the class that decides if a lease is eligible for cleanup.
*/
@Deprecated
public interface LeaseCleanupValidator {
/**

View file

@ -18,9 +18,11 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.util.CollectionUtils;
import org.apache.commons.logging.Log;
@ -55,6 +57,7 @@ class ShardConsumer {
private final IMetricsFactory metricsFactory;
private final KinesisClientLibLeaseCoordinator leaseCoordinator;
private ICheckpoint checkpoint;
private LeaseCleanupManager leaseCleanupManager;
// Backoff time when polling to check if application has finished processing parent shards
private final long parentShardPollIntervalMillis;
private final boolean cleanupLeasesOfCompletedShards;
@ -112,6 +115,7 @@ class ShardConsumer {
* @param shardSyncer shardSyncer instance used to check and create new leases
*/
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
@Deprecated
ShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpoint,
@ -124,6 +128,7 @@ class ShardConsumer {
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
this(shardInfo,
streamConfig,
checkpoint,
@ -156,6 +161,7 @@ class ShardConsumer {
* @param shardSyncer shardSyncer instance used to check and create new leases
*/
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
@Deprecated
ShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpoint,
@ -215,7 +221,9 @@ class ShardConsumer {
* @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool
* @param config Kinesis library configuration
* @param shardSyncer shardSyncer instance used to check and create new leases
* @param leaseCleanupManager used to clean up leases in lease table.
*/
@Deprecated
ShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpoint,
@ -232,6 +240,53 @@ class ShardConsumer {
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
this(shardInfo, streamConfig, checkpoint, recordProcessor, recordProcessorCheckpointer, leaseCoordinator,
parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory,
backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, kinesisDataFetcher, retryGetRecordsInSeconds,
maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, LeaseCleanupManager.createOrGetInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
Executors.newSingleThreadScheduledExecutor(), metricsFactory, config.shouldCleanupLeasesUponShardCompletion(),
config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(),
config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords()));
}
/**
* @param shardInfo Shard information
* @param streamConfig Stream Config to use
* @param checkpoint Checkpoint tracker
* @param recordProcessor Record processor used to process the data records for the shard
* @param recordProcessorCheckpointer RecordProcessorCheckpointer to use to checkpoint progress
* @param leaseCoordinator Used to manage leases for current worker
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
* @param cleanupLeasesOfCompletedShards clean up the leases of completed shards
* @param executorService ExecutorService used to execute process tasks for this shard
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
* @param backoffTimeMillis backoff interval when we encounter exceptions
* @param skipShardSyncAtWorkerInitializationIfLeasesExist Skip sync at init if lease exists
* @param kinesisDataFetcher KinesisDataFetcher to fetch data from Kinesis streams.
* @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record
* @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool
* @param config Kinesis library configuration
* @param shardSyncer shardSyncer instance used to check and create new leases
* @param leaseCleanupManager used to clean up leases in lease table.
*/
ShardConsumer(ShardInfo shardInfo,
StreamConfig streamConfig,
ICheckpoint checkpoint,
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisClientLibLeaseCoordinator leaseCoordinator,
long parentShardPollIntervalMillis,
boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService,
IMetricsFactory metricsFactory,
long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
KinesisDataFetcher kinesisDataFetcher,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy,
LeaseCleanupManager leaseCleanupManager) {
this.shardInfo = shardInfo;
this.streamConfig = streamConfig;
this.checkpoint = checkpoint;
@ -251,6 +306,7 @@ class ShardConsumer {
this.getShardInfo().getShardId(), this.metricsFactory, this.config.getMaxRecords());
this.shardSyncer = shardSyncer;
this.shardSyncStrategy = shardSyncStrategy;
this.leaseCleanupManager = leaseCleanupManager;
}
/**
@ -529,4 +585,8 @@ class ShardConsumer {
/**
 * @return the shard sync strategy this consumer was constructed with.
 */
ShardSyncStrategy getShardSyncStrategy() {
    return this.shardSyncStrategy;
}
/**
 * @return the lease cleanup manager this consumer was constructed with.
 */
LeaseCleanupManager getLeaseCleanupManager() {
    return this.leaseCleanupManager;
}
}

View file

@ -86,7 +86,7 @@ public class ShardInfo {
*
* @return a list of shardId's that are parents of this shard, or empty if the shard has no parents.
*/
protected List<String> getParentShardIds() {
public List<String> getParentShardIds() {
return new LinkedList<String>(parentShardIds);
}

View file

@ -14,9 +14,11 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.leases.LeasePendingDeletion;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.util.CollectionUtils;
import org.apache.commons.logging.Log;
@ -60,6 +62,7 @@ class ShutdownTask implements ITask {
private final ShardSyncer shardSyncer;
private final ShardSyncStrategy shardSyncStrategy;
private final List<ChildShard> childShards;
private final LeaseCleanupManager leaseCleanupManager;
/**
* Constructor.
@ -76,7 +79,8 @@ class ShutdownTask implements ITask {
KinesisClientLibLeaseCoordinator leaseCoordinator,
long backoffTimeMillis,
GetRecordsCache getRecordsCache, ShardSyncer shardSyncer,
ShardSyncStrategy shardSyncStrategy, List<ChildShard> childShards) {
ShardSyncStrategy shardSyncStrategy, List<ChildShard> childShards,
LeaseCleanupManager leaseCleanupManager) {
this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
@ -91,6 +95,7 @@ class ShutdownTask implements ITask {
this.shardSyncer = shardSyncer;
this.shardSyncStrategy = shardSyncStrategy;
this.childShards = childShards;
this.leaseCleanupManager = leaseCleanupManager;
}
/*
@ -153,13 +158,29 @@ class ShutdownTask implements ITask {
recordProcessor.shutdown(shutdownInput);
ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
final boolean successfullyCheckpointedShardEnd = lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END);
if (localReason == ShutdownReason.TERMINATE) {
if ((lastCheckpointValue == null)
|| (!lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END))) {
if ((lastCheckpointValue == null) || (!successfullyCheckpointedShardEnd)) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
+ shardInfo.getShardId() + ". Application must checkpoint upon shutdown. " +
"See IRecordProcessor.shutdown javadocs for more information.");
}
// Check if either the shard end ddb persist is successful or
// if childshards is empty. When child shards is empty then either it is due to
// completed shard being reprocessed or we got RNF from service.
// For these cases enqueue the lease for deletion.
if (successfullyCheckpointedShardEnd || CollectionUtils.isNullOrEmpty(childShards)) {
final KinesisClientLease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId());
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(currentLease, shardInfo);
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
}
//TODO: Add shard end checkpointing here.
}
}
LOG.debug("Shutting down retrieval strategy.");
getRecordsCache.shutdown();

View file

@ -39,6 +39,7 @@ import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator;
import com.amazonaws.services.kinesis.leases.impl.LeaseRenewer;
import com.amazonaws.services.kinesis.leases.impl.LeaseTaker;
@ -157,6 +158,8 @@ public class Worker implements Runnable {
private ShardSyncStrategy shardSyncStrategy;
private PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager;
private final LeaseCleanupManager leaseCleanupManager;
/**
* Constructor.
*
@ -573,6 +576,10 @@ public class Worker implements Runnable {
this.workerStateChangeListener = workerStateChangeListener;
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
createShardSyncStrategy(config.getShardSyncStrategyType(), leaderDecider, periodicShardSyncManager);
this.leaseCleanupManager = LeaseCleanupManager.createOrGetInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
Executors.newSingleThreadScheduledExecutor(), metricsFactory, cleanupLeasesUponShardCompletion,
config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(),
config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords());
}
/**
@ -726,6 +733,13 @@ public class Worker implements Runnable {
}
}
if (!leaseCleanupManager.isRunning()) {
LOG.info("Starting LeaseCleanupManager.");
leaseCleanupManager.start();
} else {
LOG.info("LeaseCleanupManager is already running. No need to start it.");
}
// If we reach this point, then we either skipped the lease sync or did not have any exception for the
// shard sync in the previous attempt.
if (!leaseCoordinator.isRunning()) {
@ -1111,12 +1125,21 @@ public class Worker implements Runnable {
}
protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) {
IRecordProcessor recordProcessor = processorFactory.createProcessor();
final IRecordProcessor recordProcessor = processorFactory.createProcessor();
final RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
shardInfo,
checkpointTracker,
new SequenceNumberValidator(
streamConfig.getStreamProxy(),
shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()),
metricsFactory);
return new ShardConsumer(shardInfo,
streamConfig,
checkpointTracker,
recordProcessor,
recordProcessorCheckpointer,
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesUponShardCompletion,
@ -1124,9 +1147,11 @@ public class Worker implements Runnable {
metricsFactory,
taskBackoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo),
retryGetRecordsInSeconds,
maxGetRecordsThreadPool,
config, shardSyncer, shardSyncStrategy);
config, shardSyncer, shardSyncStrategy,
leaseCleanupManager);
}
/**

View file

@ -0,0 +1,31 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.leases;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import lombok.Value;
import lombok.experimental.Accessors;
/**
 * Immutable pairing of a lease with the {@link ShardInfo} of the shard it belongs to. Instances are what gets
 * queued when a lease is handed over for deferred cleanup.
 *
 * <p>Lombok's {@code @Value} makes every field {@code private final} and generates the all-args constructor,
 * {@code equals}/{@code hashCode} and {@code toString}; {@code @Accessors(fluent = true)} names the getters
 * {@code lease()} and {@code shardInfo()}.
 */
@Value
@Accessors(fluent = true)
public class LeasePendingDeletion {
    /** Lease that should be deleted. */
    KinesisClientLease lease;

    /** Shard the lease belongs to. */
    ShardInfo shardInfo;
}

View file

@ -0,0 +1,369 @@
package com.amazonaws.services.kinesis.leases.impl;
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.leases.LeasePendingDeletion;
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
import com.amazonaws.util.CollectionUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.Value;
import lombok.experimental.Accessors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
 * Helper class to clean up any expired/closed shard leases. It cleans up leases periodically, at the interval
 * defined by {@code KinesisClientLibConfiguration#leaseCleanupIntervalMillis()}, upon worker shutdown, following a
 * re-shard event or a shard expiring from the service.
 */
@RequiredArgsConstructor(access= AccessLevel.PACKAGE)
@EqualsAndHashCode
public class LeaseCleanupManager {
// Proxy used to obtain a shard iterator and read the shard's child shards from the stream.
@NonNull
private IKinesisProxy kinesisProxy;
// Lease table access: used to read, update and delete leases.
@NonNull
private final ILeaseManager<KinesisClientLease> leaseManager;
// Scheduler on which start() registers the periodic cleanup task.
@NonNull
private final ScheduledExecutorService deletionThreadPool;
// Stored at construction; not read by any method of this class.
@NonNull
private final IMetricsFactory metricsFactory;
// When false, cleanupLease() skips the completed-shard path entirely.
private final boolean cleanupLeasesUponShardCompletion;
// Period, in milliseconds, between two runs of the scheduled cleanup task.
private final long leaseCleanupIntervalMillis;
// Minimum elapsed time on completedLeaseStopwatch before completed leases are examined.
private final long completedLeaseCleanupIntervalMillis;
// Minimum elapsed time on garbageLeaseStopwatch before garbage leases are examined.
private final long garbageLeaseCleanupIntervalMillis;
// Limit passed to the proxy's get() call when fetching child shards.
private final int maxRecords;
// Both stopwatches are started by start() and restarted by cleanupLeases() after a cleanup of the matching kind.
private final Stopwatch completedLeaseStopwatch = Stopwatch.createUnstarted();
private final Stopwatch garbageLeaseStopwatch = Stopwatch.createUnstarted();
// Leases waiting for deletion; filled by enqueueForDeletion() and drained by cleanupLeases().
private final Queue<LeasePendingDeletion> deletionQueue = new ConcurrentLinkedQueue<>();
// Initial delay, in milliseconds, before the first scheduled cleanup run.
private static final long INITIAL_DELAY = 0L;
private static final Log LOG = LogFactory.getLog(LeaseCleanupManager.class);
// Set to true by start(); exposed through the Lombok-generated isRunning() getter.
@Getter
private volatile boolean isRunning = false;
// Lazily created shared instance handed out by createOrGetInstance().
private static LeaseCleanupManager instance;
/**
 * Factory method to return the shared instance of {@link LeaseCleanupManager}, creating it on first use.
 *
 * <p>Only the arguments of the call that actually creates the instance are used. Every later call returns the
 * existing instance and ignores its arguments, including {@code deletionThreadPool}, which then remains the
 * caller's responsibility.
 *
 * <p>The method is {@code synchronized} so that two threads racing on the first call cannot each construct their
 * own manager.
 *
 * @param kinesisProxy proxy used to read child shards from the stream
 * @param leaseManager lease table access used to read, update and delete leases
 * @param deletionThreadPool scheduler that runs the periodic cleanup task
 * @param metricsFactory metrics factory handed to the manager
 * @param cleanupLeasesUponShardCompletion whether leases of completed shards should be cleaned up
 * @param leaseCleanupIntervalMillis period between two cleanup runs, in milliseconds
 * @param completedLeaseCleanupIntervalMillis minimum time between checks for completed leases, in milliseconds
 * @param garbageLeaseCleanupIntervalMillis minimum time between checks for garbage leases, in milliseconds
 * @param maxRecords limit passed to the proxy when fetching child shards
 * @return the shared {@link LeaseCleanupManager} instance
 */
public static synchronized LeaseCleanupManager createOrGetInstance(IKinesisProxy kinesisProxy,
        ILeaseManager leaseManager,
        ScheduledExecutorService deletionThreadPool, IMetricsFactory metricsFactory,
        boolean cleanupLeasesUponShardCompletion, long leaseCleanupIntervalMillis,
        long completedLeaseCleanupIntervalMillis, long garbageLeaseCleanupIntervalMillis,
        int maxRecords) {
    if (instance == null) {
        instance = new LeaseCleanupManager(kinesisProxy, leaseManager, deletionThreadPool, metricsFactory,
                cleanupLeasesUponShardCompletion, leaseCleanupIntervalMillis,
                completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords);
    }
    return instance;
}
/**
 * Starts the lease cleanup thread, which is scheduled periodically as specified by
 * {@link LeaseCleanupManager#leaseCleanupIntervalMillis}.
 *
 * <p>Calling this method on a manager that is already running is a no-op. Without that guard a second call would
 * fail with {@link IllegalStateException} from the already-running stopwatches and would schedule a duplicate
 * cleanup task. The running flag is only set once the task has been scheduled, so a rejected scheduling attempt
 * leaves the manager reported as not running.
 */
public synchronized void start() {
    if (isRunning) {
        LOG.debug("Lease cleanup thread is already running.");
        return;
    }
    LOG.debug("Starting lease cleanup thread.");
    // The stopwatches may already be running if an earlier start attempt failed while scheduling.
    if (!completedLeaseStopwatch.isRunning()) {
        completedLeaseStopwatch.start();
    }
    if (!garbageLeaseStopwatch.isRunning()) {
        garbageLeaseStopwatch.start();
    }
    deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis,
            TimeUnit.MILLISECONDS);
    isRunning = true;
}
/**
 * Enqueues a lease for deletion without checking for a duplicate entry. Use {@link #isEnqueuedForDeletion}
 * to check for duplicates first.
 *
 * <p>An entry whose lease is {@code null} (this instance does not hold the lease for the shard) is not enqueued;
 * a warning naming the shard is logged instead.
 *
 * @param leasePendingDeletion lease, together with its shard info, to hand over for deferred deletion
 */
public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) {
    final KinesisClientLease lease = leasePendingDeletion.lease();
    if (lease == null) {
        // There is no lease to take a key from, so identify the entry by its shard id instead.
        final ShardInfo shardInfo = leasePendingDeletion.shardInfo();
        final String shardId = (shardInfo == null) ? null : shardInfo.getShardId();
        LOG.warn("Cannot enqueue lease for shard " + shardId + " for deferred deletion - instance doesn't hold " +
                "the lease for that shard.");
        return;
    }
    LOG.debug("Enqueuing lease " + lease.getLeaseKey() + " for deferred deletion.");
    if (!deletionQueue.add(leasePendingDeletion)) {
        LOG.warn("Unable to enqueue lease " + lease.getLeaseKey() + " for deletion.");
    }
}
/**
 * Reports whether an equal entry is already waiting in the deletion queue.
 * //TODO: Optimize verifying duplicate entries https://sim.amazon.com/issues/KinesisLTR-597.
 * @param leasePendingDeletion entry to look for
 * @return true if enqueued for deletion; false otherwise.
 */
public boolean isEnqueuedForDeletion(LeasePendingDeletion leasePendingDeletion) {
    final boolean alreadyQueued = deletionQueue.contains(leasePendingDeletion);
    return alreadyQueued;
}
/**
 * Counts the entries currently held in the deletion queue.
 * @return number of leases pending deletion.
 */
private int leasesPendingDeletion() {
    final int pendingCount = deletionQueue.size();
    return pendingCount;
}
/**
 * @return true once the completed-lease stopwatch has run for at least the completed-lease cleanup interval.
 */
private boolean timeToCheckForCompletedShard() {
    final long elapsedMillis = completedLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS);
    return elapsedMillis >= completedLeaseCleanupIntervalMillis;
}
/**
 * @return true once the garbage-lease stopwatch has run for at least the garbage-lease cleanup interval.
 */
private boolean timeToCheckForGarbageShard() {
    final long elapsedMillis = garbageLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS);
    return elapsedMillis >= garbageLeaseCleanupIntervalMillis;
}
/**
 * Attempts to delete the lease carried by {@code leasePendingDeletion}, either as a completed lease or as a
 * garbage lease.
 *
 * <p>Completed-lease path (only when cleanup upon shard completion is enabled and
 * {@code timeToCheckForCompletedShard} is true): the lease is re-read from the lease table. If it is no longer
 * there, the cleanup is reported as done. Otherwise its child shard ids are taken from the stored lease or, when
 * none are stored, fetched from the service and written back to the lease. The lease is then deleted if
 * {@code cleanupLeaseForCompletedShard} allows it; a failure in that step is logged and does not abort the call.
 *
 * <p>Garbage-lease path: if the service was not already queried above and {@code timeToCheckForGarbageShard} is
 * true, the child shards are fetched from the service. A {@link ResourceNotFoundException} raised by either
 * service query is taken to mean the shard no longer exists in the stream, and the lease is deleted.
 *
 * @param leasePendingDeletion lease and shard info to clean up
 * @param timeToCheckForCompletedShard whether the completed-lease check is due
 * @param timeToCheckForGarbageShard whether the garbage-lease check is due
 * @return flags describing what was cleaned up and what was observed while trying
 * @throws DependencyException if a lease table operation fails with it
 * @throws ProvisionedThroughputException if a lease table operation fails with it
 * @throws InvalidStateException if a lease table operation fails with it
 */
public LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion,
        boolean timeToCheckForCompletedShard, boolean timeToCheckForGarbageShard)
        throws DependencyException, ProvisionedThroughputException, InvalidStateException {
    final KinesisClientLease lease = leasePendingDeletion.lease();
    final ShardInfo shardInfo = leasePendingDeletion.shardInfo();

    boolean cleanedUpCompletedLease = false;
    boolean cleanedUpGarbageLease = false;
    boolean alreadyCheckedForGarbageCollection = false;
    boolean wereChildShardsPresent = false;
    boolean wasResourceNotFound = false;

    try {
        if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) {
            final KinesisClientLease leaseFromDDB = leaseManager.getLease(shardInfo.getShardId());
            if (leaseFromDDB != null) {
                Set<String> childShardKeys = leaseFromDDB.getChildShardIds();
                if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
                    try {
                        childShardKeys = getChildShardsFromService(shardInfo);
                        if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
                            LOG.error("No child shards returned from service for shard " + shardInfo.getShardId());
                        } else {
                            wereChildShardsPresent = true;
                            updateLeaseWithChildShards(leasePendingDeletion, childShardKeys);
                        }
                    } finally {
                        // The service has been queried, so the garbage check below need not query it again.
                        alreadyCheckedForGarbageCollection = true;
                    }
                } else {
                    wereChildShardsPresent = true;
                }
                try {
                    cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, childShardKeys);
                } catch (Exception e) {
                    // Suppressing the exception here, so that we can attempt for garbage cleanup.
                    // The cause is logged so the failure can still be diagnosed.
                    LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId(), e);
                }
            } else {
                LOG.info("Lease not present in lease table while cleaning the shard " + shardInfo.getShardId());
                cleanedUpCompletedLease = true;
            }
        }
        if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) {
            wereChildShardsPresent = !CollectionUtils.isNullOrEmpty(getChildShardsFromService(shardInfo));
        }
    } catch (ResourceNotFoundException e) {
        // The shard is gone from the stream: treat the lease as garbage and delete it.
        wasResourceNotFound = true;
        cleanedUpGarbageLease = cleanupLeaseForGarbageShard(lease);
    }
    return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease, wereChildShardsPresent,
            wasResourceNotFound);
}
/**
 * Asks the service for the child shards of the given shard: obtains a LATEST iterator for it, issues one get
 * call with that iterator and collects the shard ids of the child shards in the response.
 *
 * @param shardInfo shard whose child shards are requested
 * @return ids of the child shards reported by the service
 */
private Set<String> getChildShardsFromService(ShardInfo shardInfo) {
    final String shardId = shardInfo.getShardId();
    final String iteratorType = ShardIteratorType.LATEST.toString();
    final String shardIterator = kinesisProxy.getIterator(shardId, iteratorType);
    return kinesisProxy.get(shardIterator, maxRecords)
            .getChildShards()
            .stream()
            .map(childShard -> childShard.getShardId())
            .collect(Collectors.toSet());
}
// A lease that ended with SHARD_END from ResourceNotFoundException is safe to delete if it no longer exists in the
// stream (known explicitly from ResourceNotFound being thrown when processing this shard).
// Deletes the lease unconditionally and reports success.
private boolean cleanupLeaseForGarbageShard(KinesisClientLease lease)
        throws DependencyException, ProvisionedThroughputException, InvalidStateException {
    final String leaseKey = lease.getLeaseKey();
    LOG.info("Deleting lease " + leaseKey + " as it is not present in the stream.");
    leaseManager.deleteLease(lease);
    return true;
}
/**
 * Checks that none of the lease's parent shards still has a lease in the lease table. Stops at the first parent
 * lease that is found and logs it.
 *
 * @param lease lease whose parent shard ids are looked up
 * @return true if no parent lease exists any more; false as soon as one is found
 */
private boolean allParentShardLeasesDeleted(KinesisClientLease lease)
        throws DependencyException, ProvisionedThroughputException, InvalidStateException {
    for (String parentShardId : lease.getParentShardIds()) {
        final KinesisClientLease survivingParent = leaseManager.getLease(parentShardId);
        if (survivingParent == null) {
            continue;
        }
        LOG.warn("Lease " + lease.getLeaseKey() + " has a parent lease " + survivingParent.getLeaseKey() +
                " which is still present in the lease table, skipping deletion for this lease.");
        return false;
    }
    return true;
}
// The current shard's lease is only deleted when both of the following hold:
// 1. All of its children are currently being processed, i.e their checkpoint is not TRIM_HORIZON or AT_TIMESTAMP.
// 2. Its parent shard lease(s) have already been deleted.
// A child lease missing from the lease table aborts the attempt with IllegalStateException.
private boolean cleanupLeaseForCompletedShard(KinesisClientLease lease, Set<String> childShardLeaseKeys)
        throws DependencyException, ProvisionedThroughputException, InvalidStateException, IllegalStateException {
    final Set<String> startedChildLeaseKeys = new HashSet<>();

    for (String childShardLeaseKey : childShardLeaseKeys) {
        final KinesisClientLease childLease = leaseManager.getLease(childShardLeaseKey);
        if (childLease == null) {
            throw new IllegalStateException(
                    "Child lease " + childShardLeaseKey + " for completed shard not found in "
                            + "lease table - not cleaning up lease " + lease);
        }
        final boolean notYetStarted = childLease.getCheckpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON)
                || childLease.getCheckpoint().equals(ExtendedSequenceNumber.AT_TIMESTAMP);
        if (!notYetStarted) {
            startedChildLeaseKeys.add(childLease.getLeaseKey());
        }
    }

    // Parents are checked first; the child comparison is skipped when a parent lease still exists.
    if (!allParentShardLeasesDeleted(lease)) {
        return false;
    }
    if (!Objects.equals(childShardLeaseKeys, startedChildLeaseKeys)) {
        return false;
    }

    LOG.info("Deleting lease " + lease.getLeaseKey() + " as it has been completely processed and processing of child shard(s) has begun.");
    leaseManager.deleteLease(lease);
    return true;
}
/**
 * Stores the given child shard ids on the pending entry's lease object and writes that lease to the lease table.
 *
 * @param leasePendingDeletion entry whose lease is updated
 * @param childShardKeys child shard ids to record on the lease
 */
private void updateLeaseWithChildShards(LeasePendingDeletion leasePendingDeletion, Set<String> childShardKeys)
        throws DependencyException, ProvisionedThroughputException, InvalidStateException {
    final KinesisClientLease leaseToUpdate = leasePendingDeletion.lease();
    leaseToUpdate.setChildShardIds(childShardKeys);
    leaseManager.updateLease(leaseToUpdate);
}
/**
 * Runs one cleanup pass over the deletion queue.
 *
 * <p>Nothing is attempted while the queue is empty or while neither the completed-lease nor the garbage-lease
 * check is due. Otherwise every queued entry is taken off the queue and passed to {@code cleanupLease}; entries
 * that were not cleaned up, or whose cleanup threw, are put back on the queue at the end of the pass. A stopwatch
 * is restarted only when at least one lease of its kind was cleaned up during the pass.
 */
@VisibleForTesting
void cleanupLeases() {
    LOG.info("Number of pending leases to clean before the scan : " + leasesPendingDeletion());
    if (deletionQueue.isEmpty()) {
        LOG.debug("No leases pending deletion.");
        return;
    }

    // Both checks are evaluated, as before; the pass only proceeds if at least one of them is due.
    final boolean completedCheckDue = timeToCheckForCompletedShard();
    final boolean garbageCheckDue = timeToCheckForGarbageShard();
    if (!completedCheckDue && !garbageCheckDue) {
        return;
    }

    final Queue<LeasePendingDeletion> retryQueue = new ConcurrentLinkedQueue<>();
    boolean anyCompletedLeaseCleaned = false;
    boolean anyGarbageLeaseCleaned = false;

    LOG.debug("Attempting to clean up " + deletionQueue.size() + " lease(s).");
    while (!deletionQueue.isEmpty()) {
        final LeasePendingDeletion pending = deletionQueue.poll();
        final String leaseKey = pending.lease().getLeaseKey();
        boolean cleaned = false;
        try {
            final LeaseCleanupResult outcome = cleanupLease(pending,
                    timeToCheckForCompletedShard(), timeToCheckForGarbageShard());
            if (outcome.cleanedUpCompletedLease()) {
                anyCompletedLeaseCleaned = true;
            }
            if (outcome.cleanedUpGarbageLease()) {
                anyGarbageLeaseCleaned = true;
            }
            if (outcome.leaseCleanedUp()) {
                LOG.debug("Successfully cleaned up lease " + leaseKey);
                cleaned = true;
            } else {
                LOG.warn("Unable to clean up lease " + leaseKey + " due to " + outcome);
            }
        } catch (Exception e) {
            LOG.error("Failed to cleanup lease " + leaseKey + ". Will re-enqueue for deletion and retry on next " +
                    "scheduled execution.", e);
        }
        if (!cleaned) {
            LOG.debug("Did not cleanup lease " + leaseKey + ". Re-enqueueing for deletion.");
            retryQueue.add(pending);
        }
    }

    if (anyCompletedLeaseCleaned) {
        LOG.debug("At least one completed lease was cleaned up - restarting interval");
        completedLeaseStopwatch.reset().start();
    }
    if (anyGarbageLeaseCleaned) {
        LOG.debug("At least one garbage lease was cleaned up - restarting interval");
        garbageLeaseStopwatch.reset().start();
    }
    deletionQueue.addAll(retryQueue);
    LOG.info("Number of pending leases to clean after the scan : " + leasesPendingDeletion());
}
/**
 * Task handed to the deletion thread pool; each execution performs one {@code cleanupLeases()} pass on the
 * enclosing manager.
 */
private class LeaseCleanupThread implements Runnable {
    @Override
    public void run() {
        LeaseCleanupManager.this.cleanupLeases();
    }
}
/**
 * Outcome of a single {@code cleanupLease} attempt. Lombok generates the all-args constructor (in field order)
 * and fluent accessors named after the fields.
 */
@Accessors(fluent = true)
@Value
public static class LeaseCleanupResult {
    /** The lease was cleaned up through the completed-shard path. */
    boolean cleanedUpCompletedLease;
    /** The lease was deleted because its shard was not found in the stream. */
    boolean cleanedUpGarbageLease;
    /** Child shards were found, either on the stored lease or from the service. */
    boolean wereChildShardsPresent;
    /** A ResourceNotFoundException was caught during the attempt. */
    boolean wasResourceNotFound;

    /**
     * @return true if the lease was cleaned up by either path.
     */
    public boolean leaseCleanedUp() {
        return cleanedUpCompletedLease || cleanedUpGarbageLease;
    }
}
}

View file

@ -124,7 +124,7 @@ public class ConsumerStatesTest {
assertThat(state.successTransition(), equalTo(ShardConsumerState.INITIALIZING.getConsumerState()));
for (ShutdownReason shutdownReason : ShutdownReason.values()) {
assertThat(state.shutdownTransition(shutdownReason),
equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState()));
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
}
assertThat(state.getState(), equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS));

View file

@ -54,6 +54,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.leases.impl.LeaseManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.hamcrest.Description;
@ -138,6 +140,7 @@ public class ShardConsumerTest {
recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory());
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
when(config.getLogWarningForTaskAfterMillis()).thenReturn(Optional.empty());
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
}
/**
@ -475,6 +478,8 @@ public class ShardConsumerTest {
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
when(leaseManager.getLease(eq(parentShardId))).thenReturn(parentLease);
when(parentLease.getCheckpoint()).thenReturn(ExtendedSequenceNumber.TRIM_HORIZON);
when(recordProcessorCheckpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
when(streamConfig.getStreamProxy()).thenReturn(streamProxy);
final ShardConsumer consumer =
new ShardConsumer(shardInfo,
@ -507,6 +512,9 @@ public class ShardConsumerTest {
assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.REQUESTED));
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard();
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN)));
Thread.sleep(50L);
consumer.beginShutdown();
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)));
assertThat(consumer.isShutdown(), is(true));
verify(shutdownNotification, times(1)).shutdownComplete();
@ -704,19 +712,19 @@ public class ShardConsumerTest {
StreamConfig streamConfig =
new StreamConfig(fileBasedProxy,
maxRecords,
idleTimeMS,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
maxRecords,
idleTimeMS,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null);
dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(),
any(IMetricsFactory.class), anyInt()))
any(IMetricsFactory.class), anyInt()))
.thenReturn(getRecordsCache);
RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
@ -732,29 +740,29 @@ public class ShardConsumerTest {
ShardConsumer consumer =
new ShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
recordProcessorCheckpointer,
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
dataFetcher,
Optional.empty(),
Optional.empty(),
config,
shardSyncer,
shardSyncStrategy);
streamConfig,
checkpoint,
processor,
recordProcessorCheckpointer,
leaseCoordinator,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
dataFetcher,
Optional.empty(),
Optional.empty(),
config,
shardSyncer,
shardSyncStrategy);
List<String> parentShardIds = new ArrayList<>();
parentShardIds.add(shardInfo.getShardId());
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(createLease(shardInfo.getShardId(),
"leaseOwner",
parentShardIds));
"leaseOwner",
parentShardIds));
when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
@ -1111,7 +1119,7 @@ public class ShardConsumerTest {
//@formatter:off (gets the formatting wrong)
private void verifyConsumedRecords(List<Record> expectedRecords,
List<Record> actualRecords) {
List<Record> actualRecords) {
//@formatter:on
assertThat(actualRecords.size(), is(equalTo(expectedRecords.size())));
ListIterator<Record> expectedIter = expectedRecords.listIterator();
@ -1141,7 +1149,7 @@ public class ShardConsumerTest {
}
Matcher<InitializationInput> initializationInputMatcher(final ExtendedSequenceNumber checkpoint,
final ExtendedSequenceNumber pendingCheckpoint) {
final ExtendedSequenceNumber pendingCheckpoint) {
return new TypeSafeMatcher<InitializationInput>() {
@Override
protected boolean matchesSafely(InitializationInput item) {

View file

@ -25,24 +25,24 @@ import com.amazonaws.services.kinesis.model.Shard;
/**
* Helper class to create Shard, SequenceRange and related objects.
*/
class ShardObjectHelper {
public class ShardObjectHelper {
private static final int EXPONENT = 128;
/**
* Max value of a sequence number (2^128 -1). Useful for defining sequence number range for a shard.
*/
static final String MAX_SEQUENCE_NUMBER = new BigInteger("2").pow(EXPONENT).subtract(BigInteger.ONE).toString();
public static final String MAX_SEQUENCE_NUMBER = new BigInteger("2").pow(EXPONENT).subtract(BigInteger.ONE).toString();
/**
* Min value of a sequence number (0). Useful for defining sequence number range for a shard.
*/
static final String MIN_SEQUENCE_NUMBER = BigInteger.ZERO.toString();
public static final String MIN_SEQUENCE_NUMBER = BigInteger.ZERO.toString();
/**
* Max value of a hash key (2^128 -1). Useful for defining hash key range for a shard.
*/
static final String MAX_HASH_KEY = new BigInteger("2").pow(EXPONENT).subtract(BigInteger.ONE).toString();
public static final String MAX_HASH_KEY = new BigInteger("2").pow(EXPONENT).subtract(BigInteger.ONE).toString();
/**
* Min value of a hash key (0). Useful for defining hash key range for a shard.
@ -63,7 +63,7 @@ class ShardObjectHelper {
* @param sequenceNumberRange
* @return
*/
static Shard newShard(String shardId,
public static Shard newShard(String shardId,
String parentShardId,
String adjacentParentShardId,
SequenceNumberRange sequenceNumberRange) {
@ -78,7 +78,7 @@ class ShardObjectHelper {
* @param hashKeyRange
* @return
*/
static Shard newShard(String shardId,
public static Shard newShard(String shardId,
String parentShardId,
String adjacentParentShardId,
SequenceNumberRange sequenceNumberRange,
@ -98,7 +98,7 @@ class ShardObjectHelper {
* @param endingSequenceNumber
* @return
*/
static SequenceNumberRange newSequenceNumberRange(String startingSequenceNumber, String endingSequenceNumber) {
public static SequenceNumberRange newSequenceNumberRange(String startingSequenceNumber, String endingSequenceNumber) {
SequenceNumberRange range = new SequenceNumberRange();
range.setStartingSequenceNumber(startingSequenceNumber);
range.setEndingSequenceNumber(endingSequenceNumber);
@ -110,14 +110,14 @@ class ShardObjectHelper {
* @param endingHashKey
* @return
*/
static HashKeyRange newHashKeyRange(String startingHashKey, String endingHashKey) {
public static HashKeyRange newHashKeyRange(String startingHashKey, String endingHashKey) {
HashKeyRange range = new HashKeyRange();
range.setStartingHashKey(startingHashKey);
range.setEndingHashKey(endingHashKey);
return range;
}
static List<String> getParentShardIds(Shard shard) {
public static List<String> getParentShardIds(Shard shard) {
List<String> parentShardIds = new ArrayList<>(2);
if (shard.getAdjacentParentShardId() != null) {
parentShardIds.add(shard.getAdjacentParentShardId());

View file

@ -420,6 +420,7 @@ public class ShardSyncerTest {
cleanupLeasesOfCompletedShards, true, shards);
List<KinesisClientLease> newLeases = leaseManager.listLeases();
Set<String> expectedLeaseShardIds = new HashSet<String>();
expectedLeaseShardIds.add("shardId-1000"); // dummy lease will still be in the table.
expectedLeaseShardIds.add("shardId-4");
expectedLeaseShardIds.add("shardId-5");
expectedLeaseShardIds.add("shardId-8");
@ -691,31 +692,6 @@ public class ShardSyncerTest {
dataFile.delete();
}
/**
 * Verifies that bootstrapShardLeases() removes a lease whose shard is not part of the stream.
 * A lease for a made-up shard is stored first, the bootstrap scenario is run at LATEST, and the
 * made-up lease is then expected to be gone from the lease table.
 *
 * @throws ProvisionedThroughputException
 * @throws InvalidStateException
 * @throws DependencyException
 * @throws IOException
 * @throws KinesisClientLibIOException
 */
@Test
public final void testBootstrapShardLeasesCleanupGarbage()
        throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
        KinesisClientLibIOException {
    final String orphanShardId = "shardId-garbage-001";

    // Seed the lease table with a lease for the orphan shard.
    final KinesisClientLease orphanLease = shardSyncer.newKCLLease(
            ShardObjectHelper.newShard(
                    orphanShardId, null, null, ShardObjectHelper.newSequenceNumberRange("101", null)));
    orphanLease.setCheckpoint(new ExtendedSequenceNumber("999"));
    leaseManager.createLeaseIfNotExists(orphanLease);

    // Sanity check: the orphan lease is retrievable before bootstrapping.
    final String storedLeaseKey = leaseManager.getLease(orphanShardId).getLeaseKey();
    Assert.assertEquals(orphanShardId, storedLeaseKey);

    testBootstrapShardLeasesAtStartingPosition(INITIAL_POSITION_LATEST);

    // After bootstrapping, the orphan lease must have been removed.
    Assert.assertNull(leaseManager.getLease(orphanShardId));
}
private void testBootstrapShardLeasesAtStartingPosition(InitialPositionInStreamExtended initialPosition)
throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
KinesisClientLibIOException {
@ -730,7 +706,7 @@ public class ShardSyncerTest {
dataFile.deleteOnExit();
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
shardSyncer.bootstrapShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards,
shardSyncer.bootstrapShardLeases(kinesisProxy, leaseManager, initialPosition,
false);
List<KinesisClientLease> newLeases = leaseManager.listLeases();
Assert.assertEquals(2, newLeases.size());
@ -2267,81 +2243,6 @@ public class ShardSyncerTest {
Assert.assertFalse(leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds));
}
/**
* Test cleanup of lease for a shard that has been fully processed (and processing of child shards has begun).
* <p>
* The scenario is built up step by step against the same lease manager: after each call to
* {@code cleanupLeaseForClosedShard} the test asserts whether the lease for the closed shard is still present
* (not eligible for cleanup yet) or has been removed (eligible). The steps depend on the state left behind by
* the previous ones, so their order matters.
*
* @throws DependencyException
* @throws InvalidStateException
* @throws ProvisionedThroughputException
*/
@Test
public final void testCleanupLeaseForClosedShard()
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
// Lease for the closed (parent) shard, stored with a regular sequence-number checkpoint.
String closedShardId = "shardId-2";
KinesisClientLease leaseForClosedShard = newLease(closedShardId);
leaseForClosedShard.setCheckpoint(new ExtendedSequenceNumber("1234"));
leaseManager.createLeaseIfNotExists(leaseForClosedShard);
Set<String> childShardIds = new HashSet<>();
List<KinesisClientLease> trackedLeases = new ArrayList<>();
Set<String> parentShardIds = new HashSet<>();
parentShardIds.add(closedShardId);
// Two child leases pointing at the closed shard as their parent; both start at TRIM_HORIZON.
// They are only created in the lease manager / added to trackedLeases further below.
String childShardId1 = "shardId-5";
KinesisClientLease childLease1 = newLease(childShardId1);
childLease1.setParentShardIds(parentShardIds);
childLease1.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
String childShardId2 = "shardId-7";
KinesisClientLease childLease2 = newLease(childShardId2);
childLease2.setParentShardIds(parentShardIds);
childLease2.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
Map<String, KinesisClientLease> trackedLeaseMap = shardSyncer.constructShardIdToKCLLeaseMap(trackedLeases);
// empty list of tracked leases: the closed shard's lease must be left in place
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
Assert.assertNotNull(leaseManager.getLease(closedShardId));
// closed shard has not been fully processed yet (checkpoint != SHARD_END): lease must be kept
trackedLeases.add(leaseForClosedShard);
trackedLeaseMap = shardSyncer.constructShardIdToKCLLeaseMap(trackedLeases);
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
Assert.assertNotNull(leaseManager.getLease(closedShardId));
// closed shard has been fully processed (checkpoint == SHARD_END) and childShardIds is still empty:
// lease is expected to be deleted
leaseForClosedShard.setCheckpoint(ExtendedSequenceNumber.SHARD_END);
leaseManager.updateLease(leaseForClosedShard);
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
Assert.assertNull(leaseManager.getLease(closedShardId));
// lease for only one child exists (both child ids are expected, but only childLease1 is stored/tracked).
// The closed shard's lease is re-created first because the previous step deleted it.
childShardIds.add(childShardId1);
childShardIds.add(childShardId2);
leaseManager.createLeaseIfNotExists(leaseForClosedShard);
leaseManager.createLeaseIfNotExists(childLease1);
trackedLeases.add(childLease1);
trackedLeaseMap = shardSyncer.constructShardIdToKCLLeaseMap(trackedLeases);
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
Assert.assertNotNull(leaseManager.getLease(closedShardId));
// leases for both children exist, but they are both at TRIM_HORIZON: lease must be kept
leaseManager.createLeaseIfNotExists(childLease2);
trackedLeases.add(childLease2);
trackedLeaseMap = shardSyncer.constructShardIdToKCLLeaseMap(trackedLeases);
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
Assert.assertNotNull(leaseManager.getLease(closedShardId));
// leases for both children exist, one is still at TRIM_HORIZON: lease must be kept
childLease1.setCheckpoint(new ExtendedSequenceNumber("34890"));
leaseManager.updateLease(childLease1);
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
Assert.assertNotNull(leaseManager.getLease(closedShardId));
// leases for both children exist, NONE of them are at TRIM_HORIZON: lease is expected to be deleted
childLease2.setCheckpoint(new ExtendedSequenceNumber("43789"));
leaseManager.updateLease(childLease2);
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
Assert.assertNull(leaseManager.getLease(closedShardId));
}
/**
* Test we can handle trimmed Kinesis shards (absent from the shard list), and valid closed shards.
*

View file

@ -34,6 +34,7 @@ import java.util.UUID;
import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.services.kinesis.model.HashKeyRange;
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
@ -86,6 +87,8 @@ public class ShutdownTaskTest {
private ILeaseManager<KinesisClientLease> leaseManager;
@Mock
private KinesisClientLibLeaseCoordinator leaseCoordinator;
@Mock
private LeaseCleanupManager leaseCleanupManager;
/**
* @throws java.lang.Exception
@ -143,7 +146,8 @@ public class ShutdownTaskTest {
getRecordsCache,
shardSyncer,
shardSyncStrategy,
constructChildShards());
constructChildShards(),
leaseCleanupManager);
TaskResult result = task.call();
Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof IllegalArgumentException);
@ -178,7 +182,8 @@ public class ShutdownTaskTest {
getRecordsCache,
shardSyncer,
shardSyncStrategy,
constructChildShards());
constructChildShards(),
leaseCleanupManager);
TaskResult result = task.call();
verify(getRecordsCache).shutdown();
verify(leaseCoordinator).dropLease(any(KinesisClientLease.class));
@ -205,7 +210,8 @@ public class ShutdownTaskTest {
getRecordsCache,
shardSyncer,
shardSyncStrategy,
constructChildShards());
constructChildShards(),
leaseCleanupManager);
TaskResult result = task.call();
verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class));
verify(leaseCoordinator).updateLease(any(KinesisClientLease.class), any(UUID.class));
@ -238,7 +244,8 @@ public class ShutdownTaskTest {
getRecordsCache,
shardSyncer,
shardSyncStrategy,
Collections.emptyList());
Collections.emptyList(),
leaseCleanupManager);
TaskResult result = task.call();
verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class));
verify(leaseCoordinator, never()).updateLease(any(KinesisClientLease.class), any(UUID.class));
@ -267,7 +274,8 @@ public class ShutdownTaskTest {
getRecordsCache,
shardSyncer,
shardSyncStrategy,
Collections.emptyList());
Collections.emptyList(),
leaseCleanupManager);
TaskResult result = task.call();
verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class));
verify(leaseCoordinator, never()).updateLease(any(KinesisClientLease.class), any(UUID.class));
@ -284,7 +292,7 @@ public class ShutdownTaskTest {
ShutdownTask task = new ShutdownTask(null, null, null, null,
null, null, false,
false, leaseCoordinator, 0,
getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList());
getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList(), leaseCleanupManager);
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
}

View file

@ -0,0 +1,289 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.leases.impl;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardObjectHelper;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import com.amazonaws.services.kinesis.leases.LeasePendingDeletion;
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.services.kinesis.model.ChildShard;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
 * Unit tests for {@link LeaseCleanupManager}. Each test enqueues a single {@link LeasePendingDeletion}, triggers
 * one cleanup pass and then verifies how many times {@code deleteLease} was invoked on the mocked lease manager.
 */
@RunWith(MockitoJUnitRunner.class)
public class LeaseCleanupManagerTest {

    private static final IMetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory();

    private ShardInfo shardInfo;
    private String concurrencyToken = "1234";
    private int maxRecords = 1;
    private String getShardId = "getShardId";
    private String splitParent = "splitParent";
    private String mergeParent1 = "mergeParent-1";
    private String mergeParent2 = "mergeParent-2";
    private long leaseCleanupIntervalMillis = Duration.ofSeconds(1).toMillis();
    private long completedLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis();
    private long garbageLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis();
    private boolean cleanupLeasesOfCompletedShards = true;
    private LeaseCleanupManager leaseCleanupManager;

    @Mock
    private LeaseManager leaseManager;
    @Mock
    private LeaseCoordinator leaseCoordinator;
    @Mock
    private IKinesisProxy kinesis;
    @Mock
    private ScheduledExecutorService deletionThreadPool;

    @Before
    public void setUp() {
        shardInfo = new ShardInfo(getShardId, concurrencyToken, Collections.emptySet(),
                ExtendedSequenceNumber.LATEST);
        leaseCleanupManager = newCleanupManager();
    }

    /**
     * Split case: the leases of both child shards are stubbed as present, so the lease of the completed parent
     * shard is expected to be deleted exactly once.
     */
    @Test
    public final void testParentShardLeaseDeletedSplitCase() throws Exception {
        shardInfo = shardZeroWithoutParents();

        verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(),
                ExtendedSequenceNumber.LATEST, 1);
    }

    /**
     * Merge case: the lease of the single merged child shard is stubbed as present, so the lease of the completed
     * parent shard is expected to be deleted exactly once.
     */
    @Test
    public final void testParentShardLeaseDeletedMergeCase() throws Exception {
        shardInfo = shardZeroWithoutParents();

        verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(),
                ExtendedSequenceNumber.LATEST, 1);
    }

    /**
     * With cleanupLeasesOfCompletedShards switched off, no lease may be deleted in the completed shard case.
     */
    @Test
    public final void testNoLeasesDeletedWhenNotEnabled() throws Exception {
        shardInfo = shardZeroWithoutParents();
        cleanupLeasesOfCompletedShards = false;
        leaseCleanupManager = newCleanupManager();

        verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(),
                ExtendedSequenceNumber.LATEST, 0);
    }

    /**
     * When the child shard leases are not stubbed in the lease manager, the parent shard lease must not be
     * deleted in the completed shard case.
     */
    @Test
    public final void testNoCleanupWhenSomeChildShardLeasesAreNotPresent() throws Exception {
        final List<ChildShard> splitChildren = childShardsForSplit();
        shardInfo = shardZeroWithoutParents();

        verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, splitChildren, ExtendedSequenceNumber.LATEST,
                false, 0);
    }

    /**
     * A child lease still checkpointed at TRIM_HORIZON means processing of the child has not begun, so the
     * parent lease must be kept.
     */
    @Test
    public final void testParentShardLeaseNotDeletedWhenChildIsAtTrim() throws Exception {
        testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber.TRIM_HORIZON);
    }

    /**
     * A child lease still checkpointed at AT_TIMESTAMP means processing of the child has not begun, so the
     * parent lease must be kept.
     */
    @Test
    public final void testParentShardLeaseNotDeletedWhenChildIsAtTimestamp() throws Exception {
        testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber.AT_TIMESTAMP);
    }

    /**
     * Shared body of the "child has not started" tests: runs the merge scenario with every generated lease
     * checkpointed at the given position and expects zero deletions.
     */
    private void testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber extendedSequenceNumber)
            throws Exception {
        shardInfo = shardZeroWithoutParents();

        verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), extendedSequenceNumber, 0);
    }

    /**
     * A lease whose own parent lease is still present must not be deleted.
     */
    @Test
    public final void testLeaseNotDeletedWhenParentsStillPresent() throws Exception {
        shardInfo = new ShardInfo("ShardId-0", concurrencyToken, Collections.singleton("parent"),
                ExtendedSequenceNumber.LATEST);

        verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(),
                ExtendedSequenceNumber.LATEST, 0);
    }

    /**
     * ResourceNotFound case: when Kinesis reports that the shard no longer exists, its lease is deleted.
     */
    @Test
    public final void testLeaseDeletedWhenShardDoesNotExist() throws Exception {
        shardInfo = shardZeroWithoutParents();
        final KinesisClientLease leaseForMissingShard = LeaseHelper.createLease(shardInfo.getShardId(),
                "leaseOwner", Collections.singleton("parentShardId"));

        testLeaseDeletedWhenShardDoesNotExist(leaseForMissingShard);
    }

    /**
     * ResourceNotFound case with completed-lease cleanup disabled: the lease of the missing shard is still
     * deleted.
     *
     * @throws Exception
     */
    @Test
    public final void testLeaseDeletedWhenShardDoesNotExistAndCleanupCompletedLeaseDisabled() throws Exception {
        shardInfo = shardZeroWithoutParents();
        final KinesisClientLease leaseForMissingShard = LeaseHelper.createLease(shardInfo.getShardId(),
                "leaseOwner", Collections.singleton("parentShardId"));
        cleanupLeasesOfCompletedShards = false;
        leaseCleanupManager = newCleanupManager();

        testLeaseDeletedWhenShardDoesNotExist(leaseForMissingShard);
    }

    /**
     * Stubs every Kinesis call to throw {@link ResourceNotFoundException}, runs one cleanup pass for the given
     * lease and verifies that exactly that lease was deleted once.
     */
    public void testLeaseDeletedWhenShardDoesNotExist(KinesisClientLease heldLease) throws Exception {
        when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(heldLease);
        when(kinesis.get(anyString(), anyInt())).thenThrow(ResourceNotFoundException.class);
        when(kinesis.getIterator(anyString(), anyString())).thenThrow(ResourceNotFoundException.class);
        stubLeaseLookup(heldLease);

        runSingleCleanupPass(heldLease, shardInfo);

        verify(leaseManager, times(1)).deleteLease(heldLease);
    }

    /**
     * Convenience overload that assumes the child shard leases are present in the lease manager.
     */
    private void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List<ChildShard> childShards,
                                                               ExtendedSequenceNumber extendedSequenceNumber,
                                                               int expectedDeletedLeases) throws Exception {
        verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShards, extendedSequenceNumber, true,
                expectedDeletedLeases);
    }

    /**
     * Drives the completed shard scenario: builds the lease under test plus its parent and child leases, stubs
     * the lease manager and Kinesis proxy accordingly, runs one cleanup pass and checks the deletion count.
     *
     * @param shardInfo shard whose lease is enqueued for deletion
     * @param childShards child shards returned by the stubbed GetRecords call
     * @param extendedSequenceNumber checkpoint assigned to the generated parent and child leases
     * @param childShardLeasesPresent whether lookups of the child leases are stubbed to return them
     * @param expectedDeletedLeases expected number of deleteLease invocations
     */
    private void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List<ChildShard> childShards,
                                                               ExtendedSequenceNumber extendedSequenceNumber,
                                                               boolean childShardLeasesPresent,
                                                               int expectedDeletedLeases) throws Exception {
        final String shardId = shardInfo.getShardId();

        final KinesisClientLease leaseUnderTest = LeaseHelper.createLease(
                shardId,
                "leaseOwner",
                shardInfo.getParentShardIds(),
                childShards.stream().map(ChildShard::getShardId).collect(Collectors.toSet()));

        final List<KinesisClientLease> leasesOfChildren = childShards.stream()
                .map(child -> LeaseHelper.createLease(child.getShardId(), "leaseOwner",
                        Collections.singleton(shardId), Collections.emptyList(), extendedSequenceNumber))
                .collect(Collectors.toList());

        final List<KinesisClientLease> leasesOfParents = leaseUnderTest.getParentShardIds().stream()
                .map(parentId -> LeaseHelper.createLease(parentId, "leaseOwner", Collections.emptyList(),
                        Collections.singleton(shardId), extendedSequenceNumber))
                .collect(Collectors.toList());

        // Lease manager lookups: the lease itself, its parents and (optionally) its children.
        stubLeaseLookup(leaseUnderTest);
        for (Lease parentLease : leasesOfParents) {
            stubLeaseLookup(parentLease);
        }
        if (childShardLeasesPresent) {
            for (Lease childLease : leasesOfChildren) {
                stubLeaseLookup(childLease);
            }
        }

        // Kinesis: any iterator request succeeds and GetRecords reports no records plus the child shards.
        when(kinesis.getIterator(any(String.class), any(String.class))).thenReturn("123");
        final GetRecordsResult emptyResultWithChildren = new GetRecordsResult();
        emptyResultWithChildren.setRecords(Collections.emptyList());
        emptyResultWithChildren.setChildShards(childShards);
        when(kinesis.get(any(String.class), any(Integer.class))).thenReturn(emptyResultWithChildren);

        runSingleCleanupPass(leaseUnderTest, shardInfo);

        verify(leaseManager, times(expectedDeletedLeases)).deleteLease(any(Lease.class));
    }

    /**
     * Child shards as produced by a split: two children sharing one parent and covering adjacent hash ranges.
     */
    private List<ChildShard> childShardsForSplit() {
        final List<String> parents = Arrays.asList(splitParent);
        return Arrays.asList(
                newChildShard("leftChild", parents, "0", "49"),
                newChildShard("rightChild", parents, "50", "99"));
    }

    /**
     * Child shards as produced by a merge: a single child with two parents covering the whole hash range.
     */
    private List<ChildShard> childShardsForMerge() {
        final List<String> parents = Arrays.asList(mergeParent1, mergeParent2);
        return Collections.singletonList(newChildShard("onlyChild", parents, "0", "99"));
    }

    /** Builds a child shard with the given id, parents and hash key range. */
    private static ChildShard newChildShard(String shardId, List<String> parentShards,
                                            String startingHashKey, String endingHashKey) {
        final ChildShard childShard = new ChildShard();
        childShard.setShardId(shardId);
        childShard.setParentShards(parentShards);
        childShard.setHashKeyRange(ShardObjectHelper.newHashKeyRange(startingHashKey, endingHashKey));
        return childShard;
    }

    /** Creates a manager from the current field values, including the cleanup-enabled flag. */
    private LeaseCleanupManager newCleanupManager() {
        return new LeaseCleanupManager(kinesis, leaseManager, deletionThreadPool, NULL_METRICS_FACTORY,
                cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis,
                garbageLeaseCleanupIntervalMillis, maxRecords);
    }

    /** Shard info for "ShardId-0" with no parent shards and a LATEST checkpoint. */
    private ShardInfo shardZeroWithoutParents() {
        return new ShardInfo("ShardId-0", concurrencyToken, Collections.emptySet(),
                ExtendedSequenceNumber.LATEST);
    }

    /** Makes the mocked lease manager return the given lease when it is looked up by its own key. */
    private void stubLeaseLookup(Lease lease) throws Exception {
        when(leaseManager.getLease(lease.getLeaseKey())).thenReturn(lease);
    }

    /** Enqueues the lease for deletion and runs one cleanup pass. */
    private void runSingleCleanupPass(KinesisClientLease lease, ShardInfo owningShard) throws Exception {
        leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(lease, owningShard));
        leaseCleanupManager.cleanupLeases();
    }
}

View file

@ -0,0 +1,44 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.amazonaws.services.kinesis.leases.impl;
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
import java.util.Collection;
import java.util.Collections;
/**
 * Static factory methods for building {@link KinesisClientLease} instances in tests.
 */
public final class LeaseHelper {

    /** Utility class: only static factories, never instantiated. */
    private LeaseHelper() {
    }

    /**
     * Creates a lease with no child shard ids and a checkpoint of {@link ExtendedSequenceNumber#LATEST}.
     *
     * @param leaseKey key of the lease
     * @param leaseOwner owner to set on the lease
     * @param parentShardIds parent shard ids to set on the lease
     * @return the populated lease
     */
    public static KinesisClientLease createLease(String leaseKey, String leaseOwner,
                                                 Collection<String> parentShardIds) {
        return createLease(leaseKey, leaseOwner, parentShardIds, Collections.emptySet(),
                ExtendedSequenceNumber.LATEST);
    }

    /**
     * Creates a lease with the given child shard ids and a checkpoint of {@link ExtendedSequenceNumber#LATEST}.
     *
     * @param leaseKey key of the lease
     * @param leaseOwner owner to set on the lease
     * @param parentShardIds parent shard ids to set on the lease
     * @param childShardIds child shard ids to set on the lease
     * @return the populated lease
     */
    public static KinesisClientLease createLease(String leaseKey, String leaseOwner,
                                                 Collection<String> parentShardIds,
                                                 Collection<String> childShardIds) {
        return createLease(leaseKey, leaseOwner, parentShardIds, childShardIds, ExtendedSequenceNumber.LATEST);
    }

    /**
     * Creates a lease with every supported attribute specified by the caller.
     *
     * @param leaseKey key of the lease
     * @param leaseOwner owner to set on the lease
     * @param parentShardIds parent shard ids to set on the lease
     * @param childShardIds child shard ids to set on the lease
     * @param extendedSequenceNumber checkpoint to set on the lease
     * @return the populated lease
     */
    public static KinesisClientLease createLease(String leaseKey, String leaseOwner,
                                                 Collection<String> parentShardIds,
                                                 Collection<String> childShardIds,
                                                 ExtendedSequenceNumber extendedSequenceNumber) {
        KinesisClientLease lease = new KinesisClientLease();
        lease.setLeaseKey(leaseKey);
        lease.setLeaseOwner(leaseOwner);
        lease.setParentShardIds(parentShardIds);
        lease.setChildShardIds(childShardIds);
        lease.setCheckpoint(extendedSequenceNumber);
        return lease;
    }
}