Compare commits

...

1 commit

Author SHA1 Message Date
Rafael Garcia
2b763d7b15 remove some strings 2017-09-25 16:26:32 +00:00

View file

@ -51,13 +51,27 @@ func (i ioHandler) readLine() (string, error) {
return line, nil
}
// ActionName is the "action" string contained in the JSON sent from the KCL.
type ActionName string
const (
ActionNameInitialize ActionName = "initialize"
ActionNameProcessRecords = "processRecords"
ActionNameShutdownRequested = "shutdownRequested"
ActionNameShutdown = "shutdown"
ActionNameCheckpoint = "checkpoint"
)
type ActionInitialize struct {
Action string `json:"action"`
ShardID string `json:"shardId"`
SequenceNumber string `json:"sequenceNumber"`
SubSequenceNumber int `json:"subSequenceNumber"`
}
func (a ActionInitialize) Name() ActionName {
return ActionNameInitialize
}
type Record struct {
SequenceNumber string `json:"sequenceNumber"`
SubSequenceNumber int `json:"subSequenceNumber"`
@ -67,53 +81,63 @@ type Record struct {
}
type ActionProcessRecords struct {
Action string `json:"action"`
Records []Record `json:"records"`
MillisBehindLatest int `json:"millisBehindLatest"`
}
func (a ActionProcessRecords) Name() ActionName {
return ActionNameProcessRecords
}
type ActionShutdown struct {
Action string `json:"action"`
Reason string `json:"reason"`
}
func (a ActionShutdown) Name() ActionName {
return ActionNameShutdown
}
type ActionCheckpoint struct {
Action string `json:"action"`
SequenceNumber *string `json:"sequenceNumber,omitempty"`
SubSequenceNumber *int `json:"subSequenceNumber,omitempty"`
Error *string `json:"error,omitempty"`
Action ActionName `json:"action"`
SequenceNumber *string `json:"sequenceNumber,omitempty"`
SubSequenceNumber *int `json:"subSequenceNumber,omitempty"`
Error *string `json:"error,omitempty"`
}
func (a ActionCheckpoint) Name() ActionName {
return ActionNameCheckpoint
}
func (i ioHandler) loadAction(line string) (interface{}, error) {
lineBytes := []byte(line)
var message struct {
Action string `json:"action"`
Action ActionName `json:"action"`
}
if err := json.Unmarshal(lineBytes, &message); err != nil {
return nil, err
}
switch message.Action {
case "initialize":
case ActionNameInitialize:
var actionInitialize ActionInitialize
if err := json.Unmarshal(lineBytes, &actionInitialize); err != nil {
return nil, err
}
return actionInitialize, nil
case "processRecords":
case ActionNameProcessRecords:
var actionProcessRecords ActionProcessRecords
if err := json.Unmarshal(lineBytes, &actionProcessRecords); err != nil {
return nil, err
}
return actionProcessRecords, nil
case "shutdownRequested":
case ActionNameShutdownRequested:
fallthrough
case "shutdown":
case ActionNameShutdown:
var actionShutdown ActionShutdown
if err := json.Unmarshal(lineBytes, &actionShutdown); err != nil {
return nil, err
}
return actionShutdown, nil
case "checkpoint":
case ActionNameCheckpoint:
var actionCheckpoint ActionCheckpoint
if err := json.Unmarshal(lineBytes, &actionCheckpoint); err != nil {
return nil, err
@ -208,31 +232,36 @@ func (kclp *KCLProcess) handleCheckpointAction(action ActionCheckpoint) error {
func (kclp *KCLProcess) sendCheckpoint(seq *string, subSeq *int) error {
return kclp.ioHandler.writeAction(ActionCheckpoint{
Action: "checkpoint",
Action: ActionNameCheckpoint,
SequenceNumber: seq,
SubSequenceNumber: subSeq,
})
}
func (kclp *KCLProcess) reportDone(responseFor string) error {
func (kclp *KCLProcess) reportDone(responseFor ActionName) error {
return kclp.ioHandler.writeAction(struct {
Action string `json:"action"`
ResponseFor string `json:"responseFor"`
Action string `json:"action"`
ResponseFor ActionName `json:"responseFor"`
}{
Action: "status",
ResponseFor: responseFor,
})
}
func (kclp *KCLProcess) handleLine(line string) (string, error) {
type Action interface {
Name() ActionName
}
// handleLine processes a line of text sent to the process by the KCL. It returns the action handled, if any.
func (kclp *KCLProcess) handleLine(line string) (Action, error) {
action, err := kclp.ioHandler.loadAction(line)
if err != nil {
return "", err
return nil, err
}
switch action := action.(type) {
case ActionCheckpoint:
return "checkpoint", kclp.handleCheckpointAction(action)
return action, kclp.handleCheckpointAction(action)
case ActionShutdown:
kclp.ioHandler.writeError("Received shutdown action...")
@ -251,23 +280,23 @@ func (kclp *KCLProcess) handleLine(line string) (string, error) {
}
}
return "shutdown", kclp.reportDone("shutdown")
return action, kclp.reportDone(action.Name())
case ActionInitialize:
err := kclp.recordProcessor.Initialize(action.ShardID, kclp)
if err != nil {
return "", err
return nil, err
}
return "initialize", kclp.reportDone(action.Action)
return action, kclp.reportDone(action.Name())
case ActionProcessRecords:
err := kclp.recordProcessor.ProcessRecords(action.Records)
if err != nil {
return "", err
return nil, err
}
return "process-record", kclp.reportDone(action.Action)
return action, kclp.reportDone(action.Name())
default:
return "", fmt.Errorf("unknown action to dispatch: %+#v", action)
return nil, fmt.Errorf("unknown action to dispatch: %+#v", action)
}
}
@ -288,7 +317,7 @@ func (kclp *KCLProcess) Run() {
action, err := kclp.handleLine(line)
if err != nil {
kclp.ioHandler.writeError(fmt.Sprintf("ERR Handle line: %+#v", err))
} else if action == "shutdown" {
} else if action != nil && action.Name() == ActionNameShutdown {
return
}