diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoder.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoder.java index 1f508771..9976b071 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoder.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoder.java @@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.config; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.lang.reflect.Constructor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -30,6 +31,7 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode private static final Log LOG = LogFactory.getLog(AWSCredentialsProviderPropertyValueDecoder.class); private static final String AUTH_PREFIX = "com.amazonaws.auth."; private static final String LIST_DELIMITER = ","; + private static final String ARG_DELIMITER = "|"; /** * Constructor. @@ -39,7 +41,7 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode /** * Get AWSCredentialsProvider property. - * + * * @param value property value as String * @return corresponding variable in correct type */ @@ -70,11 +72,25 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode private static List getValidCredentialsProviders(List providerNames) { List credentialsProviders = new ArrayList(); for (String providerName : providerNames) { - try { - Class className = Class.forName(providerName); - credentialsProviders.add((AWSCredentialsProvider) className.newInstance()); - } catch (Exception e) { - LOG.debug("Can't find any credentials provider matching " + providerName + "."); + if (providerName.contains(ARG_DELIMITER)) { + String[] nameAndArgs = providerName.split("\\" + ARG_DELIMITER); + Class[] argTypes = new Class[nameAndArgs.length - 1]; + Arrays.fill(argTypes, String.class); + try { + Class className = Class.forName(nameAndArgs[0]); + Constructor c = className.getConstructor(argTypes); + credentialsProviders.add((AWSCredentialsProvider) c.newInstance( + Arrays.copyOfRange(nameAndArgs, 1, nameAndArgs.length))); + } catch (Exception e) { + LOG.debug("Can't find any credentials provider matching " + providerName + "."); + } + } else { + try { + Class className = Class.forName(providerName); + credentialsProviders.add((AWSCredentialsProvider) className.newInstance()); + } catch (Exception e) { + LOG.debug("Can't find any credentials provider matching " + providerName + "."); + } } } return credentialsProviders; @@ -97,13 +113,13 @@ class AWSCredentialsProviderPropertyValueDecoder implements IPropertyValueDecode private static List getPossibleFullClassNames(String s) { /* * We take care of three cases : - * + * * 1. Customer provides a short name of common providers in com.amazonaws.auth package i.e. any classes * implementing the AWSCredentialsProvider interface: * http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/AWSCredentialsProvider.html - * + * * 2. Customer provides a full name of common providers e.g. com.amazonaws.auth.ClasspathFileCredentialsProvider - * + * * 3. Customer provides a custom credentials provider with full name of provider */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java index 027dde22..e239f967 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfigurator.java @@ -50,7 +50,9 @@ public class KinesisClientLibConfigurator { // Required properties private static final String PROP_APP_NAME = "applicationName"; private static final String PROP_STREAM_NAME = "streamName"; - private static final String PROP_CREDENTIALS_PROVIDER = "AWSCredentialsProvider"; + private static final String PROP_CREDENTIALS_PROVIDER_KINESIS = "AWSCredentialsProvider"; + private static final String PROP_CREDENTIALS_PROVIDER_DYNAMODB = "AWSCredentialsProviderDynamoDB"; + private static final String PROP_CREDENTIALS_PROVIDER_CLOUDWATCH = "AWSCredentialsProviderCloudWatch"; private static final String PROP_WORKER_ID = "workerId"; private Map, IPropertyValueDecoder> classToDecoder; @@ -107,7 +109,7 @@ public class KinesisClientLibConfigurator { String applicationName = stringValueDecoder.decodeValue(properties.getProperty(PROP_APP_NAME)); String streamName = stringValueDecoder.decodeValue(properties.getProperty(PROP_STREAM_NAME)); AWSCredentialsProvider provider = - awsCPPropGetter.decodeValue(properties.getProperty(PROP_CREDENTIALS_PROVIDER)); + awsCPPropGetter.decodeValue(properties.getProperty(PROP_CREDENTIALS_PROVIDER_KINESIS)); if (applicationName == null || applicationName.isEmpty()) { throw new IllegalArgumentException("Value of applicationName should be explicitly provided."); @@ -116,6 +118,24 @@ public class KinesisClientLibConfigurator { throw new IllegalArgumentException("Value of streamName should be explicitly provided."); } + // Decode the DynamoDB credentials provider if it exists. If not use the Kinesis credentials provider. + AWSCredentialsProvider providerDynamoDB; + String propCredentialsProviderDynamoDBValue = properties.getProperty(PROP_CREDENTIALS_PROVIDER_DYNAMODB); + if (propCredentialsProviderDynamoDBValue == null) { + providerDynamoDB = provider; + } else { + providerDynamoDB = awsCPPropGetter.decodeValue(propCredentialsProviderDynamoDBValue); + } + + // Decode the CloudWatch credentials provider if it exists. If not use the Kinesis credentials provider. + AWSCredentialsProvider providerCloudWatch; + String propCredentialsProviderCloudWatchValue = properties.getProperty(PROP_CREDENTIALS_PROVIDER_CLOUDWATCH); + if (propCredentialsProviderCloudWatchValue == null) { + providerCloudWatch = provider; + } else { + providerCloudWatch = awsCPPropGetter.decodeValue(propCredentialsProviderCloudWatchValue); + } + // Allow customer not to provide workerId or to provide empty worker id. String workerId = stringValueDecoder.decodeValue(properties.getProperty(PROP_WORKER_ID)); if (workerId == null || workerId.isEmpty()) { @@ -125,13 +145,13 @@ public class KinesisClientLibConfigurator { } KinesisClientLibConfiguration config = - new KinesisClientLibConfiguration(applicationName, streamName, provider, workerId); + new KinesisClientLibConfiguration(applicationName, streamName, provider, providerDynamoDB, providerCloudWatch, workerId); Set requiredNames = new HashSet(Arrays.asList(PROP_STREAM_NAME, PROP_APP_NAME, PROP_WORKER_ID, - PROP_CREDENTIALS_PROVIDER)); + PROP_CREDENTIALS_PROVIDER_KINESIS)); // Set all the variables that are not used for constructor. for (Object keyObject : properties.keySet()) { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index da8dce4e..c8678974 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -295,7 +295,7 @@ class ConsumerStates { public ITask createTask(ShardConsumer consumer) { return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(), consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(), - consumer.getTaskBackoffTimeMillis()); + consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist()); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 0f930053..174fb65e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -154,6 +154,13 @@ public class KinesisClientLibConfiguration { */ public static final int DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10; + /* + * The Worker will skip shard sync during initialization if there are one or more leases in the lease table. + * This assumes that the shards and leases are in-sync. + * This enables customers to choose faster startup times (e.g. during incremental deployments of an application). + */ + public static final boolean DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST = false; + /** * Default Shard prioritization strategy. */ @@ -163,6 +170,7 @@ public class KinesisClientLibConfiguration { private String tableName; private String streamName; private String kinesisEndpoint; + private String dynamoDBEndpoint; private InitialPositionInStream initialPositionInStream; private AWSCredentialsProvider kinesisCredentialsProvider; private AWSCredentialsProvider dynamoDBCredentialsProvider; @@ -191,11 +199,13 @@ public class KinesisClientLibConfiguration { private int initialLeaseTableReadCapacity; private int initialLeaseTableWriteCapacity; private InitialPositionInStreamExtended initialPositionInStreamExtended; + // This is useful for optimizing deployments to large fleets working on a stable stream. + private boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ShardPrioritization shardPrioritization; /** * Constructor. - * + * * @param applicationName Name of the Amazon Kinesis application. * By default the application name is included in the user agent string used to make AWS requests. This * can assist with troubleshooting (e.g. distinguish requests made by separate applications). @@ -212,7 +222,7 @@ public class KinesisClientLibConfiguration { /** * Constructor. - * + * * @param applicationName Name of the Amazon Kinesis application * By default the application name is included in the user agent string used to make AWS requests. This * can assist with troubleshooting (e.g. distinguish requests made by separate applications). @@ -228,7 +238,7 @@ public class KinesisClientLibConfiguration { AWSCredentialsProvider dynamoDBCredentialsProvider, AWSCredentialsProvider cloudWatchCredentialsProvider, String workerId) { - this(applicationName, streamName, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider, + this(applicationName, streamName, null, null, DEFAULT_INITIAL_POSITION_IN_STREAM, kinesisCredentialsProvider, dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, DEFAULT_FAILOVER_TIME_MILLIS, workerId, DEFAULT_MAX_RECORDS, DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, @@ -296,6 +306,76 @@ public class KinesisClientLibConfiguration { int metricsMaxQueueSize, boolean validateSequenceNumberBeforeCheckpointing, String regionName) { + this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider, + dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, + maxRecords, idleTimeBetweenReadsInMillis, + callProcessRecordsEvenForEmptyRecordList, parentShardPollIntervalMillis, + shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, + kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, + taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize, + validateSequenceNumberBeforeCheckpointing, regionName); + } + + /** + * @param applicationName Name of the Kinesis application + * By default the application name is included in the user agent string used to make AWS requests. This + * can assist with troubleshooting (e.g. distinguish requests made by separate applications). + * @param streamName Name of the Kinesis stream + * @param kinesisEndpoint Kinesis endpoint + * @param dynamoDBEndpoint DynamoDB endpoint + * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching + * records from that location in the stream when an application starts up for the first time and there + * are no checkpoints. If there are checkpoints, then we start from the checkpoint position. + * @param kinesisCredentialsProvider Provides credentials used to access Kinesis + * @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB + * @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch + * @param failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others) + * @param workerId Used to distinguish different workers/processes of a Kinesis application + * @param maxRecords Max records to read per Kinesis getRecords() call + * @param idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis + * @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if + * GetRecords returned an empty record list. + * @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done + * @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards + * @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration + * in Kinesis) + * @param kinesisClientConfig Client Configuration used by Kinesis client + * @param dynamoDBClientConfig Client Configuration used by DynamoDB client + * @param cloudWatchClientConfig Client Configuration used by CloudWatch client + * @param taskBackoffTimeMillis Backoff period when tasks encounter an exception + * @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch + * @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch + * @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers + * with a call to Amazon Kinesis before checkpointing for calls to + * {@link RecordProcessorCheckpointer#checkpoint(String)} + * @param regionName The region name for the service + */ + // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES + // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES + public KinesisClientLibConfiguration(String applicationName, + String streamName, + String kinesisEndpoint, + String dynamoDBEndpoint, + InitialPositionInStream initialPositionInStream, + AWSCredentialsProvider kinesisCredentialsProvider, + AWSCredentialsProvider dynamoDBCredentialsProvider, + AWSCredentialsProvider cloudWatchCredentialsProvider, + long failoverTimeMillis, + String workerId, + int maxRecords, + long idleTimeBetweenReadsInMillis, + boolean callProcessRecordsEvenForEmptyRecordList, + long parentShardPollIntervalMillis, + long shardSyncIntervalMillis, + boolean cleanupTerminatedShardsBeforeExpiry, + ClientConfiguration kinesisClientConfig, + ClientConfiguration dynamoDBClientConfig, + ClientConfiguration cloudWatchClientConfig, + long taskBackoffTimeMillis, + long metricsBufferTimeMillis, + int metricsMaxQueueSize, + boolean validateSequenceNumberBeforeCheckpointing, + String regionName) { // Check following values are greater than zero checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); @@ -310,6 +390,7 @@ public class KinesisClientLibConfiguration { this.tableName = applicationName; this.streamName = streamName; this.kinesisEndpoint = kinesisEndpoint; + this.dynamoDBEndpoint = dynamoDBEndpoint; this.initialPositionInStream = initialPositionInStream; this.kinesisCredentialsProvider = kinesisCredentialsProvider; this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider; @@ -338,13 +419,14 @@ public class KinesisClientLibConfiguration { this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; this.initialPositionInStreamExtended = InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); + this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; } // Check if value is positive, otherwise throw an exception private void checkIsValuePositive(String key, long value) { if (value <= 0) { - throw new IllegalArgumentException("Value of " + key + throw new IllegalArgumentException("Value of " + key + " should be positive, but current value is " + value); } } @@ -363,11 +445,11 @@ public class KinesisClientLibConfiguration { config.setUserAgent(existingUserAgent); return config; } - + private void checkIsRegionNameValid(String regionNameToCheck) { if (regionNameToCheck != null && RegionUtils.getRegion(regionNameToCheck) == null) { throw new IllegalArgumentException("The specified region name is not valid"); - } + } } /** @@ -468,6 +550,13 @@ public class KinesisClientLibConfiguration { return kinesisEndpoint; } + /** + * @return DynamoDB endpoint + */ + public String getDynamoDBEndpoint() { + return dynamoDBEndpoint; + } + /** * @return the initialPositionInStream */ @@ -561,6 +650,13 @@ public class KinesisClientLibConfiguration { return regionName; } + /** + * @return true if Worker should skip syncing shards and leases at startup if leases are present + */ + public boolean getSkipShardSyncAtWorkerInitializationIfLeasesExist() { + return skipShardSyncAtWorkerInitializationIfLeasesExist; + } + /** * @return Max leases this Worker can handle at a time */ @@ -631,6 +727,15 @@ public class KinesisClientLibConfiguration { return this; } + /** + * @param dynamoDBEndpoint DynamoDB endpoint + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withDynamoDBEndpoint(String dynamoDBEndpoint) { + this.dynamoDBEndpoint = dynamoDBEndpoint; + return this; + } + /** * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The Amazon Kinesis Client Library * will start fetching records from this position when the application starts up if there are no checkpoints. @@ -767,7 +872,7 @@ public class KinesisClientLibConfiguration { /** * Override the default user agent (application name). - * + * * @param userAgent User agent to use in AWS requests * @return KinesisClientLibConfiguration */ @@ -823,7 +928,7 @@ public class KinesisClientLibConfiguration { * NONE * SUMMARY * DETAILED - * + * * @param metricsLevel Metrics level to enable. * @return KinesisClientLibConfiguration */ @@ -850,7 +955,7 @@ public class KinesisClientLibConfiguration { } /** - * + * * @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers * with a call to Amazon Kinesis before checkpointing for calls to * {@link RecordProcessorCheckpointer#checkpoint(String)}. @@ -863,7 +968,22 @@ public class KinesisClientLibConfiguration { } /** - * + * If set to true, the Worker will not sync shards and leases during initialization if there are one or more leases + * in the lease table. This assumes that the shards and leases are in-sync. + * This enables customers to choose faster startup times (e.g. during incremental deployments of an application). + * + * @param skipShardSyncAtStartupIfLeasesExist Should Worker skip syncing shards and leases at startup (Worker + * initialization). + * @return KinesisClientLibConfiguration + */ + public KinesisClientLibConfiguration withSkipShardSyncAtStartupIfLeasesExist( + boolean skipShardSyncAtStartupIfLeasesExist) { + this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtStartupIfLeasesExist; + return this; + } + + /** + * * @param regionName The region name for the service * @return KinesisClientLibConfiguration */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java index db0970d5..6ee34880 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTask.java @@ -73,7 +73,8 @@ class ProcessTask implements ITask { IRecordProcessor recordProcessor, RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher, - long backoffTimeMillis) { + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { super(); this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; @@ -82,13 +83,19 @@ class ProcessTask implements ITask { this.streamConfig = streamConfig; this.backoffTimeMillis = backoffTimeMillis; IKinesisProxy kinesisProxy = this.streamConfig.getStreamProxy(); - if (kinesisProxy instanceof IKinesisProxyExtended) { + // If skipShardSyncAtWorkerInitializationIfLeasesExist is set, we will not get the shard for + // this ProcessTask. In this case, duplicate KPL user records in the event of resharding will + // not be dropped during deaggregation of Amazon Kinesis records. This is only applicable if + // KPL is used for ingestion and KPL's aggregation feature is used. + if (!skipShardSyncAtWorkerInitializationIfLeasesExist && kinesisProxy instanceof IKinesisProxyExtended) { this.shard = ((IKinesisProxyExtended) kinesisProxy).getShard(this.shardInfo.getShardId()); } else { + this.shard = null; + } + if (this.shard == null && !skipShardSyncAtWorkerInitializationIfLeasesExist) { LOG.warn("Cannot get the shard for this ProcessTask, so duplicate KPL user records " + "in the event of resharding will not be dropped during deaggregation of Amazon " + "Kinesis records."); - this.shard = null; } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 03a8cd33..63cce40d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -52,6 +52,7 @@ class ShardConsumer { private final long parentShardPollIntervalMillis; private final boolean cleanupLeasesOfCompletedShards; private final long taskBackoffTimeMillis; + private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; private ITask currentTask; private long currentTaskSubmitTime; @@ -90,7 +91,8 @@ class ShardConsumer { boolean cleanupLeasesOfCompletedShards, ExecutorService executorService, IMetricsFactory metricsFactory, - long backoffTimeMillis) { + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist) { this.streamConfig = streamConfig; this.recordProcessor = recordProcessor; this.executorService = executorService; @@ -108,6 +110,7 @@ class ShardConsumer { this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards; this.taskBackoffTimeMillis = backoffTimeMillis; + this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; } /** @@ -165,6 +168,10 @@ class ShardConsumer { return submittedNewTask; } + public boolean isSkipShardSyncAtWorkerInitializationIfLeasesExist() { + return skipShardSyncAtWorkerInitializationIfLeasesExist; + } + private enum TaskOutcome { SUCCESSFUL, END_OF_SHARD, NOT_COMPLETE, FAILURE } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index ddc05275..2a1e5484 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -77,7 +77,7 @@ public class Worker implements Runnable { private final long idleTimeInMilliseconds; // Backoff time when polling to check if application has finished processing // parent shards - private final long parentShardPollIntervalMillis; + private final long parentShardPollIntervalMillis; private final ExecutorService executorService; private final IMetricsFactory metricsFactory; // Backoff time when running tasks if they encounter exceptions @@ -99,6 +99,8 @@ public class Worker implements Runnable { private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap(); private final boolean cleanupLeasesUponShardCompletion; + + private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist; /** * Constructor. @@ -216,7 +218,7 @@ public class Worker implements Runnable { ExecutorService execService) { this( config.getApplicationName(), - new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), + new V1ToV2RecordProcessorFactoryAdapter(recordProcessorFactory), new StreamConfig( new KinesisProxyFactory(config.getKinesisCredentialsProvider(), kinesisClient) .getProxy(config.getStreamName()), @@ -243,7 +245,9 @@ public class Worker implements Runnable { metricsFactory, config.getTaskBackoffTimeMillis(), config.getFailoverTimeMillis(), + config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), config.getShardPrioritizationStrategy()); + // If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB. if (config.getRegionName() != null) { Region region = RegionUtils.getRegion(config.getRegionName()); @@ -252,6 +256,11 @@ public class Worker implements Runnable { dynamoDBClient.setRegion(region); LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName()); } + // If a dynamoDB endpoint was explicitly specified, use it to set the DynamoDB endpoint. + if (config.getDynamoDBEndpoint() != null) { + dynamoDBClient.setEndpoint(config.getDynamoDBEndpoint()); + LOG.debug("The endpoint of Amazon DynamoDB client has been set to " + config.getDynamoDBEndpoint()); + } // If a kinesis endpoint was explicitly specified, use it to set the region of kinesis. if (config.getKinesisEndpoint() != null) { kinesisClient.setEndpoint(config.getKinesisEndpoint()); @@ -300,6 +309,7 @@ public class Worker implements Runnable { IMetricsFactory metricsFactory, long taskBackoffTimeMillis, long failoverTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, ShardPrioritization shardPrioritization) { this.applicationName = applicationName; this.recordProcessorFactory = recordProcessorFactory; @@ -321,7 +331,8 @@ public class Worker implements Runnable { metricsFactory, executorService); this.taskBackoffTimeMillis = taskBackoffTimeMillis; - this.failoverTimeMillis = failoverTimeMillis; + this.failoverTimeMillis = failoverTimeMillis; + this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist; this.shardPrioritization = shardPrioritization; } @@ -403,16 +414,22 @@ public class Worker implements Runnable { LOG.info("Initializing LeaseCoordinator"); leaseCoordinator.initialize(); - LOG.info("Syncing Kinesis shard info"); - ShardSyncTask shardSyncTask = - new ShardSyncTask(streamConfig.getStreamProxy(), - leaseCoordinator.getLeaseManager(), - initialPosition, - cleanupLeasesUponShardCompletion, - 0L); - TaskResult result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); + TaskResult result = null; + if (!skipShardSyncAtWorkerInitializationIfLeasesExist + || leaseCoordinator.getLeaseManager().isLeaseTableEmpty()) { + LOG.info("Syncing Kinesis shard info"); + ShardSyncTask shardSyncTask = + new ShardSyncTask(streamConfig.getStreamProxy(), + leaseCoordinator.getLeaseManager(), + initialPosition, + cleanupLeasesUponShardCompletion, + 0L); + result = new MetricsCollectingTaskDecorator(shardSyncTask, metricsFactory).call(); + } else { + LOG.info("Skipping shard sync per config setting (and lease table is not empty)"); + } - if (result.getException() == null) { + if (result == null || result.getException() == null) { if (!leaseCoordinator.isRunning()) { LOG.info("Starting LeaseCoordinator"); leaseCoordinator.start(); @@ -654,7 +671,7 @@ public class Worker implements Runnable { return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, leaseCoordinator.getLeaseManager(), parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, - executorService, metricsFactory, taskBackoffTimeMillis); + executorService, metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist); } @@ -1079,7 +1096,9 @@ public class Worker implements Runnable { metricsFactory, config.getTaskBackoffTimeMillis(), config.getFailoverTimeMillis(), + config.getSkipShardSyncAtWorkerInitializationIfLeasesExist(), shardPrioritization); + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java index 7629ad1d..921cfbc4 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCoordinator.java @@ -71,6 +71,7 @@ public class LeaseCoordinator { // Package level access for testing. static final int MAX_LEASE_RENEWAL_THREAD_COUNT = 20; + private final ILeaseRenewer leaseRenewer; private final ILeaseTaker leaseTaker; private final long renewerIntervalMillis; diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java index 3a4d408c..226756eb 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -214,6 +214,14 @@ public class LeaseManager implements ILeaseManager { return list(null); } + /** + * {@inheritDoc} + */ + @Override + public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException { + return list(1).isEmpty(); + } + /** * List with the given page size. Package access for integration testing. * diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java index 06d6d432..ab296cc1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java +++ b/src/main/java/com/amazonaws/services/kinesis/leases/interfaces/ILeaseManager.java @@ -1,5 +1,5 @@ /* - * Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2012-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * Licensed under the Amazon Software License (the "License"). * You may not use this file except in compliance with the License. @@ -180,4 +180,15 @@ public interface ILeaseManager { public boolean updateLease(T lease) throws DependencyException, InvalidStateException, ProvisionedThroughputException; + /** + * Check (synchronously) if there are any leases in the lease table. + * + * @return true if there are no leases in the lease table + * + * @throws DependencyException if DynamoDB scan fails in an unexpected way + * @throws InvalidStateException if lease table does not exist + * @throws ProvisionedThroughputException if DynamoDB scan fails due to lack of capacity + */ + public boolean isLeaseTableEmpty() throws DependencyException, InvalidStateException, ProvisionedThroughputException; + } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoderTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoderTest.java new file mode 100644 index 00000000..cddd837a --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/AWSCredentialsProviderPropertyValueDecoderTest.java @@ -0,0 +1,115 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/asl/ + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package com.amazonaws.services.kinesis.clientlibrary.config; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSCredentialsProviderChain; +import com.amazonaws.services.kinesis.clientlibrary.config.AWSCredentialsProviderPropertyValueDecoder; + +public class AWSCredentialsProviderPropertyValueDecoderTest { + + private static final String TEST_ACCESS_KEY_ID = "123"; + private static final String TEST_SECRET_KEY = "456"; + + private String credentialName1 = + "com.amazonaws.services.kinesis.clientlibrary.config.AWSCredentialsProviderPropertyValueDecoderTest$AlwaysSucceedCredentialsProvider"; + private String credentialName2 = + "com.amazonaws.services.kinesis.clientlibrary.config.AWSCredentialsProviderPropertyValueDecoderTest$ConstructorCredentialsProvider"; + private AWSCredentialsProviderPropertyValueDecoder decoder = new AWSCredentialsProviderPropertyValueDecoder(); + + @Test + public void testSingleProvider() { + AWSCredentialsProvider provider = decoder.decodeValue(credentialName1); + assertEquals(provider.getClass(), AWSCredentialsProviderChain.class); + assertEquals(provider.getCredentials().getAWSAccessKeyId(), TEST_ACCESS_KEY_ID); + assertEquals(provider.getCredentials().getAWSSecretKey(), TEST_SECRET_KEY); + } + + @Test + public void testTwoProviders() { + AWSCredentialsProvider provider = decoder.decodeValue(credentialName1 + "," + credentialName1); + assertEquals(provider.getClass(), AWSCredentialsProviderChain.class); + assertEquals(provider.getCredentials().getAWSAccessKeyId(), TEST_ACCESS_KEY_ID); + assertEquals(provider.getCredentials().getAWSSecretKey(), TEST_SECRET_KEY); + } + + @Test + public void testProfileProviderWithOneArg() { + AWSCredentialsProvider provider = decoder.decodeValue(credentialName2 + "|arg"); + assertEquals(provider.getClass(), AWSCredentialsProviderChain.class); + assertEquals(provider.getCredentials().getAWSAccessKeyId(), "arg"); + assertEquals(provider.getCredentials().getAWSSecretKey(), "blank"); + } + + @Test + public void testProfileProviderWithTwoArgs() { + AWSCredentialsProvider provider = decoder.decodeValue(credentialName2 + + "|arg1|arg2"); + assertEquals(provider.getClass(), AWSCredentialsProviderChain.class); + assertEquals(provider.getCredentials().getAWSAccessKeyId(), "arg1"); + assertEquals(provider.getCredentials().getAWSSecretKey(), "arg2"); + } + + /** + * This credentials provider will always succeed + */ + public static class AlwaysSucceedCredentialsProvider implements AWSCredentialsProvider { + + @Override + public AWSCredentials getCredentials() { + return new BasicAWSCredentials(TEST_ACCESS_KEY_ID, TEST_SECRET_KEY); + } + + @Override + public void refresh() { + } + + } + + /** + * This credentials provider needs a constructor call to instantiate it + */ + public static class ConstructorCredentialsProvider implements AWSCredentialsProvider { + + private String arg1; + private String arg2; + + public ConstructorCredentialsProvider(String arg1) { + this.arg1 = arg1; + this.arg2 = "blank"; + } + + public ConstructorCredentialsProvider(String arg1, String arg2) { + this.arg1 = arg1; + this.arg2 = arg2; + } + + @Override + public AWSCredentials getCredentials() { + return new BasicAWSCredentials(arg1, arg2); + } + + @Override + public void refresh() { + } + + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java index 1ccd4941..cbdd0a2d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/config/KinesisClientLibConfiguratorTest.java @@ -40,6 +40,12 @@ public class KinesisClientLibConfiguratorTest { "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProvider"; private String credentialName2 = "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysFailCredentialsProvider"; + private String credentialNameKinesis = + "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderKinesis"; + private String credentialNameDynamoDB = + "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderDynamoDB"; + private String credentialNameCloudWatch = + "com.amazonaws.services.kinesis.clientlibrary.config.KinesisClientLibConfiguratorTest$AlwaysSucceedCredentialsProviderCloudWatch"; private KinesisClientLibConfigurator configurator = new KinesisClientLibConfigurator(); @Test @@ -329,6 +335,74 @@ public class KinesisClientLibConfiguratorTest { } } + @Test + public void testWithDifferentAWSCredentialsForDynamoDBAndCloudWatch() { + String test = StringUtils.join(new String[] { + "streamName = a", + "applicationName = b", + "AWSCredentialsProvider = " + credentialNameKinesis, + "AWSCredentialsProviderDynamoDB = " + credentialNameDynamoDB, + "AWSCredentialsProviderCloudWatch = " + credentialNameCloudWatch, + "failoverTimeMillis = 100", + "shardSyncIntervalMillis = 500" + }, '\n'); + InputStream input = new ByteArrayInputStream(test.getBytes()); + + // separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement + KinesisClientLibConfiguration config = configurator.getConfiguration(input); + try { + config.getKinesisCredentialsProvider().getCredentials(); + } catch (Exception e) { + fail("Kinesis credential providers should not fail."); + } + try { + config.getDynamoDBCredentialsProvider().getCredentials(); + } catch (Exception e) { + fail("DynamoDB credential providers should not fail."); + } + try { + config.getCloudWatchCredentialsProvider().getCredentials(); + } catch (Exception e) { + fail("CloudWatch credential providers should not fail."); + } + } + + @Test + public void testWithDifferentAWSCredentialsForDynamoDBAndCloudWatchFailed() { + String test = StringUtils.join(new String[] { + "streamName = a", + "applicationName = b", + "AWSCredentialsProvider = " + credentialNameKinesis, + "AWSCredentialsProviderDynamoDB = " + credentialName1, + "AWSCredentialsProviderCloudWatch = " + credentialName1, + "failoverTimeMillis = 100", + "shardSyncIntervalMillis = 500" + }, '\n'); + InputStream input = new ByteArrayInputStream(test.getBytes()); + + // separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement + + // separate input stream with getConfiguration to explicitly catch exception from the getConfiguration statement + KinesisClientLibConfiguration config = configurator.getConfiguration(input); + try { + config.getKinesisCredentialsProvider().getCredentials(); + } catch (Exception e) { + fail("Kinesis credential providers should not fail."); + } + try { + config.getDynamoDBCredentialsProvider().getCredentials(); + fail("DynamoDB credential providers should fail."); + } catch (Exception e) { + // succeed + } + try { + config.getCloudWatchCredentialsProvider().getCredentials(); + fail("CloudWatch credential providers should fail."); + } catch (Exception e) { + // succeed + } + } + /** * This credentials provider will always succeed */ @@ -345,6 +419,84 @@ public class KinesisClientLibConfiguratorTest { } + /** + * This credentials provider will always succeed + */ + public static class AlwaysSucceedCredentialsProviderKinesis implements AWSCredentialsProvider { + + @Override + public AWSCredentials getCredentials() { + return new AWSCredentials() { + @Override + public String getAWSAccessKeyId() { + return ""; + } + + @Override + public String getAWSSecretKey() { + return ""; + } + }; + } + + @Override + public void refresh() { + } + + } + + /** + * This credentials provider will always succeed + */ + public static class AlwaysSucceedCredentialsProviderDynamoDB implements AWSCredentialsProvider { + + @Override + public AWSCredentials getCredentials() { + return new AWSCredentials() { + @Override + public String getAWSAccessKeyId() { + return ""; + } + + @Override + public String getAWSSecretKey() { + return ""; + } + }; + } + + @Override + public void refresh() { + } + + } + + /** + * This credentials provider will always succeed + */ + public static class AlwaysSucceedCredentialsProviderCloudWatch implements AWSCredentialsProvider { + + @Override + public AWSCredentials getCredentials() { + return new AWSCredentials() { + @Override + public String getAWSAccessKeyId() { + return ""; + } + + @Override + public String getAWSSecretKey() { + return ""; + } + }; + } + + @Override + public void refresh() { + } + + } + /** * This credentials provider will always fail */ diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java index 765aaa44..2a07d1ed 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ExceptionThrowingLeaseManager.java @@ -212,4 +212,10 @@ class ExceptionThrowingLeaseManager implements ILeaseManager leaseManager.deleteAll(); } + @Override + public boolean isLeaseTableEmpty() throws DependencyException, + InvalidStateException, ProvisionedThroughputException { + return false; + } + } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java index e71948c9..4874a164 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java @@ -62,6 +62,7 @@ public class KinesisClientLibConfigurationTest { // Test constructor with all valid arguments. config = new KinesisClientLibConfiguration(TEST_STRING, + TEST_STRING, TEST_STRING, TEST_STRING, InitialPositionInStream.LATEST, @@ -99,6 +100,7 @@ public class KinesisClientLibConfigurationTest { try { config = new KinesisClientLibConfiguration(TEST_STRING, + TEST_STRING, TEST_STRING, TEST_STRING, InitialPositionInStream.LATEST, @@ -132,6 +134,7 @@ public class KinesisClientLibConfigurationTest { try { config = new KinesisClientLibConfiguration(TEST_STRING, + TEST_STRING, TEST_STRING, TEST_STRING, InitialPositionInStream.LATEST, @@ -209,7 +212,7 @@ public class KinesisClientLibConfigurationTest { AmazonDynamoDBClient dclient = Mockito.mock(AmazonDynamoDBClient.class); AmazonCloudWatchClient cclient = Mockito.mock(AmazonCloudWatchClient.class); Region region = RegionUtils.getRegion("us-west-2"); - + AWSCredentialsProvider credentialsProvider = Mockito.mock(AWSCredentialsProvider.class); KinesisClientLibConfiguration kclConfig = new KinesisClientLibConfiguration("Test", "Test", credentialsProvider, "0") @@ -262,7 +265,7 @@ public class KinesisClientLibConfigurationTest { Mockito.verify(kclConfig, Mockito.times(9)).getRegionName(); Mockito.verify(kclConfig, Mockito.times(4)).getKinesisEndpoint(); - + kclConfig = Mockito.spy( new KinesisClientLibConfiguration("Test", "Test", credentialsProvider, "0") .withKinesisEndpoint("https://kinesis.eu-west-1.amazonaws.com")); @@ -294,6 +297,7 @@ public class KinesisClientLibConfigurationTest { Mockito.mock(AWSCredentialsProvider.class); try { new KinesisClientLibConfiguration(TEST_STRING, + TEST_STRING, TEST_STRING, TEST_STRING, null, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java index f1d908f0..4d32566e 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ProcessTaskTest.java @@ -89,7 +89,8 @@ public class ProcessTaskTest { INITIAL_POSITION_LATEST); final ShardInfo shardInfo = new ShardInfo(shardId, null, null, null); processTask = new ProcessTask( - shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis); + shardInfo, config, mockRecordProcessor, mockCheckpointer, mockDataFetcher, taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); } @Test diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 347d44b4..893f64ed 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -127,7 +127,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, executorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize @@ -173,7 +174,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, spyExecutorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // initialize @@ -213,7 +215,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, executorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); when(leaseManager.getLease(anyString())).thenReturn(null); when(checkpoint.getCheckpoint(anyString())).thenReturn(new ExtendedSequenceNumber("123")); @@ -300,7 +303,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, executorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards @@ -402,7 +406,8 @@ public class ShardConsumerTest { cleanupLeasesOfCompletedShards, executorService, metricsFactory, - taskBackoffTimeMillis); + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); // check on parent shards diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index f05a58ff..baafa447 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -233,6 +233,7 @@ public class WorkerTest { nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); ShardInfo shardInfo = new ShardInfo(dummyKinesisShardId, testConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); ShardConsumer consumer = worker.createOrGetShardConsumer(shardInfo, streamletFactory); @@ -272,7 +273,7 @@ public class WorkerTest { Worker worker = new Worker(stageName, streamletFactory, streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, checkpoint, leaseCoordinator, execService, nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, - shardPrioritization); + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); Worker workerSpy = spy(worker); @@ -336,6 +337,7 @@ public class WorkerTest { nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); ShardInfo shardInfo1 = new ShardInfo(dummyKinesisShardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); @@ -389,6 +391,7 @@ public class WorkerTest { nullMetricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); worker.run(); Assert.assertTrue(count > 0); @@ -737,7 +740,7 @@ public class WorkerTest { Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, shardPrioritization); + taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -811,7 +814,7 @@ public class WorkerTest { Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, shardPrioritization); + taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -883,7 +886,7 @@ public class WorkerTest { Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, leaseCoordinator, executorService, metricsFactory, - taskBackoffTimeMillis, failoverTimeMillis, shardPrioritization); + taskBackoffTimeMillis, failoverTimeMillis, false, shardPrioritization); when(executorService.submit(Matchers.> any())) .thenAnswer(new ShutdownHandlingAnswer(taskFuture)); @@ -1166,8 +1169,9 @@ public class WorkerTest { metricsFactory, taskBackoffTimeMillis, failoverTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); - + WorkerThread workerThread = new WorkerThread(worker); workerThread.start(); return workerThread;