diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF
index 3a8282e4..a7ac9a5a 100644
--- a/META-INF/MANIFEST.MF
+++ b/META-INF/MANIFEST.MF
@@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Amazon Kinesis Client Library for Java
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
-Bundle-Version: 1.8.3
+Bundle-Version: 1.8.4
Bundle-Vendor: Amazon Technologies, Inc
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
Require-Bundle: org.apache.commons.codec;bundle-version="1.6",
diff --git a/pom.xml b/pom.xml
index 61c8c6cf..5d8bdb53 100644
--- a/pom.xml
+++ b/pom.xml
@@ -6,7 +6,7 @@
amazon-kinesis-client
jar
Amazon Kinesis Client Library for Java
- 1.8.3
+ 1.8.4-SNAPSHOT
The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java
index 92057327..b592c29b 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java
@@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.HashSet;
import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
@@ -26,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope;
@@ -47,7 +49,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
private final ExecutorService executorService;
private final int retryGetRecordsInSeconds;
private final String shardId;
- final CompletionService completionService;
+ final Supplier> completionServiceSupplier;
public AsynchronousGetRecordsRetrievalStrategy(@NonNull final KinesisDataFetcher dataFetcher,
final int retryGetRecordsInSeconds, final int maxGetRecordsThreadPool, String shardId) {
@@ -56,16 +58,17 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
public AsynchronousGetRecordsRetrievalStrategy(final KinesisDataFetcher dataFetcher,
final ExecutorService executorService, final int retryGetRecordsInSeconds, String shardId) {
- this(dataFetcher, executorService, retryGetRecordsInSeconds, new ExecutorCompletionService<>(executorService),
+ this(dataFetcher, executorService, retryGetRecordsInSeconds, () -> new ExecutorCompletionService<>(executorService),
shardId);
}
AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService,
- int retryGetRecordsInSeconds, CompletionService completionService, String shardId) {
+ int retryGetRecordsInSeconds, Supplier> completionServiceSupplier,
+ String shardId) {
this.dataFetcher = dataFetcher;
this.executorService = executorService;
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
- this.completionService = completionService;
+ this.completionServiceSupplier = completionServiceSupplier;
this.shardId = shardId;
}
@@ -75,6 +78,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
throw new IllegalStateException("Strategy has been shutdown");
}
GetRecordsResult result = null;
+ CompletionService completionService = completionServiceSupplier.get();
Set> futures = new HashSet<>();
Callable retrieverCall = createRetrieverCallable(maxRecords);
while (true) {
@@ -98,13 +102,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
break;
}
}
- futures.stream().peek(f -> f.cancel(true)).filter(Future::isCancelled).forEach(f -> {
- try {
- completionService.take();
- } catch (InterruptedException e) {
- log.error("Exception thrown while trying to empty the threadpool.");
- }
- });
+ futures.forEach(f -> f.cancel(true));
return result;
}
diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
index c970daa0..4620bfe4 100644
--- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
+++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
@@ -126,7 +126,7 @@ public class KinesisClientLibConfiguration {
/**
* User agent set when Amazon Kinesis Client Library makes AWS requests.
*/
- public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.3";
+ public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.4";
/**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java
index 8518c992..b3659605 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java
@@ -27,12 +27,14 @@ import org.mockito.runners.MockitoJUnitRunner;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -59,6 +61,10 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
@Mock
private ShardInfo mockShardInfo;
+ @Mock
+ private Supplier> completionServiceSupplier;
+
+ private CompletionService completionService;
private AsynchronousGetRecordsRetrievalStrategy getRecordsRetrivalStrategy;
private KinesisDataFetcher dataFetcher;
@@ -66,7 +72,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
private ExecutorService executorService;
private RejectedExecutionHandler rejectedExecutionHandler;
private int numberOfRecords = 10;
- private CompletionService completionService;
+
@Before
public void setup() {
@@ -80,8 +86,9 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
new LinkedBlockingQueue<>(1),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("getrecords-worker-%d").build(),
rejectedExecutionHandler));
- getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, "shardId-0001");
- completionService = spy(getRecordsRetrivalStrategy.completionService);
+ completionService = spy(new ExecutorCompletionService(executorService));
+ when(completionServiceSupplier.get()).thenReturn(completionService);
+ getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, "shardId-0001");
result = null;
}
@@ -97,12 +104,16 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
public void multiRequestTest() {
result = mock(GetRecordsResult.class);
+ ExecutorCompletionService completionService1 = spy(new ExecutorCompletionService(executorService));
+ when(completionServiceSupplier.get()).thenReturn(completionService1);
GetRecordsResult getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords(numberOfRecords);
verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any());
assertEquals(result, getRecordsResult);
result = null;
+ ExecutorCompletionService completionService2 = spy(new ExecutorCompletionService(executorService));
+ when(completionServiceSupplier.get()).thenReturn(completionService2);
getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
assertNull(getRecordsResult);
}
diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyTest.java
index 9ecea68d..820f4a57 100644
--- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyTest.java
+++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyTest.java
@@ -30,7 +30,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@@ -51,6 +53,8 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Mock
private ExecutorService executorService;
@Mock
+ private Supplier> completionServiceSupplier;
+ @Mock
private CompletionService completionService;
@Mock
private Future successfulFuture;
@@ -59,10 +63,15 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Mock
private GetRecordsResult expectedResults;
+ @Before
+ public void before() {
+ when(completionServiceSupplier.get()).thenReturn(completionService);
+ }
+
@Test
public void testSingleSuccessfulRequestFuture() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
- executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID);
+ executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(successfulFuture);
@@ -76,8 +85,6 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
verify(completionService).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS));
verify(successfulFuture).get();
verify(successfulFuture).cancel(eq(true));
- verify(successfulFuture).isCancelled();
- verify(completionService, never()).take();
assertThat(result, equalTo(expectedResults));
}
@@ -85,7 +92,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Test
public void testBlockedAndSuccessfulFuture() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
- executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID);
+ executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(blockedFuture).thenReturn(successfulFuture);
@@ -104,9 +111,6 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
verify(blockedFuture, never()).get();
verify(successfulFuture).cancel(eq(true));
verify(blockedFuture).cancel(eq(true));
- verify(successfulFuture).isCancelled();
- verify(blockedFuture).isCancelled();
- verify(completionService).take();
assertThat(actualResults, equalTo(expectedResults));
}
@@ -114,7 +118,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Test(expected = IllegalStateException.class)
public void testStrategyIsShutdown() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
- executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID);
+ executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(true);
@@ -124,7 +128,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Test
public void testPoolOutOfResources() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
- executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID);
+ executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(blockedFuture).thenThrow(new RejectedExecutionException("Rejected!")).thenReturn(successfulFuture);
@@ -141,9 +145,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
verify(completionService, times(3)).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS));
verify(successfulFuture).cancel(eq(true));
verify(blockedFuture).cancel(eq(true));
- verify(successfulFuture).isCancelled();
- verify(blockedFuture).isCancelled();
- verify(completionService).take();
+
assertThat(actualResult, equalTo(expectedResults));
}