From 50000086e31bf71a6204befc8a6c1d9965a02ea0 Mon Sep 17 00:00:00 2001 From: Umesh Kumar Date: Thu, 11 Sep 2014 16:40:44 +0000 Subject: [PATCH] 'Version 1.1.1 of the Amazon Kinesis Client Library' --- META-INF/MANIFEST.MF | 4 +- pom.xml | 40 +----------------- .../worker/KinesisClientLibConfiguration.java | 2 +- .../lib/worker/ShardConsumer.java | 9 ++++ .../clientlibrary/lib/worker/ShardInfo.java | 20 ++++++++- .../clientlibrary/lib/worker/Worker.java | 42 +++++++++++-------- 6 files changed, 56 insertions(+), 61 deletions(-) diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF index 7406f984..fd9b3a61 100644 --- a/META-INF/MANIFEST.MF +++ b/META-INF/MANIFEST.MF @@ -2,7 +2,7 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-Name: Amazon Kinesis Client Library for Java Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true -Bundle-Version: 1.0.0 +Bundle-Version: 1.1.1 Bundle-Vendor: Amazon Technologies, Inc Bundle-RequiredExecutionEnvironment: JavaSE-1.7 Require-Bundle: org.apache.commons.codec;bundle-version="1.3.0", @@ -12,7 +12,7 @@ Require-Bundle: org.apache.commons.codec;bundle-version="1.3.0", com.fasterxml.jackson.core.jackson-annotations;bundle-version="2.1.1", org.apache.httpcomponents.httpcore;bundle-version="4.2.0", org.apache.httpcomponents.httpclient;bundle-version="4.2.0" - com.amazonaws.sdk;bundle-version="1.6.9", + com.amazonaws.sdk;bundle-version="1.7.13", Export-Package: com.amazonaws.services.kinesis, com.amazonaws.services.kinesis.clientlibrary, com.amazonaws.services.kinesis.clientlibrary.exceptions, diff --git a/pom.xml b/pom.xml index 0ec97c1f..97650fbb 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.1.0 + 1.1.1 The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. https://aws.amazon.com/kinesis @@ -24,7 +24,6 @@ 1.7.13 - 2.1.1 @@ -33,43 +32,6 @@ aws-java-sdk ${aws-java-sdk.version} - - commons-logging - commons-logging - 1.1.1 - - - org.apache.httpcomponents - httpclient - 4.2 - - - commons-codec - commons-codec - 1.3 - - - com.fasterxml.jackson.core - jackson-core - ${jackson.version} - jar - compile - - - com.fasterxml.jackson.core - jackson-databind - ${jackson.version} - jar - compile - - - com.fasterxml.jackson.core - jackson-annotations - ${jackson.version} - jar - compile - - diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 784994d2..3cfba902 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -85,7 +85,7 @@ public class KinesisClientLibConfiguration { /** * User agent set when Amazon Kinesis Client Library makes AWS requests. */ - public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.1.0"; + public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.1.1"; /** * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index f5df7537..843ba504 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -350,4 +350,13 @@ class ShardConsumer { return currentState; } + /** + * Private/Internal method - has package level access solely for testing purposes. + * + * @return the beginShutdown + */ + boolean isBeginShutdown() { + return beginShutdown; + } + } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java index b3a45c5c..54f64568 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java @@ -23,7 +23,7 @@ import java.util.List; * Used to pass shard related info among different classes and as a key to the map of shard consumers. */ class ShardInfo { - + private final String shardId; private final String concurrencyToken; // Sorted list of parent shardIds. @@ -41,6 +41,8 @@ class ShardInfo { if (parentShardIds != null) { this.parentShardIds.addAll(parentShardIds); } + // ShardInfo stores parent shard Ids in canonical order in the parentShardIds list. + // This makes it easy to check for equality in ShardInfo.equals method. Collections.sort(this.parentShardIds); } @@ -83,6 +85,14 @@ class ShardInfo { */ // CHECKSTYLE:OFF CyclomaticComplexity // CHECKSTYLE:OFF NPathComplexity + /** + * This method assumes parentShardIds is ordered. The Worker.cleanupShardConsumers() method relies on this method + * returning true for ShardInfo objects which may have been instantiated with parentShardIds in a different order + * (and rest of the fields being the equal). For example shardInfo1.equals(shardInfo2) should return true with + * shardInfo1 and shardInfo2 defined as follows. + * ShardInfo shardInfo1 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent1", "parent2")); + * ShardInfo shardInfo2 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent2", "parent1")); + */ @Override public boolean equals(Object obj) { if (this == obj) { @@ -118,8 +128,14 @@ class ShardInfo { } return true; } + // CHECKSTYLE:ON CyclomaticComplexity // CHECKSTYLE:ON NPathComplexity - + + @Override + public String toString() { + return "ShardInfo [shardId=" + shardId + ", concurrencyToken=" + concurrencyToken + ", parentShardIds=" + + parentShardIds + "]"; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 3f2117c1..8eedb5e9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -74,9 +74,9 @@ public class Worker implements Runnable { private boolean shutdown; // Holds consumers for shards the worker is currently tracking. Key is shard - // id, value is ShardConsumer. - private ConcurrentMap shardIdShardConsumerMap = - new ConcurrentHashMap(); + // info, value is ShardConsumer. + private ConcurrentMap shardInfoShardConsumerMap = + new ConcurrentHashMap(); private final boolean cleanupLeasesUponShardCompletion; /** @@ -328,7 +328,7 @@ public class Worker implements Runnable { while (!shutdown) { try { boolean foundCompletedShard = false; - Set assignedShardIds = new HashSet(); + Set assignedShards = new HashSet(); for (ShardInfo shardInfo : getShardInfoForAssignments()) { ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, recordProcessorFactory); if (shardConsumer.isShutdown() @@ -337,7 +337,7 @@ public class Worker implements Runnable { } else { shardConsumer.consumeShard(); } - assignedShardIds.add(shardInfo.getShardId()); + assignedShards.add(shardInfo); } if (foundCompletedShard) { @@ -345,7 +345,7 @@ public class Worker implements Runnable { } // clean up shard consumers for unassigned shards - cleanupShardConsumers(assignedShardIds); + cleanupShardConsumers(assignedShards); wlog.info("Sleeping ..."); Thread.sleep(idleTimeInMilliseconds); @@ -415,14 +415,24 @@ public class Worker implements Runnable { } } - private void cleanupShardConsumers(Set assignedShardIds) { - for (String shardId : shardIdShardConsumerMap.keySet()) { - if (!assignedShardIds.contains(shardId)) { + /** + * NOTE: This method is internal/private to the Worker class. It has package + * access solely for testing. + * + * This method relies on ShardInfo.equals() method returning true for ShardInfo objects which may have been + * instantiated with parentShardIds in a different order (and rest of the fields being the equal). For example + * shardInfo1.equals(shardInfo2) should return true with shardInfo1 and shardInfo2 defined as follows. + * ShardInfo shardInfo1 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent1", "parent2")); + * ShardInfo shardInfo2 = new ShardInfo(shardId1, concurrencyToken1, Arrays.asList("parent2", "parent1")); + */ + void cleanupShardConsumers(Set assignedShards) { + for (ShardInfo shard : shardInfoShardConsumerMap.keySet()) { + if (!assignedShards.contains(shard)) { // Shutdown the consumer since we are not longer responsible for // the shard. - boolean isShutdown = shardIdShardConsumerMap.get(shardId).beginShutdown(); + boolean isShutdown = shardInfoShardConsumerMap.get(shard).beginShutdown(); if (isShutdown) { - shardIdShardConsumerMap.remove(shardId); + shardInfoShardConsumerMap.remove(shard); } } } @@ -468,9 +478,8 @@ public class Worker implements Runnable { * @return ShardConsumer for the shard */ ShardConsumer createOrGetShardConsumer(ShardInfo shardInfo, IRecordProcessorFactory factory) { - synchronized (shardIdShardConsumerMap) { - String shardId = shardInfo.getShardId(); - ShardConsumer consumer = shardIdShardConsumerMap.get(shardId); + synchronized (shardInfoShardConsumerMap) { + ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); // Instantiate a new consumer if we don't have one, or the one we // had was from an earlier // lease instance (and was shutdown). Don't need to create another @@ -491,9 +500,8 @@ public class Worker implements Runnable { executorService, metricsFactory, taskBackoffTimeMillis); - shardIdShardConsumerMap.put(shardId, consumer); - wlog.infoForce("Created new shardConsumer for shardId: " + shardId + ", concurrencyToken: " - + shardInfo.getConcurrencyToken()); + shardInfoShardConsumerMap.put(shardInfo, consumer); + wlog.infoForce("Created new shardConsumer for : " + shardInfo); } return consumer; }