diff --git a/client.go b/client.go index f3eab7a..40e840e 100644 --- a/client.go +++ b/client.go @@ -20,6 +20,15 @@ func WithKinesis(svc kinesisiface.KinesisAPI) ClientOption { } } +// WithStartFrmLatest will make sure the client start consuming +// events starting from the most recent event in kinesis. This +// option discards the checkpoints. +func WithStartFromLatest() ClientOption { + return func(kc *KinesisClient) { + kc.fromLatest = true + } +} + // NewKinesisClient returns client to interface with Kinesis stream func NewKinesisClient(opts ...ClientOption) *KinesisClient { kc := &KinesisClient{} @@ -37,7 +46,8 @@ func NewKinesisClient(opts ...ClientOption) *KinesisClient { // KinesisClient acts as wrapper around Kinesis client type KinesisClient struct { - svc kinesisiface.KinesisAPI + svc kinesisiface.KinesisAPI + fromLatest bool } // GetShardIDs returns shard ids in a given stream @@ -127,7 +137,9 @@ func (c *KinesisClient) getShardIterator(streamName, shardID, lastSeqNum string) StreamName: aws.String(streamName), } - if lastSeqNum != "" { + if c.fromLatest { + params.ShardIteratorType = aws.String("LATEST") + } else if lastSeqNum != "" { params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER") params.StartingSequenceNumber = aws.String(lastSeqNum) } else {