Remove duplicated function for getShardIterator
This commit is contained in:
parent
c04f3d8a94
commit
e4efcdb56e
1 changed files with 3 additions and 25 deletions
28
consumer.go
28
consumer.go
|
|
@ -25,6 +25,7 @@ func NewConsumer(config Config) *Consumer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Consumer wraps the interaction with the Kinesis stream
|
||||||
type Consumer struct {
|
type Consumer struct {
|
||||||
svc *kinesis.Kinesis
|
svc *kinesis.Kinesis
|
||||||
Config
|
Config
|
||||||
|
|
@ -102,33 +103,10 @@ func (c *Consumer) getShardIterator(shardID string) *string {
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.Checkpoint.CheckpointExists(shardID) {
|
if c.Checkpoint.CheckpointExists(shardID) {
|
||||||
params.ShardIteratorType = aws.String(string(ShardIteratorAfterSequenceNumber))
|
params.ShardIteratorType = aws.String("AFTER_SEQUENCE_NUMBER")
|
||||||
params.StartingSequenceNumber = aws.String(c.Checkpoint.SequenceNumber())
|
params.StartingSequenceNumber = aws.String(c.Checkpoint.SequenceNumber())
|
||||||
} else {
|
} else {
|
||||||
params.ShardIteratorType = aws.String(string(c.ShardIteratorType))
|
params.ShardIteratorType = aws.String("TRIM_HORIZON")
|
||||||
}
|
|
||||||
|
|
||||||
resp, err := c.svc.GetShardIterator(params)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
c.Logger.WithError(err).Error("GetShardIterator")
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
return resp.ShardIterator
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Consumer) getShardIterator(shardID string) *string {
|
|
||||||
params := &kinesis.GetShardIteratorInput{
|
|
||||||
ShardId: aws.String(shardID),
|
|
||||||
StreamName: aws.String(c.StreamName),
|
|
||||||
}
|
|
||||||
|
|
||||||
if c.Checkpoint.CheckpointExists(shardID) {
|
|
||||||
params.ShardIteratorType = aws.String(string(ShardIteratorAfterSequenceNumber))
|
|
||||||
params.StartingSequenceNumber = aws.String(c.Checkpoint.SequenceNumber())
|
|
||||||
} else {
|
|
||||||
params.ShardIteratorType = aws.String(string(c.ShardIteratorType))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := c.svc.GetShardIterator(params)
|
resp, err := c.svc.GetShardIterator(params)
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue