Wrap underlying errors via %w verb (#130)
As introduced in Go 1.13. This enables user of this library to check for an underlying wrapped error type via errors.Is and errors.As functions.
This commit is contained in:
parent
799ccf2d40
commit
27055f2ace
2 changed files with 6 additions and 6 deletions
10
consumer.go
10
consumer.go
|
|
@ -29,7 +29,7 @@ type Record struct {
|
||||||
// any of the optional attributes.
|
// any of the optional attributes.
|
||||||
func New(streamName string, opts ...Option) (*Consumer, error) {
|
func New(streamName string, opts ...Option) (*Consumer, error) {
|
||||||
if streamName == "" {
|
if streamName == "" {
|
||||||
return nil, fmt.Errorf("must provide stream name")
|
return nil, errors.New("must provide stream name")
|
||||||
}
|
}
|
||||||
|
|
||||||
// new consumer with noop storage, counter, and logger
|
// new consumer with noop storage, counter, and logger
|
||||||
|
|
@ -120,7 +120,7 @@ func (c *Consumer) Scan(ctx context.Context, fn ScanFunc) error {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
if err := c.ScanShard(ctx, shardID, fn); err != nil {
|
if err := c.ScanShard(ctx, shardID, fn); err != nil {
|
||||||
select {
|
select {
|
||||||
case errc <- fmt.Errorf("shard %s error: %v", shardID, err):
|
case errc <- fmt.Errorf("shard %s error: %w", shardID, err):
|
||||||
// first error to occur
|
// first error to occur
|
||||||
cancel()
|
cancel()
|
||||||
default:
|
default:
|
||||||
|
|
@ -144,13 +144,13 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
|
||||||
// get last seq number from checkpoint
|
// get last seq number from checkpoint
|
||||||
lastSeqNum, err := c.group.GetCheckpoint(c.streamName, shardID)
|
lastSeqNum, err := c.group.GetCheckpoint(c.streamName, shardID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("get checkpoint error: %v", err)
|
return fmt.Errorf("get checkpoint error: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// get shard iterator
|
// get shard iterator
|
||||||
shardIterator, err := c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
|
shardIterator, err := c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("get shard iterator error: %v", err)
|
return fmt.Errorf("get shard iterator error: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
c.logger.Log("[CONSUMER] start scan:", shardID, lastSeqNum)
|
c.logger.Log("[CONSUMER] start scan:", shardID, lastSeqNum)
|
||||||
|
|
@ -178,7 +178,7 @@ func (c *Consumer) ScanShard(ctx context.Context, shardID string, fn ScanFunc) e
|
||||||
|
|
||||||
shardIterator, err = c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
|
shardIterator, err = c.getShardIterator(ctx, c.streamName, shardID, lastSeqNum)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("get shard iterator error: %v", err)
|
return fmt.Errorf("get shard iterator error: %w", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// loop over records, call callback func
|
// loop over records, call callback func
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@ func listShards(ksis kinesisiface.KinesisAPI, streamName string) ([]*kinesis.Sha
|
||||||
for {
|
for {
|
||||||
resp, err := ksis.ListShards(listShardsInput)
|
resp, err := ksis.ListShards(listShardsInput)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("ListShards error: %v", err)
|
return nil, fmt.Errorf("ListShards error: %w", err)
|
||||||
}
|
}
|
||||||
ss = append(ss, resp.Shards...)
|
ss = append(ss, resp.Shards...)
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue