Shutdown requested from plumbing in multi lang record processor.
This commit is contained in:
parent
7be1059b9a
commit
b5946cf19b
2 changed files with 3 additions and 11 deletions
|
|
@ -148,16 +148,9 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti
|
||||||
LOG.info("Record processor was not initialized so no need to initiate a final checkpoint.");
|
LOG.info("Record processor was not initialized so no need to initiate a final checkpoint.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
try {
|
|
||||||
LOG.info("Requesting a checkpoint on shutdown notification.");
|
LOG.info("Requesting a checkpoint on shutdown notification.");
|
||||||
// ProcessRecordsInput emptyInput = new ProcessRecordsInput();
|
if (!protocol.shutdownRequested(checkpointer)) {
|
||||||
|
LOG.error("Child process failed to shutdown");
|
||||||
|
|
||||||
checkpointer.checkpoint();
|
|
||||||
} catch (InvalidStateException e) {
|
|
||||||
LOG.error("Checkpoint triggered during shutdown encountered InvalidStateException: " + e, e);
|
|
||||||
} catch (ShutdownException e) {
|
|
||||||
LOG.error("Checkpoint triggered during shutdown encountered ShutdownException: " + e, e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -172,7 +172,6 @@ public class StreamingRecordProcessorTest {
|
||||||
recordProcessor.initialize(new InitializationInput().withShardId(shardId));
|
recordProcessor.initialize(new InitializationInput().withShardId(shardId));
|
||||||
recordProcessor.processRecords(new ProcessRecordsInput().withRecords(testRecords).withCheckpointer(unimplementedCheckpointer));
|
recordProcessor.processRecords(new ProcessRecordsInput().withRecords(testRecords).withCheckpointer(unimplementedCheckpointer));
|
||||||
recordProcessor.processRecords(new ProcessRecordsInput().withRecords(testRecords).withCheckpointer(unimplementedCheckpointer));
|
recordProcessor.processRecords(new ProcessRecordsInput().withRecords(testRecords).withCheckpointer(unimplementedCheckpointer));
|
||||||
recordProcessor.shutdownRequested(Mockito.mock(IRecordProcessorCheckpointer.class));
|
|
||||||
recordProcessor.shutdown(new ShutdownInput().withCheckpointer(unimplementedCheckpointer).withShutdownReason(ShutdownReason.ZOMBIE));
|
recordProcessor.shutdown(new ShutdownInput().withCheckpointer(unimplementedCheckpointer).withShutdownReason(ShutdownReason.ZOMBIE));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue