diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF
index 7406f984..fd9b3a61 100644
--- a/META-INF/MANIFEST.MF
+++ b/META-INF/MANIFEST.MF
@@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Amazon Kinesis Client Library for Java
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
-Bundle-Version: 1.0.0
+Bundle-Version: 1.1.1
Bundle-Vendor: Amazon Technologies, Inc
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
Require-Bundle: org.apache.commons.codec;bundle-version="1.3.0",
@@ -12,7 +12,7 @@ Require-Bundle: org.apache.commons.codec;bundle-version="1.3.0",
com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.1.1",
org.apache.httpcomponents.httpcore;bundle-version="4.2.0",
org.apache.httpcomponents.httpclient;bundle-version="4.2.0"
- com.amazonaws.sdk;bundle-version="1.6.9",
+ com.amazonaws.sdk;bundle-version="1.7.13",
Export-Package: com.amazonaws.services.kinesis,
com.amazonaws.services.kinesis.clientlibrary,
com.amazonaws.services.kinesis.clientlibrary.exceptions,
diff --git a/pom.xml b/pom.xml
index 0ec97c1f..97650fbb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
amazon-kinesis-client
jar
Amazon Kinesis Client Library for Java
- 1.1.0
+ 1.1.1
The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis.
https://aws.amazon.com/kinesis
@@ -24,7 +24,6 @@
1.7.13
- 2.1.1
@@ -33,43 +32,6 @@
aws-java-sdk
${aws-java-sdk.version}
-
- commons-logging
- commons-logging
- 1.1.1
-
-
- org.apache.httpcomponents
- httpclient
- 4.2
-
-
- commons-codec
- commons-codec
- 1.3
-
-
- com.fasterxml.jackson.core
- jackson-core
- ${jackson.version}
- jar
- compile
-
-
- com.fasterxml.jackson.core
- jackson-databind
- ${jackson.version}
- jar
- compile
-
-
- com.fasterxml.jackson.core
- jackson-annotations
- ${jackson.version}
- jar
- compile
-
-
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
index 784994d2..3cfba902 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
@@ -85,7 +85,7 @@ public class KinesisClientLibConfiguration {
/**
* User agent set when Amazon Kinesis Client Library makes AWS requests.
*/
- public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.1.0";
+ public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.1.1";
/**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java
index f5df7537..843ba504 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java
@@ -350,4 +350,13 @@ class ShardConsumer {
return currentState;
}
+ /**
+ * Private/Internal method - has package level access solely for testing purposes.
+ *
+ * @return the beginShutdown
+ */
+ boolean isBeginShutdown() {
+ return beginShutdown;
+ }
+
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java
index b3a45c5c..54f64568 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java
@@ -23,7 +23,7 @@ import java.util.List;
* Used to pass shard related info among different classes and as a key to the map of shard consumers.
*/
class ShardInfo {
-
+
private final String shardId;
private final String concurrencyToken;
// Sorted list of parent shardIds.
@@ -41,6 +41,8 @@ class ShardInfo {
if (parentShardIds != null) {
this.parentShardIds.addAll(parentShardIds);
}
+ // ShardInfo stores parent shard Ids in canonical order in the parentShardIds list.
+ // This makes it easy to check for equality in ShardInfo.equals method.
Collections.sort(this.parentShardIds);
}
@@ -83,6 +85,14 @@ class ShardInfo {
*/
// CHECKSTYLE:OFF CyclomaticComplexity
// CHECKSTYLE:OFF NPathComplexity
+ /**
+ * This method assumes parentShardIds is ordered. The Worker.cleanupShardConsumers() method relies on this method
+ * returning true for ShardInfo objects which may have been instantiated with parentShardIds in a different order
+ * (and rest of the fields being the equal). For example shardInfo1.equals(shardInfo2) should return true with
+ * shardInfo1 and shardInfo2 defined as follows.
+ * ShardInfo shardInfo1 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent1", "parent2"));
+ * ShardInfo shardInfo2 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent2", "parent1"));
+ */
@Override
public boolean equals(Object obj) {
if (this == obj) {
@@ -118,8 +128,14 @@ class ShardInfo {
}
return true;
}
+
// CHECKSTYLE:ON CyclomaticComplexity
// CHECKSTYLE:ON NPathComplexity
-
+
+ @Override
+ public String toString() {
+ return "ShardInfo [shardId=" + shardId + ", concurrencyToken=" + concurrencyToken + ", parentShardIds="
+ + parentShardIds + "]";
+ }
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
index 3f2117c1..8eedb5e9 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java
@@ -74,9 +74,9 @@ public class Worker implements Runnable {
private boolean shutdown;
// Holds consumers for shards the worker is currently tracking. Key is shard
- // id, value is ShardConsumer.
- private ConcurrentMap shardIdShardConsumerMap =
- new ConcurrentHashMap();
+ // info, value is ShardConsumer.
+ private ConcurrentMap shardInfoShardConsumerMap =
+ new ConcurrentHashMap();
private final boolean cleanupLeasesUponShardCompletion;
/**
@@ -328,7 +328,7 @@ public class Worker implements Runnable {
while (!shutdown) {
try {
boolean foundCompletedShard = false;
- Set assignedShardIds = new HashSet();
+ Set assignedShards = new HashSet();
for (ShardInfo shardInfo : getShardInfoForAssignments()) {
ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory);
if (shardConsumer.isShutdown()
@@ -337,7 +337,7 @@ public class Worker implements Runnable {
} else {
shardConsumer.consumeShard();
}
- assignedShardIds.add(shardInfo.getShardId());
+ assignedShards.add(shardInfo);
}
if (foundCompletedShard) {
@@ -345,7 +345,7 @@ public class Worker implements Runnable {
}
// clean up shard consumers for unassigned shards
- cleanupShardConsumers(assignedShardIds);
+ cleanupShardConsumers(assignedShards);
wlog.info("Sleeping ...");
Thread.sleep(idleTimeInMilliseconds);
@@ -415,14 +415,24 @@ public class Worker implements Runnable {
}
}
- private void cleanupShardConsumers(Set assignedShardIds) {
- for (String shardId : shardIdShardConsumerMap.keySet()) {
- if (!assignedShardIds.contains(shardId)) {
+ /**
+ * NOTE: This method is internal/private to the Worker class. It has package
+ * access solely for testing.
+ *
+ * This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been
+ * instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example
+ * shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows.
+ * ShardInfo shardInfo1 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent1", "parent2"));
+ * ShardInfo shardInfo2 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent2", "parent1"));
+ */
+ void cleanupShardConsumers(Set assignedShards) {
+ for (ShardInfo shard : shardInfoShardConsumerMap.keySet()) {
+ if (!assignedShards.contains(shard)) {
// Shutdown the consumer since we are not longer responsible for
// the shard.
- boolean isShutdown = shardIdShardConsumerMap.get(shardId).beginShutdown();
+ boolean isShutdown = shardInfoShardConsumerMap.get(shard).beginShutdown();
if (isShutdown) {
- shardIdShardConsumerMap.remove(shardId);
+ shardInfoShardConsumerMap.remove(shard);
}
}
}
@@ -468,9 +478,8 @@ public class Worker implements Runnable {
* @return ShardConsumer for the shard
*/
ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) {
- synchronized (shardIdShardConsumerMap) {
- String shardId = shardInfo.getShardId();
- ShardConsumer consumer = shardIdShardConsumerMap.get(shardId);
+ synchronized (shardInfoShardConsumerMap) {
+ ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo);
// Instantiate a new consumer if we don't have one, or the one we
// had was from an earlier
// lease instance (and was shutdown). Don't need to create another
@@ -491,9 +500,8 @@ public class Worker implements Runnable {
executorService,
metricsFactory,
taskBackoffTimeMillis);
- shardIdShardConsumerMap.put(shardId, consumer);
- wlog.infoForce("Created new shardConsumer for shardId: " + shardId + ", concurrencyToken: "
- + shardInfo.getConcurrencyToken());
+ shardInfoShardConsumerMap.put(shardInfo, consumer);
+ wlog.infoForce("Created new shardConsumer for : " + shardInfo);
}
return consumer;
}