fix: wrap kinesis get records error in ScanShard instead of printing error message
* wrap get records error using %w fmt code * add unit test for kinesis get records using errors.Is
This commit is contained in:
parent
baf8258298
commit
e062e02f98
2 changed files with 7 additions and 3 deletions
|
|
@ -192,7 +192,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
|
||||||
c.logger.Log("[CONSUMER] get records error:", err.Error())
|
c.logger.Log("[CONSUMER] get records error:", err.Error())
|
||||||
|
|
||||||
if !isRetriableError(err) {
|
if !isRetriableError(err) {
|
||||||
return fmt.Errorf("get records error: %v", err.Error())
|
return fmt.Errorf("get records error: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
shardIterator, err = c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
|
shardIterator, err = c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
|
||||||
|
|
|
||||||
|
|
@ -396,6 +396,7 @@ func TestScanShard_ShardIsClosed_WithShardClosedHandler(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestScanShard_GetRecordsError(t *testing.T) {
|
func TestScanShard_GetRecordsError(t *testing.T) {
|
||||||
|
getRecordsError := &types.InvalidArgumentException{Message: aws.String("aws error message")}
|
||||||
var client = &kinesisClientMock{
|
var client = &kinesisClientMock{
|
||||||
getShardIteratorMock: func(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error) {
|
getShardIteratorMock: func(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error) {
|
||||||
return &kinesis.GetShardIteratorOutput{
|
return &kinesis.GetShardIteratorOutput{
|
||||||
|
|
@ -406,8 +407,7 @@ func TestScanShard_GetRecordsError(t *testing.T) {
|
||||||
return &kinesis.GetRecordsOutput{
|
return &kinesis.GetRecordsOutput{
|
||||||
NextShardIterator: nil,
|
NextShardIterator: nil,
|
||||||
Records: nil,
|
Records: nil,
|
||||||
},
|
}, getRecordsError
|
||||||
&types.InvalidArgumentException{Message: aws.String("aws error message")}
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -424,6 +424,10 @@ func TestScanShard_GetRecordsError(t *testing.T) {
|
||||||
if err.Error() != "get records error: InvalidArgumentException: aws error message" {
|
if err.Error() != "get records error: InvalidArgumentException: aws error message" {
|
||||||
t.Fatalf("unexpected error: %v", err)
|
t.Fatalf("unexpected error: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !errors.Is(err, getRecordsError) {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type kinesisClientMock struct {
|
type kinesisClientMock struct {
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue