Merge 5aef2afb45 into 6fc148740d
This commit is contained in:
commit
d7ae60defc
1 changed files with 12 additions and 14 deletions
|
|
@ -142,7 +142,7 @@ public class WorkerTest {
|
|||
|
||||
private static final String KINESIS_SHARD_ID_FORMAT = "kinesis-0-0-%d";
|
||||
private static final String CONCURRENCY_TOKEN_FORMAT = "testToken-%d";
|
||||
|
||||
|
||||
private RecordsFetcherFactory recordsFetcherFactory;
|
||||
private KinesisClientLibConfiguration config;
|
||||
|
||||
|
|
@ -170,7 +170,7 @@ public class WorkerTest {
|
|||
private Future<TaskResult> taskFuture;
|
||||
@Mock
|
||||
private TaskResult taskResult;
|
||||
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
config = spy(new KinesisClientLibConfiguration("app", null, null, null));
|
||||
|
|
@ -179,7 +179,7 @@ public class WorkerTest {
|
|||
}
|
||||
|
||||
// CHECKSTYLE:IGNORE AnonInnerLengthCheck FOR NEXT 50 LINES
|
||||
private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY =
|
||||
private static final com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY =
|
||||
new com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory() {
|
||||
|
||||
@Override
|
||||
|
|
@ -212,8 +212,8 @@ public class WorkerTest {
|
|||
};
|
||||
}
|
||||
};
|
||||
|
||||
private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 =
|
||||
|
||||
private static final IRecordProcessorFactory SAMPLE_RECORD_PROCESSOR_FACTORY_V2 =
|
||||
new V1ToV2RecordProcessorFactoryAdapter(SAMPLE_RECORD_PROCESSOR_FACTORY);
|
||||
|
||||
|
||||
|
|
@ -619,7 +619,7 @@ public class WorkerTest {
|
|||
return null;
|
||||
}
|
||||
}).when(v2RecordProcessor).processRecords(any(ProcessRecordsInput.class));
|
||||
|
||||
|
||||
RecordsFetcherFactory recordsFetcherFactory = mock(RecordsFetcherFactory.class);
|
||||
GetRecordsCache getRecordsCache = mock(GetRecordsCache.class);
|
||||
when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory);
|
||||
|
|
@ -659,7 +659,7 @@ public class WorkerTest {
|
|||
* This test is testing the {@link Worker}'s shutdown behavior and by extension the behavior of
|
||||
* {@link ThreadPoolExecutor#shutdownNow()}. It depends on the thread pool sending an interrupt to the pool threads.
|
||||
* This behavior makes the test a bit racy, since we need to ensure a specific order of events.
|
||||
*
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
|
|
@ -1356,7 +1356,7 @@ public class WorkerTest {
|
|||
executorService,
|
||||
metricsFactory,
|
||||
taskBackoffTimeMillis,
|
||||
failoverTimeMillis,
|
||||
failoverTimeMillis,
|
||||
false,
|
||||
shardPrioritization);
|
||||
|
||||
|
|
@ -1432,7 +1432,7 @@ public class WorkerTest {
|
|||
config,
|
||||
streamConfig,
|
||||
INITIAL_POSITION_TRIM_HORIZON,
|
||||
parentShardPollIntervalMillis,
|
||||
parentShardPollIntervalMillis,
|
||||
shardSyncIntervalMillis,
|
||||
cleanupLeasesUponShardCompletion,
|
||||
leaseCoordinator,
|
||||
|
|
@ -1482,7 +1482,6 @@ public class WorkerTest {
|
|||
.recordProcessorFactory(recordProcessorFactory)
|
||||
.config(config)
|
||||
.build();
|
||||
Assert.assertNotNull(worker.getStreamConfig().getStreamProxy());
|
||||
Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisProxy);
|
||||
}
|
||||
|
||||
|
|
@ -1496,7 +1495,6 @@ public class WorkerTest {
|
|||
.config(config)
|
||||
.kinesisProxy(kinesisProxy)
|
||||
.build();
|
||||
Assert.assertNotNull(worker.getStreamConfig().getStreamProxy());
|
||||
Assert.assertTrue(worker.getStreamConfig().getStreamProxy() instanceof KinesisLocalFileProxy);
|
||||
}
|
||||
|
||||
|
|
@ -1801,7 +1799,7 @@ public class WorkerTest {
|
|||
TestStreamletFactory recordProcessorFactory = new TestStreamletFactory(recordCounter, shardSequenceVerifier);
|
||||
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(threadPoolSize);
|
||||
|
||||
|
||||
WorkerThread workerThread = runWorker(
|
||||
shardList, initialLeases, callProcessRecordsForEmptyRecordList, failoverTimeMillis,
|
||||
numberOfRecordsPerShard, fileBasedProxy, recordProcessorFactory, executorService, nullMetricsFactory, clientConfig);
|
||||
|
|
@ -1857,7 +1855,7 @@ public class WorkerTest {
|
|||
idleTimeInMilliseconds,
|
||||
callProcessRecordsForEmptyRecordList,
|
||||
skipCheckpointValidationValue, InitialPositionInStreamExtended.newInitialPositionAtTimestamp(timestamp));
|
||||
|
||||
|
||||
Worker worker =
|
||||
new Worker(stageName,
|
||||
recordProcessorFactory,
|
||||
|
|
@ -1874,7 +1872,7 @@ public class WorkerTest {
|
|||
failoverTimeMillis,
|
||||
KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST,
|
||||
shardPrioritization);
|
||||
|
||||
|
||||
WorkerThread workerThread = new WorkerThread(worker);
|
||||
workerThread.start();
|
||||
return workerThread;
|
||||
|
|
|
|||
Loading…
Reference in a new issue