amazon-kinesis-client-go/splitter/splitter.go
Nathan Leiby efa90d4481 splitter: update log stream parsing
AWS changed the format of the log stream for AWS Batch logs. This
reflects that change.

---

http://docs.aws.amazon.com/batch/latest/userguide/job_states.html

Logs for RUNNING jobs are available in CloudWatch Logs; the log group is /aws/batch/job, and the log stream name format is jobDefinitionName/default/ecs_task_id (this format may change in the future).
After a job reaches the RUNNING status, you can programmatically retrieve its log stream name with the DescribeJobs API operation. For more information, see View Log Data Sent to CloudWatch Logs in the Amazon CloudWatch Logs User Guide. By default, these logs are set to never expire, but you can modify the retention period. For more information, see Change Log Data Retention in CloudWatch Logs in the Amazon CloudWatch Logs User Guide.
2017-09-20 14:18:35 -07:00

109 lines
3.4 KiB
Go

package splitter
import (
"bytes"
"compress/gzip"
"encoding/json"
"fmt"
"io/ioutil"
"regexp"
"time"
)
// LogEvent is one log line inside a LogEventBatch.
type LogEvent struct {
	// ID is decoded from the "id" JSON key.
	ID string `json:"id"`
	// Timestamp is decoded from the "timestamp" JSON key. Split interprets it
	// as milliseconds since the Unix epoch.
	Timestamp int64 `json:"timestamp"`
	// Message is the raw text of the log line, decoded from the "message" JSON key.
	Message string `json:"message"`
}
// LogEventBatch is a group of log lines read from a Kinesis stream that is fed
// by a CloudWatch Logs subscription. Every field is populated from the JSON key
// named in its struct tag.
type LogEventBatch struct {
	MessageType string `json:"messageType"`
	Owner       string `json:"owner"`
	LogGroup    string `json:"logGroup"`
	// LogStream is the name Split inspects to derive env, app and task id.
	LogStream           string   `json:"logStream"`
	SubscriptionFilters []string `json:"subscriptionFilters"`
	// LogEvents holds the individual log lines carried by this batch.
	LogEvents []LogEvent `json:"logEvents"`
}
// IsGzipped reports whether b begins with the two-byte gzip magic number
// (0x1f 0x8b).
//
// Inputs shorter than two bytes (including nil and empty slices) cannot carry
// the prefix, so they are reported as not gzipped instead of causing an
// out-of-range panic.
func IsGzipped(b []byte) bool {
	return len(b) >= 2 && b[0] == 0x1f && b[1] == 0x8b
}
// GetMessagesFromGzippedInput decodes a gzipped CloudWatch Logs subscription
// record and returns one enriched message per log event it contains.
//
// If the record cannot be unpacked, the error is returned together with an
// empty, non-nil slice.
func GetMessagesFromGzippedInput(input []byte) ([][]byte, error) {
	batch, unpackErr := unpack(input)
	if unpackErr != nil {
		return [][]byte{}, unpackErr
	}

	lines := Split(batch)
	return lines, nil
}
// unpack decodes a gzipped, JSON-stringified LogEventBatch.
//
// It returns the decoded batch, or a zero LogEventBatch together with the error
// from whichever step failed: opening the gzip stream, decompressing it, or
// unmarshalling the JSON payload.
func unpack(input []byte) (LogEventBatch, error) {
	gzipReader, err := gzip.NewReader(bytes.NewReader(input))
	if err != nil {
		return LogEventBatch{}, err
	}
	// Release the decompressor once we are done with it. The Close error is
	// deliberately dropped: the stream is read to EOF below, so any
	// decompression or checksum failure is already reported by ReadAll.
	defer gzipReader.Close()

	byt, err := ioutil.ReadAll(gzipReader)
	if err != nil {
		return LogEventBatch{}, err
	}

	var dat LogEventBatch
	if err := json.Unmarshal(byt, &dat); err != nil {
		return LogEventBatch{}, err
	}
	return dat, nil
}
// RFC3339Micro is an RFC3339-style time layout with up to six fractional-second
// digits (microseconds). Because the fraction is written with 9s, time.Format
// drops trailing zeroes when this layout is used.
const RFC3339Micro = "2006-01-02T15:04:05.999999-07:00"
// taskMeta is the pattern used to pull env, app and task id out of an AWS Batch
// log stream name. It is not anchored, so it can match anywhere in the name.
//
// http://docs.aws.amazon.com/batch/latest/userguide/job_states.html
// "log stream name format is jobDefinitionName/default/ecs_task_id (this format may change in the future)."
const taskMeta = `([a-z0-9-]+)--([a-z0-9-]+)\/` + // env--app
	`default\/` +
	`([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})` // task-id

// taskRegex is taskMeta compiled once at package initialisation. Its capture
// groups are, in order: env, app, task id.
var taskRegex = regexp.MustCompile(taskMeta)
// Split turns a LogEventBatch into a slice of enriched log lines, one per event
// and in the same order as b.LogEvents.
//
// Each line is the event's message preceded by a synthetic RSyslog prefix
// (timestamp, hostname, program name and pid), which the subsequent decoding
// logic is expected to handle. The env, app and task id embedded in the program
// name come from b.LogStream when it matches taskRegex; otherwise placeholder
// values are used. The result is always non-nil, even for an empty batch.
func Split(b LogEventBatch) [][]byte {
	const (
		lineFormat = `%s %s %s[%d]: %s`
		host       = "aws-batch"
		pid        = 1
	)

	env, app, task := "unknown", "unknown", "00001111-2222-3333-4444-555566667777"
	if m := taskRegex.FindStringSubmatch(b.LogStream); m != nil {
		env, app, task = m[1], m[2], m[3]
	}

	// The program name is a mocked ARN in the format expected by our log decoders.
	program := env + "--" + app + `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F` + task

	lines := make([][]byte, len(b.LogEvents))
	for i := range b.LogEvents {
		event := &b.LogEvents[i]

		// Adding an extra microsecond forces Format to emit all six fractional
		// digits; otherwise time.Format omits trailing zeroes.
		// (https://github.com/golang/go/issues/12472)
		nanos := event.Timestamp*int64(time.Millisecond) + int64(time.Microsecond)
		stamp := time.Unix(0, nanos).UTC().Format(RFC3339Micro)

		// Fake an RSyslog prefix, expected by consumers.
		lines[i] = []byte(fmt.Sprintf(lineFormat, stamp, host, program, pid, event.Message))
	}
	return lines
}