merging and resolving conflicts
This commit is contained in:
commit
5f7d4b3bc6
20 changed files with 1128 additions and 378 deletions
|
|
@ -199,7 +199,7 @@ class ConsumerStates {
|
|||
|
||||
@Override
|
||||
public ConsumerState shutdownTransition(ShutdownReason shutdownReason) {
|
||||
return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState();
|
||||
return ShardConsumerState.SHUTTING_DOWN.getConsumerState();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -531,7 +531,8 @@ class ConsumerStates {
|
|||
consumer.getLeaseCoordinator(),
|
||||
consumer.getTaskBackoffTimeMillis(),
|
||||
consumer.getGetRecordsCache(), consumer.getShardSyncer(),
|
||||
consumer.getShardSyncStrategy(), consumer.getChildShards());
|
||||
consumer.getShardSyncStrategy(), consumer.getChildShards(),
|
||||
consumer.getLeaseCleanupManager());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Date;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
|
|
@ -89,6 +90,23 @@ public class KinesisClientLibConfiguration {
|
|||
*/
|
||||
public static final boolean DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION = true;
|
||||
|
||||
/**
|
||||
* Interval to run lease cleanup thread in {@link LeaseCleanupManager}.
|
||||
*/
|
||||
private static final long DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofHours(1).toMillis();
|
||||
|
||||
/**
|
||||
* Threshold in millis at which to check if there are any completed leases (leases for shards which have been
|
||||
* closed as a result of a resharding operation) that need to be cleaned up.
|
||||
*/
|
||||
private static final long DEFAULT_COMPLETED_LEASE_CLEANUP_THRESHOLD_MILLIS = Duration.ofMinutes(5).toMillis();
|
||||
|
||||
/**
|
||||
* Threshold in millis at which to check if there are any garbage leases (leases for shards which no longer exist
|
||||
* in the stream) that need to be cleaned up.
|
||||
*/
|
||||
private static final long DEFAULT_GARBAGE_LEASE_CLEANUP_THRESHOLD_MILLIS = Duration.ofMinutes(30).toMillis();
|
||||
|
||||
/**
|
||||
* Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
|
||||
*/
|
||||
|
|
@ -246,6 +264,9 @@ public class KinesisClientLibConfiguration {
|
|||
private ShardPrioritization shardPrioritization;
|
||||
private long shutdownGraceMillis;
|
||||
private ShardSyncStrategyType shardSyncStrategyType;
|
||||
private long leaseCleanupIntervalMillis;
|
||||
private long completedLeaseCleanupThresholdMillis;
|
||||
private long garbageLeaseCleanupThresholdMillis;
|
||||
|
||||
@Getter
|
||||
private Optional<Integer> timeoutInSeconds = Optional.empty();
|
||||
|
|
@ -284,6 +305,7 @@ public class KinesisClientLibConfiguration {
|
|||
* @param credentialsProvider Provides credentials used to sign AWS requests
|
||||
* @param workerId Used to distinguish different workers/processes of a Kinesis application
|
||||
*/
|
||||
@Deprecated
|
||||
public KinesisClientLibConfiguration(String applicationName,
|
||||
String streamName,
|
||||
AWSCredentialsProvider credentialsProvider,
|
||||
|
|
@ -303,6 +325,7 @@ public class KinesisClientLibConfiguration {
|
|||
* @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch
|
||||
* @param workerId Used to distinguish different workers/processes of a Kinesis application
|
||||
*/
|
||||
@Deprecated
|
||||
public KinesisClientLibConfiguration(String applicationName,
|
||||
String streamName,
|
||||
AWSCredentialsProvider kinesisCredentialsProvider,
|
||||
|
|
@ -373,6 +396,7 @@ public class KinesisClientLibConfiguration {
|
|||
*/
|
||||
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
|
||||
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
|
||||
@Deprecated
|
||||
public KinesisClientLibConfiguration(String applicationName,
|
||||
String streamName,
|
||||
String kinesisEndpoint,
|
||||
|
|
@ -444,6 +468,7 @@ public class KinesisClientLibConfiguration {
|
|||
*/
|
||||
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
|
||||
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
|
||||
@Deprecated
|
||||
public KinesisClientLibConfiguration(String applicationName,
|
||||
String streamName,
|
||||
String kinesisEndpoint,
|
||||
|
|
@ -470,54 +495,14 @@ public class KinesisClientLibConfiguration {
|
|||
String regionName,
|
||||
long shutdownGraceMillis,
|
||||
BillingMode billingMode) {
|
||||
// Check following values are greater than zero
|
||||
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis);
|
||||
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
|
||||
checkIsValuePositive("ParentShardPollIntervalMillis", parentShardPollIntervalMillis);
|
||||
checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis);
|
||||
checkIsValuePositive("MaxRecords", (long) maxRecords);
|
||||
checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis);
|
||||
checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis);
|
||||
checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize);
|
||||
checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis);
|
||||
this.applicationName = applicationName;
|
||||
this.tableName = applicationName;
|
||||
this.streamName = streamName;
|
||||
this.kinesisEndpoint = kinesisEndpoint;
|
||||
this.dynamoDBEndpoint = dynamoDBEndpoint;
|
||||
this.initialPositionInStream = initialPositionInStream;
|
||||
this.kinesisCredentialsProvider = kinesisCredentialsProvider;
|
||||
this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider;
|
||||
this.cloudWatchCredentialsProvider = cloudWatchCredentialsProvider;
|
||||
this.failoverTimeMillis = failoverTimeMillis;
|
||||
this.maxRecords = maxRecords;
|
||||
this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis;
|
||||
this.callProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList;
|
||||
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
|
||||
this.shardSyncIntervalMillis = shardSyncIntervalMillis;
|
||||
this.cleanupLeasesUponShardCompletion = cleanupTerminatedShardsBeforeExpiry;
|
||||
this.workerIdentifier = workerId;
|
||||
this.kinesisClientConfig = checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig);
|
||||
this.dynamoDBClientConfig = checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig);
|
||||
this.cloudWatchClientConfig = checkAndAppendKinesisClientLibUserAgent(cloudWatchClientConfig);
|
||||
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
|
||||
this.metricsBufferTimeMillis = metricsBufferTimeMillis;
|
||||
this.metricsMaxQueueSize = metricsMaxQueueSize;
|
||||
this.metricsLevel = DEFAULT_METRICS_LEVEL;
|
||||
this.metricsEnabledDimensions = DEFAULT_METRICS_ENABLED_DIMENSIONS;
|
||||
this.validateSequenceNumberBeforeCheckpointing = validateSequenceNumberBeforeCheckpointing;
|
||||
this.regionName = regionName;
|
||||
this.maxLeasesForWorker = DEFAULT_MAX_LEASES_FOR_WORKER;
|
||||
this.maxLeasesToStealAtOneTime = DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME;
|
||||
this.initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY;
|
||||
this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY;
|
||||
this.initialPositionInStreamExtended =
|
||||
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
|
||||
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
|
||||
this.shardSyncStrategyType = DEFAULT_SHARD_SYNC_STRATEGY_TYPE;
|
||||
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
|
||||
this.recordsFetcherFactory = new SimpleRecordsFetcherFactory();
|
||||
this.billingMode = billingMode;
|
||||
|
||||
this(applicationName, streamName, kinesisEndpoint, dynamoDBEndpoint, initialPositionInStream, kinesisCredentialsProvider,
|
||||
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, maxRecords, idleTimeBetweenReadsInMillis,
|
||||
callProcessRecordsEvenForEmptyRecordList, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry,
|
||||
kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, taskBackoffTimeMillis, metricsBufferTimeMillis,
|
||||
metricsMaxQueueSize, validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis, billingMode,
|
||||
new SimpleRecordsFetcherFactory(), DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS, DEFAULT_COMPLETED_LEASE_CLEANUP_THRESHOLD_MILLIS,
|
||||
DEFAULT_GARBAGE_LEASE_CLEANUP_THRESHOLD_MILLIS);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -556,6 +541,7 @@ public class KinesisClientLibConfiguration {
|
|||
*/
|
||||
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
|
||||
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
|
||||
@Deprecated
|
||||
public KinesisClientLibConfiguration(String applicationName,
|
||||
String streamName,
|
||||
String kinesisEndpoint,
|
||||
|
|
@ -581,6 +567,91 @@ public class KinesisClientLibConfiguration {
|
|||
boolean validateSequenceNumberBeforeCheckpointing,
|
||||
String regionName,
|
||||
RecordsFetcherFactory recordsFetcherFactory) {
|
||||
|
||||
|
||||
this(applicationName, streamName, kinesisEndpoint, dynamoDBEndpoint, initialPositionInStream, kinesisCredentialsProvider,
|
||||
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, maxRecords, idleTimeBetweenReadsInMillis,
|
||||
callProcessRecordsEvenForEmptyRecordList, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry,
|
||||
kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, taskBackoffTimeMillis, metricsBufferTimeMillis,
|
||||
metricsMaxQueueSize, validateSequenceNumberBeforeCheckpointing, regionName, 0, DEFAULT_DDB_BILLING_MODE,
|
||||
recordsFetcherFactory, DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS, DEFAULT_COMPLETED_LEASE_CLEANUP_THRESHOLD_MILLIS,
|
||||
DEFAULT_GARBAGE_LEASE_CLEANUP_THRESHOLD_MILLIS);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param applicationName Name of the Kinesis application
|
||||
* By default the application name is included in the user agent string used to make AWS requests. This
|
||||
* can assist with troubleshooting (e.g. distinguish requests made by separate applications).
|
||||
* @param streamName Name of the Kinesis stream
|
||||
* @param kinesisEndpoint Kinesis endpoint
|
||||
* @param dynamoDBEndpoint DynamoDB endpoint
|
||||
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching
|
||||
* records from that location in the stream when an application starts up for the first time and there
|
||||
* are no checkpoints. If there are checkpoints, then we start from the checkpoint position.
|
||||
* @param kinesisCredentialsProvider Provides credentials used to access Kinesis
|
||||
* @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB
|
||||
* @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch
|
||||
* @param failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others)
|
||||
* @param workerId Used to distinguish different workers/processes of a Kinesis application
|
||||
* @param maxRecords Max records to read per Kinesis getRecords() call
|
||||
* @param idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis
|
||||
* @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if
|
||||
* GetRecords returned an empty record list.
|
||||
* @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done
|
||||
* @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
|
||||
* @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration
|
||||
* in Kinesis)
|
||||
* @param kinesisClientConfig Client Configuration used by Kinesis client
|
||||
* @param dynamoDBClientConfig Client Configuration used by DynamoDB client
|
||||
* @param cloudWatchClientConfig Client Configuration used by CloudWatch client
|
||||
* @param taskBackoffTimeMillis Backoff period when tasks encounter an exception
|
||||
* @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch
|
||||
* @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch
|
||||
* @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers
|
||||
* with a call to Amazon Kinesis before checkpointing for calls to
|
||||
* {@link RecordProcessorCheckpointer#checkpoint(String)}
|
||||
* @param regionName The region name for the service
|
||||
* @param shutdownGraceMillis Time before gracefully shutdown forcefully terminates
|
||||
* @param billingMode The DDB Billing mode to set for lease table creation.
|
||||
* @param recordsFetcherFactory Factory to create the records fetcher to retrieve data from Kinesis for a given shard.
|
||||
* @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in
|
||||
* {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager}
|
||||
* @param completedLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any completed leases
|
||||
* (leases for shards which have been closed as a result of a resharding operation) that need to be cleaned up.
|
||||
* @param garbageLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any garbage leases
|
||||
* (leases for shards which no longer exist in the stream) that need to be cleaned up.
|
||||
*/
|
||||
public KinesisClientLibConfiguration(String applicationName,
|
||||
String streamName,
|
||||
String kinesisEndpoint,
|
||||
String dynamoDBEndpoint,
|
||||
InitialPositionInStream initialPositionInStream,
|
||||
AWSCredentialsProvider kinesisCredentialsProvider,
|
||||
AWSCredentialsProvider dynamoDBCredentialsProvider,
|
||||
AWSCredentialsProvider cloudWatchCredentialsProvider,
|
||||
long failoverTimeMillis,
|
||||
String workerId,
|
||||
int maxRecords,
|
||||
long idleTimeBetweenReadsInMillis,
|
||||
boolean callProcessRecordsEvenForEmptyRecordList,
|
||||
long parentShardPollIntervalMillis,
|
||||
long shardSyncIntervalMillis,
|
||||
boolean cleanupTerminatedShardsBeforeExpiry,
|
||||
ClientConfiguration kinesisClientConfig,
|
||||
ClientConfiguration dynamoDBClientConfig,
|
||||
ClientConfiguration cloudWatchClientConfig,
|
||||
long taskBackoffTimeMillis,
|
||||
long metricsBufferTimeMillis,
|
||||
int metricsMaxQueueSize,
|
||||
boolean validateSequenceNumberBeforeCheckpointing,
|
||||
String regionName,
|
||||
long shutdownGraceMillis,
|
||||
BillingMode billingMode,
|
||||
RecordsFetcherFactory recordsFetcherFactory,
|
||||
long leaseCleanupIntervalMillis,
|
||||
long completedLeaseCleanupThresholdMillis,
|
||||
long garbageLeaseCleanupThresholdMillis) {
|
||||
|
||||
// Check following values are greater than zero
|
||||
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis);
|
||||
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
|
||||
|
|
@ -627,7 +698,12 @@ public class KinesisClientLibConfiguration {
|
|||
this.shardSyncStrategyType = DEFAULT_SHARD_SYNC_STRATEGY_TYPE;
|
||||
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
|
||||
this.recordsFetcherFactory = recordsFetcherFactory;
|
||||
this.leaseCleanupIntervalMillis = leaseCleanupIntervalMillis;
|
||||
this.completedLeaseCleanupThresholdMillis = completedLeaseCleanupThresholdMillis;
|
||||
this.garbageLeaseCleanupThresholdMillis = garbageLeaseCleanupThresholdMillis;
|
||||
this.shutdownGraceMillis = shutdownGraceMillis;
|
||||
this.billingMode = billingMode;
|
||||
|
||||
}
|
||||
|
||||
// Check if value is positive, otherwise throw an exception
|
||||
|
|
@ -836,6 +912,29 @@ public class KinesisClientLibConfiguration {
|
|||
return cleanupLeasesUponShardCompletion;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Interval in millis at which to run lease cleanup thread in {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager}
|
||||
*/
|
||||
public long leaseCleanupIntervalMillis() {
|
||||
return leaseCleanupIntervalMillis;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Interval in millis at which to check if there are any completed leases (leases for shards which have been
|
||||
* closed as a result of a resharding operation) that need to be cleaned up.
|
||||
*/
|
||||
public long completedLeaseCleanupThresholdMillis() {
|
||||
return completedLeaseCleanupThresholdMillis;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Interval in millis at which to check if there are any garbage leases (leases for shards which no longer
|
||||
* exist in the stream) that need to be cleaned up.
|
||||
*/
|
||||
public long garbageLeaseCleanupThresholdMillis() {
|
||||
return garbageLeaseCleanupThresholdMillis;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if we should ignore child shards which have open parents
|
||||
*/
|
||||
|
|
@ -1476,4 +1575,39 @@ public class KinesisClientLibConfiguration {
|
|||
this.maxInitializationAttempts = maxInitializationAttempts;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in
|
||||
* {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager}
|
||||
* @return
|
||||
*/
|
||||
public KinesisClientLibConfiguration withLeaseCleanupIntervalMillis(long leaseCleanupIntervalMillis) {
|
||||
checkIsValuePositive("leaseCleanupIntervalMillis", leaseCleanupIntervalMillis);
|
||||
this.leaseCleanupIntervalMillis = leaseCleanupIntervalMillis;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Threshold in millis at which to check if there are any completed leases (leases for shards which have been
|
||||
* closed as a result of a resharding operation) that need to be cleaned up.
|
||||
* @param completedLeaseCleanupThresholdMillis
|
||||
* @return
|
||||
*/
|
||||
public KinesisClientLibConfiguration withCompletedLeaseCleanupThresholdMillis(long completedLeaseCleanupThresholdMillis) {
|
||||
checkIsValuePositive("completedLeaseCleanupThresholdMillis", completedLeaseCleanupThresholdMillis);
|
||||
this.completedLeaseCleanupThresholdMillis = completedLeaseCleanupThresholdMillis;
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Threshold in millis at which to check if there are any garbage leases (leases for shards which no longer exist
|
||||
* in the stream) that need to be cleaned up.
|
||||
* @param garbageLeaseCleanupThresholdMillis
|
||||
* @return
|
||||
*/
|
||||
public KinesisClientLibConfiguration withGarbageLeaseCleanupThresholdMillis(long garbageLeaseCleanupThresholdMillis) {
|
||||
checkIsValuePositive("garbageLeaseCleanupThresholdMillis", garbageLeaseCleanupThresholdMillis);
|
||||
this.garbageLeaseCleanupThresholdMillis = garbageLeaseCleanupThresholdMillis;
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,10 +17,8 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
|||
import java.util.Collections;
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import com.amazonaws.SdkClientException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
|
||||
import com.amazonaws.services.kinesis.model.ChildShard;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@ import java.util.Set;
|
|||
/**
|
||||
* Represents the class that decides if a lease is eligible for cleanup.
|
||||
*/
|
||||
@Deprecated
|
||||
class KinesisLeaseCleanupValidator implements LeaseCleanupValidator {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(KinesisLeaseCleanupValidator.class);
|
||||
|
|
|
|||
|
|
@ -17,8 +17,6 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
|||
import java.io.Serializable;
|
||||
import java.math.BigInteger;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
|
|
@ -26,8 +24,6 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import com.amazonaws.services.kinesis.leases.impl.Lease;
|
||||
import com.amazonaws.services.kinesis.leases.impl.LeaseManager;
|
||||
import com.amazonaws.services.kinesis.model.ChildShard;
|
||||
import com.amazonaws.services.kinesis.model.ShardFilter;
|
||||
import com.amazonaws.services.kinesis.model.ShardFilterType;
|
||||
|
|
@ -49,8 +45,6 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
|
|||
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
|
||||
import com.amazonaws.services.kinesis.model.Shard;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
|
||||
/**
|
||||
* Helper class to sync leases with shards of the Kinesis stream.
|
||||
* It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding).
|
||||
|
|
@ -67,11 +61,10 @@ class KinesisShardSyncer implements ShardSyncer {
|
|||
}
|
||||
|
||||
synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy, ILeaseManager<KinesisClientLease> leaseManager,
|
||||
InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards,
|
||||
boolean ignoreUnexpectedChildShards)
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
|
||||
KinesisClientLibIOException {
|
||||
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards,
|
||||
InitialPositionInStreamExtended initialPositionInStream,
|
||||
boolean ignoreUnexpectedChildShards)
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
||||
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream,
|
||||
ignoreUnexpectedChildShards);
|
||||
}
|
||||
|
||||
|
|
@ -94,7 +87,7 @@ class KinesisShardSyncer implements ShardSyncer {
|
|||
boolean ignoreUnexpectedChildShards)
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
|
||||
KinesisClientLibIOException {
|
||||
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards);
|
||||
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, ignoreUnexpectedChildShards);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -117,7 +110,7 @@ class KinesisShardSyncer implements ShardSyncer {
|
|||
boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, List<Shard> latestShards)
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
|
||||
KinesisClientLibIOException {
|
||||
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards,
|
||||
syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream,
|
||||
ignoreUnexpectedChildShards, latestShards, leaseManager.isLeaseTableEmpty());
|
||||
}
|
||||
|
||||
|
|
@ -127,7 +120,6 @@ class KinesisShardSyncer implements ShardSyncer {
|
|||
* @param kinesisProxy
|
||||
* @param leaseManager
|
||||
* @param initialPosition
|
||||
* @param cleanupLeasesOfCompletedShards
|
||||
* @param ignoreUnexpectedChildShards
|
||||
* @throws DependencyException
|
||||
* @throws InvalidStateException
|
||||
|
|
@ -135,10 +127,10 @@ class KinesisShardSyncer implements ShardSyncer {
|
|||
* @throws KinesisClientLibIOException
|
||||
*/
|
||||
private synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
|
||||
ILeaseManager<KinesisClientLease> leaseManager, InitialPositionInStreamExtended initialPosition,
|
||||
boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards)
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
|
||||
KinesisClientLibIOException {
|
||||
ILeaseManager<KinesisClientLease> leaseManager,
|
||||
InitialPositionInStreamExtended initialPosition,
|
||||
boolean ignoreUnexpectedChildShards)
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException {
|
||||
|
||||
// In the case where the lease table is empty, we want to synchronize the minimal amount of shards possible
|
||||
// based on the given initial position.
|
||||
|
|
@ -148,7 +140,7 @@ class KinesisShardSyncer implements ShardSyncer {
|
|||
? getShardListAtInitialPosition(kinesisProxy, initialPosition)
|
||||
: getCompleteShardList(kinesisProxy);
|
||||
|
||||
syncShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards,
|
||||
syncShardLeases(kinesisProxy, leaseManager, initialPosition,
|
||||
ignoreUnexpectedChildShards, latestShards, isLeaseTableEmpty);
|
||||
}
|
||||
|
||||
|
|
@ -158,7 +150,6 @@ class KinesisShardSyncer implements ShardSyncer {
|
|||
* @param kinesisProxy
|
||||
* @param leaseManager
|
||||
* @param initialPosition
|
||||
* @param cleanupLeasesOfCompletedShards
|
||||
* @param ignoreUnexpectedChildShards
|
||||
* @param latestShards latest snapshot of shards to reuse
|
||||
* @throws DependencyException
|
||||
|
|
@ -170,7 +161,6 @@ class KinesisShardSyncer implements ShardSyncer {
|
|||
private synchronized void syncShardLeases(IKinesisProxy kinesisProxy,
|
||||
ILeaseManager<KinesisClientLease> leaseManager,
|
||||
InitialPositionInStreamExtended initialPosition,
|
||||
boolean cleanupLeasesOfCompletedShards,
|
||||
boolean ignoreUnexpectedChildShards,
|
||||
List<Shard> latestShards,
|
||||
boolean isLeaseTableEmpty)
|
||||
|
|
@ -218,11 +208,6 @@ class KinesisShardSyncer implements ShardSyncer {
|
|||
trackedLeases.addAll(currentLeases);
|
||||
}
|
||||
trackedLeases.addAll(newLeasesToCreate);
|
||||
cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager);
|
||||
if (cleanupLeasesOfCompletedShards) {
|
||||
cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases,
|
||||
leaseManager);
|
||||
}
|
||||
}
|
||||
// CHECKSTYLE:ON CyclomaticComplexity
|
||||
|
||||
|
|
@ -613,150 +598,6 @@ class KinesisShardSyncer implements ShardSyncer {
|
|||
return parentShardIds;
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete leases corresponding to shards that no longer exist in the stream.
|
||||
* Current scheme: Delete a lease if:
|
||||
* * the corresponding shard is not present in the list of Kinesis shards, AND
|
||||
* * the parentShardIds listed in the lease are also not present in the list of Kinesis shards.
|
||||
* @param shards List of all Kinesis shards (assumed to be a consistent snapshot - when stream is in Active state).
|
||||
* @param trackedLeases List of
|
||||
* @param kinesisProxy Kinesis proxy (used to get shard list)
|
||||
* @param leaseManager
|
||||
* @throws KinesisClientLibIOException Thrown if we couldn't get a fresh shard list from Kinesis.
|
||||
* @throws ProvisionedThroughputException
|
||||
* @throws InvalidStateException
|
||||
* @throws DependencyException
|
||||
*/
|
||||
private void cleanupGarbageLeases(List<Shard> shards, List<KinesisClientLease> trackedLeases,
|
||||
IKinesisProxy kinesisProxy, ILeaseManager<KinesisClientLease> leaseManager)
|
||||
throws KinesisClientLibIOException, DependencyException, InvalidStateException,
|
||||
ProvisionedThroughputException {
|
||||
Set<String> kinesisShards = new HashSet<>();
|
||||
for (Shard shard : shards) {
|
||||
kinesisShards.add(shard.getShardId());
|
||||
}
|
||||
|
||||
// Check if there are leases for non-existent shards
|
||||
List<KinesisClientLease> garbageLeases = new ArrayList<>();
|
||||
for (KinesisClientLease lease : trackedLeases) {
|
||||
if (leaseCleanupValidator.isCandidateForCleanup(lease, kinesisShards)) {
|
||||
garbageLeases.add(lease);
|
||||
}
|
||||
}
|
||||
|
||||
if (!garbageLeases.isEmpty()) {
|
||||
LOG.info("Found " + garbageLeases.size() + " candidate leases for cleanup. Refreshing list of"
|
||||
+ " Kinesis shards to pick up recent/latest shards");
|
||||
List<Shard> currentShardList = getCompleteShardList(kinesisProxy);
|
||||
Set<String> currentKinesisShardIds = new HashSet<>();
|
||||
for (Shard shard : currentShardList) {
|
||||
currentKinesisShardIds.add(shard.getShardId());
|
||||
}
|
||||
|
||||
for (KinesisClientLease lease : garbageLeases) {
|
||||
if (leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds)) {
|
||||
LOG.info("Deleting lease for shard " + lease.getLeaseKey()
|
||||
+ " as it is not present in Kinesis stream.");
|
||||
leaseManager.deleteLease(lease);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Private helper method.
|
||||
* Clean up leases for shards that meet the following criteria:
|
||||
* a/ the shard has been fully processed (checkpoint is set to SHARD_END)
|
||||
* b/ we've begun processing all the child shards: we have leases for all child shards and their checkpoint is not
|
||||
* TRIM_HORIZON.
|
||||
*
|
||||
* @param currentLeases List of leases we evaluate for clean up
|
||||
* @param shardIdToShardMap Map of shardId->Shard (assumed to include all Kinesis shards)
|
||||
* @param shardIdToChildShardIdsMap Map of shardId->childShardIds (assumed to include all Kinesis shards)
|
||||
* @param trackedLeases List of all leases we are tracking.
|
||||
* @param leaseManager Lease manager (will be used to delete leases)
|
||||
* @throws DependencyException
|
||||
* @throws InvalidStateException
|
||||
* @throws ProvisionedThroughputException
|
||||
* @throws KinesisClientLibIOException
|
||||
*/
|
||||
private synchronized void cleanupLeasesOfFinishedShards(Collection<KinesisClientLease> currentLeases,
|
||||
Map<String, Shard> shardIdToShardMap, Map<String, Set<String>> shardIdToChildShardIdsMap,
|
||||
List<KinesisClientLease> trackedLeases, ILeaseManager<KinesisClientLease> leaseManager)
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException,
|
||||
KinesisClientLibIOException {
|
||||
Set<String> shardIdsOfClosedShards = new HashSet<>();
|
||||
List<KinesisClientLease> leasesOfClosedShards = new ArrayList<>();
|
||||
for (KinesisClientLease lease : currentLeases) {
|
||||
if (lease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
|
||||
shardIdsOfClosedShards.add(lease.getLeaseKey());
|
||||
leasesOfClosedShards.add(lease);
|
||||
}
|
||||
}
|
||||
|
||||
if (!leasesOfClosedShards.isEmpty()) {
|
||||
assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, shardIdsOfClosedShards);
|
||||
Comparator<? super KinesisClientLease> startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator(
|
||||
shardIdToShardMap);
|
||||
Collections.sort(leasesOfClosedShards, startingSequenceNumberComparator);
|
||||
Map<String, KinesisClientLease> trackedLeaseMap = constructShardIdToKCLLeaseMap(trackedLeases);
|
||||
|
||||
for (KinesisClientLease leaseOfClosedShard : leasesOfClosedShards) {
|
||||
String closedShardId = leaseOfClosedShard.getLeaseKey();
|
||||
Set<String> childShardIds = shardIdToChildShardIdsMap.get(closedShardId);
|
||||
if ((closedShardId != null) && (childShardIds != null) && (!childShardIds.isEmpty())) {
|
||||
cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete lease for the closed shard. Rules for deletion are:
|
||||
* a/ the checkpoint for the closed shard is SHARD_END,
|
||||
* b/ there are leases for all the childShardIds and their checkpoint is NOT TRIM_HORIZON
|
||||
* Note: This method has package level access solely for testing purposes.
|
||||
*
|
||||
* @param closedShardId Identifies the closed shard
|
||||
* @param childShardIds ShardIds of children of the closed shard
|
||||
* @param trackedLeases shardId->KinesisClientLease map with all leases we are tracking (should not be null)
|
||||
* @param leaseManager
|
||||
* @throws ProvisionedThroughputException
|
||||
* @throws InvalidStateException
|
||||
* @throws DependencyException
|
||||
*/
|
||||
synchronized void cleanupLeaseForClosedShard(String closedShardId, Set<String> childShardIds,
|
||||
Map<String, KinesisClientLease> trackedLeases, ILeaseManager<KinesisClientLease> leaseManager)
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||
KinesisClientLease leaseForClosedShard = trackedLeases.get(closedShardId);
|
||||
List<KinesisClientLease> childShardLeases = new ArrayList<>();
|
||||
|
||||
for (String childShardId : childShardIds) {
|
||||
KinesisClientLease childLease = trackedLeases.get(childShardId);
|
||||
if (childLease != null) {
|
||||
childShardLeases.add(childLease);
|
||||
}
|
||||
}
|
||||
|
||||
if ((leaseForClosedShard != null) && (leaseForClosedShard.getCheckpoint()
|
||||
.equals(ExtendedSequenceNumber.SHARD_END)) && (childShardLeases.size() == childShardIds.size())) {
|
||||
boolean okayToDelete = true;
|
||||
for (KinesisClientLease lease : childShardLeases) {
|
||||
if (lease.getCheckpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON)) {
|
||||
okayToDelete = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (okayToDelete) {
|
||||
LOG.info("Deleting lease for shard " + leaseForClosedShard.getLeaseKey()
|
||||
+ " as it has been completely processed and processing of child shards has begun.");
|
||||
leaseManager.deleteLease(leaseForClosedShard);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method to create a new KinesisClientLease POJO for a shard.
|
||||
* Note: Package level access only for testing purposes
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import java.util.Set;
|
|||
/**
|
||||
* Represents the class that decides if a lease is eligible for cleanup.
|
||||
*/
|
||||
@Deprecated
|
||||
public interface LeaseCleanupValidator {
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -18,9 +18,11 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
|||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
|
||||
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
|
||||
import com.amazonaws.services.kinesis.model.ChildShard;
|
||||
import com.amazonaws.util.CollectionUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
|
|
@ -55,6 +57,7 @@ class ShardConsumer {
|
|||
private final IMetricsFactory metricsFactory;
|
||||
private final KinesisClientLibLeaseCoordinator leaseCoordinator;
|
||||
private ICheckpoint checkpoint;
|
||||
private LeaseCleanupManager leaseCleanupManager;
|
||||
// Backoff time when polling to check if application has finished processing parent shards
|
||||
private final long parentShardPollIntervalMillis;
|
||||
private final boolean cleanupLeasesOfCompletedShards;
|
||||
|
|
@ -112,6 +115,7 @@ class ShardConsumer {
|
|||
* @param shardSyncer shardSyncer instance used to check and create new leases
|
||||
*/
|
||||
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
|
||||
@Deprecated
|
||||
ShardConsumer(ShardInfo shardInfo,
|
||||
StreamConfig streamConfig,
|
||||
ICheckpoint checkpoint,
|
||||
|
|
@ -124,6 +128,7 @@ class ShardConsumer {
|
|||
long backoffTimeMillis,
|
||||
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
|
||||
|
||||
this(shardInfo,
|
||||
streamConfig,
|
||||
checkpoint,
|
||||
|
|
@ -156,6 +161,7 @@ class ShardConsumer {
|
|||
* @param shardSyncer shardSyncer instance used to check and create new leases
|
||||
*/
|
||||
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
|
||||
@Deprecated
|
||||
ShardConsumer(ShardInfo shardInfo,
|
||||
StreamConfig streamConfig,
|
||||
ICheckpoint checkpoint,
|
||||
|
|
@ -215,7 +221,9 @@ class ShardConsumer {
|
|||
* @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool
|
||||
* @param config Kinesis library configuration
|
||||
* @param shardSyncer shardSyncer instance used to check and create new leases
|
||||
* @param leaseCleanupManager used to clean up leases in lease table.
|
||||
*/
|
||||
@Deprecated
|
||||
ShardConsumer(ShardInfo shardInfo,
|
||||
StreamConfig streamConfig,
|
||||
ICheckpoint checkpoint,
|
||||
|
|
@ -232,6 +240,53 @@ class ShardConsumer {
|
|||
Optional<Integer> retryGetRecordsInSeconds,
|
||||
Optional<Integer> maxGetRecordsThreadPool,
|
||||
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) {
|
||||
|
||||
this(shardInfo, streamConfig, checkpoint, recordProcessor, recordProcessorCheckpointer, leaseCoordinator,
|
||||
parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory,
|
||||
backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, kinesisDataFetcher, retryGetRecordsInSeconds,
|
||||
maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, LeaseCleanupManager.createOrGetInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
|
||||
Executors.newSingleThreadScheduledExecutor(), metricsFactory, config.shouldCleanupLeasesUponShardCompletion(),
|
||||
config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(),
|
||||
config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords()));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param shardInfo Shard information
|
||||
* @param streamConfig Stream Config to use
|
||||
* @param checkpoint Checkpoint tracker
|
||||
* @param recordProcessor Record processor used to process the data records for the shard
|
||||
* @param recordProcessorCheckpointer RecordProcessorCheckpointer to use to checkpoint progress
|
||||
* @param leaseCoordinator Used to manage leases for current worker
|
||||
* @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception)
|
||||
* @param cleanupLeasesOfCompletedShards clean up the leases of completed shards
|
||||
* @param executorService ExecutorService used to execute process tasks for this shard
|
||||
* @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard
|
||||
* @param backoffTimeMillis backoff interval when we encounter exceptions
|
||||
* @param skipShardSyncAtWorkerInitializationIfLeasesExist Skip sync at init if lease exists
|
||||
* @param kinesisDataFetcher KinesisDataFetcher to fetch data from Kinesis streams.
|
||||
* @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record
|
||||
* @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool
|
||||
* @param config Kinesis library configuration
|
||||
* @param shardSyncer shardSyncer instance used to check and create new leases
|
||||
* @param leaseCleanupManager used to clean up leases in lease table.
|
||||
*/
|
||||
ShardConsumer(ShardInfo shardInfo,
|
||||
StreamConfig streamConfig,
|
||||
ICheckpoint checkpoint,
|
||||
IRecordProcessor recordProcessor,
|
||||
RecordProcessorCheckpointer recordProcessorCheckpointer,
|
||||
KinesisClientLibLeaseCoordinator leaseCoordinator,
|
||||
long parentShardPollIntervalMillis,
|
||||
boolean cleanupLeasesOfCompletedShards,
|
||||
ExecutorService executorService,
|
||||
IMetricsFactory metricsFactory,
|
||||
long backoffTimeMillis,
|
||||
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||
KinesisDataFetcher kinesisDataFetcher,
|
||||
Optional<Integer> retryGetRecordsInSeconds,
|
||||
Optional<Integer> maxGetRecordsThreadPool,
|
||||
KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy,
|
||||
LeaseCleanupManager leaseCleanupManager) {
|
||||
this.shardInfo = shardInfo;
|
||||
this.streamConfig = streamConfig;
|
||||
this.checkpoint = checkpoint;
|
||||
|
|
@ -251,6 +306,7 @@ class ShardConsumer {
|
|||
this.getShardInfo().getShardId(), this.metricsFactory, this.config.getMaxRecords());
|
||||
this.shardSyncer = shardSyncer;
|
||||
this.shardSyncStrategy = shardSyncStrategy;
|
||||
this.leaseCleanupManager = leaseCleanupManager;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -529,4 +585,8 @@ class ShardConsumer {
|
|||
ShardSyncStrategy getShardSyncStrategy() {
|
||||
return shardSyncStrategy;
|
||||
}
|
||||
|
||||
LeaseCleanupManager getLeaseCleanupManager() {
|
||||
return leaseCleanupManager;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -86,7 +86,7 @@ public class ShardInfo {
|
|||
*
|
||||
* @return a list of shardId's that are parents of this shard, or empty if the shard has no parents.
|
||||
*/
|
||||
protected List<String> getParentShardIds() {
|
||||
public List<String> getParentShardIds() {
|
||||
return new LinkedList<String>(parentShardIds);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -14,10 +14,12 @@
|
|||
*/
|
||||
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
|
||||
|
||||
import com.amazonaws.services.kinesis.leases.LeasePendingDeletion;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.exceptions.internal.BlockedOnParentShardException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
|
||||
import com.amazonaws.services.kinesis.leases.impl.UpdateField;
|
||||
import com.amazonaws.services.kinesis.model.ChildShard;
|
||||
import com.amazonaws.util.CollectionUtils;
|
||||
|
|
@ -65,6 +67,7 @@ class ShutdownTask implements ITask {
|
|||
private final ShardSyncer shardSyncer;
|
||||
private final ShardSyncStrategy shardSyncStrategy;
|
||||
private final List<ChildShard> childShards;
|
||||
private final LeaseCleanupManager leaseCleanupManager;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
|
|
@ -81,7 +84,8 @@ class ShutdownTask implements ITask {
|
|||
KinesisClientLibLeaseCoordinator leaseCoordinator,
|
||||
long backoffTimeMillis,
|
||||
GetRecordsCache getRecordsCache, ShardSyncer shardSyncer,
|
||||
ShardSyncStrategy shardSyncStrategy, List<ChildShard> childShards) {
|
||||
ShardSyncStrategy shardSyncStrategy, List<ChildShard> childShards,
|
||||
LeaseCleanupManager leaseCleanupManager) {
|
||||
this.shardInfo = shardInfo;
|
||||
this.recordProcessor = recordProcessor;
|
||||
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
|
||||
|
|
@ -96,6 +100,7 @@ class ShutdownTask implements ITask {
|
|||
this.shardSyncer = shardSyncer;
|
||||
this.shardSyncStrategy = shardSyncStrategy;
|
||||
this.childShards = childShards;
|
||||
this.leaseCleanupManager = leaseCleanupManager;
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
@ -123,13 +128,29 @@ class ShutdownTask implements ITask {
|
|||
recordProcessor.shutdown(shutdownInput);
|
||||
ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue();
|
||||
|
||||
final boolean successfullyCheckpointedShardEnd = lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END);
|
||||
|
||||
if (localReason == ShutdownReason.TERMINATE) {
|
||||
if ((lastCheckpointValue == null)
|
||||
|| (!lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END))) {
|
||||
if ((lastCheckpointValue == null) || (!successfullyCheckpointedShardEnd)) {
|
||||
throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
|
||||
+ shardInfo.getShardId() + ". Application must checkpoint upon shutdown. " +
|
||||
"See IRecordProcessor.shutdown javadocs for more information.");
|
||||
}
|
||||
|
||||
// Check if either the shard end ddb persist is successful or
|
||||
// if childshards is empty. When child shards is empty then either it is due to
|
||||
// completed shard being reprocessed or we got RNF from service.
|
||||
// For these cases enqueue the lease for deletion.
|
||||
if (successfullyCheckpointedShardEnd || CollectionUtils.isNullOrEmpty(childShards)) {
|
||||
final KinesisClientLease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId());
|
||||
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(currentLease, shardInfo);
|
||||
|
||||
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
|
||||
leaseCleanupManager.enqueueForDeletion(leasePendingDeletion);
|
||||
}
|
||||
|
||||
//TODO: Add shard end checkpointing here.
|
||||
}
|
||||
}
|
||||
LOG.debug("Shutting down retrieval strategy.");
|
||||
getRecordsCache.shutdown();
|
||||
|
|
|
|||
|
|
@ -39,6 +39,7 @@ import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
|
|||
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||
import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector;
|
||||
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
|
||||
import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator;
|
||||
import com.amazonaws.services.kinesis.leases.impl.LeaseRenewer;
|
||||
import com.amazonaws.services.kinesis.leases.impl.LeaseTaker;
|
||||
|
|
@ -157,6 +158,8 @@ public class Worker implements Runnable {
|
|||
private ShardSyncStrategy shardSyncStrategy;
|
||||
private PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager;
|
||||
|
||||
private final LeaseCleanupManager leaseCleanupManager;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
|
|
@ -573,6 +576,10 @@ public class Worker implements Runnable {
|
|||
this.workerStateChangeListener = workerStateChangeListener;
|
||||
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED);
|
||||
createShardSyncStrategy(config.getShardSyncStrategyType(), leaderDecider, periodicShardSyncManager);
|
||||
this.leaseCleanupManager = LeaseCleanupManager.createOrGetInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(),
|
||||
Executors.newSingleThreadScheduledExecutor(), metricsFactory, cleanupLeasesUponShardCompletion,
|
||||
config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(),
|
||||
config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords());
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -726,6 +733,13 @@ public class Worker implements Runnable {
|
|||
}
|
||||
}
|
||||
|
||||
if (!leaseCleanupManager.isRunning()) {
|
||||
LOG.info("Starting LeaseCleanupManager.");
|
||||
leaseCleanupManager.start();
|
||||
} else {
|
||||
LOG.info("LeaseCleanupManager is already running. No need to start it.");
|
||||
}
|
||||
|
||||
// If we reach this point, then we either skipped the lease sync or did not have any exception for the
|
||||
// shard sync in the previous attempt.
|
||||
if (!leaseCoordinator.isRunning()) {
|
||||
|
|
@ -1111,12 +1125,21 @@ public class Worker implements Runnable {
|
|||
}
|
||||
|
||||
protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) {
|
||||
IRecordProcessor recordProcessor = processorFactory.createProcessor();
|
||||
final IRecordProcessor recordProcessor = processorFactory.createProcessor();
|
||||
final RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
|
||||
shardInfo,
|
||||
checkpointTracker,
|
||||
new SequenceNumberValidator(
|
||||
streamConfig.getStreamProxy(),
|
||||
shardInfo.getShardId(),
|
||||
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()),
|
||||
metricsFactory);
|
||||
|
||||
return new ShardConsumer(shardInfo,
|
||||
streamConfig,
|
||||
checkpointTracker,
|
||||
recordProcessor,
|
||||
recordProcessorCheckpointer,
|
||||
leaseCoordinator,
|
||||
parentShardPollIntervalMillis,
|
||||
cleanupLeasesUponShardCompletion,
|
||||
|
|
@ -1124,9 +1147,11 @@ public class Worker implements Runnable {
|
|||
metricsFactory,
|
||||
taskBackoffTimeMillis,
|
||||
skipShardSyncAtWorkerInitializationIfLeasesExist,
|
||||
new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo),
|
||||
retryGetRecordsInSeconds,
|
||||
maxGetRecordsThreadPool,
|
||||
config, shardSyncer, shardSyncStrategy);
|
||||
config, shardSyncer, shardSyncStrategy,
|
||||
leaseCleanupManager);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Copyright 2020 Amazon.com, Inc. or its affiliates.
|
||||
* Licensed under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package com.amazonaws.services.kinesis.leases;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
|
||||
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
|
||||
import lombok.Value;
|
||||
import lombok.experimental.Accessors;
|
||||
|
||||
/**
|
||||
* Helper class for cleaning up leases.
|
||||
*/
|
||||
@Accessors(fluent=true)
|
||||
@Value
|
||||
public class LeasePendingDeletion {
|
||||
private final KinesisClientLease lease;
|
||||
private final ShardInfo shardInfo;
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,369 @@
|
|||
package com.amazonaws.services.kinesis.leases.impl;
|
||||
|
||||
/*
|
||||
* Copyright 2020 Amazon.com, Inc. or its affiliates.
|
||||
* Licensed under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
import com.amazonaws.services.kinesis.leases.LeasePendingDeletion;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.DependencyException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException;
|
||||
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
|
||||
import com.amazonaws.services.kinesis.model.ShardIteratorType;
|
||||
import com.amazonaws.util.CollectionUtils;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import lombok.AccessLevel;
|
||||
import lombok.EqualsAndHashCode;
|
||||
import lombok.Getter;
|
||||
import lombok.NonNull;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.Value;
|
||||
import lombok.experimental.Accessors;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Helper class to cleanup of any expired/closed shard leases. It will cleanup leases periodically as defined by
|
||||
* {@link KinesisClientLibConfiguration#leaseCleanupIntervalMillis()} upon worker shutdown, following a re-shard event or
|
||||
* a shard expiring from the service.
|
||||
*/
|
||||
@RequiredArgsConstructor(access= AccessLevel.PACKAGE)
|
||||
@EqualsAndHashCode
|
||||
public class LeaseCleanupManager {
|
||||
@NonNull
|
||||
private IKinesisProxy kinesisProxy;
|
||||
@NonNull
|
||||
private final ILeaseManager<KinesisClientLease> leaseManager;
|
||||
@NonNull
|
||||
private final ScheduledExecutorService deletionThreadPool;
|
||||
@NonNull
|
||||
private final IMetricsFactory metricsFactory;
|
||||
private final boolean cleanupLeasesUponShardCompletion;
|
||||
private final long leaseCleanupIntervalMillis;
|
||||
private final long completedLeaseCleanupIntervalMillis;
|
||||
private final long garbageLeaseCleanupIntervalMillis;
|
||||
private final int maxRecords;
|
||||
|
||||
private final Stopwatch completedLeaseStopwatch = Stopwatch.createUnstarted();
|
||||
private final Stopwatch garbageLeaseStopwatch = Stopwatch.createUnstarted();
|
||||
private final Queue<LeasePendingDeletion> deletionQueue = new ConcurrentLinkedQueue<>();
|
||||
|
||||
private static final long INITIAL_DELAY = 0L;
|
||||
private static final Log LOG = LogFactory.getLog(LeaseCleanupManager.class);
|
||||
|
||||
@Getter
|
||||
private volatile boolean isRunning = false;
|
||||
|
||||
private static LeaseCleanupManager instance;
|
||||
|
||||
/**
|
||||
* Factory method to return a singleton instance of {@link LeaseCleanupManager}.
|
||||
* @param kinesisProxy
|
||||
* @param leaseManager
|
||||
* @param deletionThreadPool
|
||||
* @param metricsFactory
|
||||
* @param cleanupLeasesUponShardCompletion
|
||||
* @param leaseCleanupIntervalMillis
|
||||
* @param completedLeaseCleanupIntervalMillis
|
||||
* @param garbageLeaseCleanupIntervalMillis
|
||||
* @param maxRecords
|
||||
* @return
|
||||
*/
|
||||
public static LeaseCleanupManager createOrGetInstance(IKinesisProxy kinesisProxy, ILeaseManager leaseManager,
|
||||
ScheduledExecutorService deletionThreadPool, IMetricsFactory metricsFactory,
|
||||
boolean cleanupLeasesUponShardCompletion, long leaseCleanupIntervalMillis,
|
||||
long completedLeaseCleanupIntervalMillis, long garbageLeaseCleanupIntervalMillis,
|
||||
int maxRecords) {
|
||||
if (instance == null) {
|
||||
instance = new LeaseCleanupManager(kinesisProxy, leaseManager, deletionThreadPool, metricsFactory, cleanupLeasesUponShardCompletion,
|
||||
leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords);
|
||||
}
|
||||
|
||||
return instance;
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the lease cleanup thread, which is scheduled periodically as specified by
|
||||
* {@link LeaseCleanupManager#leaseCleanupIntervalMillis}
|
||||
*/
|
||||
public void start() {
|
||||
LOG.debug("Starting lease cleanup thread.");
|
||||
isRunning = true;
|
||||
completedLeaseStopwatch.start();
|
||||
garbageLeaseStopwatch.start();
|
||||
|
||||
deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis,
|
||||
TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Enqueues a lease for deletion without check for duplicate entry. Use {@link #isEnqueuedForDeletion}
|
||||
* for checking the duplicate entries.
|
||||
* @param leasePendingDeletion
|
||||
*/
|
||||
public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) {
|
||||
final KinesisClientLease lease = leasePendingDeletion.lease();
|
||||
if (lease == null) {
|
||||
LOG.warn("Cannot enqueue lease " + lease.getLeaseKey() + " for deferred deletion - instance doesn't hold " +
|
||||
"the lease for that shard.");
|
||||
} else {
|
||||
LOG.debug("Enqueuing lease " + lease.getLeaseKey() + " for deferred deletion.");
|
||||
if (!deletionQueue.add(leasePendingDeletion)) {
|
||||
LOG.warn("Unable to enqueue lease " + lease.getLeaseKey() + " for deletion.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if lease was already enqueued for deletion.
|
||||
* //TODO: Optimize verifying duplicate entries https://sim.amazon.com/issues/KinesisLTR-597.
|
||||
* @param leasePendingDeletion
|
||||
* @return true if enqueued for deletion; false otherwise.
|
||||
*/
|
||||
public boolean isEnqueuedForDeletion(LeasePendingDeletion leasePendingDeletion) {
|
||||
return deletionQueue.contains(leasePendingDeletion);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns how many leases are currently waiting in the queue pending deletion.
|
||||
* @return number of leases pending deletion.
|
||||
*/
|
||||
private int leasesPendingDeletion() {
|
||||
return deletionQueue.size();
|
||||
}
|
||||
|
||||
private boolean timeToCheckForCompletedShard() {
|
||||
return completedLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= completedLeaseCleanupIntervalMillis;
|
||||
}
|
||||
|
||||
private boolean timeToCheckForGarbageShard() {
|
||||
return garbageLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= garbageLeaseCleanupIntervalMillis;
|
||||
}
|
||||
|
||||
public LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion,
|
||||
boolean timeToCheckForCompletedShard, boolean timeToCheckForGarbageShard)
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
final KinesisClientLease lease = leasePendingDeletion.lease();
|
||||
final ShardInfo shardInfo = leasePendingDeletion.shardInfo();
|
||||
|
||||
boolean cleanedUpCompletedLease = false;
|
||||
boolean cleanedUpGarbageLease = false;
|
||||
boolean alreadyCheckedForGarbageCollection = false;
|
||||
boolean wereChildShardsPresent = false;
|
||||
boolean wasResourceNotFound = false;
|
||||
|
||||
try {
|
||||
if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) {
|
||||
final KinesisClientLease leaseFromDDB = leaseManager.getLease(shardInfo.getShardId());
|
||||
if(leaseFromDDB != null) {
|
||||
Set<String> childShardKeys = leaseFromDDB.getChildShardIds();
|
||||
if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
|
||||
try {
|
||||
childShardKeys = getChildShardsFromService(shardInfo);
|
||||
|
||||
if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
|
||||
LOG.error("No child shards returned from service for shard " + shardInfo.getShardId());
|
||||
} else {
|
||||
wereChildShardsPresent = true;
|
||||
updateLeaseWithChildShards(leasePendingDeletion, childShardKeys);
|
||||
}
|
||||
} catch (ResourceNotFoundException e) {
|
||||
throw e;
|
||||
} finally {
|
||||
alreadyCheckedForGarbageCollection = true;
|
||||
}
|
||||
} else {
|
||||
wereChildShardsPresent = true;
|
||||
}
|
||||
try {
|
||||
cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, childShardKeys);
|
||||
} catch (Exception e) {
|
||||
// Suppressing the exception here, so that we can attempt for garbage cleanup.
|
||||
LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId());
|
||||
}
|
||||
} else {
|
||||
LOG.info("Lease not present in lease table while cleaning the shard " + shardInfo.getShardId());
|
||||
cleanedUpCompletedLease = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) {
|
||||
try {
|
||||
wereChildShardsPresent = !CollectionUtils
|
||||
.isNullOrEmpty(getChildShardsFromService(shardInfo));
|
||||
} catch (ResourceNotFoundException e) {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
} catch (ResourceNotFoundException e) {
|
||||
wasResourceNotFound = true;
|
||||
cleanedUpGarbageLease = cleanupLeaseForGarbageShard(lease);
|
||||
}
|
||||
|
||||
return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease, wereChildShardsPresent,
|
||||
wasResourceNotFound);
|
||||
}
|
||||
|
||||
private Set<String> getChildShardsFromService(ShardInfo shardInfo) {
|
||||
final String iterator = kinesisProxy.getIterator(shardInfo.getShardId(), ShardIteratorType.LATEST.toString());
|
||||
return kinesisProxy.get(iterator, maxRecords).getChildShards().stream().map(c -> c.getShardId()).collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
|
||||
// A lease that ended with SHARD_END from ResourceNotFoundException is safe to delete if it no longer exists in the
|
||||
// stream (known explicitly from ResourceNotFound being thrown when processing this shard),
|
||||
private boolean cleanupLeaseForGarbageShard(KinesisClientLease lease) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
LOG.info("Deleting lease " + lease.getLeaseKey() + " as it is not present in the stream.");
|
||||
leaseManager.deleteLease(lease);
|
||||
return true;
|
||||
}
|
||||
|
||||
private boolean allParentShardLeasesDeleted(KinesisClientLease lease) throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
for (String parentShard : lease.getParentShardIds()) {
|
||||
final KinesisClientLease parentLease = leaseManager.getLease(parentShard);
|
||||
|
||||
if (parentLease != null) {
|
||||
LOG.warn("Lease " + lease.getLeaseKey() + " has a parent lease " + parentLease.getLeaseKey() +
|
||||
" which is still present in the lease table, skipping deletion for this lease.");
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// We should only be deleting the current shard's lease if
|
||||
// 1. All of its children are currently being processed, i.e their checkpoint is not TRIM_HORIZON or AT_TIMESTAMP.
|
||||
// 2. Its parent shard lease(s) have already been deleted.
|
||||
private boolean cleanupLeaseForCompletedShard(KinesisClientLease lease, Set<String> childShardLeaseKeys)
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException, IllegalStateException {
|
||||
final Set<String> processedChildShardLeaseKeys = new HashSet<>();
|
||||
|
||||
for (String childShardLeaseKey : childShardLeaseKeys) {
|
||||
final KinesisClientLease childShardLease = Optional.ofNullable(
|
||||
leaseManager.getLease(childShardLeaseKey))
|
||||
.orElseThrow(() -> new IllegalStateException(
|
||||
"Child lease " + childShardLeaseKey + " for completed shard not found in "
|
||||
+ "lease table - not cleaning up lease " + lease));
|
||||
|
||||
if (!childShardLease.getCheckpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON) && !childShardLease
|
||||
.getCheckpoint().equals(ExtendedSequenceNumber.AT_TIMESTAMP)) {
|
||||
processedChildShardLeaseKeys.add(childShardLease.getLeaseKey());
|
||||
}
|
||||
}
|
||||
|
||||
if (!allParentShardLeasesDeleted(lease) || !Objects.equals(childShardLeaseKeys, processedChildShardLeaseKeys)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
LOG.info("Deleting lease " + lease.getLeaseKey() + " as it has been completely processed and processing of child shard(s) has begun.");
|
||||
leaseManager.deleteLease(lease);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private void updateLeaseWithChildShards(LeasePendingDeletion leasePendingDeletion, Set<String> childShardKeys)
|
||||
throws DependencyException, ProvisionedThroughputException, InvalidStateException {
|
||||
final KinesisClientLease updatedLease = leasePendingDeletion.lease();
|
||||
updatedLease.setChildShardIds(childShardKeys);
|
||||
|
||||
leaseManager.updateLease(updatedLease);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void cleanupLeases() {
|
||||
LOG.info("Number of pending leases to clean before the scan : " + leasesPendingDeletion());
|
||||
if (deletionQueue.isEmpty()) {
|
||||
LOG.debug("No leases pending deletion.");
|
||||
} else if (timeToCheckForCompletedShard() | timeToCheckForGarbageShard()) {
|
||||
final Queue<LeasePendingDeletion> failedDeletions = new ConcurrentLinkedQueue<>();
|
||||
boolean completedLeaseCleanedUp = false;
|
||||
boolean garbageLeaseCleanedUp = false;
|
||||
|
||||
LOG.debug("Attempting to clean up " + deletionQueue.size() + " lease(s).");
|
||||
|
||||
while (!deletionQueue.isEmpty()) {
|
||||
final LeasePendingDeletion leasePendingDeletion = deletionQueue.poll();
|
||||
final String leaseKey = leasePendingDeletion.lease().getLeaseKey();
|
||||
boolean deletionSucceeded = false;
|
||||
try {
|
||||
final LeaseCleanupResult leaseCleanupResult = cleanupLease(leasePendingDeletion,
|
||||
timeToCheckForCompletedShard(), timeToCheckForGarbageShard());
|
||||
completedLeaseCleanedUp |= leaseCleanupResult.cleanedUpCompletedLease();
|
||||
garbageLeaseCleanedUp |= leaseCleanupResult.cleanedUpGarbageLease();
|
||||
|
||||
if (leaseCleanupResult.leaseCleanedUp()) {
|
||||
LOG.debug("Successfully cleaned up lease " + leaseKey);
|
||||
deletionSucceeded = true;
|
||||
} else {
|
||||
LOG.warn("Unable to clean up lease " + leaseKey + " due to " + leaseCleanupResult);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to cleanup lease " + leaseKey + ". Will re-enqueue for deletion and retry on next " +
|
||||
"scheduled execution.", e);
|
||||
}
|
||||
if (!deletionSucceeded) {
|
||||
LOG.debug("Did not cleanup lease " + leaseKey + ". Re-enqueueing for deletion.");
|
||||
failedDeletions.add(leasePendingDeletion);
|
||||
}
|
||||
}
|
||||
if (completedLeaseCleanedUp) {
|
||||
LOG.debug("At least one completed lease was cleaned up - restarting interval");
|
||||
completedLeaseStopwatch.reset().start();
|
||||
}
|
||||
if (garbageLeaseCleanedUp) {
|
||||
LOG.debug("At least one garbage lease was cleaned up - restarting interval");
|
||||
garbageLeaseStopwatch.reset().start();
|
||||
}
|
||||
deletionQueue.addAll(failedDeletions);
|
||||
|
||||
LOG.info("Number of pending leases to clean after the scan : " + leasesPendingDeletion());
|
||||
}
|
||||
}
|
||||
|
||||
private class LeaseCleanupThread implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
cleanupLeases();
|
||||
}
|
||||
}
|
||||
|
||||
@Value
|
||||
@Accessors(fluent=true)
|
||||
public static class LeaseCleanupResult {
|
||||
boolean cleanedUpCompletedLease;
|
||||
boolean cleanedUpGarbageLease;
|
||||
boolean wereChildShardsPresent;
|
||||
boolean wasResourceNotFound;
|
||||
|
||||
public boolean leaseCleanedUp() {
|
||||
return cleanedUpCompletedLease | cleanedUpGarbageLease;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -240,7 +240,7 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
|
|||
*/
|
||||
@Override
|
||||
public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||
return list(1).isEmpty();
|
||||
return list(1, 1).isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -253,6 +253,20 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
|
|||
* @throws ProvisionedThroughputException if DynamoDB scan fail due to exceeded capacity
|
||||
*/
|
||||
List<T> list(Integer limit) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||
return list(limit, Integer.MAX_VALUE);
|
||||
}
|
||||
|
||||
/**
|
||||
* List with the given page size, up to a limit of paginated calls.
|
||||
*
|
||||
* @param limit number of items to consider at a time - used by integration tests to force paging.
|
||||
* @param maxPages max number of paginated scan calls.
|
||||
* @return list of leases
|
||||
* @throws InvalidStateException if table does not exist
|
||||
* @throws DependencyException if DynamoDB scan fail in an unexpected way
|
||||
* @throws ProvisionedThroughputException if DynamoDB scan fail due to exceeded capacity
|
||||
*/
|
||||
private List<T> list(Integer limit, Integer maxPages) throws InvalidStateException, ProvisionedThroughputException, DependencyException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Listing leases from table " + table);
|
||||
}
|
||||
|
|
@ -277,7 +291,7 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
|
|||
}
|
||||
|
||||
Map<String, AttributeValue> lastEvaluatedKey = scanResult.getLastEvaluatedKey();
|
||||
if (lastEvaluatedKey == null) {
|
||||
if (lastEvaluatedKey == null || --maxPages <= 0) {
|
||||
// Signify that we're done.
|
||||
scanResult = null;
|
||||
if (LOG.isDebugEnabled()) {
|
||||
|
|
|
|||
|
|
@ -124,7 +124,7 @@ public class ConsumerStatesTest {
|
|||
assertThat(state.successTransition(), equalTo(ShardConsumerState.INITIALIZING.getConsumerState()));
|
||||
for (ShutdownReason shutdownReason : ShutdownReason.values()) {
|
||||
assertThat(state.shutdownTransition(shutdownReason),
|
||||
equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState()));
|
||||
equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState()));
|
||||
}
|
||||
|
||||
assertThat(state.getState(), equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS));
|
||||
|
|
|
|||
|
|
@ -54,6 +54,8 @@ import java.util.concurrent.Future;
|
|||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
|
||||
import com.amazonaws.services.kinesis.leases.impl.LeaseManager;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.hamcrest.Description;
|
||||
|
|
@ -138,6 +140,7 @@ public class ShardConsumerTest {
|
|||
recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory());
|
||||
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
|
||||
when(config.getLogWarningForTaskAfterMillis()).thenReturn(Optional.empty());
|
||||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -475,6 +478,8 @@ public class ShardConsumerTest {
|
|||
when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager);
|
||||
when(leaseManager.getLease(eq(parentShardId))).thenReturn(parentLease);
|
||||
when(parentLease.getCheckpoint()).thenReturn(ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
when(recordProcessorCheckpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
|
||||
when(streamConfig.getStreamProxy()).thenReturn(streamProxy);
|
||||
|
||||
final ShardConsumer consumer =
|
||||
new ShardConsumer(shardInfo,
|
||||
|
|
@ -507,6 +512,9 @@ public class ShardConsumerTest {
|
|||
assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.REQUESTED));
|
||||
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
|
||||
consumer.consumeShard();
|
||||
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN)));
|
||||
Thread.sleep(50L);
|
||||
consumer.beginShutdown();
|
||||
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)));
|
||||
assertThat(consumer.isShutdown(), is(true));
|
||||
verify(shutdownNotification, times(1)).shutdownComplete();
|
||||
|
|
@ -704,19 +712,19 @@ public class ShardConsumerTest {
|
|||
|
||||
StreamConfig streamConfig =
|
||||
new StreamConfig(fileBasedProxy,
|
||||
maxRecords,
|
||||
idleTimeMS,
|
||||
callProcessRecordsForEmptyRecordList,
|
||||
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
|
||||
maxRecords,
|
||||
idleTimeMS,
|
||||
callProcessRecordsForEmptyRecordList,
|
||||
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
|
||||
|
||||
ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null);
|
||||
|
||||
dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
|
||||
|
||||
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
|
||||
new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
|
||||
new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
|
||||
when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(),
|
||||
any(IMetricsFactory.class), anyInt()))
|
||||
any(IMetricsFactory.class), anyInt()))
|
||||
.thenReturn(getRecordsCache);
|
||||
|
||||
RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer(
|
||||
|
|
@ -732,29 +740,29 @@ public class ShardConsumerTest {
|
|||
|
||||
ShardConsumer consumer =
|
||||
new ShardConsumer(shardInfo,
|
||||
streamConfig,
|
||||
checkpoint,
|
||||
processor,
|
||||
recordProcessorCheckpointer,
|
||||
leaseCoordinator,
|
||||
parentShardPollIntervalMillis,
|
||||
cleanupLeasesOfCompletedShards,
|
||||
executorService,
|
||||
metricsFactory,
|
||||
taskBackoffTimeMillis,
|
||||
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
|
||||
dataFetcher,
|
||||
Optional.empty(),
|
||||
Optional.empty(),
|
||||
config,
|
||||
shardSyncer,
|
||||
shardSyncStrategy);
|
||||
streamConfig,
|
||||
checkpoint,
|
||||
processor,
|
||||
recordProcessorCheckpointer,
|
||||
leaseCoordinator,
|
||||
parentShardPollIntervalMillis,
|
||||
cleanupLeasesOfCompletedShards,
|
||||
executorService,
|
||||
metricsFactory,
|
||||
taskBackoffTimeMillis,
|
||||
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
|
||||
dataFetcher,
|
||||
Optional.empty(),
|
||||
Optional.empty(),
|
||||
config,
|
||||
shardSyncer,
|
||||
shardSyncStrategy);
|
||||
|
||||
List<String> parentShardIds = new ArrayList<>();
|
||||
parentShardIds.add(shardInfo.getShardId());
|
||||
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(createLease(shardInfo.getShardId(),
|
||||
"leaseOwner",
|
||||
parentShardIds));
|
||||
"leaseOwner",
|
||||
parentShardIds));
|
||||
when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true);
|
||||
|
||||
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
|
||||
|
|
@ -1111,7 +1119,7 @@ public class ShardConsumerTest {
|
|||
|
||||
//@formatter:off (gets the formatting wrong)
|
||||
private void verifyConsumedRecords(List<Record> expectedRecords,
|
||||
List<Record> actualRecords) {
|
||||
List<Record> actualRecords) {
|
||||
//@formatter:on
|
||||
assertThat(actualRecords.size(), is(equalTo(expectedRecords.size())));
|
||||
ListIterator<Record> expectedIter = expectedRecords.listIterator();
|
||||
|
|
@ -1141,7 +1149,7 @@ public class ShardConsumerTest {
|
|||
}
|
||||
|
||||
Matcher<InitializationInput> initializationInputMatcher(final ExtendedSequenceNumber checkpoint,
|
||||
final ExtendedSequenceNumber pendingCheckpoint) {
|
||||
final ExtendedSequenceNumber pendingCheckpoint) {
|
||||
return new TypeSafeMatcher<InitializationInput>() {
|
||||
@Override
|
||||
protected boolean matchesSafely(InitializationInput item) {
|
||||
|
|
|
|||
|
|
@ -25,24 +25,24 @@ import com.amazonaws.services.kinesis.model.Shard;
|
|||
/**
|
||||
* Helper class to create Shard, SequenceRange and related objects.
|
||||
*/
|
||||
class ShardObjectHelper {
|
||||
public class ShardObjectHelper {
|
||||
|
||||
private static final int EXPONENT = 128;
|
||||
|
||||
/**
|
||||
* Max value of a sequence number (2^128 -1). Useful for defining sequence number range for a shard.
|
||||
*/
|
||||
static final String MAX_SEQUENCE_NUMBER = new BigInteger("2").pow(EXPONENT).subtract(BigInteger.ONE).toString();
|
||||
public static final String MAX_SEQUENCE_NUMBER = new BigInteger("2").pow(EXPONENT).subtract(BigInteger.ONE).toString();
|
||||
|
||||
/**
|
||||
* Min value of a sequence number (0). Useful for defining sequence number range for a shard.
|
||||
*/
|
||||
static final String MIN_SEQUENCE_NUMBER = BigInteger.ZERO.toString();
|
||||
public static final String MIN_SEQUENCE_NUMBER = BigInteger.ZERO.toString();
|
||||
|
||||
/**
|
||||
* Max value of a hash key (2^128 -1). Useful for defining hash key range for a shard.
|
||||
*/
|
||||
static final String MAX_HASH_KEY = new BigInteger("2").pow(EXPONENT).subtract(BigInteger.ONE).toString();
|
||||
public static final String MAX_HASH_KEY = new BigInteger("2").pow(EXPONENT).subtract(BigInteger.ONE).toString();
|
||||
|
||||
/**
|
||||
* Min value of a hash key (0). Useful for defining sequence number range for a shard.
|
||||
|
|
@ -63,7 +63,7 @@ class ShardObjectHelper {
|
|||
* @param sequenceNumberRange
|
||||
* @return
|
||||
*/
|
||||
static Shard newShard(String shardId,
|
||||
public static Shard newShard(String shardId,
|
||||
String parentShardId,
|
||||
String adjacentParentShardId,
|
||||
SequenceNumberRange sequenceNumberRange) {
|
||||
|
|
@ -78,7 +78,7 @@ class ShardObjectHelper {
|
|||
* @param hashKeyRange
|
||||
* @return
|
||||
*/
|
||||
static Shard newShard(String shardId,
|
||||
public static Shard newShard(String shardId,
|
||||
String parentShardId,
|
||||
String adjacentParentShardId,
|
||||
SequenceNumberRange sequenceNumberRange,
|
||||
|
|
@ -98,7 +98,7 @@ class ShardObjectHelper {
|
|||
* @param endingSequenceNumber
|
||||
* @return
|
||||
*/
|
||||
static SequenceNumberRange newSequenceNumberRange(String startingSequenceNumber, String endingSequenceNumber) {
|
||||
public static SequenceNumberRange newSequenceNumberRange(String startingSequenceNumber, String endingSequenceNumber) {
|
||||
SequenceNumberRange range = new SequenceNumberRange();
|
||||
range.setStartingSequenceNumber(startingSequenceNumber);
|
||||
range.setEndingSequenceNumber(endingSequenceNumber);
|
||||
|
|
@ -110,14 +110,14 @@ class ShardObjectHelper {
|
|||
* @param endingHashKey
|
||||
* @return
|
||||
*/
|
||||
static HashKeyRange newHashKeyRange(String startingHashKey, String endingHashKey) {
|
||||
public static HashKeyRange newHashKeyRange(String startingHashKey, String endingHashKey) {
|
||||
HashKeyRange range = new HashKeyRange();
|
||||
range.setStartingHashKey(startingHashKey);
|
||||
range.setEndingHashKey(endingHashKey);
|
||||
return range;
|
||||
}
|
||||
|
||||
static List<String> getParentShardIds(Shard shard) {
|
||||
public static List<String> getParentShardIds(Shard shard) {
|
||||
List<String> parentShardIds = new ArrayList<>(2);
|
||||
if (shard.getAdjacentParentShardId() != null) {
|
||||
parentShardIds.add(shard.getAdjacentParentShardId());
|
||||
|
|
|
|||
|
|
@ -420,6 +420,7 @@ public class ShardSyncerTest {
|
|||
cleanupLeasesOfCompletedShards, true, shards);
|
||||
List<KinesisClientLease> newLeases = leaseManager.listLeases();
|
||||
Set<String> expectedLeaseShardIds = new HashSet<String>();
|
||||
expectedLeaseShardIds.add("shardId-1000"); // dummy lease will still be in the table.
|
||||
expectedLeaseShardIds.add("shardId-4");
|
||||
expectedLeaseShardIds.add("shardId-5");
|
||||
expectedLeaseShardIds.add("shardId-8");
|
||||
|
|
@ -691,31 +692,6 @@ public class ShardSyncerTest {
|
|||
dataFile.delete();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test bootstrapShardLeases() - cleanup garbage leases.
|
||||
*
|
||||
* @throws ProvisionedThroughputException
|
||||
* @throws InvalidStateException
|
||||
* @throws DependencyException
|
||||
* @throws IOException
|
||||
* @throws KinesisClientLibIOException
|
||||
*/
|
||||
@Test
|
||||
public final void testBootstrapShardLeasesCleanupGarbage()
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
|
||||
KinesisClientLibIOException {
|
||||
String garbageShardId = "shardId-garbage-001";
|
||||
KinesisClientLease garbageLease = shardSyncer.newKCLLease(ShardObjectHelper.newShard(garbageShardId,
|
||||
null,
|
||||
null,
|
||||
ShardObjectHelper.newSequenceNumberRange("101", null)));
|
||||
garbageLease.setCheckpoint(new ExtendedSequenceNumber("999"));
|
||||
leaseManager.createLeaseIfNotExists(garbageLease);
|
||||
Assert.assertEquals(garbageShardId, leaseManager.getLease(garbageShardId).getLeaseKey());
|
||||
testBootstrapShardLeasesAtStartingPosition(INITIAL_POSITION_LATEST);
|
||||
Assert.assertNull(leaseManager.getLease(garbageShardId));
|
||||
}
|
||||
|
||||
private void testBootstrapShardLeasesAtStartingPosition(InitialPositionInStreamExtended initialPosition)
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException,
|
||||
KinesisClientLibIOException {
|
||||
|
|
@ -730,7 +706,7 @@ public class ShardSyncerTest {
|
|||
dataFile.deleteOnExit();
|
||||
IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath());
|
||||
|
||||
shardSyncer.bootstrapShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards,
|
||||
shardSyncer.bootstrapShardLeases(kinesisProxy, leaseManager, initialPosition,
|
||||
false);
|
||||
List<KinesisClientLease> newLeases = leaseManager.listLeases();
|
||||
Assert.assertEquals(2, newLeases.size());
|
||||
|
|
@ -2267,81 +2243,6 @@ public class ShardSyncerTest {
|
|||
Assert.assertFalse(leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test cleanup of lease for a shard that has been fully processed (and processing of child shards has begun).
|
||||
*
|
||||
* @throws DependencyException
|
||||
* @throws InvalidStateException
|
||||
* @throws ProvisionedThroughputException
|
||||
*/
|
||||
@Test
|
||||
public final void testCleanupLeaseForClosedShard()
|
||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||
String closedShardId = "shardId-2";
|
||||
KinesisClientLease leaseForClosedShard = newLease(closedShardId);
|
||||
leaseForClosedShard.setCheckpoint(new ExtendedSequenceNumber("1234"));
|
||||
leaseManager.createLeaseIfNotExists(leaseForClosedShard);
|
||||
|
||||
Set<String> childShardIds = new HashSet<>();
|
||||
List<KinesisClientLease> trackedLeases = new ArrayList<>();
|
||||
Set<String> parentShardIds = new HashSet<>();
|
||||
parentShardIds.add(closedShardId);
|
||||
String childShardId1 = "shardId-5";
|
||||
KinesisClientLease childLease1 = newLease(childShardId1);
|
||||
childLease1.setParentShardIds(parentShardIds);
|
||||
childLease1.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
String childShardId2 = "shardId-7";
|
||||
KinesisClientLease childLease2 = newLease(childShardId2);
|
||||
childLease2.setParentShardIds(parentShardIds);
|
||||
childLease2.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
Map<String, KinesisClientLease> trackedLeaseMap = shardSyncer.constructShardIdToKCLLeaseMap(trackedLeases);
|
||||
|
||||
// empty list of leases
|
||||
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
|
||||
Assert.assertNotNull(leaseManager.getLease(closedShardId));
|
||||
|
||||
// closed shard has not been fully processed yet (checkpoint != SHARD_END)
|
||||
trackedLeases.add(leaseForClosedShard);
|
||||
trackedLeaseMap = shardSyncer.constructShardIdToKCLLeaseMap(trackedLeases);
|
||||
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
|
||||
Assert.assertNotNull(leaseManager.getLease(closedShardId));
|
||||
|
||||
// closed shard has been fully processed yet (checkpoint == SHARD_END)
|
||||
leaseForClosedShard.setCheckpoint(ExtendedSequenceNumber.SHARD_END);
|
||||
leaseManager.updateLease(leaseForClosedShard);
|
||||
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
|
||||
Assert.assertNull(leaseManager.getLease(closedShardId));
|
||||
|
||||
// lease for only one child exists
|
||||
childShardIds.add(childShardId1);
|
||||
childShardIds.add(childShardId2);
|
||||
leaseManager.createLeaseIfNotExists(leaseForClosedShard);
|
||||
leaseManager.createLeaseIfNotExists(childLease1);
|
||||
trackedLeases.add(childLease1);
|
||||
trackedLeaseMap = shardSyncer.constructShardIdToKCLLeaseMap(trackedLeases);
|
||||
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
|
||||
Assert.assertNotNull(leaseManager.getLease(closedShardId));
|
||||
|
||||
// leases for both children exists, but they are both at TRIM_HORIZON
|
||||
leaseManager.createLeaseIfNotExists(childLease2);
|
||||
trackedLeases.add(childLease2);
|
||||
trackedLeaseMap = shardSyncer.constructShardIdToKCLLeaseMap(trackedLeases);
|
||||
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
|
||||
Assert.assertNotNull(leaseManager.getLease(closedShardId));
|
||||
|
||||
// leases for both children exists, one is at TRIM_HORIZON
|
||||
childLease1.setCheckpoint(new ExtendedSequenceNumber("34890"));
|
||||
leaseManager.updateLease(childLease1);
|
||||
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
|
||||
Assert.assertNotNull(leaseManager.getLease(closedShardId));
|
||||
|
||||
// leases for both children exists, NONE of them are at TRIM_HORIZON
|
||||
childLease2.setCheckpoint(new ExtendedSequenceNumber("43789"));
|
||||
leaseManager.updateLease(childLease2);
|
||||
shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager);
|
||||
Assert.assertNull(leaseManager.getLease(closedShardId));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test we can handle trimmed Kinesis shards (absent from the shard list), and valid closed shards.
|
||||
*
|
||||
|
|
|
|||
|
|
@ -40,6 +40,7 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingSha
|
|||
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
|
||||
import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException;
|
||||
import com.amazonaws.services.kinesis.leases.impl.UpdateField;
|
||||
import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager;
|
||||
import com.amazonaws.services.kinesis.model.ChildShard;
|
||||
import com.amazonaws.services.kinesis.model.HashKeyRange;
|
||||
import com.amazonaws.services.kinesis.model.SequenceNumberRange;
|
||||
|
|
@ -94,6 +95,8 @@ public class ShutdownTaskTest {
|
|||
private KinesisClientLibLeaseCoordinator leaseCoordinator;
|
||||
@Mock
|
||||
private IRecordProcessor defaultRecordProcessor;
|
||||
@Mock
|
||||
private LeaseCleanupManager leaseCleanupManager;
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
|
|
@ -150,7 +153,8 @@ public class ShutdownTaskTest {
|
|||
getRecordsCache,
|
||||
shardSyncer,
|
||||
shardSyncStrategy,
|
||||
constructSplitChildShards());
|
||||
constructSplitChildShards(),
|
||||
leaseCleanupManager);
|
||||
TaskResult result = task.call();
|
||||
assertNotNull(result.getException());
|
||||
Assert.assertTrue(result.getException() instanceof IllegalArgumentException);
|
||||
|
|
@ -185,7 +189,8 @@ public class ShutdownTaskTest {
|
|||
getRecordsCache,
|
||||
shardSyncer,
|
||||
shardSyncStrategy,
|
||||
constructSplitChildShards());
|
||||
constructSplitChildShards(),
|
||||
leaseCleanupManager);
|
||||
TaskResult result = task.call();
|
||||
verify(getRecordsCache).shutdown();
|
||||
verify(leaseCoordinator).dropLease(any(KinesisClientLease.class));
|
||||
|
|
@ -221,7 +226,8 @@ public class ShutdownTaskTest {
|
|||
getRecordsCache,
|
||||
shardSyncer,
|
||||
shardSyncStrategy,
|
||||
constructMergeChildShards()));
|
||||
constructMergeChildShards(),
|
||||
leaseCleanupManager));
|
||||
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
|
||||
TaskResult result = task.call();
|
||||
assertNotNull(result.getException());
|
||||
|
|
@ -246,7 +252,8 @@ public class ShutdownTaskTest {
|
|||
getRecordsCache,
|
||||
shardSyncer,
|
||||
shardSyncStrategy,
|
||||
constructMergeChildShards()));
|
||||
constructMergeChildShards(),
|
||||
leaseCleanupManager));
|
||||
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
|
||||
TaskResult result = task.call();
|
||||
assertNull(result.getException());
|
||||
|
|
@ -283,7 +290,8 @@ public class ShutdownTaskTest {
|
|||
getRecordsCache,
|
||||
shardSyncer,
|
||||
shardSyncStrategy,
|
||||
constructMergeChildShards()));
|
||||
constructMergeChildShards(),
|
||||
leaseCleanupManager));
|
||||
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
|
||||
TaskResult result = task.call();
|
||||
assertNotNull(result.getException());
|
||||
|
|
@ -307,7 +315,8 @@ public class ShutdownTaskTest {
|
|||
getRecordsCache,
|
||||
shardSyncer,
|
||||
shardSyncStrategy,
|
||||
constructMergeChildShards()));
|
||||
constructMergeChildShards(),
|
||||
leaseCleanupManager));
|
||||
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(true);
|
||||
TaskResult result = task.call();
|
||||
assertNull(result.getException());
|
||||
|
|
@ -337,7 +346,8 @@ public class ShutdownTaskTest {
|
|||
getRecordsCache,
|
||||
shardSyncer,
|
||||
shardSyncStrategy,
|
||||
constructSplitChildShards());
|
||||
constructSplitChildShards(),
|
||||
leaseCleanupManager);
|
||||
TaskResult result = task.call();
|
||||
verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class));
|
||||
verify(leaseManager).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class));
|
||||
|
|
@ -370,7 +380,8 @@ public class ShutdownTaskTest {
|
|||
getRecordsCache,
|
||||
shardSyncer,
|
||||
shardSyncStrategy,
|
||||
Collections.emptyList());
|
||||
Collections.emptyList(),
|
||||
leaseCleanupManager);
|
||||
TaskResult result = task.call();
|
||||
verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class));
|
||||
verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class));
|
||||
|
|
@ -399,7 +410,8 @@ public class ShutdownTaskTest {
|
|||
getRecordsCache,
|
||||
shardSyncer,
|
||||
shardSyncStrategy,
|
||||
Collections.emptyList());
|
||||
Collections.emptyList(),
|
||||
leaseCleanupManager);
|
||||
TaskResult result = task.call();
|
||||
verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class));
|
||||
verify(leaseManager, never()).updateLeaseWithMetaInfo(any(KinesisClientLease.class), any(UpdateField.class));
|
||||
|
|
@ -416,7 +428,7 @@ public class ShutdownTaskTest {
|
|||
ShutdownTask task = new ShutdownTask(null, null, null, null,
|
||||
null, null, false,
|
||||
false, leaseCoordinator, 0,
|
||||
getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList());
|
||||
getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList(), leaseCleanupManager);
|
||||
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,289 @@
|
|||
/*
|
||||
* Copyright 2020 Amazon.com, Inc. or its affiliates.
|
||||
* Licensed under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.amazonaws.services.kinesis.leases.impl;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardObjectHelper;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
import com.amazonaws.services.kinesis.leases.LeasePendingDeletion;
|
||||
import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
|
||||
import com.amazonaws.services.kinesis.model.ChildShard;
|
||||
import com.amazonaws.services.kinesis.model.GetRecordsResult;
|
||||
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyInt;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class LeaseCleanupManagerTest {
|
||||
|
||||
private ShardInfo shardInfo;
|
||||
private String concurrencyToken = "1234";
|
||||
private int maxRecords = 1;
|
||||
|
||||
private String getShardId = "getShardId";
|
||||
private String splitParent = "splitParent";
|
||||
private String mergeParent1 = "mergeParent-1";
|
||||
private String mergeParent2 = "mergeParent-2";
|
||||
|
||||
private long leaseCleanupIntervalMillis = Duration.ofSeconds(1).toMillis();
|
||||
private long completedLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis();
|
||||
private long garbageLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis();
|
||||
private boolean cleanupLeasesOfCompletedShards = true;
|
||||
private LeaseCleanupManager leaseCleanupManager;
|
||||
private static final IMetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory();
|
||||
|
||||
@Mock
|
||||
private LeaseManager leaseManager;
|
||||
@Mock
|
||||
private LeaseCoordinator leaseCoordinator;
|
||||
@Mock
|
||||
private IKinesisProxy kinesis;
|
||||
@Mock
|
||||
private ScheduledExecutorService deletionThreadPool;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
shardInfo = new ShardInfo(getShardId, concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
leaseCleanupManager = new LeaseCleanupManager(kinesis, leaseManager, deletionThreadPool, NULL_METRICS_FACTORY,
|
||||
cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that when both child shard leases are present, we are able to delete the parent shard for the completed
|
||||
* shard case.
|
||||
*/
|
||||
@Test
|
||||
public final void testParentShardLeaseDeletedSplitCase() throws Exception {
|
||||
shardInfo = new ShardInfo("ShardId-0", concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
|
||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(), ExtendedSequenceNumber.LATEST, 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that when both child shard leases are present, we are able to delete the parent shard for the completed
|
||||
* shard case.
|
||||
*/
|
||||
@Test
|
||||
public final void testParentShardLeaseDeletedMergeCase() throws Exception {
|
||||
shardInfo = new ShardInfo("ShardId-0", concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
|
||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), ExtendedSequenceNumber.LATEST, 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that if cleanupLeasesOfCompletedShards is not enabled by the customer, then no leases are cleaned up for
|
||||
* the completed shard case.
|
||||
*/
|
||||
@Test
|
||||
public final void testNoLeasesDeletedWhenNotEnabled() throws Exception {
|
||||
shardInfo = new ShardInfo("ShardId-0", concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
cleanupLeasesOfCompletedShards = false;
|
||||
|
||||
leaseCleanupManager = new LeaseCleanupManager(kinesis, leaseManager, deletionThreadPool, NULL_METRICS_FACTORY,
|
||||
cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords);
|
||||
|
||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(), ExtendedSequenceNumber.LATEST, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that if some of the child shard leases are missing, we fail fast and don't delete the parent shard lease
|
||||
* for the completed shard case.
|
||||
*/
|
||||
@Test
|
||||
public final void testNoCleanupWhenSomeChildShardLeasesAreNotPresent() throws Exception {
|
||||
List<ChildShard> childShards = childShardsForSplit();
|
||||
|
||||
shardInfo = new ShardInfo("ShardId-0", concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
|
||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShards, ExtendedSequenceNumber.LATEST, false, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that if some child shard leases haven't begun processing (at least one lease w/ checkpoint TRIM_HORIZON),
|
||||
* we don't delete them for the completed shard case.
|
||||
*/
|
||||
@Test
|
||||
public final void testParentShardLeaseNotDeletedWhenChildIsAtTrim() throws Exception {
|
||||
testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber.TRIM_HORIZON);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that if some child shard leases haven't begun processing (at least one lease w/ checkpoint AT_TIMESTAMP),
|
||||
* we don't delete them for the completed shard case.
|
||||
*/
|
||||
@Test
|
||||
public final void testParentShardLeaseNotDeletedWhenChildIsAtTimestamp() throws Exception {
|
||||
testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber.AT_TIMESTAMP);
|
||||
}
|
||||
|
||||
private void testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber extendedSequenceNumber)
|
||||
throws Exception {
|
||||
shardInfo = new ShardInfo("ShardId-0", concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
|
||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), extendedSequenceNumber, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that if a lease's parents are still present, we do not delete the lease.
|
||||
*/
|
||||
@Test
|
||||
public final void testLeaseNotDeletedWhenParentsStillPresent() throws Exception {
|
||||
shardInfo = new ShardInfo("ShardId-0", concurrencyToken, Collections.singleton("parent"),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
|
||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), ExtendedSequenceNumber.LATEST, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests ResourceNotFound case for if a shard expires, that we delete the lease when shardExpired is found.
|
||||
*/
|
||||
@Test
|
||||
public final void testLeaseDeletedWhenShardDoesNotExist() throws Exception {
|
||||
shardInfo = new ShardInfo("ShardId-0", concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
final KinesisClientLease heldLease = LeaseHelper.createLease(shardInfo.getShardId(), "leaseOwner", Collections.singleton("parentShardId"));
|
||||
|
||||
testLeaseDeletedWhenShardDoesNotExist(heldLease);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests ResourceNotFound case when completed lease cleanup is disabled.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public final void testLeaseDeletedWhenShardDoesNotExistAndCleanupCompletedLeaseDisabled() throws Exception {
|
||||
shardInfo = new ShardInfo("ShardId-0", concurrencyToken, Collections.emptySet(),
|
||||
ExtendedSequenceNumber.LATEST);
|
||||
final KinesisClientLease heldLease = LeaseHelper.createLease(shardInfo.getShardId(), "leaseOwner", Collections.singleton("parentShardId"));
|
||||
|
||||
cleanupLeasesOfCompletedShards = false;
|
||||
|
||||
leaseCleanupManager = new LeaseCleanupManager(kinesis, leaseManager, deletionThreadPool, NULL_METRICS_FACTORY,
|
||||
cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords);
|
||||
|
||||
testLeaseDeletedWhenShardDoesNotExist(heldLease);
|
||||
}
|
||||
|
||||
public void testLeaseDeletedWhenShardDoesNotExist(KinesisClientLease heldLease) throws Exception {
|
||||
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(heldLease);
|
||||
when(kinesis.get(anyString(), anyInt())).thenThrow(ResourceNotFoundException.class);
|
||||
when(kinesis.getIterator(anyString(), anyString())).thenThrow(ResourceNotFoundException.class);
|
||||
when(leaseManager.getLease(heldLease.getLeaseKey())).thenReturn(heldLease);
|
||||
|
||||
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(heldLease, shardInfo));
|
||||
leaseCleanupManager.cleanupLeases();
|
||||
|
||||
verify(leaseManager, times(1)).deleteLease(heldLease);
|
||||
}
|
||||
|
||||
private void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List<ChildShard> childShards,
|
||||
ExtendedSequenceNumber extendedSequenceNumber,
|
||||
int expectedDeletedLeases) throws Exception {
|
||||
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShards, extendedSequenceNumber, true, expectedDeletedLeases);
|
||||
}
|
||||
|
||||
private void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List<ChildShard> childShards,
|
||||
ExtendedSequenceNumber extendedSequenceNumber,
|
||||
boolean childShardLeasesPresent,
|
||||
int expectedDeletedLeases) throws Exception {
|
||||
|
||||
final KinesisClientLease lease = LeaseHelper.createLease(shardInfo.getShardId(), "leaseOwner", shardInfo.getParentShardIds(),
|
||||
childShards.stream().map(c -> c.getShardId()).collect(Collectors.toSet()));
|
||||
final List<KinesisClientLease> childShardLeases = childShards.stream().map(c -> LeaseHelper.createLease(
|
||||
c.getShardId(), "leaseOwner", Collections.singleton(shardInfo.getShardId()),
|
||||
Collections.emptyList(), extendedSequenceNumber)).collect(Collectors.toList());
|
||||
|
||||
final List<KinesisClientLease> parentShardLeases = lease.getParentShardIds().stream().map(p ->
|
||||
LeaseHelper.createLease(p, "leaseOwner", Collections.emptyList(),
|
||||
Collections.singleton(shardInfo.getShardId()), extendedSequenceNumber)).collect(Collectors.toList());
|
||||
|
||||
when(leaseManager.getLease(lease.getLeaseKey())).thenReturn(lease);
|
||||
for (Lease parentShardLease : parentShardLeases) {
|
||||
when(leaseManager.getLease(parentShardLease.getLeaseKey())).thenReturn(parentShardLease);
|
||||
}
|
||||
if (childShardLeasesPresent) {
|
||||
for (Lease childShardLease : childShardLeases) {
|
||||
when(leaseManager.getLease(childShardLease.getLeaseKey())).thenReturn(childShardLease);
|
||||
}
|
||||
}
|
||||
|
||||
when(kinesis.getIterator(any(String.class), any(String.class))).thenReturn("123");
|
||||
|
||||
final GetRecordsResult getRecordsResult = new GetRecordsResult();
|
||||
getRecordsResult.setRecords(Collections.emptyList());
|
||||
getRecordsResult.setChildShards(childShards);
|
||||
|
||||
when(kinesis.get(any(String.class), any(Integer.class))).thenReturn(getRecordsResult);
|
||||
|
||||
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(lease, shardInfo));
|
||||
leaseCleanupManager.cleanupLeases();
|
||||
|
||||
verify(leaseManager, times(expectedDeletedLeases)).deleteLease(any(Lease.class));
|
||||
}
|
||||
|
||||
private List<ChildShard> childShardsForSplit() {
|
||||
final List<String> parentShards = Arrays.asList(splitParent);
|
||||
|
||||
final ChildShard leftChild = new ChildShard();
|
||||
leftChild.setShardId("leftChild");
|
||||
leftChild.setParentShards(parentShards);
|
||||
leftChild.setHashKeyRange(ShardObjectHelper.newHashKeyRange("0", "49"));
|
||||
|
||||
final ChildShard rightChild = new ChildShard();
|
||||
rightChild.setShardId("rightChild");
|
||||
rightChild.setParentShards(parentShards);
|
||||
rightChild.setHashKeyRange(ShardObjectHelper.newHashKeyRange("50", "99"));
|
||||
|
||||
return Arrays.asList(leftChild, rightChild);
|
||||
}
|
||||
|
||||
private List<ChildShard> childShardsForMerge() {
|
||||
final List<String> parentShards = Arrays.asList(mergeParent1, mergeParent2);
|
||||
|
||||
final ChildShard child = new ChildShard();
|
||||
child.setShardId("onlyChild");
|
||||
child.setParentShards(parentShards);
|
||||
child.setHashKeyRange(ShardObjectHelper.newHashKeyRange("0", "99"));
|
||||
|
||||
return Collections.singletonList(child);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Copyright 2020 Amazon.com, Inc. or its affiliates.
|
||||
* Licensed under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package com.amazonaws.services.kinesis.leases.impl;
|
||||
|
||||
import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
||||
public class LeaseHelper {
|
||||
|
||||
public static KinesisClientLease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds) {
|
||||
return createLease(leaseKey, leaseOwner, parentShardIds, Collections.emptySet(), ExtendedSequenceNumber.LATEST);
|
||||
}
|
||||
|
||||
public static KinesisClientLease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds, Collection<String> childShardIds) {
|
||||
return createLease(leaseKey, leaseOwner, parentShardIds, childShardIds, ExtendedSequenceNumber.LATEST);
|
||||
}
|
||||
|
||||
public static KinesisClientLease createLease(String leaseKey, String leaseOwner, Collection<String> parentShardIds,
|
||||
Collection<String> childShardIds, ExtendedSequenceNumber extendedSequenceNumber) {
|
||||
KinesisClientLease lease = new KinesisClientLease ();
|
||||
lease.setLeaseKey(leaseKey);
|
||||
lease.setLeaseOwner(leaseOwner);
|
||||
lease.setParentShardIds(parentShardIds);
|
||||
lease.setChildShardIds(childShardIds);
|
||||
lease.setCheckpoint(extendedSequenceNumber);
|
||||
|
||||
return lease;
|
||||
}
|
||||
}
|
||||
Loading…
Reference in a new issue