diff --git a/consumer.go b/consumer.go index 53b9cdf..d33a4ec 100644 --- a/consumer.go +++ b/consumer.go @@ -47,6 +47,13 @@ func WithClient(client kinesisiface.KinesisAPI) Option { } } +// ShardIteratorType overrides the starting point for the consumer +func WithShardIteratorType(t string) Option { + return func(c *Consumer) { + c.initialShardIteratorType = t + } +} + // ScanStatus signals the consumer if we should continue scanning for next record // and whether to checkpoint. type ScanStatus struct { @@ -64,9 +71,10 @@ func New(streamName string, opts ...Option) (*Consumer, error) { // new consumer with no-op checkpoint, counter, and logger c := &Consumer{ - streamName: streamName, - checkpoint: &noopCheckpoint{}, - counter: &noopCounter{}, + streamName: streamName, + initialShardIteratorType: "TRIM_HORIZON", + checkpoint: &noopCheckpoint{}, + counter: &noopCounter{}, logger: &noopLogger{ logger: log.New(ioutil.Discard, "", log.LstdFlags), }, @@ -91,11 +99,12 @@ func New(streamName string, opts ...Option) (*Consumer, error) { // Consumer wraps the interaction with the Kinesis stream type Consumer struct { - streamName string - client kinesisiface.KinesisAPI - logger Logger - checkpoint Checkpoint - counter Counter + streamName string + initialShardIteratorType string + client kinesisiface.KinesisAPI + logger Logger + checkpoint Checkpoint + counter Counter } // Scan scans each of the shards of the stream, calls the callback @@ -258,7 +267,7 @@ func (c *Consumer) getShardIterator(streamName, shardID, lastSeqNum string) (*st params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER") params.StartingSequenceNumber = aws.String(lastSeqNum) } else { - params.ShardIteratorType = aws.String("TRIM_HORIZON") + params.ShardIteratorType = aws.String(c.initialShardIteratorType) } resp, err := c.client.GetShardIterator(params)