'Version 1.1.0 of the Amazon Kinesis Client Library'

This commit is contained in:
Gaurav Ghare 2014-06-30 12:18:58 -07:00
parent ce9054cb1b
commit 13aad26a80
14 changed files with 625 additions and 167 deletions

View file

@ -1,3 +1,3 @@
AmazonKinesisClientLibrary
Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.

View file

@ -22,6 +22,11 @@ The **Amazon Kinesis Client Library for Java** enables Java developers to easily
After you've downloaded the code from GitHub, you can build it using Maven. To disable GPG signing in the build, use this command: `mvn clean install -Dgpg.skip=true`
## Release Notes
### Release 1.1 (June 30, 2014)
* **Checkpointing at a specific sequence number** — The IRecordProcessorCheckpointer interface now supports checkpointing at a sequence number specified by the record processor.
* **Set region** — KinesisClientLibConfiguration now supports setting the region name to indicate the location of the Amazon Kinesis service. The Amazon DynamoDB table and Amazon CloudWatch metrics associated with your application will also use this region setting.
[kinesis]: http://aws.amazon.com/kinesis
[kinesis-forum]: http://developer.amazonwebservices.com/connect/forum.jspa?forumID=169
[kinesis-client-library-issues]: https://github.com/awslabs/amazon-kinesis-client/issues

View file

@ -6,10 +6,8 @@
<artifactId>amazon-kinesis-client</artifactId>
<packaging>jar</packaging>
<name>Amazon Kinesis Client Library for Java</name>
<version>1.0.0</version>
<description>
</description>
<version>1.1.0</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.</description>
<url>https://aws.amazon.com/kinesis</url>
<scm>
@ -25,7 +23,7 @@
</licenses>
<properties>
<aws-java-sdk.version>1.6.9.1</aws-java-sdk.version>
<aws-java-sdk.version>1.7.13</aws-java-sdk.version>
<jackson.version>2.1.1</jackson.version>
</properties>

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -47,4 +47,29 @@ public interface IRecordProcessorCheckpointer {
void checkpoint()
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException;
/**
* Checkpoints the progress at the provided sequenceNumber. This method is analogous to
* {@link #checkpoint()} but provides the ability to specify the sequence number at which to
* checkpoint.
*
* @param sequenceNumber A sequence number at which to checkpoint in this shard. Upon failover,
* the Kinesis Client Library will start fetching records after this sequence number.
* @throws ThrottlingException Can't store checkpoint. Can be caused by checkpointing too frequently.
* Consider increasing the throughput/capacity of the checkpoint store or reducing checkpoint frequency.
* @throws ShutdownException The record processor instance has been shut down. Another instance may have
* started processing some of these records already.
* The application should abort processing via this RecordProcessor instance.
* @throws InvalidStateException Can't store checkpoint.
* Unable to store the checkpoint in the DynamoDB table (e.g. table doesn't exist).
* @throws KinesisClientLibDependencyException Encountered an issue when storing the checkpoint. The application can
* back off and retry.
* @throws IllegalArgumentException The sequence number is invalid for one of the following reasons:
* 1.) It appears to be out of range, i.e. it is smaller than the last checkpoint value, or larger than the
* greatest sequence number seen by the associated record processor.
* 2.) It is not a valid sequence number for a record in this shard.
*/
void checkpoint(String sequenceNumber)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
IllegalArgumentException;
}

View file

@ -0,0 +1,126 @@
/*
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.Comparator;
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
/**
 * Defines an ordering on checkpoint values, taking into account the sentinel values TRIM_HORIZON, LATEST
 * and SHARD_END.
 *
 * <ul>
 * <li>SHARD_END is treated as infinity: greater than every other value and equal only to itself.</li>
 * <li>TRIM_HORIZON and LATEST are mapped to the negative values -2 and -1 respectively, so TRIM_HORIZON
 * sorts before LATEST and both sort before any non-negative sequence number.</li>
 * <li>Sequence numbers (strings accepted by {@code SequenceNumberValidator.isDigits}) are compared by their
 * BigInteger value.</li>
 * </ul>
 */
class CheckpointValueComparator implements Comparator<String>, Serializable {

    private static final long serialVersionUID = 1L;

    // TRIM_HORIZON and LATEST are given negative values so that they sort below non-negative sequence numbers
    private static final BigInteger TRIM_HORIZON_BIG_INTEGER_VALUE = BigInteger.valueOf(-2);
    private static final BigInteger LATEST_BIG_INTEGER_VALUE = BigInteger.valueOf(-1);

    /**
     * Constructor.
     */
    CheckpointValueComparator() {
    }

    /**
     * Compares checkpoint values with these rules.
     *
     * SHARD_END is considered greatest
     * TRIM_HORIZON and LATEST are considered less than sequence numbers
     * sequence numbers are given their big integer value
     *
     * @param first The first element to be compared
     * @param second The second element to be compared
     * @return returns negative/0/positive if first is less than/equal to/greater than second
     * @throws IllegalArgumentException If either input is a non-numeric non-sentinel value string.
     */
    @Override
    public int compare(String first, String second) {
        if (!isDigitsOrSentinelValue(first) || !isDigitsOrSentinelValue(second)) {
            throw new IllegalArgumentException("Expected a sequence number or a sentinel checkpoint value but "
                    + "received: first=" + first + " and second=" + second);
        }

        // SHARD_END is the greatest value; resolve it before any numeric conversion is attempted
        final String shardEnd = SentinelCheckpoint.SHARD_END.toString();
        final boolean firstIsShardEnd = shardEnd.equals(first);
        final boolean secondIsShardEnd = shardEnd.equals(second);
        if (firstIsShardEnd && secondIsShardEnd) {
            return 0;
        }
        if (secondIsShardEnd) {
            return -1;
        }
        if (firstIsShardEnd) {
            return 1;
        }

        // Compare the remaining sentinel values and sequence numbers via their big integer value
        return bigIntegerValue(first).compareTo(bigIntegerValue(second));
    }

    /**
     * Converts a checkpoint value to the BigInteger used for ordering. Sequence numbers are parsed as-is,
     * LATEST maps to -1 and TRIM_HORIZON maps to -2, so both sentinels are considered less than non-negative
     * sequence numbers. Note this method is only called by {@link #compare(String, String)} after SHARD_END
     * has been handled, which is why SHARD_END is rejected here.
     *
     * @param checkpointValue string to convert to big integer value
     * @return a BigInteger value representation of the checkpointValue
     * @throws IllegalArgumentException if checkpointValue is neither a string of digits, LATEST nor TRIM_HORIZON
     */
    private static BigInteger bigIntegerValue(String checkpointValue) {
        if (SequenceNumberValidator.isDigits(checkpointValue)) {
            return new BigInteger(checkpointValue);
        } else if (SentinelCheckpoint.LATEST.toString().equals(checkpointValue)) {
            return LATEST_BIG_INTEGER_VALUE;
        } else if (SentinelCheckpoint.TRIM_HORIZON.toString().equals(checkpointValue)) {
            return TRIM_HORIZON_BIG_INTEGER_VALUE;
        } else {
            throw new IllegalArgumentException("Expected a string of digits, TRIM_HORIZON, or LATEST but received "
                    + checkpointValue);
        }
    }

    /**
     * Checks if the string is all digits or one of the SentinelCheckpoint values.
     *
     * @param string value to check
     * @return true if and only if the string is all digits or one of the SentinelCheckpoint values
     */
    private static boolean isDigitsOrSentinelValue(String string) {
        return SequenceNumberValidator.isDigits(string) || isSentinelValue(string);
    }

    /**
     * Checks if the string is a SentinelCheckpoint value.
     *
     * @param string value to check; null is reported as not being a sentinel
     * @return true if and only if the string can be converted to a SentinelCheckpoint
     */
    private static boolean isSentinelValue(String string) {
        if (string == null) {
            return false;
        }
        try {
            SentinelCheckpoint.valueOf(string);
            return true;
        } catch (IllegalArgumentException ignored) {
            // valueOf rejects names that do not match a SentinelCheckpoint constant
            return false;
        }
    }
}

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -52,8 +52,10 @@ class InitializeTask implements ITask {
this.backoffTimeMillis = backoffTimeMillis;
}
/* Initializes the data fetcher (position in shard) and invokes the RecordProcessor initialize() API.
/*
* Initializes the data fetcher (position in shard) and invokes the RecordProcessor initialize() API.
* (non-Javadoc)
*
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call()
*/
@Override
@ -65,7 +67,7 @@ class InitializeTask implements ITask {
LOG.debug("Initializing ShardId " + shardInfo.getShardId());
String initialCheckpoint = checkpoint.getCheckpoint(shardInfo.getShardId());
dataFetcher.initialize(initialCheckpoint);
recordProcessorCheckpointer.setSequenceNumber(initialCheckpoint);
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(initialCheckpoint);
try {
LOG.debug("Calling the record processor initialize().");
recordProcessor.initialize(shardInfo.getShardId());
@ -94,7 +96,9 @@ class InitializeTask implements ITask {
return new TaskResult(exception);
}
/* (non-Javadoc)
/*
* (non-Javadoc)
*
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#getTaskType()
*/
@Override

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.regions.RegionUtils;
/**
* Configuration for the Amazon Kinesis Client Library.
@ -31,51 +32,51 @@ public class KinesisClientLibConfiguration {
* the number of DynamoDB IOPS required for tracking leases.
*/
public static final long DEFAULT_FAILOVER_TIME_MILLIS = 10000L;
/**
* Max records to fetch from Kinesis in a single GetRecords call.
*/
public static final int DEFAULT_MAX_RECORDS = 10000;
/**
* Idle time between record reads in milliseconds.
*/
public static final long DEFAULT_IDLETIME_BETWEEN_READS_MILLIS = 1000L;
/**
* Don't call processRecords() on the record processor for empty record lists.
*/
public static final boolean DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST = false;
/**
* Interval in milliseconds between polling to check for parent shard completion.
* Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on
* completion of parent shards).
*/
public static final long DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS = 10000L;
/**
* Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks.
*/
public static final long DEFAULT_SHARD_SYNC_INTERVAL_MILLIS = 60000L;
/**
* Cleanup leases upon shards completion (don't wait until they expire in Kinesis).
* Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try
* to delete the ones we don't need any longer.
*/
public static final boolean DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION = true;
/**
* Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures).
*/
public static final long DEFAULT_TASK_BACKOFF_TIME_MILLIS = 500L;
/**
* Buffer metrics for at most this long before publishing to CloudWatch.
*/
public static final long DEFAULT_METRICS_BUFFER_TIME_MILLIS = 10000L;
/**
* Buffer at most this many metrics before publishing to CloudWatch.
*/
@ -84,7 +85,13 @@ public class KinesisClientLibConfiguration {
/**
* User agent set when Amazon Kinesis Client Library makes AWS requests.
*/
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.0.0";
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.1.0";
/**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
* to {@link RecordProcessorCheckpointer#checkpoint(String)} by default.
*/
public static final boolean DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING = true;
private String applicationName;
private String streamName;
@ -108,12 +115,15 @@ public class KinesisClientLibConfiguration {
private long taskBackoffTimeMillis;
private long metricsBufferTimeMillis;
private int metricsMaxQueueSize;
private boolean validateSequenceNumberBeforeCheckpointing;
private String regionName;
/**
* Constructor.
*
* @param applicationName Name of the Amazon Kinesis application.
* By default the application name is included in the user agent string used to make AWS requests. This
* can assist with troubleshooting (e.g. distinguish requests made by separate applications).
* By default the application name is included in the user agent string used to make AWS requests. This
* can assist with troubleshooting (e.g. distinguish requests made by separate applications).
* @param streamName Name of the Kinesis stream
* @param credentialsProvider Provides credentials used to sign AWS requests
* @param workerId Used to distinguish different workers/processes of a Kinesis application
@ -127,9 +137,10 @@ public class KinesisClientLibConfiguration {
/**
* Constructor.
*
* @param applicationName Name of the Amazon Kinesis application
* By default the application name is included in the user agent string used to make AWS requests. This
* can assist with troubleshooting (e.g. distinguish requests made by separate applications).
* By default the application name is included in the user agent string used to make AWS requests. This
* can assist with troubleshooting (e.g. distinguish requests made by separate applications).
* @param streamName Name of the Kinesis stream
* @param kinesisCredentialsProvider Provides credentials used to access Kinesis
* @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB
@ -148,18 +159,18 @@ public class KinesisClientLibConfiguration {
DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS,
DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION,
new ClientConfiguration(), new ClientConfiguration(), new ClientConfiguration(),
DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS,
DEFAULT_METRICS_MAX_QUEUE_SIZE);
DEFAULT_TASK_BACKOFF_TIME_MILLIS, DEFAULT_METRICS_BUFFER_TIME_MILLIS, DEFAULT_METRICS_MAX_QUEUE_SIZE,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null);
}
/**
* @param applicationName Name of the Kinesis application
* By default the application name is included in the user agent string used to make AWS requests. This
* can assist with troubleshooting (e.g. distinguish requests made by separate applications).
* By default the application name is included in the user agent string used to make AWS requests. This
* can assist with troubleshooting (e.g. distinguish requests made by separate applications).
* @param streamName Name of the Kinesis stream
* @param kinesisEndpoint Kinesis endpoint
* @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching
* records from that location in the stream when an application starts up for the first time and there
* records from that location in the stream when an application starts up for the first time and there
* are no checkpoints. If there are checkpoints, then we start from the checkpoint position.
* @param kinesisCredentialsProvider Provides credentials used to access Kinesis
* @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB
@ -180,10 +191,14 @@ public class KinesisClientLibConfiguration {
* @param taskBackoffTimeMillis Backoff period when tasks encounter an exception
* @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch
* @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch
* @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers
* with a call to Amazon Kinesis before checkpointing for calls to
* {@link RecordProcessorCheckpointer#checkpoint(String)}
* @param regionName The region name for the service
*
*/
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 25 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 25 LINES
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES
public KinesisClientLibConfiguration(String applicationName,
String streamName,
String kinesisEndpoint,
@ -204,7 +219,9 @@ public class KinesisClientLibConfiguration {
ClientConfiguration cloudWatchClientConfig,
long taskBackoffTimeMillis,
long metricsBufferTimeMillis,
int metricsMaxQueueSize) {
int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing,
String regionName) {
// Check following values are greater than zero
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis);
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
@ -214,6 +231,7 @@ public class KinesisClientLibConfiguration {
checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis);
checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis);
checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize);
checkIsRegionNameValid(regionName);
this.applicationName = applicationName;
this.streamName = streamName;
this.kinesisEndpoint = kinesisEndpoint;
@ -229,15 +247,14 @@ public class KinesisClientLibConfiguration {
this.shardSyncIntervalMillis = shardSyncIntervalMillis;
this.cleanupLeasesUponShardCompletion = cleanupTerminatedShardsBeforeExpiry;
this.workerIdentifier = workerId;
this.kinesisClientConfig =
checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig);
this.dynamoDBClientConfig =
checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig);
this.cloudWatchClientConfig =
checkAndAppendKinesisClientLibUserAgent(cloudWatchClientConfig);
this.kinesisClientConfig = checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig);
this.dynamoDBClientConfig = checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig);
this.cloudWatchClientConfig = checkAndAppendKinesisClientLibUserAgent(cloudWatchClientConfig);
this.taskBackoffTimeMillis = taskBackoffTimeMillis;
this.metricsBufferTimeMillis = metricsBufferTimeMillis;
this.metricsMaxQueueSize = metricsMaxQueueSize;
this.validateSequenceNumberBeforeCheckpointing = validateSequenceNumberBeforeCheckpointing;
this.regionName = regionName;
}
// Check if value is positive, otherwise throw an exception
@ -247,7 +264,7 @@ public class KinesisClientLibConfiguration {
+ " should be positive, but current value is " + value);
}
}
// Check if user agent in configuration is the default agent.
// If so, replace it with application name plus KINESIS_CLIENT_LIB_USER_AGENT.
// If not, append KINESIS_CLIENT_LIB_USER_AGENT to the end.
@ -262,6 +279,12 @@ public class KinesisClientLibConfiguration {
config.setUserAgent(existingUserAgent);
return config;
}
/**
 * Verifies that the supplied region name is either absent or resolvable via {@code RegionUtils.getRegion}.
 *
 * @param regionNameToCheck region name to validate; null is accepted and skips the lookup
 * @throws IllegalArgumentException if a non-null name is given for which {@code RegionUtils.getRegion}
 *         returns null
 */
private void checkIsRegionNameValid(String regionNameToCheck) {
    if (regionNameToCheck == null) {
        // No region configured: nothing to validate
        return;
    }
    if (RegionUtils.getRegion(regionNameToCheck) == null) {
        // Include the rejected value so a misconfiguration can be diagnosed from the message alone
        throw new IllegalArgumentException("The specified region name is not valid: " + regionNameToCheck);
    }
}
/**
* @return Name of the application
@ -369,19 +392,19 @@ public class KinesisClientLibConfiguration {
}
/**
* @return Kinesis client configuration
* @return Kinesis client configuration
*/
public ClientConfiguration getKinesisClientConfiguration() {
return kinesisClientConfig;
}
/**
* @return DynamoDB client configuration
*/
public ClientConfiguration getDynamoDBClientConfiguration() {
return dynamoDBClientConfig;
}
/**
* @return CloudWatch client configuration
*/
@ -417,7 +440,22 @@ public class KinesisClientLibConfiguration {
return cleanupLeasesUponShardCompletion;
}
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 180 LINES
/**
 * Reports the sequence number validation setting held by this configuration.
 *
 * @return true if KCL should validate client provided sequence numbers with a call to Amazon Kinesis before
 *         checkpointing for calls to {@link RecordProcessorCheckpointer#checkpoint(String)}
 */
public boolean shouldValidateSequenceNumberBeforeCheckpointing() {
    return this.validateSequenceNumberBeforeCheckpointing;
}
/**
 * Returns the region name held by this configuration.
 *
 * @return Region for the service
 */
public String getRegionName() {
    return this.regionName;
}
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 190 LINES
/**
* @param kinesisEndpoint Kinesis endpoint
* @return KinesisClientLibConfiguration
@ -484,7 +522,7 @@ public class KinesisClientLibConfiguration {
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withCallProcessRecordsEvenForEmptyRecordList(
boolean callProcessRecordsEvenForEmptyRecordList) {
boolean callProcessRecordsEvenForEmptyRecordList) {
this.callProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList;
return this;
}
@ -505,18 +543,17 @@ public class KinesisClientLibConfiguration {
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withCleanupLeasesUponShardCompletion(
boolean cleanupLeasesUponShardCompletion) {
boolean cleanupLeasesUponShardCompletion) {
this.cleanupLeasesUponShardCompletion = cleanupLeasesUponShardCompletion;
return this;
}
/**
* @param clientConfig Common client configuration used by Kinesis/DynamoDB/CloudWatch client
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withCommonClientConfig(ClientConfiguration clientConfig) {
ClientConfiguration tempClientConfig =
checkAndAppendKinesisClientLibUserAgent(clientConfig);
ClientConfiguration tempClientConfig = checkAndAppendKinesisClientLibUserAgent(clientConfig);
this.kinesisClientConfig = tempClientConfig;
this.dynamoDBClientConfig = tempClientConfig;
this.cloudWatchClientConfig = tempClientConfig;
@ -528,33 +565,31 @@ public class KinesisClientLibConfiguration {
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withKinesisClientConfig(ClientConfiguration kinesisClientConfig) {
this.kinesisClientConfig =
checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig);
this.kinesisClientConfig = checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig);
return this;
}
/**
* @param dynamoDBClientConfig Client configuration used by DynamoDB client
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withDynamoDBClientConfig(ClientConfiguration dynamoDBClientConfig) {
this.dynamoDBClientConfig =
checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig);
this.dynamoDBClientConfig = checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig);
return this;
}
/**
* @param cloudWatchClientConfig Client configuration used by CloudWatch client
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withCloudWatchClientConfig(ClientConfiguration cloudWatchClientConfig) {
this.cloudWatchClientConfig =
checkAndAppendKinesisClientLibUserAgent(cloudWatchClientConfig);
this.cloudWatchClientConfig = checkAndAppendKinesisClientLibUserAgent(cloudWatchClientConfig);
return this;
}
/**
* Override the default user agent (application name).
*
* @param userAgent User agent to use in AWS requests
* @return KinesisClientLibConfiguration
*/
@ -595,4 +630,29 @@ public class KinesisClientLibConfiguration {
this.metricsMaxQueueSize = metricsMaxQueueSize;
return this;
}
/**
* Sets whether sequence numbers are validated before checkpointing and returns this configuration.
* @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers
* with a call to Amazon Kinesis before checkpointing for calls to
* {@link RecordProcessorCheckpointer#checkpoint(String)}.
* @return KinesisClientLibConfiguration
*/
public KinesisClientLibConfiguration withValidateSequenceNumberBeforeCheckpointing(
boolean validateSequenceNumberBeforeCheckpointing) {
this.validateSequenceNumberBeforeCheckpointing = validateSequenceNumberBeforeCheckpointing;
return this;
}
/**
* Sets the region name after checking it with checkIsRegionNameValid, and returns this configuration.
*
* @param regionName The region name for the service
* @return KinesisClientLibConfiguration
* @throws IllegalArgumentException if checkIsRegionNameValid rejects the region name
*/
// CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 2 LINES
public KinesisClientLibConfiguration withRegionName(String regionName) {
checkIsRegionNameValid(regionName);
this.regionName = regionName;
return this;
}
}

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -71,7 +71,9 @@ class ProcessTask implements ITask {
this.backoffTimeMillis = backoffTimeMillis;
}
/* (non-Javadoc)
/*
* (non-Javadoc)
*
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call()
*/
// CHECKSTYLE:OFF CyclomaticComplexity
@ -92,7 +94,7 @@ class ProcessTask implements ITask {
return new TaskResult(null, shardEndReached);
}
List<Record> records = getRecords();
if (records.isEmpty()) {
LOG.debug("Kinesis didn't return any records for shard " + shardInfo.getShardId());
@ -111,12 +113,12 @@ class ProcessTask implements ITask {
}
if ((!records.isEmpty()) || streamConfig.shouldCallProcessRecordsEvenForEmptyRecordList()) {
// If we got more records, record the max sequence number. Sleep if there are no records.
if (!records.isEmpty()) {
String maxSequenceNumber = getMaxSequenceNumber(scope, records);
recordProcessorCheckpointer.setSequenceNumber(maxSequenceNumber);
}
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(maxSequenceNumber);
}
try {
LOG.debug("Calling application processRecords() with " + records.size() + " records from "
+ shardInfo.getShardId());
@ -126,7 +128,7 @@ class ProcessTask implements ITask {
+ ": Application processRecords() threw an exception when processing shard ", e);
LOG.error("ShardId " + shardInfo.getShardId() + ": Skipping over the following data records: "
+ records);
}
}
}
} catch (RuntimeException | KinesisClientLibException e) {
LOG.error("ShardId " + shardInfo.getShardId() + ": Caught exception: ", e);
@ -142,6 +144,7 @@ class ProcessTask implements ITask {
return new TaskResult(exception);
}
// CHECKSTYLE:ON CyclomaticComplexity
/**
@ -192,7 +195,7 @@ class ProcessTask implements ITask {
* Advance the iterator to after the greatest processed sequence number (remembered by
* recordProcessorCheckpointer).
*/
dataFetcher.advanceIteratorAfter(recordProcessorCheckpointer.getSequenceNumber());
dataFetcher.advanceIteratorAfter(recordProcessorCheckpointer.getLargestPermittedCheckpointValue());
// Try a second time - if we fail this time, expose the failure.
try {

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -24,6 +24,7 @@ import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.ICheckpoint;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.checkpoint.SentinelCheckpoint;
/**
* This class is used to enable RecordProcessors to checkpoint their progress.
@ -36,30 +37,71 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
private ICheckpoint checkpoint;
private String sequenceNumber;
private String largestPermittedCheckpointValue;
// Set to the last value set via checkpoint().
// Sample use: verify application shutdown() invoked checkpoint() at the end of a shard.
private String lastCheckpointValue;
private ShardInfo shardInfo;
private SequenceNumberValidator sequenceNumberValidator;
private CheckpointValueComparator checkpointValueComparator;
private String sequenceNumberAtShardEnd;
/**
* Only has package level access, since only the Amazon Kinesis Client Library should be creating these.
*
* @param checkpoint Used to checkpoint progress of a RecordProcessor
* @param validator Used for validating sequence numbers
* @param comparator Used for checking the order of checkpoint values
*/
RecordProcessorCheckpointer(ShardInfo shardInfo, ICheckpoint checkpoint) {
RecordProcessorCheckpointer(ShardInfo shardInfo,
ICheckpoint checkpoint,
SequenceNumberValidator validator,
CheckpointValueComparator comparator) {
this.shardInfo = shardInfo;
this.checkpoint = checkpoint;
this.sequenceNumberValidator = validator;
this.checkpointValueComparator = comparator;
}
/* (non-Javadoc)
* @see com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer#checkpoint()
/**
* {@inheritDoc}
*/
@Override
public synchronized void checkpoint()
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
advancePosition();
advancePosition(this.largestPermittedCheckpointValue);
}
/**
 * {@inheritDoc}
 */
@Override
public synchronized void checkpoint(String sequenceNumber)
    throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException,
    IllegalArgumentException {

    // The validator throws if this sequence number shouldn't be checkpointed for this shard
    sequenceNumberValidator.validateSequenceNumber(sequenceNumber);

    // Lower bound: only enforced once a checkpoint value has been recorded
    boolean notBeforeLastCheckpoint = lastCheckpointValue == null
            || checkpointValueComparator.compare(lastCheckpointValue, sequenceNumber) <= 0;

    // Upper bound: only evaluated when the lower bound holds
    boolean withinPermittedRange = notBeforeLastCheckpoint
            && checkpointValueComparator.compare(sequenceNumber, largestPermittedCheckpointValue) <= 0;

    if (!withinPermittedRange) {
        throw new IllegalArgumentException("Could not checkpoint at sequence number " + sequenceNumber
                + " it did not fall into acceptable range between the last sequence number checkpointed "
                + this.lastCheckpointValue + " and the greatest sequence number passed to this record processor "
                + this.largestPermittedCheckpointValue);
    }

    this.advancePosition(sequenceNumber);
}
/**
@ -74,42 +116,62 @@ class RecordProcessorCheckpointer implements IRecordProcessorCheckpointer {
*
* @return the largest permitted checkpoint value
*/
synchronized String getSequenceNumber() {
return sequenceNumber;
synchronized String getLargestPermittedCheckpointValue() {
return largestPermittedCheckpointValue;
}
/**
* @param maxSequenceNumber the sequenceNumber to set
* @param checkpointValue the largest permitted checkpoint value to set
*/
synchronized void setSequenceNumber(String sequenceNumber) {
this.sequenceNumber = sequenceNumber;
synchronized void setLargestPermittedCheckpointValue(String checkpointValue) {
this.largestPermittedCheckpointValue = checkpointValue;
}
/**
* Used to remember the last sequence number before SHARD_END to allow us to prevent the checkpointer from
* checkpointing at the end of the shard twice (i.e. at the last sequence number and then again at SHARD_END).
*
* @param sequenceNumber the last sequence number before SHARD_END; stored as sequenceNumberAtShardEnd
*/
synchronized void setSequenceNumberAtShardEnd(String sequenceNumber) {
this.sequenceNumberAtShardEnd = sequenceNumber;
}
/**
* Internal API - has package level access only for testing purposes.
*
* @param sequenceNumber
*
* @throws KinesisClientLibDependencyException
* @throws ThrottlingException
* @throws ShutdownException
* @throws InvalidStateException
*/
void advancePosition()
void advancePosition(String sequenceNumber)
throws KinesisClientLibDependencyException, InvalidStateException, ThrottlingException, ShutdownException {
try {
checkpoint.setCheckpoint(shardInfo.getShardId(), sequenceNumber, shardInfo.getConcurrencyToken());
lastCheckpointValue = sequenceNumber;
} catch (ThrottlingException e) {
throw e;
} catch (ShutdownException e) {
throw e;
} catch (InvalidStateException e) {
throw e;
} catch (KinesisClientLibDependencyException e) {
throw e;
} catch (KinesisClientLibException e) {
LOG.warn("Caught exception setting checkpoint.", e);
throw new KinesisClientLibDependencyException("Caught exception while checkpointing", e);
String checkpointValue = sequenceNumber;
if (sequenceNumberAtShardEnd != null && sequenceNumberAtShardEnd.equals(sequenceNumber)) {
// If we are about to checkpoint the very last sequence number for this shard, we might as well
// just checkpoint at SHARD_END
checkpointValue = SentinelCheckpoint.SHARD_END.toString();
}
// Don't checkpoint a value we already successfully checkpointed
if (sequenceNumber != null && !sequenceNumber.equals(lastCheckpointValue)) {
try {
checkpoint.setCheckpoint(shardInfo.getShardId(), checkpointValue, shardInfo.getConcurrencyToken());
lastCheckpointValue = checkpointValue;
} catch (ThrottlingException e) {
throw e;
} catch (ShutdownException e) {
throw e;
} catch (InvalidStateException e) {
throw e;
} catch (KinesisClientLibDependencyException e) {
throw e;
} catch (KinesisClientLibException e) {
LOG.warn("Caught exception setting checkpoint.", e);
throw new KinesisClientLibDependencyException("Caught exception while checkpointing", e);
}
}
}
}

View file

@ -0,0 +1,118 @@
/*
* Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.model.InvalidArgumentException;
import com.amazonaws.services.kinesis.model.ProvisionedThroughputExceededException;
import com.amazonaws.services.kinesis.model.ShardIteratorType;
/**
 * This class provides some methods for validating sequence numbers. It provides a method
 * {@link #validateSequenceNumber(String)} which validates a sequence number by attempting to get an iterator from
 * Amazon Kinesis for that sequence number. (e.g. Before checkpointing a client provided sequence number in
 * {@link RecordProcessorCheckpointer#checkpoint(String)} to prevent invalid sequence numbers from being checkpointed,
 * which could prevent another shard consumer instance from processing the shard later on). This class also provides a
 * utility function {@link #isDigits(String)} which is used to check whether a string consists solely of the ASCII
 * decimal digits {@code 0}-{@code 9}.
 */
class SequenceNumberValidator {

    private static final Log LOG = LogFactory.getLog(SequenceNumberValidator.class);

    /** HTTP status codes at or above this value are treated as service-side (retryable) failures. */
    private static final int SERVER_SIDE_ERROR_CODE = 500;

    private final IKinesisProxy proxy;
    private final String shardId;
    private final boolean validateWithGetIterator;

    /**
     * Constructor.
     *
     * @param proxy Kinesis proxy to be used for getIterator call
     * @param shardId ShardId to check with sequence numbers
     * @param validateWithGetIterator Whether to attempt to get an iterator for this shard id and the sequence numbers
     *        being validated
     */
    SequenceNumberValidator(IKinesisProxy proxy, String shardId, boolean validateWithGetIterator) {
        this.proxy = proxy;
        this.shardId = shardId;
        this.validateWithGetIterator = validateWithGetIterator;
    }

    /**
     * Validates the sequence number by attempting to get an iterator from Amazon Kinesis. Repackages exceptions from
     * Amazon Kinesis into the appropriate KCL exception to allow clients to determine exception handling strategies.
     * The format check (digits only) is always performed; the getIterator call is only made when this validator was
     * constructed with {@code validateWithGetIterator} set to true.
     *
     * @param sequenceNumber The sequence number to be validated. Must be a numeric string
     * @throws IllegalArgumentException Thrown when sequence number validation fails.
     * @throws ThrottlingException Thrown when GetShardIterator returns a ProvisionedThroughputExceededException which
     *         indicates that too many getIterator calls are being made for this shard.
     * @throws KinesisClientLibDependencyException Thrown when a service side error is received. This way clients have
     *         the option of retrying
     */
    void validateSequenceNumber(String sequenceNumber)
        throws IllegalArgumentException, ThrottlingException, KinesisClientLibDependencyException {
        if (!isDigits(sequenceNumber)) {
            LOG.info("Sequence number must be numeric, but was " + sequenceNumber);
            throw new IllegalArgumentException("Sequence number must be numeric, but was " + sequenceNumber);
        }
        try {
            if (validateWithGetIterator) {
                // The returned iterator is discarded; only whether the call succeeds matters here.
                proxy.getIterator(shardId, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString(), sequenceNumber);
                LOG.info("Validated sequence number " + sequenceNumber + " with shard id " + shardId);
            }
        } catch (InvalidArgumentException e) {
            LOG.info("Sequence number " + sequenceNumber + " is invalid for shard " + shardId, e);
            throw new IllegalArgumentException("Sequence number " + sequenceNumber + " is invalid for shard "
                    + shardId, e);
        } catch (ProvisionedThroughputExceededException e) {
            // clients should have back off logic in their checkpoint logic
            LOG.info("Exceeded throughput while getting an iterator for shard " + shardId, e);
            throw new ThrottlingException("Exceeded throughput while getting an iterator for shard " + shardId, e);
        } catch (AmazonServiceException e) {
            // Must stay after the more specific catches above, which handle subtypes of this exception.
            LOG.info("Encountered service exception while getting an iterator for shard " + shardId, e);
            if (e.getStatusCode() >= SERVER_SIDE_ERROR_CODE) {
                // clients can choose whether to retry in their checkpoint logic
                throw new KinesisClientLibDependencyException("Encountered service exception while getting an iterator"
                        + " for shard " + shardId, e);
            }
            // Just throw any other exceptions, e.g. 400 errors caused by the client
            throw e;
        }
    }

    /**
     * Checks if the string is composed of only the ASCII decimal digits '0' through '9'.
     *
     * Character.isDigit is deliberately not used: it also accepts non-ASCII Unicode decimal digits (for example
     * Arabic-Indic or full-width digits), which would let a string that is not a well-formed sequence number pass
     * the format check.
     *
     * @param string the string to check (may be null)
     * @return true for a non-empty string of ASCII digits only, false otherwise (including false for null and empty
     *         string)
     */
    static boolean isDigits(String string) {
        if (string == null || string.length() == 0) {
            return false;
        }
        for (int i = 0; i < string.length(); i++) {
            char c = string.charAt(i);
            if (c < '0' || c > '9') {
                return false;
            }
        }
        return true;
    }
}

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -75,7 +75,6 @@ class ShardConsumer {
private boolean beginShutdown;
private ShutdownReason shutdownReason;
/**
* @param shardInfo Shard information
* @param streamConfig Stream configuration to use
@ -103,7 +102,13 @@ class ShardConsumer {
this.executorService = executorService;
this.shardInfo = shardInfo;
this.checkpoint = checkpoint;
this.recordProcessorCheckpointer = new RecordProcessorCheckpointer(shardInfo, checkpoint);
this.recordProcessorCheckpointer =
new RecordProcessorCheckpointer(shardInfo,
checkpoint,
new SequenceNumberValidator(streamConfig.getStreamProxy(),
shardInfo.getShardId(),
streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()),
new CheckpointValueComparator());
this.dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
this.leaseManager = leaseManager;
this.metricsFactory = metricsFactory;
@ -145,7 +150,7 @@ class ShardConsumer {
+ " is blocked on completion of parent shard.");
} else {
LOG.debug("Caught exception running " + currentTask.getTaskType() + " task: ",
result.getException());
result.getException());
}
}
}
@ -184,11 +189,13 @@ class ShardConsumer {
return submittedNewTask;
}
// CHECKSTYLE:ON CyclomaticComplexity
/**
* Shutdown this ShardConsumer (including invoking the RecordProcessor shutdown API).
* This is called by Worker when it loses responsibility for a shard.
*
* @return true if shutdown is complete (false if shutdown is still in progress)
*/
synchronized boolean beginShutdown() {
@ -198,7 +205,7 @@ class ShardConsumer {
}
return isShutdown();
}
synchronized void markForShutdown(ShutdownReason reason) {
beginShutdown = true;
// ShutdownReason.ZOMBIE takes precedence over TERMINATE (we won't be able to save checkpoint at end of shard)
@ -216,7 +223,7 @@ class ShardConsumer {
boolean isShutdown() {
return currentState == ShardConsumerState.SHUTDOWN_COMPLETE;
}
/**
* @return the shutdownReason
*/
@ -226,6 +233,7 @@ class ShardConsumer {
/**
* Figure out next task to run based on current state, task, and shutdown context.
*
* @return Return next task to run
*/
private ITask getNextTask() {
@ -280,6 +288,7 @@ class ShardConsumer {
/**
* Note: This is a private/internal method with package level access solely for testing purposes.
* Update state based on information about: task success, current state, and shutdown info.
*
* @param taskCompletedSuccessfully Whether (current) task completed successfully.
*/
// CHECKSTYLE:OFF CyclomaticComplexity

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -65,8 +65,10 @@ class ShutdownTask implements ITask {
this.backoffTimeMillis = backoffTimeMillis;
}
/* Invokes RecordProcessor shutdown() API.
/*
* Invokes RecordProcessor shutdown() API.
* (non-Javadoc)
*
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#call()
*/
@Override
@ -77,9 +79,11 @@ class ShutdownTask implements ITask {
try {
// If we reached end of the shard, set sequence number to SHARD_END.
if (reason == ShutdownReason.TERMINATE) {
recordProcessorCheckpointer.setSequenceNumber(SentinelCheckpoint.SHARD_END.toString());
recordProcessorCheckpointer.setSequenceNumberAtShardEnd(
recordProcessorCheckpointer.getLargestPermittedCheckpointValue());
recordProcessorCheckpointer.setLargestPermittedCheckpointValue(SentinelCheckpoint.SHARD_END.toString());
}
LOG.debug("Invoking shutdown() for shard " + shardInfo.getShardId() + ", concurrencyToken "
+ shardInfo.getConcurrencyToken() + ". Shutdown reason: " + reason);
try {
@ -97,7 +101,7 @@ class ShutdownTask implements ITask {
applicationException = true;
throw e;
}
if (reason == ShutdownReason.TERMINATE) {
LOG.debug("Looking for child shards of shard " + shardInfo.getShardId());
// create leases for the child shards
@ -127,7 +131,9 @@ class ShutdownTask implements ITask {
return new TaskResult(exception);
}
/* (non-Javadoc)
/*
* (non-Javadoc)
*
* @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ITask#getTaskType()
*/
@Override

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -26,6 +26,7 @@ class StreamConfig {
private final long idleTimeInMilliseconds;
private final boolean callProcessRecordsEvenForEmptyRecordList;
private InitialPositionInStream initialPositionInStream;
private final boolean validateSequenceNumberBeforeCheckpointing;
/**
* @param proxy Used to fetch records and information about the stream
@ -33,13 +34,15 @@ class StreamConfig {
* @param idleTimeInMilliseconds Idle time between get calls to the stream
* @param callProcessRecordsEvenForEmptyRecordList Call the RecordProcessor::processRecords() API even if
* GetRecords returned an empty record list.
* @param validateSequenceNumberBeforeCheckpointing Whether to call Amazon Kinesis to validate sequence numbers
*/
StreamConfig(IKinesisProxy proxy,
int maxRecords,
long idleTimeInMilliseconds,
boolean callProcessRecordsEvenForEmptyRecordList) {
boolean callProcessRecordsEvenForEmptyRecordList,
boolean validateSequenceNumberBeforeCheckpointing) {
this(proxy, maxRecords, idleTimeInMilliseconds, callProcessRecordsEvenForEmptyRecordList,
InitialPositionInStream.LATEST);
validateSequenceNumberBeforeCheckpointing, InitialPositionInStream.LATEST);
}
/**
@ -48,17 +51,20 @@ class StreamConfig {
* @param idleTimeInMilliseconds Idle time between get calls to the stream
* @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if
* GetRecords returned an empty record list.
* @param validateSequenceNumberBeforeCheckpointing Whether to call Amazon Kinesis to validate sequence numbers
* @param initialPositionInStream Initial position in stream
*/
StreamConfig(IKinesisProxy proxy,
int maxRecords,
long idleTimeInMilliseconds,
boolean callProcessRecordsEvenForEmptyRecordList,
boolean validateSequenceNumberBeforeCheckpointing,
InitialPositionInStream initialPositionInStream) {
this.streamProxy = proxy;
this.maxRecords = maxRecords;
this.idleTimeInMilliseconds = idleTimeInMilliseconds;
this.callProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList;
this.validateSequenceNumberBeforeCheckpointing = validateSequenceNumberBeforeCheckpointing;
this.initialPositionInStream = initialPositionInStream;
}
@ -97,4 +103,11 @@ class StreamConfig {
return initialPositionInStream;
}
/**
* Returns the validateSequenceNumberBeforeCheckpointing flag supplied when this StreamConfig was constructed,
* i.e. whether to call Amazon Kinesis to validate sequence numbers before checkpointing.
*
* @return validateSequenceNumberBeforeCheckpointing
*/
boolean shouldValidateSequenceNumberBeforeCheckpointing() {
return validateSequenceNumberBeforeCheckpointing;
}
}

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2013 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2012-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -26,6 +26,8 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
@ -40,9 +42,10 @@ import com.amazonaws.services.kinesis.metrics.impl.CWMetricsFactory;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
/**
* Worker is the high level class that Kinesis applications use to start processing data.
* It initializes and oversees different components (e.g. syncing shard and lease information, tracking shard
* assignments, and processing data from the shards).
* Worker is the high level class that Kinesis applications use to start
* processing data. It initializes and oversees different components (e.g.
* syncing shard and lease information, tracking shard assignments, and
* processing data from the shards).
*/
public class Worker implements Runnable {
@ -56,7 +59,8 @@ public class Worker implements Runnable {
private final InitialPositionInStream initialPosition;
private final ICheckpoint checkpointTracker;
private final long idleTimeInMilliseconds;
// Backoff time when polling to check if application has finished processing parent shards
// Backoff time when polling to check if application has finished processing
// parent shards
private final long parentShardPollIntervalMillis;
private final ExecutorService executorService;
private final IMetricsFactory metricsFactory;
@ -69,13 +73,15 @@ public class Worker implements Runnable {
private boolean shutdown;
// Holds consumers for shards the worker is currently tracking. Key is shard id, value is ShardConsumer.
// Holds consumers for shards the worker is currently tracking. Key is shard
// id, value is ShardConsumer.
private ConcurrentMap<String, ShardConsumer> shardIdShardConsumerMap =
new ConcurrentHashMap<String, ShardConsumer>();
private final boolean cleanupLeasesUponShardCompletion;
/**
* Constructor.
*
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param config Kinesis Client Library configuration
*/
@ -85,32 +91,34 @@ public class Worker implements Runnable {
/**
* Constructor.
*
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param config Kinesis Client Library configuration
* @param execService ExecutorService to use for processing records (support for multi-threaded
* consumption)
*/
public Worker(IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config, ExecutorService execService) {
this(recordProcessorFactory, config,
new AmazonKinesisClient(config.getKinesisCredentialsProvider(), config.getKinesisClientConfiguration()),
new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(), config.getDynamoDBClientConfiguration()),
new AmazonCloudWatchClient(config.getCloudWatchCredentialsProvider(),
config.getCloudWatchClientConfiguration()),
execService);
public Worker(IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config,
ExecutorService execService) {
this(recordProcessorFactory, config, new AmazonKinesisClient(config.getKinesisCredentialsProvider(),
config.getKinesisClientConfiguration()),
new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(),
config.getDynamoDBClientConfiguration()),
new AmazonCloudWatchClient(config.getCloudWatchCredentialsProvider(),
config.getCloudWatchClientConfiguration()), execService);
}
/**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param config Kinesis Client Library configuration
* @param metricsFactory Metrics factory used to emit metrics
*/
public Worker(IRecordProcessorFactory recordProcessorFactory,
public Worker(IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config,
IMetricsFactory metricsFactory) {
this(recordProcessorFactory, config, metricsFactory, Executors.newCachedThreadPool());
}
/**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param config Kinesis Client Library configuration
@ -118,17 +126,16 @@ public class Worker implements Runnable {
* @param execService ExecutorService to use for processing records (support for multi-threaded
* consumption)
*/
public Worker(IRecordProcessorFactory recordProcessorFactory,
public Worker(IRecordProcessorFactory recordProcessorFactory,
KinesisClientLibConfiguration config,
IMetricsFactory metricsFactory,
ExecutorService execService) {
this(recordProcessorFactory, config,
new AmazonKinesisClient(config.getKinesisCredentialsProvider(), config.getKinesisClientConfiguration()),
new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(), config.getDynamoDBClientConfiguration()),
metricsFactory,
execService);
this(recordProcessorFactory, config, new AmazonKinesisClient(config.getKinesisCredentialsProvider(),
config.getKinesisClientConfiguration()),
new AmazonDynamoDBClient(config.getDynamoDBCredentialsProvider(),
config.getDynamoDBClientConfiguration()), metricsFactory, execService);
}
/**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param config Kinesis Client Library configuration
@ -141,17 +148,16 @@ public class Worker implements Runnable {
AmazonKinesisClient kinesisClient,
AmazonDynamoDBClient dynamoDBClient,
AmazonCloudWatchClient cloudWatchClient) {
this(recordProcessorFactory, config,
kinesisClient, dynamoDBClient, cloudWatchClient,
this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, cloudWatchClient,
Executors.newCachedThreadPool());
}
/**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param config Kinesis Client Library configuration
* @param kinesisClient Kinesis Client used for fetching data
* @param dynamoDBClient DynamoDB client used for checkpoints and tracking leases
* @param cloudWatchClient Clould Watch Client for using cloud watch
* @param cloudWatchClient CloudWatch Client for publishing metrics
* @param execService ExecutorService to use for processing records (support for multi-threaded
* consumption)
*/
@ -161,16 +167,17 @@ public class Worker implements Runnable {
AmazonDynamoDBClient dynamoDBClient,
AmazonCloudWatchClient cloudWatchClient,
ExecutorService execService) {
this(recordProcessorFactory, config,
kinesisClient, dynamoDBClient,
new CWMetricsFactory(
cloudWatchClient,
config.getApplicationName(),
config.getMetricsBufferTimeMillis(),
config.getMetricsMaxQueueSize()),
execService);
this(recordProcessorFactory, config, kinesisClient, dynamoDBClient, new CWMetricsFactory(cloudWatchClient,
config.getApplicationName(),
config.getMetricsBufferTimeMillis(),
config.getMetricsMaxQueueSize()), execService);
if (config.getRegionName() != null) {
Region region = RegionUtils.getRegion(config.getRegionName());
cloudWatchClient.setRegion(region);
LOG.debug("The region of Amazon CloudWatch client has been set to " + config.getRegionName());
}
}
/**
* @param recordProcessorFactory Used to get record processor instances for processing data from shards
* @param config Kinesis Client Library configuration
@ -186,23 +193,40 @@ public class Worker implements Runnable {
AmazonDynamoDBClient dynamoDBClient,
IMetricsFactory metricsFactory,
ExecutorService execService) {
this(recordProcessorFactory, config,
new StreamConfig(
new KinesisProxyFactory(config.getKinesisCredentialsProvider(),
kinesisClient).getProxy(config.getStreamName()),
this(
recordProcessorFactory,
config,
new StreamConfig(new KinesisProxyFactory(config.getKinesisCredentialsProvider(),
kinesisClient).getProxy(config.getStreamName()),
config.getMaxRecords(),
config.getIdleTimeBetweenReadsInMillis(),
config.shouldCallProcessRecordsEvenForEmptyRecordList()),
new KinesisClientLibLeaseCoordinator(
new KinesisClientLeaseManager(config.getApplicationName(), dynamoDBClient),
config.shouldCallProcessRecordsEvenForEmptyRecordList(),
config.shouldValidateSequenceNumberBeforeCheckpointing()),
new KinesisClientLibLeaseCoordinator(new KinesisClientLeaseManager(config.getApplicationName(),
dynamoDBClient),
config.getWorkerIdentifier(),
config.getFailoverTimeMillis(),
config.getEpsilonMillis(),
metricsFactory),
metricsFactory, execService);
// If an endpoint was explicitly specified, use it.
metricsFactory), metricsFactory, execService);
// If a region name was explicitly specified, use it as the region for Amazon Kinesis and Amazon DynamoDB.
if (config.getRegionName() != null) {
Region region = RegionUtils.getRegion(config.getRegionName());
kinesisClient.setRegion(region);
LOG.debug("The region of Amazon Kinesis client has been set to " + config.getRegionName());
dynamoDBClient.setRegion(region);
LOG.debug("The region of Amazon DynamoDB client has been set to " + config.getRegionName());
}
// If an Amazon Kinesis endpoint was explicitly specified, use it; it takes precedence over the region name.
if (config.getKinesisEndpoint() != null) {
kinesisClient.setEndpoint(config.getKinesisEndpoint());
if (config.getRegionName() != null) {
LOG.warn("Received configuration for both region name as " + config.getRegionName()
+ ", and Amazon Kinesis endpoint as " + config.getKinesisEndpoint()
+ ". Amazon Kinesis endpoint will overwrite region name.");
LOG.debug("The region of Amazon Kinesis client has been overwritten to " + config.getKinesisEndpoint());
} else {
LOG.debug("The region of Amazon Kinesis client has been set to " + config.getKinesisEndpoint());
}
}
}
@ -289,7 +313,8 @@ public class Worker implements Runnable {
}
/**
* Start consuming data from the stream, and pass it to the application record processors.
* Start consuming data from the stream, and pass it to the application
* record processors.
*/
public void run() {
try {
@ -393,7 +418,8 @@ public class Worker implements Runnable {
private void cleanupShardConsumers(Set<String> assignedShardIds) {
for (String shardId : shardIdShardConsumerMap.keySet()) {
if (!assignedShardIds.contains(shardId)) {
// Shutdown the consumer since we are not longer responsible for the shard.
// Shutdown the consumer since we are no longer responsible for
// the shard.
boolean isShutdown = shardIdShardConsumerMap.get(shardId).beginShutdown();
if (isShutdown) {
shardIdShardConsumerMap.remove(shardId);
@ -426,15 +452,16 @@ public class Worker implements Runnable {
}
/**
* Sets the killed flag so this worker will stop on the next iteration of its loop.
* Sets the killed flag so this worker will stop on the next iteration of
* its loop.
*/
public void shutdown() {
this.shutdown = true;
}
/**
* NOTE: This method is internal/private to the Worker class. It has package access solely for
* testing.
* NOTE: This method is internal/private to the Worker class. It has package
* access solely for testing.
*
* @param shardInfo Kinesis shard info
* @param factory RecordProcessor factory
@ -444,8 +471,10 @@ public class Worker implements Runnable {
synchronized (shardIdShardConsumerMap) {
String shardId = shardInfo.getShardId();
ShardConsumer consumer = shardIdShardConsumerMap.get(shardId);
// Instantiate a new consumer if we don't have one, or the one we had was from an earlier
// lease instance (and was shutdown). Don't need to create another one if the shard has been
// Instantiate a new consumer if we don't have one, or the one we
// had was from an earlier
// lease instance (and was shutdown). Don't need to create another
// one if the shard has been
// completely processed (shutdown reason terminate).
if ((consumer == null)
|| (consumer.isShutdown() && consumer.getShutdownReason().equals(ShutdownReason.ZOMBIE))) {
@ -471,10 +500,10 @@ public class Worker implements Runnable {
}
/**
* Logger for suppressing too much INFO logging.
* To avoid too much logging information Worker will output logging at INFO level
* for a single pass through the main loop every minute.
* At DEBUG level it will output all INFO logs on every pass.
* Logger for suppressing too much INFO logging. To avoid too much logging
* information Worker will output logging at INFO level for a single pass
* through the main loop every minute. At DEBUG level it will output all
* INFO logs on every pass.
*/
private static class WorkerLog {