From 7234599e1a3bf086e4764cd2b471c03796caa811 Mon Sep 17 00:00:00 2001 From: Nariman Mirzaei Date: Tue, 17 Oct 2017 13:29:48 -0400 Subject: [PATCH] Adding initialized boolean to record worker state --- .../clientlibrary/lib/worker/Worker.java | 8 +++ .../clientlibrary/lib/worker/WorkerTest.java | 57 +++++++++++++++++++ 2 files changed, 65 insertions(+) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index 3cfb9f2f..50f9767a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -99,6 +99,8 @@ public class Worker implements Runnable { private volatile long shutdownStartTimeMillis; private volatile boolean shutdownComplete = false; + private volatile boolean initialized = false; + // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. private ConcurrentMap shardInfoShardConsumerMap = new ConcurrentHashMap(); @@ -528,6 +530,7 @@ public class Worker implements Runnable { if (!isDone) { throw new RuntimeException(lastException); } + initialized = true; } /** @@ -682,6 +685,10 @@ public class Worker implements Runnable { return gracefulShutdownCoordinator.createGracefulShutdownCallable(startShutdown); } + public boolean isInitialized() { + return initialized; + } + public boolean hasGracefulShutdownStarted() { return gracefuleShutdownStarted; } @@ -786,6 +793,7 @@ public class Worker implements Runnable { if (metricsFactory instanceof WorkerCWMetricsFactory) { ((CWMetricsFactory) metricsFactory).shutdown(); } + initialized = false; shutdownComplete = true; } diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java index 5913bf0d..e2f66085 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/WorkerTest.java @@ -1334,6 +1334,63 @@ public class WorkerTest { } + + @Test + public final void testWorkerInitialization() throws Exception { + final List shardList = createShardListWithOneShard(); + final boolean callProcessRecordsForEmptyRecordList = true; + final long failoverTimeMillis = 50L; + final int numberOfRecordsPerShard = 1000; + + final List initialLeases = new ArrayList(); + for (Shard shard : shardList) { + KinesisClientLease lease = ShardSyncer.newKCLLease(shard); + lease.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); + initialLeases.add(lease); + } + + final File file = KinesisLocalFileDataCreator.generateTempDataFile( + shardList, numberOfRecordsPerShard, "normalShutdownUnitTest"); + final IKinesisProxy fileBasedProxy = new KinesisLocalFileProxy(file.getAbsolutePath()); + + final ExecutorService executorService = Executors.newCachedThreadPool(); + + // Make test case as efficient as possible. + final CountDownLatch processRecordsLatch = new CountDownLatch(1); + + when(v2RecordProcessorFactory.createProcessor()).thenReturn(v2RecordProcessor); + + doAnswer(new Answer () { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + // Signal that record processor has started processing records. + processRecordsLatch.countDown(); + return null; + } + }).when(v2RecordProcessor).processRecords(any(ProcessRecordsInput.class)); + + WorkerThread workerThread = runWorker(shardList, + initialLeases, + callProcessRecordsForEmptyRecordList, + failoverTimeMillis, + numberOfRecordsPerShard, + fileBasedProxy, + v2RecordProcessorFactory, + executorService, + nullMetricsFactory); + + // Only sleep for time that is required. + processRecordsLatch.await(); + + // Make sure record processor is initialized and processing records. + verify(v2RecordProcessorFactory, times(1)).createProcessor(); + verify(v2RecordProcessor, times(1)).initialize(any(InitializationInput.class)); + verify(v2RecordProcessor, atLeast(1)).processRecords(any(ProcessRecordsInput.class)); + verify(v2RecordProcessor, times(0)).shutdown(any(ShutdownInput.class)); + + Assert.assertTrue(workerThread.getWorker().isInitialized()); + } + private abstract class InjectableWorker extends Worker { InjectableWorker(String applicationName, IRecordProcessorFactory recordProcessorFactory, StreamConfig streamConfig, InitialPositionInStreamExtended initialPositionInStream,