diff --git a/allgroup.go b/allgroup.go index a8330e9..f046f1d 100644 --- a/allgroup.go +++ b/allgroup.go @@ -9,6 +9,8 @@ import ( "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" ) +// NewAllGroup returns an intitialized AllGroup for consuming +// all shards on a stream func NewAllGroup(ksis kinesisiface.KinesisAPI, ck Checkpoint, streamName string, logger Logger) *AllGroup { return &AllGroup{ ksis: ksis, @@ -31,7 +33,7 @@ type AllGroup struct { shards map[string]*kinesis.Shard } -// start is a blocking operation which will loop and attempt to find new +// Start is a blocking operation which will loop and attempt to find new // shards on a regular cadence. func (g *AllGroup) Start(ctx context.Context, shardc chan *kinesis.Shard) { var ticker = time.NewTicker(30 * time.Second) @@ -57,10 +59,12 @@ func (g *AllGroup) Start(ctx context.Context, shardc chan *kinesis.Shard) { } } +// GetCheckpoint returns the current checkpoint for provided stream func (g *AllGroup) GetCheckpoint(streamName, shardID string) (string, error) { return g.checkpoint.Get(streamName, shardID) } +// SetCheckpoint sets the current checkpoint for provided stream func (g *AllGroup) SetCheckpoint(streamName, shardID, sequenceNumber string) error { return g.checkpoint.Set(streamName, shardID, sequenceNumber) } diff --git a/consumer.go b/consumer.go index 1ea5c5c..d651020 100644 --- a/consumer.go +++ b/consumer.go @@ -71,13 +71,13 @@ type Consumer struct { // from the stream. The record argument contains the original record // returned from the AWS Kinesis library. // If an error is returned, scanning stops. The sole exception is when the -// function returns the special value SkipCheckpoint. +// function returns the special value ErrSkipCheckpoint. type ScanFunc func(*Record) error -// SkipCheckpoint is used as a return value from ScanFunc to indicate that +// ErrSkipCheckpoint is used as a return value from ScanFunc to indicate that // the current checkpoint should be skipped skipped. It is not returned // as an error by any function. -var SkipCheckpoint = errors.New("skip checkpoint") +var ErrSkipCheckpoint = errors.New("skip checkpoint") // Scan launches a goroutine to process each of the shards in the stream. The ScanFunc // is passed through to each of the goroutines and called with each message pulled from @@ -161,11 +161,11 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e return nil default: err := fn(r) - if err != nil && err != SkipCheckpoint { + if err != nil && err != ErrSkipCheckpoint { return err } - if err != SkipCheckpoint { + if err != ErrSkipCheckpoint { if err := c.group.SetCheckpoint(c.streamName, shardID, *r.SequenceNumber); err != nil { return err } diff --git a/go.mod b/go.mod index 7ed5fdc..e97e80f 100644 --- a/go.mod +++ b/go.mod @@ -3,11 +3,11 @@ module github.com/harlow/kinesis-consumer require ( github.com/apex/log v1.0.0 github.com/aws/aws-sdk-go v1.15.0 - github.com/go-ini/ini v1.38.1 + github.com/go-ini/ini v1.38.1 // indirect github.com/go-sql-driver/mysql v1.4.1 - github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 github.com/lib/pq v0.0.0-20180523175426-90697d60dd84 github.com/pkg/errors v0.8.0 + google.golang.org/appengine v1.6.1 // indirect gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 gopkg.in/redis.v5 v5.2.9 ) diff --git a/go.sum b/go.sum index e9300ce..7a1f283 100644 --- a/go.sum +++ b/go.sum @@ -7,12 +7,28 @@ github.com/go-ini/ini v1.38.1 h1:hbtfM8emWUVo9GnXSloXYyFbXxZ+tG6sbepSStoe1FY= github.com/go-ini/ini v1.38.1/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8 h1:12VvqtR6Aowv3l/EQUlocDHW2Cp4G9WJVH7uyH8QFJE= github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/lib/pq v0.0.0-20180523175426-90697d60dd84 h1:it29sI2IM490luSc3RAhp5WuCYnc6RtbfLVAB7nmC5M= github.com/lib/pq v0.0.0-20180523175426-90697d60dd84/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +google.golang.org/appengine v1.6.1 h1:QzqyMA1tlu6CgqCDUtU9V+ZKhLFT2dkJuANu5QaxI3I= +google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0 h1:FVCohIoYO7IJoDDVpV2pdq7SgrMH6wHnuTyrdrxJNoY= gopkg.in/DATA-DOG/go-sqlmock.v1 v1.3.0/go.mod h1:OdE7CF6DbADk7lN8LIKRzRJTTZXIjtWgA5THM5lhBAw= gopkg.in/redis.v5 v5.2.9 h1:MNZYOLPomQzZMfpN3ZtD1uyJ2IDonTTlxYiV/pEApiw= diff --git a/options.go b/options.go index 0876931..c17ba0b 100644 --- a/options.go +++ b/options.go @@ -33,7 +33,7 @@ func WithClient(client kinesisiface.KinesisAPI) Option { } } -// ShardIteratorType overrides the starting point for the consumer +// WithShardIteratorType overrides the starting point for the consumer func WithShardIteratorType(t string) Option { return func(c *Consumer) { c.initialShardIteratorType = t