2017-02-08 20:23:00 +00:00
|
|
|
package kcl
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"bufio"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"fmt"
|
|
|
|
|
"io"
|
2017-05-22 23:05:34 +00:00
|
|
|
"os"
|
|
|
|
|
"time"
|
2017-02-08 20:23:00 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type RecordProcessor interface {
|
2017-07-21 01:04:34 +00:00
|
|
|
Initialize(shardID string, checkpointer Checkpointer) error
|
2017-05-19 22:01:57 +00:00
|
|
|
ProcessRecords(records []Record) error
|
|
|
|
|
Shutdown(reason string) error
|
|
|
|
|
}
|
|
|
|
|
|
2017-07-21 01:04:34 +00:00
|
|
|
type Checkpointer interface {
|
2017-08-03 21:22:52 +00:00
|
|
|
Checkpoint(pair SequencePair, retryCount int) error
|
2017-07-21 01:04:34 +00:00
|
|
|
Shutdown()
|
|
|
|
|
}
|
|
|
|
|
|
2017-05-19 22:01:57 +00:00
|
|
|
// CheckpointError carries the error name returned in a checkpoint response
// (for example "ThrottlingException"), so callers can tell it apart from
// transport or decoding errors with a type assertion.
type CheckpointError struct {
	e string // error name exactly as it appeared in the response's "error" field
}

// Error implements the error interface by returning the stored error name
// unchanged.
func (ce CheckpointError) Error() string {
	return ce.e
}
|
|
|
|
|
|
|
|
|
|
// ioHandler bundles the three streams used to exchange line-oriented
// messages: one to read from, one to write responses to, and one for error
// text.
type ioHandler struct {
	inputFile  io.Reader // source of incoming lines
	outputFile io.Writer // destination for outgoing messages
	errorFile  io.Writer // destination for diagnostics
}

// NOTE: an unfinished, commented-out newIOHandler constructor stub used to sit
// here; New builds the ioHandler literal directly instead.

// writeLine emits line on the output stream, preceded and followed by a
// newline. Write errors are not reported.
func (i ioHandler) writeLine(line string) {
	fmt.Fprint(i.outputFile, "\n"+line+"\n")
}

// writeError emits message followed by a newline on the error stream. Write
// errors are not reported.
func (i ioHandler) writeError(message string) {
	fmt.Fprintln(i.errorFile, message)
}

// readLine reads up to and including the next '\n' from the input stream and
// returns it with the newline still attached. On any read error, including
// io.EOF before a newline is seen, it returns "" and that error.
//
// A bufio.Reader is obtained via bufio.NewReader on every call; unless
// inputFile is itself a sufficiently large *bufio.Reader, whatever that reader
// buffers beyond the returned line is not available to the next call.
func (i ioHandler) readLine() (string, error) {
	reader := bufio.NewReader(i.inputFile)
	text, readErr := reader.ReadString('\n')
	if readErr == nil {
		return text, nil
	}
	return "", readErr
}
|
|
|
|
|
|
|
|
|
|
// ActionInitialize is the decoded form of an incoming message whose "action"
// field is "initialize".
type ActionInitialize struct {
	// Action is the message discriminator; it is echoed back in the status
	// response.
	Action string `json:"action"`

	// ShardID is passed to RecordProcessor.Initialize.
	ShardID string `json:"shardId"`

	// SequenceNumber and SubSequenceNumber are decoded but not read elsewhere
	// in this file.
	SequenceNumber    string `json:"sequenceNumber"`
	SubSequenceNumber int    `json:"subSequenceNumber"`
}
|
|
|
|
|
|
|
|
|
|
// Record is one element of the "records" array in a "processRecords" message.
// Each field maps one-to-one onto the JSON key named in its tag.
type Record struct {
	SequenceNumber    string `json:"sequenceNumber"`
	SubSequenceNumber int    `json:"subSequenceNumber"`

	// NOTE(review): the unit is not visible in this file — presumably epoch
	// milliseconds; confirm against the sender.
	ApproximateArrivalTimestamp int `json:"approximateArrivalTimestamp"`

	PartitionKey string `json:"partitionKey"`

	// NOTE(review): kept as the raw JSON string; presumably a base64-encoded
	// payload — confirm before decoding.
	Data string `json:"data"`
}
|
|
|
|
|
|
|
|
|
|
type ActionProcessRecords struct {
|
|
|
|
|
Action string `json:"action"`
|
|
|
|
|
Records []Record `json:"records"`
|
|
|
|
|
MillisBehindLatest int `json:"millisBehindLatest"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ActionShutdown is the decoded form of an incoming message whose "action"
// field is either "shutdown" or "shutdownRequested".
type ActionShutdown struct {
	// Action is the message discriminator; it is echoed back in the status
	// response.
	Action string `json:"action"`

	// Reason is handed to RecordProcessor.Shutdown.
	Reason string `json:"reason"`
}
|
|
|
|
|
|
|
|
|
|
// ActionCheckpoint is used in both directions: it is written to request a
// checkpoint and decoded from the response to that request.
//
// The optional fields are pointers tagged omitempty, so a nil pointer leaves
// the key out of the encoded JSON altogether.
type ActionCheckpoint struct {
	// Action is always "checkpoint" for this message type.
	Action string `json:"action"`

	// SequenceNumber and SubSequenceNumber identify the position to
	// checkpoint; both are left nil when an empty SequencePair is sent.
	SequenceNumber    *string `json:"sequenceNumber,omitempty"`
	SubSequenceNumber *int    `json:"subSequenceNumber,omitempty"`

	// Error, when non-nil and non-empty in a response, is surfaced to the
	// caller as a CheckpointError.
	Error *string `json:"error,omitempty"`
}
|
|
|
|
|
|
|
|
|
|
func (i ioHandler) loadAction(line string) (interface{}, error) {
|
|
|
|
|
lineBytes := []byte(line)
|
|
|
|
|
var message struct {
|
|
|
|
|
Action string `json:"action"`
|
|
|
|
|
}
|
|
|
|
|
if err := json.Unmarshal(lineBytes, &message); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
switch message.Action {
|
|
|
|
|
case "initialize":
|
|
|
|
|
var actionInitialize ActionInitialize
|
|
|
|
|
if err := json.Unmarshal(lineBytes, &actionInitialize); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return actionInitialize, nil
|
|
|
|
|
case "processRecords":
|
|
|
|
|
var actionProcessRecords ActionProcessRecords
|
|
|
|
|
if err := json.Unmarshal(lineBytes, &actionProcessRecords); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return actionProcessRecords, nil
|
2017-08-03 21:22:52 +00:00
|
|
|
case "shutdownRequested":
|
|
|
|
|
fallthrough
|
2017-02-08 20:23:00 +00:00
|
|
|
case "shutdown":
|
|
|
|
|
var actionShutdown ActionShutdown
|
|
|
|
|
if err := json.Unmarshal(lineBytes, &actionShutdown); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return actionShutdown, nil
|
|
|
|
|
case "checkpoint":
|
|
|
|
|
var actionCheckpoint ActionCheckpoint
|
|
|
|
|
if err := json.Unmarshal(lineBytes, &actionCheckpoint); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return actionCheckpoint, nil
|
|
|
|
|
default:
|
|
|
|
|
return nil, fmt.Errorf("no recognizable 'action' field in message: %s", line)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (i ioHandler) writeAction(action interface{}) error {
|
|
|
|
|
line, err := json.Marshal(action)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
i.writeLine(string(line))
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2017-08-03 21:22:52 +00:00
|
|
|
func New(
|
|
|
|
|
inputFile io.Reader, outputFile, errorFile io.Writer, recordProcessor RecordProcessor,
|
|
|
|
|
) *KCLProcess {
|
2017-02-08 20:23:00 +00:00
|
|
|
i := ioHandler{
|
|
|
|
|
inputFile: inputFile,
|
|
|
|
|
outputFile: outputFile,
|
|
|
|
|
errorFile: errorFile,
|
|
|
|
|
}
|
|
|
|
|
return &KCLProcess{
|
2017-08-03 21:22:52 +00:00
|
|
|
ioHandler: i,
|
2017-02-08 20:23:00 +00:00
|
|
|
recordProcessor: recordProcessor,
|
2017-08-03 21:22:52 +00:00
|
|
|
|
|
|
|
|
next: make(chan struct{}),
|
|
|
|
|
out: make(chan string),
|
|
|
|
|
outErr: make(chan error),
|
|
|
|
|
|
|
|
|
|
checkpoint: make(chan SequencePair),
|
|
|
|
|
checkpointErr: make(chan error),
|
2017-02-08 20:23:00 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type KCLProcess struct {
|
|
|
|
|
ioHandler ioHandler
|
|
|
|
|
recordProcessor RecordProcessor
|
2017-08-03 21:22:52 +00:00
|
|
|
|
|
|
|
|
next chan struct{}
|
|
|
|
|
out chan string
|
|
|
|
|
outErr chan error
|
|
|
|
|
|
|
|
|
|
checkpoint chan SequencePair
|
|
|
|
|
checkpointErr chan error
|
2017-02-08 20:23:00 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (kclp *KCLProcess) reportDone(responseFor string) error {
|
|
|
|
|
return kclp.ioHandler.writeAction(struct {
|
|
|
|
|
Action string `json:"action"`
|
|
|
|
|
ResponseFor string `json:"responseFor"`
|
|
|
|
|
}{
|
|
|
|
|
Action: "status",
|
|
|
|
|
ResponseFor: responseFor,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (kclp *KCLProcess) performAction(a interface{}) (string, error) {
|
|
|
|
|
switch action := a.(type) {
|
|
|
|
|
case ActionInitialize:
|
2017-08-03 21:22:52 +00:00
|
|
|
return action.Action, kclp.recordProcessor.Initialize(action.ShardID, kclp)
|
2017-02-08 20:23:00 +00:00
|
|
|
case ActionProcessRecords:
|
2017-05-19 22:01:57 +00:00
|
|
|
return action.Action, kclp.recordProcessor.ProcessRecords(action.Records)
|
2017-02-08 20:23:00 +00:00
|
|
|
case ActionShutdown:
|
2017-05-19 22:01:57 +00:00
|
|
|
return action.Action, kclp.recordProcessor.Shutdown(action.Reason)
|
2017-02-08 20:23:00 +00:00
|
|
|
default:
|
2017-08-03 21:22:52 +00:00
|
|
|
return "", fmt.Errorf("unknown action to dispatch: %+#v", action)
|
2017-02-08 20:23:00 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (kclp *KCLProcess) handleLine(line string) error {
|
|
|
|
|
action, err := kclp.ioHandler.loadAction(line)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
responseFor, err := kclp.performAction(action)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
return kclp.reportDone(responseFor)
|
|
|
|
|
}
|
|
|
|
|
|
2017-08-03 21:22:52 +00:00
|
|
|
func (kclp *KCLProcess) Checkpoint(pair SequencePair, retryCount int) error {
|
|
|
|
|
sleepDuration := 5 * time.Second
|
|
|
|
|
|
|
|
|
|
for n := 0; n <= retryCount; n++ {
|
|
|
|
|
kclp.checkpoint <- pair
|
|
|
|
|
err := <-kclp.checkpointErr
|
|
|
|
|
if err == nil {
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if cperr, ok := err.(CheckpointError); ok {
|
|
|
|
|
switch cperr.Error() {
|
|
|
|
|
case "ShutdownException":
|
|
|
|
|
return fmt.Errorf("Encountered shutdown exception, skipping checkpoint")
|
|
|
|
|
case "ThrottlingException":
|
|
|
|
|
fmt.Fprintf(os.Stderr, "Checkpointing throttling, pause for %s\n", sleepDuration)
|
|
|
|
|
case "InvalidStateException":
|
|
|
|
|
fmt.Fprintf(os.Stderr, "MultiLangDaemon invalid state while checkpointing\n")
|
|
|
|
|
default:
|
|
|
|
|
fmt.Fprintf(os.Stderr, "Encountered an error while checkpointing: %s", err)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if n == retryCount {
|
|
|
|
|
return fmt.Errorf("Failed to checkpoint after %d attempts, giving up.", retryCount)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
time.Sleep(sleepDuration)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (kclp *KCLProcess) Shutdown() {
|
|
|
|
|
kclp.Checkpoint(SequencePair{}, 5)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (kclp *KCLProcess) processCheckpoint(pair SequencePair) error {
|
|
|
|
|
var seq *string
|
|
|
|
|
var subSeq *int
|
|
|
|
|
if !pair.IsEmpty() { // an empty pair is a signal to shutdown
|
|
|
|
|
tmp := pair.Sequence.String()
|
|
|
|
|
seq = &tmp
|
|
|
|
|
subSeq = &pair.SubSequence
|
|
|
|
|
}
|
|
|
|
|
kclp.ioHandler.writeAction(ActionCheckpoint{
|
|
|
|
|
Action: "checkpoint",
|
|
|
|
|
SequenceNumber: seq,
|
|
|
|
|
SubSequenceNumber: subSeq,
|
|
|
|
|
})
|
|
|
|
|
line, err := kclp.ioHandler.readLine()
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
actionI, err := kclp.ioHandler.loadAction(line)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
action, ok := actionI.(ActionCheckpoint)
|
|
|
|
|
if !ok {
|
|
|
|
|
return fmt.Errorf("expected checkpoint response, got '%s'", line)
|
|
|
|
|
}
|
|
|
|
|
if action.Error != nil && *action.Error != "" {
|
|
|
|
|
return CheckpointError{e: *action.Error}
|
|
|
|
|
}
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (kclp *KCLProcess) startLineProcessor(
|
|
|
|
|
next chan struct{}, out chan string, outErr chan error,
|
|
|
|
|
checkpoint chan SequencePair, checkpointErr chan error,
|
|
|
|
|
) {
|
|
|
|
|
go func() {
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-next:
|
|
|
|
|
line, err := kclp.ioHandler.readLine()
|
2017-08-06 01:52:11 +00:00
|
|
|
if err == nil {
|
|
|
|
|
if line == "" {
|
|
|
|
|
err = fmt.Errorf("Empty read line recieved")
|
|
|
|
|
}
|
|
|
|
|
err = kclp.handleLine(line)
|
2017-08-03 21:22:52 +00:00
|
|
|
}
|
2017-08-06 01:52:11 +00:00
|
|
|
outErr <- err
|
2017-08-03 21:22:52 +00:00
|
|
|
case pair := <-checkpoint:
|
|
|
|
|
err := kclp.processCheckpoint(pair)
|
|
|
|
|
checkpointErr <- err
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (kclp *KCLProcess) processNextLine() error {
|
|
|
|
|
kclp.next <- struct{}{} // We're ready for a new line
|
|
|
|
|
|
2017-08-06 01:52:11 +00:00
|
|
|
return <-kclp.outErr
|
2017-08-03 21:22:52 +00:00
|
|
|
}
|
|
|
|
|
|
2017-02-08 20:23:00 +00:00
|
|
|
func (kclp *KCLProcess) Run() {
|
2017-08-03 21:22:52 +00:00
|
|
|
kclp.startLineProcessor(kclp.next, kclp.out, kclp.outErr, kclp.checkpoint, kclp.checkpointErr)
|
2017-02-08 20:23:00 +00:00
|
|
|
for {
|
2017-08-03 21:22:52 +00:00
|
|
|
err := kclp.processNextLine()
|
|
|
|
|
if err == io.EOF {
|
|
|
|
|
kclp.ioHandler.writeError("IO stream closed")
|
2017-02-08 20:23:00 +00:00
|
|
|
return
|
2017-08-03 21:22:52 +00:00
|
|
|
} else if err != nil {
|
|
|
|
|
kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err))
|
2017-07-21 01:27:17 +00:00
|
|
|
return
|
2017-02-08 20:23:00 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|