Merge branch 'master' into graceful-shutdown

This commit is contained in:
Pfifer, Justin 2016-10-24 07:10:19 -07:00
commit 0d1279829e
17 changed files with 552 additions and 56 deletions

View file

@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.config;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.lang.reflect.Constructor;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -30,6 +31,7 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode
private static final Log LOG = LogFactory.getLog(AWSCredentialsProviderPropertyValueDecoder.class); private static final Log LOG = LogFactory.getLog(AWSCredentialsProviderPropertyValueDecoder.class);
private static final String AUTH_PREFIX = "com.amazonaws.auth."; private static final String AUTH_PREFIX = "com.amazonaws.auth.";
private static final String LIST_DELIMITER = ","; private static final String LIST_DELIMITER = ",";
private static final String ARG_DELIMITER = "|";
/** /**
* Constructor. * Constructor.
@ -70,11 +72,25 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode
private static List<AWSCredentialsProvider> getValidCredentialsProviders(List<String> providerNames) { private static List<AWSCredentialsProvider> getValidCredentialsProviders(List<String> providerNames) {
List<AWSCredentialsProvider> credentialsProviders = new ArrayList<AWSCredentialsProvider>(); List<AWSCredentialsProvider> credentialsProviders = new ArrayList<AWSCredentialsProvider>();
for (String providerName : providerNames) { for (String providerName : providerNames) {
try { if (providerName.contains(ARG_DELIMITER)) {
Class<?> className = Class.forName(providerName); String[] nameAndArgs = providerName.split("\\" + ARG_DELIMITER);
credentialsProviders.add((AWSCredentialsProvider) className.newInstance()); Class<?>[] argTypes = new Class<?>[nameAndArgs.length - 1];
} catch (Exception e) { Arrays.fill(argTypes, String.class);
LOG.debug("Can't find any credentials provider matching " + providerName + "."); try {
Class<?> className = Class.forName(nameAndArgs[0]);
Constructor<?> c = className.getConstructor(argTypes);
credentialsProviders.add((AWSCredentialsProvider) c.newInstance(
Arrays.copyOfRange(nameAndArgs, 1, nameAndArgs.length)));
} catch (Exception e) {
LOG.debug("Can't find any credentials provider matching " + providerName + ".");
}
} else {
try {
Class<?> className = Class.forName(providerName);
credentialsProviders.add((AWSCredentialsProvider) className.newInstance());
} catch (Exception e) {
LOG.debug("Can't find any credentials provider matching " + providerName + ".");
}
} }
} }
return credentialsProviders; return credentialsProviders;

View file

@ -50,7 +50,9 @@ public class KinesisClientLibConfigurator {
// Required properties // Required properties
private static final String PROP_APP_NAME = "applicationName"; private static final String PROP_APP_NAME = "applicationName";
private static final String PROP_STREAM_NAME = "streamName"; private static final String PROP_STREAM_NAME = "streamName";
private static final String PROP_CREDENTIALS_PROVIDER = "AWSCredentialsProvider"; private static final String PROP_CREDENTIALS_PROVIDER_KINESIS = "AWSCredentialsProvider";
private static final String PROP_CREDENTIALS_PROVIDER_DYNAMODB = "AWSCredentialsProviderDynamoDB";
private static final String PROP_CREDENTIALS_PROVIDER_CLOUDWATCH = "AWSCredentialsProviderCloudWatch";
private static final String PROP_WORKER_ID = "workerId"; private static final String PROP_WORKER_ID = "workerId";
private Map<Class<?>, IPropertyValueDecoder<?>> classToDecoder; private Map<Class<?>, IPropertyValueDecoder<?>> classToDecoder;
@ -107,7 +109,7 @@ public class KinesisClientLibConfigurator {
String applicationName = stringValueDecoder.decodeValue(properties.getProperty(PROP_APP_NAME)); String applicationName = stringValueDecoder.decodeValue(properties.getProperty(PROP_APP_NAME));
String streamName = stringValueDecoder.decodeValue(properties.getProperty(PROP_STREAM_NAME)); String streamName = stringValueDecoder.decodeValue(properties.getProperty(PROP_STREAM_NAME));
AWSCredentialsProvider provider = AWSCredentialsProvider provider =
awsCPPropGetter.decodeValue(properties.getProperty(PROP_CREDENTIALS_PROVIDER)); awsCPPropGetter.decodeValue(properties.getProperty(PROP_CREDENTIALS_PROVIDER_KINESIS));
if (applicationName == null || applicationName.isEmpty()) { if (applicationName == null || applicationName.isEmpty()) {
throw new IllegalArgumentException("Value of applicationName should be explicitly provided."); throw new IllegalArgumentException("Value of applicationName should be explicitly provided.");
@ -116,6 +118,24 @@ public class KinesisClientLibConfigurator {
throw new IllegalArgumentException("Value of streamName should be explicitly provided."); throw new IllegalArgumentException("Value of streamName should be explicitly provided.");
} }
// Decode the DynamoDB credentials provider if it exists. If not use the Kinesis credentials provider.
AWSCredentialsProvider providerDynamoDB;
String propCredentialsProviderDynamoDBValue = properties.getProperty(PROP_CREDENTIALS_PROVIDER_DYNAMODB);
if (propCredentialsProviderDynamoDBValue == null) {
providerDynamoDB = provider;
} else {
providerDynamoDB = awsCPPropGetter.decodeValue(propCredentialsProviderDynamoDBValue);
}
// Decode the CloudWatch credentials provider if it exists. If not use the Kinesis credentials provider.
AWSCredentialsProvider providerCloudWatch;
String propCredentialsProviderCloudWatchValue = properties.getProperty(PROP_CREDENTIALS_PROVIDER_CLOUDWATCH);
if (propCredentialsProviderCloudWatchValue == null) {
providerCloudWatch = provider;
} else {
providerCloudWatch = awsCPPropGetter.decodeValue(propCredentialsProviderCloudWatchValue);
}
// Allow customer not to provide workerId or to provide empty worker id. // Allow customer not to provide workerId or to provide empty worker id.
String workerId = stringValueDecoder.decodeValue(properties.getProperty(PROP_WORKER_ID)); String workerId = stringValueDecoder.decodeValue(properties.getProperty(PROP_WORKER_ID));
if (workerId == null || workerId.isEmpty()) { if (workerId == null || workerId.isEmpty()) {
@ -125,13 +145,13 @@ public class KinesisClientLibConfigurator {
} }
KinesisClientLibConfiguration config = KinesisClientLibConfiguration config =
new KinesisClientLibConfiguration(applicationName, streamName, provider, workerId); new KinesisClientLibConfiguration(applicationName, streamName, provider, providerDynamoDB, providerCloudWatch, workerId);
Set<String> requiredNames = Set<String> requiredNames =
new HashSet<String>(Arrays.asList(PROP_STREAM_NAME, new HashSet<String>(Arrays.asList(PROP_STREAM_NAME,
PROP_APP_NAME, PROP_APP_NAME,
PROP_WORKER_ID, PROP_WORKER_ID,
PROP_CREDENTIALS_PROVIDER)); PROP_CREDENTIALS_PROVIDER_KINESIS));
// Set all the variables that are not used for constructor. // Set all the variables that are not used for constructor.
for (Object keyObject : properties.keySet()) { for (Object keyObject : properties.keySet()) {

View file

@ -295,7 +295,7 @@ class ConsumerStates {
public ITask createTask(ShardConsumer consumer) { public ITask createTask(ShardConsumer consumer) {
return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(), return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(),
consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(),
consumer.getTaskBackoffTimeMillis()); consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist());
} }
@Override @Override

View file

@ -154,6 +154,13 @@ public class KinesisClientLibConfiguration {
*/ */
public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10; public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10;
/*
* The Worker will skip shard sync during initialization if there are one or more leases in the lease table.
* This assumes that the shards and leases are in-sync.
* This enables customers to choose faster startup times (e.g. during incremental deployments of an application).
*/
public static final boolean DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false;
/** /**
* Default Shard prioritization strategy. * Default Shard prioritization strategy.
*/ */
@ -163,6 +170,7 @@ public class KinesisClientLibConfiguration {
private String tableName; private String tableName;
private String streamName; private String streamName;
private String kinesisEndpoint; private String kinesisEndpoint;
private String dynamoDBEndpoint;
private InitialPositionInStream initialPositionInStream; private InitialPositionInStream initialPositionInStream;
private AWSCredentialsProvider kinesisCredentialsProvider; private AWSCredentialsProvider kinesisCredentialsProvider;
private AWSCredentialsProvider dynamoDBCredentialsProvider; private AWSCredentialsProvider dynamoDBCredentialsProvider;
@ -191,6 +199,8 @@ public class KinesisClientLibConfiguration {
private int initialLeaseTableReadCapacity; private int initialLeaseTableReadCapacity;
private int initialLeaseTableWriteCapacity; private int initialLeaseTableWriteCapacity;
private InitialPositionInStreamExtended initialPositionInStreamExtended; private InitialPositionInStreamExtended initialPositionInStreamExtended;
// This is useful for optimizing deployments to large fleets working on a stable stream.
private boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
private ShardPrioritization shardPrioritization; private ShardPrioritization shardPrioritization;
/** /**
@ -228,7 +238,7 @@ public class KinesisClientLibConfiguration {
AWSCredentialsProvider dynamoDBCredentialsProvider, AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider, AWSCredentialsProvider cloudWatchCredentialsProvider,
String workerId) { String workerId) {
this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider, this(applicationName, streamName, null, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId, dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId,
DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS,
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
@ -296,6 +306,76 @@ public class KinesisClientLibConfiguration {
int metricsMaxQueueSize, int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing, boolean validateSequenceNumberBeforeCheckpointing,
String regionName) { String regionName) {
this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId,
maxRecords, idleTimeBetweenReadsInMillis,
callProcessRecordsEvenForEmptyRecordList, parentShardPollIntervalMillis,
shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry,
kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig,
taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize,
validateSequenceNumberBeforeCheckpointing, regionName);
}
/**
* @param applicationName Name of the Kinesis application
* By default the application name is included in the user agent string used to make AWS requests. This
* can assist with troubleshooting (e.g. distinguish requests made by separate applications).
* @param streamName Name of the Kinesis stream
* @param kinesisEndpoint Kinesis endpoint
* @param dynamoDBEndpoint DynamoDB endpoint
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching
* records from that location in the stream when an application starts up for the first time and there
* are no checkpoints. If there are checkpoints, then we start from the checkpoint position.
* @param kinesisCredentialsProvider Provides credentials used to access Kinesis
* @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB
* @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch
* @param failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others)
* @param workerId Used to distinguish different workers/processes of a Kinesis application
* @param maxRecords Max records to read per Kinesis getRecords() call
* @param idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis
* @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if
* GetRecords returned an empty record list.
* @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done
* @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards
* @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration
* in Kinesis)
* @param kinesisClientConfig Client Configuration used by Kinesis client
* @param dynamoDBClientConfig Client Configuration used by DynamoDB client
* @param cloudWatchClientConfig Client Configuration used by CloudWatch client
* @param taskBackoffTimeMillis Backoff period when tasks encounter an exception
* @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch
* @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch
* @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers
* with a call to Amazon Kinesis before checkpointing for calls to
* {@link RecordProcessorCheckpointer#checkpoint(String)}
* @param regionName The region name for the service
*/
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
public KinesisClientLibConfiguration(String applicationName,
String streamName,
String kinesisEndpoint,
String dynamoDBEndpoint,
InitialPositionInStream initialPositionInStream,
AWSCredentialsProvider kinesisCredentialsProvider,
AWSCredentialsProvider dynamoDBCredentialsProvider,
AWSCredentialsProvider cloudWatchCredentialsProvider,
long failoverTimeMillis,
String workerId,
int maxRecords,
long idleTimeBetweenReadsInMillis,
boolean callProcessRecordsEvenForEmptyRecordList,
long parentShardPollIntervalMillis,
long shardSyncIntervalMillis,
boolean cleanupTerminatedShardsBeforeExpiry,
ClientConfiguration kinesisClientConfig,
ClientConfiguration dynamoDBClientConfig,
ClientConfiguration cloudWatchClientConfig,
long taskBackoffTimeMillis,
long metricsBufferTimeMillis,
int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing,
String regionName) {
// Check following values are greater than zero // Check following values are greater than zero
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis);
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
@ -310,6 +390,7 @@ public class KinesisClientLibConfiguration {
this.tableName = applicationName; this.tableName = applicationName;
this.streamName = streamName; this.streamName = streamName;
this.kinesisEndpoint = kinesisEndpoint; this.kinesisEndpoint = kinesisEndpoint;
this.dynamoDBEndpoint = dynamoDBEndpoint;
this.initialPositionInStream = initialPositionInStream; this.initialPositionInStream = initialPositionInStream;
this.kinesisCredentialsProvider = kinesisCredentialsProvider; this.kinesisCredentialsProvider = kinesisCredentialsProvider;
this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider; this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider;
@ -338,6 +419,7 @@ public class KinesisClientLibConfiguration {
this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY;
this.initialPositionInStreamExtended = this.initialPositionInStreamExtended =
InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream);
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
} }
@ -468,6 +550,13 @@ public class KinesisClientLibConfiguration {
return kinesisEndpoint; return kinesisEndpoint;
} }
/**
* @return DynamoDB endpoint
*/
public String getDynamoDBEndpoint() {
return dynamoDBEndpoint;
}
/** /**
* @return the initialPositionInStream * @return the initialPositionInStream
*/ */
@ -561,6 +650,13 @@ public class KinesisClientLibConfiguration {
return regionName; return regionName;
} }
/**
* @return true if Worker should skip syncing shards and leases at startup if leases are present
*/
public boolean getSkipShardSyncAtWorkerInitializationIfLeasesExist() {
return skipShardSyncAtWorkerInitializationIfLeasesExist;
}
/** /**
* @return Max leases this Worker can handle at a time * @return Max leases this Worker can handle at a time
*/ */
@ -631,6 +727,15 @@ public class KinesisClientLibConfiguration {
return this; return this;
} }
/**
* @param dynamoDBEndpoint DynamoDB endpoint
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withDynamoDBEndpoint(String dynamoDBEndpoint) {
this.dynamoDBEndpoint = dynamoDBEndpoint;
return this;
}
/** /**
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library
* will start fetching records from this position when the application starts up if there are no checkpoints. * will start fetching records from this position when the application starts up if there are no checkpoints.
@ -862,6 +967,21 @@ public class KinesisClientLibConfiguration {
return this; return this;
} }
/**
* If set to true, the Worker will not sync shards and leases during initialization if there are one or more leases
* in the lease table. This assumes that the shards and leases are in-sync.
* This enables customers to choose faster startup times (e.g. during incremental deployments of an application).
*
* @param skipShardSyncAtStartupIfLeasesExist Should Worker skip syncing shards and leases at startup (Worker
* initialization).
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withSkipShardSyncAtStartupIfLeasesExist(
boolean skipShardSyncAtStartupIfLeasesExist) {
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtStartupIfLeasesExist;
return this;
}
/** /**
* *
* @param regionName The region name for the service * @param regionName The region name for the service

View file

@ -73,7 +73,8 @@ class ProcessTask implements ITask {
IRecordProcessor recordProcessor, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, RecordProcessorCheckpointer recordProcessorCheckpointer,
KinesisDataFetcher dataFetcher, KinesisDataFetcher dataFetcher,
long backoffTimeMillis) { long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
super(); super();
this.shardInfo = shardInfo; this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor; this.recordProcessor = recordProcessor;
@ -82,13 +83,19 @@ class ProcessTask implements ITask {
this.streamConfig = streamConfig; this.streamConfig = streamConfig;
this.backoffTimeMillis = backoffTimeMillis; this.backoffTimeMillis = backoffTimeMillis;
IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy(); IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy();
if (kinesisProxy instanceof IKinesisProxyExtended) { // If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for
// this ProcessTask. In this case, duplicate KPL user records in the event of resharding will
// not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if
// KPL is used for ingestion and KPL's aggregation feature is used.
if (!skipShardSyncAtWorkerInitializationIfLeasesExist && kinesisProxy instanceof IKinesisProxyExtended) {
this.shard = ((IKinesisProxyExtended) kinesisProxy).getShard(this.shardInfo.getShardId()); this.shard = ((IKinesisProxyExtended) kinesisProxy).getShard(this.shardInfo.getShardId());
} else { } else {
this.shard = null;
}
if (this.shard == null && !skipShardSyncAtWorkerInitializationIfLeasesExist) {
LOG.warn("Cannot get the shard for this ProcessTask, so duplicate KPL user records " LOG.warn("Cannot get the shard for this ProcessTask, so duplicate KPL user records "
+ "in the event of resharding will not be dropped during deaggregation of Amazon " + "in the event of resharding will not be dropped during deaggregation of Amazon "
+ "Kinesis records."); + "Kinesis records.");
this.shard = null;
} }
} }

View file

@ -52,6 +52,7 @@ class ShardConsumer {
private final long parentShardPollIntervalMillis; private final long parentShardPollIntervalMillis;
private final boolean cleanupLeasesOfCompletedShards; private final boolean cleanupLeasesOfCompletedShards;
private final long taskBackoffTimeMillis; private final long taskBackoffTimeMillis;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
private ITask currentTask; private ITask currentTask;
private long currentTaskSubmitTime; private long currentTaskSubmitTime;
@ -90,7 +91,8 @@ class ShardConsumer {
boolean cleanupLeasesOfCompletedShards, boolean cleanupLeasesOfCompletedShards,
ExecutorService executorService, ExecutorService executorService,
IMetricsFactory metricsFactory, IMetricsFactory metricsFactory,
long backoffTimeMillis) { long backoffTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
this.streamConfig = streamConfig; this.streamConfig = streamConfig;
this.recordProcessor = recordProcessor; this.recordProcessor = recordProcessor;
this.executorService = executorService; this.executorService = executorService;
@ -108,6 +110,7 @@ class ShardConsumer {
this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; this.parentShardPollIntervalMillis = parentShardPollIntervalMillis;
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
this.taskBackoffTimeMillis = backoffTimeMillis; this.taskBackoffTimeMillis = backoffTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
} }
/** /**
@ -165,6 +168,10 @@ class ShardConsumer {
return submittedNewTask; return submittedNewTask;
} }
public boolean isSkipShardSyncAtWorkerInitializationIfLeasesExist() {
return skipShardSyncAtWorkerInitializationIfLeasesExist;
}
private enum TaskOutcome { private enum TaskOutcome {
SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE
} }

View file

@ -100,6 +100,8 @@ public class Worker implements Runnable {
new ConcurrentHashMap<ShardInfo, ShardConsumer>(); new ConcurrentHashMap<ShardInfo, ShardConsumer>();
private final boolean cleanupLeasesUponShardCompletion; private final boolean cleanupLeasesUponShardCompletion;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
/** /**
* Constructor. * Constructor.
* *
@ -243,7 +245,9 @@ public class Worker implements Runnable {
metricsFactory, metricsFactory,
config.getTaskBackoffTimeMillis(), config.getTaskBackoffTimeMillis(),
config.getFailoverTimeMillis(), config.getFailoverTimeMillis(),
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
config.getShardPrioritizationStrategy()); config.getShardPrioritizationStrategy());
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) { if (config.getRegionName() != null) {
Region region = RegionUtils.getRegion(config.getRegionName()); Region region = RegionUtils.getRegion(config.getRegionName());
@ -252,6 +256,11 @@ public class Worker implements Runnable {
dynamoDBClient.setRegion(region); dynamoDBClient.setRegion(region);
LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName()); LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName());
} }
// If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint.
if (config.getDynamoDBEndpoint() != null) {
dynamoDBClient.setEndpoint(config.getDynamoDBEndpoint());
LOG.debug("The endpoint of Amazon DynamoDB client has been set to " + config.getDynamoDBEndpoint());
}
// If a kinesis endpoint was explicitly specified, use it to set the region of kinesis. // If a kinesis endpoint was explicitly specified, use it to set the region of kinesis.
if (config.getKinesisEndpoint() != null) { if (config.getKinesisEndpoint() != null) {
kinesisClient.setEndpoint(config.getKinesisEndpoint()); kinesisClient.setEndpoint(config.getKinesisEndpoint());
@ -300,6 +309,7 @@ public class Worker implements Runnable {
IMetricsFactory metricsFactory, IMetricsFactory metricsFactory,
long taskBackoffTimeMillis, long taskBackoffTimeMillis,
long failoverTimeMillis, long failoverTimeMillis,
boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
ShardPrioritization shardPrioritization) { ShardPrioritization shardPrioritization) {
this.applicationName = applicationName; this.applicationName = applicationName;
this.recordProcessorFactory = recordProcessorFactory; this.recordProcessorFactory = recordProcessorFactory;
@ -322,6 +332,7 @@ public class Worker implements Runnable {
executorService); executorService);
this.taskBackoffTimeMillis = taskBackoffTimeMillis; this.taskBackoffTimeMillis = taskBackoffTimeMillis;
this.failoverTimeMillis = failoverTimeMillis; this.failoverTimeMillis = failoverTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
this.shardPrioritization = shardPrioritization; this.shardPrioritization = shardPrioritization;
} }
@ -403,16 +414,22 @@ public class Worker implements Runnable {
LOG.info("Initializing LeaseCoordinator"); LOG.info("Initializing LeaseCoordinator");
leaseCoordinator.initialize(); leaseCoordinator.initialize();
LOG.info("Syncing Kinesis shard info"); TaskResult result = null;
ShardSyncTask shardSyncTask = if (!skipShardSyncAtWorkerInitializationIfLeasesExist
new ShardSyncTask(streamConfig.getStreamProxy(), || leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) {
leaseCoordinator.getLeaseManager(), LOG.info("Syncing Kinesis shard info");
initialPosition, ShardSyncTask shardSyncTask =
cleanupLeasesUponShardCompletion, new ShardSyncTask(streamConfig.getStreamProxy(),
0L); leaseCoordinator.getLeaseManager(),
TaskResult result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); initialPosition,
cleanupLeasesUponShardCompletion,
0L);
result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call();
} else {
LOG.info("Skipping shard sync per config setting (and lease table is not empty)");
}
if (result.getException() == null) { if (result == null || result.getException() == null) {
if (!leaseCoordinator.isRunning()) { if (!leaseCoordinator.isRunning()) {
LOG.info("Starting LeaseCoordinator"); LOG.info("Starting LeaseCoordinator");
leaseCoordinator.start(); leaseCoordinator.start();
@ -654,7 +671,7 @@ public class Worker implements Runnable {
return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor,
leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion,
executorService, metricsFactory, taskBackoffTimeMillis); executorService, metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist);
} }
@ -1079,7 +1096,9 @@ public class Worker implements Runnable {
metricsFactory, metricsFactory,
config.getTaskBackoffTimeMillis(), config.getTaskBackoffTimeMillis(),
config.getFailoverTimeMillis(), config.getFailoverTimeMillis(),
config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(),
shardPrioritization); shardPrioritization);
} }
} }

View file

@ -71,6 +71,7 @@ public class LeaseCoordinator<T extends Lease> {
// Package level access for testing. // Package level access for testing.
static final int MAX_LEASE_RENEWAL_THREAD_COUNT = 20; static final int MAX_LEASE_RENEWAL_THREAD_COUNT = 20;
private final ILeaseRenewer<T> leaseRenewer; private final ILeaseRenewer<T> leaseRenewer;
private final ILeaseTaker<T> leaseTaker; private final ILeaseTaker<T> leaseTaker;
private final long renewerIntervalMillis; private final long renewerIntervalMillis;

View file

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* *
* Licensed under the Amazon Software License (the "License"). * Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License. * You may not use this file except in compliance with the License.
@ -214,6 +214,14 @@ public class LeaseManager<T extends Lease> implements ILeaseManager<T> {
return list(null); return list(null);
} }
/**
* {@inheritDoc}
*/
@Override
public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
return list(1).isEmpty();
}
/** /**
* List with the given page size. Package access for integration testing. * List with the given page size. Package access for integration testing.
* *

View file

@ -1,5 +1,5 @@
/* /*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* *
* Licensed under the Amazon Software License (the "License"). * Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License. * You may not use this file except in compliance with the License.
@ -180,4 +180,15 @@ public interface ILeaseManager<T extends Lease> {
public boolean updateLease(T lease) public boolean updateLease(T lease)
throws DependencyException, InvalidStateException, ProvisionedThroughputException; throws DependencyException, InvalidStateException, ProvisionedThroughputException;
/**
* Check (synchronously) if there are any leases in the lease table.
*
* @return true if there are no leases in the lease table
*
* @throws DependencyException if DynamoDB scan fails in an unexpected way
* @throws InvalidStateException if lease table does not exist
* @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity
*/
public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException;
} }

View file

@ -0,0 +1,115 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.config;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.config.AWSCredentialsProviderPropertyValueDecoder;
public class AWSCredentialsProviderPropertyValueDecoderTest {
private static final String TEST_ACCESS_KEY_ID = "123";
private static final String TEST_SECRET_KEY = "456";
private String credentialName1 =
"com.amazonaws.services.kinesis.clientlibrary.config.AWSCredentialsProviderPropertyValueDecoderTest$AlwaysSucceedCredentialsProvider";
private String credentialName2 =
"com.amazonaws.services.kinesis.clientlibrary.config.AWSCredentialsProviderPropertyValueDecoderTest$ConstructorCredentialsProvider";
private AWSCredentialsProviderPropertyValueDecoder decoder = new AWSCredentialsProviderPropertyValueDecoder();
@Test
public void testSingleProvider() {
AWSCredentialsProvider provider = decoder.decodeValue(credentialName1);
assertEquals(provider.getClass(), AWSCredentialsProviderChain.class);
assertEquals(provider.getCredentials().getAWSAccessKeyId(), TEST_ACCESS_KEY_ID);
assertEquals(provider.getCredentials().getAWSSecretKey(), TEST_SECRET_KEY);
}
@Test
public void testTwoProviders() {
AWSCredentialsProvider provider = decoder.decodeValue(credentialName1 + "," + credentialName1);
assertEquals(provider.getClass(), AWSCredentialsProviderChain.class);
assertEquals(provider.getCredentials().getAWSAccessKeyId(), TEST_ACCESS_KEY_ID);
assertEquals(provider.getCredentials().getAWSSecretKey(), TEST_SECRET_KEY);
}
@Test
public void testProfileProviderWithOneArg() {
AWSCredentialsProvider provider = decoder.decodeValue(credentialName2 + "|arg");
assertEquals(provider.getClass(), AWSCredentialsProviderChain.class);
assertEquals(provider.getCredentials().getAWSAccessKeyId(), "arg");
assertEquals(provider.getCredentials().getAWSSecretKey(), "blank");
}
@Test
public void testProfileProviderWithTwoArgs() {
AWSCredentialsProvider provider = decoder.decodeValue(credentialName2 +
"|arg1|arg2");
assertEquals(provider.getClass(), AWSCredentialsProviderChain.class);
assertEquals(provider.getCredentials().getAWSAccessKeyId(), "arg1");
assertEquals(provider.getCredentials().getAWSSecretKey(), "arg2");
}
/**
* This credentials provider will always succeed
*/
public static class AlwaysSucceedCredentialsProvider implements AWSCredentialsProvider {
@Override
public AWSCredentials getCredentials() {
return new BasicAWSCredentials(TEST_ACCESS_KEY_ID, TEST_SECRET_KEY);
}
@Override
public void refresh() {
}
}
/**
* This credentials provider needs a constructor call to instantiate it
*/
public static class ConstructorCredentialsProvider implements AWSCredentialsProvider {
private String arg1;
private String arg2;
public ConstructorCredentialsProvider(String arg1) {
this.arg1 = arg1;
this.arg2 = "blank";
}
public ConstructorCredentialsProvider(String arg1, String arg2) {
this.arg1 = arg1;
this.arg2 = arg2;
}
@Override
public AWSCredentials getCredentials() {
return new BasicAWSCredentials(arg1, arg2);
}
@Override
public void refresh() {
}
}
}

View file

@ -40,6 +40,12 @@ public class KinesisClientLibConfiguratorTest {
"com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProvider"; "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProvider";
private String credentialName2 = private String credentialName2 =
"com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysFailCredentialsProvider"; "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysFailCredentialsProvider";
private String credentialNameKinesis =
"com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderKinesis";
private String credentialNameDynamoDB =
"com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderDynamoDB";
private String credentialNameCloudWatch =
"com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderCloudWatch";
private KinesisClientLibConfigurator configurator = new KinesisClientLibConfigurator(); private KinesisClientLibConfigurator configurator = new KinesisClientLibConfigurator();
@Test @Test
@ -329,6 +335,74 @@ public class KinesisClientLibConfiguratorTest {
} }
} }
@Test
public void testWithDifferentAWSCredentialsForDynamoDBAndCloudWatch() {
String test = StringUtils.join(new String[] {
"streamName = a",
"applicationName = b",
"AWSCredentialsProvider = " + credentialNameKinesis,
"AWSCredentialsProviderDynamoDB = " + credentialNameDynamoDB,
"AWSCredentialsProviderCloudWatch = " + credentialNameCloudWatch,
"failoverTimeMillis = 100",
"shardSyncIntervalMillis = 500"
}, '\n');
InputStream input = new ByteArrayInputStream(test.getBytes());
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
KinesisClientLibConfiguration config = configurator.getConfiguration(input);
try {
config.getKinesisCredentialsProvider().getCredentials();
} catch (Exception e) {
fail("Kinesis credential providers should not fail.");
}
try {
config.getDynamoDBCredentialsProvider().getCredentials();
} catch (Exception e) {
fail("DynamoDB credential providers should not fail.");
}
try {
config.getCloudWatchCredentialsProvider().getCredentials();
} catch (Exception e) {
fail("CloudWatch credential providers should not fail.");
}
}
@Test
public void testWithDifferentAWSCredentialsForDynamoDBAndCloudWatchFailed() {
String test = StringUtils.join(new String[] {
"streamName = a",
"applicationName = b",
"AWSCredentialsProvider = " + credentialNameKinesis,
"AWSCredentialsProviderDynamoDB = " + credentialName1,
"AWSCredentialsProviderCloudWatch = " + credentialName1,
"failoverTimeMillis = 100",
"shardSyncIntervalMillis = 500"
}, '\n');
InputStream input = new ByteArrayInputStream(test.getBytes());
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
// separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement
KinesisClientLibConfiguration config = configurator.getConfiguration(input);
try {
config.getKinesisCredentialsProvider().getCredentials();
} catch (Exception e) {
fail("Kinesis credential providers should not fail.");
}
try {
config.getDynamoDBCredentialsProvider().getCredentials();
fail("DynamoDB credential providers should fail.");
} catch (Exception e) {
// succeed
}
try {
config.getCloudWatchCredentialsProvider().getCredentials();
fail("CloudWatch credential providers should fail.");
} catch (Exception e) {
// succeed
}
}
/** /**
* This credentials provider will always succeed * This credentials provider will always succeed
*/ */
@ -345,6 +419,84 @@ public class KinesisClientLibConfiguratorTest {
} }
/**
* This credentials provider will always succeed
*/
public static class AlwaysSucceedCredentialsProviderKinesis implements AWSCredentialsProvider {
@Override
public AWSCredentials getCredentials() {
return new AWSCredentials() {
@Override
public String getAWSAccessKeyId() {
return "";
}
@Override
public String getAWSSecretKey() {
return "";
}
};
}
@Override
public void refresh() {
}
}
/**
* This credentials provider will always succeed
*/
public static class AlwaysSucceedCredentialsProviderDynamoDB implements AWSCredentialsProvider {
@Override
public AWSCredentials getCredentials() {
return new AWSCredentials() {
@Override
public String getAWSAccessKeyId() {
return "";
}
@Override
public String getAWSSecretKey() {
return "";
}
};
}
@Override
public void refresh() {
}
}
/**
* This credentials provider will always succeed
*/
public static class AlwaysSucceedCredentialsProviderCloudWatch implements AWSCredentialsProvider {
@Override
public AWSCredentials getCredentials() {
return new AWSCredentials() {
@Override
public String getAWSAccessKeyId() {
return "";
}
@Override
public String getAWSSecretKey() {
return "";
}
};
}
@Override
public void refresh() {
}
}
/** /**
* This credentials provider will always fail * This credentials provider will always fail
*/ */

View file

@ -212,4 +212,10 @@ class ExceptionThrowingLeaseManager implements ILeaseManager<KinesisClientLease>
leaseManager.deleteAll(); leaseManager.deleteAll();
} }
@Override
public boolean isLeaseTableEmpty() throws DependencyException,
InvalidStateException, ProvisionedThroughputException {
return false;
}
} }

View file

@ -62,6 +62,7 @@ public class KinesisClientLibConfigurationTest {
// Test constructor with all valid arguments. // Test constructor with all valid arguments.
config = config =
new KinesisClientLibConfiguration(TEST_STRING, new KinesisClientLibConfiguration(TEST_STRING,
TEST_STRING,
TEST_STRING, TEST_STRING,
TEST_STRING, TEST_STRING,
InitialPositionInStream.LATEST, InitialPositionInStream.LATEST,
@ -99,6 +100,7 @@ public class KinesisClientLibConfigurationTest {
try { try {
config = config =
new KinesisClientLibConfiguration(TEST_STRING, new KinesisClientLibConfiguration(TEST_STRING,
TEST_STRING,
TEST_STRING, TEST_STRING,
TEST_STRING, TEST_STRING,
InitialPositionInStream.LATEST, InitialPositionInStream.LATEST,
@ -132,6 +134,7 @@ public class KinesisClientLibConfigurationTest {
try { try {
config = config =
new KinesisClientLibConfiguration(TEST_STRING, new KinesisClientLibConfiguration(TEST_STRING,
TEST_STRING,
TEST_STRING, TEST_STRING,
TEST_STRING, TEST_STRING,
InitialPositionInStream.LATEST, InitialPositionInStream.LATEST,
@ -294,6 +297,7 @@ public class KinesisClientLibConfigurationTest {
Mockito.mock(AWSCredentialsProvider.class); Mockito.mock(AWSCredentialsProvider.class);
try { try {
new KinesisClientLibConfiguration(TEST_STRING, new KinesisClientLibConfiguration(TEST_STRING,
TEST_STRING,
TEST_STRING, TEST_STRING,
TEST_STRING, TEST_STRING,
null, null,

View file

@ -89,7 +89,8 @@ public class ProcessTaskTest {
INITIAL_POSITION_LATEST); INITIAL_POSITION_LATEST);
final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null);
processTask = new ProcessTask( processTask = new ProcessTask(
shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis); shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
} }
@Test @Test

View file

@ -127,7 +127,8 @@ public class ShardConsumerTest {
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,
executorService, executorService,
metricsFactory, metricsFactory,
taskBackoffTimeMillis); taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize consumer.consumeShard(); // initialize
@ -173,7 +174,8 @@ public class ShardConsumerTest {
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,
spyExecutorService, spyExecutorService,
metricsFactory, metricsFactory,
taskBackoffTimeMillis); taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // initialize consumer.consumeShard(); // initialize
@ -213,7 +215,8 @@ public class ShardConsumerTest {
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,
executorService, executorService,
metricsFactory, metricsFactory,
taskBackoffTimeMillis); taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
when(leaseManager.getLease(anyString())).thenReturn(null); when(leaseManager.getLease(anyString())).thenReturn(null);
when(checkpoint.getCheckpoint(anyString())).thenReturn(new ExtendedSequenceNumber("123")); when(checkpoint.getCheckpoint(anyString())).thenReturn(new ExtendedSequenceNumber("123"));
@ -300,7 +303,8 @@ public class ShardConsumerTest {
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,
executorService, executorService,
metricsFactory, metricsFactory,
taskBackoffTimeMillis); taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards consumer.consumeShard(); // check on parent shards
@ -402,7 +406,8 @@ public class ShardConsumerTest {
cleanupLeasesOfCompletedShards, cleanupLeasesOfCompletedShards,
executorService, executorService,
metricsFactory, metricsFactory,
taskBackoffTimeMillis); taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS)));
consumer.consumeShard(); // check on parent shards consumer.consumeShard(); // check on parent shards

View file

@ -233,6 +233,7 @@ public class WorkerTest {
nullMetricsFactory, nullMetricsFactory,
taskBackoffTimeMillis, taskBackoffTimeMillis,
failoverTimeMillis, failoverTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
shardPrioritization); shardPrioritization);
ShardInfo shardInfo = new ShardInfo(dummyKinesisShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardInfo shardInfo = new ShardInfo(dummyKinesisShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
ShardConsumer consumer = worker.createOrGetShardConsumer(shardInfo, streamletFactory); ShardConsumer consumer = worker.createOrGetShardConsumer(shardInfo, streamletFactory);
@ -272,7 +273,7 @@ public class WorkerTest {
Worker worker = new Worker(stageName, streamletFactory, streamConfig, INITIAL_POSITION_LATEST, Worker worker = new Worker(stageName, streamletFactory, streamConfig, INITIAL_POSITION_LATEST,
parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint,
leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis,
shardPrioritization); KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization);
Worker workerSpy = spy(worker); Worker workerSpy = spy(worker);
@ -336,6 +337,7 @@ public class WorkerTest {
nullMetricsFactory, nullMetricsFactory,
taskBackoffTimeMillis, taskBackoffTimeMillis,
failoverTimeMillis, failoverTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
shardPrioritization); shardPrioritization);
ShardInfo shardInfo1 = new ShardInfo(dummyKinesisShardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardInfo shardInfo1 = new ShardInfo(dummyKinesisShardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON);
@ -389,6 +391,7 @@ public class WorkerTest {
nullMetricsFactory, nullMetricsFactory,
taskBackoffTimeMillis, taskBackoffTimeMillis,
failoverTimeMillis, failoverTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
shardPrioritization); shardPrioritization);
worker.run(); worker.run();
Assert.assertTrue(count > 0); Assert.assertTrue(count > 0);
@ -737,7 +740,7 @@ public class WorkerTest {
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, shardPrioritization); taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any())) when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture)); .thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@ -811,7 +814,7 @@ public class WorkerTest {
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, shardPrioritization); taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any())) when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture)); .thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@ -883,7 +886,7 @@ public class WorkerTest {
Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig,
INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis,
cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory,
taskBackoffTimeMillis, failoverTimeMillis, shardPrioritization); taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization);
when(executorService.submit(Matchers.<Callable<TaskResult>> any())) when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture)); .thenAnswer(new ShutdownHandlingAnswer(taskFuture));
@ -1166,6 +1169,7 @@ public class WorkerTest {
metricsFactory, metricsFactory,
taskBackoffTimeMillis, taskBackoffTimeMillis,
failoverTimeMillis, failoverTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
shardPrioritization); shardPrioritization);
WorkerThread workerThread = new WorkerThread(worker); WorkerThread workerThread = new WorkerThread(worker);