amazon-kinesis-client-go/splitter/splitter_test.go
Nathan Leiby efa90d4481 splitter: update log stream parsing
AWS changed the format of the log stream for AWS Batch logs. This
reflects that change.

---

http://docs.aws.amazon.com/batch/latest/userguide/job_states.html

Logs for RUNNING jobs are available in CloudWatch Logs; the log group is /aws/batch/job, and the log stream name format is jobDefinitionName/default/ecs_task_id (this format may change in the future).
After a job reaches the RUNNING status, you can programmatically retrieve its log stream name with the DescribeJobs API operation. For more information, see View Log Data Sent to CloudWatch Logs in the Amazon CloudWatch Logs User Guide. By default, these logs are set to never expire, but you can modify the retention period. For more information, see Change Log Data Retention in CloudWatch Logs in the Amazon CloudWatch Logs User Guide.
2017-09-20 14:18:35 -07:00

127 lines
3.4 KiB
Go

package splitter
import (
"bytes"
"compress/gzip"
b64 "encoding/base64"
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
)
func TestUnpacking(t *testing.T) {
input := "H4sIAAAAAAAAADWOTQuCQBRF/8ow6wj6ENRdhLXIClJoERKTvsZHOiPzxiLE/96YtTzcy72n4zUQCQnpuwEe8vXxkJ6O8XUfJclqG/EJ1y8FZkgq3RYvYfMy1pJcUGm5NbptXDZSYg2IekRqb5QbbCxqtcHKgiEeXrJvL3qCsgN2HIuxbtFpWFG7sdky8L1ZECwXc9+b/PUGgXPMfnrspxeydQn5A5VkJYjKlkzfWeGWUInhme1QASEx+qpNeZ/1H1PFPn3yAAAA"
decoded, err := b64.StdEncoding.DecodeString(input)
assert.NoError(t, err)
output, err := unpack(decoded)
assert.NoError(t, err)
expectedOutput := LogEventBatch{
MessageType: "CONTROL_MESSAGE",
Owner: "CloudwatchLogs",
LogGroup: "",
LogStream: "",
SubscriptionFilters: []string{},
LogEvents: []LogEvent{
{
ID: "",
Timestamp: 1498519943285,
Message: "CWL CONTROL MESSAGE: Checking health of destination Kinesis stream.",
},
},
}
assert.Equal(t, expectedOutput, output)
}
func pack(input LogEventBatch) (string, error) {
src, err := json.Marshal(input)
if err != nil {
return "", err
}
// Gzip
var b bytes.Buffer
gz := gzip.NewWriter(&b)
if _, err := gz.Write(src); err != nil {
return "", err
}
if err := gz.Flush(); err != nil {
panic(err)
}
if err := gz.Close(); err != nil {
return "", err
}
// Base64 Encode
return b64.StdEncoding.EncodeToString([]byte(b.String())), nil
}
func TestFullLoop(t *testing.T) {
input := `{
"messageType": "DATA_MESSAGE",
"owner": "123456789012",
"logGroup": "/aws/batch/job",
"logStream": "environment--app/default/11111111-2222-3333-4444-555566667777",
"subscriptionFilters": [
"MySubscriptionFilter"
],
"logEvents": [
{
"id": "33418742379011144044923130086453437181614530551221780480",
"timestamp": 1498548236012,
"message": "some log line"
},
{
"id": "33418742387663833181953011865369295871402094815542181889",
"timestamp": 1498548236400,
"message": "2017/06/27 07:23:56 Another log line"
}
]
}`
var leb LogEventBatch
err := json.Unmarshal([]byte(input), &leb)
assert.NoError(t, err)
packed, err := pack(leb)
assert.NoError(t, err)
decoded, err := b64.StdEncoding.DecodeString(packed)
assert.NoError(t, err)
output, err := unpack(decoded)
assert.NoError(t, err)
assert.Equal(t, leb, output)
}
func TestSplit(t *testing.T) {
input := LogEventBatch{
MessageType: "DATA_MESSAGE",
Owner: "123456789012",
LogGroup: "/aws/batch/job",
LogStream: "env--app/default/12345678-1234-1234-1234-555566667777",
SubscriptionFilters: []string{"MySubscriptionFilter"},
LogEvents: []LogEvent{
{
ID: "99999992379011144044923130086453437181614530551221780480",
Timestamp: 1498519943285,
Message: "some log line",
},
{
ID: "99999992387663833181953011865369295871402094815542181889",
Timestamp: 1498519943285,
Message: "another log line",
},
},
}
lines := Split(input)
expected := [][]byte{
[]byte("2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: some log line"),
[]byte("2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: another log line"),
}
assert.Equal(t, expected, lines)
}