diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF
index 3a8282e4..a7ac9a5a 100644
--- a/META-INF/MANIFEST.MF
+++ b/META-INF/MANIFEST.MF
@@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Amazon Kinesis Client Library for Java
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
-Bundle-Version: 1.8.3
+Bundle-Version: 1.8.4
Bundle-Vendor: Amazon Technologies, Inc
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
diff --git a/README.md b/README.md
index ddaa6194..00ed3caa 100644
--- a/README.md
+++ b/README.md
@@ -29,6 +29,12 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.
## Release Notes
+### Release 1.8.4 (September 22, 2017)
+* Create a new completion service for each request.
+ This ensures that canceled tasks are discarded. This will prevent a cancellation exception causing issues processing records.
+ * [PR #227](https://github.com/awslabs/amazon-kinesis-client/pull/227)
+ * [Issue #226](https://github.com/awslabs/amazon-kinesis-client/issues/226)
+
### Release 1.8.3 (September 22, 2017)
* Call shutdown on the retriever when the record processor is being shutdown
This fixes a bug that could leak threads if using the [`AsynchronousGetRecordsRetrievalStrategy`](https://github.com/awslabs/amazon-kinesis-client/blob/9a82b6bd05b3c9c5f8581af007141fa6d5f0fc4e/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java#L42) is being used.
diff --git a/pom.xml b/pom.xml
index 61c8c6cf..f0f783a2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
amazon-kinesis-client
jar
Amazon Kinesis Client Library for Java
- 1.8.3
+ 1.8.4
The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java
index 92057327..b592c29b 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java
@@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
@@ -26,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope;
@@ -47,7 +49,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
private final ExecutorService executorService;
private final int retryGetRecordsInSeconds;
private final String shardId;
- final CompletionService completionService;
+ final Supplier> completionServiceSupplier;
public AsynchronousGetRecordsRetrievalStrategy(@NonNull final KinesisDataFetcher dataFetcher,
final int retryGetRecordsInSeconds, final int maxGetRecordsThreadPool, String shardId) {
@@ -56,16 +58,17 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
public AsynchronousGetRecordsRetrievalStrategy(final KinesisDataFetcher dataFetcher,
final ExecutorService executorService, final int retryGetRecordsInSeconds, String shardId) {
- this(dataFetcher, executorService, retryGetRecordsInSeconds, new ExecutorCompletionService<>(executorService),
+ this(dataFetcher, executorService, retryGetRecordsInSeconds, () -> new ExecutorCompletionService<>(executorService),
shardId);
}
AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService,
- int retryGetRecordsInSeconds, CompletionService completionService, String shardId) {
+ int retryGetRecordsInSeconds, Supplier> completionServiceSupplier,
+ String shardId) {
this.dataFetcher = dataFetcher;
this.executorService = executorService;
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
- this.completionService = completionService;
+ this.completionServiceSupplier = completionServiceSupplier;
this.shardId = shardId;
}
@@ -75,6 +78,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
throw new IllegalStateException("Strategy has been shutdown");
}
GetRecordsResult result = null;
+ CompletionService completionService = completionServiceSupplier.get();
Set> futures = new HashSet<>();
Callable retrieverCall = createRetrieverCallable(maxRecords);
while (true) {
@@ -98,13 +102,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
break;
}
}
- futures.stream().peek(f -> f.cancel(true)).filter(Future::isCancelled).forEach(f -> {
- try {
- completionService.take();
- } catch (InterruptedException e) {
- log.error("Exception thrown while trying to empty the threadpool.");
- }
- });
+ futures.forEach(f -> f.cancel(true));
return result;
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
index ebc4b559..9be05c43 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
@@ -126,7 +126,7 @@ public class KinesisClientLibConfiguration {
/**
* User agent set when Amazon Kinesis Client Library makes AWS requests.
*/
- public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.3";
+ public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.4";
/**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java
index a120a480..e8380805 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java
@@ -27,12 +27,14 @@ import org.mockito.runners.MockitoJUnitRunner;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -59,6 +61,10 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
@Mock
private ShardInfo mockShardInfo;
+ @Mock
+ private Supplier> completionServiceSupplier;
+
+ private CompletionService completionService;
private AsynchronousGetRecordsRetrievalStrategy getRecordsRetrivalStrategy;
private KinesisDataFetcher dataFetcher;
@@ -66,7 +72,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
private ExecutorService executorService;
private RejectedExecutionHandler rejectedExecutionHandler;
private int numberOfRecords = 10;
- private CompletionService completionService;
+
@Before
public void setup() {
@@ -80,8 +86,9 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
new LinkedBlockingQueue<>(1),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("getrecords-worker-%d").build(),
rejectedExecutionHandler));
- getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, "shardId-0001");
- completionService = spy(getRecordsRetrivalStrategy.completionService);
+ completionService = spy(new ExecutorCompletionService(executorService));
+ when(completionServiceSupplier.get()).thenReturn(completionService);
+ getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, "shardId-0001");
result = null;
}
@@ -97,12 +104,16 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
public void multiRequestTest() {
result = mock(GetRecordsResult.class);
+ ExecutorCompletionService completionService1 = spy(new ExecutorCompletionService(executorService));
+ when(completionServiceSupplier.get()).thenReturn(completionService1);
GetRecordsResult getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords(numberOfRecords);
verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any());
assertEquals(result, getRecordsResult);
result = null;
+ ExecutorCompletionService completionService2 = spy(new ExecutorCompletionService(executorService));
+ when(completionServiceSupplier.get()).thenReturn(completionService2);
getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
assertNull(getRecordsResult);
}
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyTest.java
index 9ecea68d..820f4a57 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyTest.java
@@ -30,7 +30,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@@ -51,6 +53,8 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Mock
private ExecutorService executorService;
@Mock
+ private Supplier> completionServiceSupplier;
+ @Mock
private CompletionService completionService;
@Mock
private Future successfulFuture;
@@ -59,10 +63,15 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Mock
private GetRecordsResult expectedResults;
+ @Before
+ public void before() {
+ when(completionServiceSupplier.get()).thenReturn(completionService);
+ }
+
@Test
public void testSingleSuccessfulRequestFuture() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
- executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID);
+ executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(successfulFuture);
@@ -76,8 +85,6 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
verify(completionService).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS));
verify(successfulFuture).get();
verify(successfulFuture).cancel(eq(true));
- verify(successfulFuture).isCancelled();
- verify(completionService, never()).take();
assertThat(result, equalTo(expectedResults));
}
@@ -85,7 +92,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Test
public void testBlockedAndSuccessfulFuture() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
- executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID);
+ executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(blockedFuture).thenReturn(successfulFuture);
@@ -104,9 +111,6 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
verify(blockedFuture, never()).get();
verify(successfulFuture).cancel(eq(true));
verify(blockedFuture).cancel(eq(true));
- verify(successfulFuture).isCancelled();
- verify(blockedFuture).isCancelled();
- verify(completionService).take();
assertThat(actualResults, equalTo(expectedResults));
}
@@ -114,7 +118,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Test(expected = IllegalStateException.class)
public void testStrategyIsShutdown() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
- executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID);
+ executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(true);
@@ -124,7 +128,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Test
public void testPoolOutOfResources() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
- executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID);
+ executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(blockedFuture).thenThrow(new RejectedExecutionException("Rejected!")).thenReturn(successfulFuture);
@@ -141,9 +145,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
verify(completionService, times(3)).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS));
verify(successfulFuture).cancel(eq(true));
verify(blockedFuture).cancel(eq(true));
- verify(successfulFuture).isCancelled();
- verify(blockedFuture).isCancelled();
- verify(completionService).take();
+
assertThat(actualResult, equalTo(expectedResults));
}