Add shard iterator type as config option

Fixes: https://github.com/harlow/kinesis-consumer/issues/74
This commit is contained in:
Harlow Ward 2018-12-28 19:39:47 -08:00
parent 5688ff2820
commit 3527b603d3

View file

@ -47,6 +47,13 @@ func WithClient(client kinesisiface.KinesisAPI) Option {
} }
} }
// ShardIteratorType overrides the starting point for the consumer
func WithShardIteratorType(t string) Option {
return func(c *Consumer) {
c.initialShardIteratorType = t
}
}
// ScanStatus signals the consumer if we should continue scanning for next record // ScanStatus signals the consumer if we should continue scanning for next record
// and whether to checkpoint. // and whether to checkpoint.
type ScanStatus struct { type ScanStatus struct {
@ -64,9 +71,10 @@ func New(streamName string, opts ...Option) (*Consumer, error) {
// new consumer with no-op checkpoint, counter, and logger // new consumer with no-op checkpoint, counter, and logger
c := &Consumer{ c := &Consumer{
streamName: streamName, streamName: streamName,
checkpoint: &noopCheckpoint{}, initialShardIteratorType: "TRIM_HORIZON",
counter: &noopCounter{}, checkpoint: &noopCheckpoint{},
counter: &noopCounter{},
logger: &noopLogger{ logger: &noopLogger{
logger: log.New(ioutil.Discard, "", log.LstdFlags), logger: log.New(ioutil.Discard, "", log.LstdFlags),
}, },
@ -91,11 +99,12 @@ func New(streamName string, opts ...Option) (*Consumer, error) {
// Consumer wraps the interaction with the Kinesis stream // Consumer wraps the interaction with the Kinesis stream
type Consumer struct { type Consumer struct {
streamName string streamName string
client kinesisiface.KinesisAPI initialShardIteratorType string
logger Logger client kinesisiface.KinesisAPI
checkpoint Checkpoint logger Logger
counter Counter checkpoint Checkpoint
counter Counter
} }
// Scan scans each of the shards of the stream, calls the callback // Scan scans each of the shards of the stream, calls the callback
@ -258,7 +267,7 @@ func (c *Consumer) getShardIterator(streamName, shardID, lastSeqNum string) (*st
params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER") params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER")
params.StartingSequenceNumber = aws.String(lastSeqNum) params.StartingSequenceNumber = aws.String(lastSeqNum)
} else { } else {
params.ShardIteratorType = aws.String("TRIM_HORIZON") params.ShardIteratorType = aws.String(c.initialShardIteratorType)
} }
resp, err := c.client.GetShardIterator(params) resp, err := c.client.GetShardIterator(params)