From b5946cf19b8b95c39afe30c56f52aa82e819c0ed Mon Sep 17 00:00:00 2001 From: Ikram ulhaq Date: Mon, 5 Jun 2017 13:10:10 -0700 Subject: [PATCH] Shutdown requested from plumbing in multi lang record processor. --- .../kinesis/multilang/MultiLangRecordProcessor.java | 13 +++---------- .../multilang/StreamingRecordProcessorTest.java | 1 - 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java index e23508d1..dd141fa3 100644 --- a/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java +++ b/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangRecordProcessor.java @@ -148,16 +148,9 @@ public class MultiLangRecordProcessor implements IRecordProcessor, IShutdownNoti LOG.info("Record processor was not initialized so no need to initiate a final checkpoint."); return; } - try { - LOG.info("Requesting a checkpoint on shutdown notification."); -// ProcessRecordsInput emptyInput = new ProcessRecordsInput(); - - - checkpointer.checkpoint(); - } catch (InvalidStateException e) { - LOG.error("Checkpoint triggered during shutdown encountered InvalidStateException: " + e, e); - } catch (ShutdownException e) { - LOG.error("Checkpoint triggered during shutdown encountered ShutdownException: " + e, e); + LOG.info("Requesting a checkpoint on shutdown notification."); + if (!protocol.shutdownRequested(checkpointer)) { + LOG.error("Child process failed to shutdown"); } } diff --git a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java index b645af9b..2c02b5e9 100644 --- a/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/multilang/StreamingRecordProcessorTest.java @@ -172,7 +172,6 @@ public class StreamingRecordProcessorTest { recordProcessor.initialize(new InitializationInput().withShardId(shardId)); recordProcessor.processRecords(new ProcessRecordsInput().withRecords(testRecords).withCheckpointer(unimplementedCheckpointer)); recordProcessor.processRecords(new ProcessRecordsInput().withRecords(testRecords).withCheckpointer(unimplementedCheckpointer)); - recordProcessor.shutdownRequested(Mockito.mock(IRecordProcessorCheckpointer.class)); recordProcessor.shutdown(new ShutdownInput().withCheckpointer(unimplementedCheckpointer).withShutdownReason(ShutdownReason.ZOMBIE)); }