diff --git a/consumer.go b/consumer.go index 507076b..05c59cc 100644 --- a/consumer.go +++ b/consumer.go @@ -6,6 +6,7 @@ import ( "fmt" "io/ioutil" "log" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" @@ -61,6 +62,7 @@ func New(streamName string, opts ...Option) (*Consumer, error) { type Consumer struct { streamName string initialShardIteratorType string + initialTimestamp *time.Time client kinesisiface.KinesisAPI counter Counter group Group @@ -214,6 +216,9 @@ func (c *Consumer) getShardIterator(streamName, shardID, seqNum string) (*string if seqNum != "" { params.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeAfterSequenceNumber) params.StartingSequenceNumber = aws.String(seqNum) + } else if c.initialTimestamp != nil { + params.ShardIteratorType = aws.String(kinesis.ShardIteratorTypeAtTimestamp) + params.Timestamp = c.initialTimestamp } else { params.ShardIteratorType = aws.String(c.initialShardIteratorType) } diff --git a/options.go b/options.go index dd77da0..be306da 100644 --- a/options.go +++ b/options.go @@ -1,6 +1,10 @@ package consumer -import "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" +import ( + "time" + + "github.com/aws/aws-sdk-go/service/kinesis/kinesisiface" +) // Option is used to override defaults when creating a new Consumer type Option func(*Consumer) @@ -46,3 +50,10 @@ func WithShardIteratorType(t string) Option { c.initialShardIteratorType = t } } + +// Timestamp overrides the starting point for the consumer +func WithTimestamp(t time.Time) Option { + return func(c *Consumer) { + c.initialTimestamp = &t + } +}