Merge pull request #227 from pfifer/cancel-fix

Recreate the completion service instead of reusing them.
This commit is contained in:
Justin Pfifer 2017-09-22 16:18:52 -07:00 committed by GitHub
commit e8039b31ae
6 changed files with 40 additions and 29 deletions

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2 Bundle-ManifestVersion: 2
Bundle-Name: Amazon Kinesis Client Library for Java Bundle-Name: Amazon Kinesis Client Library for Java
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
Bundle-Version: 1.8.3 Bundle-Version: 1.8.4
Bundle-Vendor: Amazon Technologies, Inc Bundle-Vendor: Amazon Technologies, Inc
Bundle-RequiredExecutionEnvironment: JavaSE-1.7 Bundle-RequiredExecutionEnvironment: JavaSE-1.7
Require-Bundle: org.apache.commons.codec;bundle-version="1.6", Require-Bundle: org.apache.commons.codec;bundle-version="1.6",

View file

@ -6,7 +6,7 @@
<artifactId>amazon-kinesis-client</artifactId> <artifactId>amazon-kinesis-client</artifactId>
<packaging>jar</packaging> <packaging>jar</packaging>
<name>Amazon Kinesis Client Library for Java</name> <name>Amazon Kinesis Client Library for Java</name>
<version>1.8.3</version> <version>1.8.4-SNAPSHOT</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data <description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis. from Amazon Kinesis.
</description> </description>

View file

@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService; import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -26,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope; import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope;
@ -47,7 +49,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
private final ExecutorService executorService; private final ExecutorService executorService;
private final int retryGetRecordsInSeconds; private final int retryGetRecordsInSeconds;
private final String shardId; private final String shardId;
final CompletionService<GetRecordsResult> completionService; final Supplier<CompletionService<GetRecordsResult>> completionServiceSupplier;
public AsynchronousGetRecordsRetrievalStrategy(@NonNull final KinesisDataFetcher dataFetcher, public AsynchronousGetRecordsRetrievalStrategy(@NonNull final KinesisDataFetcher dataFetcher,
final int retryGetRecordsInSeconds, final int maxGetRecordsThreadPool, String shardId) { final int retryGetRecordsInSeconds, final int maxGetRecordsThreadPool, String shardId) {
@ -56,16 +58,17 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
public AsynchronousGetRecordsRetrievalStrategy(final KinesisDataFetcher dataFetcher, public AsynchronousGetRecordsRetrievalStrategy(final KinesisDataFetcher dataFetcher,
final ExecutorService executorService, final int retryGetRecordsInSeconds, String shardId) { final ExecutorService executorService, final int retryGetRecordsInSeconds, String shardId) {
this(dataFetcher, executorService, retryGetRecordsInSeconds, new ExecutorCompletionService<>(executorService), this(dataFetcher, executorService, retryGetRecordsInSeconds, () -> new ExecutorCompletionService<>(executorService),
shardId); shardId);
} }
AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService, AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService,
int retryGetRecordsInSeconds, CompletionService<GetRecordsResult> completionService, String shardId) { int retryGetRecordsInSeconds, Supplier<CompletionService<GetRecordsResult>> completionServiceSupplier,
String shardId) {
this.dataFetcher = dataFetcher; this.dataFetcher = dataFetcher;
this.executorService = executorService; this.executorService = executorService;
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds; this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
this.completionService = completionService; this.completionServiceSupplier = completionServiceSupplier;
this.shardId = shardId; this.shardId = shardId;
} }
@ -75,6 +78,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
throw new IllegalStateException("Strategy has been shutdown"); throw new IllegalStateException("Strategy has been shutdown");
} }
GetRecordsResult result = null; GetRecordsResult result = null;
CompletionService<GetRecordsResult> completionService = completionServiceSupplier.get();
Set<Future<GetRecordsResult>> futures = new HashSet<>(); Set<Future<GetRecordsResult>> futures = new HashSet<>();
Callable<GetRecordsResult> retrieverCall = createRetrieverCallable(maxRecords); Callable<GetRecordsResult> retrieverCall = createRetrieverCallable(maxRecords);
while (true) { while (true) {
@ -98,13 +102,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
break; break;
} }
} }
futures.stream().peek(f -> f.cancel(true)).filter(Future::isCancelled).forEach(f -> { futures.forEach(f -> f.cancel(true));
try {
completionService.take();
} catch (InterruptedException e) {
log.error("Exception thrown while trying to empty the threadpool.");
}
});
return result; return result;
} }

View file

@ -126,7 +126,7 @@ public class KinesisClientLibConfiguration {
/** /**
* User agent set when Amazon Kinesis Client Library makes AWS requests. * User agent set when Amazon Kinesis Client Library makes AWS requests.
*/ */
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.3"; public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.4";
/** /**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls

View file

@ -27,12 +27,14 @@ import org.mockito.runners.MockitoJUnitRunner;
import java.util.concurrent.CompletionService; import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
@ -59,6 +61,10 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
@Mock @Mock
private ShardInfo mockShardInfo; private ShardInfo mockShardInfo;
@Mock
private Supplier<CompletionService<GetRecordsResult>> completionServiceSupplier;
private CompletionService<GetRecordsResult> completionService;
private AsynchronousGetRecordsRetrievalStrategy getRecordsRetrivalStrategy; private AsynchronousGetRecordsRetrievalStrategy getRecordsRetrivalStrategy;
private KinesisDataFetcher dataFetcher; private KinesisDataFetcher dataFetcher;
@ -66,7 +72,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
private ExecutorService executorService; private ExecutorService executorService;
private RejectedExecutionHandler rejectedExecutionHandler; private RejectedExecutionHandler rejectedExecutionHandler;
private int numberOfRecords = 10; private int numberOfRecords = 10;
private CompletionService<GetRecordsResult> completionService;
@Before @Before
public void setup() { public void setup() {
@ -80,8 +86,9 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
new LinkedBlockingQueue<>(1), new LinkedBlockingQueue<>(1),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("getrecords-worker-%d").build(), new ThreadFactoryBuilder().setDaemon(true).setNameFormat("getrecords-worker-%d").build(),
rejectedExecutionHandler)); rejectedExecutionHandler));
getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, "shardId-0001"); completionService = spy(new ExecutorCompletionService<GetRecordsResult>(executorService));
completionService = spy(getRecordsRetrivalStrategy.completionService); when(completionServiceSupplier.get()).thenReturn(completionService);
getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, "shardId-0001");
result = null; result = null;
} }
@ -97,12 +104,16 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
public void multiRequestTest() { public void multiRequestTest() {
result = mock(GetRecordsResult.class); result = mock(GetRecordsResult.class);
ExecutorCompletionService<GetRecordsResult> completionService1 = spy(new ExecutorCompletionService<GetRecordsResult>(executorService));
when(completionServiceSupplier.get()).thenReturn(completionService1);
GetRecordsResult getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords); GetRecordsResult getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords(numberOfRecords); verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords(numberOfRecords);
verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any()); verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any());
assertEquals(result, getRecordsResult); assertEquals(result, getRecordsResult);
result = null; result = null;
ExecutorCompletionService<GetRecordsResult> completionService2 = spy(new ExecutorCompletionService<GetRecordsResult>(executorService));
when(completionServiceSupplier.get()).thenReturn(completionService2);
getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords); getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
assertNull(getRecordsResult); assertNull(getRecordsResult);
} }

View file

@ -30,7 +30,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
@ -51,6 +53,8 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Mock @Mock
private ExecutorService executorService; private ExecutorService executorService;
@Mock @Mock
private Supplier<CompletionService<GetRecordsResult>> completionServiceSupplier;
@Mock
private CompletionService<GetRecordsResult> completionService; private CompletionService<GetRecordsResult> completionService;
@Mock @Mock
private Future<GetRecordsResult> successfulFuture; private Future<GetRecordsResult> successfulFuture;
@ -59,10 +63,15 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Mock @Mock
private GetRecordsResult expectedResults; private GetRecordsResult expectedResults;
@Before
public void before() {
when(completionServiceSupplier.get()).thenReturn(completionService);
}
@Test @Test
public void testSingleSuccessfulRequestFuture() throws Exception { public void testSingleSuccessfulRequestFuture() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID); executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(false); when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(successfulFuture); when(completionService.submit(any())).thenReturn(successfulFuture);
@ -76,8 +85,6 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
verify(completionService).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS)); verify(completionService).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS));
verify(successfulFuture).get(); verify(successfulFuture).get();
verify(successfulFuture).cancel(eq(true)); verify(successfulFuture).cancel(eq(true));
verify(successfulFuture).isCancelled();
verify(completionService, never()).take();
assertThat(result, equalTo(expectedResults)); assertThat(result, equalTo(expectedResults));
} }
@ -85,7 +92,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Test @Test
public void testBlockedAndSuccessfulFuture() throws Exception { public void testBlockedAndSuccessfulFuture() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID); executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(false); when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(blockedFuture).thenReturn(successfulFuture); when(completionService.submit(any())).thenReturn(blockedFuture).thenReturn(successfulFuture);
@ -104,9 +111,6 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
verify(blockedFuture, never()).get(); verify(blockedFuture, never()).get();
verify(successfulFuture).cancel(eq(true)); verify(successfulFuture).cancel(eq(true));
verify(blockedFuture).cancel(eq(true)); verify(blockedFuture).cancel(eq(true));
verify(successfulFuture).isCancelled();
verify(blockedFuture).isCancelled();
verify(completionService).take();
assertThat(actualResults, equalTo(expectedResults)); assertThat(actualResults, equalTo(expectedResults));
} }
@ -114,7 +118,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Test(expected = IllegalStateException.class) @Test(expected = IllegalStateException.class)
public void testStrategyIsShutdown() throws Exception { public void testStrategyIsShutdown() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID); executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(true); when(executorService.isShutdown()).thenReturn(true);
@ -124,7 +128,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Test @Test
public void testPoolOutOfResources() throws Exception { public void testPoolOutOfResources() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID); executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(false); when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(blockedFuture).thenThrow(new RejectedExecutionException("Rejected!")).thenReturn(successfulFuture); when(completionService.submit(any())).thenReturn(blockedFuture).thenThrow(new RejectedExecutionException("Rejected!")).thenReturn(successfulFuture);
@ -141,9 +145,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
verify(completionService, times(3)).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS)); verify(completionService, times(3)).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS));
verify(successfulFuture).cancel(eq(true)); verify(successfulFuture).cancel(eq(true));
verify(blockedFuture).cancel(eq(true)); verify(blockedFuture).cancel(eq(true));
verify(successfulFuture).isCancelled();
verify(blockedFuture).isCancelled();
verify(completionService).take();
assertThat(actualResult, equalTo(expectedResults)); assertThat(actualResult, equalTo(expectedResults));
} }