diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java index f94e819b..d9fc011e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCache.java @@ -15,6 +15,9 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.time.Duration; +import java.time.Instant; + import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.model.GetRecordsResult; @@ -28,10 +31,15 @@ import lombok.extern.apachecommons.CommonsLog; public class BlockingGetRecordsCache implements GetRecordsCache { private final int maxRecordsPerCall; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; + private final long idleMillisBetweenCalls; + private Instant lastSuccessfulCall; - public BlockingGetRecordsCache(final int maxRecordsPerCall, final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { + public BlockingGetRecordsCache(final int maxRecordsPerCall, + final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, + final long idleMillisBetweenCalls) { this.maxRecordsPerCall = maxRecordsPerCall; this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; + this.idleMillisBetweenCalls = idleMillisBetweenCalls; } @Override @@ -43,12 +51,32 @@ public class BlockingGetRecordsCache implements GetRecordsCache { @Override public ProcessRecordsInput getNextResult() { + sleepBeforeNextCall(); GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); + lastSuccessfulCall = Instant.now(); ProcessRecordsInput processRecordsInput = new ProcessRecordsInput() .withRecords(getRecordsResult.getRecords()) .withMillisBehindLatest(getRecordsResult.getMillisBehindLatest()); return processRecordsInput; } + + private void sleepBeforeNextCall() { + if (!Thread.interrupted()) { + if (lastSuccessfulCall == null) { + return; + } + long timeSinceLastCall = Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis(); + if (timeSinceLastCall < idleMillisBetweenCalls) { + try { + Thread.sleep(idleMillisBetweenCalls - timeSinceLastCall); + } catch (InterruptedException e) { + log.info("Thread was interrupted, indicating that shutdown was called."); + } + } + } else { + log.info("Thread has been interrupted, indicating that it is in the shutdown phase."); + } + } @Override public GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy() { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index eee323d4..9954a3c1 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -1,16 +1,16 @@ /* - * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. * - * Licensed under the Amazon Software License (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at + * Licensed under the Amazon Software License (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at * - * http://aws.amazon.com/asl/ + * http://aws.amazon.com/asl/ * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; @@ -182,11 +182,6 @@ public class KinesisClientLibConfiguration { */ public static final int DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20; - /** - * The amount of time to sleep in between 2 get calls from the data fetcher. - */ - public static final long DEFAULT_IDLE_MILLIS_BETWEEN_CALLS = 1500L; - private String applicationName; private String tableName; private String streamName; @@ -239,9 +234,6 @@ public class KinesisClientLibConfiguration { @Getter private RecordsFetcherFactory recordsFetcherFactory; - - @Getter - private long idleMillisBetweenCalls; /** * Constructor. @@ -302,8 +294,7 @@ public class KinesisClientLibConfiguration { DEFAULT_METRICS_MAX_QUEUE_SIZE, DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, null, - DEFAULT_SHUTDOWN_GRACE_MILLIS, - DEFAULT_IDLE_MILLIS_BETWEEN_CALLS); + DEFAULT_SHUTDOWN_GRACE_MILLIS); } /** @@ -365,8 +356,7 @@ public class KinesisClientLibConfiguration { int metricsMaxQueueSize, boolean validateSequenceNumberBeforeCheckpointing, String regionName, - long shutdownGraceMillis, - long idleMillisBetweenCalls) { + long shutdownGraceMillis) { this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider, dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, maxRecords, idleTimeBetweenReadsInMillis, @@ -374,7 +364,7 @@ public class KinesisClientLibConfiguration { shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize, - validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis, idleMillisBetweenCalls); + validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis); } /** @@ -437,8 +427,7 @@ public class KinesisClientLibConfiguration { int metricsMaxQueueSize, boolean validateSequenceNumberBeforeCheckpointing, String regionName, - long shutdownGraceMillis, - long idleMillisBetweenCalls) { + long shutdownGraceMillis) { // Check following values are greater than zero checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); @@ -449,7 +438,6 @@ public class KinesisClientLibConfiguration { checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis); checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis); - checkIsValuePositive("IdleMillisBetweenCalls", idleMillisBetweenCalls); checkIsRegionNameValid(regionName); this.applicationName = applicationName; this.tableName = applicationName; @@ -487,7 +475,6 @@ public class KinesisClientLibConfiguration { this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(this.maxRecords); - this.idleMillisBetweenCalls = idleMillisBetweenCalls; } /** @@ -598,7 +585,6 @@ public class KinesisClientLibConfiguration { this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; this.recordsFetcherFactory = recordsFetcherFactory; this.shutdownGraceMillis = shutdownGraceMillis; - this.shutdownGraceMillis = shutdownGraceMillis; } // Check if value is positive, otherwise throw an exception @@ -1308,30 +1294,24 @@ public class KinesisClientLibConfiguration { */ public KinesisClientLibConfiguration withMaxCacheSize(final int maxCacheSize) { checkIsValuePositive("maxCacheSize", maxCacheSize); - recordsFetcherFactory.setMaxSize(maxCacheSize); + this.recordsFetcherFactory.setMaxSize(maxCacheSize); return this; } public KinesisClientLibConfiguration withMaxCacheByteSize(final int maxCacheByteSize) { checkIsValuePositive("maxCacheByteSize", maxCacheByteSize); - recordsFetcherFactory.setMaxByteSize(maxCacheByteSize); + this.recordsFetcherFactory.setMaxByteSize(maxCacheByteSize); return this; } public KinesisClientLibConfiguration withDataFetchingStrategy(String dataFetchingStrategy) { - switch (dataFetchingStrategy.toUpperCase()) { - case "PREFETCH_CACHED": - recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED); - break; - default: - recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.DEFAULT); - } + this.recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.valueOf(dataFetchingStrategy.toUpperCase())); return this; } public KinesisClientLibConfiguration withMaxRecordsCount(final int maxRecordsCount) { checkIsValuePositive("maxRecordsCount", maxRecordsCount); - recordsFetcherFactory.setMaxRecordsCount(maxRecordsCount); + this.recordsFetcherFactory.setMaxRecordsCount(maxRecordsCount); return this; } @@ -1358,7 +1338,7 @@ public class KinesisClientLibConfiguration { */ public KinesisClientLibConfiguration withIdleMillisBetweenCalls(long idleMillisBetweenCalls) { checkIsValuePositive("IdleMillisBetweenCalls", idleMillisBetweenCalls); - this.idleMillisBetweenCalls = idleMillisBetweenCalls; + this.recordsFetcherFactory.setIdleMillisBetweenCalls(idleMillisBetweenCalls); return this; } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java index a5efea6a..c2ba9d15 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java @@ -14,7 +14,6 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import java.time.Instant; import java.util.Collections; import java.util.Date; @@ -43,18 +42,15 @@ class KinesisDataFetcher { private final String shardId; private boolean isShardEndReached; private boolean isInitialized; - private Instant lastResponseTime; - private long idleMillisBetweenCalls; /** * * @param kinesisProxy Kinesis proxy * @param shardInfo The shardInfo object. */ - public KinesisDataFetcher(IKinesisProxy kinesisProxy, ShardInfo shardInfo, KinesisClientLibConfiguration configuration) { + public KinesisDataFetcher(IKinesisProxy kinesisProxy, ShardInfo shardInfo) { this.shardId = shardInfo.getShardId(); this.kinesisProxy = new MetricsCollectingKinesisProxyDecorator("KinesisDataFetcher", kinesisProxy, this.shardId); - this.idleMillisBetweenCalls = configuration.getIdleMillisBetweenCalls(); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java index c849f7f2..5c61132a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCache.java @@ -15,6 +15,7 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.time.Duration; import java.time.Instant; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -42,6 +43,8 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { private final int maxRecordsPerCall; private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; private final ExecutorService executorService; + private final long idleMillisBetweenCalls; + private Instant lastSuccessfulCall; private PrefetchCounters prefetchCounters; @@ -50,7 +53,8 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { public PrefetchGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount, final int maxRecordsPerCall, @NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy, - @NonNull final ExecutorService executorService) { + @NonNull final ExecutorService executorService, + long idleMillisBetweenCalls) { this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy; this.maxRecordsPerCall = maxRecordsPerCall; this.maxSize = maxSize; @@ -59,6 +63,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize); this.prefetchCounters = new PrefetchCounters(); this.executorService = executorService; + this.idleMillisBetweenCalls = idleMillisBetweenCalls; } @Override @@ -100,7 +105,6 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { @Override public void shutdown() { - getRecordsRetrievalStrategy.shutdown(); executorService.shutdownNow(); started = false; } @@ -109,25 +113,40 @@ public class PrefetchGetRecordsCache implements GetRecordsCache { @Override public void run() { while (true) { - if (Thread.interrupted()) { + if (Thread.currentThread().isInterrupted()) { log.warn("Prefetch thread was interrupted."); break; } if (prefetchCounters.shouldGetNewRecords()) { try { + sleepBeforeNextCall(); GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall); + lastSuccessfulCall = Instant.now(); ProcessRecordsInput processRecordsInput = new ProcessRecordsInput() .withRecords(getRecordsResult.getRecords()) .withMillisBehindLatest(getRecordsResult.getMillisBehindLatest()) - .withCacheEntryTime(Instant.now()); + .withCacheEntryTime(lastSuccessfulCall); getRecordsResultQueue.put(processRecordsInput); prefetchCounters.added(processRecordsInput); } catch (InterruptedException e) { - log.info("Thread was interrupted, indicating shutdown was called on the cache"); + log.info("Thread was interrupted, indicating shutdown was called on the cache. Calling shutdown on the GetRecordsRetrievalStrategy."); + getRecordsRetrievalStrategy.shutdown(); + } catch (Error e) { + log.error("Error was thrown while getting records, please check for the error", e); } } } } + + private void sleepBeforeNextCall() throws InterruptedException { + if (lastSuccessfulCall == null) { + return; + } + long timeSinceLastCall = Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis(); + if (timeSinceLastCall < idleMillisBetweenCalls) { + Thread.sleep(idleMillisBetweenCalls - timeSinceLastCall); + } + } } private class PrefetchCounters { diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java index cdd80e49..98073e5b 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/RecordsFetcherFactory.java @@ -35,5 +35,7 @@ public interface RecordsFetcherFactory { void setMaxRecordsCount(int maxRecordsCount); void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy); + + void setIdleMillisBetweenCalls(long idleMillisBetweenCalls); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index de49dada..e6912335 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -20,7 +20,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; -import lombok.Getter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -32,6 +31,8 @@ import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; import com.google.common.annotations.VisibleForTesting; +import lombok.Getter; + /** * Responsible for consuming data records of a (specified) shard. * The instance should be shutdown when we lose the primary responsibility for a shard. @@ -177,7 +178,7 @@ class ShardConsumer { metricsFactory, backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, - new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo, config), + new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo), retryGetRecordsInSeconds, maxGetRecordsThreadPool, config diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java index 3a3958f3..2b6e4e83 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/SimpleRecordsFetcherFactory.java @@ -24,6 +24,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { private int maxSize = 3; private int maxByteSize = 8 * 1024 * 1024; private int maxRecordsCount = 30000; + private long idleMillisBetweenCalls = 1500L; private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT; public SimpleRecordsFetcherFactory(int maxRecords) { @@ -33,10 +34,10 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { @Override public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) { if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) { - return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy); + return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls); } else { return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords, - getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1)); + getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1), idleMillisBetweenCalls); } } @@ -59,4 +60,9 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory { public void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy){ this.dataFetchingStrategy = dataFetchingStrategy; } + + @Override + public void setIdleMillisBetweenCalls(final long idleMillisBetweenCalls) { + this.idleMillisBetweenCalls = idleMillisBetweenCalls; + } } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java index 31ea3cd8..30b877e8 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/AsynchronousGetRecordsRetrievalStrategyIntegrationTest.java @@ -14,29 +14,6 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; -import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; -import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.junit.After; -import org.junit.Before; -import org.junit.Ignore; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mock; -import org.mockito.runners.MockitoJUnitRunner; - -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; - - import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsEqual.equalTo; @@ -50,6 +27,29 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import org.junit.After; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + @RunWith(MockitoJUnitRunner.class) public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { @@ -64,8 +64,6 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { @Mock private ShardInfo mockShardInfo; @Mock - private KinesisClientLibConfiguration configuration; - @Mock private Supplier> completionServiceSupplier; @Mock private DataFetcherResult result; @@ -83,7 +81,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { @Before public void setup() { - dataFetcher = spy(new KinesisDataFetcherForTests(mockKinesisProxy, mockShardInfo, configuration)); + dataFetcher = spy(new KinesisDataFetcherForTests(mockKinesisProxy, mockShardInfo)); rejectedExecutionHandler = spy(new ThreadPoolExecutor.AbortPolicy()); executorService = spy(new ThreadPoolExecutor( CORE_POOL_SIZE, @@ -154,9 +152,8 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest { } private class KinesisDataFetcherForTests extends KinesisDataFetcher { - public KinesisDataFetcherForTests(final IKinesisProxy kinesisProxy, final ShardInfo shardInfo, - final KinesisClientLibConfiguration configuration) { - super(kinesisProxy, shardInfo, configuration); + public KinesisDataFetcherForTests(final IKinesisProxy kinesisProxy, final ShardInfo shardInfo) { + super(kinesisProxy, shardInfo); } @Override diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java index 0636baea..731c4653 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/BlockingGetRecordsCacheTest.java @@ -40,6 +40,7 @@ import com.amazonaws.services.kinesis.model.Record; @RunWith(MockitoJUnitRunner.class) public class BlockingGetRecordsCacheTest { private static final int MAX_RECORDS_PER_COUNT = 10_000; + private static final long IDLE_MILLIS_BETWEEN_CALLS = 500L; @Mock private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; @@ -52,7 +53,7 @@ public class BlockingGetRecordsCacheTest { @Before public void setup() { records = new ArrayList<>(); - blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy); + blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy, IDLE_MILLIS_BETWEEN_CALLS); when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_COUNT))).thenReturn(getRecordsResult); when(getRecordsResult.getRecords()).thenReturn(records); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java index 1b67412f..177546db 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfigurationTest.java @@ -19,7 +19,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import junit.framework.Assert; +import java.util.Date; import org.junit.Test; import org.mockito.Mockito; @@ -35,7 +35,7 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorF import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.google.common.collect.ImmutableSet; -import java.util.Date; +import junit.framework.Assert; public class KinesisClientLibConfigurationTest { private static final long INVALID_LONG = 0L; @@ -85,7 +85,6 @@ public class KinesisClientLibConfigurationTest { TEST_VALUE_INT, skipCheckpointValidationValue, null, - TEST_VALUE_LONG, TEST_VALUE_LONG); } @@ -97,7 +96,7 @@ public class KinesisClientLibConfigurationTest { KinesisClientLibConfiguration config = null; long[] longValues = { TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, - TEST_VALUE_LONG, TEST_VALUE_LONG }; + TEST_VALUE_LONG }; for (int i = 0; i < PARAMETER_COUNT; i++) { longValues[i] = INVALID_LONG; try { @@ -126,8 +125,7 @@ public class KinesisClientLibConfigurationTest { TEST_VALUE_INT, skipCheckpointValidationValue, null, - longValues[6], - longValues[7]); + longValues[6]); } catch (IllegalArgumentException e) { System.out.println(e.getMessage()); } @@ -162,7 +160,6 @@ public class KinesisClientLibConfigurationTest { intValues[1], skipCheckpointValidationValue, null, - TEST_VALUE_LONG, TEST_VALUE_LONG); } catch (IllegalArgumentException e) { System.out.println(e.getMessage()); @@ -327,7 +324,6 @@ public class KinesisClientLibConfigurationTest { 1, skipCheckpointValidationValue, "abcd", - TEST_VALUE_LONG, TEST_VALUE_LONG); Assert.fail("No expected Exception is thrown."); } catch(IllegalArgumentException e) { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java index dbac3a54..6648b919 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcherTest.java @@ -35,9 +35,6 @@ import java.util.ArrayList; import java.util.Date; import java.util.List; -import org.hamcrest.CoreMatchers; -import org.hamcrest.Matcher; -import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -66,8 +63,6 @@ public class KinesisDataFetcherTest { @Mock private KinesisProxy kinesisProxy; - @Mock - private KinesisClientLibConfiguration configuration; private static final int MAX_RECORDS = 1; private static final String SHARD_ID = "shardId-1"; @@ -139,9 +134,8 @@ public class KinesisDataFetcherTest { public void testadvanceIteratorTo() throws KinesisClientLibException { IKinesisProxy kinesis = mock(IKinesisProxy.class); ICheckpoint checkpoint = mock(ICheckpoint.class); - when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L); - KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO, configuration); + KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO); GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(fetcher); String iteratorA = "foo"; @@ -173,9 +167,8 @@ public class KinesisDataFetcherTest { @Test public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() { IKinesisProxy kinesis = mock(IKinesisProxy.class); - when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L); - KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO, configuration); + KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO); String iteratorHorizon = "horizon"; when(kinesis.getIterator(SHARD_ID, ShardIteratorType.TRIM_HORIZON.toString())).thenReturn(iteratorHorizon); @@ -204,10 +197,9 @@ public class KinesisDataFetcherTest { KinesisProxy mockProxy = mock(KinesisProxy.class); doReturn(nextIterator).when(mockProxy).getIterator(SHARD_ID, ShardIteratorType.LATEST.toString()); doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords); - when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L); // Create data fectcher and initialize it with latest type checkpoint - KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO, configuration); + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO); dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(dataFetcher); // Call getRecords of dataFetcher which will throw an exception @@ -224,9 +216,8 @@ public class KinesisDataFetcherTest { KinesisProxy mockProxy = mock(KinesisProxy.class); doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords); - when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L); - KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO, configuration); + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO); dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST); DataFetcherResult dataFetcherResult = dataFetcher.getRecords(maxRecords); @@ -252,7 +243,7 @@ public class KinesisDataFetcherTest { when(kinesisProxy.get(eq(NEXT_ITERATOR_TWO), anyInt())).thenReturn(finalResult); when(finalResult.getNextShardIterator()).thenReturn(null); - KinesisDataFetcher dataFetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO, configuration); + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO); dataFetcher.initialize("TRIM_HORIZON", InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)); @@ -331,9 +322,8 @@ public class KinesisDataFetcherTest { ICheckpoint checkpoint = mock(ICheckpoint.class); when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqNo)); - when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L); - KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO, configuration); + KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO); GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(fetcher); fetcher.initialize(seqNo, initialPositionInStream); List actualRecords = getRecordsRetrievalStrategy.getRecords(MAX_RECORDS).getRecords(); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java index 1c661663..98ebcff2 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheIntegrationTest.java @@ -19,7 +19,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -28,13 +27,10 @@ import static org.mockito.Mockito.when; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; -import com.amazonaws.services.kinesis.model.Record; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -43,12 +39,17 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; +import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.Record; + +import lombok.extern.apachecommons.CommonsLog; /** - * + * These are the integration tests for the PrefetchGetRecordsCache class. */ @RunWith(MockitoJUnitRunner.class) +@CommonsLog public class PrefetchGetRecordsCacheIntegrationTest { private static final int MAX_SIZE = 3; private static final int MAX_BYTE_SIZE = 5 * 1024 * 1024; @@ -68,15 +69,10 @@ public class PrefetchGetRecordsCacheIntegrationTest { @Mock private ShardInfo shardInfo; - @Mock - private KinesisClientLibConfiguration configuration; - @Before public void setup() { - when(configuration.getIdleMillisBetweenCalls()).thenReturn(IDLE_MILLIS_BETWEEN_CALLS); - records = new ArrayList<>(); - dataFetcher = new KinesisDataFetcherForTest(proxy, shardInfo, configuration); + dataFetcher = new KinesisDataFetcherForTest(proxy, shardInfo); getRecordsRetrievalStrategy = spy(new SynchronousGetRecordsRetrievalStrategy(dataFetcher)); executorService = spy(Executors.newFixedThreadPool(1)); @@ -85,7 +81,8 @@ public class PrefetchGetRecordsCacheIntegrationTest { MAX_RECORDS_COUNT, MAX_RECORDS_PER_CALL, getRecordsRetrievalStrategy, - executorService); + executorService, + IDLE_MILLIS_BETWEEN_CALLS); } @Test @@ -120,7 +117,7 @@ public class PrefetchGetRecordsCacheIntegrationTest { @Test public void testDifferentShardCaches() { ExecutorService executorService2 = spy(Executors.newFixedThreadPool(1)); - KinesisDataFetcher kinesisDataFetcher = spy(new KinesisDataFetcherForTest(proxy, shardInfo, configuration)); + KinesisDataFetcher kinesisDataFetcher = spy(new KinesisDataFetcherForTest(proxy, shardInfo)); GetRecordsRetrievalStrategy getRecordsRetrievalStrategy2 = spy(new AsynchronousGetRecordsRetrievalStrategy(kinesisDataFetcher, 5 , 5, "Test-shard")); GetRecordsCache getRecordsCache2 = new PrefetchGetRecordsCache( MAX_SIZE, @@ -128,8 +125,8 @@ public class PrefetchGetRecordsCacheIntegrationTest { MAX_RECORDS_COUNT, MAX_RECORDS_PER_CALL, getRecordsRetrievalStrategy2, - executorService2 - ); + executorService2, + IDLE_MILLIS_BETWEEN_CALLS); getRecordsCache.start(); sleep(IDLE_MILLIS_BETWEEN_CALLS); @@ -156,6 +153,7 @@ public class PrefetchGetRecordsCacheIntegrationTest { assertEquals(p2.getRecords().size(), records.size()); getRecordsCache2.shutdown(); + sleep(100L); verify(executorService2).shutdownNow(); verify(getRecordsRetrievalStrategy2).shutdown(); } @@ -163,6 +161,7 @@ public class PrefetchGetRecordsCacheIntegrationTest { @After public void shutdown() { getRecordsCache.shutdown(); + sleep(100L); verify(executorService).shutdownNow(); verify(getRecordsRetrievalStrategy).shutdown(); } @@ -175,16 +174,12 @@ public class PrefetchGetRecordsCacheIntegrationTest { private class KinesisDataFetcherForTest extends KinesisDataFetcher { public KinesisDataFetcherForTest(final IKinesisProxy kinesisProxy, - final ShardInfo shardInfo, - final KinesisClientLibConfiguration configuration) { - super(kinesisProxy, shardInfo, configuration); + final ShardInfo shardInfo) { + super(kinesisProxy, shardInfo); } @Override public DataFetcherResult getRecords(final int maxRecords) { - - - GetRecordsResult getRecordsResult = new GetRecordsResult(); getRecordsResult.setRecords(new ArrayList<>(records)); getRecordsResult.setMillisBehindLatest(1000L); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java index 8517138f..91a27e7d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/PrefetchGetRecordsCacheTest.java @@ -57,6 +57,7 @@ public class PrefetchGetRecordsCacheTest { private static final int MAX_RECORDS_PER_CALL = 10000; private static final int MAX_SIZE = 5; private static final int MAX_RECORDS_COUNT = 15000; + private static final long IDLE_MILLIS_BETWEEN_CALLS = 0L; @Mock private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy; @@ -79,7 +80,8 @@ public class PrefetchGetRecordsCacheTest { MAX_RECORDS_COUNT, MAX_RECORDS_PER_CALL, getRecordsRetrievalStrategy, - executorService); + executorService, + IDLE_MILLIS_BETWEEN_CALLS); spyQueue = spy(getRecordsCache.getRecordsResultQueue); records = spy(new ArrayList<>()); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index 33d613de..d5a68666 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.atLeastOnce; @@ -36,7 +35,6 @@ import static org.mockito.Mockito.when; import java.io.File; import java.math.BigInteger; import java.util.ArrayList; -import java.util.Collections; import java.util.Date; import java.util.List; import java.util.ListIterator; @@ -48,7 +46,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.hamcrest.Description; @@ -122,7 +119,6 @@ public class ShardConsumerTest { recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(maxRecords)); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); - when(config.getIdleMillisBetweenCalls()).thenReturn(0l); } /** @@ -340,9 +336,11 @@ public class ShardConsumerTest { ) ); - KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo, config); + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); - getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); + getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, + new SynchronousGetRecordsRetrievalStrategy(dataFetcher), + 0L)); when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache); ShardConsumer consumer = @@ -468,9 +466,11 @@ public class ShardConsumerTest { ) ); - KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo, config); + KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); - getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); + getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, + new SynchronousGetRecordsRetrievalStrategy(dataFetcher), + 0L)); when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache); ShardConsumer consumer = diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index bf30c510..6190e3c9 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -24,7 +24,16 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.same; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.File; import java.lang.Thread.State; @@ -132,12 +141,11 @@ public class WorkerTest { private static final String CONCURRENCY_TOKEN_FORMAT = "testToken-%d"; private RecordsFetcherFactory recordsFetcherFactory; + private KinesisClientLibConfiguration config; @Mock private KinesisClientLibLeaseCoordinator leaseCoordinator; @Mock - private KinesisClientLibConfiguration config; - @Mock private ILeaseManager leaseManager; @Mock private com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory v1RecordProcessorFactory; @@ -162,9 +170,9 @@ public class WorkerTest { @Before public void setup() { + config = spy(new KinesisClientLibConfiguration("app", null, null, null)); recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(500)); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); - when(config.getIdleMillisBetweenCalls()).thenReturn(500L); } // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES @@ -207,14 +215,13 @@ public class WorkerTest { /** - * Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker#getApplicationName()}. + * Test method for {@link Worker#getApplicationName()}. */ @Test public final void testGetStageName() { final String stageName = "testStageName"; - final KinesisClientLibConfiguration clientConfig = - new KinesisClientLibConfiguration(stageName, null, null, null); - Worker worker = new Worker(v1RecordProcessorFactory, clientConfig); + config = new KinesisClientLibConfiguration(stageName, null, null, null); + Worker worker = new Worker(v1RecordProcessorFactory, config); Assert.assertEquals(stageName, worker.getApplicationName()); } @@ -222,8 +229,7 @@ public class WorkerTest { public final void testCreateOrGetShardConsumer() { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; - final KinesisClientLibConfiguration clientConfig = - new KinesisClientLibConfiguration(stageName, null, null, null); + config = new KinesisClientLibConfiguration(stageName, null, null, null); IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; @@ -243,7 +249,7 @@ public class WorkerTest { Worker worker = new Worker(stageName, streamletFactory, - clientConfig, + config, streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, @@ -273,8 +279,6 @@ public class WorkerTest { public void testWorkerLoopWithCheckpoint() { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; - final KinesisClientLibConfiguration clientConfig = - new KinesisClientLibConfiguration(stageName, null, null, null); IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; @@ -344,8 +348,7 @@ public class WorkerTest { public final void testCleanupShardConsumers() { final String stageName = "testStageName"; IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2; - final KinesisClientLibConfiguration clientConfig = - new KinesisClientLibConfiguration(stageName, null, null, null); + config = new KinesisClientLibConfiguration(stageName, null, null, null); IKinesisProxy proxy = null; ICheckpoint checkpoint = null; int maxRecords = 1; @@ -365,7 +368,7 @@ public class WorkerTest { Worker worker = new Worker(stageName, streamletFactory, - clientConfig, + config, streamConfig, INITIAL_POSITION_LATEST, parentShardPollIntervalMillis, shardSyncIntervalMillis, @@ -405,8 +408,7 @@ public class WorkerTest { public final void testInitializationFailureWithRetries() { String stageName = "testInitializationWorker"; IRecordProcessorFactory recordProcessorFactory = new TestStreamletFactory(null, null); - final KinesisClientLibConfiguration clientConfig = - new KinesisClientLibConfiguration(stageName, null, null, null); + config = new KinesisClientLibConfiguration(stageName, null, null, null); int count = 0; when(proxy.getShardList()).thenThrow(new RuntimeException(Integer.toString(count++))); int maxRecords = 2; @@ -422,7 +424,7 @@ public class WorkerTest { Worker worker = new Worker(stageName, recordProcessorFactory, - clientConfig, + config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, shardPollInterval, shardSyncIntervalMillis, @@ -474,7 +476,7 @@ public class WorkerTest { /** * Runs worker with threadPoolSize < numShards - * Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker#run()}. + * Test method for {@link Worker#run()}. */ @Test public final void testOneSplitShard2Threads() throws Exception { @@ -485,12 +487,12 @@ public class WorkerTest { KinesisClientLease lease = ShardSyncer.newKCLLease(shardList.get(0)); lease.setCheckpoint(new ExtendedSequenceNumber("2")); initialLeases.add(lease); - runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard); + runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard, config); } /** * Runs worker with threadPoolSize < numShards - * Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker#run()}. + * Test method for {@link Worker#run()}. */ @Test public final void testOneSplitShard2ThreadsWithCallsForEmptyRecords() throws Exception { @@ -502,7 +504,10 @@ public class WorkerTest { lease.setCheckpoint(new ExtendedSequenceNumber("2")); initialLeases.add(lease); boolean callProcessRecordsForEmptyRecordList = true; - runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard); + RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory(500); + recordsFetcherFactory.setIdleMillisBetweenCalls(0L); + when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); + runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard, config); } @Test @@ -527,7 +532,8 @@ public class WorkerTest { 10, kinesisProxy, v2RecordProcessorFactory, executorService, - cwMetricsFactory); + cwMetricsFactory, + config); // Give some time for thread to run. workerStarted.await(); @@ -563,7 +569,8 @@ public class WorkerTest { 10, kinesisProxy, v2RecordProcessorFactory, executorService, - cwMetricsFactory); + cwMetricsFactory, + config); // Give some time for thread to run. workerStarted.await(); @@ -609,6 +616,12 @@ public class WorkerTest { return null; } }).when(v2RecordProcessor).processRecords(any(ProcessRecordsInput.class)); + + RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class); + GetRecordsCache getRecordsCache = mock(GetRecordsCache.class); + when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); + when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache); + when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest(0L)); WorkerThread workerThread = runWorker(shardList, initialLeases, @@ -618,7 +631,8 @@ public class WorkerTest { fileBasedProxy, v2RecordProcessorFactory, executorService, - nullMetricsFactory); + nullMetricsFactory, + config); // Only sleep for time that is required. processRecordsLatch.await(); @@ -709,7 +723,8 @@ public class WorkerTest { fileBasedProxy, v2RecordProcessorFactory, executorService, - nullMetricsFactory); + nullMetricsFactory, + config); // Only sleep for time that is required. processRecordsLatch.await(); @@ -746,8 +761,6 @@ public class WorkerTest { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - final KinesisClientLibConfiguration clientConfig = - new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -836,8 +849,6 @@ public class WorkerTest { public void testShutdownCallableNotAllowedTwice() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - KinesisClientLibConfiguration clientConfig = - new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -903,8 +914,6 @@ public class WorkerTest { public void testGracefulShutdownSingleFuture() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - KinesisClientLibConfiguration clientConfig = - new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -993,8 +1002,6 @@ public class WorkerTest { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - final KinesisClientLibConfiguration clientConfig = - new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1021,7 +1028,7 @@ public class WorkerTest { Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, - clientConfig, + config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, @@ -1069,8 +1076,6 @@ public class WorkerTest { public void testRequestShutdownWithLostLease() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - final KinesisClientLibConfiguration clientConfig = - new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1105,7 +1110,7 @@ public class WorkerTest { Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, - clientConfig, + config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, @@ -1184,8 +1189,6 @@ public class WorkerTest { public void testRequestShutdownWithAllLeasesLost() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - final KinesisClientLibConfiguration clientConfig = - new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1220,7 +1223,7 @@ public class WorkerTest { Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, - clientConfig, + config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, @@ -1304,8 +1307,6 @@ public class WorkerTest { public void testLeaseCancelledAfterShutdownRequest() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - final KinesisClientLibConfiguration clientConfig = - new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1339,7 +1340,7 @@ public class WorkerTest { Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, - clientConfig, + config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, @@ -1390,8 +1391,6 @@ public class WorkerTest { public void testEndOfShardAfterShutdownRequest() throws Exception { IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); - final KinesisClientLibConfiguration clientConfig = - new KinesisClientLibConfiguration("app", null, null, null); StreamConfig streamConfig = mock(StreamConfig.class); IMetricsFactory metricsFactory = mock(IMetricsFactory.class); @@ -1425,7 +1424,7 @@ public class WorkerTest { Worker worker = new Worker("testRequestShutdown", recordProcessorFactory, - clientConfig, + config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, parentShardPollIntervalMillis, @@ -1728,14 +1727,15 @@ public class WorkerTest { lease.setCheckpoint(ExtendedSequenceNumber.AT_TIMESTAMP); initialLeases.add(lease); } - runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard); + runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard, config); } private void runAndTestWorker(List shardList, - int threadPoolSize, - List initialLeases, - boolean callProcessRecordsForEmptyRecordList, - int numberOfRecordsPerShard) throws Exception { + int threadPoolSize, + List initialLeases, + boolean callProcessRecordsForEmptyRecordList, + int numberOfRecordsPerShard, + KinesisClientLibConfiguration clientConfig) throws Exception { File file = KinesisLocalFileDataCreator.generateTempDataFile(shardList, numberOfRecordsPerShard, "unitTestWT001"); IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath()); @@ -1744,10 +1744,10 @@ public class WorkerTest { TestStreamletFactory recordProcessorFactory = new TestStreamletFactory(recordCounter, shardSequenceVerifier); ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize); - + WorkerThread workerThread = runWorker( shardList, initialLeases, callProcessRecordsForEmptyRecordList, failoverTimeMillis, - numberOfRecordsPerShard, fileBasedProxy, recordProcessorFactory, executorService, nullMetricsFactory); + numberOfRecordsPerShard, fileBasedProxy, recordProcessorFactory, executorService, nullMetricsFactory, clientConfig); // TestStreamlet will release the semaphore once for every record it processes recordCounter.acquire(numberOfRecordsPerShard * shardList.size()); @@ -1764,14 +1764,15 @@ public class WorkerTest { } private WorkerThread runWorker(List shardList, - List initialLeases, - boolean callProcessRecordsForEmptyRecordList, - long failoverTimeMillis, - int numberOfRecordsPerShard, - IKinesisProxy kinesisProxy, - IRecordProcessorFactory recordProcessorFactory, - ExecutorService executorService, - IMetricsFactory metricsFactory) throws Exception { + List initialLeases, + boolean callProcessRecordsForEmptyRecordList, + long failoverTimeMillis, + int numberOfRecordsPerShard, + IKinesisProxy kinesisProxy, + IRecordProcessorFactory recordProcessorFactory, + ExecutorService executorService, + IMetricsFactory metricsFactory, + KinesisClientLibConfiguration clientConfig) throws Exception { final String stageName = "testStageName"; final int maxRecords = 2; @@ -1799,9 +1800,6 @@ public class WorkerTest { idleTimeInMilliseconds, callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp)); - KinesisClientLibConfiguration clientConfig = spy(new KinesisClientLibConfiguration("app", null, null, null)); - - when(clientConfig.getIdleMillisBetweenCalls()).thenReturn(0L); Worker worker = new Worker(stageName,