diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 037a54b2..15b8891c 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -142,7 +142,7 @@ public class WorkerTest { private static final String KINESIS_SHARD_ID_FORMAT = "kinesis-0-0-%d"; private static final String CONCURRENCY_TOKEN_FORMAT = "testToken-%d"; - + private RecordsFetcherFactory recordsFetcherFactory; private KinesisClientLibConfiguration config; @@ -170,7 +170,7 @@ public class WorkerTest { private Future taskFuture; @Mock private TaskResult taskResult; - + @Before public void setup() { config = spy(new KinesisClientLibConfiguration("app", null, null, null)); @@ -179,7 +179,7 @@ public class WorkerTest { } // CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES - private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY = + private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY = new com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory() { @Override @@ -212,8 +212,8 @@ public class WorkerTest { }; } }; - - private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 = + + private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 = new V1ToV2RecordProcessorFactoryAdapter(SAMPLE_RECORD_PROCESSOR_FACTORY); @@ -619,7 +619,7 @@ public class WorkerTest { return null; } }).when(v2RecordProcessor).processRecords(any(ProcessRecordsInput.class)); - + RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class); GetRecordsCache getRecordsCache = mock(GetRecordsCache.class); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); @@ -659,7 +659,7 @@ public class WorkerTest { * This test is testing the {@link Worker}'s shutdown behavior and by extension the behavior of * {@link ThreadPoolExecutor#shutdownNow()}. It depends on the thread pool sending an interrupt to the pool threads. * This behavior makes the test a bit racy, since we need to ensure a specific order of events. - * + * * @throws Exception */ @Test @@ -1356,7 +1356,7 @@ public class WorkerTest { executorService, metricsFactory, taskBackoffTimeMillis, - failoverTimeMillis, + failoverTimeMillis, false, shardPrioritization); @@ -1432,7 +1432,7 @@ public class WorkerTest { config, streamConfig, INITIAL_POSITION_TRIM_HORIZON, - parentShardPollIntervalMillis, + parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupLeasesUponShardCompletion, leaseCoordinator, @@ -1482,7 +1482,6 @@ public class WorkerTest { .recordProcessorFactory(recordProcessorFactory) .config(config) .build(); - Assert.assertNotNull(worker.getStreamConfig().getStreamProxy()); Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisProxy); } @@ -1496,7 +1495,6 @@ public class WorkerTest { .config(config) .kinesisProxy(kinesisProxy) .build(); - Assert.assertNotNull(worker.getStreamConfig().getStreamProxy()); Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisLocalFileProxy); } @@ -1801,7 +1799,7 @@ public class WorkerTest { TestStreamletFactory recordProcessorFactory = new TestStreamletFactory(recordCounter, shardSequenceVerifier); ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize); - + WorkerThread workerThread = runWorker( shardList, initialLeases, callProcessRecordsForEmptyRecordList, failoverTimeMillis, numberOfRecordsPerShard, fileBasedProxy, recordProcessorFactory, executorService, nullMetricsFactory, clientConfig); @@ -1857,7 +1855,7 @@ public class WorkerTest { idleTimeInMilliseconds, callProcessRecordsForEmptyRecordList, skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp)); - + Worker worker = new Worker(stageName, recordProcessorFactory, @@ -1874,7 +1872,7 @@ public class WorkerTest { failoverTimeMillis, KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, shardPrioritization); - + WorkerThread workerThread = new WorkerThread(worker); workerThread.start(); return workerThread;