move shutdownComplete call to ShardConsumer (#1308)
* move shutdownComplete call to ShardConsumer
This commit is contained in:
parent
981899499f
commit
969341130a
5 changed files with 10 additions and 28 deletions
|
|
@ -478,7 +478,6 @@ class ConsumerStates {
|
|||
argument.shardRecordProcessor(),
|
||||
argument.recordProcessorCheckpointer(),
|
||||
consumer.shutdownReason(),
|
||||
consumer.shutdownNotification(),
|
||||
argument.initialPositionInStream(),
|
||||
argument.cleanupLeasesOfCompletedShards(),
|
||||
argument.ignoreUnexpectedChildShards(),
|
||||
|
|
|
|||
|
|
@ -313,6 +313,12 @@ public class ShardConsumer {
|
|||
}
|
||||
|
||||
executeTask(shardEndProcessRecordsInput);
|
||||
|
||||
// call shutdownNotification.shutdownComplete() if shutting down as part of gracefulShutdown
|
||||
if (currentState.state() == ConsumerStates.ShardConsumerState.SHUTTING_DOWN &&
|
||||
taskOutcome == TaskOutcome.SUCCESSFUL && shutdownNotification != null) {
|
||||
shutdownNotification.shutdownComplete();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}, executorService);
|
||||
|
|
|
|||
|
|
@ -87,7 +87,6 @@ public class ShutdownTask implements ConsumerTask {
|
|||
private final ShardRecordProcessorCheckpointer recordProcessorCheckpointer;
|
||||
@NonNull
|
||||
private final ShutdownReason reason;
|
||||
private final ShutdownNotification shutdownNotification;
|
||||
@NonNull
|
||||
private final InitialPositionInStreamExtended initialPositionInStream;
|
||||
private final boolean cleanupLeasesOfCompletedShards;
|
||||
|
|
@ -151,11 +150,6 @@ public class ShutdownTask implements ConsumerTask {
|
|||
log.debug("Shutting down retrieval strategy for shard {}.", leaseKey);
|
||||
recordsPublisher.shutdown();
|
||||
|
||||
// shutdownNotification is only set and used when gracefulShutdown starts
|
||||
if (shutdownNotification != null) {
|
||||
shutdownNotification.shutdownComplete();
|
||||
}
|
||||
|
||||
log.debug("Record processor completed shutdown() for shard {}", leaseKey);
|
||||
|
||||
return new TaskResult(null);
|
||||
|
|
|
|||
|
|
@ -697,6 +697,9 @@ public class ShardConsumerTest {
|
|||
shutdownTaskInput = shutdownTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build();
|
||||
// No task is created/run for this shutdownRequestedAwaitState, so there's no task outcome.
|
||||
|
||||
// shutdownNotification.shutdownComplete() should only be called for gracefulShutdown
|
||||
verify(shutdownNotification, times(1)).shutdownComplete();
|
||||
|
||||
verify(taskExecutionListener, times(1)).afterTaskExecution(initialTaskInput);
|
||||
verify(taskExecutionListener, times(2)).afterTaskExecution(processTaskInput);
|
||||
verify(taskExecutionListener, times(1)).afterTaskExecution(shutdownRequestedTaskInput);
|
||||
|
|
|
|||
|
|
@ -310,18 +310,6 @@ public class ShutdownTaskTest {
|
|||
verify(leaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
|
||||
}
|
||||
|
||||
/**
|
||||
* shutdownNotification is only set when ShardConsumer.gracefulShutdown() is called and should be null otherwise.
|
||||
* The task should still call recordsPublisher.shutdown() regardless of the notification
|
||||
*/
|
||||
@Test
|
||||
public void testCallWhenShutdownNotificationIsSet() {
|
||||
final TaskResult result = createShutdownTaskWithNotification(LEASE_LOST, Collections.emptyList()).call();
|
||||
assertNull(result.getException());
|
||||
verify(recordsPublisher).shutdown();
|
||||
verify(shutdownNotification).shutdownComplete();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCallWhenShutdownNotificationIsNull() {
|
||||
final TaskResult result = createShutdownTask(LEASE_LOST, Collections.emptyList()).call();
|
||||
|
|
@ -394,15 +382,7 @@ public class ShutdownTaskTest {
|
|||
private ShutdownTask createShutdownTask(final ShutdownReason reason, final List<ChildShard> childShards,
|
||||
final ShardInfo shardInfo) {
|
||||
return new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
|
||||
reason, null, INITIAL_POSITION_TRIM_HORIZON, false, false,
|
||||
leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer,
|
||||
NULL_METRICS_FACTORY, childShards, STREAM_IDENTIFIER, leaseCleanupManager);
|
||||
}
|
||||
|
||||
private ShutdownTask createShutdownTaskWithNotification(final ShutdownReason reason,
|
||||
final List<ChildShard> childShards) {
|
||||
return new ShutdownTask(SHARD_INFO, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
|
||||
reason, shutdownNotification, INITIAL_POSITION_TRIM_HORIZON, false, false,
|
||||
reason, INITIAL_POSITION_TRIM_HORIZON, false, false,
|
||||
leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer,
|
||||
NULL_METRICS_FACTORY, childShards, STREAM_IDENTIFIER, leaseCleanupManager);
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue