diff --git a/batchconsumer/consumer.go b/batchconsumer/consumer.go index 9b16e0b..c51ddd1 100644 --- a/batchconsumer/consumer.go +++ b/batchconsumer/consumer.go @@ -13,9 +13,6 @@ import ( // Config used for BatchConsumer constructor. Any empty fields are populated with defaults. type Config struct { - // DeployEnv the name of the deployment environment - DeployEnv string - // LogFile where consumer errors and failed log lines are saved LogFile string @@ -56,10 +53,6 @@ func withDefaults(config Config) Config { config.BatchSize = 4 * 1024 * 1024 } - if config.DeployEnv == "" { - config.DeployEnv = "unknown-env" - } - // Not totally clear we need this rate limit. The KCL may do rate limiting for us. if config.ReadRateLimit == 0 { config.ReadRateLimit = 1000 diff --git a/batchconsumer/writer.go b/batchconsumer/writer.go index af102d1..4b68c74 100644 --- a/batchconsumer/writer.go +++ b/batchconsumer/writer.go @@ -65,7 +65,7 @@ func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error) } // Process a batch of messages from a CWLogs Subscription - return splitter.GetMessagesFromGzippedInput(record, b.config.DeployEnv == "production") + return splitter.GetMessagesFromGzippedInput(record) } func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { @@ -81,7 +81,7 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error { return fmt.Errorf("could not parse sequence number '%s'", record.SequenceNumber) } - pair = kcl.SequencePair{seq, record.SubSequenceNumber} + pair = kcl.SequencePair{Sequence: seq, SubSequence: record.SubSequenceNumber} if prevPair.IsNil() { // Handles on-start edge case where b.lastProcessSeq is empty prevPair = pair } diff --git a/cmd/batchconsumer/main.go b/cmd/batchconsumer/main.go index f97e574..f12d3c1 100644 --- a/cmd/batchconsumer/main.go +++ b/cmd/batchconsumer/main.go @@ -29,7 +29,6 @@ func main() { BatchCount: 500, BatchSize: 4 * 1024 * 1024, // 4Mb LogFile: "/tmp/example-kcl-consumer", - DeployEnv: "test-env", } output, file := createDummyOutput() diff --git a/cmd/consumer/main.go b/cmd/consumer/main.go index 6f2f51a..647b472 100644 --- a/cmd/consumer/main.go +++ b/cmd/consumer/main.go @@ -39,7 +39,7 @@ func (srp *sampleRecordProcessor) ProcessRecords(records []kcl.Record) error { fmt.Fprintf(os.Stderr, "could not parse sequence number '%s'\n", record.SequenceNumber) continue } - pair := kcl.SequencePair{seqNumber, record.SubSequenceNumber} + pair := kcl.SequencePair{Sequence: seqNumber, SubSequence: record.SubSequenceNumber} if srp.shouldUpdateSequence(pair) { srp.largestPair = pair } diff --git a/splitter/splitter.go b/splitter/splitter.go index d43bd93..9b811a4 100644 --- a/splitter/splitter.go +++ b/splitter/splitter.go @@ -34,12 +34,12 @@ func IsGzipped(b []byte) bool { // GetMessagesFromGzippedInput takes a gzipped record from a CWLogs Subscription and splits it into // a slice of messages. -func GetMessagesFromGzippedInput(input []byte, prodEnv bool) ([][]byte, error) { +func GetMessagesFromGzippedInput(input []byte) ([][]byte, error) { unpacked, err := unpack(input) if err != nil { return [][]byte{}, err } - return Split(unpacked, prodEnv), nil + return Split(unpacked), nil } // Unpack expects a gzipped + json-stringified LogEventBatch @@ -74,7 +74,7 @@ var taskRegex = regexp.MustCompile(taskMeta) // Split takes a LogEventBatch and separates into a slice of enriched log lines // Lines are enhanced by adding an Rsyslog prefix, which should be handled correctly by // the subsequent decoding logic. -func Split(b LogEventBatch, prodEnv bool) [][]byte { +func Split(b LogEventBatch) [][]byte { env := "unknown" app := "unknown" task := "00001111-2222-3333-4444-555566667777" @@ -85,12 +85,6 @@ func Split(b LogEventBatch, prodEnv bool) [][]byte { task = matches[0][3] } - if (env == "production") != prodEnv { - // if there's a mis-match between the consumer's environment and the log's environment, - // throw away the log. (this is a workaround for coarse grained subscription filters.) - return [][]byte{} - } - rsyslogPrefix := `%s %s %s[%d]: %s` // programName is a mocked ARN in the format expected by our log decoders programName := env + "--" + app + `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F` + task diff --git a/splitter/splitter_test.go b/splitter/splitter_test.go index d6c5cda..2a90c03 100644 --- a/splitter/splitter_test.go +++ b/splitter/splitter_test.go @@ -118,65 +118,10 @@ func TestSplit(t *testing.T) { }, }, } - prodEnv := false - lines := Split(input, prodEnv) + lines := Split(input) expected := [][]byte{ []byte("2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: some log line"), []byte("2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: another log line"), } assert.Equal(t, expected, lines) } - -func TestSplitFiltersByEnv(t *testing.T) { - t.Log("If Split is run with prodEnv == true, it should omit logs with env != production") - input := LogEventBatch{ - MessageType: "DATA_MESSAGE", - Owner: "123456789012", - LogGroup: "/aws/batch/job", - LogStream: "env--app/12345678-1234-1234-1234-555566667777/88889999-0000-aaaa-bbbb-ccccddddeeee", - // LogStream: "environment--app", - SubscriptionFilters: []string{"MySubscriptionFilter"}, - LogEvents: []LogEvent{ - { - ID: "99999992379011144044923130086453437181614530551221780480", - Timestamp: 1498519943285, - Message: "some log line", - }, - { - ID: "99999992387663833181953011865369295871402094815542181889", - Timestamp: 1498519943285, - Message: "another log line", - }, - }, - } - prodEnv := true - lines := Split(input, prodEnv) - expected := [][]byte{} - assert.Equal(t, expected, lines) - - t.Log("If Split is run with prodEnv == false, it should omit logs with env == production") - input = LogEventBatch{ - MessageType: "DATA_MESSAGE", - Owner: "123456789012", - LogGroup: "/aws/batch/job", - LogStream: "production--app/12345678-1234-1234-1234-555566667777/88889999-0000-aaaa-bbbb-ccccddddeeee", - // LogStream: "environment--app", - SubscriptionFilters: []string{"MySubscriptionFilter"}, - LogEvents: []LogEvent{ - { - ID: "99999992379011144044923130086453437181614530551221780480", - Timestamp: 1498519943285, - Message: "some log line", - }, - { - ID: "99999992387663833181953011865369295871402094815542181889", - Timestamp: 1498519943285, - Message: "another log line", - }, - }, - } - prodEnv = false - lines = Split(input, prodEnv) - expected = [][]byte{} - assert.Equal(t, expected, lines) -}