Scanerror signals the consumer if we should continue scanning for next record & whether to checkpoint. (#54)

* remove ValidateCheckpoint

* update for checkpoint can not customize retryer

* implement the scan error as in PR 44

* at least log if record processor has error

* mistakenly removed this line

* propage error up. ignore invalid state
This commit is contained in:
Prometheus 2018-06-08 08:40:42 -07:00 committed by Harlow Ward
parent b05f5b3ac6
commit e6a489c76b
4 changed files with 49 additions and 17 deletions

View file

@ -38,9 +38,13 @@ func main() {
} }
// start // start
err = c.Scan(context.TODO(), func(r *consumer.Record) bool { err = c.Scan(context.TODO(), func(r *consumer.Record) consumer.ScanError {
fmt.Println(string(r.Data)) fmt.Println(string(r.Data))
return true // continue scanning // continue scanning
return consumer.ScanError{
StopScan: false, // true to stop scan
SkipCheckpoint: false, // true to skip checkpoint
}
}) })
if err != nil { if err != nil {
log.Fatalf("scan error: %v", err) log.Fatalf("scan error: %v", err)
@ -53,7 +57,7 @@ func main() {
## Checkpoint ## Checkpoint
To record the progress of the consumer in the stream we use a checkpoint to store the last sequence number the consumer has read from a particular shard. To record the progress of the consumer in the stream we use a checkpoint to store the last sequence number the consumer has read from a particular shard. The boolean value SkipCheckpoint of consumer.ScanError determines if checkpoint will be activated. ScanError is returned by the record processing callback.
This will allow consumers to re-launch and pick up at the position in the stream where they left off. This will allow consumers to re-launch and pick up at the position in the stream where they left off.

View file

@ -10,6 +10,14 @@ import (
"github.com/aws/aws-sdk-go/service/kinesis" "github.com/aws/aws-sdk-go/service/kinesis"
) )
// ScanError signals the consumer if we should continue scanning for next record
// and whether to checkpoint.
type ScanError struct {
Error error
StopScan bool
SkipCheckpoint bool
}
// Record is an alias of record returned from kinesis library // Record is an alias of record returned from kinesis library
type Record = kinesis.Record type Record = kinesis.Record
@ -111,7 +119,7 @@ type Consumer struct {
// Scan scans each of the shards of the stream, calls the callback // Scan scans each of the shards of the stream, calls the callback
// func with each of the kinesis records. // func with each of the kinesis records.
func (c *Consumer) Scan(ctx context.Context, fn func(*Record) bool) error { func (c *Consumer) Scan(ctx context.Context, fn func(*Record) ScanError) error {
shardIDs, err := c.client.GetShardIDs(c.streamName) shardIDs, err := c.client.GetShardIDs(c.streamName)
if err != nil { if err != nil {
return fmt.Errorf("get shards error: %v", err) return fmt.Errorf("get shards error: %v", err)
@ -156,33 +164,41 @@ func (c *Consumer) Scan(ctx context.Context, fn func(*Record) bool) error {
// ScanShard loops over records on a specific shard, calls the callback func // ScanShard loops over records on a specific shard, calls the callback func
// for each record and checkpoints the progress of scan. // for each record and checkpoints the progress of scan.
// Note: Returning `false` from the callback func will end the scan. // Note: Returning `false` from the callback func will end the scan.
func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*Record) bool) (err error) { func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn func(*Record) ScanError) (err error) {
lastSeqNum, err := c.checkpoint.Get(c.streamName, shardID) lastSeqNum, err := c.checkpoint.Get(c.streamName, shardID)
if err != nil { if err != nil {
return fmt.Errorf("get checkpoint error: %v", err) return fmt.Errorf("get checkpoint error: %v", err)
} }
c.logger.Println("scanning", shardID, lastSeqNum) c.logger.Println("scanning", shardID, lastSeqNum)
// get records // get records
recc, errc, err := c.client.GetRecords(ctx, c.streamName, shardID, lastSeqNum) recc, errc, err := c.client.GetRecords(ctx, c.streamName, shardID, lastSeqNum)
if err != nil { if err != nil {
return fmt.Errorf("get records error: %v", err) return fmt.Errorf("get records error: %v", err)
} }
// loop records // loop records
for r := range recc { for r := range recc {
if ok := fn(r); !ok { scanError := fn(r)
// It will be nicer if this can be reported with checkpoint error
err = scanError.Error
// Skip invalid state
if scanError.StopScan && scanError.SkipCheckpoint {
continue
}
if scanError.StopScan {
break break
} }
if !scanError.SkipCheckpoint {
c.counter.Add("records", 1) c.counter.Add("records", 1)
err := c.checkpoint.Set(c.streamName, shardID, *r.SequenceNumber) err := c.checkpoint.Set(c.streamName, shardID, *r.SequenceNumber)
if err != nil { if err != nil {
return fmt.Errorf("set checkpoint error: %v", err) return fmt.Errorf("set checkpoint error: %v", err)
} }
} }
}
c.logger.Println("exiting", shardID) c.logger.Println("exiting", shardID)
return <-errc return <-errc

View file

@ -45,9 +45,14 @@ func TestScanShard(t *testing.T) {
// callback fn simply appends the record data to result string // callback fn simply appends the record data to result string
var ( var (
resultData string resultData string
fn = func(r *Record) bool { fn = func(r *Record) ScanError {
resultData += string(r.Data) resultData += string(r.Data)
return true err := errors.New("some error happened")
return ScanError{
Error: err,
StopScan: false,
SkipCheckpoint: false,
}
} }
) )

View file

@ -2,6 +2,7 @@ package main
import ( import (
"context" "context"
"errors"
"expvar" "expvar"
"flag" "flag"
"fmt" "fmt"
@ -89,9 +90,15 @@ func main() {
}() }()
// scan stream // scan stream
err = c.Scan(ctx, func(r *consumer.Record) bool { err = c.Scan(ctx, func(r *consumer.Record) consumer.ScanError {
fmt.Println(string(r.Data)) fmt.Println(string(r.Data))
return true // continue scanning err := errors.New("some error happened")
// continue scanning
return consumer.ScanError{
Error: err,
StopScan: true,
SkipCheckpoint: false,
}
}) })
if err != nil { if err != nil {
log.Fatalf("scan error: %v", err) log.Fatalf("scan error: %v", err)