Allow cancelling of request
This commit is contained in:
parent
cdaa98adfd
commit
89b383161a
2 changed files with 9 additions and 4 deletions
|
|
@ -137,7 +137,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
|
||||||
}
|
}
|
||||||
|
|
||||||
// get shard iterator
|
// get shard iterator
|
||||||
shardIterator, err := c.getShardIterator(c.streamName, shardID, lastSeqNum)
|
shardIterator, err := c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("get shard iterator error: %v", err)
|
return fmt.Errorf("get shard iterator error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -166,7 +166,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
shardIterator, err = c.getShardIterator(c.streamName, shardID, lastSeqNum)
|
shardIterator, err = c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("get shard iterator error: %v", err)
|
return fmt.Errorf("get shard iterator error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
@ -215,7 +215,7 @@ func isShardClosed(nextShardIterator, currentShardIterator *string) bool {
|
||||||
return nextShardIterator == nil || currentShardIterator == nextShardIterator
|
return nextShardIterator == nil || currentShardIterator == nextShardIterator
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Consumer) getShardIterator(streamName, shardID, seqNum string) (*string, error) {
|
func (c *Consumer) getShardIterator(ctx context.Context, streamName, shardID, seqNum string) (*string, error) {
|
||||||
params := &kinesis.GetShardIteratorInput{
|
params := &kinesis.GetShardIteratorInput{
|
||||||
ShardId: aws.String(shardID),
|
ShardId: aws.String(shardID),
|
||||||
StreamName: aws.String(streamName),
|
StreamName: aws.String(streamName),
|
||||||
|
|
@ -231,6 +231,6 @@ func (c *Consumer) getShardIterator(streamName, shardID, seqNum string) (*string
|
||||||
params.ShardIteratorType = aws.String(c.initialShardIteratorType)
|
params.ShardIteratorType = aws.String(c.initialShardIteratorType)
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := c.client.GetShardIterator(params)
|
res, err := c.client.GetShardIteratorWithContext(aws.Context(ctx), params)
|
||||||
return res.ShardIterator, err
|
return res.ShardIterator, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -8,6 +8,7 @@ import (
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||||
|
"github.com/aws/aws-sdk-go/aws/request"
|
||||||
"github.com/aws/aws-sdk-go/service/kinesis"
|
"github.com/aws/aws-sdk-go/service/kinesis"
|
||||||
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
|
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
|
||||||
)
|
)
|
||||||
|
|
@ -330,6 +331,10 @@ func (c *kinesisClientMock) GetShardIterator(in *kinesis.GetShardIteratorInput)
|
||||||
return c.getShardIteratorMock(in)
|
return c.getShardIteratorMock(in)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *kinesisClientMock) GetShardIteratorWithContext(ctx aws.Context, in *kinesis.GetShardIteratorInput, options ...request.Option) (*kinesis.GetShardIteratorOutput, error) {
|
||||||
|
return c.getShardIteratorMock(in)
|
||||||
|
}
|
||||||
|
|
||||||
// implementation of checkpoint
|
// implementation of checkpoint
|
||||||
type fakeCheckpoint struct {
|
type fakeCheckpoint struct {
|
||||||
cache map[string]string
|
cache map[string]string
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue