Checkpoint after filtered messages in the pipeline
When records are filtered we still want to count them in the checkpoint. This will allow the checkpoint to pick up at the appropriate spot if any messages are filtered out.
This commit is contained in:
parent
b38102eec7
commit
e52fcb4f8c
2 changed files with 16 additions and 9 deletions
|
|
@ -18,6 +18,7 @@ type Pipeline struct {
|
||||||
Filter Filter
|
Filter Filter
|
||||||
StreamName string
|
StreamName string
|
||||||
Transformer Transformer
|
Transformer Transformer
|
||||||
|
CheckpointFilteredRecords bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProcessShard kicks off the process of a Kinesis Shard.
|
// ProcessShard kicks off the process of a Kinesis Shard.
|
||||||
|
|
@ -78,6 +79,8 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
|
|
||||||
if p.Filter.KeepRecord(r) {
|
if p.Filter.KeepRecord(r) {
|
||||||
p.Buffer.ProcessRecord(r, v.SequenceNumber)
|
p.Buffer.ProcessRecord(r, v.SequenceNumber)
|
||||||
|
} else if p.CheckpointFilteredRecords {
|
||||||
|
p.Buffer.ProcessRecord(nil, v.SequenceNumber)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil {
|
} else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil {
|
||||||
|
|
@ -88,7 +91,9 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.Buffer.ShouldFlush() {
|
if p.Buffer.ShouldFlush() {
|
||||||
|
if p.Buffer.NumRecordsInBuffer() > 0 {
|
||||||
p.Emitter.Emit(p.Buffer, p.Transformer)
|
p.Emitter.Emit(p.Buffer, p.Transformer)
|
||||||
|
}
|
||||||
p.Checkpoint.SetCheckpoint(shardID, p.Buffer.LastSequenceNumber())
|
p.Checkpoint.SetCheckpoint(shardID, p.Buffer.LastSequenceNumber())
|
||||||
p.Buffer.Flush()
|
p.Buffer.Flush()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -20,7 +20,9 @@ func (b *RecordBuffer) ProcessRecord(record interface{}, sequenceNumber string)
|
||||||
b.lastSequenceNumber = sequenceNumber
|
b.lastSequenceNumber = sequenceNumber
|
||||||
|
|
||||||
if !b.sequenceExists(sequenceNumber) {
|
if !b.sequenceExists(sequenceNumber) {
|
||||||
|
if record != nil {
|
||||||
b.recordsInBuffer = append(b.recordsInBuffer, record)
|
b.recordsInBuffer = append(b.recordsInBuffer, record)
|
||||||
|
}
|
||||||
b.sequencesInBuffer = append(b.sequencesInBuffer, sequenceNumber)
|
b.sequencesInBuffer = append(b.sequencesInBuffer, sequenceNumber)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -32,7 +34,7 @@ func (b *RecordBuffer) Records() []interface{} {
|
||||||
|
|
||||||
// NumRecordsInBuffer returns the number of messages in the buffer.
|
// NumRecordsInBuffer returns the number of messages in the buffer.
|
||||||
func (b RecordBuffer) NumRecordsInBuffer() int {
|
func (b RecordBuffer) NumRecordsInBuffer() int {
|
||||||
return len(b.sequencesInBuffer)
|
return len(b.recordsInBuffer)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Flush empties the buffer and resets the sequence counter.
|
// Flush empties the buffer and resets the sequence counter.
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue