Merge remote-tracking branch 'upstream/master' into prefetch

This commit is contained in:
Sahil Palvia 2017-09-25 11:29:38 -07:00
commit 86c9f3d5b9
7 changed files with 46 additions and 29 deletions

View file

@ -2,7 +2,7 @@ Manifest-Version: 1.0
Bundle-ManifestVersion: 2
Bundle-Name: Amazon Kinesis Client Library for Java
Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true
Bundle-Version: 1.8.3
Bundle-Version: 1.8.4
Bundle-Vendor: Amazon Technologies, Inc
Bundle-RequiredExecutionEnvironment: JavaSE-1.7
Require-Bundle: org.apache.commons.codec;bundle-version="1.6",

View file

@ -29,6 +29,12 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi
To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages.
## Release Notes
### Release 1.8.4 (September 22, 2017)
* Create a new completion service for each request.
This ensures that canceled tasks are discarded. This will prevent a cancellation exception causing issues processing records.
* [PR #227](https://github.com/awslabs/amazon-kinesis-client/pull/227)
* [Issue #226](https://github.com/awslabs/amazon-kinesis-client/issues/226)
### Release 1.8.3 (September 22, 2017)
* Call shutdown on the retriever when the record processor is being shutdown
This fixes a bug that could leak threads if using the [`AsynchronousGetRecordsRetrievalStrategy`](https://github.com/awslabs/amazon-kinesis-client/blob/9a82b6bd05b3c9c5f8581af007141fa6d5f0fc4e/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java#L42) is being used.

View file

@ -6,7 +6,7 @@
<artifactId>amazon-kinesis-client</artifactId>
<packaging>jar</packaging>
<name>Amazon Kinesis Client Library for Java</name>
<version>1.8.3</version>
<version>1.8.4</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.
</description>

View file

@ -16,6 +16,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
@ -26,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper;
import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope;
@ -47,7 +49,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
private final ExecutorService executorService;
private final int retryGetRecordsInSeconds;
private final String shardId;
final CompletionService<GetRecordsResult> completionService;
final Supplier<CompletionService<GetRecordsResult>> completionServiceSupplier;
public AsynchronousGetRecordsRetrievalStrategy(@NonNull final KinesisDataFetcher dataFetcher,
final int retryGetRecordsInSeconds, final int maxGetRecordsThreadPool, String shardId) {
@ -56,16 +58,17 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
public AsynchronousGetRecordsRetrievalStrategy(final KinesisDataFetcher dataFetcher,
final ExecutorService executorService, final int retryGetRecordsInSeconds, String shardId) {
this(dataFetcher, executorService, retryGetRecordsInSeconds, new ExecutorCompletionService<>(executorService),
this(dataFetcher, executorService, retryGetRecordsInSeconds, () -> new ExecutorCompletionService<>(executorService),
shardId);
}
AsynchronousGetRecordsRetrievalStrategy(KinesisDataFetcher dataFetcher, ExecutorService executorService,
int retryGetRecordsInSeconds, CompletionService<GetRecordsResult> completionService, String shardId) {
int retryGetRecordsInSeconds, Supplier<CompletionService<GetRecordsResult>> completionServiceSupplier,
String shardId) {
this.dataFetcher = dataFetcher;
this.executorService = executorService;
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
this.completionService = completionService;
this.completionServiceSupplier = completionServiceSupplier;
this.shardId = shardId;
}
@ -75,6 +78,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
throw new IllegalStateException("Strategy has been shutdown");
}
GetRecordsResult result = null;
CompletionService<GetRecordsResult> completionService = completionServiceSupplier.get();
Set<Future<GetRecordsResult>> futures = new HashSet<>();
Callable<GetRecordsResult> retrieverCall = createRetrieverCallable(maxRecords);
while (true) {
@ -98,13 +102,7 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie
break;
}
}
futures.stream().peek(f -> f.cancel(true)).filter(Future::isCancelled).forEach(f -> {
try {
completionService.take();
} catch (InterruptedException e) {
log.error("Exception thrown while trying to empty the threadpool.");
}
});
futures.forEach(f -> f.cancel(true));
return result;
}

View file

@ -126,7 +126,7 @@ public class KinesisClientLibConfiguration {
/**
* User agent set when Amazon Kinesis Client Library makes AWS requests.
*/
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.3";
public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.4";
/**
* KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls

View file

@ -27,12 +27,14 @@ import org.mockito.runners.MockitoJUnitRunner;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@ -59,6 +61,10 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
@Mock
private ShardInfo mockShardInfo;
@Mock
private Supplier<CompletionService<GetRecordsResult>> completionServiceSupplier;
private CompletionService<GetRecordsResult> completionService;
private AsynchronousGetRecordsRetrievalStrategy getRecordsRetrivalStrategy;
private KinesisDataFetcher dataFetcher;
@ -66,7 +72,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
private ExecutorService executorService;
private RejectedExecutionHandler rejectedExecutionHandler;
private int numberOfRecords = 10;
private CompletionService<GetRecordsResult> completionService;
@Before
public void setup() {
@ -80,8 +86,9 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
new LinkedBlockingQueue<>(1),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("getrecords-worker-%d").build(),
rejectedExecutionHandler));
getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, "shardId-0001");
completionService = spy(getRecordsRetrivalStrategy.completionService);
completionService = spy(new ExecutorCompletionService<GetRecordsResult>(executorService));
when(completionServiceSupplier.get()).thenReturn(completionService);
getRecordsRetrivalStrategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, executorService, RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, "shardId-0001");
result = null;
}
@ -97,12 +104,16 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
public void multiRequestTest() {
result = mock(GetRecordsResult.class);
ExecutorCompletionService<GetRecordsResult> completionService1 = spy(new ExecutorCompletionService<GetRecordsResult>(executorService));
when(completionServiceSupplier.get()).thenReturn(completionService1);
GetRecordsResult getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords(numberOfRecords);
verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any());
assertEquals(result, getRecordsResult);
result = null;
ExecutorCompletionService<GetRecordsResult> completionService2 = spy(new ExecutorCompletionService<GetRecordsResult>(executorService));
when(completionServiceSupplier.get()).thenReturn(completionService2);
getRecordsResult = getRecordsRetrivalStrategy.getRecords(numberOfRecords);
assertNull(getRecordsResult);
}

View file

@ -30,7 +30,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@ -51,6 +53,8 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Mock
private ExecutorService executorService;
@Mock
private Supplier<CompletionService<GetRecordsResult>> completionServiceSupplier;
@Mock
private CompletionService<GetRecordsResult> completionService;
@Mock
private Future<GetRecordsResult> successfulFuture;
@ -59,10 +63,15 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Mock
private GetRecordsResult expectedResults;
@Before
public void before() {
when(completionServiceSupplier.get()).thenReturn(completionService);
}
@Test
public void testSingleSuccessfulRequestFuture() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID);
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(successfulFuture);
@ -76,8 +85,6 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
verify(completionService).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS));
verify(successfulFuture).get();
verify(successfulFuture).cancel(eq(true));
verify(successfulFuture).isCancelled();
verify(completionService, never()).take();
assertThat(result, equalTo(expectedResults));
}
@ -85,7 +92,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Test
public void testBlockedAndSuccessfulFuture() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID);
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(blockedFuture).thenReturn(successfulFuture);
@ -104,9 +111,6 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
verify(blockedFuture, never()).get();
verify(successfulFuture).cancel(eq(true));
verify(blockedFuture).cancel(eq(true));
verify(successfulFuture).isCancelled();
verify(blockedFuture).isCancelled();
verify(completionService).take();
assertThat(actualResults, equalTo(expectedResults));
}
@ -114,7 +118,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Test(expected = IllegalStateException.class)
public void testStrategyIsShutdown() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID);
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(true);
@ -124,7 +128,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
@Test
public void testPoolOutOfResources() throws Exception {
AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher,
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionService, SHARD_ID);
executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID);
when(executorService.isShutdown()).thenReturn(false);
when(completionService.submit(any())).thenReturn(blockedFuture).thenThrow(new RejectedExecutionException("Rejected!")).thenReturn(successfulFuture);
@ -141,9 +145,7 @@ public class AsynchronousGetRecordsRetrievalStrategyTest {
verify(completionService, times(3)).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS));
verify(successfulFuture).cancel(eq(true));
verify(blockedFuture).cancel(eq(true));
verify(successfulFuture).isCancelled();
verify(blockedFuture).isCancelled();
verify(completionService).take();
assertThat(actualResult, equalTo(expectedResults));
}