Moving the idleMillisBetweenCalls to the cache. Fixed unit and integ tests. Calling StrategyShutdown from within the cache thread.

This commit is contained in:
Sahil Palvia 2017-09-27 15:50:23 -07:00
parent 060913d49c
commit 7b76d1d56e
15 changed files with 214 additions and 203 deletions

View file

@ -15,6 +15,9 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.time.Duration;
import java.time.Instant;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
@ -28,10 +31,15 @@ import lombok.extern.apachecommons.CommonsLog;
public class BlockingGetRecordsCache implements GetRecordsCache {
private final int maxRecordsPerCall;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private final long idleMillisBetweenCalls;
private Instant lastSuccessfulCall;
public BlockingGetRecordsCache(final int maxRecordsPerCall, final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
public BlockingGetRecordsCache(final int maxRecordsPerCall,
final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
final long idleMillisBetweenCalls) {
this.maxRecordsPerCall = maxRecordsPerCall;
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
}
@Override
@ -43,13 +51,33 @@ public class BlockingGetRecordsCache implements GetRecordsCache {
@Override
public ProcessRecordsInput getNextResult() {
sleepBeforeNextCall();
GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
lastSuccessfulCall = Instant.now();
ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
.withRecords(getRecordsResult.getRecords())
.withMillisBehindLatest(getRecordsResult.getMillisBehindLatest());
return processRecordsInput;
}
private void sleepBeforeNextCall() {
if (!Thread.interrupted()) {
if (lastSuccessfulCall == null) {
return;
}
long timeSinceLastCall = Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis();
if (timeSinceLastCall < idleMillisBetweenCalls) {
try {
Thread.sleep(idleMillisBetweenCalls - timeSinceLastCall);
} catch (InterruptedException e) {
log.info("Thread was interrupted, indicating that shutdown was called.");
}
}
} else {
log.info("Thread has been interrupted, indicating that it is in the shutdown phase.");
}
}
@Override
public GetRecordsRetrievalStrategy getGetRecordsRetrievalStrategy() {
return getRecordsRetrievalStrategy;

View file

@ -1,5 +1,5 @@
/*
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Amazon Software License (the "License").
* You may not use this file except in compliance with the License.
@ -182,11 +182,6 @@ public class KinesisClientLibConfiguration {
*/
public static final int DEFAULT_MAX_LEASE_RENEWAL_THREADS = 20;
/**
* The amount of time to sleep in between 2 get calls from the data fetcher.
*/
public static final long DEFAULT_IDLE_MILLIS_BETWEEN_CALLS = 1500L;
private String applicationName;
private String tableName;
private String streamName;
@ -240,9 +235,6 @@ public class KinesisClientLibConfiguration {
@Getter
private RecordsFetcherFactory recordsFetcherFactory;
@Getter
private long idleMillisBetweenCalls;
/**
* Constructor.
*
@ -302,8 +294,7 @@ public class KinesisClientLibConfiguration {
DEFAULT_METRICS_MAX_QUEUE_SIZE,
DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING,
null,
DEFAULT_SHUTDOWN_GRACE_MILLIS,
DEFAULT_IDLE_MILLIS_BETWEEN_CALLS);
DEFAULT_SHUTDOWN_GRACE_MILLIS);
}
/**
@ -365,8 +356,7 @@ public class KinesisClientLibConfiguration {
int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing,
String regionName,
long shutdownGraceMillis,
long idleMillisBetweenCalls) {
long shutdownGraceMillis) {
this(applicationName, streamName, kinesisEndpoint, null, initialPositionInStream, kinesisCredentialsProvider,
dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId,
maxRecords, idleTimeBetweenReadsInMillis,
@ -374,7 +364,7 @@ public class KinesisClientLibConfiguration {
shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry,
kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig,
taskBackoffTimeMillis, metricsBufferTimeMillis, metricsMaxQueueSize,
validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis, idleMillisBetweenCalls);
validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis);
}
/**
@ -437,8 +427,7 @@ public class KinesisClientLibConfiguration {
int metricsMaxQueueSize,
boolean validateSequenceNumberBeforeCheckpointing,
String regionName,
long shutdownGraceMillis,
long idleMillisBetweenCalls) {
long shutdownGraceMillis) {
// Check following values are greater than zero
checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis);
checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis);
@ -449,7 +438,6 @@ public class KinesisClientLibConfiguration {
checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis);
checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize);
checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis);
checkIsValuePositive("IdleMillisBetweenCalls", idleMillisBetweenCalls);
checkIsRegionNameValid(regionName);
this.applicationName = applicationName;
this.tableName = applicationName;
@ -487,7 +475,6 @@ public class KinesisClientLibConfiguration {
this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST;
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(this.maxRecords);
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
}
/**
@ -598,7 +585,6 @@ public class KinesisClientLibConfiguration {
this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION;
this.recordsFetcherFactory = recordsFetcherFactory;
this.shutdownGraceMillis = shutdownGraceMillis;
this.shutdownGraceMillis = shutdownGraceMillis;
}
// Check if value is positive, otherwise throw an exception
@ -1308,30 +1294,24 @@ public class KinesisClientLibConfiguration {
*/
public KinesisClientLibConfiguration withMaxCacheSize(final int maxCacheSize) {
checkIsValuePositive("maxCacheSize", maxCacheSize);
recordsFetcherFactory.setMaxSize(maxCacheSize);
this.recordsFetcherFactory.setMaxSize(maxCacheSize);
return this;
}
public KinesisClientLibConfiguration withMaxCacheByteSize(final int maxCacheByteSize) {
checkIsValuePositive("maxCacheByteSize", maxCacheByteSize);
recordsFetcherFactory.setMaxByteSize(maxCacheByteSize);
this.recordsFetcherFactory.setMaxByteSize(maxCacheByteSize);
return this;
}
public KinesisClientLibConfiguration withDataFetchingStrategy(String dataFetchingStrategy) {
switch (dataFetchingStrategy.toUpperCase()) {
case "PREFETCH_CACHED":
recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.PREFETCH_CACHED);
break;
default:
recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.DEFAULT);
}
this.recordsFetcherFactory.setDataFetchingStrategy(DataFetchingStrategy.valueOf(dataFetchingStrategy.toUpperCase()));
return this;
}
public KinesisClientLibConfiguration withMaxRecordsCount(final int maxRecordsCount) {
checkIsValuePositive("maxRecordsCount", maxRecordsCount);
recordsFetcherFactory.setMaxRecordsCount(maxRecordsCount);
this.recordsFetcherFactory.setMaxRecordsCount(maxRecordsCount);
return this;
}
@ -1358,7 +1338,7 @@ public class KinesisClientLibConfiguration {
*/
public KinesisClientLibConfiguration withIdleMillisBetweenCalls(long idleMillisBetweenCalls) {
checkIsValuePositive("IdleMillisBetweenCalls", idleMillisBetweenCalls);
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
this.recordsFetcherFactory.setIdleMillisBetweenCalls(idleMillisBetweenCalls);
return this;
}
}

View file

@ -14,7 +14,6 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.time.Instant;
import java.util.Collections;
import java.util.Date;
@ -43,18 +42,15 @@ class KinesisDataFetcher {
private final String shardId;
private boolean isShardEndReached;
private boolean isInitialized;
private Instant lastResponseTime;
private long idleMillisBetweenCalls;
/**
*
* @param kinesisProxy Kinesis proxy
* @param shardInfo The shardInfo object.
*/
public KinesisDataFetcher(IKinesisProxy kinesisProxy, ShardInfo shardInfo, KinesisClientLibConfiguration configuration) {
public KinesisDataFetcher(IKinesisProxy kinesisProxy, ShardInfo shardInfo) {
this.shardId = shardInfo.getShardId();
this.kinesisProxy = new MetricsCollectingKinesisProxyDecorator("KinesisDataFetcher", kinesisProxy, this.shardId);
this.idleMillisBetweenCalls = configuration.getIdleMillisBetweenCalls();
}
/**

View file

@ -15,6 +15,7 @@
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@ -42,6 +43,8 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
private final int maxRecordsPerCall;
private final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
private final ExecutorService executorService;
private final long idleMillisBetweenCalls;
private Instant lastSuccessfulCall;
private PrefetchCounters prefetchCounters;
@ -50,7 +53,8 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
public PrefetchGetRecordsCache(final int maxSize, final int maxByteSize, final int maxRecordsCount,
final int maxRecordsPerCall,
@NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
@NonNull final ExecutorService executorService) {
@NonNull final ExecutorService executorService,
long idleMillisBetweenCalls) {
this.getRecordsRetrievalStrategy = getRecordsRetrievalStrategy;
this.maxRecordsPerCall = maxRecordsPerCall;
this.maxSize = maxSize;
@ -59,6 +63,7 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
this.getRecordsResultQueue = new LinkedBlockingQueue<>(this.maxSize);
this.prefetchCounters = new PrefetchCounters();
this.executorService = executorService;
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
}
@Override
@ -100,7 +105,6 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
@Override
public void shutdown() {
getRecordsRetrievalStrategy.shutdown();
executorService.shutdownNow();
started = false;
}
@ -109,25 +113,40 @@ public class PrefetchGetRecordsCache implements GetRecordsCache {
@Override
public void run() {
while (true) {
if (Thread.interrupted()) {
if (Thread.currentThread().isInterrupted()) {
log.warn("Prefetch thread was interrupted.");
break;
}
if (prefetchCounters.shouldGetNewRecords()) {
try {
sleepBeforeNextCall();
GetRecordsResult getRecordsResult = getRecordsRetrievalStrategy.getRecords(maxRecordsPerCall);
lastSuccessfulCall = Instant.now();
ProcessRecordsInput processRecordsInput = new ProcessRecordsInput()
.withRecords(getRecordsResult.getRecords())
.withMillisBehindLatest(getRecordsResult.getMillisBehindLatest())
.withCacheEntryTime(Instant.now());
.withCacheEntryTime(lastSuccessfulCall);
getRecordsResultQueue.put(processRecordsInput);
prefetchCounters.added(processRecordsInput);
} catch (InterruptedException e) {
log.info("Thread was interrupted, indicating shutdown was called on the cache");
log.info("Thread was interrupted, indicating shutdown was called on the cache. Calling shutdown on the GetRecordsRetrievalStrategy.");
getRecordsRetrievalStrategy.shutdown();
} catch (Error e) {
log.error("Error was thrown while getting records, please check for the error", e);
}
}
}
}
private void sleepBeforeNextCall() throws InterruptedException {
if (lastSuccessfulCall == null) {
return;
}
long timeSinceLastCall = Duration.between(lastSuccessfulCall, Instant.now()).abs().toMillis();
if (timeSinceLastCall < idleMillisBetweenCalls) {
Thread.sleep(idleMillisBetweenCalls - timeSinceLastCall);
}
}
}
private class PrefetchCounters {

View file

@ -36,4 +36,6 @@ public interface RecordsFetcherFactory {
void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy);
void setIdleMillisBetweenCalls(long idleMillisBetweenCalls);
}

View file

@ -20,7 +20,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import lombok.Getter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -32,6 +31,8 @@ import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.google.common.annotations.VisibleForTesting;
import lombok.Getter;
/**
* Responsible for consuming data records of a (specified) shard.
* The instance should be shutdown when we lose the primary responsibility for a shard.
@ -177,7 +178,7 @@ class ShardConsumer {
metricsFactory,
backoffTimeMillis,
skipShardSyncAtWorkerInitializationIfLeasesExist,
new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo, config),
new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo),
retryGetRecordsInSeconds,
maxGetRecordsThreadPool,
config

View file

@ -24,6 +24,7 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
private int maxSize = 3;
private int maxByteSize = 8 * 1024 * 1024;
private int maxRecordsCount = 30000;
private long idleMillisBetweenCalls = 1500L;
private DataFetchingStrategy dataFetchingStrategy = DataFetchingStrategy.DEFAULT;
public SimpleRecordsFetcherFactory(int maxRecords) {
@ -33,10 +34,10 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
@Override
public GetRecordsCache createRecordsFetcher(GetRecordsRetrievalStrategy getRecordsRetrievalStrategy) {
if(dataFetchingStrategy.equals(DataFetchingStrategy.DEFAULT)) {
return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy);
return new BlockingGetRecordsCache(maxRecords, getRecordsRetrievalStrategy, idleMillisBetweenCalls);
} else {
return new PrefetchGetRecordsCache(maxSize, maxByteSize, maxRecordsCount, maxRecords,
getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1));
getRecordsRetrievalStrategy, Executors.newFixedThreadPool(1), idleMillisBetweenCalls);
}
}
@ -59,4 +60,9 @@ public class SimpleRecordsFetcherFactory implements RecordsFetcherFactory {
public void setDataFetchingStrategy(DataFetchingStrategy dataFetchingStrategy){
this.dataFetchingStrategy = dataFetchingStrategy;
}
@Override
public void setIdleMillisBetweenCalls(final long idleMillisBetweenCalls) {
this.idleMillisBetweenCalls = idleMillisBetweenCalls;
}
}

View file

@ -14,29 +14,6 @@
*/
package com.amazonaws.services.kinesis.clientlibrary.lib.worker;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsEqual.equalTo;
@ -50,6 +27,29 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@RunWith(MockitoJUnitRunner.class)
public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
@ -64,8 +64,6 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
@Mock
private ShardInfo mockShardInfo;
@Mock
private KinesisClientLibConfiguration configuration;
@Mock
private Supplier<CompletionService<DataFetcherResult>> completionServiceSupplier;
@Mock
private DataFetcherResult result;
@ -83,7 +81,7 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
@Before
public void setup() {
dataFetcher = spy(new KinesisDataFetcherForTests(mockKinesisProxy, mockShardInfo, configuration));
dataFetcher = spy(new KinesisDataFetcherForTests(mockKinesisProxy, mockShardInfo));
rejectedExecutionHandler = spy(new ThreadPoolExecutor.AbortPolicy());
executorService = spy(new ThreadPoolExecutor(
CORE_POOL_SIZE,
@ -154,9 +152,8 @@ public class AsynchronousGetRecordsRetrievalStrategyIntegrationTest {
}
private class KinesisDataFetcherForTests extends KinesisDataFetcher {
public KinesisDataFetcherForTests(final IKinesisProxy kinesisProxy, final ShardInfo shardInfo,
final KinesisClientLibConfiguration configuration) {
super(kinesisProxy, shardInfo, configuration);
public KinesisDataFetcherForTests(final IKinesisProxy kinesisProxy, final ShardInfo shardInfo) {
super(kinesisProxy, shardInfo);
}
@Override

View file

@ -40,6 +40,7 @@ import com.amazonaws.services.kinesis.model.Record;
@RunWith(MockitoJUnitRunner.class)
public class BlockingGetRecordsCacheTest {
private static final int MAX_RECORDS_PER_COUNT = 10_000;
private static final long IDLE_MILLIS_BETWEEN_CALLS = 500L;
@Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
@ -52,7 +53,7 @@ public class BlockingGetRecordsCacheTest {
@Before
public void setup() {
records = new ArrayList<>();
blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy);
blockingGetRecordsCache = new BlockingGetRecordsCache(MAX_RECORDS_PER_COUNT, getRecordsRetrievalStrategy, IDLE_MILLIS_BETWEEN_CALLS);
when(getRecordsRetrievalStrategy.getRecords(eq(MAX_RECORDS_PER_COUNT))).thenReturn(getRecordsResult);
when(getRecordsResult.getRecords()).thenReturn(records);

View file

@ -19,7 +19,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import junit.framework.Assert;
import java.util.Date;
import org.junit.Test;
import org.mockito.Mockito;
@ -35,7 +35,7 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorF
import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel;
import com.google.common.collect.ImmutableSet;
import java.util.Date;
import junit.framework.Assert;
public class KinesisClientLibConfigurationTest {
private static final long INVALID_LONG = 0L;
@ -85,7 +85,6 @@ public class KinesisClientLibConfigurationTest {
TEST_VALUE_INT,
skipCheckpointValidationValue,
null,
TEST_VALUE_LONG,
TEST_VALUE_LONG);
}
@ -97,7 +96,7 @@ public class KinesisClientLibConfigurationTest {
KinesisClientLibConfiguration config = null;
long[] longValues =
{ TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG, TEST_VALUE_LONG,
TEST_VALUE_LONG, TEST_VALUE_LONG };
TEST_VALUE_LONG };
for (int i = 0; i < PARAMETER_COUNT; i++) {
longValues[i] = INVALID_LONG;
try {
@ -126,8 +125,7 @@ public class KinesisClientLibConfigurationTest {
TEST_VALUE_INT,
skipCheckpointValidationValue,
null,
longValues[6],
longValues[7]);
longValues[6]);
} catch (IllegalArgumentException e) {
System.out.println(e.getMessage());
}
@ -162,7 +160,6 @@ public class KinesisClientLibConfigurationTest {
intValues[1],
skipCheckpointValidationValue,
null,
TEST_VALUE_LONG,
TEST_VALUE_LONG);
} catch (IllegalArgumentException e) {
System.out.println(e.getMessage());
@ -327,7 +324,6 @@ public class KinesisClientLibConfigurationTest {
1,
skipCheckpointValidationValue,
"abcd",
TEST_VALUE_LONG,
TEST_VALUE_LONG);
Assert.fail("No expected Exception is thrown.");
} catch(IllegalArgumentException e) {

View file

@ -35,9 +35,6 @@ import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@ -66,8 +63,6 @@ public class KinesisDataFetcherTest {
@Mock
private KinesisProxy kinesisProxy;
@Mock
private KinesisClientLibConfiguration configuration;
private static final int MAX_RECORDS = 1;
private static final String SHARD_ID = "shardId-1";
@ -139,9 +134,8 @@ public class KinesisDataFetcherTest {
public void testadvanceIteratorTo() throws KinesisClientLibException {
IKinesisProxy kinesis = mock(IKinesisProxy.class);
ICheckpoint checkpoint = mock(ICheckpoint.class);
when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L);
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO, configuration);
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO);
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(fetcher);
String iteratorA = "foo";
@ -173,9 +167,8 @@ public class KinesisDataFetcherTest {
@Test
public void testadvanceIteratorToTrimHorizonLatestAndAtTimestamp() {
IKinesisProxy kinesis = mock(IKinesisProxy.class);
when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L);
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO, configuration);
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO);
String iteratorHorizon = "horizon";
when(kinesis.getIterator(SHARD_ID, ShardIteratorType.TRIM_HORIZON.toString())).thenReturn(iteratorHorizon);
@ -204,10 +197,9 @@ public class KinesisDataFetcherTest {
KinesisProxy mockProxy = mock(KinesisProxy.class);
doReturn(nextIterator).when(mockProxy).getIterator(SHARD_ID, ShardIteratorType.LATEST.toString());
doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords);
when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L);
// Create data fectcher and initialize it with latest type checkpoint
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO, configuration);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO);
dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST);
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(dataFetcher);
// Call getRecords of dataFetcher which will throw an exception
@ -224,9 +216,8 @@ public class KinesisDataFetcherTest {
KinesisProxy mockProxy = mock(KinesisProxy.class);
doThrow(new ResourceNotFoundException("Test Exception")).when(mockProxy).get(nextIterator, maxRecords);
when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO, configuration);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(mockProxy, SHARD_INFO);
dataFetcher.initialize(SentinelCheckpoint.LATEST.toString(), INITIAL_POSITION_LATEST);
DataFetcherResult dataFetcherResult = dataFetcher.getRecords(maxRecords);
@ -252,7 +243,7 @@ public class KinesisDataFetcherTest {
when(kinesisProxy.get(eq(NEXT_ITERATOR_TWO), anyInt())).thenReturn(finalResult);
when(finalResult.getNextShardIterator()).thenReturn(null);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO, configuration);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(kinesisProxy, SHARD_INFO);
dataFetcher.initialize("TRIM_HORIZON",
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));
@ -331,9 +322,8 @@ public class KinesisDataFetcherTest {
ICheckpoint checkpoint = mock(ICheckpoint.class);
when(checkpoint.getCheckpoint(SHARD_ID)).thenReturn(new ExtendedSequenceNumber(seqNo));
when(configuration.getIdleMillisBetweenCalls()).thenReturn(500L);
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO, configuration);
KinesisDataFetcher fetcher = new KinesisDataFetcher(kinesis, SHARD_INFO);
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy = new SynchronousGetRecordsRetrievalStrategy(fetcher);
fetcher.initialize(seqNo, initialPositionInStream);
List<Record> actualRecords = getRecordsRetrievalStrategy.getRecords(MAX_RECORDS).getRecords();

View file

@ -19,7 +19,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@ -28,13 +27,10 @@ import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.model.Record;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -43,12 +39,17 @@ import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.model.GetRecordsResult;
import com.amazonaws.services.kinesis.model.Record;
import lombok.extern.apachecommons.CommonsLog;
/**
*
* These are the integration tests for the PrefetchGetRecordsCache class.
*/
@RunWith(MockitoJUnitRunner.class)
@CommonsLog
public class PrefetchGetRecordsCacheIntegrationTest {
private static final int MAX_SIZE = 3;
private static final int MAX_BYTE_SIZE = 5 * 1024 * 1024;
@ -68,15 +69,10 @@ public class PrefetchGetRecordsCacheIntegrationTest {
@Mock
private ShardInfo shardInfo;
@Mock
private KinesisClientLibConfiguration configuration;
@Before
public void setup() {
when(configuration.getIdleMillisBetweenCalls()).thenReturn(IDLE_MILLIS_BETWEEN_CALLS);
records = new ArrayList<>();
dataFetcher = new KinesisDataFetcherForTest(proxy, shardInfo, configuration);
dataFetcher = new KinesisDataFetcherForTest(proxy, shardInfo);
getRecordsRetrievalStrategy = spy(new SynchronousGetRecordsRetrievalStrategy(dataFetcher));
executorService = spy(Executors.newFixedThreadPool(1));
@ -85,7 +81,8 @@ public class PrefetchGetRecordsCacheIntegrationTest {
MAX_RECORDS_COUNT,
MAX_RECORDS_PER_CALL,
getRecordsRetrievalStrategy,
executorService);
executorService,
IDLE_MILLIS_BETWEEN_CALLS);
}
@Test
@ -120,7 +117,7 @@ public class PrefetchGetRecordsCacheIntegrationTest {
@Test
public void testDifferentShardCaches() {
ExecutorService executorService2 = spy(Executors.newFixedThreadPool(1));
KinesisDataFetcher kinesisDataFetcher = spy(new KinesisDataFetcherForTest(proxy, shardInfo, configuration));
KinesisDataFetcher kinesisDataFetcher = spy(new KinesisDataFetcherForTest(proxy, shardInfo));
GetRecordsRetrievalStrategy getRecordsRetrievalStrategy2 = spy(new AsynchronousGetRecordsRetrievalStrategy(kinesisDataFetcher, 5 , 5, "Test-shard"));
GetRecordsCache getRecordsCache2 = new PrefetchGetRecordsCache(
MAX_SIZE,
@ -128,8 +125,8 @@ public class PrefetchGetRecordsCacheIntegrationTest {
MAX_RECORDS_COUNT,
MAX_RECORDS_PER_CALL,
getRecordsRetrievalStrategy2,
executorService2
);
executorService2,
IDLE_MILLIS_BETWEEN_CALLS);
getRecordsCache.start();
sleep(IDLE_MILLIS_BETWEEN_CALLS);
@ -156,6 +153,7 @@ public class PrefetchGetRecordsCacheIntegrationTest {
assertEquals(p2.getRecords().size(), records.size());
getRecordsCache2.shutdown();
sleep(100L);
verify(executorService2).shutdownNow();
verify(getRecordsRetrievalStrategy2).shutdown();
}
@ -163,6 +161,7 @@ public class PrefetchGetRecordsCacheIntegrationTest {
@After
public void shutdown() {
getRecordsCache.shutdown();
sleep(100L);
verify(executorService).shutdownNow();
verify(getRecordsRetrievalStrategy).shutdown();
}
@ -175,16 +174,12 @@ public class PrefetchGetRecordsCacheIntegrationTest {
private class KinesisDataFetcherForTest extends KinesisDataFetcher {
public KinesisDataFetcherForTest(final IKinesisProxy kinesisProxy,
final ShardInfo shardInfo,
final KinesisClientLibConfiguration configuration) {
super(kinesisProxy, shardInfo, configuration);
final ShardInfo shardInfo) {
super(kinesisProxy, shardInfo);
}
@Override
public DataFetcherResult getRecords(final int maxRecords) {
GetRecordsResult getRecordsResult = new GetRecordsResult();
getRecordsResult.setRecords(new ArrayList<>(records));
getRecordsResult.setMillisBehindLatest(1000L);

View file

@ -57,6 +57,7 @@ public class PrefetchGetRecordsCacheTest {
private static final int MAX_RECORDS_PER_CALL = 10000;
private static final int MAX_SIZE = 5;
private static final int MAX_RECORDS_COUNT = 15000;
private static final long IDLE_MILLIS_BETWEEN_CALLS = 0L;
@Mock
private GetRecordsRetrievalStrategy getRecordsRetrievalStrategy;
@ -79,7 +80,8 @@ public class PrefetchGetRecordsCacheTest {
MAX_RECORDS_COUNT,
MAX_RECORDS_PER_CALL,
getRecordsRetrievalStrategy,
executorService);
executorService,
IDLE_MILLIS_BETWEEN_CALLS);
spyQueue = spy(getRecordsCache.getRecordsResultQueue);
records = spy(new ArrayList<>());

View file

@ -22,7 +22,6 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.atLeastOnce;
@ -36,7 +35,6 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.ListIterator;
@ -48,7 +46,6 @@ import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.hamcrest.Description;
@ -122,7 +119,6 @@ public class ShardConsumerTest {
recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(maxRecords));
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
when(config.getIdleMillisBetweenCalls()).thenReturn(0l);
}
/**
@ -340,9 +336,11 @@ public class ShardConsumerTest {
)
);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo, config);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
new SynchronousGetRecordsRetrievalStrategy(dataFetcher),
0L));
when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache);
ShardConsumer consumer =
@ -468,9 +466,11 @@ public class ShardConsumerTest {
)
);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo, config);
KinesisDataFetcher dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo);
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, new SynchronousGetRecordsRetrievalStrategy(dataFetcher)));
getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords,
new SynchronousGetRecordsRetrievalStrategy(dataFetcher),
0L));
when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache);
ShardConsumer consumer =

View file

@ -24,7 +24,16 @@ import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File;
import java.lang.Thread.State;
@ -132,12 +141,11 @@ public class WorkerTest {
private static final String CONCURRENCY_TOKEN_FORMAT = "testToken-%d";
private RecordsFetcherFactory recordsFetcherFactory;
private KinesisClientLibConfiguration config;
@Mock
private KinesisClientLibLeaseCoordinator leaseCoordinator;
@Mock
private KinesisClientLibConfiguration config;
@Mock
private ILeaseManager<KinesisClientLease> leaseManager;
@Mock
private com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory v1RecordProcessorFactory;
@ -162,9 +170,9 @@ public class WorkerTest {
@Before
public void setup() {
config = spy(new KinesisClientLibConfiguration("app", null, null, null));
recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory(500));
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
when(config.getIdleMillisBetweenCalls()).thenReturn(500L);
}
// CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES
@ -207,14 +215,13 @@ public class WorkerTest {
/**
* Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker#getApplicationName()}.
* Test method for {@link Worker#getApplicationName()}.
*/
@Test
public final void testGetStageName() {
final String stageName = "testStageName";
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration(stageName, null, null, null);
Worker worker = new Worker(v1RecordProcessorFactory, clientConfig);
config = new KinesisClientLibConfiguration(stageName, null, null, null);
Worker worker = new Worker(v1RecordProcessorFactory, config);
Assert.assertEquals(stageName, worker.getApplicationName());
}
@ -222,8 +229,7 @@ public class WorkerTest {
public final void testCreateOrGetShardConsumer() {
final String stageName = "testStageName";
IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2;
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration(stageName, null, null, null);
config = new KinesisClientLibConfiguration(stageName, null, null, null);
IKinesisProxy proxy = null;
ICheckpoint checkpoint = null;
int maxRecords = 1;
@ -243,7 +249,7 @@ public class WorkerTest {
Worker worker =
new Worker(stageName,
streamletFactory,
clientConfig,
config,
streamConfig, INITIAL_POSITION_LATEST,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
@ -273,8 +279,6 @@ public class WorkerTest {
public void testWorkerLoopWithCheckpoint() {
final String stageName = "testStageName";
IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2;
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration(stageName, null, null, null);
IKinesisProxy proxy = null;
ICheckpoint checkpoint = null;
int maxRecords = 1;
@ -344,8 +348,7 @@ public class WorkerTest {
public final void testCleanupShardConsumers() {
final String stageName = "testStageName";
IRecordProcessorFactory streamletFactory = SAMPLE_RECORD_PROCESSOR_FACTORY_V2;
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration(stageName, null, null, null);
config = new KinesisClientLibConfiguration(stageName, null, null, null);
IKinesisProxy proxy = null;
ICheckpoint checkpoint = null;
int maxRecords = 1;
@ -365,7 +368,7 @@ public class WorkerTest {
Worker worker =
new Worker(stageName,
streamletFactory,
clientConfig,
config,
streamConfig, INITIAL_POSITION_LATEST,
parentShardPollIntervalMillis,
shardSyncIntervalMillis,
@ -405,8 +408,7 @@ public class WorkerTest {
public final void testInitializationFailureWithRetries() {
String stageName = "testInitializationWorker";
IRecordProcessorFactory recordProcessorFactory = new TestStreamletFactory(null, null);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration(stageName, null, null, null);
config = new KinesisClientLibConfiguration(stageName, null, null, null);
int count = 0;
when(proxy.getShardList()).thenThrow(new RuntimeException(Integer.toString(count++)));
int maxRecords = 2;
@ -422,7 +424,7 @@ public class WorkerTest {
Worker worker =
new Worker(stageName,
recordProcessorFactory,
clientConfig,
config,
streamConfig, INITIAL_POSITION_TRIM_HORIZON,
shardPollInterval,
shardSyncIntervalMillis,
@ -474,7 +476,7 @@ public class WorkerTest {
/**
* Runs worker with threadPoolSize < numShards
* Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker#run()}.
* Test method for {@link Worker#run()}.
*/
@Test
public final void testOneSplitShard2Threads() throws Exception {
@ -485,12 +487,12 @@ public class WorkerTest {
KinesisClientLease lease = ShardSyncer.newKCLLease(shardList.get(0));
lease.setCheckpoint(new ExtendedSequenceNumber("2"));
initialLeases.add(lease);
runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard);
runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard, config);
}
/**
* Runs worker with threadPoolSize < numShards
* Test method for {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker#run()}.
* Test method for {@link Worker#run()}.
*/
@Test
public final void testOneSplitShard2ThreadsWithCallsForEmptyRecords() throws Exception {
@ -502,7 +504,10 @@ public class WorkerTest {
lease.setCheckpoint(new ExtendedSequenceNumber("2"));
initialLeases.add(lease);
boolean callProcessRecordsForEmptyRecordList = true;
runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard);
RecordsFetcherFactory recordsFetcherFactory = new SimpleRecordsFetcherFactory(500);
recordsFetcherFactory.setIdleMillisBetweenCalls(0L);
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard, config);
}
@Test
@ -527,7 +532,8 @@ public class WorkerTest {
10,
kinesisProxy, v2RecordProcessorFactory,
executorService,
cwMetricsFactory);
cwMetricsFactory,
config);
// Give some time for thread to run.
workerStarted.await();
@ -563,7 +569,8 @@ public class WorkerTest {
10,
kinesisProxy, v2RecordProcessorFactory,
executorService,
cwMetricsFactory);
cwMetricsFactory,
config);
// Give some time for thread to run.
workerStarted.await();
@ -610,6 +617,12 @@ public class WorkerTest {
}
}).when(v2RecordProcessor).processRecords(any(ProcessRecordsInput.class));
RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class);
GetRecordsCache getRecordsCache = mock(GetRecordsCache.class);
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
when(recordsFetcherFactory.createRecordsFetcher(any())).thenReturn(getRecordsCache);
when(getRecordsCache.getNextResult()).thenReturn(new ProcessRecordsInput().withRecords(Collections.emptyList()).withMillisBehindLatest(0L));
WorkerThread workerThread = runWorker(shardList,
initialLeases,
callProcessRecordsForEmptyRecordList,
@ -618,7 +631,8 @@ public class WorkerTest {
fileBasedProxy,
v2RecordProcessorFactory,
executorService,
nullMetricsFactory);
nullMetricsFactory,
config);
// Only sleep for time that is required.
processRecordsLatch.await();
@ -709,7 +723,8 @@ public class WorkerTest {
fileBasedProxy,
v2RecordProcessorFactory,
executorService,
nullMetricsFactory);
nullMetricsFactory,
config);
// Only sleep for time that is required.
processRecordsLatch.await();
@ -746,8 +761,6 @@ public class WorkerTest {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -836,8 +849,6 @@ public class WorkerTest {
public void testShutdownCallableNotAllowedTwice() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -903,8 +914,6 @@ public class WorkerTest {
public void testGracefulShutdownSingleFuture() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -993,8 +1002,6 @@ public class WorkerTest {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -1021,7 +1028,7 @@ public class WorkerTest {
Worker worker = new Worker("testRequestShutdown",
recordProcessorFactory,
clientConfig,
config,
streamConfig,
INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
@ -1069,8 +1076,6 @@ public class WorkerTest {
public void testRequestShutdownWithLostLease() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -1105,7 +1110,7 @@ public class WorkerTest {
Worker worker = new Worker("testRequestShutdown",
recordProcessorFactory,
clientConfig,
config,
streamConfig,
INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
@ -1184,8 +1189,6 @@ public class WorkerTest {
public void testRequestShutdownWithAllLeasesLost() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -1220,7 +1223,7 @@ public class WorkerTest {
Worker worker = new Worker("testRequestShutdown",
recordProcessorFactory,
clientConfig,
config,
streamConfig,
INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
@ -1304,8 +1307,6 @@ public class WorkerTest {
public void testLeaseCancelledAfterShutdownRequest() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -1339,7 +1340,7 @@ public class WorkerTest {
Worker worker = new Worker("testRequestShutdown",
recordProcessorFactory,
clientConfig,
config,
streamConfig,
INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
@ -1390,8 +1391,6 @@ public class WorkerTest {
public void testEndOfShardAfterShutdownRequest() throws Exception {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final KinesisClientLibConfiguration clientConfig =
new KinesisClientLibConfiguration("app", null, null, null);
StreamConfig streamConfig = mock(StreamConfig.class);
IMetricsFactory metricsFactory = mock(IMetricsFactory.class);
@ -1425,7 +1424,7 @@ public class WorkerTest {
Worker worker = new Worker("testRequestShutdown",
recordProcessorFactory,
clientConfig,
config,
streamConfig,
INITIAL_POSITION_TRIM_HORIZON,
parentShardPollIntervalMillis,
@ -1728,14 +1727,15 @@ public class WorkerTest {
lease.setCheckpoint(ExtendedSequenceNumber.AT_TIMESTAMP);
initialLeases.add(lease);
}
runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard);
runAndTestWorker(shardList, threadPoolSize, initialLeases, callProcessRecordsForEmptyRecordList, numberOfRecordsPerShard, config);
}
private void runAndTestWorker(List<Shard> shardList,
int threadPoolSize,
List<KinesisClientLease> initialLeases,
boolean callProcessRecordsForEmptyRecordList,
int numberOfRecordsPerShard) throws Exception {
int numberOfRecordsPerShard,
KinesisClientLibConfiguration clientConfig) throws Exception {
File file = KinesisLocalFileDataCreator.generateTempDataFile(shardList, numberOfRecordsPerShard, "unitTestWT001");
IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath());
@ -1747,7 +1747,7 @@ public class WorkerTest {
WorkerThread workerThread = runWorker(
shardList, initialLeases, callProcessRecordsForEmptyRecordList, failoverTimeMillis,
numberOfRecordsPerShard, fileBasedProxy, recordProcessorFactory, executorService, nullMetricsFactory);
numberOfRecordsPerShard, fileBasedProxy, recordProcessorFactory, executorService, nullMetricsFactory, clientConfig);
// TestStreamlet will release the semaphore once for every record it processes
recordCounter.acquire(numberOfRecordsPerShard * shardList.size());
@ -1771,7 +1771,8 @@ public class WorkerTest {
IKinesisProxy kinesisProxy,
IRecordProcessorFactory recordProcessorFactory,
ExecutorService executorService,
IMetricsFactory metricsFactory) throws Exception {
IMetricsFactory metricsFactory,
KinesisClientLibConfiguration clientConfig) throws Exception {
final String stageName = "testStageName";
final int maxRecords = 2;
@ -1799,9 +1800,6 @@ public class WorkerTest {
idleTimeInMilliseconds,
callProcessRecordsForEmptyRecordList,
skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp));
KinesisClientLibConfiguration clientConfig = spy(new KinesisClientLibConfiguration("app", null, null, null));
when(clientConfig.getIdleMillisBetweenCalls()).thenReturn(0L);
Worker worker =
new Worker(stageName,