From e52fcb4f8c3fae7b287a9290aa42a33fab9c4685 Mon Sep 17 00:00:00 2001 From: dan Date: Tue, 7 Apr 2015 11:19:10 -0700 Subject: [PATCH] Checkpoint after filtered messages in the pipeline When records are filtered we still want to count them in the checkpoint. This will allow the checkpoint to pick up at the appropriate spot if any messages are filtered out. --- pipeline.go | 19 ++++++++++++------- record_buffer.go | 6 ++++-- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/pipeline.go b/pipeline.go index 0cd9476..dbaede0 100644 --- a/pipeline.go +++ b/pipeline.go @@ -12,12 +12,13 @@ import ( // interface. It has a data type (Model) as Records come in as a byte[] and are transformed to a Model. // Then they are buffered in Model form and when the buffer is full, Models's are passed to the emitter. type Pipeline struct { - Buffer Buffer - Checkpoint Checkpoint - Emitter Emitter - Filter Filter - StreamName string - Transformer Transformer + Buffer Buffer + Checkpoint Checkpoint + Emitter Emitter + Filter Filter + StreamName string + Transformer Transformer + CheckpointFilteredRecords bool } // ProcessShard kicks off the process of a Kinesis Shard. @@ -78,6 +79,8 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { if p.Filter.KeepRecord(r) { p.Buffer.ProcessRecord(r, v.SequenceNumber) + } else if p.CheckpointFilteredRecords { + p.Buffer.ProcessRecord(nil, v.SequenceNumber) } } } else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil { @@ -88,7 +91,9 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { } if p.Buffer.ShouldFlush() { - p.Emitter.Emit(p.Buffer, p.Transformer) + if p.Buffer.NumRecordsInBuffer() > 0 { + p.Emitter.Emit(p.Buffer, p.Transformer) + } p.Checkpoint.SetCheckpoint(shardID, p.Buffer.LastSequenceNumber()) p.Buffer.Flush() } diff --git a/record_buffer.go b/record_buffer.go index a2a7dfd..8f01f58 100644 --- a/record_buffer.go +++ b/record_buffer.go @@ -20,7 +20,9 @@ func (b *RecordBuffer) ProcessRecord(record interface{}, sequenceNumber string) b.lastSequenceNumber = sequenceNumber if !b.sequenceExists(sequenceNumber) { - b.recordsInBuffer = append(b.recordsInBuffer, record) + if record != nil { + b.recordsInBuffer = append(b.recordsInBuffer, record) + } b.sequencesInBuffer = append(b.sequencesInBuffer, sequenceNumber) } } @@ -32,7 +34,7 @@ func (b *RecordBuffer) Records() []interface{} { // NumRecordsInBuffer returns the number of messages in the buffer. func (b RecordBuffer) NumRecordsInBuffer() int { - return len(b.sequencesInBuffer) + return len(b.recordsInBuffer) } // Flush empties the buffer and resets the sequence counter.