2017-02-08 20:23:00 +00:00
|
|
|
package kcl
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"bufio"
|
|
|
|
|
"encoding/json"
|
|
|
|
|
"fmt"
|
|
|
|
|
"io"
|
2017-08-06 02:59:28 +00:00
|
|
|
"sync"
|
2017-05-22 23:05:34 +00:00
|
|
|
"time"
|
2017-02-08 20:23:00 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
type RecordProcessor interface {
|
2017-07-21 01:04:34 +00:00
|
|
|
Initialize(shardID string, checkpointer Checkpointer) error
|
2017-05-19 22:01:57 +00:00
|
|
|
ProcessRecords(records []Record) error
|
2017-08-08 19:09:31 +00:00
|
|
|
// Shutdown this call should block until it's safe to shutdown the process
|
2017-05-19 22:01:57 +00:00
|
|
|
Shutdown(reason string) error
|
|
|
|
|
}
|
|
|
|
|
|
2017-07-21 01:04:34 +00:00
|
|
|
type Checkpointer interface {
|
2017-08-06 05:02:55 +00:00
|
|
|
Checkpoint(pair SequencePair)
|
2017-07-21 01:04:34 +00:00
|
|
|
Shutdown()
|
|
|
|
|
}
|
|
|
|
|
|
2017-02-08 20:23:00 +00:00
|
|
|
// ioHandler bundles the three streams used to exchange messages: a buffered
// reader for incoming lines and two writers, one for protocol output and one
// for diagnostics.
type ioHandler struct {
	// input is read one '\n'-terminated line at a time.
	input *bufio.Reader

	// outputFile receives protocol messages; errorFile receives log text.
	outputFile, errorFile io.Writer
}
|
|
|
|
|
|
2017-08-20 03:26:32 +00:00
|
|
|
func newIOHandler(inputFile io.Reader, outputFile, errorFile io.Writer) ioHandler {
|
|
|
|
|
return ioHandler{
|
|
|
|
|
input: bufio.NewReader(inputFile),
|
|
|
|
|
outputFile: outputFile,
|
|
|
|
|
errorFile: errorFile,
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2017-02-08 20:23:00 +00:00
|
|
|
func (i ioHandler) writeLine(line string) {
|
|
|
|
|
fmt.Fprintf(i.outputFile, "\n%s\n", line)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (i ioHandler) writeError(message string) {
|
|
|
|
|
fmt.Fprintf(i.errorFile, "%s\n", message)
|
|
|
|
|
}
|
|
|
|
|
|
2017-08-03 21:22:52 +00:00
|
|
|
func (i ioHandler) readLine() (string, error) {
|
2017-08-20 03:26:32 +00:00
|
|
|
line, err := i.input.ReadString('\n')
|
2017-02-08 20:23:00 +00:00
|
|
|
if err != nil {
|
2017-08-03 21:22:52 +00:00
|
|
|
return "", err
|
2017-02-08 20:23:00 +00:00
|
|
|
}
|
2017-08-03 21:22:52 +00:00
|
|
|
return line, nil
|
2017-02-08 20:23:00 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ActionInitialize is the decoded form of a message whose "action" field is
// "initialize".
type ActionInitialize struct {
	// Action is the message's "action" value.
	Action string `json:"action"`
	// ShardID is the message's "shardId" value.
	ShardID string `json:"shardId"`
	// SequenceNumber is the message's "sequenceNumber" value.
	SequenceNumber string `json:"sequenceNumber"`
	// SubSequenceNumber is the message's "subSequenceNumber" value.
	SubSequenceNumber int `json:"subSequenceNumber"`
}
|
|
|
|
|
|
|
|
|
|
// Record is one element of the "records" array of a "processRecords"
// message.
type Record struct {
	// SequenceNumber is the record's "sequenceNumber" value.
	SequenceNumber string `json:"sequenceNumber"`
	// SubSequenceNumber is the record's "subSequenceNumber" value.
	SubSequenceNumber int `json:"subSequenceNumber"`
	// ApproximateArrivalTimestamp is the record's
	// "approximateArrivalTimestamp" value.
	// NOTE(review): unit is not established in this file - presumably epoch
	// milliseconds; confirm against the sender.
	ApproximateArrivalTimestamp int `json:"approximateArrivalTimestamp"`
	// PartitionKey is the record's "partitionKey" value.
	PartitionKey string `json:"partitionKey"`
	// Data is the record's "data" value, kept exactly as received.
	// NOTE(review): presumably base64-encoded payload; confirm with callers.
	Data string `json:"data"`
}
|
|
|
|
|
|
|
|
|
|
type ActionProcessRecords struct {
|
|
|
|
|
Action string `json:"action"`
|
|
|
|
|
Records []Record `json:"records"`
|
|
|
|
|
MillisBehindLatest int `json:"millisBehindLatest"`
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// ActionShutdown is the decoded form of a message whose "action" field is
// "shutdown" or "shutdownRequested".
type ActionShutdown struct {
	// Action is the message's "action" value.
	Action string `json:"action"`
	// Reason is the message's "reason" value.
	Reason string `json:"reason"`
}
|
|
|
|
|
|
|
|
|
|
// ActionCheckpoint is both the outgoing checkpoint request and the decoded
// form of an incoming message whose "action" field is "checkpoint". The
// pointer fields are left out of the encoded JSON when nil.
type ActionCheckpoint struct {
	// Action is the message's "action" value.
	Action string `json:"action"`
	// SequenceNumber is the optional "sequenceNumber" value.
	SequenceNumber *string `json:"sequenceNumber,omitempty"`
	// SubSequenceNumber is the optional "subSequenceNumber" value.
	SubSequenceNumber *int `json:"subSequenceNumber,omitempty"`
	// Error is the optional "error" value; nil on an incoming message is
	// treated as a successful checkpoint.
	Error *string `json:"error,omitempty"`
}
|
|
|
|
|
|
|
|
|
|
func (i ioHandler) loadAction(line string) (interface{}, error) {
|
|
|
|
|
lineBytes := []byte(line)
|
|
|
|
|
var message struct {
|
|
|
|
|
Action string `json:"action"`
|
|
|
|
|
}
|
|
|
|
|
if err := json.Unmarshal(lineBytes, &message); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
switch message.Action {
|
|
|
|
|
case "initialize":
|
|
|
|
|
var actionInitialize ActionInitialize
|
|
|
|
|
if err := json.Unmarshal(lineBytes, &actionInitialize); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return actionInitialize, nil
|
|
|
|
|
case "processRecords":
|
|
|
|
|
var actionProcessRecords ActionProcessRecords
|
|
|
|
|
if err := json.Unmarshal(lineBytes, &actionProcessRecords); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return actionProcessRecords, nil
|
2017-08-03 21:22:52 +00:00
|
|
|
case "shutdownRequested":
|
|
|
|
|
fallthrough
|
2017-02-08 20:23:00 +00:00
|
|
|
case "shutdown":
|
|
|
|
|
var actionShutdown ActionShutdown
|
|
|
|
|
if err := json.Unmarshal(lineBytes, &actionShutdown); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return actionShutdown, nil
|
|
|
|
|
case "checkpoint":
|
|
|
|
|
var actionCheckpoint ActionCheckpoint
|
|
|
|
|
if err := json.Unmarshal(lineBytes, &actionCheckpoint); err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
return actionCheckpoint, nil
|
|
|
|
|
default:
|
|
|
|
|
return nil, fmt.Errorf("no recognizable 'action' field in message: %s", line)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (i ioHandler) writeAction(action interface{}) error {
|
|
|
|
|
line, err := json.Marshal(action)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
|
|
|
|
i.writeLine(string(line))
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2017-08-03 21:22:52 +00:00
|
|
|
func New(
|
|
|
|
|
inputFile io.Reader, outputFile, errorFile io.Writer, recordProcessor RecordProcessor,
|
|
|
|
|
) *KCLProcess {
|
2017-02-08 20:23:00 +00:00
|
|
|
return &KCLProcess{
|
2017-08-20 03:26:32 +00:00
|
|
|
ioHandler: newIOHandler(inputFile, outputFile, errorFile),
|
2017-02-08 20:23:00 +00:00
|
|
|
recordProcessor: recordProcessor,
|
2017-08-03 21:22:52 +00:00
|
|
|
|
2017-08-06 05:02:55 +00:00
|
|
|
nextCheckpointPair: SequencePair{},
|
2017-08-18 01:42:36 +00:00
|
|
|
wasAskedToShutdown: false,
|
2017-02-08 20:23:00 +00:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type KCLProcess struct {
|
2017-08-06 05:02:55 +00:00
|
|
|
ckpmux sync.Mutex
|
2017-08-06 02:59:28 +00:00
|
|
|
|
2017-02-08 20:23:00 +00:00
|
|
|
ioHandler ioHandler
|
|
|
|
|
recordProcessor RecordProcessor
|
2017-08-03 21:22:52 +00:00
|
|
|
|
2017-08-06 05:02:55 +00:00
|
|
|
nextCheckpointPair SequencePair
|
2017-08-18 01:42:36 +00:00
|
|
|
wasAskedToShutdown bool
|
2017-02-08 20:23:00 +00:00
|
|
|
}
|
|
|
|
|
|
2017-08-06 05:02:55 +00:00
|
|
|
func (kclp *KCLProcess) Checkpoint(pair SequencePair) {
|
2017-08-06 03:46:58 +00:00
|
|
|
kclp.ckpmux.Lock()
|
|
|
|
|
defer kclp.ckpmux.Unlock()
|
2017-08-06 05:02:55 +00:00
|
|
|
|
2017-08-10 20:16:41 +00:00
|
|
|
if kclp.nextCheckpointPair.IsNil() || kclp.nextCheckpointPair.IsLessThan(pair) {
|
2017-08-06 05:02:55 +00:00
|
|
|
kclp.nextCheckpointPair = pair
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (kclp *KCLProcess) Shutdown() {
|
2017-08-08 19:09:31 +00:00
|
|
|
kclp.ioHandler.writeError("Checkpoint shutdown")
|
2017-08-18 01:42:36 +00:00
|
|
|
kclp.wasAskedToShutdown = true
|
2017-08-06 05:02:55 +00:00
|
|
|
}
|
|
|
|
|
|
2017-08-06 10:58:46 +00:00
|
|
|
func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error {
|
|
|
|
|
if action.Error == nil { // Successful checkpoint
|
|
|
|
|
return nil
|
|
|
|
|
}
|
2017-08-03 21:22:52 +00:00
|
|
|
|
2017-08-06 10:58:46 +00:00
|
|
|
msg := *action.Error
|
|
|
|
|
switch msg {
|
|
|
|
|
case "ShutdownException":
|
|
|
|
|
return fmt.Errorf("Encountered shutdown exception, skipping checkpoint")
|
|
|
|
|
case "ThrottlingException":
|
|
|
|
|
sleep := 5 * time.Second
|
2017-08-20 23:42:27 +00:00
|
|
|
kclp.ioHandler.writeError(fmt.Sprintf("Checkpointing throttling, pause for %s", sleep))
|
2017-08-06 10:58:46 +00:00
|
|
|
time.Sleep(sleep)
|
|
|
|
|
case "InvalidStateException":
|
2017-08-20 23:42:27 +00:00
|
|
|
kclp.ioHandler.writeError("MultiLangDaemon invalid state while checkpointing")
|
2017-08-06 10:58:46 +00:00
|
|
|
default:
|
2017-08-20 23:42:27 +00:00
|
|
|
kclp.ioHandler.writeError(fmt.Sprintf("Encountered an error while checkpointing: %s", msg))
|
2017-08-06 10:58:46 +00:00
|
|
|
}
|
2017-08-03 21:22:52 +00:00
|
|
|
|
2017-08-06 10:58:46 +00:00
|
|
|
seq := action.SequenceNumber
|
|
|
|
|
subSeq := action.SubSequenceNumber
|
2017-08-03 21:22:52 +00:00
|
|
|
|
2017-08-06 10:58:46 +00:00
|
|
|
kclp.ckpmux.Lock()
|
2017-08-10 20:16:41 +00:00
|
|
|
if !kclp.nextCheckpointPair.IsNil() {
|
2017-08-06 10:58:46 +00:00
|
|
|
tmp := kclp.nextCheckpointPair.Sequence.String()
|
|
|
|
|
seq = &tmp
|
|
|
|
|
subSeq = &kclp.nextCheckpointPair.SubSequence
|
|
|
|
|
}
|
|
|
|
|
kclp.ckpmux.Unlock()
|
2017-08-03 21:22:52 +00:00
|
|
|
|
2017-08-06 10:58:46 +00:00
|
|
|
if seq != nil && subSeq != nil {
|
|
|
|
|
return kclp.sendCheckpoint(seq, subSeq)
|
2017-08-03 21:22:52 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
|
|
2017-08-06 10:58:46 +00:00
|
|
|
func (kclp *KCLProcess) sendCheckpoint(seq *string, subSeq *int) error {
|
|
|
|
|
return kclp.ioHandler.writeAction(ActionCheckpoint{
|
2017-08-03 21:22:52 +00:00
|
|
|
Action: "checkpoint",
|
|
|
|
|
SequenceNumber: seq,
|
|
|
|
|
SubSequenceNumber: subSeq,
|
|
|
|
|
})
|
2017-08-06 10:58:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (kclp *KCLProcess) reportDone(responseFor string) error {
|
|
|
|
|
return kclp.ioHandler.writeAction(struct {
|
|
|
|
|
Action string `json:"action"`
|
|
|
|
|
ResponseFor string `json:"responseFor"`
|
|
|
|
|
}{
|
|
|
|
|
Action: "status",
|
|
|
|
|
ResponseFor: responseFor,
|
|
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (kclp *KCLProcess) handleLine(line string) error {
|
|
|
|
|
action, err := kclp.ioHandler.loadAction(line)
|
2017-08-03 21:22:52 +00:00
|
|
|
if err != nil {
|
|
|
|
|
return err
|
|
|
|
|
}
|
2017-08-06 10:58:46 +00:00
|
|
|
|
|
|
|
|
switch action := action.(type) {
|
|
|
|
|
case ActionCheckpoint:
|
|
|
|
|
err = kclp.handleCheckpointAction(action)
|
2017-08-08 19:09:31 +00:00
|
|
|
case ActionShutdown:
|
|
|
|
|
kclp.ioHandler.writeError("Received shutdown action...")
|
|
|
|
|
|
2017-08-10 19:15:53 +00:00
|
|
|
// Shutdown should block until it's safe to shutdown the process
|
2017-08-08 19:09:31 +00:00
|
|
|
err = kclp.recordProcessor.Shutdown(action.Reason)
|
|
|
|
|
if err != nil { // Log error and continue shutting down
|
|
|
|
|
kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown: %+#v", err))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
kclp.ioHandler.writeError("Reporting shutdown done")
|
|
|
|
|
return kclp.reportDone("shutdown")
|
2017-08-06 10:58:46 +00:00
|
|
|
case ActionInitialize:
|
|
|
|
|
err = kclp.recordProcessor.Initialize(action.ShardID, kclp)
|
|
|
|
|
if err == nil {
|
|
|
|
|
err = kclp.reportDone(action.Action)
|
|
|
|
|
}
|
|
|
|
|
case ActionProcessRecords:
|
|
|
|
|
err = kclp.recordProcessor.ProcessRecords(action.Records)
|
|
|
|
|
if err == nil {
|
|
|
|
|
err = kclp.reportDone(action.Action)
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
err = fmt.Errorf("unknown action to dispatch: %+#v", action)
|
2017-08-03 21:22:52 +00:00
|
|
|
}
|
2017-08-06 10:58:46 +00:00
|
|
|
|
|
|
|
|
return err
|
2017-08-03 21:22:52 +00:00
|
|
|
}
|
|
|
|
|
|
2017-02-08 20:23:00 +00:00
|
|
|
func (kclp *KCLProcess) Run() {
|
|
|
|
|
for {
|
2017-08-06 02:59:28 +00:00
|
|
|
line, err := kclp.ioHandler.readLine()
|
2017-08-03 21:22:52 +00:00
|
|
|
if err == io.EOF {
|
|
|
|
|
kclp.ioHandler.writeError("IO stream closed")
|
2017-02-08 20:23:00 +00:00
|
|
|
return
|
2017-08-03 21:22:52 +00:00
|
|
|
} else if err != nil {
|
2017-08-06 02:59:28 +00:00
|
|
|
kclp.ioHandler.writeError(fmt.Sprintf("ERR Read line: %+#v", err))
|
|
|
|
|
return
|
|
|
|
|
} else if line == "" {
|
|
|
|
|
kclp.ioHandler.writeError("Empty read line recieved")
|
2017-08-06 10:58:46 +00:00
|
|
|
continue
|
2017-08-06 02:59:28 +00:00
|
|
|
}
|
|
|
|
|
|
2017-08-06 10:58:46 +00:00
|
|
|
err = kclp.handleLine(line)
|
2017-08-06 02:59:28 +00:00
|
|
|
if err != nil {
|
2017-08-03 21:22:52 +00:00
|
|
|
kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err))
|
2017-02-08 20:23:00 +00:00
|
|
|
}
|
2017-08-06 05:02:55 +00:00
|
|
|
|
2017-08-06 10:58:46 +00:00
|
|
|
kclp.ckpmux.Lock()
|
2017-08-10 20:16:41 +00:00
|
|
|
if !kclp.nextCheckpointPair.IsNil() {
|
2017-08-06 10:58:46 +00:00
|
|
|
seq := kclp.nextCheckpointPair.Sequence.String()
|
|
|
|
|
subSeq := kclp.nextCheckpointPair.SubSequence
|
|
|
|
|
|
|
|
|
|
err := kclp.sendCheckpoint(&seq, &subSeq)
|
2017-08-06 05:02:55 +00:00
|
|
|
if err != nil {
|
|
|
|
|
kclp.ioHandler.writeError(fmt.Sprintf("ERR checkpoint: %+#v", err))
|
2017-08-06 10:58:46 +00:00
|
|
|
} else {
|
|
|
|
|
kclp.nextCheckpointPair = SequencePair{}
|
2017-08-06 05:02:55 +00:00
|
|
|
}
|
2017-08-18 01:42:36 +00:00
|
|
|
} else if kclp.wasAskedToShutdown {
|
|
|
|
|
err := kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown
|
|
|
|
|
if err != nil {
|
|
|
|
|
kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown ask: %+#v", err))
|
|
|
|
|
}
|
2017-08-06 05:02:55 +00:00
|
|
|
}
|
2017-08-06 10:58:46 +00:00
|
|
|
kclp.ckpmux.Unlock()
|
2017-02-08 20:23:00 +00:00
|
|
|
}
|
|
|
|
|
}
|