'Version 1.1.1 of the Amazon Kinesis Client Library'

This commit is contained in:
Umesh Kumar 2014-09-11 16:40:44 +00:00
parent 13aad26a80
commit 50000086e3
6 changed files with 56 additions and 61 deletions

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2 Bundle-ManifestVersion: 2
Bundle-Name: Amazon Kinesis Client Library for Java Bundle-Name: Amazon Kinesis Client Library for Java
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
Bundle-Version: 1.0.0 Bundle-Version: 1.1.1
Bundle-Vendor: Amazon Technologies, Inc Bundle-Vendor: Amazon Technologies, Inc
Bundle-RequiredExecutionEnvironment: JavaSE-1.7 Bundle-RequiredExecutionEnvironment: JavaSE-1.7
Require-Bundle: org.apache.commons.codec;bundle-version="1.3.0", Require-Bundle: org.apache.commons.codec;bundle-version="1.3.0",
@ -12,7 +12,7 @@ Require-Bundle: org.apache.commons.codec;bundle-version="1.3.0",
com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.1.1", com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.1.1",
org.apache.httpcomponents.httpcore;bundle-version="4.2.0", org.apache.httpcomponents.httpcore;bundle-version="4.2.0",
org.apache.httpcomponents.httpclient;bundle-version="4.2.0" org.apache.httpcomponents.httpclient;bundle-version="4.2.0"
com.amazonaws.sdk;bundle-version="1.6.9", com.amazonaws.sdk;bundle-version="1.7.13",
Export-Package: com.amazonaws.services.kinesis, Export-Package: com.amazonaws.services.kinesis,
com.amazonaws.services.kinesis.clientlibrary, com.amazonaws.services.kinesis.clientlibrary,
com.amazonaws.services.kinesis.clientlibrary.exceptions, com.amazonaws.services.kinesis.clientlibrary.exceptions,

40
pom.xml
View file

@ -6,7 +6,7 @@
<artifactId>amazon-kinesis-client</artifactId> <artifactId>amazon-kinesis-client</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>Amazon Kinesis Client Library for Java</name> <name>Amazon Kinesis Client Library for Java</name>
<version>1.1.0</version> <version>1.1.1</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.</description> <description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.</description>
<url>https://aws.amazon.com/kinesis</url> <url>https://aws.amazon.com/kinesis</url>
@ -24,7 +24,6 @@
<properties> <properties>
<aws-java-sdk.version>1.7.13</aws-java-sdk.version> <aws-java-sdk.version>1.7.13</aws-java-sdk.version>
<jackson.version>2.1.1</jackson.version>
</properties> </properties>
<dependencies> <dependencies>
@ -33,43 +32,6 @@
<artifactId>aws-java-sdk</artifactId> <artifactId>aws-java-sdk</artifactId>
<version>${aws-java-sdk.version}</version> <version>${aws-java-sdk.version}</version>
</dependency> </dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.2</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
</dependencies> </dependencies>
<developers> <developers>

View file

@ -85,7 +85,7 @@ public class KinesisClientLibConfiguration {
/** /**
* User agent set when Amazon Kinesis Client Library makes AWS requests. * User agent set when Amazon Kinesis Client Library makes AWS requests.
*/ */
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.1.0"; public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.1.1";
/** /**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls

View file

@ -350,4 +350,13 @@ class ShardConsumer {
return currentState; return currentState;
} }
/**
* Private/Internal method - has package level access solely for testing purposes.
*
* @return the beginShutdown
*/
boolean isBeginShutdown() {
return beginShutdown;
}
} }

View file

@ -41,6 +41,8 @@ class ShardInfo {
if (parentShardIds != null) { if (parentShardIds != null) {
this.parentShardIds.addAll(parentShardIds); this.parentShardIds.addAll(parentShardIds);
} }
// ShardInfo stores parent shard Ids in canonical order in the parentShardIds list.
// This makes it easy to check for equality in ShardInfo.equals method.
Collections.sort(this.parentShardIds); Collections.sort(this.parentShardIds);
} }
@ -83,6 +85,14 @@ class ShardInfo {
*/ */
// CHECKSTYLE:OFF CyclomaticComplexity // CHECKSTYLE:OFF CyclomaticComplexity
// CHECKSTYLE:OFF NPathComplexity // CHECKSTYLE:OFF NPathComplexity
/**
* This method assumes parentShardIds is ordered. The Worker.cleanupShardConsumers() method relies on this method
* returning true for ShardInfo objects which may have been instantiated with parentShardIds in a different order
* (and rest of the fields being the equal). For example shardInfo1.equals(shardInfo2) should return true with
* shardInfo1 and shardInfo2 defined as follows.
* ShardInfo shardInfo1 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent1", "parent2"));
* ShardInfo shardInfo2 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent2", "parent1"));
*/
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
if (this == obj) { if (this == obj) {
@ -118,8 +128,14 @@ class ShardInfo {
} }
return true; return true;
} }
// CHECKSTYLE:ON CyclomaticComplexity // CHECKSTYLE:ON CyclomaticComplexity
// CHECKSTYLE:ON NPathComplexity // CHECKSTYLE:ON NPathComplexity
@Override
public String toString() {
return "ShardInfo [shardId=" + shardId + ", concurrencyToken=" + concurrencyToken + ", parentShardIds="
+ parentShardIds + "]";
}
} }

View file

@ -74,9 +74,9 @@ public class Worker implements Runnable {
private boolean shutdown; private boolean shutdown;
// Holds consumers for shards the worker is currently tracking. Key is shard // Holds consumers for shards the worker is currently tracking. Key is shard
// id, value is ShardConsumer. // info, value is ShardConsumer.
private ConcurrentMap<String, ShardConsumer> shardIdShardConsumerMap = private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap =
new ConcurrentHashMap<String, ShardConsumer>(); new ConcurrentHashMap<ShardInfo, ShardConsumer>();
private final boolean cleanupLeasesUponShardCompletion; private final boolean cleanupLeasesUponShardCompletion;
/** /**
@ -328,7 +328,7 @@ public class Worker implements Runnable {
while (!shutdown) { while (!shutdown) {
try { try {
boolean foundCompletedShard = false; boolean foundCompletedShard = false;
Set<String> assignedShardIds = new HashSet<String>(); Set<ShardInfo> assignedShards = new HashSet<ShardInfo>();
for (ShardInfo shardInfo : getShardInfoForAssignments()) { for (ShardInfo shardInfo : getShardInfoForAssignments()) {
ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory); ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory);
if (shardConsumer.isShutdown() if (shardConsumer.isShutdown()
@ -337,7 +337,7 @@ public class Worker implements Runnable {
} else { } else {
shardConsumer.consumeShard(); shardConsumer.consumeShard();
} }
assignedShardIds.add(shardInfo.getShardId()); assignedShards.add(shardInfo);
} }
if (foundCompletedShard) { if (foundCompletedShard) {
@ -345,7 +345,7 @@ public class Worker implements Runnable {
} }
// clean up shard consumers for unassigned shards // clean up shard consumers for unassigned shards
cleanupShardConsumers(assignedShardIds); cleanupShardConsumers(assignedShards);
wlog.info("Sleeping ..."); wlog.info("Sleeping ...");
Thread.sleep(idleTimeInMilliseconds); Thread.sleep(idleTimeInMilliseconds);
@ -415,14 +415,24 @@ public class Worker implements Runnable {
} }
} }
private void cleanupShardConsumers(Set<String> assignedShardIds) { /**
for (String shardId : shardIdShardConsumerMap.keySet()) { * NOTE: This method is internal/private to the Worker class. It has package
if (!assignedShardIds.contains(shardId)) { * access solely for testing.
*
* This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been
* instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example
* shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows.
* ShardInfo shardInfo1 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent1", "parent2"));
* ShardInfo shardInfo2 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent2", "parent1"));
*/
void cleanupShardConsumers(Set<ShardInfo> assignedShards) {
for (ShardInfo shard : shardInfoShardConsumerMap.keySet()) {
if (!assignedShards.contains(shard)) {
// Shutdown the consumer since we are not longer responsible for // Shutdown the consumer since we are not longer responsible for
// the shard. // the shard.
boolean isShutdown = shardIdShardConsumerMap.get(shardId).beginShutdown(); boolean isShutdown = shardInfoShardConsumerMap.get(shard).beginShutdown();
if (isShutdown) { if (isShutdown) {
shardIdShardConsumerMap.remove(shardId); shardInfoShardConsumerMap.remove(shard);
} }
} }
} }
@ -468,9 +478,8 @@ public class Worker implements Runnable {
* @return ShardConsumer for the shard * @return ShardConsumer for the shard
*/ */
ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) { ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) {
synchronized (shardIdShardConsumerMap) { synchronized (shardInfoShardConsumerMap) {
String shardId = shardInfo.getShardId(); ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
ShardConsumer consumer = shardIdShardConsumerMap.get(shardId);
// Instantiate a new consumer if we don't have one, or the one we // Instantiate a new consumer if we don't have one, or the one we
// had was from an earlier // had was from an earlier
// lease instance (and was shutdown). Don't need to create another // lease instance (and was shutdown). Don't need to create another
@ -491,9 +500,8 @@ public class Worker implements Runnable {
executorService, executorService,
metricsFactory, metricsFactory,
taskBackoffTimeMillis); taskBackoffTimeMillis);
shardIdShardConsumerMap.put(shardId, consumer); shardInfoShardConsumerMap.put(shardInfo, consumer);
wlog.infoForce("Created new shardConsumer for shardId: " + shardId + ", concurrencyToken: " wlog.infoForce("Created new shardConsumer for : " + shardInfo);
+ shardInfo.getConcurrencyToken());
} }
return consumer; return consumer;
} }