diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f2a12a4 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +# osx / sshfs +._* +.DS_Store + +# emacs / vim +*~ +\#*\# +.\#* +jars/ +build/ diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..c407446 --- /dev/null +++ b/Makefile @@ -0,0 +1,54 @@ +SHELL := /bin/bash +JAR_DIR := jars +PKG := github.com/Clever/amazon-kinesis-client-go +.PHONY: download_jars run build install_deps + +URL_PREFIX := http://search.maven.org/remotecontent?filepath= + +# this list lifted from https://github.com/awslabs/amazon-kinesis-client-python/blob/fb49c6390c0593fbcf81d6c34c5245726c15b2f3/setup.py#L60 +JARS_TO_DOWNLOAD := $(addprefix $(JAR_DIR)/,com/amazonaws/amazon-kinesis-client/1.7.2/amazon-kinesis-client-1.7.2.jar \ +com/amazonaws/aws-java-sdk-dynamodb/1.11.14/aws-java-sdk-dynamodb-1.11.14.jar \ +com/amazonaws/aws-java-sdk-s3/1.11.14/aws-java-sdk-s3-1.11.14.jar \ +com/amazonaws/aws-java-sdk-kms/1.11.14/aws-java-sdk-kms-1.11.14.jar \ +com/amazonaws/aws-java-sdk-core/1.11.14/aws-java-sdk-core-1.11.14.jar \ +commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar \ +org/apache/httpcomponents/httpclient/4.5.2/httpclient-4.5.2.jar \ +org/apache/httpcomponents/httpcore/4.4.4/httpcore-4.4.4.jar \ +commons-codec/commons-codec/1.9/commons-codec-1.9.jar \ +com/fasterxml/jackson/core/jackson-databind/2.6.6/jackson-databind-2.6.6.jar \ +com/fasterxml/jackson/core/jackson-annotations/2.6.0/jackson-annotations-2.6.0.jar \ +com/fasterxml/jackson/core/jackson-core/2.6.6/jackson-core-2.6.6.jar \ +com/fasterxml/jackson/dataformat/jackson-dataformat-cbor/2.6.6/jackson-dataformat-cbor-2.6.6.jar \ +joda-time/joda-time/2.8.1/joda-time-2.8.1.jar \ +com/amazonaws/aws-java-sdk-kinesis/1.11.14/aws-java-sdk-kinesis-1.11.14.jar \ +com/amazonaws/aws-java-sdk-cloudwatch/1.11.14/aws-java-sdk-cloudwatch-1.11.14.jar \ +com/google/guava/guava/18.0/guava-18.0.jar \ +com/google/protobuf/protobuf-java/2.6.1/protobuf-java-2.6.1.jar \ +commons-lang/commons-lang/2.6/commons-lang-2.6.jar) + +EMPTY := +SPACE := $(EMPTY) $(EMPTY) +JAVA_CLASS_PATH := $(subst $(SPACE),:,$(JARS_TO_DOWNLOAD)) + +$(JARS_TO_DOWNLOAD): + mkdir -p `dirname $@` + curl -s -L -o $@ -O $(URL_PREFIX)`echo $@ | sed 's/$(JAR_DIR)\///g'` + +download_jars: $(JARS_TO_DOWNLOAD) + +GLIDE_VERSION = v0.12.3 +$(GOPATH)/src/github.com/Masterminds/glide: + git clone -b $(GLIDE_VERSION) https://github.com/Masterminds/glide.git $(GOPATH)/src/github.com/Masterminds/glide + +$(GOPATH)/bin/glide: $(GOPATH)/src/github.com/Masterminds/glide + go build -o $(GOPATH)/bin/glide github.com/Masterminds/glide + +install_deps: $(GOPATH)/bin/glide + @$(GOPATH)/bin/glide install -v + +build: + CGO_ENABLED=0 go build -installsuffix cgo -o build/consumer $(PKG)/cmd/consumer + +run: build download_jars + command -v java >/dev/null 2>&1 || { echo >&2 "Java not installed. Install java!"; exit 1; } + java -cp $(JAVA_CLASS_PATH) com.amazonaws.services.kinesis.multilang.MultiLangDaemon consumer.properties diff --git a/README.md b/README.md index b48b6e0..6ed804b 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,20 @@ # amazon-kinesis-client-go -Amazon Kinesis Client for Go + +A port of [amazon-kinesis-client-python](https://github.com/awslabs/amazon-kinesis-client-python) to Go. + +## Running the Consumer + +Edit the file `consumer.properties` to point at a Kinesis stream that has some data. +Build the consumer binary: + +``` bash +make build +``` + +Then run: + +``` bash +make run +``` + +This will download the jar files necessary to run the KCL, and then launch the KCL communicating with the consumer binary. diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go new file mode 100644 index 0000000..cb311b1 --- /dev/null +++ b/cmd/consumer/main.go @@ -0,0 +1,102 @@ +package main + +import ( + "fmt" + "math/big" + "os" + "time" + + "github.com/Clever/amazon-kinesis-client-go/kcl" +) + +type SampleRecordProcessor struct { + sleepDuration time.Duration + checkpointRetries int + checkpointFreq time.Duration + largestSeq *big.Int + largestSubSeq int + lastCheckpoint time.Time +} + +func New() *SampleRecordProcessor { + return &SampleRecordProcessor{ + sleepDuration: 5 * time.Second, + checkpointRetries: 5, + checkpointFreq: 60 * time.Second, + } +} + +func (srp *SampleRecordProcessor) Initialize(shardID string) error { + srp.lastCheckpoint = time.Now() + return nil +} + +func (srp *SampleRecordProcessor) checkpoint(checkpointer kcl.Checkpointer, sequenceNumber string, subSequenceNumber int) { + for n := 0; n < srp.checkpointRetries; n++ { + if err := checkpointer.Checkpoint(sequenceNumber, subSequenceNumber); err == nil { + break + } else { + if cperr, ok := err.(kcl.CheckpointError); ok { + switch cperr.Error() { + case "ShutdownException": + fmt.Fprintf(os.Stderr, "Encountered shutdown exception, skipping checkpoint\n") + return + case "ThrottlingException": + if srp.checkpointRetries-1 == n { + fmt.Fprintf(os.Stderr, "Failed to checkpoint after %d attempts, giving up.\n", n) + return + } + fmt.Fprintf(os.Stderr, "Was throttled while checkpointing, will attempt again in %s", srp.sleepDuration) + case "InvalidStateException": + fmt.Fprintf(os.Stderr, "MultiLangDaemon reported an invalid state while checkpointing\n") + } + } + fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err) + } + time.Sleep(srp.sleepDuration) + } +} + +func (srp *SampleRecordProcessor) shouldUpdateSequence(sequenceNumber *big.Int, subSequenceNumber int) bool { + return srp.largestSeq == nil || sequenceNumber.Cmp(srp.largestSeq) == 1 || + (sequenceNumber.Cmp(srp.largestSeq) == 0 && subSequenceNumber > srp.largestSubSeq) +} + +func (srp *SampleRecordProcessor) ProcessRecords(records []kcl.Record, checkpointer kcl.Checkpointer) error { + for _, record := range records { + seqNumber := new(big.Int) + if _, ok := seqNumber.SetString(record.SequenceNumber, 10); !ok { + fmt.Fprintf(os.Stderr, "could not parse sequence number '%s'\n", record.SequenceNumber) + continue + } + if srp.shouldUpdateSequence(seqNumber, record.SubSequenceNumber) { + srp.largestSeq = seqNumber + srp.largestSubSeq = record.SubSequenceNumber + } + } + if time.Now().Sub(srp.lastCheckpoint) > srp.checkpointFreq { + srp.checkpoint(checkpointer, srp.largestSeq.String(), srp.largestSubSeq) + srp.lastCheckpoint = time.Now() + } + return nil +} + +func (srp *SampleRecordProcessor) Shutdown(checkpointer kcl.Checkpointer, reason string) error { + if reason == "TERMINATE" { + fmt.Fprintf(os.Stderr, "Was told to terminate, will attempt to checkpoint.\n") + srp.checkpoint(checkpointer, "", 0) + } else { + fmt.Fprintf(os.Stderr, "Shutting down due to failover. Will not checkpoint.\n") + } + return nil +} + +func main() { + f, err := os.Create("/tmp/kcl_stderr") + if err != nil { + panic(err) + } + defer f.Close() + kclProcess := kcl.New(os.Stdin, os.Stdout, os.Stderr, &SampleRecordProcessor{}) + kclProcess.Run() +} diff --git a/consumer.properties b/consumer.properties new file mode 100644 index 0000000..ef71b2d --- /dev/null +++ b/consumer.properties @@ -0,0 +1,83 @@ +# The script that abides by the multi-language protocol. This script will +# be executed by the MultiLangDaemon, which will communicate with this script +# over STDIN and STDOUT according to the multi-language protocol. +executableName = build/consumer + +# The name of an Amazon Kinesis stream to process. +streamName = test-stream + +# Used by the KCL as the name of this application. Will be used as the name +# of an Amazon DynamoDB table which will store the lease and checkpoint +# information for workers with this application name +applicationName = KCLGoExample + +# Users can change the credentials provider the KCL will use to retrieve credentials. +# The DefaultAWSCredentialsProviderChain checks several other providers, which is +# described here: +# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html +AWSCredentialsProvider = DefaultAWSCredentialsProviderChain + +# Appended to the user agent of the KCL. Does not impact the functionality of the +# KCL in any other way. +processingLanguage = golang + +# Valid options at TRIM_HORIZON or LATEST. +# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax +initialPositionInStream = TRIM_HORIZON + +# The following properties are also available for configuring the KCL Worker that is created +# by the MultiLangDaemon. + +# The KCL defaults to us-east-1 +regionName = us-west-1 + +# Fail over time in milliseconds. A worker which does not renew it's lease within this time interval +# will be regarded as having problems and it's shards will be assigned to other workers. +# For applications that have a large number of shards, this msy be set to a higher number to reduce +# the number of DynamoDB IOPS required for tracking leases +#failoverTimeMillis = 10000 + +# A worker id that uniquely identifies this worker among all workers using the same applicationName +# If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself. +#workerId = + +# Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks. +#shardSyncIntervalMillis = 60000 + +# Max records to fetch from Kinesis in a single GetRecords call. +#maxRecords = 10000 + +# Idle time between record reads in milliseconds. +#idleTimeBetweenReadsInMillis = 1000 + +# Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while) +#callProcessRecordsEvenForEmptyRecordList = false + +# Interval in milliseconds between polling to check for parent shard completion. +# Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on +# completion of parent shards). +#parentShardPollIntervalMillis = 10000 + +# Cleanup leases upon shards completion (don't wait until they expire in Kinesis). +# Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try +# to delete the ones we don't need any longer. +#cleanupLeasesUponShardCompletion = true + +# Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures). +#taskBackoffTimeMillis = 500 + +# Buffer metrics for at most this long before publishing to CloudWatch. +#metricsBufferTimeMillis = 10000 + +# Buffer at most this many metrics before publishing to CloudWatch. +#metricsMaxQueueSize = 10000 + +# KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls +# to RecordProcessorCheckpointer#checkpoint(String) by default. +#validateSequenceNumberBeforeCheckpointing = true + +# The maximum number of active threads for the MultiLangDaemon to permit. +# If a value is provided then a FixedThreadPool is used with the maximum +# active threads set to the provided value. If a non-positive integer or no +# value is provided a CachedThreadPool is used. +#maxActiveThreads = 0 diff --git a/kcl/kcl.go b/kcl/kcl.go new file mode 100644 index 0000000..ca38f2e --- /dev/null +++ b/kcl/kcl.go @@ -0,0 +1,241 @@ +package kcl + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io" +) + +type RecordProcessor interface { + Initialize(shardID string) error + ProcessRecords(records []Record, checkpointer Checkpointer) error + Shutdown(checkpointer Checkpointer, reason string) error +} + +type Checkpointer struct { + ioHandler ioHandler +} + +func (c Checkpointer) getAction() (interface{}, error) { + line, err := c.ioHandler.readLine() + if err != nil { + return nil, err + } + action, err := c.ioHandler.loadAction(line.String()) + if err != nil { + return nil, err + } + return action, nil +} + +type CheckpointError struct { + e string +} + +func (ce CheckpointError) Error() string { + return ce.e +} + +func (c Checkpointer) Checkpoint(sequenceNumber string, subSequenceNumber int) error { + c.ioHandler.writeAction(ActionCheckpoint{ + Action: "checkpoint", + SequenceNumber: sequenceNumber, + SubSequenceNumber: subSequenceNumber, + }) + line, err := c.ioHandler.readLine() + if err != nil { + return err + } + actionI, err := c.ioHandler.loadAction(line.String()) + if err != nil { + return err + } + action, ok := actionI.(ActionCheckpoint) + if !ok { + return fmt.Errorf("expected checkpoint response, got '%s'", line.String()) + } + if action.Error != "" { + return CheckpointError{ + e: action.Error, + } + } + return nil + +} + +type ioHandler struct { + inputFile io.Reader + outputFile io.Writer + errorFile io.Writer +} + +//func newIOHandler(inputFile io.Reader, outputFile io.Writer, errorFile io.) + +func (i ioHandler) writeLine(line string) { + fmt.Fprintf(i.outputFile, "\n%s\n", line) +} + +func (i ioHandler) writeError(message string) { + fmt.Fprintf(i.errorFile, "%s\n", message) +} + +func (i ioHandler) readLine() (*bytes.Buffer, error) { + bio := bufio.NewReader(i.inputFile) + line, err := bio.ReadString('\n') + if err != nil { + return nil, err + } + return bytes.NewBufferString(line), nil +} + +type ActionInitialize struct { + Action string `json:"action"` + ShardID string `json:"shardId"` + SequenceNumber string `json:"sequenceNumber"` + SubSequenceNumber int `json:"subSequenceNumber"` +} + +type Record struct { + SequenceNumber string `json:"sequenceNumber"` + SubSequenceNumber int `json:"subSequenceNumber"` + ApproximateArrivalTimestamp int `json:"approximateArrivalTimestamp"` + PartitionKey string `json:"partitionKey"` + Data string `json:"data"` +} + +type ActionProcessRecords struct { + Action string `json:"action"` + Records []Record `json:"records"` + MillisBehindLatest int `json:"millisBehindLatest"` +} + +type ActionShutdown struct { + Action string `json:"action"` + Reason string `json:"reason"` +} + +type ActionCheckpoint struct { + Action string `json:"action"` + SequenceNumber string `json:"sequenceNumber"` + SubSequenceNumber int `json:"subSequenceNumber"` + Error string `json:"error"` +} + +func (i ioHandler) loadAction(line string) (interface{}, error) { + lineBytes := []byte(line) + var message struct { + Action string `json:"action"` + } + if err := json.Unmarshal(lineBytes, &message); err != nil { + return nil, err + } + switch message.Action { + case "initialize": + var actionInitialize ActionInitialize + if err := json.Unmarshal(lineBytes, &actionInitialize); err != nil { + return nil, err + } + return actionInitialize, nil + case "processRecords": + var actionProcessRecords ActionProcessRecords + if err := json.Unmarshal(lineBytes, &actionProcessRecords); err != nil { + return nil, err + } + return actionProcessRecords, nil + case "shutdown": + var actionShutdown ActionShutdown + if err := json.Unmarshal(lineBytes, &actionShutdown); err != nil { + return nil, err + } + return actionShutdown, nil + case "checkpoint": + var actionCheckpoint ActionCheckpoint + if err := json.Unmarshal(lineBytes, &actionCheckpoint); err != nil { + return nil, err + } + return actionCheckpoint, nil + default: + return nil, fmt.Errorf("no recognizable 'action' field in message: %s", line) + } +} + +func (i ioHandler) writeAction(action interface{}) error { + line, err := json.Marshal(action) + if err != nil { + return err + } + i.writeLine(string(line)) + return nil +} + +func New(inputFile io.Reader, outputFile, errorFile io.Writer, recordProcessor RecordProcessor) *KCLProcess { + i := ioHandler{ + inputFile: inputFile, + outputFile: outputFile, + errorFile: errorFile, + } + return &KCLProcess{ + ioHandler: i, + checkpointer: Checkpointer{ + ioHandler: i, + }, + recordProcessor: recordProcessor, + } +} + +type KCLProcess struct { + ioHandler ioHandler + checkpointer Checkpointer + recordProcessor RecordProcessor +} + +func (kclp *KCLProcess) reportDone(responseFor string) error { + return kclp.ioHandler.writeAction(struct { + Action string `json:"action"` + ResponseFor string `json:"responseFor"` + }{ + Action: "status", + ResponseFor: responseFor, + }) +} + +func (kclp *KCLProcess) performAction(a interface{}) (string, error) { + switch action := a.(type) { + case ActionInitialize: + return action.Action, kclp.recordProcessor.Initialize(action.ShardID) + case ActionProcessRecords: + return action.Action, kclp.recordProcessor.ProcessRecords(action.Records, kclp.checkpointer) + case ActionShutdown: + return action.Action, kclp.recordProcessor.Shutdown(kclp.checkpointer, action.Reason) + default: + return "", fmt.Errorf("unknown action to dispatch: %s", action) + } +} + +func (kclp *KCLProcess) handleLine(line string) error { + action, err := kclp.ioHandler.loadAction(line) + if err != nil { + return err + } + + responseFor, err := kclp.performAction(action) + if err != nil { + return err + } + return kclp.reportDone(responseFor) +} + +func (kclp *KCLProcess) Run() { + for { + line, err := kclp.ioHandler.readLine() + if err != nil { + kclp.ioHandler.writeError(err.Error()) + return + } else if line == nil { + break + } + kclp.handleLine(line.String()) + } +}