Merge pull request #18 from Clever/fix-shutdown

Fix shutdown
This commit is contained in:
Xavi 2017-09-21 10:58:25 -07:00 committed by GitHub
commit 8d871591de

View file

@ -141,7 +141,7 @@ func New(
recordProcessor: recordProcessor,
nextCheckpointPair: SequencePair{},
wasAskedToShutdown: false,
doGracefulShutdown: false,
}
}
@ -152,7 +152,7 @@ type KCLProcess struct {
recordProcessor RecordProcessor
nextCheckpointPair SequencePair
wasAskedToShutdown bool
doGracefulShutdown bool
}
func (kclp *KCLProcess) Checkpoint(pair SequencePair) {
@ -166,7 +166,7 @@ func (kclp *KCLProcess) Checkpoint(pair SequencePair) {
func (kclp *KCLProcess) Shutdown() {
kclp.ioHandler.writeError("Checkpoint shutdown")
kclp.wasAskedToShutdown = true
kclp.doGracefulShutdown = true
}
func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error {
@ -224,15 +224,15 @@ func (kclp *KCLProcess) reportDone(responseFor string) error {
})
}
func (kclp *KCLProcess) handleLine(line string) error {
func (kclp *KCLProcess) handleLine(line string) (string, error) {
action, err := kclp.ioHandler.loadAction(line)
if err != nil {
return err
return "", err
}
switch action := action.(type) {
case ActionCheckpoint:
err = kclp.handleCheckpointAction(action)
return "checkpoint", kclp.handleCheckpointAction(action)
case ActionShutdown:
kclp.ioHandler.writeError("Received shutdown action...")
@ -243,22 +243,32 @@ func (kclp *KCLProcess) handleLine(line string) error {
}
kclp.ioHandler.writeError("Reporting shutdown done")
return kclp.reportDone("shutdown")
case ActionInitialize:
err = kclp.recordProcessor.Initialize(action.ShardID, kclp)
if err == nil {
err = kclp.reportDone(action.Action)
if kclp.doGracefulShutdown {
err := kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown
if err != nil {
kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown ask: %+#v", err))
}
case ActionProcessRecords:
err = kclp.recordProcessor.ProcessRecords(action.Records)
if err == nil {
err = kclp.reportDone(action.Action)
}
default:
err = fmt.Errorf("unknown action to dispatch: %+#v", action)
}
return err
return "shutdown", kclp.reportDone("shutdown")
case ActionInitialize:
err := kclp.recordProcessor.Initialize(action.ShardID, kclp)
if err != nil {
return "", err
}
return "initialize", kclp.reportDone(action.Action)
case ActionProcessRecords:
err := kclp.recordProcessor.ProcessRecords(action.Records)
if err != nil {
return "", err
}
return "process-record", kclp.reportDone(action.Action)
default:
return "", fmt.Errorf("unknown action to dispatch: %+#v", action)
}
}
func (kclp *KCLProcess) Run() {
@ -275,9 +285,11 @@ func (kclp *KCLProcess) Run() {
continue
}
err = kclp.handleLine(line)
action, err := kclp.handleLine(line)
if err != nil {
kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err))
} else if action == "shutdown" {
return
}
kclp.ckpmux.Lock()
@ -291,7 +303,7 @@ func (kclp *KCLProcess) Run() {
} else {
kclp.nextCheckpointPair = SequencePair{}
}
} else if kclp.wasAskedToShutdown {
} else if kclp.doGracefulShutdown {
err := kclp.sendCheckpoint(nil, nil) // nil sequence num is signal to shutdown
if err != nil {
kclp.ioHandler.writeError(fmt.Sprintf("ERR shutdown ask: %+#v", err))