diff --git a/clientlibrary/streams/client.go b/clientlibrary/streams/client.go new file mode 100644 index 0000000..f72e3d4 --- /dev/null +++ b/clientlibrary/streams/client.go @@ -0,0 +1,38 @@ +package streams + +import ( + "context" + "github.com/aws/aws-sdk-go-v2/service/kinesis" +) + +type Client interface { + RegisterStreamConsumer(ctx context.Context, params *kinesis.RegisterStreamConsumerInput, optFns ...func(*kinesis.Options)) (*kinesis.RegisterStreamConsumerOutput, error) + PutRecords(ctx context.Context, params *kinesis.PutRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.PutRecordsOutput, error) + DisableEnhancedMonitoring(ctx context.Context, params *kinesis.DisableEnhancedMonitoringInput, optFns ...func(*kinesis.Options)) (*kinesis.DisableEnhancedMonitoringOutput, error) + DescribeLimits(ctx context.Context, params *kinesis.DescribeLimitsInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeLimitsOutput, error) + ListStreams(ctx context.Context, params *kinesis.ListStreamsInput, optFns ...func(*kinesis.Options)) (*kinesis.ListStreamsOutput, error) + GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error) + AddTagsToStream(ctx context.Context, params *kinesis.AddTagsToStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.AddTagsToStreamOutput, error) + DescribeStreamConsumer(ctx context.Context, params *kinesis.DescribeStreamConsumerInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamConsumerOutput, error) + UpdateStreamMode(ctx context.Context, params *kinesis.UpdateStreamModeInput, optFns ...func(*kinesis.Options)) (*kinesis.UpdateStreamModeOutput, error) + StopStreamEncryption(ctx context.Context, params *kinesis.StopStreamEncryptionInput, optFns ...func(*kinesis.Options)) (*kinesis.StopStreamEncryptionOutput, error) + StartStreamEncryption(ctx context.Context, params *kinesis.StartStreamEncryptionInput, optFns ...func(*kinesis.Options)) (*kinesis.StartStreamEncryptionOutput, error) + SplitShard(ctx context.Context, params *kinesis.SplitShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SplitShardOutput, error) + RemoveTagsFromStream(ctx context.Context, params *kinesis.RemoveTagsFromStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.RemoveTagsFromStreamOutput, error) + ListStreamConsumers(ctx context.Context, params *kinesis.ListStreamConsumersInput, optFns ...func(*kinesis.Options)) (*kinesis.ListStreamConsumersOutput, error) + DeleteStream(ctx context.Context, params *kinesis.DeleteStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DeleteStreamOutput, error) + EnableEnhancedMonitoring(ctx context.Context, params *kinesis.EnableEnhancedMonitoringInput, optFns ...func(*kinesis.Options)) (*kinesis.EnableEnhancedMonitoringOutput, error) + IncreaseStreamRetentionPeriod(ctx context.Context, params *kinesis.IncreaseStreamRetentionPeriodInput, optFns ...func(*kinesis.Options)) (*kinesis.IncreaseStreamRetentionPeriodOutput, error) + DescribeStreamSummary(ctx context.Context, params *kinesis.DescribeStreamSummaryInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamSummaryOutput, error) + SubscribeToShard(ctx context.Context, params *kinesis.SubscribeToShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SubscribeToShardOutput, error) + PutRecord(ctx context.Context, params *kinesis.PutRecordInput, optFns ...func(*kinesis.Options)) (*kinesis.PutRecordOutput, error) + CreateStream(ctx context.Context, params *kinesis.CreateStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.CreateStreamOutput, error) + DeregisterStreamConsumer(ctx context.Context, params *kinesis.DeregisterStreamConsumerInput, optFns ...func(*kinesis.Options)) (*kinesis.DeregisterStreamConsumerOutput, error) + GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error) + MergeShards(ctx context.Context, params *kinesis.MergeShardsInput, optFns ...func(*kinesis.Options)) (*kinesis.MergeShardsOutput, error) + DecreaseStreamRetentionPeriod(ctx context.Context, params *kinesis.DecreaseStreamRetentionPeriodInput, optFns ...func(*kinesis.Options)) (*kinesis.DecreaseStreamRetentionPeriodOutput, error) + ListTagsForStream(ctx context.Context, params *kinesis.ListTagsForStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.ListTagsForStreamOutput, error) + ListShards(ctx context.Context, params *kinesis.ListShardsInput, optFns ...func(*kinesis.Options)) (*kinesis.ListShardsOutput, error) + UpdateShardCount(ctx context.Context, params *kinesis.UpdateShardCountInput, optFns ...func(*kinesis.Options)) (*kinesis.UpdateShardCountOutput, error) + DescribeStream(ctx context.Context, params *kinesis.DescribeStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error) +} diff --git a/clientlibrary/streams/dynamodb_adapter_client.go b/clientlibrary/streams/dynamodb_adapter_client.go new file mode 100644 index 0000000..5b58b2a --- /dev/null +++ b/clientlibrary/streams/dynamodb_adapter_client.go @@ -0,0 +1,356 @@ +package streams + +import ( + "bytes" + "context" + "encoding/json" + "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams" + "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams/types" + "github.com/aws/aws-sdk-go-v2/service/kinesis" + ktypes "github.com/aws/aws-sdk-go-v2/service/kinesis/types" + log "github.com/sirupsen/logrus" +) + +type DynamodbStreamAdapterClient struct { + internalClient *dynamodbstreams.Client + tableName *string +} + +func (d DynamodbStreamAdapterClient) ListStreams(ctx context.Context, params *kinesis.ListStreamsInput, optFns ...func(*kinesis.Options)) (*kinesis.ListStreamsOutput, error) { + log.Info("ListStreams >>> request ", params) + listStreamsOutput, err := d.internalClient.ListStreams(ctx, d.convertListStreamsInput(params)) + log.Info("ListStreams >>> response ", params) + if err != nil { + return nil, err + } + return d.convertListStreamsOutput(listStreamsOutput), nil +} + +func (d DynamodbStreamAdapterClient) GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error) { + log.Info("GetShardIterator >>> request ", params) + shardIteratorOutput, err := d.internalClient.GetShardIterator(ctx, d.convertShardIteratorInput(params)) + log.Info("GetShardIterator >>> response ", params) + if err != nil { + return nil, err + } + return d.convertShardIteratorOutput(shardIteratorOutput), nil +} + +func (d DynamodbStreamAdapterClient) GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error) { + log.Info("GetRecords >>> request ", params) + getRecordsInput, err := d.internalClient.GetRecords(ctx, d.convertGetRecordsInput(params)) + log.Info("GetRecords >>> output ", params) + if err != nil { + return nil, err + } + return d.convertGetRecordsOutput(getRecordsInput), nil +} + +func (d DynamodbStreamAdapterClient) ListShards(ctx context.Context, params *kinesis.ListShardsInput, optFns ...func(*kinesis.Options)) (*kinesis.ListShardsOutput, error) { + + log.Info("ListShards >>> request ", params) + var maxResults int32 = 100 + if *params.MaxResults >= 100 { + params.MaxResults = &maxResults + } + dynamoOutput, err := d.internalClient.DescribeStream(ctx, &dynamodbstreams.DescribeStreamInput{ + ExclusiveStartShardId: params.ExclusiveStartShardId, + Limit: params.MaxResults, + StreamArn: params.StreamName, + }) + log.Info("ListShards >>> output ", dynamoOutput) + if err != nil { + return nil, err + } + return d.convertListShardsOutput(dynamoOutput), nil +} + +func (d DynamodbStreamAdapterClient) DescribeStream(ctx context.Context, params *kinesis.DescribeStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error) { + dynamoOutput, err := d.internalClient.DescribeStream(ctx, d.convertDescribeStreamInput(params)) + if err != nil { + return nil, err + } + return d.convertDescribeStreamOutput(dynamoOutput), nil +} + +func (d DynamodbStreamAdapterClient) RegisterStreamConsumer(ctx context.Context, params *kinesis.RegisterStreamConsumerInput, optFns ...func(*kinesis.Options)) (*kinesis.RegisterStreamConsumerOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) PutRecords(ctx context.Context, params *kinesis.PutRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.PutRecordsOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) DisableEnhancedMonitoring(ctx context.Context, params *kinesis.DisableEnhancedMonitoringInput, optFns ...func(*kinesis.Options)) (*kinesis.DisableEnhancedMonitoringOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) DescribeLimits(ctx context.Context, params *kinesis.DescribeLimitsInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeLimitsOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) AddTagsToStream(ctx context.Context, params *kinesis.AddTagsToStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.AddTagsToStreamOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) DescribeStreamConsumer(ctx context.Context, params *kinesis.DescribeStreamConsumerInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamConsumerOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) UpdateStreamMode(ctx context.Context, params *kinesis.UpdateStreamModeInput, optFns ...func(*kinesis.Options)) (*kinesis.UpdateStreamModeOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) StopStreamEncryption(ctx context.Context, params *kinesis.StopStreamEncryptionInput, optFns ...func(*kinesis.Options)) (*kinesis.StopStreamEncryptionOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) StartStreamEncryption(ctx context.Context, params *kinesis.StartStreamEncryptionInput, optFns ...func(*kinesis.Options)) (*kinesis.StartStreamEncryptionOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) SplitShard(ctx context.Context, params *kinesis.SplitShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SplitShardOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) RemoveTagsFromStream(ctx context.Context, params *kinesis.RemoveTagsFromStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.RemoveTagsFromStreamOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) ListStreamConsumers(ctx context.Context, params *kinesis.ListStreamConsumersInput, optFns ...func(*kinesis.Options)) (*kinesis.ListStreamConsumersOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) DeleteStream(ctx context.Context, params *kinesis.DeleteStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DeleteStreamOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) EnableEnhancedMonitoring(ctx context.Context, params *kinesis.EnableEnhancedMonitoringInput, optFns ...func(*kinesis.Options)) (*kinesis.EnableEnhancedMonitoringOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) IncreaseStreamRetentionPeriod(ctx context.Context, params *kinesis.IncreaseStreamRetentionPeriodInput, optFns ...func(*kinesis.Options)) (*kinesis.IncreaseStreamRetentionPeriodOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) DescribeStreamSummary(ctx context.Context, params *kinesis.DescribeStreamSummaryInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamSummaryOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) SubscribeToShard(ctx context.Context, params *kinesis.SubscribeToShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SubscribeToShardOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) PutRecord(ctx context.Context, params *kinesis.PutRecordInput, optFns ...func(*kinesis.Options)) (*kinesis.PutRecordOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) CreateStream(ctx context.Context, params *kinesis.CreateStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.CreateStreamOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) DeregisterStreamConsumer(ctx context.Context, params *kinesis.DeregisterStreamConsumerInput, optFns ...func(*kinesis.Options)) (*kinesis.DeregisterStreamConsumerOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) MergeShards(ctx context.Context, params *kinesis.MergeShardsInput, optFns ...func(*kinesis.Options)) (*kinesis.MergeShardsOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) DecreaseStreamRetentionPeriod(ctx context.Context, params *kinesis.DecreaseStreamRetentionPeriodInput, optFns ...func(*kinesis.Options)) (*kinesis.DecreaseStreamRetentionPeriodOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) ListTagsForStream(ctx context.Context, params *kinesis.ListTagsForStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.ListTagsForStreamOutput, error) { + //TODO implement me + panic("implement me") +} + +func (d DynamodbStreamAdapterClient) UpdateShardCount(ctx context.Context, params *kinesis.UpdateShardCountInput, optFns ...func(*kinesis.Options)) (*kinesis.UpdateShardCountOutput, error) { + //TODO implement me + panic("implement me") +} + +func NewDynamodbStreamClient(client *dynamodbstreams.Client) Client { + return &DynamodbStreamAdapterClient{ + internalClient: client, + } +} + +func (d DynamodbStreamAdapterClient) convertListStreamsInput(kinesisInput *kinesis.ListStreamsInput) *dynamodbstreams.ListStreamsInput { + dynamoInput := dynamodbstreams.ListStreamsInput{} + + // Set ExclusiveStartStreamArn field using ExclusiveStartStreamName field from kinesisInput + dynamoInput.ExclusiveStartStreamArn = nil + if kinesisInput.ExclusiveStartStreamName != nil { + dynamoInput.ExclusiveStartStreamArn = kinesisInput.ExclusiveStartStreamName + } + + // Set Limit field + if kinesisInput.Limit != nil { + dynamoInput.Limit = kinesisInput.Limit + } + + // Set TableName field to nil since it does not exist in kinesis.ListStreamsInput + dynamoInput.TableName = d.tableName + + return &dynamoInput +} + +func (d DynamodbStreamAdapterClient) convertListStreamsOutput(dOutput *dynamodbstreams.ListStreamsOutput) *kinesis.ListStreamsOutput { + kinesisOutput := kinesis.ListStreamsOutput{} + hasMoreStreams := dOutput.LastEvaluatedStreamArn != nil + // Set HasMoreStreams field + kinesisOutput.HasMoreStreams = &hasMoreStreams + + // Set StreamNames field using StreamDescriptionList field from output + kinesisOutput.StreamNames = nil + if dOutput.Streams != nil { + kinesisOutput.StreamNames = make([]string, len(dOutput.Streams)) + for i, stream := range dOutput.Streams { + kinesisOutput.StreamNames[i] = *stream.StreamArn + } + } + kinesisOutput.ResultMetadata = dOutput.ResultMetadata + return &kinesisOutput +} + +func (d DynamodbStreamAdapterClient) convertDescribeStreamInput(params *kinesis.DescribeStreamInput) *dynamodbstreams.DescribeStreamInput { + dynamoStreamInput := dynamodbstreams.DescribeStreamInput{} + if params != nil { + dynamoStreamInput.StreamArn = params.StreamName + dynamoStreamInput.Limit = params.Limit + dynamoStreamInput.ExclusiveStartShardId = params.ExclusiveStartShardId + } + return &dynamoStreamInput +} + +func (d DynamodbStreamAdapterClient) convertShardIteratorOutput(output *dynamodbstreams.GetShardIteratorOutput) *kinesis.GetShardIteratorOutput { + kinesisOutput := kinesis.GetShardIteratorOutput{} + if output != nil { + kinesisOutput.ShardIterator = output.ShardIterator + kinesisOutput.ResultMetadata = output.ResultMetadata + } + return &kinesisOutput +} + +func (d DynamodbStreamAdapterClient) convertShardIteratorInput(kinesisInput *kinesis.GetShardIteratorInput) *dynamodbstreams.GetShardIteratorInput { + if kinesisInput.ShardIteratorType == ktypes.ShardIteratorTypeAtTimestamp { + kinesisInput.ShardIteratorType = ktypes.ShardIteratorTypeLatest + } + dynamodbInput := &dynamodbstreams.GetShardIteratorInput{ + ShardId: kinesisInput.ShardId, + ShardIteratorType: types.ShardIteratorType(kinesisInput.ShardIteratorType), + StreamArn: kinesisInput.StreamName, + SequenceNumber: kinesisInput.StartingSequenceNumber, + } + return dynamodbInput +} + +func (d DynamodbStreamAdapterClient) convertGetRecordsInput(params *kinesis.GetRecordsInput) *dynamodbstreams.GetRecordsInput { + var dynamoMaxLimit int32 = 1000 + if *params.Limit >= 10000 { + params.Limit = &dynamoMaxLimit + } + dynamoInput := &dynamodbstreams.GetRecordsInput{ + Limit: params.Limit, + ShardIterator: params.ShardIterator, + } + return dynamoInput +} + +func (d DynamodbStreamAdapterClient) convertGetRecordsOutput(dynamoOutput *dynamodbstreams.GetRecordsOutput) *kinesis.GetRecordsOutput { + kinesisOutput := kinesis.GetRecordsOutput{} + if dynamoOutput != nil && len(dynamoOutput.Records) != 0 { + kinesisOutput.Records = make([]ktypes.Record, len(dynamoOutput.Records)) + kinesisOutput.ResultMetadata = dynamoOutput.ResultMetadata + kinesisOutput.NextShardIterator = dynamoOutput.NextShardIterator + } + for _, record := range dynamoOutput.Records { + reqBodyBytes := new(bytes.Buffer) + json.NewEncoder(reqBodyBytes).Encode(record) + kinesisOutput.Records = append(kinesisOutput.Records, ktypes.Record{ + Data: reqBodyBytes.Bytes(), + SequenceNumber: record.Dynamodb.SequenceNumber, + ApproximateArrivalTimestamp: record.Dynamodb.ApproximateCreationDateTime, + EncryptionType: ktypes.EncryptionTypeNone, + }) + } + return &kinesisOutput +} + +func (d DynamodbStreamAdapterClient) convertListShardsOutput(output *dynamodbstreams.DescribeStreamOutput) *kinesis.ListShardsOutput { + kinesisOutput := kinesis.ListShardsOutput{} + if output != nil && output.StreamDescription.StreamStatus == types.StreamStatusDisabled { + return &kinesisOutput + } + if output != nil { + if output.StreamDescription != nil && len(output.StreamDescription.Shards) != 0 { + kinesisOutput.Shards = make([]ktypes.Shard, len(output.StreamDescription.Shards)) + } + for _, shard := range output.StreamDescription.Shards { + kinesisOutput.Shards = append(kinesisOutput.Shards, ktypes.Shard{ + SequenceNumberRange: &ktypes.SequenceNumberRange{ + StartingSequenceNumber: shard.SequenceNumberRange.StartingSequenceNumber, + EndingSequenceNumber: shard.SequenceNumberRange.EndingSequenceNumber, + }, + ShardId: shard.ShardId, + ParentShardId: shard.ParentShardId, + }) + } + kinesisOutput.ResultMetadata = output.ResultMetadata + } + return &kinesisOutput +} + +func (d DynamodbStreamAdapterClient) convertDescribeStreamOutput(output *dynamodbstreams.DescribeStreamOutput) *kinesis.DescribeStreamOutput { + kinesisOutput := kinesis.DescribeStreamOutput{} + if output != nil && output.StreamDescription.StreamStatus == types.StreamStatusDisabled { + return &kinesisOutput + } + if output != nil { + if output.StreamDescription != nil && len(output.StreamDescription.Shards) != 0 { + kinesisOutput.StreamDescription = &ktypes.StreamDescription{ + StreamARN: output.StreamDescription.StreamArn, + StreamName: output.StreamDescription.StreamArn, + StreamStatus: ktypes.StreamStatus(output.StreamDescription.StreamStatus), + StreamCreationTimestamp: output.StreamDescription.CreationRequestDateTime, + Shards: make([]ktypes.Shard, len(output.StreamDescription.Shards)), + } + } + for _, shard := range output.StreamDescription.Shards { + kinesisOutput.StreamDescription.Shards = append(kinesisOutput.StreamDescription.Shards, ktypes.Shard{ + SequenceNumberRange: &ktypes.SequenceNumberRange{ + StartingSequenceNumber: shard.SequenceNumberRange.StartingSequenceNumber, + EndingSequenceNumber: shard.SequenceNumberRange.EndingSequenceNumber, + }, + ShardId: shard.ShardId, + ParentShardId: shard.ParentShardId, + }) + } + kinesisOutput.ResultMetadata = output.ResultMetadata + } + return &kinesisOutput +} diff --git a/clientlibrary/streams/kinesis.go b/clientlibrary/streams/kinesis.go new file mode 100644 index 0000000..cd2c891 --- /dev/null +++ b/clientlibrary/streams/kinesis.go @@ -0,0 +1,130 @@ +package streams + +import ( + "context" + "github.com/aws/aws-sdk-go-v2/service/kinesis" +) + +type KinesisClient struct { + internalClient *kinesis.Client +} + +func (k KinesisClient) RegisterStreamConsumer(ctx context.Context, params *kinesis.RegisterStreamConsumerInput, optFns ...func(*kinesis.Options)) (*kinesis.RegisterStreamConsumerOutput, error) { + return k.internalClient.RegisterStreamConsumer(ctx, params, optFns...) +} + +func (k KinesisClient) PutRecords(ctx context.Context, params *kinesis.PutRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.PutRecordsOutput, error) { + return k.internalClient.PutRecords(ctx, params, optFns...) +} + +func (k KinesisClient) DisableEnhancedMonitoring(ctx context.Context, params *kinesis.DisableEnhancedMonitoringInput, optFns ...func(*kinesis.Options)) (*kinesis.DisableEnhancedMonitoringOutput, error) { + return k.internalClient.DisableEnhancedMonitoring(ctx, params, optFns...) +} + +func (k KinesisClient) DescribeLimits(ctx context.Context, params *kinesis.DescribeLimitsInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeLimitsOutput, error) { + return k.internalClient.DescribeLimits(ctx, params, optFns...) +} + +func (k KinesisClient) ListStreams(ctx context.Context, params *kinesis.ListStreamsInput, optFns ...func(*kinesis.Options)) (*kinesis.ListStreamsOutput, error) { + return k.internalClient.ListStreams(ctx, params, optFns...) +} + +func (k KinesisClient) GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error) { + return k.internalClient.GetShardIterator(ctx, params, optFns...) +} + +func (k KinesisClient) AddTagsToStream(ctx context.Context, params *kinesis.AddTagsToStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.AddTagsToStreamOutput, error) { + return k.AddTagsToStream(ctx, params, optFns...) +} + +func (k KinesisClient) DescribeStreamConsumer(ctx context.Context, params *kinesis.DescribeStreamConsumerInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamConsumerOutput, error) { + return k.DescribeStreamConsumer(ctx, params, optFns...) +} + +func (k KinesisClient) UpdateStreamMode(ctx context.Context, params *kinesis.UpdateStreamModeInput, optFns ...func(*kinesis.Options)) (*kinesis.UpdateStreamModeOutput, error) { + return k.internalClient.UpdateStreamMode(ctx, params, optFns...) +} + +func (k KinesisClient) StopStreamEncryption(ctx context.Context, params *kinesis.StopStreamEncryptionInput, optFns ...func(*kinesis.Options)) (*kinesis.StopStreamEncryptionOutput, error) { + return k.internalClient.StopStreamEncryption(ctx, params, optFns...) +} + +func (k KinesisClient) StartStreamEncryption(ctx context.Context, params *kinesis.StartStreamEncryptionInput, optFns ...func(*kinesis.Options)) (*kinesis.StartStreamEncryptionOutput, error) { + return k.internalClient.StartStreamEncryption(ctx, params, optFns...) +} + +func (k KinesisClient) SplitShard(ctx context.Context, params *kinesis.SplitShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SplitShardOutput, error) { + return k.internalClient.SplitShard(ctx, params, optFns...) +} + +func (k KinesisClient) RemoveTagsFromStream(ctx context.Context, params *kinesis.RemoveTagsFromStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.RemoveTagsFromStreamOutput, error) { + return k.RemoveTagsFromStream(ctx, params, optFns...) +} + +func (k KinesisClient) ListStreamConsumers(ctx context.Context, params *kinesis.ListStreamConsumersInput, optFns ...func(*kinesis.Options)) (*kinesis.ListStreamConsumersOutput, error) { + return k.internalClient.ListStreamConsumers(ctx, params, optFns...) +} + +func (k KinesisClient) DeleteStream(ctx context.Context, params *kinesis.DeleteStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DeleteStreamOutput, error) { + return k.internalClient.DeleteStream(ctx, params, optFns...) +} + +func (k KinesisClient) EnableEnhancedMonitoring(ctx context.Context, params *kinesis.EnableEnhancedMonitoringInput, optFns ...func(*kinesis.Options)) (*kinesis.EnableEnhancedMonitoringOutput, error) { + return k.internalClient.EnableEnhancedMonitoring(ctx, params, optFns...) +} + +func (k KinesisClient) IncreaseStreamRetentionPeriod(ctx context.Context, params *kinesis.IncreaseStreamRetentionPeriodInput, optFns ...func(*kinesis.Options)) (*kinesis.IncreaseStreamRetentionPeriodOutput, error) { + return k.internalClient.IncreaseStreamRetentionPeriod(ctx, params, optFns...) +} + +func (k KinesisClient) DescribeStreamSummary(ctx context.Context, params *kinesis.DescribeStreamSummaryInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamSummaryOutput, error) { + return k.internalClient.DescribeStreamSummary(ctx, params, optFns...) +} + +func (k KinesisClient) SubscribeToShard(ctx context.Context, params *kinesis.SubscribeToShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SubscribeToShardOutput, error) { + return k.internalClient.SubscribeToShard(ctx, params, optFns...) +} + +func (k KinesisClient) PutRecord(ctx context.Context, params *kinesis.PutRecordInput, optFns ...func(*kinesis.Options)) (*kinesis.PutRecordOutput, error) { + return k.internalClient.PutRecord(ctx, params, optFns...) +} + +func (k KinesisClient) CreateStream(ctx context.Context, params *kinesis.CreateStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.CreateStreamOutput, error) { + return k.internalClient.CreateStream(ctx, params, optFns...) +} + +func (k KinesisClient) DeregisterStreamConsumer(ctx context.Context, params *kinesis.DeregisterStreamConsumerInput, optFns ...func(*kinesis.Options)) (*kinesis.DeregisterStreamConsumerOutput, error) { + return k.internalClient.DeregisterStreamConsumer(ctx, params, optFns...) +} + +func (k KinesisClient) GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error) { + return k.internalClient.GetRecords(ctx, params, optFns...) +} + +func (k KinesisClient) MergeShards(ctx context.Context, params *kinesis.MergeShardsInput, optFns ...func(*kinesis.Options)) (*kinesis.MergeShardsOutput, error) { + return k.internalClient.MergeShards(ctx, params, optFns...) +} + +func (k KinesisClient) DecreaseStreamRetentionPeriod(ctx context.Context, params *kinesis.DecreaseStreamRetentionPeriodInput, optFns ...func(*kinesis.Options)) (*kinesis.DecreaseStreamRetentionPeriodOutput, error) { + return k.internalClient.DecreaseStreamRetentionPeriod(ctx, params, optFns...) +} + +func (k KinesisClient) ListTagsForStream(ctx context.Context, params *kinesis.ListTagsForStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.ListTagsForStreamOutput, error) { + return k.internalClient.ListTagsForStream(ctx, params, optFns...) +} + +func (k KinesisClient) ListShards(ctx context.Context, params *kinesis.ListShardsInput, optFns ...func(*kinesis.Options)) (*kinesis.ListShardsOutput, error) { + return k.internalClient.ListShards(ctx, params, optFns...) +} + +func (k KinesisClient) UpdateShardCount(ctx context.Context, params *kinesis.UpdateShardCountInput, optFns ...func(*kinesis.Options)) (*kinesis.UpdateShardCountOutput, error) { + return k.internalClient.UpdateShardCount(ctx, params, optFns...) +} + +func (k KinesisClient) DescribeStream(ctx context.Context, params *kinesis.DescribeStreamInput, optFns ...func(*kinesis.Options)) (*kinesis.DescribeStreamOutput, error) { + return k.internalClient.DescribeStream(ctx, params, optFns...) +} + +func NewKinesisClient() Client { + return &KinesisClient{} +} diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index 6843f01..4ece066 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -33,6 +33,8 @@ import ( "context" "crypto/rand" "errors" + "github.com/aws/aws-sdk-go-v2/service/dynamodbstreams" + "github.com/vmware/vmware-go-kcl-v2/clientlibrary/streams" "math/big" "sync" "time" @@ -60,7 +62,7 @@ type Worker struct { processorFactory kcl.IRecordProcessorFactory kclConfig *config.KinesisClientLibConfiguration - kc *kinesis.Client + kc streams.Client checkpointer chk.Checkpointer mService metrics.MonitoringService @@ -185,7 +187,7 @@ func (w *Worker) initialize() error { // no need to move forward log.Fatalf("Failed in loading Kinesis default config for creating Worker: %+v", err) } - w.kc = kinesis.NewFromConfig(cfg) + w.kc = streams.NewDynamodbStreamClient(dynamodbstreams.NewFromConfig(cfg)) } else { log.Infof("Use custom Kinesis service.") } diff --git a/go.mod b/go.mod index 271895b..516863b 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,16 @@ -module github.com/vmware/vmware-go-kcl-v2 +module github.com/ravikiran-zoman/vmware-go-kcl-v2 + +replace github.com/vmware/vmware-go-kcl-v2 v0.0.0-20230407010916-b12921da2398 => github.com/ravikiran-zoman/vmware-go-kcl-v2 v0.0.1 go 1.17 require ( - github.com/aws/aws-sdk-go-v2 v1.11.2 + github.com/aws/aws-sdk-go-v2 v1.18.0 github.com/aws/aws-sdk-go-v2/config v1.11.1 github.com/aws/aws-sdk-go-v2/credentials v1.6.5 github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0 github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0 + github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.14.10 github.com/aws/aws-sdk-go-v2/service/kinesis v1.11.0 github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 github.com/golang/protobuf v1.5.2 @@ -17,23 +20,23 @@ require ( github.com/rs/zerolog v1.26.1 github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.8.1 + github.com/vmware/vmware-go-kcl-v2 v0.0.0-20230407010916-b12921da2398 go.uber.org/zap v1.20.0 gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) require ( - github.com/BurntSushi/toml v0.4.1 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 // indirect github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.7.0 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.12.0 // indirect - github.com/aws/smithy-go v1.9.0 // indirect + github.com/aws/smithy-go v1.13.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect diff --git a/go.sum b/go.sum index 7ac1638..6137d73 100644 --- a/go.sum +++ b/go.sum @@ -41,8 +41,9 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= -github.com/aws/aws-sdk-go-v2 v1.11.2 h1:SDiCYqxdIYi6HgQfAWRhgdZrdnOuGyLDJVRSWLeHWvs= github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ= +github.com/aws/aws-sdk-go-v2 v1.18.0 h1:882kkTpSFhdgYRKVZ/VCgf7sd0ru57p2JCxz4/oN5RY= +github.com/aws/aws-sdk-go-v2 v1.18.0/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 h1:yVUAwvJC/0WNPbyl0nA3j1L6CW1CN8wBubCRqtG7JLI= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0/go.mod h1:Xn6sxgRuIDflLRJFj5Ev7UxABIkNbccFPV/p8itDReM= github.com/aws/aws-sdk-go-v2/config v1.11.1 h1:KXSjb7ZMLRtjxClFptukTYibiOqJS9NwBO+9WD3UMto= @@ -51,16 +52,20 @@ github.com/aws/aws-sdk-go-v2/credentials v1.6.5 h1:ZrsO2js2v4T95rsCIWoAb/ck5+U1k github.com/aws/aws-sdk-go-v2/credentials v1.6.5/go.mod h1:HWSOnsnqVMbLcWUmom6AN1cqhcLzLJ62AObW28CbYbU= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2 h1:KiN5TPOLrEjbGCvdTQR4t0U4T87vVwALZ5Bg3jpMqPY= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.8.2/go.mod h1:dF2F6tXEOgmW5X1ZFO/EPtWrcm7XkW07KNcJUGNtt4s= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 h1:XJLnluKuUxQG255zPNe+04izXl7GSyUVafIsgfv9aw4= github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2/go.mod h1:SgKKNBIoDC/E1ZCDhhMW3yalWjwuLjMcpLzsM/QQnWo= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 h1:EauRoYZVNPlidZSZJDscjJBQ22JhVF2+tdteatax2Ak= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33 h1:kG5eQilShqmJbv11XL1VpyDbaEJzWxd4zRiCG30GSn4= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.33/go.mod h1:7i0PF1ME/2eUPFcjkVIwq+DOygHEoK92t5cDqNgYbIw= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2/go.mod h1:xT4XX6w5Sa3dhg50JrYyy3e4WPYo/+WjY/BXtqXVunU= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27 h1:vFQlirhuM8lLlpI7imKOMsjdQLuN9CPi+k44F/OFVsk= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.27/go.mod h1:UrHnn3QV/d0pBZ6QBAEQcqFLf8FAzLmoUfPVIueOvoM= github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2 h1:IQup8Q6lorXeiA/rK72PeToWoWK8h7VAPgHNWdSrtgE= github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2/go.mod h1:VITe/MdW6EMXPb0o0txu/fsonXbMHUU2OC2Qp7ivU4o= github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0 h1:BcSBoss+CeyRS4TgZKAcR6kcZ0Sb2P+DHs8r8aMlTpQ= github.com/aws/aws-sdk-go-v2/service/cloudwatch v1.13.0/go.mod h1:eAgmZ4hIzTsTOlAA7yvGJz+RywxZo3KWtGt7J+jAUxU= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0 h1:te+nIFwPf5Bi/cZvd9g/+EF0gkJT3c0J/5+NMx0NBZg= github.com/aws/aws-sdk-go-v2/service/dynamodb v1.11.0/go.mod h1:ELltfl9ri0n4sZ/VjPZBgemNMd9mYIpCAuZhc7NP7l4= +github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.14.10 h1:x6VwmKSbPqTYt5eZieio3/nNmgb6CiFo5DLV8nJ6QfI= +github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.14.10/go.mod h1:pP+91QTpJMvcFTqGky6puHrkBs8oqoB3XOCiGRDaXwI= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 h1:lPLbw4Gn59uoKqvOfSnkJr54XWk5Ak1NK20ZEiSWb3U= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0/go.mod h1:80NaCIH9YU3rzTTs/J/ECATjXuRqzo/wB6ukO6MZ0XY= github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.3.3 h1:ru9+IpkVIuDvIkm9Q0DEjtWHnh6ITDoZo8fH2dIjlqQ= @@ -75,8 +80,9 @@ github.com/aws/aws-sdk-go-v2/service/sso v1.7.0/go.mod h1:KnIpszaIdwI33tmc/W/GGX github.com/aws/aws-sdk-go-v2/service/sts v1.12.0 h1:7g0252k2TF3eA1DtfkTQB/tqI41YvbUPaolwTR0/ITc= github.com/aws/aws-sdk-go-v2/service/sts v1.12.0/go.mod h1:UV2N5HaPfdbDpkgkz4sRzWCvQswZjdO1FfqCWl0t7RA= github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= -github.com/aws/smithy-go v1.9.0 h1:c7FUdEqrQA1/UVKKCNDFQPNKGp4FQg3YW4Ck5SLTG58= github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= +github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8= +github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 h1:p8Ubi4GEgfRc1xFn/WtGNkVG8RXxGHOsKiwGptufIo8= github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407/go.mod h1:0Qr1uMHFmHsIYMcG4T7BJ9yrJtWadhOmpABCX69dwuc= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= @@ -153,8 +159,9 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -230,6 +237,8 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= +github.com/ravikiran-zoman/vmware-go-kcl-v2 v0.0.1 h1:3W8KdrQTtQU3O5+5B+4RFSFPcDe93bkNkzJOkMD44tA= +github.com/ravikiran-zoman/vmware-go-kcl-v2 v0.0.1/go.mod h1:DmEtPEfdoN1qnYkPI4B3tx/LFH3v6AFHZS9bFte8z80= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rs/xid v1.3.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= github.com/rs/zerolog v1.26.1 h1:/ihwxqH+4z8UxyI70wM1z9yCvkWcfz/a3mj48k/Zngc= @@ -454,7 +463,6 @@ golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/api v0.4.0/go.mod h1:8k5glujaEP+g9n7WNsDg8QP6cUVNI86fCNMcbazEtwE= google.golang.org/api v0.7.0/go.mod h1:WtwebWUNSVBH/HAw79HIFXZNqEvBhG+Ra+ax0hx3E3M=