Cleaned up shutdown code

This commit is contained in:
Xavi Ramirez 2017-09-20 21:28:43 +00:00
parent 7f2afe9d8f
commit b07ad4c1c2

View file

@ -141,7 +141,7 @@ func New(
recordProcessor: recordProcessor, recordProcessor: recordProcessor,
nextCheckpointPair: SequencePair{}, nextCheckpointPair: SequencePair{},
wasAskedToShutdown: false, doGracefulShutdown: false,
} }
} }
@ -152,7 +152,7 @@ type KCLProcess struct {
recordProcessor RecordProcessor recordProcessor RecordProcessor
nextCheckpointPair SequencePair nextCheckpointPair SequencePair
wasAskedToShutdown bool doGracefulShutdown bool
} }
func (kclp *KCLProcess) Checkpoint(pair SequencePair) { func (kclp *KCLProcess) Checkpoint(pair SequencePair) {
@ -166,7 +166,7 @@ func (kclp *KCLProcess) Checkpoint(pair SequencePair) {
func (kclp *KCLProcess) Shutdown() { func (kclp *KCLProcess) Shutdown() {
kclp.ioHandler.writeError("Checkpoint shutdown") kclp.ioHandler.writeError("Checkpoint shutdown")
kclp.wasAskedToShutdown = true kclp.doGracefulShutdown = true
} }
func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error { func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error {
@ -224,15 +224,15 @@ func (kclp *KCLProcess) reportDone(responseFor string) error {
}) })
} }
func (kclp *KCLProcess) handleLine(line string) error { func (kclp *KCLProcess) handleLine(line string) (string, error) {
action, err := kclp.ioHandler.loadAction(line) action, err := kclp.ioHandler.loadAction(line)
if err != nil { if err != nil {
return err return "", err
} }
switch action := action.(type) { switch action := action.(type) {
case ActionCheckpoint: case ActionCheckpoint:
err = kclp.handleCheckpointAction(action) return "checkpoint", kclp.handleCheckpointAction(action)
case ActionShutdown: case ActionShutdown:
kclp.ioHandler.writeError("Received shutdown action...") kclp.ioHandler.writeError("Received shutdown action...")
@ -244,27 +244,31 @@ func (kclp *KCLProcess) handleLine(line string) error {
kclp.ioHandler.writeError("Reporting shutdown done") kclp.ioHandler.writeError("Reporting shutdown done")
err := kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown if kclp.doGracefulShutdown {
if err != nil { err := kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown
kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown ask: %+#v", err)) if err != nil {
kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown ask: %+#v", err))
}
} }
return kclp.reportDone("shutdown") return "shutdown", kclp.reportDone("shutdown")
case ActionInitialize: case ActionInitialize:
err = kclp.recordProcessor.Initialize(action.ShardID, kclp) err := kclp.recordProcessor.Initialize(action.ShardID, kclp)
if err == nil { if err != nil {
err = kclp.reportDone(action.Action) return "", err
} }
case ActionProcessRecords:
err = kclp.recordProcessor.ProcessRecords(action.Records)
if err == nil {
err = kclp.reportDone(action.Action)
}
default:
err = fmt.Errorf("unknown action to dispatch: %+#v", action)
}
return err return "initialize", kclp.reportDone(action.Action)
case ActionProcessRecords:
err := kclp.recordProcessor.ProcessRecords(action.Records)
if err != nil {
return "", err
}
return "process-record", kclp.reportDone(action.Action)
default:
return "", fmt.Errorf("unknown action to dispatch: %+#v", action)
}
} }
func (kclp *KCLProcess) Run() { func (kclp *KCLProcess) Run() {
@ -281,9 +285,11 @@ func (kclp *KCLProcess) Run() {
continue continue
} }
err = kclp.handleLine(line) action, err := kclp.handleLine(line)
if err != nil { if err != nil {
kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err)) kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err))
} else if action == "shutdown" {
return
} }
kclp.ckpmux.Lock() kclp.ckpmux.Lock()
@ -297,7 +303,7 @@ func (kclp *KCLProcess) Run() {
} else { } else {
kclp.nextCheckpointPair = SequencePair{} kclp.nextCheckpointPair = SequencePair{}
} }
} else if kclp.wasAskedToShutdown { } else if kclp.doGracefulShutdown {
err := kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown err := kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown
if err != nil { if err != nil {
kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown ask: %+#v", err)) kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown ask: %+#v", err))