Added test for switching of states

This commit is contained in:
nyo 2018-02-26 12:20:51 +01:00
parent f8d380845d
commit 96f305a03f
3 changed files with 96 additions and 5 deletions

View file

@ -445,7 +445,6 @@ public class Worker implements Runnable {
if (shutdown) { if (shutdown) {
return; return;
} }
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
try { try {
initialize(); initialize();
@ -455,7 +454,6 @@ public class Worker implements Runnable {
shutdown(); shutdown();
} }
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED);
while (!shouldShutdown()) { while (!shouldShutdown()) {
runProcessLoop(); runProcessLoop();
} }
@ -501,6 +499,7 @@ public class Worker implements Runnable {
} }
private void initialize() { private void initialize() {
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.INITIALIZING);
boolean isDone = false; boolean isDone = false;
Exception lastException = null; Exception lastException = null;
@ -550,6 +549,7 @@ public class Worker implements Runnable {
if (!isDone) { if (!isDone) {
throw new RuntimeException(lastException); throw new RuntimeException(lastException);
} }
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.STARTED);
} }
/** /**
@ -786,7 +786,6 @@ public class Worker implements Runnable {
LOG.warn("Shutdown requested a second time."); LOG.warn("Shutdown requested a second time.");
return; return;
} }
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUTTING_DOWN);
LOG.info("Worker shutdown requested."); LOG.info("Worker shutdown requested.");
// Set shutdown flag, so Worker.run can start shutdown process. // Set shutdown flag, so Worker.run can start shutdown process.
@ -797,6 +796,7 @@ public class Worker implements Runnable {
// Lost leases will force Worker to begin shutdown process for all shard consumers in // Lost leases will force Worker to begin shutdown process for all shard consumers in
// Worker.run(). // Worker.run().
leaseCoordinator.stop(); leaseCoordinator.stop();
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
} }
/** /**
@ -813,7 +813,6 @@ public class Worker implements Runnable {
if (metricsFactory instanceof WorkerCWMetricsFactory) { if (metricsFactory instanceof WorkerCWMetricsFactory) {
((CWMetricsFactory) metricsFactory).shutdown(); ((CWMetricsFactory) metricsFactory).shutdown();
} }
workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.SHUT_DOWN);
shutdownComplete = true; shutdownComplete = true;
} }

View file

@ -9,7 +9,6 @@ public interface WorkerStateChangeListener {
CREATED, CREATED,
INITIALIZING, INITIALIZING,
STARTED, STARTED,
SHUTTING_DOWN,
SHUT_DOWN SHUT_DOWN
} }

View file

@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyString; import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat; import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
@ -89,6 +90,7 @@ import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcess
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerCWMetricsFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerCWMetricsFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerThreadPoolExecutor; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.WorkerThreadPoolExecutor;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.WorkerStateChangeListener.WorkerState;
import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisProxy;
import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy; import com.amazonaws.services.kinesis.clientlibrary.proxies.KinesisLocalFileProxy;
@ -170,6 +172,8 @@ public class WorkerTest {
private Future<TaskResult> taskFuture; private Future<TaskResult> taskFuture;
@Mock @Mock
private TaskResult taskResult; private TaskResult taskResult;
@Mock
private WorkerStateChangeListener workerStateChangeListener;
@Before @Before
public void setup() { public void setup() {
@ -1510,6 +1514,95 @@ public class WorkerTest {
Assert.assertTrue(worker.getWorkerStateChangeListener() instanceof NoOpWorkerStateChangeListener); Assert.assertTrue(worker.getWorkerStateChangeListener() instanceof NoOpWorkerStateChangeListener);
} }
@Test
public void testBuilderWhenWorkerStateListenerIsSet() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
Worker worker = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.workerStateChangeListener(workerStateChangeListener)
.config(config)
.build();
Assert.assertSame(workerStateChangeListener, worker.getWorkerStateChangeListener());
}
@Test
public void testWorkerStateListenerStatePassesThroughCreatedState() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.workerStateChangeListener(workerStateChangeListener)
.config(config)
.build();
verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.CREATED));
}
@Test
public void testWorkerStateChangeListenerGoesThroughStates() throws Exception {
final CountDownLatch workerInitialized = new CountDownLatch(1);
final CountDownLatch workerStarted = new CountDownLatch(1);
final IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);
final IRecordProcessor processor = mock(IRecordProcessor.class);
ExtendedSequenceNumber checkpoint = new ExtendedSequenceNumber("123", 0L);
KinesisClientLeaseBuilder builder = new KinesisClientLeaseBuilder().withCheckpoint(checkpoint)
.withConcurrencyToken(UUID.randomUUID()).withLastCounterIncrementNanos(0L).withLeaseCounter(0L)
.withOwnerSwitchesSinceCheckpoint(0L).withLeaseOwner("Self");
final List<KinesisClientLease> leases = new ArrayList<>();
KinesisClientLease lease = builder.withLeaseKey(String.format("shardId-%03d", 1)).build();
leases.add(lease);
doAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocation) throws Throwable {
workerInitialized.countDown();
return true;
}
}).when(leaseManager).waitUntilLeaseTableExists(anyLong(), anyLong());
doAnswer(new Answer<IRecordProcessor>() {
@Override
public IRecordProcessor answer(InvocationOnMock invocation) throws Throwable {
workerStarted.countDown();
return processor;
}
}).when(recordProcessorFactory).createProcessor();
when(config.getWorkerIdentifier()).thenReturn("Self");
when(leaseManager.listLeases()).thenReturn(leases);
when(leaseManager.renewLease(leases.get(0))).thenReturn(true);
when(executorService.submit(Matchers.<Callable<TaskResult>> any()))
.thenAnswer(new ShutdownHandlingAnswer(taskFuture));
when(taskFuture.isDone()).thenReturn(true);
when(taskFuture.get()).thenReturn(taskResult);
when(taskResult.isShardEndReached()).thenReturn(true);
Worker worker = new Worker.Builder()
.recordProcessorFactory(recordProcessorFactory)
.config(config)
.leaseManager(leaseManager)
.kinesisProxy(kinesisProxy)
.execService(executorService)
.workerStateChangeListener(workerStateChangeListener)
.build();
verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.CREATED));
WorkerThread workerThread = new WorkerThread(worker);
workerThread.start();
workerInitialized.await();
verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.INITIALIZING));
workerStarted.await();
verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.STARTED));
boolean workerShutdown = worker.createGracefulShutdownCallable()
.call();
verify(workerStateChangeListener, times(1)).onWorkerStateChange(eq(WorkerState.SHUT_DOWN));
}
@Test @Test
public void testBuilderWithDefaultLeaseManager() { public void testBuilderWithDefaultLeaseManager() {
IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class); IRecordProcessorFactory recordProcessorFactory = mock(IRecordProcessorFactory.class);