From aab08b90501fefef0acac160ede0ed25f94d3daa Mon Sep 17 00:00:00 2001 From: Mike Monaghan Date: Tue, 13 Sep 2022 15:31:16 -0600 Subject: [PATCH 01/22] fixing infinite worker loop Signed-off-by: Mike Monaghan --- clientlibrary/worker/worker.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index 7807edd..f68e949 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -272,6 +272,14 @@ func (w *Worker) eventLoop() { rnd, _ := rand.Int(rand.Reader, big.NewInt(int64(w.kclConfig.ShardSyncIntervalMillis))) shardSyncSleep := w.kclConfig.ShardSyncIntervalMillis/2 + int(rnd.Int64()) + select { + case <-*w.stop: + log.Infof("Shutting down...") + return + case <-time.After(time.Duration(shardSyncSleep) * time.Millisecond): + log.Debugf("Waited %d ms to sync shards...", shardSyncSleep) + } + err := w.syncShard() if err != nil { log.Errorf("Error syncing shards: %+v, Retrying in %d ms...", err, shardSyncSleep) @@ -363,14 +371,6 @@ func (w *Worker) eventLoop() { log.Warnf("Error in rebalance: %+v", err) } } - - select { - case <-*w.stop: - log.Infof("Shutting down...") - return - case <-time.After(time.Duration(shardSyncSleep) * time.Millisecond): - log.Debugf("Waited %d ms to sync shards...", shardSyncSleep) - } } } From e2a45c53c3c6312fee0bc76259a929ac5d9ac578 Mon Sep 17 00:00:00 2001 From: Caleb Stewart Date: Thu, 13 Oct 2022 13:37:51 -0400 Subject: [PATCH 02/22] Automatically resolve default KinesisEndpoint This commit fixes #5 by returning `aws.EndpointNotFoundError` from the endpoint resolver when no `KinesisEndpoint` is defined, which will resolve the default AWS endpoint. This is the same process used by the DynamoDB checkpointer to resolve the default endpoint. 
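A minimal caller-side sketch of the resulting behavior (illustrative only; the helper name, region handling, and client construction below are assumptions, not part of this patch): when no custom endpoint is configured, the resolver returns `aws.EndpointNotFoundError` and the SDK falls back to its default regional Kinesis endpoint; otherwise the custom URL is used.

    // Hypothetical caller-side sketch, not part of this patch. Assumed imports:
    // context, github.com/aws/aws-sdk-go-v2/aws,
    // awsConfig "github.com/aws/aws-sdk-go-v2/config",
    // github.com/aws/aws-sdk-go-v2/service/kinesis.
    func newKinesisClient(customEndpoint, region string) (*kinesis.Client, error) {
        resolver := aws.EndpointResolverWithOptionsFunc(
            func(service, reg string, options ...interface{}) (aws.Endpoint, error) {
                if service == kinesis.ServiceID && customEndpoint != "" {
                    // An explicit endpoint wins, e.g. a local Kinesis stand-in.
                    return aws.Endpoint{PartitionID: "aws", URL: customEndpoint, SigningRegion: reg}, nil
                }
                // EndpointNotFoundError tells the SDK to use its default resolution.
                return aws.Endpoint{}, &aws.EndpointNotFoundError{}
            })
        cfg, err := awsConfig.LoadDefaultConfig(context.TODO(),
            awsConfig.WithRegion(region),
            awsConfig.WithEndpointResolverWithOptions(resolver))
        if err != nil {
            return nil, err
        }
        return kinesis.NewFromConfig(cfg), nil
    }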
Signed-off-by: Caleb Stewart --- clientlibrary/worker/worker.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index 7807edd..624598b 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -160,11 +160,15 @@ func (w *Worker) initialize() error { log.Infof("Creating Kinesis client") resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) { - return aws.Endpoint{ - PartitionID: "aws", - URL: w.kclConfig.KinesisEndpoint, - SigningRegion: w.regionName, - }, nil + if service == kinesis.ServiceID && len(w.kclConfig.KinesisEndpoint) > 0 { + return aws.Endpoint{ + PartitionID: "aws", + URL: w.kclConfig.KinesisEndpoint, + SigningRegion: w.regionName, + }, nil + } + // returning EndpointNotFoundError will allow the service to fallback to it's default resolution + return aws.Endpoint{}, &aws.EndpointNotFoundError{} }) cfg, err := awsConfig.LoadDefaultConfig( From 08b7fd9447547da2f8a7708c172637b023d02dc6 Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Fri, 16 Dec 2022 18:07:22 -0500 Subject: [PATCH 03/22] fix: catch DynamoDB Scan error when trying to scan nonexistent table/index in syncLeases() Signed-off-by: Shiva Pentakota --- clientlibrary/checkpoint/checkpointer.go | 2 +- clientlibrary/checkpoint/dynamodb-checkpointer.go | 8 +++++++- clientlibrary/checkpoint/mock-dynamodb_test.go | 2 +- clientlibrary/metrics/cloudwatch/cloudwatch.go | 2 +- clientlibrary/metrics/interfaces.go | 2 +- clientlibrary/metrics/prometheus/prometheus.go | 2 +- clientlibrary/partition/partition.go | 2 +- clientlibrary/worker/polling-shard-consumer.go | 2 +- clientlibrary/worker/worker.go | 8 ++++---- logger/zap/zap.go | 1 - 10 files changed, 18 insertions(+), 13 deletions(-) diff --git a/clientlibrary/checkpoint/checkpointer.go b/clientlibrary/checkpoint/checkpointer.go index c24f469..cfc6adf 100644 --- a/clientlibrary/checkpoint/checkpointer.go +++ b/clientlibrary/checkpoint/checkpointer.go @@ -20,7 +20,7 @@ // Package checkpoint // The implementation is derived from https://github.com/patrobinson/gokini // -// Copyright 2018 Patrick robinson +// # Copyright 2018 Patrick robinson // // Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: // diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer.go b/clientlibrary/checkpoint/dynamodb-checkpointer.go index f64e630..17c5cd7 100644 --- a/clientlibrary/checkpoint/dynamodb-checkpointer.go +++ b/clientlibrary/checkpoint/dynamodb-checkpointer.go @@ -20,7 +20,7 @@ // Package checkpoint // The implementation is derived from https://github.com/patrobinson/gokini // -// Copyright 2018 Patrick robinson +// # Copyright 2018 Patrick robinson // // Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software 
is furnished to do so, subject to the following conditions: // @@ -441,6 +441,12 @@ func (checkpointer *DynamoCheckpoint) syncLeases(shardStatus map[string]*par.Sha } scanOutput, err := checkpointer.svc.Scan(context.TODO(), input) + + if err != nil { + log.Debugf("Error performing DynamoDB Scan. Error: %+v ", err) + return err + } + results := scanOutput.Items for _, result := range results { shardId, foundShardId := result[LeaseKeyKey] diff --git a/clientlibrary/checkpoint/mock-dynamodb_test.go b/clientlibrary/checkpoint/mock-dynamodb_test.go index eb9e17c..d7d63bf 100644 --- a/clientlibrary/checkpoint/mock-dynamodb_test.go +++ b/clientlibrary/checkpoint/mock-dynamodb_test.go @@ -19,7 +19,7 @@ // The implementation is derived from https://github.com/patrobinson/gokini // -// Copyright 2018 Patrick robinson +// # Copyright 2018 Patrick robinson // // Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: // diff --git a/clientlibrary/metrics/cloudwatch/cloudwatch.go b/clientlibrary/metrics/cloudwatch/cloudwatch.go index 1383bbb..b3ef20f 100644 --- a/clientlibrary/metrics/cloudwatch/cloudwatch.go +++ b/clientlibrary/metrics/cloudwatch/cloudwatch.go @@ -20,7 +20,7 @@ // Package cloudwatch // The implementation is derived from https://github.com/patrobinson/gokini // -// Copyright 2018 Patrick robinson +// # Copyright 2018 Patrick robinson // // Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: // diff --git a/clientlibrary/metrics/interfaces.go b/clientlibrary/metrics/interfaces.go index 8762a49..b416a88 100644 --- a/clientlibrary/metrics/interfaces.go +++ b/clientlibrary/metrics/interfaces.go @@ -20,7 +20,7 @@ // Package metrics // The implementation is derived from https://github.com/patrobinson/gokini // -// Copyright 2018 Patrick robinson +// # Copyright 2018 Patrick robinson // // Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: // diff --git a/clientlibrary/metrics/prometheus/prometheus.go b/clientlibrary/metrics/prometheus/prometheus.go index 07a6e7e..69e311e 100644 --- a/clientlibrary/metrics/prometheus/prometheus.go +++ b/clientlibrary/metrics/prometheus/prometheus.go @@ -20,7 +20,7 @@ // Package prometheus // The implementation is derived from https://github.com/patrobinson/gokini // -// Copyright 2018 Patrick robinson +// # Copyright 2018 Patrick robinson // // Permission is hereby granted, free of charge, to any person obtaining a copy of this software and 
associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: // diff --git a/clientlibrary/partition/partition.go b/clientlibrary/partition/partition.go index 6f75290..8eb31dc 100644 --- a/clientlibrary/partition/partition.go +++ b/clientlibrary/partition/partition.go @@ -20,7 +20,7 @@ // Package partition // The implementation is derived from https://github.com/patrobinson/gokini // -// Copyright 2018 Patrick robinson +// # Copyright 2018 Patrick robinson // // Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: // diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index 0589a41..8ee8091 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -20,7 +20,7 @@ // Package worker // The implementation is derived from https://github.com/patrobinson/gokini // -// Copyright 2018 Patrick robinson +// # Copyright 2018 Patrick robinson // // Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: // diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index 7807edd..31bb61b 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -20,7 +20,7 @@ // Package worker // The implementation is derived from https://github.com/patrobinson/gokini // -// Copyright 2018 Patrick robinson +// # Copyright 2018 Patrick robinson // // Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: // @@ -49,9 +49,9 @@ import ( par "github.com/vmware/vmware-go-kcl-v2/clientlibrary/partition" ) -//Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees -//different components (e.g. syncing shard and lease information, tracking shard assignments, and processing data from -//the shards). +// Worker is the high level class that Kinesis applications use to start processing data. It initializes and oversees +// different components (e.g. syncing shard and lease information, tracking shard assignments, and processing data from +// the shards). 
type Worker struct { streamName string regionName string diff --git a/logger/zap/zap.go b/logger/zap/zap.go index cebb5b6..8e75364 100644 --- a/logger/zap/zap.go +++ b/logger/zap/zap.go @@ -44,7 +44,6 @@ type ZapLogger struct { // // Base zap logger can be convert to SugaredLogger by calling to add a wrapper: // sugaredLogger := log.Sugar() -// func NewZapLogger(logger *uzap.SugaredLogger) logger.Logger { return &ZapLogger{ sugaredLogger: logger, From 09cc5896e95e5438e58381826275c7d861a3f736 Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Fri, 16 Dec 2022 19:03:29 -0500 Subject: [PATCH 04/22] chore: Adding periods to copyright comment to satisfy gofmt Signed-off-by: Shiva Pentakota --- clientlibrary/checkpoint/checkpointer.go | 2 +- clientlibrary/checkpoint/dynamodb-checkpointer.go | 2 +- clientlibrary/checkpoint/mock-dynamodb_test.go | 2 +- clientlibrary/metrics/cloudwatch/cloudwatch.go | 2 +- clientlibrary/metrics/interfaces.go | 2 +- clientlibrary/metrics/prometheus/prometheus.go | 2 +- clientlibrary/partition/partition.go | 2 +- clientlibrary/worker/polling-shard-consumer.go | 2 +- clientlibrary/worker/worker.go | 2 +- 9 files changed, 9 insertions(+), 9 deletions(-) diff --git a/clientlibrary/checkpoint/checkpointer.go b/clientlibrary/checkpoint/checkpointer.go index cfc6adf..1af66ba 100644 --- a/clientlibrary/checkpoint/checkpointer.go +++ b/clientlibrary/checkpoint/checkpointer.go @@ -20,7 +20,7 @@ // Package checkpoint // The implementation is derived from https://github.com/patrobinson/gokini // -// # Copyright 2018 Patrick robinson +// Copyright 2018 Patrick robinson. // // Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: // diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer.go b/clientlibrary/checkpoint/dynamodb-checkpointer.go index 17c5cd7..3a7e22e 100644 --- a/clientlibrary/checkpoint/dynamodb-checkpointer.go +++ b/clientlibrary/checkpoint/dynamodb-checkpointer.go @@ -20,7 +20,7 @@ // Package checkpoint // The implementation is derived from https://github.com/patrobinson/gokini // -// # Copyright 2018 Patrick robinson +// Copyright 2018 Patrick robinson. // // Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: // diff --git a/clientlibrary/checkpoint/mock-dynamodb_test.go b/clientlibrary/checkpoint/mock-dynamodb_test.go index d7d63bf..76f1b13 100644 --- a/clientlibrary/checkpoint/mock-dynamodb_test.go +++ b/clientlibrary/checkpoint/mock-dynamodb_test.go @@ -19,7 +19,7 @@ // The implementation is derived from https://github.com/patrobinson/gokini // -// # Copyright 2018 Patrick robinson +// Copyright 2018 Patrick robinson. 
// // Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: // diff --git a/clientlibrary/metrics/cloudwatch/cloudwatch.go b/clientlibrary/metrics/cloudwatch/cloudwatch.go index b3ef20f..7fb66b3 100644 --- a/clientlibrary/metrics/cloudwatch/cloudwatch.go +++ b/clientlibrary/metrics/cloudwatch/cloudwatch.go @@ -20,7 +20,7 @@ // Package cloudwatch // The implementation is derived from https://github.com/patrobinson/gokini // -// # Copyright 2018 Patrick robinson +// Copyright 2018 Patrick robinson. // // Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: // diff --git a/clientlibrary/metrics/interfaces.go b/clientlibrary/metrics/interfaces.go index b416a88..e4b9f54 100644 --- a/clientlibrary/metrics/interfaces.go +++ b/clientlibrary/metrics/interfaces.go @@ -20,7 +20,7 @@ // Package metrics // The implementation is derived from https://github.com/patrobinson/gokini // -// # Copyright 2018 Patrick robinson +// Copyright 2018 Patrick robinson. // // Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: // diff --git a/clientlibrary/metrics/prometheus/prometheus.go b/clientlibrary/metrics/prometheus/prometheus.go index 69e311e..609e34f 100644 --- a/clientlibrary/metrics/prometheus/prometheus.go +++ b/clientlibrary/metrics/prometheus/prometheus.go @@ -20,7 +20,7 @@ // Package prometheus // The implementation is derived from https://github.com/patrobinson/gokini // -// # Copyright 2018 Patrick robinson +// Copyright 2018 Patrick robinson. // // Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: // diff --git a/clientlibrary/partition/partition.go b/clientlibrary/partition/partition.go index 8eb31dc..ffd3aff 100644 --- a/clientlibrary/partition/partition.go +++ b/clientlibrary/partition/partition.go @@ -20,7 +20,7 @@ // Package partition // The implementation is derived from https://github.com/patrobinson/gokini // -// # Copyright 2018 Patrick robinson +// Copyright 2018 Patrick robinson. 
// // Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: // diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index 8ee8091..49d6a9a 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -20,7 +20,7 @@ // Package worker // The implementation is derived from https://github.com/patrobinson/gokini // -// # Copyright 2018 Patrick robinson +// Copyright 2018 Patrick robinson. // // Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: // diff --git a/clientlibrary/worker/worker.go b/clientlibrary/worker/worker.go index 31bb61b..7001cd3 100644 --- a/clientlibrary/worker/worker.go +++ b/clientlibrary/worker/worker.go @@ -20,7 +20,7 @@ // Package worker // The implementation is derived from https://github.com/patrobinson/gokini // -// # Copyright 2018 Patrick robinson +// Copyright 2018 Patrick robinson. // // Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: // From e1425047a74793b3c242673fa0544f25df777355 Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Thu, 12 Jan 2023 11:31:16 -0800 Subject: [PATCH 05/22] feat: Sending renewed lease metric Signed-off-by: Shiva Pentakota --- clientlibrary/worker/fan-out-shard-consumer.go | 2 ++ clientlibrary/worker/polling-shard-consumer.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/clientlibrary/worker/fan-out-shard-consumer.go b/clientlibrary/worker/fan-out-shard-consumer.go index ee0686f..1a6f13b 100644 --- a/clientlibrary/worker/fan-out-shard-consumer.go +++ b/clientlibrary/worker/fan-out-shard-consumer.go @@ -103,6 +103,8 @@ func (sc *FanOutShardConsumer) getRecords() error { return err } refreshLeaseTimer = time.After(time.Until(sc.shard.LeaseTimeout.Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond))) + // log metric for renewed lease for worker + sc.mService.LeaseRenewed(sc.shard.ID) case event, ok := <-shardSub.GetStream().Events(): if !ok { // need to resubscribe to shard diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index 49d6a9a..252d39d 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -122,6 +122,8 @@ func (sc *PollingShardConsumer) getRecords() error { sc.shard.ID, sc.consumerID, err) return err } + // log metric for renewed lease for 
worker + sc.mService.LeaseRenewed(sc.shard.ID) } getRecordsStartTime := time.Now() From 599aa06ecd41585c3b0d704e05c90de716a0b1b4 Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Fri, 20 Jan 2023 13:23:02 -0800 Subject: [PATCH 06/22] fix: add DeleteMetricMillisBehindLatest for error case Signed-off-by: Shiva Pentakota --- clientlibrary/metrics/interfaces.go | 2 ++ clientlibrary/metrics/prometheus/prometheus.go | 4 ++++ clientlibrary/worker/common-shard-consumer.go | 3 ++- clientlibrary/worker/fan-out-shard-consumer.go | 2 +- clientlibrary/worker/polling-shard-consumer.go | 2 +- 5 files changed, 10 insertions(+), 3 deletions(-) diff --git a/clientlibrary/metrics/interfaces.go b/clientlibrary/metrics/interfaces.go index e4b9f54..47ec490 100644 --- a/clientlibrary/metrics/interfaces.go +++ b/clientlibrary/metrics/interfaces.go @@ -35,6 +35,7 @@ type MonitoringService interface { IncrRecordsProcessed(shard string, count int) IncrBytesProcessed(shard string, count int64) MillisBehindLatest(shard string, milliSeconds float64) + DeleteMetricMillisBehindLatest(shard string) LeaseGained(shard string) LeaseLost(shard string) LeaseRenewed(shard string) @@ -53,6 +54,7 @@ func (NoopMonitoringService) Shutdown() {} func (NoopMonitoringService) IncrRecordsProcessed(_ string, _ int) {} func (NoopMonitoringService) IncrBytesProcessed(_ string, _ int64) {} func (NoopMonitoringService) MillisBehindLatest(_ string, _ float64) {} +func (NoopMonitoringService) DeleteMetricMillisBehindLatest(_ string) {} func (NoopMonitoringService) LeaseGained(_ string) {} func (NoopMonitoringService) LeaseLost(_ string) {} func (NoopMonitoringService) LeaseRenewed(_ string) {} diff --git a/clientlibrary/metrics/prometheus/prometheus.go b/clientlibrary/metrics/prometheus/prometheus.go index 609e34f..e489a78 100644 --- a/clientlibrary/metrics/prometheus/prometheus.go +++ b/clientlibrary/metrics/prometheus/prometheus.go @@ -147,6 +147,10 @@ func (p *MonitoringService) MillisBehindLatest(shard string, millSeconds float64 p.behindLatestMillis.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName}).Set(millSeconds) } +func (p *MonitoringService) DeleteMetricMillisBehindLatest(shard string) { + p.behindLatestMillis.Delete(prom.Labels{"shard": shard, "kinesisStream": p.streamName}) +} + func (p *MonitoringService) LeaseGained(shard string) { p.leasesHeld.With(prom.Labels{"shard": shard, "kinesisStream": p.streamName, "workerID": p.workerID}).Inc() } diff --git a/clientlibrary/worker/common-shard-consumer.go b/clientlibrary/worker/common-shard-consumer.go index 32b91b1..253cecb 100644 --- a/clientlibrary/worker/common-shard-consumer.go +++ b/clientlibrary/worker/common-shard-consumer.go @@ -51,7 +51,7 @@ type commonShardConsumer struct { } // Cleanup the internal lease cache -func (sc *commonShardConsumer) releaseLease() { +func (sc *commonShardConsumer) releaseLease(shard string) { log := sc.kclConfig.Logger log.Infof("Release lease for shard %s", sc.shard.ID) sc.shard.SetLeaseOwner("") @@ -63,6 +63,7 @@ func (sc *commonShardConsumer) releaseLease() { } // reporting lease lose metrics + sc.mService.DeleteMetricMillisBehindLatest(shard) sc.mService.LeaseLost(sc.shard.ID) } diff --git a/clientlibrary/worker/fan-out-shard-consumer.go b/clientlibrary/worker/fan-out-shard-consumer.go index 1a6f13b..a60258c 100644 --- a/clientlibrary/worker/fan-out-shard-consumer.go +++ b/clientlibrary/worker/fan-out-shard-consumer.go @@ -46,7 +46,7 @@ type FanOutShardConsumer struct { // getRecords subscribes to a shard and reads events from 
it. // Precondition: it currently has the lease on the shard. func (sc *FanOutShardConsumer) getRecords() error { - defer sc.releaseLease() + defer sc.releaseLease(sc.shard.ID) log := sc.kclConfig.Logger diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index 252d39d..a20fde0 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -79,7 +79,7 @@ func (sc *PollingShardConsumer) getShardIterator() (*string, error) { // getRecords continuously poll one shard for data record // Precondition: it currently has the lease on the shard. func (sc *PollingShardConsumer) getRecords() error { - defer sc.releaseLease() + defer sc.releaseLease(sc.shard.ID) log := sc.kclConfig.Logger From 3be57e8a745524d5a0ab463952980c69cd78b303 Mon Sep 17 00:00:00 2001 From: John Calixto Date: Mon, 23 Jan 2023 17:32:27 -0800 Subject: [PATCH 07/22] Refactor in prep for testing rate limiting improvements Signed-off-by: John Calixto --- clientlibrary/worker/common-shard-consumer.go | 9 ++- .../worker/polling-shard-consumer.go | 11 +++- .../worker/polling-shard-consumer_test.go | 65 +++++++++++++++++++ go.mod | 5 +- go.sum | 11 +++- 5 files changed, 93 insertions(+), 8 deletions(-) create mode 100644 clientlibrary/worker/polling-shard-consumer_test.go diff --git a/clientlibrary/worker/common-shard-consumer.go b/clientlibrary/worker/common-shard-consumer.go index 253cecb..1b49f6a 100644 --- a/clientlibrary/worker/common-shard-consumer.go +++ b/clientlibrary/worker/common-shard-consumer.go @@ -21,6 +21,7 @@ package worker import ( + "context" "sync" "time" @@ -40,10 +41,16 @@ type shardConsumer interface { getRecords() error } +type KinesisSubscriberGetter interface { + SubscribeToShard(ctx context.Context, params *kinesis.SubscribeToShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SubscribeToShardOutput, error) + GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error) + GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error) +} + // commonShardConsumer implements common functionality for regular and enhanced fan-out consumers type commonShardConsumer struct { shard *par.ShardStatus - kc *kinesis.Client + kc KinesisSubscriberGetter checkpointer chk.Checkpointer recordProcessor kcl.IRecordProcessor kclConfig *config.KinesisClientLibConfiguration diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index a20fde0..cd4565a 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -129,13 +129,13 @@ func (sc *PollingShardConsumer) getRecords() error { getRecordsStartTime := time.Now() log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.ToString(shardIterator)) + + // Get records from stream and retry as needed getRecordsArgs := &kinesis.GetRecordsInput{ Limit: aws.Int32(int32(sc.kclConfig.MaxRecords)), ShardIterator: shardIterator, } - - // Get records from stream and retry as needed - getResp, err := sc.kc.GetRecords(context.TODO(), getRecordsArgs) + getResp, err := sc.callGetRecordsAPI(getRecordsArgs) if err != nil { //aws-sdk-go-v2 https://github.com/aws/aws-sdk-go-v2/blob/main/CHANGELOG.md#error-handling var throughputExceededErr *types.ProvisionedThroughputExceededException @@ -181,3 
+181,8 @@ func (sc *PollingShardConsumer) getRecords() error { } } } + +func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) { + getResp, err := sc.kc.GetRecords(context.TODO(), gri) + return getResp, err +} diff --git a/clientlibrary/worker/polling-shard-consumer_test.go b/clientlibrary/worker/polling-shard-consumer_test.go new file mode 100644 index 0000000..b6a1fcf --- /dev/null +++ b/clientlibrary/worker/polling-shard-consumer_test.go @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2023 VMware, Inc. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy of this software and + * associated documentation files (the "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is furnished to do + * so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all copies or substantial + * portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT + * NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, + * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE + * SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + */ + +package worker + +import ( + "context" + "testing" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +func TestCallGetRecordsAPI(t *testing.T) { + // basic happy path + m1 := MockKinesisSubscriberGetter{} + ret := kinesis.GetRecordsOutput{} + m1.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret, nil) + psc := PollingShardConsumer{ + commonShardConsumer: commonShardConsumer{kc: &m1}, + } + gri := kinesis.GetRecordsInput{ + ShardIterator: aws.String("shard-iterator-01"), + } + out, err := psc.callGetRecordsAPI(&gri) + assert.Nil(t, err) + assert.Equal(t, &ret, out) + m1.AssertExpectations(t) +} + +type MockKinesisSubscriberGetter struct { + mock.Mock +} + +func (m *MockKinesisSubscriberGetter) GetRecords(ctx context.Context, params *kinesis.GetRecordsInput, optFns ...func(*kinesis.Options)) (*kinesis.GetRecordsOutput, error) { + ret := m.Called(ctx, params, optFns) + + return ret.Get(0).(*kinesis.GetRecordsOutput), ret.Error(1) +} + +func (m *MockKinesisSubscriberGetter) GetShardIterator(ctx context.Context, params *kinesis.GetShardIteratorInput, optFns ...func(*kinesis.Options)) (*kinesis.GetShardIteratorOutput, error) { + return nil, nil +} + +func (m *MockKinesisSubscriberGetter) SubscribeToShard(ctx context.Context, params *kinesis.SubscribeToShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SubscribeToShardOutput, error) { + return nil, nil +} diff --git a/go.mod b/go.mod index 9a56ba8..3d9554c 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/prometheus/common v0.32.1 github.com/rs/zerolog v1.26.1 github.com/sirupsen/logrus v1.8.1 - github.com/stretchr/testify v1.7.0 + github.com/stretchr/testify v1.8.1 go.uber.org/zap v1.20.0 gopkg.in/natefinch/lumberjack.v2 v2.0.0 ) @@ -42,9 +42,10 @@ require ( 
github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/procfs v0.7.3 // indirect + github.com/stretchr/objx v0.5.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.7.0 // indirect golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect google.golang.org/protobuf v1.27.1 // indirect - gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index f03f679..159c486 100644 --- a/go.sum +++ b/go.sum @@ -240,11 +240,17 @@ github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -543,8 +549,9 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= From 66006caf8960e5a82a7f57a9a9c386f28df359c9 Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Tue, 24 Jan 2023 11:56:29 -0800 Subject: [PATCH 08/22] fix: add getRecords TPS rate limiting Signed-off-by: Shiva Pentakota --- 
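A minimal standalone sketch of the per-second call budget this patch introduces (simplified and illustrative; the actual implementation keeps currTime and callsLeft on PollingShardConsumer and routes time through rateLimitTimeNow/rateLimitTimeSince so tests can stub the clock). Kinesis allows up to five GetRecords transactions per second per shard, which is where kinesisReadTPSLimit comes from. Assumed import: time.

    const kinesisReadTPSLimit = 5

    // limiter grants at most kinesisReadTPSLimit GetRecords calls per
    // one-second window; a caller that is denied sleeps out the remainder
    // of the second before retrying.
    type limiter struct {
        windowStart time.Time
        callsLeft   int
    }

    func (l *limiter) allow() bool {
        if time.Since(l.windowStart) > time.Second {
            // A fresh second restores the full call budget.
            l.windowStart = time.Now()
            l.callsLeft = kinesisReadTPSLimit
        }
        if l.callsLeft < 1 {
            return false
        }
        l.callsLeft--
        return true
    }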
.../worker/polling-shard-consumer.go | 44 ++++++++++++++++++- .../worker/polling-shard-consumer_test.go | 15 +++++++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index cd4565a..ec973f5 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -44,6 +44,16 @@ import ( "github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics" ) +const ( + kinesisReadTPSLimit = 5 +) + +var ( + rateLimitTimeNow = time.Now + rateLimitTimeSince = time.Since + localTPSExceededError = errors.New("Error GetRecords TPS Exceeded") +) + // PollingShardConsumer is responsible for polling data records from a (specified) shard. // Note: PollingShardConsumer only deal with one shard. type PollingShardConsumer struct { @@ -52,6 +62,8 @@ type PollingShardConsumer struct { stop *chan struct{} consumerID string mService metrics.MonitoringService + currTime time.Time + callsLeft int } func (sc *PollingShardConsumer) getShardIterator() (*string, error) { @@ -108,6 +120,10 @@ func (sc *PollingShardConsumer) getRecords() error { recordCheckpointer := NewRecordProcessorCheckpoint(sc.shard, sc.checkpointer) retriedErrors := 0 + // define API call rate limit starting window + sc.currTime = rateLimitTimeNow() + sc.callsLeft = kinesisReadTPSLimit + for { if time.Now().UTC().After(sc.shard.GetLeaseTimeout().Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond)) { log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID) @@ -140,7 +156,14 @@ func (sc *PollingShardConsumer) getRecords() error { //aws-sdk-go-v2 https://github.com/aws/aws-sdk-go-v2/blob/main/CHANGELOG.md#error-handling var throughputExceededErr *types.ProvisionedThroughputExceededException var kmsThrottlingErr *types.KMSThrottlingException - if errors.As(err, &throughputExceededErr) || errors.As(err, &kmsThrottlingErr) { + if errors.As(err, &throughputExceededErr) || err == localTPSExceededError { + // If there is insufficient provisioned throughput on the stream, + // subsequent calls made within the next 1 second throw ProvisionedThroughputExceededException. 
+ // ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html + sc.waitASecond(sc.currTime) + continue + } + if errors.As(err, &kmsThrottlingErr) { log.Errorf("Error getting records from shard %v: %+v", sc.shard.ID, err) retriedErrors++ // exponential backoff @@ -182,7 +205,26 @@ func (sc *PollingShardConsumer) getRecords() error { } } +func (sc *PollingShardConsumer) waitASecond(timePassed time.Time) { + waitTime := time.Since(timePassed) + if waitTime < time.Second { + time.Sleep(time.Second - waitTime) + } +} + func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) { + // every new second, we get a fresh set of calls + if rateLimitTimeSince(sc.currTime) > time.Second { + sc.callsLeft = kinesisReadTPSLimit + sc.currTime = rateLimitTimeNow() + } + + if sc.callsLeft < 1 { + return nil, localTPSExceededError + } + getResp, err := sc.kc.GetRecords(context.TODO(), gri) + sc.callsLeft-- + return getResp, err } diff --git a/clientlibrary/worker/polling-shard-consumer_test.go b/clientlibrary/worker/polling-shard-consumer_test.go index b6a1fcf..7819be7 100644 --- a/clientlibrary/worker/polling-shard-consumer_test.go +++ b/clientlibrary/worker/polling-shard-consumer_test.go @@ -22,6 +22,7 @@ package worker import ( "context" "testing" + "time" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/kinesis" @@ -44,6 +45,20 @@ func TestCallGetRecordsAPI(t *testing.T) { assert.Nil(t, err) assert.Equal(t, &ret, out) m1.AssertExpectations(t) + + // check that localTPSExceededError is thrown when trying more than 5 TPS + m2 := MockKinesisSubscriberGetter{} + psc2 := PollingShardConsumer{ + commonShardConsumer: commonShardConsumer{kc: &m2}, + callsLeft: 0, + } + rateLimitTimeSince = func(t time.Time) time.Duration { + return 500 * time.Millisecond + } + out2, err2 := psc2.callGetRecordsAPI(&gri) + assert.Nil(t, out2) + assert.ErrorIs(t, err2, localTPSExceededError) + m2.AssertExpectations(t) } type MockKinesisSubscriberGetter struct { From b5515931d102b49011ab29e805e6193eca4487eb Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Tue, 24 Jan 2023 11:59:32 -0800 Subject: [PATCH 09/22] fix: add hard cap maxRetries for getRecord errors Signed-off-by: Shiva Pentakota --- clientlibrary/config/config.go | 6 ++++++ clientlibrary/config/kcl-config.go | 8 ++++++++ clientlibrary/worker/polling-shard-consumer.go | 16 ++++++++++++++++ 3 files changed, 30 insertions(+) diff --git a/clientlibrary/config/config.go b/clientlibrary/config/config.go index 5b45678..2d50ca8 100644 --- a/clientlibrary/config/config.go +++ b/clientlibrary/config/config.go @@ -136,6 +136,9 @@ const ( // DefaultLeaseSyncingIntervalMillis Number of milliseconds to wait before syncing with lease table (dynamodDB) DefaultLeaseSyncingIntervalMillis = 60000 + + // DefaultMaxRetryCount The default maximum number of retries in case of error + DefaultMaxRetryCount = 5 ) type ( @@ -283,6 +286,9 @@ type ( // LeaseSyncingTimeInterval The number of milliseconds to wait before syncing with lease table (dynamoDB) LeaseSyncingTimeIntervalMillis int + + // MaxRetryCount The maximum number of retries in case of error + MaxRetryCount int } ) diff --git a/clientlibrary/config/kcl-config.go b/clientlibrary/config/kcl-config.go index 135f3fa..4d7181b 100644 --- a/clientlibrary/config/kcl-config.go +++ b/clientlibrary/config/kcl-config.go @@ -102,6 +102,7 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio 
LeaseStealingIntervalMillis: DefaultLeaseStealingIntervalMillis, LeaseStealingClaimTimeoutMillis: DefaultLeaseStealingClaimTimeoutMillis, LeaseSyncingTimeIntervalMillis: DefaultLeaseSyncingIntervalMillis, + MaxRetryCount: DefaultMaxRetryCount, Logger: logger.GetDefaultLogger(), } } @@ -211,6 +212,13 @@ func (c *KinesisClientLibConfiguration) WithLogger(logger logger.Logger) *Kinesi return c } +// WithMaxRetryCount sets the max retry count in case of error. +func (c *KinesisClientLibConfiguration) WithMaxRetryCount(maxRetryCount int) *KinesisClientLibConfiguration { + checkIsValuePositive("maxRetryCount", maxRetryCount) + c.MaxRetryCount = maxRetryCount + return c +} + // WithMonitoringService sets the monitoring service to use to publish metrics. func (c *KinesisClientLibConfiguration) WithMonitoringService(mService metrics.MonitoringService) *KinesisClientLibConfiguration { // Nil case is handled downward (at worker creation) so no need to do it here. diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index ec973f5..e207583 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -157,6 +157,14 @@ func (sc *PollingShardConsumer) getRecords() error { var throughputExceededErr *types.ProvisionedThroughputExceededException var kmsThrottlingErr *types.KMSThrottlingException if errors.As(err, &throughputExceededErr) || err == localTPSExceededError { + retriedErrors++ + if retriedErrors > sc.kclConfig.MaxRetryCount { + log.Errorf("message", "reached max retry count getting records from shard", + "shardId", sc.shard.ID, + "retryCount", retriedErrors, + "error", err) + return err + } // If there is insufficient provisioned throughput on the stream, // subsequent calls made within the next 1 second throw ProvisionedThroughputExceededException. 
// ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html @@ -166,6 +174,14 @@ func (sc *PollingShardConsumer) getRecords() error { if errors.As(err, &kmsThrottlingErr) { log.Errorf("Error getting records from shard %v: %+v", sc.shard.ID, err) retriedErrors++ + // Greater than MaxRetryCount so we get the last retry + if retriedErrors > sc.kclConfig.MaxRetryCount { + log.Errorf("message", "reached max retry count getting records from shard", + "shardId", sc.shard.ID, + "retryCount", retriedErrors, + "error", err) + return err + } // exponential backoff // https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Programming.Errors.html#Programming.Errors.RetryAndBackoff time.Sleep(time.Duration(math.Exp2(float64(retriedErrors))*100) * time.Millisecond) From 7d6b1c33d06b8f4d35079072f1f17d337866ea2e Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Tue, 24 Jan 2023 16:28:22 -0800 Subject: [PATCH 10/22] fix: add maxBytes per second getRecord check Signed-off-by: Shiva Pentakota --- .../worker/polling-shard-consumer.go | 83 +++++++++++++--- .../worker/polling-shard-consumer_test.go | 99 ++++++++++++++++++- 2 files changed, 167 insertions(+), 15 deletions(-) diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index e207583..b96b26d 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -46,24 +46,31 @@ import ( const ( kinesisReadTPSLimit = 5 + MaxBytes = 10000000 + MaxBytesPerSecond = 2000000 + BytesToMbConversion = 1000000 ) var ( rateLimitTimeNow = time.Now rateLimitTimeSince = time.Since localTPSExceededError = errors.New("Error GetRecords TPS Exceeded") + maxBytesExceededError = errors.New("Error GetRecords Max Bytes For Call Period Exceeded") ) // PollingShardConsumer is responsible for polling data records from a (specified) shard. // Note: PollingShardConsumer only deal with one shard. 
type PollingShardConsumer struct { commonShardConsumer - streamName string - stop *chan struct{} - consumerID string - mService metrics.MonitoringService - currTime time.Time - callsLeft int + streamName string + stop *chan struct{} + consumerID string + mService metrics.MonitoringService + currTime time.Time + callsLeft int + remBytes int + lastCheckTime time.Time + bytesRead int } func (sc *PollingShardConsumer) getShardIterator() (*string, error) { @@ -123,6 +130,8 @@ func (sc *PollingShardConsumer) getRecords() error { // define API call rate limit starting window sc.currTime = rateLimitTimeNow() sc.callsLeft = kinesisReadTPSLimit + sc.bytesRead = 0 + sc.remBytes = MaxBytes for { if time.Now().UTC().After(sc.shard.GetLeaseTimeout().Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond)) { @@ -151,15 +160,16 @@ func (sc *PollingShardConsumer) getRecords() error { Limit: aws.Int32(int32(sc.kclConfig.MaxRecords)), ShardIterator: shardIterator, } - getResp, err := sc.callGetRecordsAPI(getRecordsArgs) + getResp, coolDownPeriod, err := sc.callGetRecordsAPI(getRecordsArgs) if err != nil { //aws-sdk-go-v2 https://github.com/aws/aws-sdk-go-v2/blob/main/CHANGELOG.md#error-handling var throughputExceededErr *types.ProvisionedThroughputExceededException var kmsThrottlingErr *types.KMSThrottlingException - if errors.As(err, &throughputExceededErr) || err == localTPSExceededError { + if errors.As(err, &throughputExceededErr) { retriedErrors++ if retriedErrors > sc.kclConfig.MaxRetryCount { - log.Errorf("message", "reached max retry count getting records from shard", + log.Errorf("message", "Throughput Exceeded Error: "+ + "reached max retry count getting records from shard", "shardId", sc.shard.ID, "retryCount", retriedErrors, "error", err) @@ -171,12 +181,21 @@ func (sc *PollingShardConsumer) getRecords() error { sc.waitASecond(sc.currTime) continue } + if err == localTPSExceededError { + sc.waitASecond(sc.currTime) + continue + } + if err == maxBytesExceededError { + time.Sleep(time.Duration(coolDownPeriod) * time.Second) + continue + } if errors.As(err, &kmsThrottlingErr) { log.Errorf("Error getting records from shard %v: %+v", sc.shard.ID, err) retriedErrors++ // Greater than MaxRetryCount so we get the last retry if retriedErrors > sc.kclConfig.MaxRetryCount { - log.Errorf("message", "reached max retry count getting records from shard", + log.Errorf("message", "KMS Throttling Error: "+ + "reached max retry count getting records from shard", "shardId", sc.shard.ID, "retryCount", retriedErrors, "error", err) @@ -228,7 +247,37 @@ func (sc *PollingShardConsumer) waitASecond(timePassed time.Time) { } } -func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, error) { +func (sc *PollingShardConsumer) checkCoolOffPeriod() (int, error) { + // Each shard can support up to a maximum total data read rate of 2 MB per second via GetRecords. + // If a call to GetRecords returns 10 MB, subsequent calls made within the next 5 seconds throw an exception. 
+ // ref: https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html + // check for overspending of byte budget from getRecords call + currentTime := rateLimitTimeNow() + secondsPassed := currentTime.Sub(sc.lastCheckTime).Seconds() + sc.lastCheckTime = currentTime + sc.remBytes += int(secondsPassed * MaxBytesPerSecond) + transactionReadRate := float64(sc.bytesRead) / (secondsPassed * BytesToMbConversion) + + if sc.remBytes > MaxBytes { + sc.remBytes = MaxBytes + } + if sc.remBytes <= sc.bytesRead || transactionReadRate > 2 { + // Wait until cool down period has passed to prevent ProvisionedThroughputExceededException + coolDown := sc.bytesRead / MaxBytesPerSecond + return coolDown, maxBytesExceededError + } else { + sc.remBytes -= sc.bytesRead + } + return 0, nil +} + +func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput) (*kinesis.GetRecordsOutput, int, error) { + if sc.bytesRead != 0 { + coolDownPeriod, err := sc.checkCoolOffPeriod() + if err != nil { + return nil, coolDownPeriod, err + } + } // every new second, we get a fresh set of calls if rateLimitTimeSince(sc.currTime) > time.Second { sc.callsLeft = kinesisReadTPSLimit @@ -236,11 +285,19 @@ func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput) } if sc.callsLeft < 1 { - return nil, localTPSExceededError + return nil, 0, localTPSExceededError } getResp, err := sc.kc.GetRecords(context.TODO(), gri) sc.callsLeft-- + // Calculate size of records from read transaction + sc.bytesRead = 0 + for _, record := range getResp.Records { + sc.bytesRead += len(record.Data) + } + if sc.lastCheckTime.IsZero() { + sc.lastCheckTime = rateLimitTimeNow() + } - return getResp, err + return getResp, 0, err } diff --git a/clientlibrary/worker/polling-shard-consumer_test.go b/clientlibrary/worker/polling-shard-consumer_test.go index 7819be7..68dffd0 100644 --- a/clientlibrary/worker/polling-shard-consumer_test.go +++ b/clientlibrary/worker/polling-shard-consumer_test.go @@ -41,7 +41,7 @@ func TestCallGetRecordsAPI(t *testing.T) { gri := kinesis.GetRecordsInput{ ShardIterator: aws.String("shard-iterator-01"), } - out, err := psc.callGetRecordsAPI(&gri) + out, _, err := psc.callGetRecordsAPI(&gri) assert.Nil(t, err) assert.Equal(t, &ret, out) m1.AssertExpectations(t) @@ -55,10 +55,105 @@ func TestCallGetRecordsAPI(t *testing.T) { rateLimitTimeSince = func(t time.Time) time.Duration { return 500 * time.Millisecond } - out2, err2 := psc2.callGetRecordsAPI(&gri) + out2, _, err2 := psc2.callGetRecordsAPI(&gri) assert.Nil(t, out2) assert.ErrorIs(t, err2, localTPSExceededError) m2.AssertExpectations(t) + + // check that getRecords is called normally in bytesRead = 0 case + m3 := MockKinesisSubscriberGetter{} + ret3 := kinesis.GetRecordsOutput{} + m3.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret3, nil) + psc3 := PollingShardConsumer{ + commonShardConsumer: commonShardConsumer{kc: &m3}, + callsLeft: 2, + bytesRead: 0, + } + rateLimitTimeSince = func(t time.Time) time.Duration { + return 2 * time.Second + } + out3, checkSleepVal, err3 := psc3.callGetRecordsAPI(&gri) + assert.Nil(t, err3) + assert.Equal(t, checkSleepVal, 0) + assert.Equal(t, &ret3, out3) + m3.AssertExpectations(t) + + // check that correct cool off period is taken for 10mb in 1 second + testTime := time.Now() + m4 := MockKinesisSubscriberGetter{} + psc4 := PollingShardConsumer{ + commonShardConsumer: commonShardConsumer{kc: &m4}, + callsLeft: 2, + bytesRead: MaxBytes, + lastCheckTime: testTime, + remBytes: 
MaxBytes, + } + rateLimitTimeSince = func(t time.Time) time.Duration { + return 2 * time.Second + } + rateLimitTimeNow = func() time.Time { + return testTime.Add(time.Second) + } + out4, checkSleepVal2, err4 := psc4.callGetRecordsAPI(&gri) + assert.Nil(t, out4) + assert.Equal(t, maxBytesExceededError, err4) + m4.AssertExpectations(t) + if checkSleepVal2 != 5 { + t.Errorf("Incorrect Cool Off Period: %v", checkSleepVal2) + } + + // check that no cool off period is taken for 6mb in 3 seconds + testTime2 := time.Now() + m5 := MockKinesisSubscriberGetter{} + ret5 := kinesis.GetRecordsOutput{} + m5.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret5, nil) + psc5 := PollingShardConsumer{ + commonShardConsumer: commonShardConsumer{kc: &m5}, + callsLeft: 2, + bytesRead: MaxBytesPerSecond * 3, + lastCheckTime: testTime2, + remBytes: MaxBytes, + } + rateLimitTimeSince = func(t time.Time) time.Duration { + return 3 * time.Second + } + rateLimitTimeNow = func() time.Time { + return testTime2.Add(time.Second * 3) + } + out5, checkSleepVal3, err5 := psc5.callGetRecordsAPI(&gri) + assert.Nil(t, err5) + assert.Equal(t, checkSleepVal3, 0) + assert.Equal(t, &ret5, out5) + m5.AssertExpectations(t) + + // check for correct cool off period with 8mb in .2 seconds with 6mb remaining + testTime3 := time.Now() + m6 := MockKinesisSubscriberGetter{} + psc6 := PollingShardConsumer{ + commonShardConsumer: commonShardConsumer{kc: &m6}, + callsLeft: 2, + bytesRead: MaxBytesPerSecond * 4, + lastCheckTime: testTime3, + remBytes: MaxBytesPerSecond * 3, + } + rateLimitTimeSince = func(t time.Time) time.Duration { + return 3 * time.Second + } + rateLimitTimeNow = func() time.Time { + return testTime3.Add(time.Second / 5) + } + out6, checkSleepVal4, err6 := psc6.callGetRecordsAPI(&gri) + assert.Nil(t, out6) + assert.Equal(t, err6, maxBytesExceededError) + m5.AssertExpectations(t) + if checkSleepVal4 != 4 { + t.Errorf("Incorrect Cool Off Period: %v", checkSleepVal4) + } + + // restore original func + rateLimitTimeNow = time.Now + rateLimitTimeSince = time.Since + } type MockKinesisSubscriberGetter struct { From f879712f9d6de51188d5f42a0ae0395c5c981837 Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Tue, 31 Jan 2023 10:18:15 -0800 Subject: [PATCH 11/22] chore: log RemoveLeaseOwner errors with debug instead of error Signed-off-by: Shiva Pentakota --- clientlibrary/worker/common-shard-consumer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clientlibrary/worker/common-shard-consumer.go b/clientlibrary/worker/common-shard-consumer.go index 1b49f6a..68ec1b3 100644 --- a/clientlibrary/worker/common-shard-consumer.go +++ b/clientlibrary/worker/common-shard-consumer.go @@ -66,7 +66,7 @@ func (sc *commonShardConsumer) releaseLease(shard string) { // Release the lease by wiping out the lease owner for the shard // Note: we don't need to do anything in case of error here and shard lease will eventually be expired. 
if err := sc.checkpointer.RemoveLeaseOwner(sc.shard.ID); err != nil { - log.Errorf("Failed to release shard lease or shard: %s Error: %+v", sc.shard.ID, err) + log.Debugf("Failed to release shard lease or shard: %s Error: %+v", sc.shard.ID, err) } // reporting lease lose metrics From 04c5062acefe4b602d171b7125338e26ed1576ed Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Wed, 1 Feb 2023 08:00:49 -0800 Subject: [PATCH 12/22] fix: add check for GetRecords error within callGetRecordsAPI Signed-off-by: Shiva Pentakota --- .../worker/polling-shard-consumer.go | 6 ++++- .../worker/polling-shard-consumer_test.go | 23 +++++++++++++++++++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index b96b26d..28c1f6a 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -287,9 +287,13 @@ func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput) if sc.callsLeft < 1 { return nil, 0, localTPSExceededError } - getResp, err := sc.kc.GetRecords(context.TODO(), gri) sc.callsLeft-- + + if err != nil { + return getResp, 0, err + } + // Calculate size of records from read transaction sc.bytesRead = 0 for _, record := range getResp.Records { diff --git a/clientlibrary/worker/polling-shard-consumer_test.go b/clientlibrary/worker/polling-shard-consumer_test.go index 68dffd0..c94859d 100644 --- a/clientlibrary/worker/polling-shard-consumer_test.go +++ b/clientlibrary/worker/polling-shard-consumer_test.go @@ -21,6 +21,7 @@ package worker import ( "context" + "errors" "testing" "time" @@ -30,6 +31,10 @@ import ( "github.com/stretchr/testify/mock" ) +var ( + testGetRecordsError = errors.New("GetRecords Error") +) + func TestCallGetRecordsAPI(t *testing.T) { // basic happy path m1 := MockKinesisSubscriberGetter{} @@ -150,6 +155,24 @@ func TestCallGetRecordsAPI(t *testing.T) { t.Errorf("Incorrect Cool Off Period: %v", checkSleepVal4) } + // case where getRecords throws error + m7 := MockKinesisSubscriberGetter{} + ret7 := kinesis.GetRecordsOutput{Records: nil} + m7.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret7, testGetRecordsError) + psc7 := PollingShardConsumer{ + commonShardConsumer: commonShardConsumer{kc: &m7}, + callsLeft: 2, + bytesRead: 0, + } + rateLimitTimeSince = func(t time.Time) time.Duration { + return 2 * time.Second + } + out7, checkSleepVal7, err7 := psc7.callGetRecordsAPI(&gri) + assert.Equal(t, err7, testGetRecordsError) + assert.Equal(t, checkSleepVal7, 0) + assert.Equal(t, out7, &ret7) + m7.AssertExpectations(t) + // restore original func rateLimitTimeNow = time.Now rateLimitTimeSince = time.Since From df16ef451cf3c52752cf3ff8257c51644cdc4790 Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Mon, 13 Feb 2023 17:56:11 -0800 Subject: [PATCH 13/22] fix: use nanosecond precision in lease comparisons Signed-off-by: Shiva Pentakota --- clientlibrary/checkpoint/dynamodb-checkpointer.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer.go b/clientlibrary/checkpoint/dynamodb-checkpointer.go index 3a7e22e..b8f12d8 100644 --- a/clientlibrary/checkpoint/dynamodb-checkpointer.go +++ b/clientlibrary/checkpoint/dynamodb-checkpointer.go @@ -129,7 +129,7 @@ func (checkpointer *DynamoCheckpoint) Init() error { // GetLease attempts to gain a lock on the given shard func (checkpointer *DynamoCheckpoint) GetLease(shard 
*par.ShardStatus, newAssignTo string) error { newLeaseTimeout := time.Now().Add(time.Duration(checkpointer.LeaseDuration) * time.Millisecond).UTC() - newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339) + newLeaseTimeoutString := newLeaseTimeout.Format(time.RFC3339Nano) currentCheckpoint, err := checkpointer.getItem(shard.ID) if err != nil { return err @@ -161,7 +161,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign assignedTo := assignedVar.(*types.AttributeValueMemberS).Value leaseTimeout := leaseVar.(*types.AttributeValueMemberS).Value - currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout) + currentLeaseTimeout, err := time.Parse(time.RFC3339Nano, leaseTimeout) if err != nil { return err } @@ -246,7 +246,7 @@ func (checkpointer *DynamoCheckpoint) GetLease(shard *par.ShardStatus, newAssign // CheckpointSequence writes a checkpoint at the designated sequence ID func (checkpointer *DynamoCheckpoint) CheckpointSequence(shard *par.ShardStatus) error { - leaseTimeout := shard.GetLeaseTimeout().UTC().Format(time.RFC3339) + leaseTimeout := shard.GetLeaseTimeout().UTC().Format(time.RFC3339Nano) marshalledCheckpoint := map[string]types.AttributeValue{ LeaseKeyKey: &types.AttributeValueMemberS{ Value: shard.ID, @@ -290,7 +290,7 @@ func (checkpointer *DynamoCheckpoint) FetchCheckpoint(shard *par.ShardStatus) er // Use up-to-date leaseTimeout to avoid ConditionalCheckFailedException when claiming if leaseTimeout, ok := checkpoint[LeaseTimeoutKey]; ok && leaseTimeout.(*types.AttributeValueMemberS).Value != "" { - currentLeaseTimeout, err := time.Parse(time.RFC3339, leaseTimeout.(*types.AttributeValueMemberS).Value) + currentLeaseTimeout, err := time.Parse(time.RFC3339Nano, leaseTimeout.(*types.AttributeValueMemberS).Value) if err != nil { return err } @@ -370,7 +370,7 @@ func (checkpointer *DynamoCheckpoint) ClaimShard(shard *par.ShardStatus, claimID if err != nil && err != ErrSequenceIDNotFound { return err } - leaseTimeoutString := shard.GetLeaseTimeout().Format(time.RFC3339) + leaseTimeoutString := shard.GetLeaseTimeout().Format(time.RFC3339Nano) conditionalExpression := `ShardID = :id AND LeaseTimeout = :lease_timeout AND attribute_not_exists(ClaimRequest)` expressionAttributeValues := map[string]types.AttributeValue{ From a7c063b99cdf91d659e30f6d1924137bd085f2f7 Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Wed, 22 Mar 2023 12:22:26 -0700 Subject: [PATCH 14/22] chore: add info logs in sleep case for kinesis backoff errors Signed-off-by: Shiva Pentakota --- clientlibrary/worker/polling-shard-consumer.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index 28c1f6a..6e19f5b 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -182,10 +182,12 @@ func (sc *PollingShardConsumer) getRecords() error { continue } if err == localTPSExceededError { + log.Infof("localTPSExceededError so sleep for a second") sc.waitASecond(sc.currTime) continue } if err == maxBytesExceededError { + log.Infof("maxBytesExceededError so sleep for %+v seconds", coolDownPeriod) time.Sleep(time.Duration(coolDownPeriod) * time.Second) continue } From 987fada9d3c28c05ac8e5cf34211665a678bcdf3 Mon Sep 17 00:00:00 2001 From: John Calixto Date: Thu, 23 Mar 2023 10:59:58 -0700 Subject: [PATCH 15/22] fix: Check token bucket corner cases correctly. 
Signed-off-by: John Calixto --- .../worker/polling-shard-consumer.go | 6 +- .../worker/polling-shard-consumer_test.go | 169 +++++++++++++++++- 2 files changed, 167 insertions(+), 8 deletions(-) diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index 6e19f5b..e0998ec 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -258,14 +258,16 @@ func (sc *PollingShardConsumer) checkCoolOffPeriod() (int, error) { secondsPassed := currentTime.Sub(sc.lastCheckTime).Seconds() sc.lastCheckTime = currentTime sc.remBytes += int(secondsPassed * MaxBytesPerSecond) - transactionReadRate := float64(sc.bytesRead) / (secondsPassed * BytesToMbConversion) if sc.remBytes > MaxBytes { sc.remBytes = MaxBytes } - if sc.remBytes <= sc.bytesRead || transactionReadRate > 2 { + if sc.remBytes < 1 { // Wait until cool down period has passed to prevent ProvisionedThroughputExceededException coolDown := sc.bytesRead / MaxBytesPerSecond + if sc.bytesRead%MaxBytesPerSecond > 0 { + coolDown++ + } return coolDown, maxBytesExceededError } else { sc.remBytes -= sc.bytesRead diff --git a/clientlibrary/worker/polling-shard-consumer_test.go b/clientlibrary/worker/polling-shard-consumer_test.go index c94859d..736b2bd 100644 --- a/clientlibrary/worker/polling-shard-consumer_test.go +++ b/clientlibrary/worker/polling-shard-consumer_test.go @@ -86,6 +86,8 @@ func TestCallGetRecordsAPI(t *testing.T) { // check that correct cool off period is taken for 10mb in 1 second testTime := time.Now() m4 := MockKinesisSubscriberGetter{} + ret4 := kinesis.GetRecordsOutput{Records: nil} + m4.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret4, nil) psc4 := PollingShardConsumer{ commonShardConsumer: commonShardConsumer{kc: &m4}, callsLeft: 2, @@ -100,10 +102,10 @@ func TestCallGetRecordsAPI(t *testing.T) { return testTime.Add(time.Second) } out4, checkSleepVal2, err4 := psc4.callGetRecordsAPI(&gri) - assert.Nil(t, out4) - assert.Equal(t, maxBytesExceededError, err4) + assert.Nil(t, err4) + assert.Equal(t, &ret4, out4) m4.AssertExpectations(t) - if checkSleepVal2 != 5 { + if checkSleepVal2 != 0 { t.Errorf("Incorrect Cool Off Period: %v", checkSleepVal2) } @@ -134,6 +136,8 @@ func TestCallGetRecordsAPI(t *testing.T) { // check for correct cool off period with 8mb in .2 seconds with 6mb remaining testTime3 := time.Now() m6 := MockKinesisSubscriberGetter{} + ret6 := kinesis.GetRecordsOutput{Records: nil} + m6.On("GetRecords", mock.Anything, mock.Anything, mock.Anything).Return(&ret6, nil) psc6 := PollingShardConsumer{ commonShardConsumer: commonShardConsumer{kc: &m6}, callsLeft: 2, @@ -148,10 +152,10 @@ func TestCallGetRecordsAPI(t *testing.T) { return testTime3.Add(time.Second / 5) } out6, checkSleepVal4, err6 := psc6.callGetRecordsAPI(&gri) - assert.Nil(t, out6) - assert.Equal(t, err6, maxBytesExceededError) + assert.Nil(t, err6) + assert.Equal(t, &ret6, out6) m5.AssertExpectations(t) - if checkSleepVal4 != 4 { + if checkSleepVal4 != 0 { t.Errorf("Incorrect Cool Off Period: %v", checkSleepVal4) } @@ -196,3 +200,156 @@ func (m *MockKinesisSubscriberGetter) GetShardIterator(ctx context.Context, para func (m *MockKinesisSubscriberGetter) SubscribeToShard(ctx context.Context, params *kinesis.SubscribeToShardInput, optFns ...func(*kinesis.Options)) (*kinesis.SubscribeToShardOutput, error) { return nil, nil } + +func TestPollingShardConsumer_checkCoolOffPeriod(t *testing.T) { + refTime := time.Now() + type fields 
struct { + lastCheckTime time.Time + remBytes int + bytesRead int + } + tests := []struct { + name string + fields fields + timeNow time.Time + want int + wantErr bool + }{ + { + "zero time max bytes to spend", + fields{ + time.Time{}, + 0, + 0, + }, + refTime, + 0, + false, + }, + { + "same second, bytes still left to spend", + fields{ + refTime, + MaxBytesPerSecond, + MaxBytesPerSecond - 1, + }, + refTime, + 0, + false, + }, + { + "same second, not many but some bytes still left to spend", + fields{ + refTime, + 8, + MaxBytesPerSecond, + }, + refTime, + 0, + false, + }, + { + "same second, 1 byte still left to spend", + fields{ + refTime, + 1, + MaxBytesPerSecond, + }, + refTime, + 0, + false, + }, + { + "next second, bytes still left to spend", + fields{ + refTime, + 42, + 1024, + }, + refTime.Add(1 * time.Second), + 0, + false, + }, + { + "same second, max bytes per second already spent", + fields{ + refTime, + 0, + MaxBytesPerSecond, + }, + refTime, + 1, + true, + }, + { + "same second, more than max bytes per second already spent", + fields{ + refTime, + 0, + MaxBytesPerSecond + 1, + }, + refTime, + 2, + true, + }, + + // Kinesis prevents reading more than 10 MiB at once + { + "same second, 10 MiB read all at once", + fields{ + refTime, + 0, + 10 * 1024 * 1024, + }, + refTime, + 6, + true, + }, + + { + "same second, 10 MB read all at once", + fields{ + refTime, + 0, + 10 * 1000 * 1000, + }, + refTime, + 5, + true, + }, + { + "5 seconds ago, 10 MB read all at once", + fields{ + refTime, + 0, + 10 * 1000 * 1000, + }, + refTime.Add(5 * time.Second), + 0, + false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + sc := &PollingShardConsumer{ + lastCheckTime: tt.fields.lastCheckTime, + remBytes: tt.fields.remBytes, + bytesRead: tt.fields.bytesRead, + } + rateLimitTimeNow = func() time.Time { + return tt.timeNow + } + got, err := sc.checkCoolOffPeriod() + if (err != nil) != tt.wantErr { + t.Errorf("PollingShardConsumer.checkCoolOffPeriod() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("PollingShardConsumer.checkCoolOffPeriod() = %v, want %v", got, tt.want) + } + }) + } + + // restore original time.Now + rateLimitTimeNow = time.Now +} From 236412324f5c30c6daf2c5e6718cfeb8f93380c7 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 28 Mar 2023 16:56:55 +0000 Subject: [PATCH 16/22] Bump github.com/prometheus/client_golang from 1.11.0 to 1.11.1 Bumps [github.com/prometheus/client_golang](https://github.com/prometheus/client_golang) from 1.11.0 to 1.11.1. - [Release notes](https://github.com/prometheus/client_golang/releases) - [Changelog](https://github.com/prometheus/client_golang/blob/main/CHANGELOG.md) - [Commits](https://github.com/prometheus/client_golang/compare/v1.11.0...v1.11.1) --- updated-dependencies: - dependency-name: github.com/prometheus/client_golang dependency-type: direct:production ... 
Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index 3d9554c..94237c5 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20211222152315-953b66f67407 github.com/golang/protobuf v1.5.2 github.com/google/uuid v1.3.0 - github.com/prometheus/client_golang v1.11.0 + github.com/prometheus/client_golang v1.11.1 github.com/prometheus/common v0.32.1 github.com/rs/zerolog v1.26.1 github.com/sirupsen/logrus v1.8.1 diff --git a/go.sum b/go.sum index 159c486..991b8a2 100644 --- a/go.sum +++ b/go.sum @@ -211,8 +211,9 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo= github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= -github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ= github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= +github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s= +github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= From 09f0889f28440feda7c3490991586a073c86431a Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 28 Mar 2023 16:57:32 +0000 Subject: [PATCH 17/22] Bump golang.org/x/sys from 0.0.0-20211216021012-1d35b9e2eb4e to 0.1.0 Bumps [golang.org/x/sys](https://github.com/golang/sys) from 0.0.0-20211216021012-1d35b9e2eb4e to 0.1.0. - [Release notes](https://github.com/golang/sys/releases) - [Commits](https://github.com/golang/sys/commits/v0.1.0) --- updated-dependencies: - dependency-name: golang.org/x/sys dependency-type: indirect ... 
Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 3d9554c..378f3a4 100644 --- a/go.mod +++ b/go.mod @@ -45,7 +45,7 @@ require ( github.com/stretchr/objx v0.5.0 // indirect go.uber.org/atomic v1.9.0 // indirect go.uber.org/multierr v1.7.0 // indirect - golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect + golang.org/x/sys v0.1.0 // indirect google.golang.org/protobuf v1.27.1 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 159c486..c0c3695 100644 --- a/go.sum +++ b/go.sum @@ -396,8 +396,8 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e h1:fLOSk5Q00efkSvAm+4xcoXD+RRmLmmulPn5I3Y9F2EM= -golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= From 02d4b44ff64b61ba35964ab3aede7ee358f406d9 Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Tue, 28 Mar 2023 11:46:55 -0700 Subject: [PATCH 18/22] fix: add shutdown and leaseExpired error cases for checkpoint function Signed-off-by: Shiva Pentakota --- clientlibrary/checkpoint/checkpointer.go | 3 +++ .../checkpoint/dynamodb-checkpointer.go | 21 +++++++++++++++++++ clientlibrary/interfaces/record-processor.go | 2 +- clientlibrary/worker/common-shard-consumer.go | 8 +++++-- .../worker/polling-shard-consumer.go | 5 ++++- .../worker/record-processor-checkpointer.go | 19 ++++++++++++++++- 6 files changed, 53 insertions(+), 5 deletions(-) diff --git a/clientlibrary/checkpoint/checkpointer.go b/clientlibrary/checkpoint/checkpointer.go index 1af66ba..3f5d179 100644 --- a/clientlibrary/checkpoint/checkpointer.go +++ b/clientlibrary/checkpoint/checkpointer.go @@ -79,6 +79,9 @@ type Checkpointer interface { // RemoveLeaseOwner to remove lease owner for the shard entry to make the shard available for reassignment RemoveLeaseOwner(string) error + // GetLeaseOwner to get current owner of lease for shard + GetLeaseOwner(string) (string, error) + // ListActiveWorkers returns active workers and their shards (New Lease Stealing Methods) ListActiveWorkers(map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error) diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer.go b/clientlibrary/checkpoint/dynamodb-checkpointer.go index b8f12d8..ee14fee 100644 --- a/clientlibrary/checkpoint/dynamodb-checkpointer.go +++ b/clientlibrary/checkpoint/dynamodb-checkpointer.go @@ -51,6 +51,10 @@ const ( NumMaxRetries = 10 ) +var ( + NoLeaseOwnerErr = errors.New("no LeaseOwner in checkpoints table") +) + // DynamoCheckpoint implements the Checkpoint interface using DynamoDB as a backend type DynamoCheckpoint struct { log logger.Logger 
@@ -336,6 +340,23 @@ func (checkpointer *DynamoCheckpoint) RemoveLeaseOwner(shardID string) error { return err } +// GetLeaseOwner returns current lease owner of given shard in checkpoints table +func (checkpointer *DynamoCheckpoint) GetLeaseOwner(shardID string) (string, error) { + currentCheckpoint, err := checkpointer.getItem(shardID) + if err != nil { + return "", err + } + + assignedVar, assignedToOk := currentCheckpoint[LeaseOwnerKey] + + if !assignedToOk { + return "", NoLeaseOwnerErr + } + + return assignedVar.(*types.AttributeValueMemberS).Value, nil + +} + // ListActiveWorkers returns a map of workers and their shards func (checkpointer *DynamoCheckpoint) ListActiveWorkers(shardStatus map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error) { err := checkpointer.syncLeases(shardStatus) diff --git a/clientlibrary/interfaces/record-processor.go b/clientlibrary/interfaces/record-processor.go index 1c41d56..a4897d4 100644 --- a/clientlibrary/interfaces/record-processor.go +++ b/clientlibrary/interfaces/record-processor.go @@ -59,7 +59,7 @@ type ( * @param processRecordsInput Provides the records to be processed as well as information and capabilities related * to them (eg checkpointing). */ - ProcessRecords(processRecordsInput *ProcessRecordsInput) + ProcessRecords(processRecordsInput *ProcessRecordsInput) error // Shutdown /* diff --git a/clientlibrary/worker/common-shard-consumer.go b/clientlibrary/worker/common-shard-consumer.go index 68ec1b3..36ddb77 100644 --- a/clientlibrary/worker/common-shard-consumer.go +++ b/clientlibrary/worker/common-shard-consumer.go @@ -136,7 +136,7 @@ func (sc *commonShardConsumer) waitOnParentShard() error { } } -func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, records []types.Record, millisBehindLatest *int64, recordCheckpointer kcl.IRecordProcessorCheckpointer) { +func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, records []types.Record, millisBehindLatest *int64, recordCheckpointer kcl.IRecordProcessorCheckpointer) error { log := sc.kclConfig.Logger getRecordsTime := time.Since(getRecordsStartTime).Milliseconds() @@ -172,7 +172,10 @@ func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, rec // Delivery the events to the record processor input.CacheEntryTime = &getRecordsStartTime input.CacheExitTime = &processRecordsStartTime - sc.recordProcessor.ProcessRecords(input) + err := sc.recordProcessor.ProcessRecords(input) + if err != nil { + return err + } processedRecordsTiming := time.Since(processRecordsStartTime).Milliseconds() sc.mService.RecordProcessRecordsTime(sc.shard.ID, float64(processedRecordsTiming)) @@ -181,4 +184,5 @@ func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, rec sc.mService.IncrRecordsProcessed(sc.shard.ID, recordLength) sc.mService.IncrBytesProcessed(sc.shard.ID, recordBytes) sc.mService.MillisBehindLatest(sc.shard.ID, float64(*millisBehindLatest)) + return nil } diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index e0998ec..7211842 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -214,7 +214,10 @@ func (sc *PollingShardConsumer) getRecords() error { // reset the retry count after success retriedErrors = 0 - sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer) + err = sc.processRecords(getRecordsStartTime, getResp.Records, 
getResp.MillisBehindLatest, recordCheckpointer) + if err != nil { + return err + } // The shard has been closed, so no new records can be read from it if getResp.NextShardIterator == nil { diff --git a/clientlibrary/worker/record-processor-checkpointer.go b/clientlibrary/worker/record-processor-checkpointer.go index 5544a86..101137f 100644 --- a/clientlibrary/worker/record-processor-checkpointer.go +++ b/clientlibrary/worker/record-processor-checkpointer.go @@ -21,11 +21,17 @@ package worker import ( + "errors" "github.com/aws/aws-sdk-go-v2/aws" - chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint" kcl "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces" par "github.com/vmware/vmware-go-kcl-v2/clientlibrary/partition" + "time" +) + +var ( + ShutdownError = errors.New("another instance may have started processing some of these records already") + LeaseExpiredError = errors.New("the lease has on the shard has expired") ) type ( @@ -69,6 +75,17 @@ func (pc *PreparedCheckpointer) Checkpoint() error { } func (rc *RecordProcessorCheckpointer) Checkpoint(sequenceNumber *string) error { + // return shutdown error if lease is expired or another worker has started processing records for this shard + currLeaseOwner, err := rc.checkpoint.GetLeaseOwner(rc.shard.ID) + if err != nil { + return err + } + if rc.shard.AssignedTo != currLeaseOwner { + return ShutdownError + } + if time.Now().After(rc.shard.LeaseTimeout) { + return LeaseExpiredError + } // checkpoint the last sequence of a closed shard if sequenceNumber == nil { rc.shard.SetCheckpoint(chk.ShardEnd) From 4aebaf1ae019fc0a5ac8cdeb9a2d9641130fba0d Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Mon, 3 Apr 2023 14:58:04 -0700 Subject: [PATCH 19/22] feat: make lease renewal async Signed-off-by: Shiva Pentakota --- clientlibrary/config/config.go | 6 +++ clientlibrary/config/kcl-config.go | 7 +++ clientlibrary/interfaces/record-processor.go | 2 +- clientlibrary/worker/common-shard-consumer.go | 9 +--- .../worker/polling-shard-consumer.go | 51 +++++++++++-------- .../worker/record-processor-checkpointer.go | 18 ------- 6 files changed, 45 insertions(+), 48 deletions(-) diff --git a/clientlibrary/config/config.go b/clientlibrary/config/config.go index 2d50ca8..57fd5a7 100644 --- a/clientlibrary/config/config.go +++ b/clientlibrary/config/config.go @@ -69,6 +69,9 @@ const ( // DefaultLeaseRefreshPeriodMillis Period before the end of lease during which a lease is refreshed by the owner. DefaultLeaseRefreshPeriodMillis = 5000 + // DefaultLeaseRefreshWaitTime is the period of time to wait before async lease renewal attempt + DefaultLeaseRefreshWaitTime = 2500 + // DefaultMaxRecords Max records to fetch from Kinesis in a single GetRecords call. DefaultMaxRecords = 10000 @@ -216,6 +219,9 @@ type ( // LeaseRefreshPeriodMillis is the period before the end of lease during which a lease is refreshed by the owner. 
LeaseRefreshPeriodMillis int + // LeaseRefreshWaitTime is the period of time to wait before async lease renewal attempt + LeaseRefreshWaitTime int + // MaxRecords Max records to read per Kinesis getRecords() call MaxRecords int diff --git a/clientlibrary/config/kcl-config.go b/clientlibrary/config/kcl-config.go index 4d7181b..2ac2fa5 100644 --- a/clientlibrary/config/kcl-config.go +++ b/clientlibrary/config/kcl-config.go @@ -102,6 +102,7 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio LeaseStealingIntervalMillis: DefaultLeaseStealingIntervalMillis, LeaseStealingClaimTimeoutMillis: DefaultLeaseStealingClaimTimeoutMillis, LeaseSyncingTimeIntervalMillis: DefaultLeaseSyncingIntervalMillis, + LeaseRefreshWaitTime: DefaultLeaseRefreshWaitTime, MaxRetryCount: DefaultMaxRetryCount, Logger: logger.GetDefaultLogger(), } @@ -149,6 +150,12 @@ func (c *KinesisClientLibConfiguration) WithLeaseRefreshPeriodMillis(leaseRefres return c } +func (c *KinesisClientLibConfiguration) WithLeaseRefreshWaitTime(leaseRefreshWaitTime int) *KinesisClientLibConfiguration { + checkIsValuePositive("LeaseRefreshWaitTime", leaseRefreshWaitTime) + c.LeaseRefreshWaitTime = leaseRefreshWaitTime + return c +} + func (c *KinesisClientLibConfiguration) WithShardSyncIntervalMillis(shardSyncIntervalMillis int) *KinesisClientLibConfiguration { checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis) c.ShardSyncIntervalMillis = shardSyncIntervalMillis diff --git a/clientlibrary/interfaces/record-processor.go b/clientlibrary/interfaces/record-processor.go index a4897d4..1c41d56 100644 --- a/clientlibrary/interfaces/record-processor.go +++ b/clientlibrary/interfaces/record-processor.go @@ -59,7 +59,7 @@ type ( * @param processRecordsInput Provides the records to be processed as well as information and capabilities related * to them (eg checkpointing). 
*/ - ProcessRecords(processRecordsInput *ProcessRecordsInput) error + ProcessRecords(processRecordsInput *ProcessRecordsInput) // Shutdown /* diff --git a/clientlibrary/worker/common-shard-consumer.go b/clientlibrary/worker/common-shard-consumer.go index 36ddb77..e2b24f7 100644 --- a/clientlibrary/worker/common-shard-consumer.go +++ b/clientlibrary/worker/common-shard-consumer.go @@ -136,7 +136,7 @@ func (sc *commonShardConsumer) waitOnParentShard() error { } } -func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, records []types.Record, millisBehindLatest *int64, recordCheckpointer kcl.IRecordProcessorCheckpointer) error { +func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, records []types.Record, millisBehindLatest *int64, recordCheckpointer kcl.IRecordProcessorCheckpointer) { log := sc.kclConfig.Logger getRecordsTime := time.Since(getRecordsStartTime).Milliseconds() @@ -172,11 +172,7 @@ func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, rec // Delivery the events to the record processor input.CacheEntryTime = &getRecordsStartTime input.CacheExitTime = &processRecordsStartTime - err := sc.recordProcessor.ProcessRecords(input) - if err != nil { - return err - } - + sc.recordProcessor.ProcessRecords(input) processedRecordsTiming := time.Since(processRecordsStartTime).Milliseconds() sc.mService.RecordProcessRecordsTime(sc.shard.ID, float64(processedRecordsTiming)) } @@ -184,5 +180,4 @@ func (sc *commonShardConsumer) processRecords(getRecordsStartTime time.Time, rec sc.mService.IncrRecordsProcessed(sc.shard.ID, recordLength) sc.mService.IncrBytesProcessed(sc.shard.ID, recordBytes) sc.mService.MillisBehindLatest(sc.shard.ID, float64(*millisBehindLatest)) - return nil } diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index 7211842..0abc5fd 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -32,6 +32,7 @@ package worker import ( "context" "errors" + log "github.com/sirupsen/logrus" "math" "time" @@ -132,25 +133,12 @@ func (sc *PollingShardConsumer) getRecords() error { sc.callsLeft = kinesisReadTPSLimit sc.bytesRead = 0 sc.remBytes = MaxBytes - + // starting async lease renewal thread + leaseRenewalErrChan := make(chan error, 1) + go func() { + leaseRenewalErrChan <- sc.renewLease() + }() for { - if time.Now().UTC().After(sc.shard.GetLeaseTimeout().Add(-time.Duration(sc.kclConfig.LeaseRefreshPeriodMillis) * time.Millisecond)) { - log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID) - err = sc.checkpointer.GetLease(sc.shard, sc.consumerID) - if err != nil { - if errors.As(err, &chk.ErrLeaseNotAcquired{}) { - log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID) - return nil - } - // log and return error - log.Errorf("Error in refreshing lease on shard: %s for worker: %s. 
Error: %+v", - sc.shard.ID, sc.consumerID, err) - return err - } - // log metric for renewed lease for worker - sc.mService.LeaseRenewed(sc.shard.ID) - } - getRecordsStartTime := time.Now() log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.ToString(shardIterator)) @@ -214,10 +202,7 @@ func (sc *PollingShardConsumer) getRecords() error { // reset the retry count after success retriedErrors = 0 - err = sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer) - if err != nil { - return err - } + sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer) // The shard has been closed, so no new records can be read from it if getResp.NextShardIterator == nil { @@ -240,6 +225,8 @@ func (sc *PollingShardConsumer) getRecords() error { shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.REQUESTED, Checkpointer: recordCheckpointer} sc.recordProcessor.Shutdown(shutdownInput) return nil + case leaseRenewalErr := <-leaseRenewalErrChan: + return leaseRenewalErr default: } } @@ -312,3 +299,23 @@ func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput) return getResp, 0, err } + +func (sc *PollingShardConsumer) renewLease() error { + for { + time.Sleep(time.Duration(sc.kclConfig.LeaseRefreshWaitTime) * time.Millisecond) + log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID) + err := sc.checkpointer.GetLease(sc.shard, sc.consumerID) + if err != nil { + if errors.As(err, &chk.ErrLeaseNotAcquired{}) { + log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID) + return nil + } + // log and return error + log.Errorf("Error in refreshing lease on shard: %s for worker: %s. 
Error: %+v", + sc.shard.ID, sc.consumerID, err) + return err + } + // log metric for renewed lease for worker + sc.mService.LeaseRenewed(sc.shard.ID) + } +} diff --git a/clientlibrary/worker/record-processor-checkpointer.go b/clientlibrary/worker/record-processor-checkpointer.go index 101137f..8f71f14 100644 --- a/clientlibrary/worker/record-processor-checkpointer.go +++ b/clientlibrary/worker/record-processor-checkpointer.go @@ -21,17 +21,10 @@ package worker import ( - "errors" "github.com/aws/aws-sdk-go-v2/aws" chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint" kcl "github.com/vmware/vmware-go-kcl-v2/clientlibrary/interfaces" par "github.com/vmware/vmware-go-kcl-v2/clientlibrary/partition" - "time" -) - -var ( - ShutdownError = errors.New("another instance may have started processing some of these records already") - LeaseExpiredError = errors.New("the lease has on the shard has expired") ) type ( @@ -75,17 +68,6 @@ func (pc *PreparedCheckpointer) Checkpoint() error { } func (rc *RecordProcessorCheckpointer) Checkpoint(sequenceNumber *string) error { - // return shutdown error if lease is expired or another worker has started processing records for this shard - currLeaseOwner, err := rc.checkpoint.GetLeaseOwner(rc.shard.ID) - if err != nil { - return err - } - if rc.shard.AssignedTo != currLeaseOwner { - return ShutdownError - } - if time.Now().After(rc.shard.LeaseTimeout) { - return LeaseExpiredError - } // checkpoint the last sequence of a closed shard if sequenceNumber == nil { rc.shard.SetCheckpoint(chk.ShardEnd) From 86d70940e66bf04746dd875fe5288a78a5b8c399 Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Tue, 4 Apr 2023 11:17:22 -0700 Subject: [PATCH 20/22] fix: return err log in case of ErrLeaseNotAcquired Signed-off-by: Shiva Pentakota --- clientlibrary/worker/polling-shard-consumer.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index 0abc5fd..d8c81c1 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -306,10 +306,6 @@ func (sc *PollingShardConsumer) renewLease() error { log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID) err := sc.checkpointer.GetLease(sc.shard, sc.consumerID) if err != nil { - if errors.As(err, &chk.ErrLeaseNotAcquired{}) { - log.Warnf("Failed in acquiring lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID) - return nil - } // log and return error log.Errorf("Error in refreshing lease on shard: %s for worker: %s. Error: %+v", sc.shard.ID, sc.consumerID, err) From c1f6b270ab88ef7c964aaf70191c0e6550aef77e Mon Sep 17 00:00:00 2001 From: John Calixto Date: Tue, 4 Apr 2023 14:13:54 -0700 Subject: [PATCH 21/22] chore: Remove extraneous err check After checking the scan result above this line, checking err here no longer has any effect. Signed-off-by: John Calixto --- clientlibrary/checkpoint/dynamodb-checkpointer.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer.go b/clientlibrary/checkpoint/dynamodb-checkpointer.go index ee14fee..bd9be15 100644 --- a/clientlibrary/checkpoint/dynamodb-checkpointer.go +++ b/clientlibrary/checkpoint/dynamodb-checkpointer.go @@ -483,10 +483,6 @@ func (checkpointer *DynamoCheckpoint) syncLeases(shardStatus map[string]*par.Sha } } - if err != nil { - log.Debugf("Error performing SyncLeases. 
Error: %+v ", err) - return err - } log.Debugf("Lease sync completed. Next lease sync will occur in %s", time.Duration(checkpointer.kclConfig.LeaseSyncingTimeIntervalMillis)*time.Millisecond) return nil } From 4482696d955652a955957231d3cb42f162daeefa Mon Sep 17 00:00:00 2001 From: Shiva Pentakota Date: Thu, 6 Apr 2023 17:41:46 -0700 Subject: [PATCH 22/22] fix: pass in ctx with cancel for renewLease Signed-off-by: Shiva Pentakota --- .../worker/polling-shard-consumer.go | 43 +++++++++++++------ 1 file changed, 30 insertions(+), 13 deletions(-) diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index d8c81c1..3829850 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -99,7 +99,12 @@ func (sc *PollingShardConsumer) getShardIterator() (*string, error) { // getRecords continuously poll one shard for data record // Precondition: it currently has the lease on the shard. func (sc *PollingShardConsumer) getRecords() error { - defer sc.releaseLease(sc.shard.ID) + ctx, cancelFunc := context.WithCancel(context.Background()) + defer func() { + // cancel renewLease() + cancelFunc() + sc.releaseLease(sc.shard.ID) + }() log := sc.kclConfig.Logger @@ -133,10 +138,11 @@ func (sc *PollingShardConsumer) getRecords() error { sc.callsLeft = kinesisReadTPSLimit sc.bytesRead = 0 sc.remBytes = MaxBytes + // starting async lease renewal thread leaseRenewalErrChan := make(chan error, 1) go func() { - leaseRenewalErrChan <- sc.renewLease() + leaseRenewalErrChan <- sc.renewLease(ctx) }() for { getRecordsStartTime := time.Now() @@ -300,18 +306,29 @@ func (sc *PollingShardConsumer) callGetRecordsAPI(gri *kinesis.GetRecordsInput) return getResp, 0, err } -func (sc *PollingShardConsumer) renewLease() error { +func (sc *PollingShardConsumer) renewLease(ctx context.Context) error { + renewDuration := time.Duration(sc.kclConfig.LeaseRefreshWaitTime) * time.Millisecond for { - time.Sleep(time.Duration(sc.kclConfig.LeaseRefreshWaitTime) * time.Millisecond) - log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID) - err := sc.checkpointer.GetLease(sc.shard, sc.consumerID) - if err != nil { - // log and return error - log.Errorf("Error in refreshing lease on shard: %s for worker: %s. Error: %+v", - sc.shard.ID, sc.consumerID, err) - return err + timer := time.NewTimer(renewDuration) + select { + case <-timer.C: + log.Debugf("Refreshing lease on shard: %s for worker: %s", sc.shard.ID, sc.consumerID) + err := sc.checkpointer.GetLease(sc.shard, sc.consumerID) + if err != nil { + // log and return error + log.Errorf("Error in refreshing lease on shard: %s for worker: %s. Error: %+v", + sc.shard.ID, sc.consumerID, err) + return err + } + // log metric for renewed lease for worker + sc.mService.LeaseRenewed(sc.shard.ID) + case <-ctx.Done(): + // clean up timer resources + if !timer.Stop() { + <-timer.C + } + log.Debugf("renewLease was canceled") + return nil } - // log metric for renewed lease for worker - sc.mService.LeaseRenewed(sc.shard.ID) } }