Compare commits
1 commit
master
...
remove-str
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2b763d7b15 |
1 changed files with 56 additions and 27 deletions
83
kcl/kcl.go
83
kcl/kcl.go
|
|
@ -51,13 +51,27 @@ func (i ioHandler) readLine() (string, error) {
|
||||||
return line, nil
|
return line, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ActionName is the "action" string contained in the JSON sent from the KCL.
|
||||||
|
type ActionName string
|
||||||
|
|
||||||
|
const (
|
||||||
|
ActionNameInitialize ActionName = "initialize"
|
||||||
|
ActionNameProcessRecords = "processRecords"
|
||||||
|
ActionNameShutdownRequested = "shutdownRequested"
|
||||||
|
ActionNameShutdown = "shutdown"
|
||||||
|
ActionNameCheckpoint = "checkpoint"
|
||||||
|
)
|
||||||
|
|
||||||
type ActionInitialize struct {
|
type ActionInitialize struct {
|
||||||
Action string `json:"action"`
|
|
||||||
ShardID string `json:"shardId"`
|
ShardID string `json:"shardId"`
|
||||||
SequenceNumber string `json:"sequenceNumber"`
|
SequenceNumber string `json:"sequenceNumber"`
|
||||||
SubSequenceNumber int `json:"subSequenceNumber"`
|
SubSequenceNumber int `json:"subSequenceNumber"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a ActionInitialize) Name() ActionName {
|
||||||
|
return ActionNameInitialize
|
||||||
|
}
|
||||||
|
|
||||||
type Record struct {
|
type Record struct {
|
||||||
SequenceNumber string `json:"sequenceNumber"`
|
SequenceNumber string `json:"sequenceNumber"`
|
||||||
SubSequenceNumber int `json:"subSequenceNumber"`
|
SubSequenceNumber int `json:"subSequenceNumber"`
|
||||||
|
|
@ -67,53 +81,63 @@ type Record struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
type ActionProcessRecords struct {
|
type ActionProcessRecords struct {
|
||||||
Action string `json:"action"`
|
|
||||||
Records []Record `json:"records"`
|
Records []Record `json:"records"`
|
||||||
MillisBehindLatest int `json:"millisBehindLatest"`
|
MillisBehindLatest int `json:"millisBehindLatest"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a ActionProcessRecords) Name() ActionName {
|
||||||
|
return ActionNameProcessRecords
|
||||||
|
}
|
||||||
|
|
||||||
type ActionShutdown struct {
|
type ActionShutdown struct {
|
||||||
Action string `json:"action"`
|
|
||||||
Reason string `json:"reason"`
|
Reason string `json:"reason"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a ActionShutdown) Name() ActionName {
|
||||||
|
return ActionNameShutdown
|
||||||
|
}
|
||||||
|
|
||||||
type ActionCheckpoint struct {
|
type ActionCheckpoint struct {
|
||||||
Action string `json:"action"`
|
Action ActionName `json:"action"`
|
||||||
SequenceNumber *string `json:"sequenceNumber,omitempty"`
|
SequenceNumber *string `json:"sequenceNumber,omitempty"`
|
||||||
SubSequenceNumber *int `json:"subSequenceNumber,omitempty"`
|
SubSequenceNumber *int `json:"subSequenceNumber,omitempty"`
|
||||||
Error *string `json:"error,omitempty"`
|
Error *string `json:"error,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a ActionCheckpoint) Name() ActionName {
|
||||||
|
return ActionNameCheckpoint
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i ioHandler) loadAction(line string) (interface{}, error) {
|
func (i ioHandler) loadAction(line string) (interface{}, error) {
|
||||||
lineBytes := []byte(line)
|
lineBytes := []byte(line)
|
||||||
var message struct {
|
var message struct {
|
||||||
Action string `json:"action"`
|
Action ActionName `json:"action"`
|
||||||
}
|
}
|
||||||
if err := json.Unmarshal(lineBytes, &message); err != nil {
|
if err := json.Unmarshal(lineBytes, &message); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
switch message.Action {
|
switch message.Action {
|
||||||
case "initialize":
|
case ActionNameInitialize:
|
||||||
var actionInitialize ActionInitialize
|
var actionInitialize ActionInitialize
|
||||||
if err := json.Unmarshal(lineBytes, &actionInitialize); err != nil {
|
if err := json.Unmarshal(lineBytes, &actionInitialize); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return actionInitialize, nil
|
return actionInitialize, nil
|
||||||
case "processRecords":
|
case ActionNameProcessRecords:
|
||||||
var actionProcessRecords ActionProcessRecords
|
var actionProcessRecords ActionProcessRecords
|
||||||
if err := json.Unmarshal(lineBytes, &actionProcessRecords); err != nil {
|
if err := json.Unmarshal(lineBytes, &actionProcessRecords); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return actionProcessRecords, nil
|
return actionProcessRecords, nil
|
||||||
case "shutdownRequested":
|
case ActionNameShutdownRequested:
|
||||||
fallthrough
|
fallthrough
|
||||||
case "shutdown":
|
case ActionNameShutdown:
|
||||||
var actionShutdown ActionShutdown
|
var actionShutdown ActionShutdown
|
||||||
if err := json.Unmarshal(lineBytes, &actionShutdown); err != nil {
|
if err := json.Unmarshal(lineBytes, &actionShutdown); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return actionShutdown, nil
|
return actionShutdown, nil
|
||||||
case "checkpoint":
|
case ActionNameCheckpoint:
|
||||||
var actionCheckpoint ActionCheckpoint
|
var actionCheckpoint ActionCheckpoint
|
||||||
if err := json.Unmarshal(lineBytes, &actionCheckpoint); err != nil {
|
if err := json.Unmarshal(lineBytes, &actionCheckpoint); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -208,31 +232,36 @@ func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error {
|
||||||
|
|
||||||
func (kclp *KCLProcess) sendCheckpoint(seq *string, subSeq *int) error {
|
func (kclp *KCLProcess) sendCheckpoint(seq *string, subSeq *int) error {
|
||||||
return kclp.ioHandler.writeAction(ActionCheckpoint{
|
return kclp.ioHandler.writeAction(ActionCheckpoint{
|
||||||
Action: "checkpoint",
|
Action: ActionNameCheckpoint,
|
||||||
SequenceNumber: seq,
|
SequenceNumber: seq,
|
||||||
SubSequenceNumber: subSeq,
|
SubSequenceNumber: subSeq,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kclp *KCLProcess) reportDone(responseFor string) error {
|
func (kclp *KCLProcess) reportDone(responseFor ActionName) error {
|
||||||
return kclp.ioHandler.writeAction(struct {
|
return kclp.ioHandler.writeAction(struct {
|
||||||
Action string `json:"action"`
|
Action string `json:"action"`
|
||||||
ResponseFor string `json:"responseFor"`
|
ResponseFor ActionName `json:"responseFor"`
|
||||||
}{
|
}{
|
||||||
Action: "status",
|
Action: "status",
|
||||||
ResponseFor: responseFor,
|
ResponseFor: responseFor,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kclp *KCLProcess) handleLine(line string) (string, error) {
|
type Action interface {
|
||||||
|
Name() ActionName
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleLine processes a line of text sent to the process by the KCL. It returns the action handled, if any.
|
||||||
|
func (kclp *KCLProcess) handleLine(line string) (Action, error) {
|
||||||
action, err := kclp.ioHandler.loadAction(line)
|
action, err := kclp.ioHandler.loadAction(line)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
switch action := action.(type) {
|
switch action := action.(type) {
|
||||||
case ActionCheckpoint:
|
case ActionCheckpoint:
|
||||||
return "checkpoint", kclp.handleCheckpointAction(action)
|
return action, kclp.handleCheckpointAction(action)
|
||||||
case ActionShutdown:
|
case ActionShutdown:
|
||||||
kclp.ioHandler.writeError("Received shutdown action...")
|
kclp.ioHandler.writeError("Received shutdown action...")
|
||||||
|
|
||||||
|
|
@ -251,23 +280,23 @@ func (kclp *KCLProcess) handleLine(line string) (string, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return "shutdown", kclp.reportDone("shutdown")
|
return action, kclp.reportDone(action.Name())
|
||||||
case ActionInitialize:
|
case ActionInitialize:
|
||||||
err := kclp.recordProcessor.Initialize(action.ShardID, kclp)
|
err := kclp.recordProcessor.Initialize(action.ShardID, kclp)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return "initialize", kclp.reportDone(action.Action)
|
return action, kclp.reportDone(action.Name())
|
||||||
case ActionProcessRecords:
|
case ActionProcessRecords:
|
||||||
err := kclp.recordProcessor.ProcessRecords(action.Records)
|
err := kclp.recordProcessor.ProcessRecords(action.Records)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return "process-record", kclp.reportDone(action.Action)
|
return action, kclp.reportDone(action.Name())
|
||||||
default:
|
default:
|
||||||
return "", fmt.Errorf("unknown action to dispatch: %+#v", action)
|
return nil, fmt.Errorf("unknown action to dispatch: %+#v", action)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -288,7 +317,7 @@ func (kclp *KCLProcess) Run() {
|
||||||
action, err := kclp.handleLine(line)
|
action, err := kclp.handleLine(line)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err))
|
kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err))
|
||||||
} else if action == "shutdown" {
|
} else if action != nil && action.Name() == ActionNameShutdown {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue