MPHEE: first pass at adding logging for applications

This commit is contained in:
Marty 2022-06-17 13:35:33 +01:00
parent a344ee4d05
commit 6d27f6f07b
3 changed files with 68 additions and 8 deletions

View file

@ -27,14 +27,7 @@ import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.multilang.config.MultiLangDaemonConfiguration;
import software.amazon.kinesis.multilang.messages.CheckpointMessage;
import software.amazon.kinesis.multilang.messages.InitializeMessage;
import software.amazon.kinesis.multilang.messages.LeaseLostMessage;
import software.amazon.kinesis.multilang.messages.Message;
import software.amazon.kinesis.multilang.messages.ProcessRecordsMessage;
import software.amazon.kinesis.multilang.messages.ShardEndedMessage;
import software.amazon.kinesis.multilang.messages.ShutdownRequestedMessage;
import software.amazon.kinesis.multilang.messages.StatusMessage;
import software.amazon.kinesis.multilang.messages.*;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
/**
@ -194,6 +187,10 @@ class MultiLangProtocol {
return false;
}
Optional<Boolean> messageLogged = message.filter(m -> m instanceof LogMessage)
.map(m -> (LogMessage) m)
.map(m -> m.getLogger().apply(m.getMessage()));
statusMessage = message.filter(m -> m instanceof StatusMessage).map(m -> (StatusMessage) m );
}
return this.validateStatusMessage(statusMessage.get(), action);

View file

@ -0,0 +1,62 @@
package software.amazon.kinesis.multilang.messages;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import java.util.function.Function;
@Getter
@Setter
@Slf4j
public class LogMessage extends Message {
/**
* The name used for the action field in {@link Message}.
*/
public static final String ACTION = "log";
/**
* The shard id that this processor is getting initialized for.
*/
private String message;
private String logLevel = "info";
public LogMessage() {
this.message = "initialized";
}
/**
* Default constructor.
*/
public LogMessage(String message) {
this.message = message;
log.info("Client logging: " + this.message);
}
public Function<String, Boolean> getLogger() {
switch (logLevel) {
case "debug": {
return (m) -> {
log.debug(m);
return true;
};
}
case "warn":
return (m) -> {
log.warn(m);
return true;
};
case "error":
return (m) -> {
log.error(m);
return true;
};
default:
return (m) -> {
log.info(m);
return true;
};
}
}
}

View file

@ -32,6 +32,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
@Type(value = ShutdownRequestedMessage.class, name = ShutdownRequestedMessage.ACTION),
@Type(value = LeaseLostMessage.class, name = LeaseLostMessage.ACTION),
@Type(value = ShardEndedMessage.class, name = ShardEndedMessage.ACTION),
@Type(value = LogMessage.class, name = LogMessage.ACTION),
})
public abstract class Message {