checkpoint after filtered messages in the pipeline

checkpoint after filtered messages in the pipeline

checkpoint after filtered messages in the pipeline
This commit is contained in:
dan 2015-04-07 11:19:10 -07:00
parent 0dce2a6045
commit 2ff748a2d4
2 changed files with 16 additions and 9 deletions

View file

@ -22,6 +22,7 @@ type Pipeline struct {
Filter Filter
StreamName string
Transformer Transformer
CheckpointFilteredRecords bool
}
var pipelineRecoverableErrorCodes = map[string]bool{
@ -110,6 +111,8 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
if p.Filter.KeepRecord(r) {
p.Buffer.ProcessRecord(r, v.SequenceNumber)
} else if p.CheckpointFilteredRecords {
p.Buffer.ProcessRecord(nil, v.SequenceNumber)
}
}
} else if recordSet.NextShardIterator == "" || shardIterator == recordSet.NextShardIterator || err != nil {
@ -120,7 +123,9 @@ func (p Pipeline) ProcessShard(ksis *kinesis.Kinesis, shardID string) {
}
if p.Buffer.ShouldFlush() {
if p.Buffer.NumRecordsInBuffer() > 0 {
p.Emitter.Emit(p.Buffer, p.Transformer)
}
p.Checkpoint.SetCheckpoint(shardID, p.Buffer.LastSequenceNumber())
p.Buffer.Flush()
}

View file

@ -20,7 +20,9 @@ func (b *RecordBuffer) ProcessRecord(record interface{}, sequenceNumber string)
b.lastSequenceNumber = sequenceNumber
if !b.sequenceExists(sequenceNumber) {
if record != nil {
b.recordsInBuffer = append(b.recordsInBuffer, record)
}
b.sequencesInBuffer = append(b.sequencesInBuffer, sequenceNumber)
}
}
@ -32,7 +34,7 @@ func (b *RecordBuffer) Records() []interface{} {
// NumRecordsInBuffer returns the number of messages in the buffer.
func (b RecordBuffer) NumRecordsInBuffer() int {
return len(b.sequencesInBuffer)
return len(b.recordsInBuffer)
}
// Flush empties the buffer and resets the sequence counter.