diff --git a/consumer.go b/consumer.go index 6e584b6..854b96f 100644 --- a/consumer.go +++ b/consumer.go @@ -25,6 +25,7 @@ func NewConsumer(config Config) *Consumer { } } +// Consumer wraps the interaction with the Kinesis stream type Consumer struct { svc *kinesis.Kinesis Config @@ -102,33 +103,10 @@ func (c *Consumer) getShardIterator(shardID string) *string { } if c.Checkpoint.CheckpointExists(shardID) { - params.ShardIteratorType = aws.String(string(ShardIteratorAfterSequenceNumber)) + params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER") params.StartingSequenceNumber = aws.String(c.Checkpoint.SequenceNumber()) } else { - params.ShardIteratorType = aws.String(string(c.ShardIteratorType)) - } - - resp, err := c.svc.GetShardIterator(params) - - if err != nil { - c.Logger.WithError(err).Error("GetShardIterator") - os.Exit(1) - } - - return resp.ShardIterator -} - -func (c *Consumer) getShardIterator(shardID string) *string { - params := &kinesis.GetShardIteratorInput{ - ShardId: aws.String(shardID), - StreamName: aws.String(c.StreamName), - } - - if c.Checkpoint.CheckpointExists(shardID) { - params.ShardIteratorType = aws.String(string(ShardIteratorAfterSequenceNumber)) - params.StartingSequenceNumber = aws.String(c.Checkpoint.SequenceNumber()) - } else { - params.ShardIteratorType = aws.String(string(c.ShardIteratorType)) + params.ShardIteratorType = aws.String("TRIM_HORIZON") } resp, err := c.svc.GetShardIterator(params)