* Use ApplicationName as default for EnhancedFanOutConsumerName
Signed-off-by: Ilia Cimpoes <ilia.cimpoes@ellation.com>
* Add tests
Signed-off-by: Ilia Cimpoes <ilia.cimpoes@ellation.com>
* Implement enhanced fan-out consumer
Signed-off-by: Ilia Cimpoes <ilia.cimpoes@ellation.com>
* Add test cases
Signed-off-by: Ilia Cimpoes <ilia.cimpoes@ellation.com>
* Small adjustments in fan-out consumer
Signed-off-by: Ilia Cimpoes <ilia.cimpoes@ellation.com>
Add support for Kinesis aggregation format to consume record
published by KPL.
Note: current implementation need to checkpoint the whole batch
of the de-aggregated records instead of just portion of them.
Add cache entry and exit time.
Signed-off-by: Tao Jiang <taoj@vmware.com>
Adding min/max retry and throttle delay for the retryer.
Also, increase the max retries to 10 which is inline with
dynamodb default retry count.
Signed-off-by: Tao Jiang <taoj@vmware.com>
Update aws go sdk to the latest. Also, update
integration tests by publishing data using both
PutRecord and PutRecords.
Signed-off-by: Tao Jiang <taoj@vmware.com>
The FieldLogger interface is satisfied by either *Logger or *Entry.
Accepting this interface in place of the concrete *Logger type allows
users to inject a logger with some fields already set. For example, the
application developer might want all logging from the library to have a
`subsystem=kcl` field.
Signed-off-by: Mike Pye <mail@mdpye.co.uk>
* Refactor
* Use `nextToken` paramter as string.
Use `nextToken` paramter as string instead of pointer to match the original code base.
* Log the last shard token when failing.
* Use aws.StringValue to get the string pointer value.
Co-authored-by: Wesam Gerges <wesam.gerges.discovery@gmail.com>
ull-request #62 wrongly introduced an increased delay on
shutdown.
Before #62 the `stop` channel could be triggered while waiting for
`syncShard` milliseconds, so the function could return as soon as
`stop` was received.
However #62 changed this behavior by sleeping in the default case:
`stop` couldn't be handled right away anymore. Instead it was
handled after a whole new loop, potentially delaying shutdown by
minutes. (up to synchard * 1.5 ms).
This commit fixes that.
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Add a random number generator to Worker
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Add random jitter to the worker shard sync sleep
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Add random jitter in case syncShard fails
Fixes#61
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
Previously, a WaitGroup was used to track executing ShardConsumers
and prevent Worker.Shutdown() from returning until all ShardConsumers
had completed. Unfortunately, it was possible for Shutdown() to race
with the eventLoop(), leading to a situation where Worker.Shutdown()
returns while a ShardConsumer is still executing.
Now, we increment the WaitGroup to keep track the eventLoop() as well
as the ShardConsumers. This prevents shutdown from returning until all
background go-routines have completed.
Signed-off-by: Daniel Ferstay <dferstay@splunk.com>
* Add LeaseRefreshSpanMillis in configuration
For certain use cases of KCL the hard-coded value of 5s value,
representing the time span before the end of a lease timeout in
which the current owner gets to renew its own lease, is not
sufficient. When the time taken by ProcessRecords is higher
than 5s, the lease gets lost and the shard may end up to another
worker.
This commit adds a new configuration value, that defaults to 5s,
to let the user set this value to its own needs.
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Slight code simplification
Or readability improvement
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
Reported at:
https://github.com/vmware/vmware-go-kcl/issues/54
The input params are not used to set monitor service in cloudwatch
Init function. The empty appName, streamName and workerID cause
PutMetricData failed with error string "Error in publishing
cloudwatch metrics. Error: InvalidParameter...".
Signed-off-by: Tao Jiang <taoj@vmware.com>
The time sent to the `metrics.MonitoringService.RecordGetRecordsTime`'
was not the time taken by GetRecords, it was the time taken by
`GetRecords` and `ProcessRecords` additioned together.
Fixes#51
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
Fix some broken unit and integ tests introduced by last commit.
Tests:
1. hmake test
2. Run integration test on Goland IDE and make sure all pass.
Signed-off-by: Tao Jiang <taoj@vmware.com>
* Remove MonitoringConfiguration and export no-op service
MonitoringConfiguration is not needed anymore as the user directly
implements its monitoring service or use one the default constructors.
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Provide a constructor for CloudWatchMonitoringService
Unexport all fields
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Provide a constructor to PrometheusMonitoringService
Unexport fields
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Remove all CloudWatch specific-stuff from config package
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* NewWorker accepts a metrics.MonitoringService
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Fix tests
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Add WithMonitoringService to config
Instead of having an additional parameter to NewWorker so that the
user can provide its own MonitoringService, WithMonitoringService
is added to the configuration. This is much cleaner and remains
in-line with the rest of the current API.
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Fix tests after introduction of WithMonitoringService
Also, fix tests that should have been fixed in earlier commits.
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Move Prometheus into its own package
Also rename it to prometheus.MonitoringService to not have to repeat
Prometheus twice when using.
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Move CloudWatch metrics into its own package
Also rename it to cloudwatch.MonitoringService to not have to repeat
Cloudwatch twice when using.
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
* Remove references to Cloudwatch in comments
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
The https://github.com/vmware/vmware-go-kcl/pull/47 move zap
into its own packge but it also breaks the integration test.
This change is to fix integ test by correcting its package
reference.
Signed-off-by: Tao Jiang <taoj@vmware.com>
Since #27 vmware-go-kcl has support the any logger interface,
which is very nice.
However due to the fact that `logger/zap.go` directly imports zap.
zap became a dependency of whoever uses `vmware-go-kcl.` The
problem is that zap also has many dependencies.
In order to avoid KCL users to pay a cost for a feature they don't
need, the zap stuff has been moved to a `logger/zap` sub-package.
Fixes#45
Signed-off-by: Aurélien Rainone <aurelien.rainone@gmail.com>
AT_TIMESTAMP start from the record at or after the specified
server-side Timestamp. However, the implementation was
missing. The bug was not notices until recently because most
of users never use this feature.
Signed-off-by: Tao Jiang <taoj@vmware.com>
* Add generic logger support
The current KCL has tight coupling with logrus and it causes
issue for customer to use different logging system such as zap log.
The issue has been opened via:
https://github.com/vmware/vmware-go-kcl/issues/27
This change is to created a logger interface be able to abstract
above logrus and zap log. It makes easy to add support for other
logging system in the fugure. The work is based on:
https://www.mountedthoughts.com/golang-logger-interface/
Some updates are made in order to make logging system easily
injectable and add more unit tests.
Tested against real kinesis and dyamodb as well.
Signed-off-by: Tao Jiang <taoj@vmware.com>
* Add lumberjack configuration options to have fine grained control
Update the file log configuratio by adding most of luberjack
configuration to avoid hardcode default value. Let user to specify
the value because log retention and rotation are very important
for prod environment.
Signed-off-by: Tao Jiang <taoj@vmware.com>