Merge pull request #7 from Clever/splitter-dont-filter-by-env
Splitter -- dont filter by env
This commit is contained in:
commit
375e4d1147
6 changed files with 7 additions and 76 deletions
|
|
@ -13,9 +13,6 @@ import (
|
||||||
|
|
||||||
// Config used for BatchConsumer constructor. Any empty fields are populated with defaults.
|
// Config used for BatchConsumer constructor. Any empty fields are populated with defaults.
|
||||||
type Config struct {
|
type Config struct {
|
||||||
// DeployEnv the name of the deployment environment
|
|
||||||
DeployEnv string
|
|
||||||
|
|
||||||
// LogFile where consumer errors and failed log lines are saved
|
// LogFile where consumer errors and failed log lines are saved
|
||||||
LogFile string
|
LogFile string
|
||||||
|
|
||||||
|
|
@ -56,10 +53,6 @@ func withDefaults(config Config) Config {
|
||||||
config.BatchSize = 4 * 1024 * 1024
|
config.BatchSize = 4 * 1024 * 1024
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.DeployEnv == "" {
|
|
||||||
config.DeployEnv = "unknown-env"
|
|
||||||
}
|
|
||||||
|
|
||||||
// Not totally clear we need this rate limit. The KCL may do rate limiting for us.
|
// Not totally clear we need this rate limit. The KCL may do rate limiting for us.
|
||||||
if config.ReadRateLimit == 0 {
|
if config.ReadRateLimit == 0 {
|
||||||
config.ReadRateLimit = 1000
|
config.ReadRateLimit = 1000
|
||||||
|
|
|
||||||
|
|
@ -65,7 +65,7 @@ func (b *batchedWriter) splitMessageIfNecessary(record []byte) ([][]byte, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process a batch of messages from a CWLogs Subscription
|
// Process a batch of messages from a CWLogs Subscription
|
||||||
return splitter.GetMessagesFromGzippedInput(record, b.config.DeployEnv == "production")
|
return splitter.GetMessagesFromGzippedInput(record)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
|
func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
|
||||||
|
|
@ -81,7 +81,7 @@ func (b *batchedWriter) ProcessRecords(records []kcl.Record) error {
|
||||||
return fmt.Errorf("could not parse sequence number '%s'", record.SequenceNumber)
|
return fmt.Errorf("could not parse sequence number '%s'", record.SequenceNumber)
|
||||||
}
|
}
|
||||||
|
|
||||||
pair = kcl.SequencePair{seq, record.SubSequenceNumber}
|
pair = kcl.SequencePair{Sequence: seq, SubSequence: record.SubSequenceNumber}
|
||||||
if prevPair.IsNil() { // Handles on-start edge case where b.lastProcessSeq is empty
|
if prevPair.IsNil() { // Handles on-start edge case where b.lastProcessSeq is empty
|
||||||
prevPair = pair
|
prevPair = pair
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -29,7 +29,6 @@ func main() {
|
||||||
BatchCount: 500,
|
BatchCount: 500,
|
||||||
BatchSize: 4 * 1024 * 1024, // 4Mb
|
BatchSize: 4 * 1024 * 1024, // 4Mb
|
||||||
LogFile: "/tmp/example-kcl-consumer",
|
LogFile: "/tmp/example-kcl-consumer",
|
||||||
DeployEnv: "test-env",
|
|
||||||
}
|
}
|
||||||
|
|
||||||
output, file := createDummyOutput()
|
output, file := createDummyOutput()
|
||||||
|
|
|
||||||
|
|
@ -39,7 +39,7 @@ func (srp *sampleRecordProcessor) ProcessRecords(records []kcl.Record) error {
|
||||||
fmt.Fprintf(os.Stderr, "could not parse sequence number '%s'\n", record.SequenceNumber)
|
fmt.Fprintf(os.Stderr, "could not parse sequence number '%s'\n", record.SequenceNumber)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
pair := kcl.SequencePair{seqNumber, record.SubSequenceNumber}
|
pair := kcl.SequencePair{Sequence: seqNumber, SubSequence: record.SubSequenceNumber}
|
||||||
if srp.shouldUpdateSequence(pair) {
|
if srp.shouldUpdateSequence(pair) {
|
||||||
srp.largestPair = pair
|
srp.largestPair = pair
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -34,12 +34,12 @@ func IsGzipped(b []byte) bool {
|
||||||
|
|
||||||
// GetMessagesFromGzippedInput takes a gzipped record from a CWLogs Subscription and splits it into
|
// GetMessagesFromGzippedInput takes a gzipped record from a CWLogs Subscription and splits it into
|
||||||
// a slice of messages.
|
// a slice of messages.
|
||||||
func GetMessagesFromGzippedInput(input []byte, prodEnv bool) ([][]byte, error) {
|
func GetMessagesFromGzippedInput(input []byte) ([][]byte, error) {
|
||||||
unpacked, err := unpack(input)
|
unpacked, err := unpack(input)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return [][]byte{}, err
|
return [][]byte{}, err
|
||||||
}
|
}
|
||||||
return Split(unpacked, prodEnv), nil
|
return Split(unpacked), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unpack expects a gzipped + json-stringified LogEventBatch
|
// Unpack expects a gzipped + json-stringified LogEventBatch
|
||||||
|
|
@ -74,7 +74,7 @@ var taskRegex = regexp.MustCompile(taskMeta)
|
||||||
// Split takes a LogEventBatch and separates into a slice of enriched log lines
|
// Split takes a LogEventBatch and separates into a slice of enriched log lines
|
||||||
// Lines are enhanced by adding an Rsyslog prefix, which should be handled correctly by
|
// Lines are enhanced by adding an Rsyslog prefix, which should be handled correctly by
|
||||||
// the subsequent decoding logic.
|
// the subsequent decoding logic.
|
||||||
func Split(b LogEventBatch, prodEnv bool) [][]byte {
|
func Split(b LogEventBatch) [][]byte {
|
||||||
env := "unknown"
|
env := "unknown"
|
||||||
app := "unknown"
|
app := "unknown"
|
||||||
task := "00001111-2222-3333-4444-555566667777"
|
task := "00001111-2222-3333-4444-555566667777"
|
||||||
|
|
@ -85,12 +85,6 @@ func Split(b LogEventBatch, prodEnv bool) [][]byte {
|
||||||
task = matches[0][3]
|
task = matches[0][3]
|
||||||
}
|
}
|
||||||
|
|
||||||
if (env == "production") != prodEnv {
|
|
||||||
// if there's a mis-match between the consumer's environment and the log's environment,
|
|
||||||
// throw away the log. (this is a workaround for coarse grained subscription filters.)
|
|
||||||
return [][]byte{}
|
|
||||||
}
|
|
||||||
|
|
||||||
rsyslogPrefix := `%s %s %s[%d]: %s`
|
rsyslogPrefix := `%s %s %s[%d]: %s`
|
||||||
// programName is a mocked ARN in the format expected by our log decoders
|
// programName is a mocked ARN in the format expected by our log decoders
|
||||||
programName := env + "--" + app + `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F` + task
|
programName := env + "--" + app + `/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F` + task
|
||||||
|
|
|
||||||
|
|
@ -118,65 +118,10 @@ func TestSplit(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
prodEnv := false
|
lines := Split(input)
|
||||||
lines := Split(input, prodEnv)
|
|
||||||
expected := [][]byte{
|
expected := [][]byte{
|
||||||
[]byte("2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: some log line"),
|
[]byte("2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: some log line"),
|
||||||
[]byte("2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: another log line"),
|
[]byte("2017-06-26T23:32:23.285001+00:00 aws-batch env--app/arn%3Aaws%3Aecs%3Aus-east-1%3A999988887777%3Atask%2F12345678-1234-1234-1234-555566667777[1]: another log line"),
|
||||||
}
|
}
|
||||||
assert.Equal(t, expected, lines)
|
assert.Equal(t, expected, lines)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestSplitFiltersByEnv(t *testing.T) {
|
|
||||||
t.Log("If Split is run with prodEnv == true, it should omit logs with env != production")
|
|
||||||
input := LogEventBatch{
|
|
||||||
MessageType: "DATA_MESSAGE",
|
|
||||||
Owner: "123456789012",
|
|
||||||
LogGroup: "/aws/batch/job",
|
|
||||||
LogStream: "env--app/12345678-1234-1234-1234-555566667777/88889999-0000-aaaa-bbbb-ccccddddeeee",
|
|
||||||
// LogStream: "environment--app",
|
|
||||||
SubscriptionFilters: []string{"MySubscriptionFilter"},
|
|
||||||
LogEvents: []LogEvent{
|
|
||||||
{
|
|
||||||
ID: "99999992379011144044923130086453437181614530551221780480",
|
|
||||||
Timestamp: 1498519943285,
|
|
||||||
Message: "some log line",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
ID: "99999992387663833181953011865369295871402094815542181889",
|
|
||||||
Timestamp: 1498519943285,
|
|
||||||
Message: "another log line",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
prodEnv := true
|
|
||||||
lines := Split(input, prodEnv)
|
|
||||||
expected := [][]byte{}
|
|
||||||
assert.Equal(t, expected, lines)
|
|
||||||
|
|
||||||
t.Log("If Split is run with prodEnv == false, it should omit logs with env == production")
|
|
||||||
input = LogEventBatch{
|
|
||||||
MessageType: "DATA_MESSAGE",
|
|
||||||
Owner: "123456789012",
|
|
||||||
LogGroup: "/aws/batch/job",
|
|
||||||
LogStream: "production--app/12345678-1234-1234-1234-555566667777/88889999-0000-aaaa-bbbb-ccccddddeeee",
|
|
||||||
// LogStream: "environment--app",
|
|
||||||
SubscriptionFilters: []string{"MySubscriptionFilter"},
|
|
||||||
LogEvents: []LogEvent{
|
|
||||||
{
|
|
||||||
ID: "99999992379011144044923130086453437181614530551221780480",
|
|
||||||
Timestamp: 1498519943285,
|
|
||||||
Message: "some log line",
|
|
||||||
},
|
|
||||||
{
|
|
||||||
ID: "99999992387663833181953011865369295871402094815542181889",
|
|
||||||
Timestamp: 1498519943285,
|
|
||||||
Message: "another log line",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
prodEnv = false
|
|
||||||
lines = Split(input, prodEnv)
|
|
||||||
expected = [][]byte{}
|
|
||||||
assert.Equal(t, expected, lines)
|
|
||||||
}
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue