From 9ccee87b62db3846a1fcc9b90149b11ce0dc23e4 Mon Sep 17 00:00:00 2001 From: Umur Gedik Date: Thu, 28 Jun 2018 12:05:09 +0900 Subject: [PATCH] Add an option to start consuming from latest (#59) * Add an option to client to start consuming from latest --- client.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/client.go b/client.go index f3eab7a..40e840e 100644 --- a/client.go +++ b/client.go @@ -20,6 +20,15 @@ func WithKinesis(svc kinesisiface.KinesisAPI) ClientOption { } } +// WithStartFrmLatest will make sure the client start consuming +// events starting from the most recent event in kinesis. This +// option discards the checkpoints. +func WithStartFromLatest() ClientOption { + return func(kc *KinesisClient) { + kc.fromLatest = true + } +} + // NewKinesisClient returns client to interface with Kinesis stream func NewKinesisClient(opts ...ClientOption) *KinesisClient { kc := &KinesisClient{} @@ -37,7 +46,8 @@ func NewKinesisClient(opts ...ClientOption) *KinesisClient { // KinesisClient acts as wrapper around Kinesis client type KinesisClient struct { - svc kinesisiface.KinesisAPI + svc kinesisiface.KinesisAPI + fromLatest bool } // GetShardIDs returns shard ids in a given stream @@ -127,7 +137,9 @@ func (c *KinesisClient) getShardIterator(streamName, shardID, lastSeqNum string) StreamName: aws.String(streamName), } - if lastSeqNum != "" { + if c.fromLatest { + params.ShardIteratorType = aws.String("LATEST") + } else if lastSeqNum != "" { params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER") params.StartingSequenceNumber = aws.String(lastSeqNum) } else {