Compare commits
1 commit
master
...
remove-str
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2b763d7b15 |
1 changed files with 56 additions and 27 deletions
83
kcl/kcl.go
83
kcl/kcl.go
|
|
@ -51,13 +51,27 @@ func (i ioHandler) readLine() (string, error) {
|
|||
return line, nil
|
||||
}
|
||||
|
||||
// ActionName is the "action" string contained in the JSON sent from the KCL.
|
||||
type ActionName string
|
||||
|
||||
const (
|
||||
ActionNameInitialize ActionName = "initialize"
|
||||
ActionNameProcessRecords = "processRecords"
|
||||
ActionNameShutdownRequested = "shutdownRequested"
|
||||
ActionNameShutdown = "shutdown"
|
||||
ActionNameCheckpoint = "checkpoint"
|
||||
)
|
||||
|
||||
type ActionInitialize struct {
|
||||
Action string `json:"action"`
|
||||
ShardID string `json:"shardId"`
|
||||
SequenceNumber string `json:"sequenceNumber"`
|
||||
SubSequenceNumber int `json:"subSequenceNumber"`
|
||||
}
|
||||
|
||||
func (a ActionInitialize) Name() ActionName {
|
||||
return ActionNameInitialize
|
||||
}
|
||||
|
||||
type Record struct {
|
||||
SequenceNumber string `json:"sequenceNumber"`
|
||||
SubSequenceNumber int `json:"subSequenceNumber"`
|
||||
|
|
@ -67,53 +81,63 @@ type Record struct {
|
|||
}
|
||||
|
||||
type ActionProcessRecords struct {
|
||||
Action string `json:"action"`
|
||||
Records []Record `json:"records"`
|
||||
MillisBehindLatest int `json:"millisBehindLatest"`
|
||||
}
|
||||
|
||||
func (a ActionProcessRecords) Name() ActionName {
|
||||
return ActionNameProcessRecords
|
||||
}
|
||||
|
||||
type ActionShutdown struct {
|
||||
Action string `json:"action"`
|
||||
Reason string `json:"reason"`
|
||||
}
|
||||
|
||||
func (a ActionShutdown) Name() ActionName {
|
||||
return ActionNameShutdown
|
||||
}
|
||||
|
||||
type ActionCheckpoint struct {
|
||||
Action string `json:"action"`
|
||||
SequenceNumber *string `json:"sequenceNumber,omitempty"`
|
||||
SubSequenceNumber *int `json:"subSequenceNumber,omitempty"`
|
||||
Error *string `json:"error,omitempty"`
|
||||
Action ActionName `json:"action"`
|
||||
SequenceNumber *string `json:"sequenceNumber,omitempty"`
|
||||
SubSequenceNumber *int `json:"subSequenceNumber,omitempty"`
|
||||
Error *string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
func (a ActionCheckpoint) Name() ActionName {
|
||||
return ActionNameCheckpoint
|
||||
}
|
||||
|
||||
func (i ioHandler) loadAction(line string) (interface{}, error) {
|
||||
lineBytes := []byte(line)
|
||||
var message struct {
|
||||
Action string `json:"action"`
|
||||
Action ActionName `json:"action"`
|
||||
}
|
||||
if err := json.Unmarshal(lineBytes, &message); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
switch message.Action {
|
||||
case "initialize":
|
||||
case ActionNameInitialize:
|
||||
var actionInitialize ActionInitialize
|
||||
if err := json.Unmarshal(lineBytes, &actionInitialize); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return actionInitialize, nil
|
||||
case "processRecords":
|
||||
case ActionNameProcessRecords:
|
||||
var actionProcessRecords ActionProcessRecords
|
||||
if err := json.Unmarshal(lineBytes, &actionProcessRecords); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return actionProcessRecords, nil
|
||||
case "shutdownRequested":
|
||||
case ActionNameShutdownRequested:
|
||||
fallthrough
|
||||
case "shutdown":
|
||||
case ActionNameShutdown:
|
||||
var actionShutdown ActionShutdown
|
||||
if err := json.Unmarshal(lineBytes, &actionShutdown); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return actionShutdown, nil
|
||||
case "checkpoint":
|
||||
case ActionNameCheckpoint:
|
||||
var actionCheckpoint ActionCheckpoint
|
||||
if err := json.Unmarshal(lineBytes, &actionCheckpoint); err != nil {
|
||||
return nil, err
|
||||
|
|
@ -208,31 +232,36 @@ func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error {
|
|||
|
||||
func (kclp *KCLProcess) sendCheckpoint(seq *string, subSeq *int) error {
|
||||
return kclp.ioHandler.writeAction(ActionCheckpoint{
|
||||
Action: "checkpoint",
|
||||
Action: ActionNameCheckpoint,
|
||||
SequenceNumber: seq,
|
||||
SubSequenceNumber: subSeq,
|
||||
})
|
||||
}
|
||||
|
||||
func (kclp *KCLProcess) reportDone(responseFor string) error {
|
||||
func (kclp *KCLProcess) reportDone(responseFor ActionName) error {
|
||||
return kclp.ioHandler.writeAction(struct {
|
||||
Action string `json:"action"`
|
||||
ResponseFor string `json:"responseFor"`
|
||||
Action string `json:"action"`
|
||||
ResponseFor ActionName `json:"responseFor"`
|
||||
}{
|
||||
Action: "status",
|
||||
ResponseFor: responseFor,
|
||||
})
|
||||
}
|
||||
|
||||
func (kclp *KCLProcess) handleLine(line string) (string, error) {
|
||||
type Action interface {
|
||||
Name() ActionName
|
||||
}
|
||||
|
||||
// handleLine processes a line of text sent to the process by the KCL. It returns the action handled, if any.
|
||||
func (kclp *KCLProcess) handleLine(line string) (Action, error) {
|
||||
action, err := kclp.ioHandler.loadAction(line)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch action := action.(type) {
|
||||
case ActionCheckpoint:
|
||||
return "checkpoint", kclp.handleCheckpointAction(action)
|
||||
return action, kclp.handleCheckpointAction(action)
|
||||
case ActionShutdown:
|
||||
kclp.ioHandler.writeError("Received shutdown action...")
|
||||
|
||||
|
|
@ -251,23 +280,23 @@ func (kclp *KCLProcess) handleLine(line string) (string, error) {
|
|||
}
|
||||
}
|
||||
|
||||
return "shutdown", kclp.reportDone("shutdown")
|
||||
return action, kclp.reportDone(action.Name())
|
||||
case ActionInitialize:
|
||||
err := kclp.recordProcessor.Initialize(action.ShardID, kclp)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return "initialize", kclp.reportDone(action.Action)
|
||||
return action, kclp.reportDone(action.Name())
|
||||
case ActionProcessRecords:
|
||||
err := kclp.recordProcessor.ProcessRecords(action.Records)
|
||||
if err != nil {
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return "process-record", kclp.reportDone(action.Action)
|
||||
return action, kclp.reportDone(action.Name())
|
||||
default:
|
||||
return "", fmt.Errorf("unknown action to dispatch: %+#v", action)
|
||||
return nil, fmt.Errorf("unknown action to dispatch: %+#v", action)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -288,7 +317,7 @@ func (kclp *KCLProcess) Run() {
|
|||
action, err := kclp.handleLine(line)
|
||||
if err != nil {
|
||||
kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err))
|
||||
} else if action == "shutdown" {
|
||||
} else if action != nil && action.Name() == ActionNameShutdown {
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue