amazon-kinesis-client-go/kcl/kcl.go

329 lines
7.8 KiB
Go
Raw Normal View History

2017-02-08 20:23:00 +00:00
package kcl
import (
"bufio"
"encoding/json"
"fmt"
"io"
"os"
"time"
2017-02-08 20:23:00 +00:00
)
type RecordProcessor interface {
Initialize(shardID string, checkpointer Checkpointer) error
ProcessRecords(records []Record) error
Shutdown(reason string) error
}
type Checkpointer interface {
Checkpoint(pair SequencePair, retryCount int) error
Shutdown()
}
type CheckpointError struct {
e string
}
func (ce CheckpointError) Error() string {
return ce.e
2017-02-08 20:23:00 +00:00
}
type ioHandler struct {
inputFile io.Reader
outputFile io.Writer
errorFile io.Writer
}
//func newIOHandler(inputFile io.Reader, outputFile io.Writer, errorFile io.)
func (i ioHandler) writeLine(line string) {
fmt.Fprintf(i.outputFile, "\n%s\n", line)
}
func (i ioHandler) writeError(message string) {
fmt.Fprintf(i.errorFile, "%s\n", message)
}
func (i ioHandler) readLine() (string, error) {
2017-02-08 20:23:00 +00:00
bio := bufio.NewReader(i.inputFile)
line, err := bio.ReadString('\n')
if err != nil {
return "", err
2017-02-08 20:23:00 +00:00
}
return line, nil
2017-02-08 20:23:00 +00:00
}
type ActionInitialize struct {
Action string `json:"action"`
ShardID string `json:"shardId"`
SequenceNumber string `json:"sequenceNumber"`
SubSequenceNumber int `json:"subSequenceNumber"`
}
type Record struct {
SequenceNumber string `json:"sequenceNumber"`
SubSequenceNumber int `json:"subSequenceNumber"`
ApproximateArrivalTimestamp int `json:"approximateArrivalTimestamp"`
PartitionKey string `json:"partitionKey"`
Data string `json:"data"`
}
type ActionProcessRecords struct {
Action string `json:"action"`
Records []Record `json:"records"`
MillisBehindLatest int `json:"millisBehindLatest"`
}
type ActionShutdown struct {
Action string `json:"action"`
Reason string `json:"reason"`
}
type ActionCheckpoint struct {
Action string `json:"action"`
SequenceNumber *string `json:"sequenceNumber,omitempty"`
SubSequenceNumber *int `json:"subSequenceNumber,omitempty"`
Error *string `json:"error,omitempty"`
2017-02-08 20:23:00 +00:00
}
func (i ioHandler) loadAction(line string) (interface{}, error) {
lineBytes := []byte(line)
var message struct {
Action string `json:"action"`
}
if err := json.Unmarshal(lineBytes, &message); err != nil {
return nil, err
}
switch message.Action {
case "initialize":
var actionInitialize ActionInitialize
if err := json.Unmarshal(lineBytes, &actionInitialize); err != nil {
return nil, err
}
return actionInitialize, nil
case "processRecords":
var actionProcessRecords ActionProcessRecords
if err := json.Unmarshal(lineBytes, &actionProcessRecords); err != nil {
return nil, err
}
return actionProcessRecords, nil
case "shutdownRequested":
fallthrough
2017-02-08 20:23:00 +00:00
case "shutdown":
var actionShutdown ActionShutdown
if err := json.Unmarshal(lineBytes, &actionShutdown); err != nil {
return nil, err
}
return actionShutdown, nil
case "checkpoint":
var actionCheckpoint ActionCheckpoint
if err := json.Unmarshal(lineBytes, &actionCheckpoint); err != nil {
return nil, err
}
return actionCheckpoint, nil
default:
return nil, fmt.Errorf("no recognizable 'action' field in message: %s", line)
}
}
func (i ioHandler) writeAction(action interface{}) error {
line, err := json.Marshal(action)
if err != nil {
return err
}
i.writeLine(string(line))
return nil
}
func New(
inputFile io.Reader, outputFile, errorFile io.Writer, recordProcessor RecordProcessor,
) *KCLProcess {
2017-02-08 20:23:00 +00:00
i := ioHandler{
inputFile: inputFile,
outputFile: outputFile,
errorFile: errorFile,
}
return &KCLProcess{
ioHandler: i,
2017-02-08 20:23:00 +00:00
recordProcessor: recordProcessor,
next: make(chan struct{}),
out: make(chan string),
outErr: make(chan error),
checkpoint: make(chan SequencePair),
checkpointErr: make(chan error),
2017-02-08 20:23:00 +00:00
}
}
type KCLProcess struct {
ioHandler ioHandler
recordProcessor RecordProcessor
next chan struct{}
out chan string
outErr chan error
checkpoint chan SequencePair
checkpointErr chan error
2017-02-08 20:23:00 +00:00
}
func (kclp *KCLProcess) reportDone(responseFor string) error {
return kclp.ioHandler.writeAction(struct {
Action string `json:"action"`
ResponseFor string `json:"responseFor"`
}{
Action: "status",
ResponseFor: responseFor,
})
}
func (kclp *KCLProcess) performAction(a interface{}) (string, error) {
switch action := a.(type) {
case ActionInitialize:
return action.Action, kclp.recordProcessor.Initialize(action.ShardID, kclp)
2017-02-08 20:23:00 +00:00
case ActionProcessRecords:
return action.Action, kclp.recordProcessor.ProcessRecords(action.Records)
2017-02-08 20:23:00 +00:00
case ActionShutdown:
return action.Action, kclp.recordProcessor.Shutdown(action.Reason)
2017-02-08 20:23:00 +00:00
default:
return "", fmt.Errorf("unknown action to dispatch: %+#v", action)
2017-02-08 20:23:00 +00:00
}
}
func (kclp *KCLProcess) handleLine(line string) error {
action, err := kclp.ioHandler.loadAction(line)
if err != nil {
return err
}
responseFor, err := kclp.performAction(action)
if err != nil {
return err
}
return kclp.reportDone(responseFor)
}
func (kclp *KCLProcess) Checkpoint(pair SequencePair, retryCount int) error {
sleepDuration := 5 * time.Second
for n := 0; n <= retryCount; n++ {
kclp.checkpoint <- pair
err := <-kclp.checkpointErr
if err == nil {
return nil
}
if cperr, ok := err.(CheckpointError); ok {
switch cperr.Error() {
case "ShutdownException":
return fmt.Errorf("Encountered shutdown exception, skipping checkpoint")
case "ThrottlingException":
fmt.Fprintf(os.Stderr, "Checkpointing throttling, pause for %s\n", sleepDuration)
case "InvalidStateException":
fmt.Fprintf(os.Stderr, "MultiLangDaemon invalid state while checkpointing\n")
default:
fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err)
}
}
if n == retryCount {
return fmt.Errorf("Failed to checkpoint after %d attempts, giving up.", retryCount)
}
time.Sleep(sleepDuration)
}
return nil
}
func (kclp *KCLProcess) Shutdown() {
kclp.Checkpoint(SequencePair{}, 5)
}
func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error {
var seq *string
var subSeq *int
if !pair.IsEmpty() { // an empty pair is a signal to shutdown
tmp := pair.Sequence.String()
seq = &tmp
subSeq = &pair.SubSequence
}
kclp.ioHandler.writeAction(ActionCheckpoint{
Action: "checkpoint",
SequenceNumber: seq,
SubSequenceNumber: subSeq,
})
line, err := kclp.ioHandler.readLine()
if err != nil {
return err
}
actionI, err := kclp.ioHandler.loadAction(line)
if err != nil {
return err
}
action, ok := actionI.(ActionCheckpoint)
if !ok {
return fmt.Errorf("expected checkpoint response, got '%s'", line)
}
if action.Error != nil && *action.Error != "" {
return CheckpointError{e: *action.Error}
}
return nil
}
func (kclp *KCLProcess) startLineProcessor(
next chan struct{}, out chan string, outErr chan error,
checkpoint chan SequencePair, checkpointErr chan error,
) {
go func() {
for {
select {
case <-next:
line, err := kclp.ioHandler.readLine()
if err != nil {
outErr <- err
} else {
out <- line
}
case pair := <-checkpoint:
err := kclp.processCheckpoint(pair)
checkpointErr <- err
}
}
}()
}
func (kclp *KCLProcess) processNextLine() error {
kclp.next <- struct{}{} // We're ready for a new line
var err error
var line string
select {
case err = <-kclp.outErr:
case line = <-kclp.out:
if line == "" {
err = fmt.Errorf("Empty read line recieved")
} else {
err = kclp.handleLine(line)
}
}
return err
}
2017-02-08 20:23:00 +00:00
func (kclp *KCLProcess) Run() {
kclp.startLineProcessor(kclp.next, kclp.out, kclp.outErr, kclp.checkpoint, kclp.checkpointErr)
2017-02-08 20:23:00 +00:00
for {
err := kclp.processNextLine()
if err == io.EOF {
kclp.ioHandler.writeError("IO stream closed")
2017-02-08 20:23:00 +00:00
return
} else if err != nil {
kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err))
2017-07-21 01:27:17 +00:00
return
2017-02-08 20:23:00 +00:00
}
}
}