From 5c3ff2b31edd65e578095ff2adb5200b0efc6a45 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Wed, 8 Nov 2017 12:03:09 -0800 Subject: [PATCH 1/5] Handle Expired Iterators Correctly Fix for the lease losses in the PrefetchCache and AsyncGetRecordsStrategy caused due to ExpiredIteratorException. (#263) --- ...ynchronousGetRecordsRetrievalStrategy.java | 57 +++++++++++-------- .../worker/GetRecordsRetrievalStrategy.java | 7 +++ .../lib/worker/KinesisDataFetcher.java | 21 +++++++ .../lib/worker/PrefetchGetRecordsCache.java | 21 ++++++- .../lib/worker/RecordsFetcherFactory.java | 3 +- .../worker/SimpleRecordsFetcherFactory.java | 7 ++- ...ynchronousGetRecordsRetrievalStrategy.java | 5 ++ ...cordsRetrievalStrategyIntegrationTest.java | 23 ++++++++ ...ronousGetRecordsRetrievalStrategyTest.java | 27 +++++++++ .../lib/worker/KinesisDataFetcherTest.java | 40 +++++++++++++ ...refetchGetRecordsCacheIntegrationTest.java | 39 ++++++++++--- .../worker/PrefetchGetRecordsCacheTest.java | 25 +++++++- .../lib/worker/RecordsFetcherFactoryTest.java | 5 +- .../lib/worker/ShardConsumerTest.java | 8 ++- .../clientlibrary/lib/worker/WorkerTest.java | 4 +- 15 files changed, 248 insertions(+), 44 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java index 2e3cbd9e..2db74fba5 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategy.java @@ -16,7 +16,6 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.HashSet; import java.util.Set; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutionException; @@ -31,6 +30,7 @@ import java.util.function.Supplier; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingScope; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -81,33 +81,39 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie CompletionService completionService = completionServiceSupplier.get(); Set> futures = new HashSet<>(); Callable retrieverCall = createRetrieverCallable(maxRecords); - while (true) { - try { - futures.add(completionService.submit(retrieverCall)); - } catch (RejectedExecutionException e) { - log.warn("Out of resources, unable to start additional requests."); - } + try { + while (true) { + try { + futures.add(completionService.submit(retrieverCall)); + } catch (RejectedExecutionException e) { + log.warn("Out of resources, unable to start additional requests."); + } - try { - Future resultFuture = completionService.poll(retryGetRecordsInSeconds, - TimeUnit.SECONDS); - if (resultFuture != null) { - // - // Fix to ensure that we only let the shard iterator advance when we intend to return the result - // to the caller. This ensures that the shard iterator is consistently advance in step with - // what the caller sees. - // - result = resultFuture.get().accept(); + try { + Future resultFuture = completionService.poll(retryGetRecordsInSeconds, + TimeUnit.SECONDS); + if (resultFuture != null) { + // + // Fix to ensure that we only let the shard iterator advance when we intend to return the result + // to the caller. This ensures that the shard iterator is consistently advance in step with + // what the caller sees. + // + result = resultFuture.get().accept(); + break; + } + } catch (ExecutionException e) { + if (e.getCause() instanceof ExpiredIteratorException) { + throw (ExpiredIteratorException) e.getCause(); + } + log.error("ExecutionException thrown while trying to get records", e); + } catch (InterruptedException e) { + log.error("Thread was interrupted", e); break; } - } catch (ExecutionException e) { - log.error("ExecutionException thrown while trying to get records", e); - } catch (InterruptedException e) { - log.error("Thread was interrupted", e); - break; } + } finally { + futures.forEach(f -> f.cancel(true)); } - futures.forEach(f -> f.cancel(true)); return result; } @@ -140,4 +146,9 @@ public class AsynchronousGetRecordsRetrievalStrategy implements GetRecordsRetrie new ThreadFactoryBuilder().setDaemon(true).setNameFormat(threadNameFormat).build(), new ThreadPoolExecutor.AbortPolicy()); } + + @Override + public KinesisDataFetcher getDataFetcher() { + return dataFetcher; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrievalStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrievalStrategy.java index 8f7afe25..4f474887 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrievalStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/GetRecordsRetrievalStrategy.java @@ -44,4 +44,11 @@ public interface GetRecordsRetrievalStrategy { * @return true if the strategy has been shutdown, false otherwise. */ boolean isShutdown(); + + /** + * Returns the KinesisDataFetcher used to getRecords from Kinesis. + * + * @return KinesisDataFetcher + */ + KinesisDataFetcher getDataFetcher(); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java index c2ba9d15..0bd4bee3 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java @@ -17,6 +17,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.Collections; import java.util.Date; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -27,6 +28,8 @@ import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import com.amazonaws.services.kinesis.model.ShardIteratorType; +import com.amazonaws.util.CollectionUtils; +import com.google.common.collect.Iterables; import lombok.Data; @@ -42,6 +45,8 @@ class KinesisDataFetcher { private final String shardId; private boolean isShardEndReached; private boolean isInitialized; + private String lastKnownSequenceNumber; + private InitialPositionInStreamExtended initialPositionInStream; /** * @@ -108,6 +113,9 @@ class KinesisDataFetcher { @Override public GetRecordsResult accept() { nextIterator = result.getNextShardIterator(); + if (!CollectionUtils.isNullOrEmpty(result.getRecords())) { + lastKnownSequenceNumber = Iterables.getLast(result.getRecords()).getSequenceNumber(); + } if (nextIterator == null) { isShardEndReached = true; } @@ -161,6 +169,8 @@ class KinesisDataFetcher { if (nextIterator == null) { isShardEndReached = true; } + this.lastKnownSequenceNumber = sequenceNumber; + this.initialPositionInStream = initialPositionInStream; } /** @@ -217,6 +227,17 @@ class KinesisDataFetcher { return iterator; } + /** + * Gets a new iterator from the last known sequence number i.e. the sequence number of the last record from the last + * getRecords call. + */ + public void restartIterator() { + if (StringUtils.isEmpty(lastKnownSequenceNumber) || initialPositionInStream == null) { + throw new IllegalStateException("Make sure to initialize the KinesisDataFetcher before restarting the iterator."); + } + advanceIteratorTo(lastKnownSequenceNumber, initialPositionInStream); + } + /** * @return the shardEndReached */ diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index 06e77c8c..982d70cc 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -23,10 +23,13 @@ import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.lang.Validate; import com.amazonaws.SdkClientException; +import com.amazonaws.services.cloudwatch.model.StandardUnit; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.impl.ThreadSafeMetricsDelegatingFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; +import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import com.amazonaws.services.kinesis.model.GetRecordsResult; import lombok.NonNull; @@ -42,6 +45,7 @@ import lombok.extern.apachecommons.CommonsLog; */ @CommonsLog public class PrefetchGetRecordsCache implements GetRecordsCache { + private static final String EXPIRED_ITERATOR_METRIC = "ExpiredIterator"; LinkedBlockingQueue getRecordsResultQueue; private int maxPendingProcessRecordsInput; private int maxByteSize; @@ -56,6 +60,8 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private PrefetchCounters prefetchCounters; private boolean started = false; private final String operation; + private final KinesisDataFetcher dataFetcher; + private final String shardId; /** * Constructor for the PrefetchGetRecordsCache. This cache prefetches records from Kinesis and stores them in a @@ -76,9 +82,10 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { final int maxRecordsPerCall, @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, @NonNull final ExecutorService executorService, - long idleMillisBetweenCalls, + final long idleMillisBetweenCalls, @NonNull final IMetricsFactory metricsFactory, - @NonNull String operation) { + @NonNull final String operation, + @NonNull final String shardId) { this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.maxRecordsPerCall = maxRecordsPerCall; this.maxPendingProcessRecordsInput = maxPendingProcessRecordsInput; @@ -92,6 +99,8 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { this.defaultGetRecordsCacheDaemon = new DefaultGetRecordsCacheDaemon(); Validate.notEmpty(operation, "Operation cannot be empty"); this.operation = operation; + this.dataFetcher = this.getRecordsRetrievalStrategy.getDataFetcher(); + this.shardId = shardId; } @Override @@ -162,6 +171,14 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { prefetchCounters.added(processRecordsInput); } catch (InterruptedException e) { log.info("Thread was interrupted, indicating shutdown was called on the cache."); + } catch (ExpiredIteratorException e) { + log.info(String.format("ShardId %s: getRecords threw ExpiredIteratorException - restarting" + + " after greatest seqNum passed to customer", shardId), e); + + MetricsHelper.getMetricsScope().addData(EXPIRED_ITERATOR_METRIC, 1, StandardUnit.Count, + MetricsLevel.SUMMARY); + + dataFetcher.restartIterator(); } catch (SdkClientException e) { log.error("Exception thrown while fetching records from Kinesis", e); } catch (Throwable e) { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java index be8316d7..afc6c4f2 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java @@ -29,7 +29,8 @@ public interface RecordsFetcherFactory { * * @return GetRecordsCache used to get records from Kinesis. */ - GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory); + GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, + IMetricsFactory metricsFactory); /** * Sets the maximum number of ProcessRecordsInput objects the GetRecordsCache can hold, before further requests are diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java index 44c93e7b..bd33fd98 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -18,6 +18,7 @@ import java.util.concurrent.Executors; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.google.common.util.concurrent.ThreadFactoryBuilder; + import lombok.extern.apachecommons.CommonsLog; @CommonsLog @@ -34,7 +35,8 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { } @Override - public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, IMetricsFactory metricsFactory) { + public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, + IMetricsFactory metricsFactory) { if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy); } else { @@ -46,7 +48,8 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { .build()), idleMillisBetweenCalls, metricsFactory, - "ProcessTask"); + "ProcessTask", + shardId); } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SynchronousGetRecordsRetrievalStrategy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SynchronousGetRecordsRetrievalStrategy.java index c862c348..f4209189 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SynchronousGetRecordsRetrievalStrategy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SynchronousGetRecordsRetrievalStrategy.java @@ -42,4 +42,9 @@ public class SynchronousGetRecordsRetrievalStrategy implements GetRecordsRetriev public boolean isShutdown() { return false; } + + @Override + public KinesisDataFetcher getDataFetcher() { + return dataFetcher; + } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java index 30b877e8..37f58c1c 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java @@ -38,17 +38,21 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.mockito.stubbing.Answer; @RunWith(MockitoJUnitRunner.class) public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { @@ -133,6 +137,24 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { verify(mockFuture).get(); assertNull(getRecordsResult); } + + @Test (expected = ExpiredIteratorException.class) + public void testExpiredIteratorExcpetion() throws InterruptedException { + when(dataFetcher.getRecords(eq(numberOfRecords))).thenAnswer(new Answer() { + @Override + public DataFetcherResult answer(final InvocationOnMock invocationOnMock) throws Throwable { + Thread.sleep(SLEEP_GET_RECORDS_IN_SECONDS * 1000); + throw new ExpiredIteratorException("ExpiredIterator"); + } + }); + + try { + getRecordsRetrivalStrategy.getRecords(numberOfRecords); + } finally { + verify(dataFetcher, atLeast(getLeastNumberOfCalls())).getRecords(eq(numberOfRecords)); + verify(executorService, atLeast(getLeastNumberOfCalls())).execute(any()); + } + } private int getLeastNumberOfCalls() { int leastNumberOfCalls = 0; @@ -163,6 +185,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { } catch (InterruptedException e) { // Do nothing } + return result; } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyTest.java index aa9e9a24..151300de 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyTest.java @@ -20,20 +20,25 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; @@ -153,5 +158,27 @@ public class AsynchronousGetRecordsRetrievalStrategyTest { assertThat(actualResult, equalTo(expectedResults)); } + + @Test (expected = ExpiredIteratorException.class) + public void testExpiredIteratorExceptionCase() throws Exception { + AsynchronousGetRecordsRetrievalStrategy strategy = new AsynchronousGetRecordsRetrievalStrategy(dataFetcher, + executorService, (int) RETRY_GET_RECORDS_IN_SECONDS, completionServiceSupplier, SHARD_ID); + Future successfulFuture2 = mock(Future.class); + + when(executorService.isShutdown()).thenReturn(false); + when(completionService.submit(any())).thenReturn(successfulFuture, successfulFuture2); + when(completionService.poll(anyLong(), any())).thenReturn(null).thenReturn(successfulFuture); + when(successfulFuture.get()).thenThrow(new ExecutionException(new ExpiredIteratorException("ExpiredException"))); + + try { + strategy.getRecords(10); + } finally { + verify(executorService).isShutdown(); + verify(completionService, times(2)).submit(any()); + verify(completionService, times(2)).poll(eq(RETRY_GET_RECORDS_IN_SECONDS), eq(TimeUnit.SECONDS)); + verify(successfulFuture).cancel(eq(true)); + verify(successfulFuture2).cancel(eq(true)); + } + } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java index 6648b919..fbe720ae 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java @@ -32,6 +32,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.List; @@ -273,6 +274,45 @@ public class KinesisDataFetcherTest { verify(kinesisProxy, never()).get(anyString(), anyInt()); } + + @Test + public void testRestartIterator() { + GetRecordsResult getRecordsResult = mock(GetRecordsResult.class); + GetRecordsResult restartGetRecordsResult = new GetRecordsResult(); + Record record = mock(Record.class); + final String initialIterator = "InitialIterator"; + final String nextShardIterator = "NextShardIterator"; + final String restartShardIterator = "RestartIterator"; + final String sequenceNumber = "SequenceNumber"; + final String iteratorType = "AT_SEQUENCE_NUMBER"; + KinesisProxy kinesisProxy = mock(KinesisProxy.class); + KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO); + + when(kinesisProxy.getIterator(eq(SHARD_ID), eq(InitialPositionInStream.LATEST.toString()))).thenReturn(initialIterator); + when(kinesisProxy.get(eq(initialIterator), eq(10))).thenReturn(getRecordsResult); + when(getRecordsResult.getRecords()).thenReturn(Collections.singletonList(record)); + when(getRecordsResult.getNextShardIterator()).thenReturn(nextShardIterator); + when(record.getSequenceNumber()).thenReturn(sequenceNumber); + + fetcher.initialize(InitialPositionInStream.LATEST.toString(), INITIAL_POSITION_LATEST); + verify(kinesisProxy).getIterator(eq(SHARD_ID), eq(InitialPositionInStream.LATEST.toString())); + Assert.assertEquals(getRecordsResult, fetcher.getRecords(10).accept()); + verify(kinesisProxy).get(eq(initialIterator), eq(10)); + + when(kinesisProxy.getIterator(eq(SHARD_ID), eq(iteratorType), eq(sequenceNumber))).thenReturn(restartShardIterator); + when(kinesisProxy.get(eq(restartShardIterator), eq(10))).thenReturn(restartGetRecordsResult); + + fetcher.restartIterator(); + Assert.assertEquals(restartGetRecordsResult, fetcher.getRecords(10).accept()); + verify(kinesisProxy).getIterator(eq(SHARD_ID), eq(iteratorType), eq(sequenceNumber)); + verify(kinesisProxy).get(eq(restartShardIterator), eq(10)); + } + + @Test (expected = IllegalStateException.class) + public void testRestartIteratorNotInitialized() { + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO); + dataFetcher.restartIterator(); + } private DataFetcherResult assertAdvanced(KinesisDataFetcher dataFetcher, GetRecordsResult expectedResult, String previousValue, String nextValue) { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java index 37d0e446..e24d5bb0 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java @@ -20,6 +20,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -31,19 +33,19 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; -import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; -import com.amazonaws.services.kinesis.model.Record; - import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.Record; @@ -70,14 +72,13 @@ public class PrefetchGetRecordsCacheIntegrationTest { @Mock private IKinesisProxy proxy; - @Mock private ShardInfo shardInfo; @Before public void setup() { records = new ArrayList<>(); - dataFetcher = new KinesisDataFetcherForTest(proxy, shardInfo); + dataFetcher = spy(new KinesisDataFetcherForTest(proxy, shardInfo)); getRecordsRetrievalStrategy = spy(new SynchronousGetRecordsRetrievalStrategy(dataFetcher)); executorService = spy(Executors.newFixedThreadPool(1)); @@ -89,7 +90,8 @@ public class PrefetchGetRecordsCacheIntegrationTest { executorService, IDLE_MILLIS_BETWEEN_CALLS, new NullMetricsFactory(), - operation); + operation, + "test-shard"); } @Test @@ -135,7 +137,8 @@ public class PrefetchGetRecordsCacheIntegrationTest { executorService2, IDLE_MILLIS_BETWEEN_CALLS, new NullMetricsFactory(), - operation); + operation, + "test-shard-2"); getRecordsCache.start(); sleep(IDLE_MILLIS_BETWEEN_CALLS); @@ -167,6 +170,26 @@ public class PrefetchGetRecordsCacheIntegrationTest { verify(getRecordsRetrievalStrategy2).shutdown(); } + @Test + public void testExpiredIteratorException() { + when(dataFetcher.getRecords(eq(MAX_RECORDS_PER_CALL))).thenAnswer(new Answer() { + @Override + public DataFetcherResult answer(final InvocationOnMock invocationOnMock) throws Throwable { + throw new ExpiredIteratorException("ExpiredIterator"); + } + }).thenCallRealMethod(); + doNothing().when(dataFetcher).restartIterator(); + + getRecordsCache.start(); + sleep(IDLE_MILLIS_BETWEEN_CALLS); + + ProcessRecordsInput processRecordsInput = getRecordsCache.getNextResult(); + + assertNotNull(processRecordsInput); + assertTrue(processRecordsInput.getRecords().isEmpty()); + verify(dataFetcher).restartIterator(); + } + @After public void shutdown() { getRecordsCache.shutdown(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java index 6091baa9..2b650866 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -36,7 +37,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.stream.IntStream; -import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -45,6 +45,8 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; +import com.amazonaws.services.kinesis.model.ExpiredIteratorException; import com.amazonaws.services.kinesis.model.GetRecordsResult; import com.amazonaws.services.kinesis.model.Record; @@ -66,6 +68,8 @@ public class PrefetchGetRecordsCacheTest { private GetRecordsResult getRecordsResult; @Mock private Record record; + @Mock + private KinesisDataFetcher dataFetcher; private List records; private ExecutorService executorService; @@ -75,6 +79,8 @@ public class PrefetchGetRecordsCacheTest { @Before public void setup() { + when(getRecordsRetrievalStrategy.getDataFetcher()).thenReturn(dataFetcher); + executorService = spy(Executors.newFixedThreadPool(1)); getRecordsCache = new PrefetchGetRecordsCache( MAX_SIZE, @@ -85,7 +91,8 @@ public class PrefetchGetRecordsCacheTest { executorService, IDLE_MILLIS_BETWEEN_CALLS, new NullMetricsFactory(), - operation); + operation, + "shardId"); spyQueue = spy(getRecordsCache.getRecordsResultQueue); records = spy(new ArrayList<>()); @@ -194,6 +201,20 @@ public class PrefetchGetRecordsCacheTest { when(executorService.isShutdown()).thenReturn(true); getRecordsCache.getNextResult(); } + + @Test + public void testExpiredIteratorException() { + getRecordsCache.start(); + + when(getRecordsRetrievalStrategy.getRecords(MAX_RECORDS_PER_CALL)).thenThrow(ExpiredIteratorException.class).thenReturn(getRecordsResult); + doNothing().when(dataFetcher).restartIterator(); + + getRecordsCache.getNextResult(); + + sleep(1000); + + verify(dataFetcher).restartIterator(); + } @After public void shutdown() { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java index 912804da..7107d0fd 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java @@ -1,7 +1,5 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; - import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; @@ -10,13 +8,14 @@ import org.junit.Test; import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; + public class RecordsFetcherFactoryTest { private String shardId = "TestShard"; private RecordsFetcherFactory recordsFetcherFactory; @Mock private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; - @Mock private IMetricsFactory metricsFactory; diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 9a7f2234..f235ca93 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -343,7 +343,9 @@ public class ShardConsumerTest { getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); - when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(), + any(IMetricsFactory.class))) + .thenReturn(getRecordsCache); ShardConsumer consumer = new ShardConsumer(shardInfo, @@ -472,7 +474,9 @@ public class ShardConsumerTest { getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); - when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(), + any(IMetricsFactory.class))) + .thenReturn(getRecordsCache); ShardConsumer consumer = new ShardConsumer(shardInfo, diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index a8856a0b..fd3382a3 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -621,7 +621,9 @@ public class WorkerTest { RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class); GetRecordsCache getRecordsCache = mock(GetRecordsCache.class); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); - when(recordsFetcherFactory.createRecordsFetcher(any(), anyString(),any())).thenReturn(getRecordsCache); + when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(), + any(IMetricsFactory.class))) + .thenReturn(getRecordsCache); when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest(0L)); WorkerThread workerThread = runWorker(shardList, From 1abb41dbdbd074218427469b9ea1029285a544d0 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Fri, 10 Nov 2017 06:32:16 -0800 Subject: [PATCH 2/5] Correctly Send MaxRecords to SingleRecordsFetcherFactory Fixed #262 Fixes #262 Changing the signture of SingleRecordsFetcherFactory to no longer take maxRecords as the parameter to the constructor. Changed the createRecordsFetcher signature to take maxRecords as a parameter. (#264) --- .../lib/worker/KinesisClientLibConfiguration.java | 2 +- .../lib/worker/RecordsFetcherFactory.java | 3 ++- .../clientlibrary/lib/worker/ShardConsumer.java | 2 +- .../lib/worker/SimpleRecordsFetcherFactory.java | 9 ++------- .../lib/worker/RecordsFetcherFactoryTest.java | 6 +++--- .../clientlibrary/lib/worker/ShardConsumerTest.java | 10 +++++----- .../kinesis/clientlibrary/lib/worker/WorkerTest.java | 7 ++++--- 7 files changed, 18 insertions(+), 21 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index fae780f5..f24fc574 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -477,7 +477,7 @@ public class KinesisClientLibConfiguration { InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; - this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(this.maxRecords); + this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java index afc6c4f2..c1a513a9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java @@ -26,11 +26,12 @@ public interface RecordsFetcherFactory { * @param getRecordsRetrievalStrategy GetRecordsRetrievalStrategy to be used with the GetRecordsCache * @param shardId ShardId of the shard that the fetcher will retrieve records for * @param metricsFactory MetricsFactory used to create metricScope + * @param maxRecords Max number of records to be returned in a single get call * * @return GetRecordsCache used to get records from Kinesis. */ GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, - IMetricsFactory metricsFactory); + IMetricsFactory metricsFactory, int maxRecords); /** * Sets the maximum number of ProcessRecordsInput objects the GetRecordsCache can hold, before further requests are diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index 14a1d08c..d8aa88d1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -235,7 +235,7 @@ class ShardConsumer { this.dataFetcher = kinesisDataFetcher; this.getRecordsCache = config.getRecordsFetcherFactory().createRecordsFetcher( makeStrategy(this.dataFetcher, retryGetRecordsInSeconds, maxGetRecordsThreadPool, this.shardInfo), - this.getShardInfo().getShardId(), this.metricsFactory); + this.getShardInfo().getShardId(), this.metricsFactory, this.config.getMaxRecords()); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java index bd33fd98..79ad9f55 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -23,20 +23,15 @@ import lombok.extern.apachecommons.CommonsLog; @CommonsLog public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { - private final int maxRecords; private int maxPendingProcessRecordsInput = 3; private int maxByteSize = 8 * 1024 * 1024; private int maxRecordsCount = 30000; private long idleMillisBetweenCalls = 1500L; private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT; - - public SimpleRecordsFetcherFactory(int maxRecords) { - this.maxRecords = maxRecords; - } - + @Override public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, String shardId, - IMetricsFactory metricsFactory) { + IMetricsFactory metricsFactory, int maxRecords) { if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy); } else { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java index 7107d0fd..d686c914 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactoryTest.java @@ -22,13 +22,13 @@ public class RecordsFetcherFactoryTest { @Before public void setUp() { MockitoAnnotations.initMocks(this); - recordsFetcherFactory = new SimpleRecordsFetcherFactory(1); + recordsFetcherFactory = new SimpleRecordsFetcherFactory(); } @Test public void createDefaultRecordsFetcherTest() { GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId, - metricsFactory); + metricsFactory, 1); assertThat(recordsCache, instanceOf(BlockingGetRecordsCache.class)); } @@ -36,7 +36,7 @@ public class RecordsFetcherFactoryTest { public void createPrefetchRecordsFetcherTest() { recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED); GetRecordsCache recordsCache = recordsFetcherFactory.createRecordsFetcher(getRecordsRetrievalStrategy, shardId, - metricsFactory); + metricsFactory, 1); assertThat(recordsCache, instanceOf(PrefetchGetRecordsCache.class)); } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index f235ca93..516788c7 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.atLeastOnce; @@ -97,7 +98,6 @@ public class ShardConsumerTest { // Use Executors.newFixedThreadPool since it returns ThreadPoolExecutor, which is // ... a non-final public class, and so can be mocked and spied. private final ExecutorService executorService = Executors.newFixedThreadPool(1); - private final int maxRecords = 500; private RecordsFetcherFactory recordsFetcherFactory; private GetRecordsCache getRecordsCache; @@ -119,7 +119,7 @@ public class ShardConsumerTest { public void setup() { getRecordsCache = null; - recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(maxRecords)); + recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory()); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); when(config.getLogWarningForTaskAfterMillis()).thenReturn(Optional.empty()); } @@ -344,7 +344,7 @@ public class ShardConsumerTest { getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(), - any(IMetricsFactory.class))) + any(IMetricsFactory.class), anyInt())) .thenReturn(getRecordsCache); ShardConsumer consumer = @@ -475,7 +475,7 @@ public class ShardConsumerTest { getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(), - any(IMetricsFactory.class))) + any(IMetricsFactory.class), anyInt())) .thenReturn(getRecordsCache); ShardConsumer consumer = @@ -571,7 +571,7 @@ public class ShardConsumerTest { final ExtendedSequenceNumber checkpointSequenceNumber = new ExtendedSequenceNumber("123"); final ExtendedSequenceNumber pendingCheckpointSequenceNumber = new ExtendedSequenceNumber("999"); when(leaseManager.getLease(anyString())).thenReturn(null); - when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory(2)); + when(config.getRecordsFetcherFactory()).thenReturn(new SimpleRecordsFetcherFactory()); when(checkpoint.getCheckpointObject(anyString())).thenReturn( new Checkpoint(checkpointSequenceNumber, pendingCheckpointSequenceNumber)); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index fd3382a3..ce406dce 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; @@ -172,7 +173,7 @@ public class WorkerTest { @Before public void setup() { config = spy(new KinesisClientLibConfiguration("app", null, null, null)); - recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(500)); + recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory()); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); } @@ -505,7 +506,7 @@ public class WorkerTest { lease.setCheckpoint(new ExtendedSequenceNumber("2")); initialLeases.add(lease); boolean callProcessRecordsForEmptyRecordList = true; - RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory(500); + RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory(); recordsFetcherFactory.setIdleMillisBetweenCalls(0L); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard, config); @@ -622,7 +623,7 @@ public class WorkerTest { GetRecordsCache getRecordsCache = mock(GetRecordsCache.class); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(), - any(IMetricsFactory.class))) + any(IMetricsFactory.class), anyInt())) .thenReturn(getRecordsCache); when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest(0L)); From 09b312028a237f25f29c0ed6cd84c5ee56b30487 Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Mon, 13 Nov 2017 09:45:40 -0800 Subject: [PATCH 3/5] Unit test fixes for retrying when requesting a shutdown. (#267) * Shutdown that throws an exception will be retried. Without this change a transient error on shutdown with reason terminate prevents child shards from starting. * Fixing the tests for the Shutdown fix. --- .../lib/worker/ShardConsumer.java | 2 +- .../lib/worker/ShardConsumerTest.java | 158 +++++++++++++++++- 2 files changed, 157 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index d8aa88d1..9fb8e8e9 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -412,7 +412,7 @@ class ShardConsumer { if (taskOutcome == TaskOutcome.END_OF_SHARD) { markForShutdown(ShutdownReason.TERMINATE); } - if (isShutdownRequested()) { + if (isShutdownRequested() && taskOutcome != TaskOutcome.FAILURE) { currentState = currentState.shutdownTransition(shutdownReason); } else if (taskOutcome == TaskOutcome.SUCCESSFUL) { if (currentState.getTaskType() == currentTask.getTaskType()) { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 516788c7..8a91c6e6 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -42,6 +42,7 @@ import java.util.List; import java.util.ListIterator; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -69,12 +70,14 @@ import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProx import com.amazonaws.services.kinesis.clientlibrary.proxies.util.KinesisLocalFileDataCreator; import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; +import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.amazonaws.services.kinesis.model.Record; +import com.amazonaws.services.kinesis.model.Shard; import com.amazonaws.services.kinesis.model.ShardIteratorType; /** @@ -102,6 +105,8 @@ public class ShardConsumerTest { private GetRecordsCache getRecordsCache; + private KinesisDataFetcher dataFetcher; + @Mock private IRecordProcessor processor; @Mock @@ -118,6 +123,7 @@ public class ShardConsumerTest { @Before public void setup() { getRecordsCache = null; + dataFetcher = null; recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory()); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); @@ -339,7 +345,7 @@ public class ShardConsumerTest { ) ); - KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); + dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); @@ -422,6 +428,154 @@ public class ShardConsumerTest { file.delete(); } + private static final class TransientShutdownErrorTestStreamlet extends TestStreamlet { + private final CountDownLatch errorShutdownLatch = new CountDownLatch(1); + + @Override + public void shutdown(ShutdownInput input) { + ShutdownReason reason = input.getShutdownReason(); + if (reason.equals(ShutdownReason.TERMINATE) && errorShutdownLatch.getCount() > 0) { + errorShutdownLatch.countDown(); + throw new RuntimeException("test"); + } else { + super.shutdown(input); + } + } + } + + /** + * Test method for {@link ShardConsumer#consumeShard()} that ensures a transient error thrown from the record + * processor's shutdown method with reason terminate will be retried. + */ + @Test + public final void testConsumeShardWithTransientTerminateError() throws Exception { + int numRecs = 10; + BigInteger startSeqNum = BigInteger.ONE; + String streamShardId = "kinesis-0-0"; + String testConcurrencyToken = "testToken"; + List shardList = KinesisLocalFileDataCreator.createShardList(1, "kinesis-0-", startSeqNum); + // Close the shard so that shutdown is called with reason terminate + shardList.get(0).getSequenceNumberRange().setEndingSequenceNumber( + KinesisLocalFileProxy.MAX_SEQUENCE_NUMBER.subtract(BigInteger.ONE).toString()); + File file = KinesisLocalFileDataCreator.generateTempDataFile(shardList, numRecs, "unitTestSCT002"); + + IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath()); + + final int maxRecords = 2; + final int idleTimeMS = 0; // keep unit tests fast + ICheckpoint checkpoint = new InMemoryCheckpointImpl(startSeqNum.toString()); + checkpoint.setCheckpoint(streamShardId, ExtendedSequenceNumber.TRIM_HORIZON, testConcurrencyToken); + when(leaseManager.getLease(anyString())).thenReturn(null); + + TransientShutdownErrorTestStreamlet processor = new TransientShutdownErrorTestStreamlet(); + + StreamConfig streamConfig = + new StreamConfig(fileBasedProxy, + maxRecords, + idleTimeMS, + callProcessRecordsForEmptyRecordList, + skipCheckpointValidationValue, INITIAL_POSITION_LATEST); + + ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null); + + dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); + + getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, + new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); + when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(), + any(IMetricsFactory.class), anyInt())) + .thenReturn(getRecordsCache); + + RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( + shardInfo, + checkpoint, + new SequenceNumberValidator( + streamConfig.getStreamProxy(), + shardInfo.getShardId(), + streamConfig.shouldValidateSequenceNumberBeforeCheckpointing() + ) + ); + + ShardConsumer consumer = + new ShardConsumer(shardInfo, + streamConfig, + checkpoint, + processor, + recordProcessorCheckpointer, + leaseManager, + parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, + executorService, + metricsFactory, + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + dataFetcher, + Optional.empty(), + Optional.empty(), + config); + + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); + consumer.consumeShard(); // check on parent shards + Thread.sleep(50L); + consumer.consumeShard(); // start initialization + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.INITIALIZING))); + consumer.consumeShard(); // initialize + processor.getInitializeLatch().await(5, TimeUnit.SECONDS); + verify(getRecordsCache).start(); + + // We expect to process all records in numRecs calls + for (int i = 0; i < numRecs;) { + boolean newTaskSubmitted = consumer.consumeShard(); + if (newTaskSubmitted) { + LOG.debug("New processing task was submitted, call # " + i); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.PROCESSING))); + // CHECKSTYLE:IGNORE ModifiedControlVariable FOR NEXT 1 LINES + i += maxRecords; + } + Thread.sleep(50L); + } + + // Consume shards until shutdown terminate is called and it has thrown an exception + for (int i = 0; i < 100; i++) { + consumer.consumeShard(); + if (processor.errorShutdownLatch.await(50, TimeUnit.MILLISECONDS)) { + break; + } + } + assertEquals(0, processor.errorShutdownLatch.getCount()); + + // Wait for a retry of shutdown terminate that should succeed + for (int i = 0; i < 100; i++) { + consumer.consumeShard(); + if (processor.getShutdownLatch().await(50, TimeUnit.MILLISECONDS)) { + break; + } + } + assertEquals(0, processor.getShutdownLatch().getCount()); + + // Wait for shutdown complete now that terminate shutdown is successful + for (int i = 0; i < 100; i++) { + consumer.consumeShard(); + if (consumer.getCurrentState() == ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE) { + break; + } + Thread.sleep(50L); + } + assertThat(consumer.getCurrentState(), equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE)); + + assertThat(processor.getShutdownReason(), is(equalTo(ShutdownReason.TERMINATE))); + + verify(getRecordsCache).shutdown(); + + executorService.shutdown(); + executorService.awaitTermination(60, TimeUnit.SECONDS); + + String iterator = fileBasedProxy.getIterator(streamShardId, ShardIteratorType.TRIM_HORIZON.toString()); + List expectedRecords = toUserRecords(fileBasedProxy.get(iterator, numRecs).getRecords()); + verifyConsumedRecords(expectedRecords, processor.getProcessedRecords()); + file.delete(); + } + /** * Test method for {@link ShardConsumer#consumeShard()} that starts from initial position of type AT_TIMESTAMP. */ @@ -470,7 +624,7 @@ public class ShardConsumerTest { ) ); - KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); + dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); From 3c705300b5efb4e06adaab8e81f661c6225f000b Mon Sep 17 00:00:00 2001 From: Sahil Palvia Date: Wed, 15 Nov 2017 10:44:10 -0800 Subject: [PATCH 4/5] Adding release notes and updating the KCL version to 1.8.8. (#268) --- README.md | 22 +++++++++++++++++++++- pom.xml | 2 +- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index c612845f..7a89922e 100644 --- a/README.md +++ b/README.md @@ -15,7 +15,7 @@ The **Amazon Kinesis Client Library for Java** (Amazon KCL) enables Java develop 1. **Sign up for AWS** — Before you begin, you need an AWS account. For more information about creating an AWS account and retrieving your AWS credentials, see [AWS Account and Credentials][docs-signup] in the AWS SDK for Java Developer Guide. 1. **Sign up for Amazon Kinesis** — Go to the Amazon Kinesis console to sign up for the service and create an Amazon Kinesis stream. For more information, see [Create an Amazon Kinesis Stream][kinesis-guide-create] in the Amazon Kinesis Developer Guide. -1. **Minimum requirements** — To use the Amazon Kinesis Client Library, you'll need **Java 1.7+**. For more information about Amazon Kinesis Client Library requirements, see [Before You Begin][kinesis-guide-begin] in the Amazon Kinesis Developer Guide. +1. **Minimum requirements** — To use the Amazon Kinesis Client Library, you'll need **Java 1.8+**. For more information about Amazon Kinesis Client Library requirements, see [Before You Begin][kinesis-guide-begin] in the Amazon Kinesis Developer Guide. 1. **Using the Amazon Kinesis Client Library** — The best way to get familiar with the Amazon Kinesis Client Library is to read [Developing Record Consumer Applications][kinesis-guide-applications] in the Amazon Kinesis Developer Guide. ## Building from Source @@ -29,6 +29,26 @@ For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesi To make it easier for developers to write record processors in other languages, we have implemented a Java based daemon, called MultiLangDaemon that does all the heavy lifting. Our approach has the daemon spawn a sub-process, which in turn runs the record processor, which can be written in any language. The MultiLangDaemon process and the record processor sub-process communicate with each other over [STDIN and STDOUT using a defined protocol][multi-lang-protocol]. There will be a one to one correspondence amongst record processors, child processes, and shards. For Python developers specifically, we have abstracted these implementation details away and [expose an interface][kclpy] that enables you to focus on writing record processing logic in Python. This approach enables KCL to be language agnostic, while providing identical features and similar parallel processing model across all languages. ## Release Notes +### Release 1.8.8 +* Fixed issues with leases losses due to `ExpiredIteratorException` in `PrefetchGetRecordsCache` and `AsynchronousFetchingStrategy`. + PrefetchGetRecordsCache will request for a new iterator and start fetching data again. + * [PR#263](https://github.com/awslabs/amazon-kinesis-client/pull/263) +* Added warning message for long running tasks. + Logging long running tasks can be enabled by setting the following configuration property: + + | Name | Default | Description | + | ---- | ------- | ----------- | + | [`logWarningForTaskAfterMillis`](https://github.com/awslabs/amazon-kinesis-client/blob/3de901ea9327370ed732af86c4d4999c8d99541c/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java#L1367) | Not set | Milliseconds after which the logger will log a warning message for the long running task | + + * [PR#259](https://github.com/awslabs/amazon-kinesis-client/pull/259) +* Handling spurious lease renewal failures gracefully. + Added better handling of DynamoDB failures when updating leases. These failures would occur when a request to DynamoDB appeared to fail, but was actually successful. + * [PR#247](https://github.com/awslabs/amazon-kinesis-client/pull/247) +* ShutdownTask gets retried if the previous attempt on the ShutdownTask fails. + * [PR#267](https://github.com/awslabs/amazon-kinesis-client/pull/267) +* Fix for using maxRecords from `KinesisClientLibConfiguration` in `GetRecordsCache` for fetching records. + * [PR#264](https://github.com/awslabs/amazon-kinesis-client/pull/264) + ### Release 1.8.7 * Don't add a delay for synchronous requests to Kinesis Removes a delay that had been added for synchronous `GetRecords` calls to Kinesis. diff --git a/pom.xml b/pom.xml index b07d9ac0..1c521b0e 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ amazon-kinesis-client jar Amazon Kinesis Client Library for Java - 1.8.8-SNAPSHOT + 1.8.8 The Amazon Kinesis Client Library for Java enables Java developers to easily consume and process data from Amazon Kinesis. From 47e6206e8dc93bd5f7f6f3020d0bb706cd27b6c6 Mon Sep 17 00:00:00 2001 From: "Pfifer, Justin" Date: Wed, 15 Nov 2017 10:44:49 -0800 Subject: [PATCH 5/5] Release 1.8.8 of the Amazon Kinesis Client for Java Release 1.8.8 * Fixed issues with leases losses due to `ExpiredIteratorException` in `PrefetchGetRecordsCache` and `AsynchronousFetchingStrategy`. PrefetchGetRecordsCache will request for a new iterator and start fetching data again. * PR#263 * Added warning message for long running tasks. * PR#259 * Handling spurious lease renewal failures gracefully. Added better handling of DynamoDB failures when updating leases. These failures would occur when a request to DynamoDB appeared to fail, but was actually successful. * PR#247 * ShutdownTask gets retried if the previous attempt on the ShutdownTask fails. * PR#267 * Fix for using maxRecords from `KinesisClientLibConfiguration` in `GetRecordsCache` for fetching records. * PR#264 --- META-INF/MANIFEST.MF | 2 +- .../clientlibrary/lib/worker/KinesisClientLibConfiguration.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/META-INF/MANIFEST.MF b/META-INF/MANIFEST.MF index 3c6411d8..35e86907 100644 --- a/META-INF/MANIFEST.MF +++ b/META-INF/MANIFEST.MF @@ -2,7 +2,7 @@ Manifest-Version: 1.0 Bundle-ManifestVersion: 2 Bundle-Name: Amazon Kinesis Client Library for Java Bundle-SymbolicName: com.amazonaws.kinesisclientlibrary;singleton:=true -Bundle-Version: 1.8.7 +Bundle-Version: 1.8.8 Bundle-Vendor: Amazon Technologies, Inc Bundle-RequiredExecutionEnvironment: JavaSE-1.8 Require-Bundle: org.apache.commons.codec;bundle-version="1.6", diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index f24fc574..3fb36754 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -126,7 +126,7 @@ public class KinesisClientLibConfiguration { /** * User agent set when Amazon Kinesis Client Library makes AWS requests. */ - public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.7"; + public static final String KINESIS_CLIENT_LIB_USER_AGENT = "amazon-kinesis-client-library-java-1.8.8"; /** * KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls