Adding initialized boolean to record worker state

This commit is contained in:
Nariman Mirzaei 2017-10-17 13:29:48 -04:00
parent 9720b1b249
commit 7234599e1a
2 changed files with 65 additions and 0 deletions

View file

@ -99,6 +99,8 @@ public class Worker implements Runnable {
private volatile long shutdownStartTimeMillis;
private volatile boolean shutdownComplete = false;
private volatile boolean initialized = false;
// Holds consumers for shards the worker is currently tracking. Key is shard
// info, value is ShardConsumer.
private ConcurrentMap<ShardInfo, ShardConsumer> shardInfoShardConsumerMap = new ConcurrentHashMap<ShardInfo, ShardConsumer>();
@ -528,6 +530,7 @@ public class Worker implements Runnable {
if (!isDone) {
throw new RuntimeException(lastException);
}
initialized = true;
}
/**
@ -682,6 +685,10 @@ public class Worker implements Runnable {
return gracefulShutdownCoordinator.createGracefulShutdownCallable(startShutdown);
}
public boolean isInitialized() {
return initialized;
}
public boolean hasGracefulShutdownStarted() {
return gracefuleShutdownStarted;
}
@ -786,6 +793,7 @@ public class Worker implements Runnable {
if (metricsFactory instanceof WorkerCWMetricsFactory) {
((CWMetricsFactory) metricsFactory).shutdown();
}
initialized = false;
shutdownComplete = true;
}

View file

@ -1334,6 +1334,63 @@ public class WorkerTest {
}
@Test
public final void testWorkerInitialization() throws Exception {
final List<Shard> shardList = createShardListWithOneShard();
final boolean callProcessRecordsForEmptyRecordList = true;
final long failoverTimeMillis = 50L;
final int numberOfRecordsPerShard = 1000;
final List<KinesisClientLease> initialLeases = new ArrayList<KinesisClientLease>();
for (Shard shard : shardList) {
KinesisClientLease lease = ShardSyncer.newKCLLease(shard);
lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON);
initialLeases.add(lease);
}
final File file = KinesisLocalFileDataCreator.generateTempDataFile(
shardList, numberOfRecordsPerShard, "normalShutdownUnitTest");
final IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath());
final ExecutorService executorService = Executors.newCachedThreadPool();
// Make test case as efficient as possible.
final CountDownLatch processRecordsLatch = new CountDownLatch(1);
when(v2RecordProcessorFactory.createProcessor()).thenReturn(v2RecordProcessor);
doAnswer(new Answer<Object> () {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
// Signal that record processor has started processing records.
processRecordsLatch.countDown();
return null;
}
}).when(v2RecordProcessor).processRecords(any(ProcessRecordsInput.class));
WorkerThread workerThread = runWorker(shardList,
initialLeases,
callProcessRecordsForEmptyRecordList,
failoverTimeMillis,
numberOfRecordsPerShard,
fileBasedProxy,
v2RecordProcessorFactory,
executorService,
nullMetricsFactory);
// Only sleep for time that is required.
processRecordsLatch.await();
// Make sure record processor is initialized and processing records.
verify(v2RecordProcessorFactory, times(1)).createProcessor();
verify(v2RecordProcessor, times(1)).initialize(any(InitializationInput.class));
verify(v2RecordProcessor, atLeast(1)).processRecords(any(ProcessRecordsInput.class));
verify(v2RecordProcessor, times(0)).shutdown(any(ShutdownInput.class));
Assert.assertTrue(workerThread.getWorker().isInitialized());
}
private abstract class InjectableWorker extends Worker {
InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory,
StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream,