From 6d27f6f07bea6dc1cfab05e38664fb12775ab5b7 Mon Sep 17 00:00:00 2001 From: Marty Date: Fri, 17 Jun 2022 13:35:33 +0100 Subject: [PATCH] MPHEE: first pass at adding logging for applications --- .../kinesis/multilang/MultiLangProtocol.java | 13 ++-- .../multilang/messages/LogMessage.java | 62 +++++++++++++++++++ .../kinesis/multilang/messages/Message.java | 1 + 3 files changed, 68 insertions(+), 8 deletions(-) create mode 100644 amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/messages/LogMessage.java diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangProtocol.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangProtocol.java index 66a6ae9a..224c400f 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangProtocol.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/MultiLangProtocol.java @@ -27,14 +27,7 @@ import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration; -import software.amazon.kinesis.multilang.messages.CheckpointMessage; -import software.amazon.kinesis.multilang.messages.InitializeMessage; -import software.amazon.kinesis.multilang.messages.LeaseLostMessage; -import software.amazon.kinesis.multilang.messages.Message; -import software.amazon.kinesis.multilang.messages.ProcessRecordsMessage; -import software.amazon.kinesis.multilang.messages.ShardEndedMessage; -import software.amazon.kinesis.multilang.messages.ShutdownRequestedMessage; -import software.amazon.kinesis.multilang.messages.StatusMessage; +import software.amazon.kinesis.multilang.messages.*; import software.amazon.kinesis.processor.RecordProcessorCheckpointer; /** @@ -194,6 +187,10 @@ class MultiLangProtocol { return false; } + Optional messageLogged = message.filter(m -> m instanceof LogMessage) + .map(m -> (LogMessage) m) + .map(m -> m.getLogger().apply(m.getMessage())); + statusMessage = message.filter(m -> m instanceof StatusMessage).map(m -> (StatusMessage) m ); } return this.validateStatusMessage(statusMessage.get(), action); diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/messages/LogMessage.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/messages/LogMessage.java new file mode 100644 index 00000000..d9f1aa73 --- /dev/null +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/messages/LogMessage.java @@ -0,0 +1,62 @@ +package software.amazon.kinesis.multilang.messages; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +import java.util.function.Function; + +@Getter +@Setter +@Slf4j +public class LogMessage extends Message { + /** + * The name used for the action field in {@link Message}. + */ + public static final String ACTION = "log"; + + /** + * The shard id that this processor is getting initialized for. + */ + private String message; + private String logLevel = "info"; + + + public LogMessage() { + this.message = "initialized"; + } + + /** + * Default constructor. + */ + public LogMessage(String message) { + this.message = message; + log.info("Client logging: " + this.message); + } + + public Function getLogger() { + switch (logLevel) { + case "debug": { + return (m) -> { + log.debug(m); + return true; + }; + } + case "warn": + return (m) -> { + log.warn(m); + return true; + }; + case "error": + return (m) -> { + log.error(m); + return true; + }; + default: + return (m) -> { + log.info(m); + return true; + }; + } + } +} diff --git a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/messages/Message.java b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/messages/Message.java index bdb89181..3e4c4f6f 100644 --- a/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/messages/Message.java +++ b/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/messages/Message.java @@ -32,6 +32,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; @Type(value = ShutdownRequestedMessage.class, name = ShutdownRequestedMessage.ACTION), @Type(value = LeaseLostMessage.class, name = LeaseLostMessage.ACTION), @Type(value = ShardEndedMessage.class, name = ShardEndedMessage.ACTION), + @Type(value = LogMessage.class, name = LogMessage.ACTION), }) public abstract class Message {