diff --git a/pipeline.go b/pipeline.go index 0cd9476..dbaede0 100644 --- a/pipeline.go +++ b/pipeline.go @@ -12,12 +12,13 @@ import ( // interface. It has a data type (Model) as Records come in as a byte[] and are transformed to a Model. // Then they are buffered in Model form and when the buffer is full, Models's are passed to the emitter. type Pipeline struct { - Buffer Buffer - Checkpoint Checkpoint - Emitter Emitter - Filter Filter - StreamName string - Transformer Transformer + Buffer Buffer + Checkpoint Checkpoint + Emitter Emitter + Filter Filter + StreamName string + Transformer Transformer + CheckpointFilteredRecords bool } // ProcessShard kicks off the process of a Kinesis Shard. @@ -78,6 +79,8 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { if p.Filter.KeepRecord(r) { p.Buffer.ProcessRecord(r, v.SequenceNumber) + } else if p.CheckpointFilteredRecords { + p.Buffer.ProcessRecord(nil, v.SequenceNumber) } } } else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil { @@ -88,7 +91,9 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) { } if p.Buffer.ShouldFlush() { - p.Emitter.Emit(p.Buffer, p.Transformer) + if p.Buffer.NumRecordsInBuffer() > 0 { + p.Emitter.Emit(p.Buffer, p.Transformer) + } p.Checkpoint.SetCheckpoint(shardID, p.Buffer.LastSequenceNumber()) p.Buffer.Flush() } diff --git a/record_buffer.go b/record_buffer.go index a2a7dfd..8f01f58 100644 --- a/record_buffer.go +++ b/record_buffer.go @@ -20,7 +20,9 @@ func (b *RecordBuffer) ProcessRecord(record interface{}, sequenceNumber string) b.lastSequenceNumber = sequenceNumber if !b.sequenceExists(sequenceNumber) { - b.recordsInBuffer = append(b.recordsInBuffer, record) + if record != nil { + b.recordsInBuffer = append(b.recordsInBuffer, record) + } b.sequencesInBuffer = append(b.sequencesInBuffer, sequenceNumber) } } @@ -32,7 +34,7 @@ func (b *RecordBuffer) Records() []interface{} { // NumRecordsInBuffer returns the number of messages in the buffer. func (b RecordBuffer) NumRecordsInBuffer() int { - return len(b.sequencesInBuffer) + return len(b.recordsInBuffer) } // Flush empties the buffer and resets the sequence counter.