Calling shutdown on the RetrievalStrategy (#222)

Fixes a bug where the retriever wasn't being shutdown when a record processor was being shutdown.
This commit is contained in:
Sahil Palvia 2017-09-22 11:43:34 -07:00 committed by Justin Pfifer
parent 9a82b6bd05
commit 4dd9423170
9 changed files with 143 additions and 99 deletions

View file

@ -36,10 +36,10 @@ To make it easier for developers to write record processors in other languages,
* [PR #188](https://github.com/awslabs/amazon-kinesis-client/pull/188)
* Support timeouts, and retry for GetRecords calls.
Applications can now set timeouts for GetRecord calls to Kinesis. As part of setting the timeout, the application must also provide a thread pool size for concurrent requests.
* [PR #214](https://github.com/awslabs/amazon-kinesis-client/pulls/214)
* [PR #214](https://github.com/awslabs/amazon-kinesis-client/pull/214)
* Notification when the lease table is throttled
When writes, or reads, to the lease table are throttled a warning will be emitted. If you're seeing this warning you should increase the IOPs for your lease table to prevent processing delays.
* [PR #212](https://github.com/awslabs/amazon-kinesis-client/pulls/212)
* [PR #212](https://github.com/awslabs/amazon-kinesis-client/pull/212)
* Support configuring the graceful shutdown timeout for MultiLang Clients
This adds support for setting the timeout that the Java process will wait for the MutliLang client to complete graceful shutdown. The timeout can be configured by adding `shutdownGraceMillis` to the properties file set to the number of milliseconds to wait.
* [PR #204](https://github.com/awslabs/amazon-kinesis-client/pull/204)

View file

@ -6,7 +6,7 @@
<artifactId>amazon-kinesis-client</artifactId>
<packaging>jar</packaging>
<name>Amazon Kinesis Client Library for Java</name>
<version>1.8.2</version>
<version>1.8.3-SNAPSHOT</version>
<description>The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data
from Amazon Kinesis.
</description>

View file

@ -312,7 +312,7 @@ class ConsumerStates {
return new ProcessTask(consumer.getShardInfo(), consumer.getStreamConfig(), consumer.getRecordProcessor(),
consumer.getRecordProcessorCheckpointer(), consumer.getDataFetcher(),
consumer.getTaskBackoffTimeMillis(), consumer.isSkipShardSyncAtWorkerInitializationIfLeasesExist(),
consumer.getRetryGetRecordsInSeconds(), consumer.getMaxGetRecordsThreadPool());
consumer.getGetRecordsRetrievalStrategy());
}
@Override
@ -516,7 +516,8 @@ class ConsumerStates {
consumer.getStreamConfig().getStreamProxy(),
consumer.getStreamConfig().getInitialPositionInStream(),
consumer.isCleanupLeasesOfCompletedShards(), consumer.getLeaseManager(),
consumer.getTaskBackoffTimeMillis());
consumer.getTaskBackoffTimeMillis(),
consumer.getGetRecordsRetrievalStrategy());
}
@Override

View file

@ -65,17 +65,6 @@ class ProcessTask implements ITask {
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
ShardInfo shardInfo) {
Optional<GetRecordsRetrievalStrategy> getRecordsRetrievalStrategy = retryGetRecordsInSeconds.flatMap(retry ->
maxGetRecordsThreadPool.map(max ->
new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, retry, max, shardInfo.getShardId())));
return getRecordsRetrievalStrategy.orElse(new SynchronousGetRecordsRetrievalStrategy(dataFetcher));
}
/**
* @param shardInfo
* contains information about the shard
@ -89,40 +78,17 @@ class ProcessTask implements ITask {
* Kinesis data fetcher (used to fetch records from Kinesis)
* @param backoffTimeMillis
* backoff time when catching exceptions
*/
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist) {
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist, Optional.empty(), Optional.empty());
}
/**
* @param shardInfo
* contains information about the shard
* @param streamConfig
* Stream configuration
* @param recordProcessor
* Record processor used to process the data records for the shard
* @param recordProcessorCheckpointer
* Passed to the RecordProcessor so it can checkpoint progress
* @param dataFetcher
* Kinesis data fetcher (used to fetch records from Kinesis)
* @param backoffTimeMillis
* backoff time when catching exceptions
* @param retryGetRecordsInSeconds
* time in seconds to wait before the worker retries to get a record.
* @param maxGetRecordsThreadPool
* max number of threads in the getRecords thread pool.
* @param getRecordsRetrievalStrategy
* The retrieval strategy for fetching records from kinesis
*/
public ProcessTask(ShardInfo shardInfo, StreamConfig streamConfig, IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer, KinesisDataFetcher dataFetcher,
long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist,
Optional<Integer> retryGetRecordsInSeconds, Optional<Integer> maxGetRecordsThreadPool) {
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
this(shardInfo, streamConfig, recordProcessor, recordProcessorCheckpointer, dataFetcher, backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
new ThrottlingReporter(MAX_CONSECUTIVE_THROTTLES, shardInfo.getShardId()),
makeStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo));
getRecordsRetrievalStrategy);
}
/**

View file

@ -55,14 +55,24 @@ class ShardConsumer {
private final boolean cleanupLeasesOfCompletedShards;
private final long taskBackoffTimeMillis;
private final boolean skipShardSyncAtWorkerInitializationIfLeasesExist;
@Getter
private final Optional<Integer> retryGetRecordsInSeconds;
@Getter
private final Optional<Integer> maxGetRecordsThreadPool;
private ITask currentTask;
private long currentTaskSubmitTime;
private Future<TaskResult> future;
@Getter
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private static final GetRecordsRetrievalStrategy makeStrategy(KinesisDataFetcher dataFetcher,
Optional<Integer> retryGetRecordsInSeconds,
Optional<Integer> maxGetRecordsThreadPool,
ShardInfo shardInfo) {
Optional<GetRecordsRetrievalStrategy> getRecordsRetrievalStrategy = retryGetRecordsInSeconds.flatMap(retry ->
maxGetRecordsThreadPool.map(max ->
new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, retry, max, shardInfo.getShardId())));
return getRecordsRetrievalStrategy.orElse(new SynchronousGetRecordsRetrievalStrategy(dataFetcher));
}
/*
* Tracks current state. It is only updated via the consumeStream/shutdown APIs. Therefore we don't do
@ -149,8 +159,7 @@ class ShardConsumer {
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
this.taskBackoffTimeMillis = backoffTimeMillis;
this.skipShardSyncAtWorkerInitializationIfLeasesExist = skipShardSyncAtWorkerInitializationIfLeasesExist;
this.retryGetRecordsInSeconds = retryGetRecordsInSeconds;
this.maxGetRecordsThreadPool = maxGetRecordsThreadPool;
this.getRecordsRetrievalStrategy = makeStrategy(dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, shardInfo);
}
/**

View file

@ -1,16 +1,16 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
@ -46,20 +46,22 @@ class ShutdownTask implements ITask {
private final boolean cleanupLeasesOfCompletedShards;
private final TaskType taskType = TaskType.SHUTDOWN;
private final long backoffTimeMillis;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
/**
* Constructor.
*/
// CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES
ShutdownTask(ShardInfo shardInfo,
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
ShutdownReason reason,
IKinesisProxy kinesisProxy,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards,
ILeaseManager<KinesisClientLease> leaseManager,
long backoffTimeMillis) {
IRecordProcessor recordProcessor,
RecordProcessorCheckpointer recordProcessorCheckpointer,
ShutdownReason reason,
IKinesisProxy kinesisProxy,
InitialPositionInStreamExtended initialPositionInStream,
boolean cleanupLeasesOfCompletedShards,
ILeaseManager<KinesisClientLease> leaseManager,
long backoffTimeMillis,
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
this.shardInfo = shardInfo;
this.recordProcessor = recordProcessor;
this.recordProcessorCheckpointer = recordProcessorCheckpointer;
@ -69,6 +71,7 @@ class ShutdownTask implements ITask {
this.cleanupLeasesOfCompletedShards = cleanupLeasesOfCompletedShards;
this.leaseManager = leaseManager;
this.backoffTimeMillis = backoffTimeMillis;
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
}
/*
@ -79,7 +82,7 @@ class ShutdownTask implements ITask {
*/
@Override
public TaskResult call() {
Exception exception = null;
Exception exception;
boolean applicationException = false;
try {
@ -107,6 +110,8 @@ class ShutdownTask implements ITask {
+ shardInfo.getShardId());
}
}
LOG.debug("Shutting down retrieval strategy.");
getRecordsRetrievalStrategy.shutdown();
LOG.debug("Record processor completed shutdown() for shard " + shardInfo.getShardId());
} catch (Exception e) {
applicationException = true;

View file

@ -17,7 +17,6 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ConsumerState;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerStates.ShardConsumerState;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.never;
@ -26,7 +25,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.lang.reflect.Field;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@ -76,6 +74,8 @@ public class ConsumerStatesTest {
private IKinesisProxy kinesisProxy;
@Mock
private InitialPositionInStreamExtended initialPositionInStream;
@Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private long parentShardPollIntervalMillis = 0xCAFE;
private boolean cleanupLeasesOfCompletedShards = true;
@ -98,7 +98,7 @@ public class ConsumerStatesTest {
when(consumer.isCleanupLeasesOfCompletedShards()).thenReturn(cleanupLeasesOfCompletedShards);
when(consumer.getTaskBackoffTimeMillis()).thenReturn(taskBackoffTimeMillis);
when(consumer.getShutdownReason()).thenReturn(reason);
when(consumer.getGetRecordsRetrievalStrategy()).thenReturn(getRecordsRetrievalStrategy);
}
private static final Class<ILeaseManager<KinesisClientLease>> LEASE_MANAGER_CLASS = (Class<ILeaseManager<KinesisClientLease>>) (Class<?>) ILeaseManager.class;
@ -155,9 +155,6 @@ public class ConsumerStatesTest {
@Test
public void processingStateTestSynchronous() {
when(consumer.getMaxGetRecordsThreadPool()).thenReturn(Optional.empty());
when(consumer.getRetryGetRecordsInSeconds()).thenReturn(Optional.empty());
ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState();
ITask task = state.createTask(consumer);
@ -168,7 +165,6 @@ public class ConsumerStatesTest {
assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher)));
assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig)));
assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
assertThat(task, procTask(GetRecordsRetrievalStrategy.class, "getRecordsRetrievalStrategy", instanceOf(SynchronousGetRecordsRetrievalStrategy.class) ));
assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));
@ -186,9 +182,6 @@ public class ConsumerStatesTest {
@Test
public void processingStateTestAsynchronous() {
when(consumer.getMaxGetRecordsThreadPool()).thenReturn(Optional.of(1));
when(consumer.getRetryGetRecordsInSeconds()).thenReturn(Optional.of(2));
ConsumerState state = ShardConsumerState.PROCESSING.getConsumerState();
ITask task = state.createTask(consumer);
@ -199,7 +192,6 @@ public class ConsumerStatesTest {
assertThat(task, procTask(KinesisDataFetcher.class, "dataFetcher", equalTo(dataFetcher)));
assertThat(task, procTask(StreamConfig.class, "streamConfig", equalTo(streamConfig)));
assertThat(task, procTask(Long.class, "backoffTimeMillis", equalTo(taskBackoffTimeMillis)));
assertThat(task, procTask(GetRecordsRetrievalStrategy.class, "getRecordsRetrievalStrategy", instanceOf(AsynchronousGetRecordsRetrievalStrategy.class) ));
assertThat(state.successTransition(), equalTo(ShardConsumerState.PROCESSING.getConsumerState()));

View file

@ -1,16 +1,16 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
@ -20,6 +20,7 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
@ -39,6 +40,7 @@ import java.util.Date;
import java.util.List;
import java.util.ListIterator;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -509,6 +511,62 @@ public class ShardConsumerTest {
Thread.sleep(50L);
assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING)));
}
@Test
public void testCreateSynchronousGetRecordsRetrieval() {
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
StreamConfig streamConfig =
new StreamConfig(streamProxy,
1,
10,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardConsumer shardConsumer =
new ShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
Optional.empty(),
Optional.empty());
assertEquals(shardConsumer.getGetRecordsRetrievalStrategy().getClass(), SynchronousGetRecordsRetrievalStrategy.class);
}
@Test
public void testCreateAsynchronousGetRecordsRetrieval() {
ShardInfo shardInfo = new ShardInfo("s-0-0", "testToken", null, ExtendedSequenceNumber.TRIM_HORIZON);
StreamConfig streamConfig =
new StreamConfig(streamProxy,
1,
10,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, INITIAL_POSITION_LATEST);
ShardConsumer shardConsumer =
new ShardConsumer(shardInfo,
streamConfig,
checkpoint,
processor,
null,
parentShardPollIntervalMillis,
cleanupLeasesOfCompletedShards,
executorService,
metricsFactory,
taskBackoffTimeMillis,
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
Optional.of(1),
Optional.of(2));
assertEquals(shardConsumer.getGetRecordsRetrievalStrategy().getClass(), AsynchronousGetRecordsRetrievalStrategy.class);
}
//@formatter:off (gets the formatting wrong)
private void verifyConsumedRecords(List<Record> expectedRecords,

View file

@ -1,20 +1,22 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/asl/
* http://aws.amazon.com/asl/
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.HashSet;
@ -34,10 +36,14 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease;
import com.amazonaws.services.kinesis.leases.impl.KinesisClientLeaseManager;
import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
/**
*
*/
@RunWith(MockitoJUnitRunner.class)
public class ShutdownTaskTest {
private static final long TASK_BACKOFF_TIME_MILLIS = 1L;
private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON =
@ -51,6 +57,9 @@ public class ShutdownTaskTest {
defaultParentShardIds,
ExtendedSequenceNumber.LATEST);
IRecordProcessor defaultRecordProcessor = new TestStreamlet();
@Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
/**
* @throws java.lang.Exception
@ -71,6 +80,7 @@ public class ShutdownTaskTest {
*/
@Before
public void setUp() throws Exception {
doNothing().when(getRecordsRetrievalStrategy).shutdown();
}
/**
@ -98,7 +108,8 @@ public class ShutdownTaskTest {
INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards,
leaseManager,
TASK_BACKOFF_TIME_MILLIS);
TASK_BACKOFF_TIME_MILLIS,
getRecordsRetrievalStrategy);
TaskResult result = task.call();
Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof IllegalArgumentException);
@ -123,10 +134,12 @@ public class ShutdownTaskTest {
INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards,
leaseManager,
TASK_BACKOFF_TIME_MILLIS);
TASK_BACKOFF_TIME_MILLIS,
getRecordsRetrievalStrategy);
TaskResult result = task.call();
Assert.assertNotNull(result.getException());
Assert.assertTrue(result.getException() instanceof KinesisClientLibIOException);
verify(getRecordsRetrievalStrategy).shutdown();
}
/**
@ -134,7 +147,7 @@ public class ShutdownTaskTest {
*/
@Test
public final void testGetTaskType() {
ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0);
ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, null, 0, getRecordsRetrievalStrategy);
Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType());
}