Previously, a WaitGroup was used to track executing ShardConsumers
and prevent Worker.Shutdown() from returning until all ShardConsumers
had completed. Unfortunately, it was possible for Shutdown() to race
with the eventLoop(), leading to a situation where Worker.Shutdown()
returns while a ShardConsumer is still executing.
Now, we increment the WaitGroup to keep track the eventLoop() as well
as the ShardConsumers. This prevents shutdown from returning until all
background go-routines have completed.
Signed-off-by: Daniel Ferstay <dferstay@splunk.com>
* Add LeaseRefreshSpanMillis in configuration
For certain use cases of KCL the hard-coded value of 5s value,
representing the time span before the end of a lease timeout in
which the current owner gets to renew its own lease, is not
sufficient. When the time taken by ProcessRecords is higher
than 5s, the lease gets lost and the shard may end up to another
worker.
This commit adds a new configuration value, that defaults to 5s,
to let the user set this value to its own needs.
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Slight code simplification
Or readability improvement
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
Reported at:
https://github.com/vmware/vmware-go-kcl/issues/54
The input params are not used to set monitor service in cloudwatch
Init function. The empty appName, streamName and workerID cause
PutMetricData failed with error string "Error in publishing
cloudwatch metrics. Error: InvalidParameter...".
Signed-off-by: Tao Jiang <taoj@vmware.com>
The time sent to the `metrics.MonitoringService.RecordGetRecordsTime`'
was not the time taken by GetRecords, it was the time taken by
`GetRecords` and `ProcessRecords` additioned together.
Fixes#51
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
Fix some broken unit and integ tests introduced by last commit.
Tests:
1. hmake test
2. Run integration test on Goland IDE and make sure all pass.
Signed-off-by: Tao Jiang <taoj@vmware.com>
* Remove MonitoringConfiguration and export no-op service
MonitoringConfiguration is not needed anymore as the user directly
implements its monitoring service or use one the default constructors.
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Provide a constructor for CloudWatchMonitoringService
Unexport all fields
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Provide a constructor to PrometheusMonitoringService
Unexport fields
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Remove all CloudWatch specific-stuff from config package
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* NewWorker accepts a metrics.MonitoringService
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Fix tests
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Add WithMonitoringService to config
Instead of having an additional parameter to NewWorker so that the
user can provide its own MonitoringService, WithMonitoringService
is added to the configuration. This is much cleaner and remains
in-line with the rest of the current API.
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Fix tests after introduction of WithMonitoringService
Also, fix tests that should have been fixed in earlier commits.
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Move Prometheus into its own package
Also rename it to prometheus.MonitoringService to not have to repeat
Prometheus twice when using.
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Move CloudWatch metrics into its own package
Also rename it to cloudwatch.MonitoringService to not have to repeat
Cloudwatch twice when using.
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Remove references to Cloudwatch in comments
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
AT_TIMESTAMP start from the record at or after the specified
server-side Timestamp. However, the implementation was
missing. The bug was not notices until recently because most
of users never use this feature.
Signed-off-by: Tao Jiang <taoj@vmware.com>
* Add generic logger support
The current KCL has tight coupling with logrus and it causes
issue for customer to use different logging system such as zap log.
The issue has been opened via:
https://github.com/vmware/vmware-go-kcl/issues/27
This change is to created a logger interface be able to abstract
above logrus and zap log. It makes easy to add support for other
logging system in the fugure. The work is based on:
https://www.mountedthoughts.com/golang-logger-interface/
Some updates are made in order to make logging system easily
injectable and add more unit tests.
Tested against real kinesis and dyamodb as well.
Signed-off-by: Tao Jiang <taoj@vmware.com>
* Add lumberjack configuration options to have fine grained control
Update the file log configuratio by adding most of luberjack
configuration to avoid hardcode default value. Let user to specify
the value because log retention and rotation are very important
for prod environment.
Signed-off-by: Tao Jiang <taoj@vmware.com>
A waitgroup should always be incremented before the creation of the
goroutine which decrements it (through Done) or there is the
potential for deadlock.
That was not the case since the wg.Add was performed after the
`go getRecords() ` line.
Also, since there's only one path leading to the wg.Done in getRecords,
I moved wg.Done out of the getRecords function and placed it
alongside the goroutine creation, thus totally removing the need to
pass the waitgroup pointer to the sc instance, this lead to the
removal of the `waitGroup` field from the `ShardConsumer` struct.
This has been tested in production and didn't create any problem.
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Release shard lease after shutdown
Currently, only local cached shard info has been removed when worker losts the
lease. The info inside checkpointer (dynamoDB) is not removed. This causes
lease has been hold until the lease expiration and it might take too long
for shard is ready for other worker to grab. This change release the lease
in checkpointer immediately.
The user need to ensure appropriate checkpointing before return from
Shutdown callback.
Test:
updated unit test and integration test to ensure only the shard owner
has been wiped out and leave the checkpoint information intact.
Signed-off-by: Tao Jiang <taoj@vmware.com>
* Add code coverage reporting
Add code coverage reporting for unit test.
Signed-off-by: Tao Jiang <taoj@vmware.com>
Currently, only local cached shard info has been removed when worker losts the
lease. The info inside checkpointer (dynamoDB) is not removed. This causes
lease has been hold until the lease expiration and it might take too long
for shard is ready for other worker to grab. This change release the lease
in checkpointer immediately.
The user need to ensure appropriate checkpointing before return from
Shutdown callback.
Signed-off-by: Tao Jiang <taoj@vmware.com>
* Update worker to let it inject checkpointer and kinesis
Add two functions to inject checkpointer and kinesis for custom
implementation or adding mock for unit test.
This change also remove the worker_custom.go since it is no longer
needed.
Test:
Update the integration tests to cover newly added functions.
Signed-off-by: Tao Jiang <taoj@vmware.com>
* Fix typo on the test function
Signed-off-by: Tao Jiang <taoj@vmware.com>
Update the unit test and move integration test under test folder.
Update retry logic by switching to AWS's default retry.
Signed-off-by: Tao Jiang <taoj@vmware.com>
* Add credential configuration for resources
Add credentials for Kinesis, DynamoDB and Cloudwatch. See the worker_test.go
to see how to use it.
Signed-off-by: Tao Jiang <taoj@vmware.com>
* Add support for providing custom checkpointer
Provide a new constructor for adding checkpointer instead of alway using
default dynamodb checkpointer.
The next step is to abstract out the Kinesis into a generic stream API and
this will be bigger change and will be addressed in different PR.
Test:
Use the new construtor to inject dynamodb checkpointer and run the existing
tests.
Signed-off-by: Tao Jiang <taoj@vmware.com>
* Add support for providing custom checkpointer
Provide a new constructor for adding checkpointer instead of alway using
default dynamodb checkpointer.
The next step is to abstract out the Kinesis into a generic stream API and
this will be bigger change and will be addressed in different PR.
Fix checkfmt error.
Test:
Use the new construtor to inject dynamodb checkpointer and run the existing
tests.
Signed-off-by: Tao Jiang <taoj@vmware.com>
Current, KCL doesn't release shard when returning on error
which causes the worker cannot get any shard because it has
the maximum number of shard already. This change makes sure
releasing shard when return.
update the log message.
Test:
Integration test by forcing error on reading shard to
simulate Kinesis Internal error and make sure the KCL
will not stop processing.
Jira CNA-1995
Change-Id: Iac91579634a5023ab5ed73c6af89e4ff1a9af564
After a few days of shard splitting, the parent shard will be
deleted by Kinesis system. KCL should ignore the error caused
by deleted parent shared and move on.
Test:
Manuall split shard on kcl-test stream in photon-infra account
Currently, shard3 is the parent shard of shard 4 and 5. Shard 3
has a parent shard 0 which has been deleted already. Verified
the test can run and not stuck in waiting for parent shard.
Jira CNA-2089
Change-Id: I15ed0db70ff9836313c22ccabf934a2a69379248
gas is now gosec. Need to update security scan and fix
security issue as needed.
No functional change.
Jira CNA-2022
Change-Id: I36f2a204114f3f13e2ed05579c04a9c89f528f9a
All source should be prepared in a manner that reflects
comments that VMware would be comfortable sharing with
the public.
Documentation only. No functional change.
Update the license to MIT to be consistent with approved
OSSTP product tracking ticket:
https://osstp.vmware.com/oss/#/upstreamcontrib/project/1101391
Jira CNA-1117
Change-Id: I3fe31f10db954887481e3b21ccd20ec8e39c5996
The processing Kinesis gets stuck after splitting shard. The
reason is that the app doesn't do mandatory checkpoint.
KCL document states:
// When the value of {@link ShutdownInput#getShutdownReason()} is
// {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason#TERMINATE} it is required that you
// checkpoint. Failure to do so will result in an IllegalArgumentException, and the KCL no longer making progress.
Also, fix shard lease to prevent one host takes more shard than
its configuration allowed.
Jira CNA-1701
Change-Id: Icbdacaf347c7a67b5793647ad05ff93cca629741
There might be verious reason for shard iterator to
expire, such as: not enough data in shard or process
even takes more than 5 minutes which cause shard
iterator not refreshing enough.
This change removes log.Fatal which causes panic.
Panic inside go routine will bring down the whole
app. Therefore, just log error and exit the go routine
instead.
Jira ID: CNA-1072
Change-Id: I34a8d9af7258f3ea75465e2245bbc25c2fafee35
cascade-kinesis-client will be used as a submodule of other projects,
so it should not have "src/vmware.com/cascade-kinesis-client" in
its path. To build this project locally, please manually create
the parent folders.
Change-Id: I8844e6a0e32aae65b28496915d8507e9fb1058c6