records) {
+ this.records = records;
+ }
+}
diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownMessage.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownMessage.java
new file mode 100644
index 00000000..524b481e
--- /dev/null
+++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/ShutdownMessage.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/asl/
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package com.amazonaws.services.kinesis.multilang.messages;
+
+import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
+
+/**
+ * A message to indicate to the client's process that it should shutdown and then terminate.
+ */
+public class ShutdownMessage extends Message {
+ /**
+ * The name used for the action field in {@link Message}.
+ */
+ public static final String ACTION = "shutdown";
+
+ /**
+ * The reason for shutdown, e.g. TERMINATE or ZOMBIE
+ */
+ private String reason;
+
+ /**
+ * Default constructor.
+ */
+ public ShutdownMessage() {
+ }
+
+ /**
+ * Convenience constructor.
+ *
+ * @param reason The reason.
+ */
+ public ShutdownMessage(ShutdownReason reason) {
+ if (reason == null) {
+ this.setReason(null);
+ } else {
+ this.setReason(String.valueOf(reason));
+ }
+ }
+
+ /**
+ * @return reason The reason.
+ */
+ public String getReason() {
+ return reason;
+ }
+
+ /**
+ * @param reason The reason.
+ */
+ public void setReason(String reason) {
+ this.reason = reason;
+ }
+}
diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/messages/StatusMessage.java b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/StatusMessage.java
new file mode 100644
index 00000000..5ea5aa75
--- /dev/null
+++ b/src/main/java/com/amazonaws/services/kinesis/multilang/messages/StatusMessage.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/asl/
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package com.amazonaws.services.kinesis.multilang.messages;
+
+/**
+ * A message sent by the client's process to indicate to the record processor that it completed a particular action.
+ */
+public class StatusMessage extends Message {
+ /**
+ * The name used for the action field in {@link Message}.
+ */
+ public static final String ACTION = "status";
+
+ /**
+ * The name of the most recently received action.
+ */
+ private String responseFor;
+
+ /**
+ * Default constructor.
+ */
+ public StatusMessage() {
+ }
+
+ /**
+ * Convenience constructor.
+ *
+ * @param responseFor The response for.
+ */
+ public StatusMessage(String responseFor) {
+ this.setResponseFor(responseFor);
+ }
+
+ /**
+ *
+ * @return The response for.
+ */
+ public String getResponseFor() {
+ return responseFor;
+ }
+
+ /**
+ *
+ * @param responseFor The response for.
+ */
+ public void setResponseFor(String responseFor) {
+ this.responseFor = responseFor;
+ }
+}
diff --git a/src/main/java/com/amazonaws/services/kinesis/multilang/package-info.java b/src/main/java/com/amazonaws/services/kinesis/multilang/package-info.java
new file mode 100644
index 00000000..2ec96e40
--- /dev/null
+++ b/src/main/java/com/amazonaws/services/kinesis/multilang/package-info.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Amazon Software License (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/asl/
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+/**
+ * This package provides a KCL application which implements the multi language protocol. The multi language protocol
+ * defines a system for communication between a KCL multi-lang application and another process (referred to as the
+ * "child process") over STDIN and STDOUT of the child process. The units of communication are JSON messages which
+ * represent the actions the receiving entity should perform. The child process is responsible for reacting
+ * appropriately to four different messages: initialize, processRecords, checkpoint, and shutdown. The KCL multi-lang
+ * app is responsible for reacting appropriately to two messages generated by the child process: status and checkpoint.
+ *
+ * Action messages sent to child process
+ *
+ *
+ * { "action" : "initialize",
+ * "shardId" : "string",
+ * }
+ *
+ * { "action" : "processRecords",
+ * "records" : [{ "data" : "<base64encoded_string>",
+ * "partitionKey" : "<partition key>",
+ * "sequenceNumber" : "<sequence number>";
+ * }] // a list of records
+ * }
+ *
+ * { "action" : "checkpoint",
+ * "checkpoint" : "<sequence number>",
+ * "error" : "<NameOfException>"
+ * }
+ *
+ * { "action" : "shutdown",
+ * "reason" : "<TERMINATE|ZOMBIE>"
+ * }
+ *
+ *
+ * Action messages sent to KCL by the child process
+ *
+ *
+ * { "action" : "checkpoint",
+ * "checkpoint" : "<sequenceNumberToCheckpoint>";
+ * }
+ *
+ * { "action" : "status",
+ * "responseFor" : "<nameOfAction>";
+ * }
+ *
+ *
+ * High Level Description Of Protocol
+ *
+ * The child process will be started by the KCL multi-lang application. There will be one child process for each shard
+ * that this worker is assigned to. The multi-lang app will send an initialize, processRecords, or shutdown message upon
+ * invocation of its corresponding methods. Each message will be on a single line, the messages will be
+ * separated by new lines.The child process is expected to read these messages off its STDIN line by line. The child
+ * process must respond over its STDOUT with a status message indicating that is has finished performing the most recent
+ * action. The multi-lang daemon will not begin to send another message until it has received the response for the
+ * previous message.
+ *
+ * Checkpointing Behavior
+ *
+ * The child process may send a checkpoint message at any time after receiving a processRecords or shutdown
+ * action and before sending the corresponding status message back to the processor. After sending a checkpoint
+ * message over STDOUT, the child process is expected to immediately begin to read its STDIN, waiting for the checkpoint
+ * result message from the KCL multi-lang processor.
+ *
+ * Protocol From Child Process Perspective
+ *
+ * Initialize
+ *
+ *
+ * - Read an "initialize" action from STDIN
+ * - Perform initialization steps
+ * - Write "status" message to indicate you are done
+ * - Begin reading line from STDIN to receive next action
+ *
+ *
+ * ProcessRecords
+ *
+ *
+ * - Read a "processRecords" action from STDIN
+ * - Perform processing tasks (you may write a checkpoint message at any time)
+ * - Write "status" message to STDOUT to indicate you are done.
+ * - Begin reading line from STDIN to receive next action
+ *
+ *
+ * Shutdown
+ *
+ *
+ * - Read a "shutdown" action from STDIN
+ * - Perform shutdown tasks (you may write a checkpoint message at any time)
+ * - Write "status" message to STDOUT to indicate you are done.
+ * - Begin reading line from STDIN to receive next action
+ *
+ *
+ * Checkpoint
+ *
+ *
+ * - Read a "checkpoint" action from STDIN
+ * - Decide whether to checkpoint again based on whether there is an error or not.
+ *
+ *
+ * Base 64 Encoding
+ *
+ * The "data" field of the processRecords action message is an array of arbitrary bytes. To send this in a JSON string
+ * we apply base 64 encoding which transforms the byte array into a string (specifically this string doesn't have JSON
+ * special symbols or new lines in it). The multi-lang processor will use the Jackson library which uses a variant of
+ * MIME called MIME_NO_LINEFEEDS (see
+ * Jackson doc for more details) MIME is the basis of most base64 encoding variants including RFC 3548 which is the standard used by Python's base64 module.
+ *
+ */
+package com.amazonaws.services.kinesis.multilang;
+