Add an option to start consuming from latest (#59)
* Add an option to client to start consuming from latest
This commit is contained in:
parent
b7be26418a
commit
9ccee87b62
1 changed files with 14 additions and 2 deletions
16
client.go
16
client.go
|
|
@ -20,6 +20,15 @@ func WithKinesis(svc kinesisiface.KinesisAPI) ClientOption {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithStartFrmLatest will make sure the client start consuming
|
||||||
|
// events starting from the most recent event in kinesis. This
|
||||||
|
// option discards the checkpoints.
|
||||||
|
func WithStartFromLatest() ClientOption {
|
||||||
|
return func(kc *KinesisClient) {
|
||||||
|
kc.fromLatest = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// NewKinesisClient returns client to interface with Kinesis stream
|
// NewKinesisClient returns client to interface with Kinesis stream
|
||||||
func NewKinesisClient(opts ...ClientOption) *KinesisClient {
|
func NewKinesisClient(opts ...ClientOption) *KinesisClient {
|
||||||
kc := &KinesisClient{}
|
kc := &KinesisClient{}
|
||||||
|
|
@ -37,7 +46,8 @@ func NewKinesisClient(opts ...ClientOption) *KinesisClient {
|
||||||
|
|
||||||
// KinesisClient acts as wrapper around Kinesis client
|
// KinesisClient acts as wrapper around Kinesis client
|
||||||
type KinesisClient struct {
|
type KinesisClient struct {
|
||||||
svc kinesisiface.KinesisAPI
|
svc kinesisiface.KinesisAPI
|
||||||
|
fromLatest bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetShardIDs returns shard ids in a given stream
|
// GetShardIDs returns shard ids in a given stream
|
||||||
|
|
@ -127,7 +137,9 @@ func (c *KinesisClient) getShardIterator(streamName, shardID, lastSeqNum string)
|
||||||
StreamName: aws.String(streamName),
|
StreamName: aws.String(streamName),
|
||||||
}
|
}
|
||||||
|
|
||||||
if lastSeqNum != "" {
|
if c.fromLatest {
|
||||||
|
params.ShardIteratorType = aws.String("LATEST")
|
||||||
|
} else if lastSeqNum != "" {
|
||||||
params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER")
|
params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER")
|
||||||
params.StartingSequenceNumber = aws.String(lastSeqNum)
|
params.StartingSequenceNumber = aws.String(lastSeqNum)
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue