2018-08-17 13:03:25 +00:00
/ *
* Copyright ( c ) 2018 VMware , Inc .
*
* Permission is hereby granted , free of charge , to any person obtaining a copy of this software and
* associated documentation files ( the "Software" ) , to deal in the Software without restriction , including
* without limitation the rights to use , copy , modify , merge , publish , distribute , sublicense , and / or sell
* copies of the Software , and to permit persons to whom the Software is furnished to do
* so , subject to the following conditions :
*
* The above copyright notice and this permission notice shall be included in all copies or substantial
* portions of the Software .
*
* THE SOFTWARE IS PROVIDED "AS IS" , WITHOUT WARRANTY OF ANY KIND , EXPRESS OR IMPLIED , INCLUDING BUT
* NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY , FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT .
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM , DAMAGES OR OTHER LIABILITY ,
* WHETHER IN AN ACTION OF CONTRACT , TORT OR OTHERWISE , ARISING FROM , OUT OF OR IN CONNECTION WITH THE
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE .
* /
2021-11-08 14:00:48 +00:00
// Package config
2018-08-17 13:03:25 +00:00
// The implementation is derived from https://github.com/awslabs/amazon-kinesis-client
/ *
* Copyright 2014 - 2015 Amazon . com , Inc . or its affiliates . All Rights Reserved .
*
* Licensed under the Amazon Software License ( the "License" ) .
* You may not use this file except in compliance with the License .
* A copy of the License is located at
*
* http : //aws.amazon.com/asl/
*
* or in the "license" file accompanying this file . This file is distributed
* on an "AS IS" BASIS , WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either
* express or implied . See the License for the specific language governing
* permissions and limitations under the License .
* /
2021-12-21 19:49:47 +00:00
2018-04-11 03:50:18 +00:00
package config
import (
2019-10-28 12:08:18 +00:00
"log"
2018-04-11 03:50:18 +00:00
"time"
2019-02-09 16:23:54 +00:00
2022-01-07 02:13:32 +00:00
"github.com/aws/aws-sdk-go-v2/aws"
2021-04-27 15:51:26 +00:00
2021-12-21 19:49:47 +00:00
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics"
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/utils"
"github.com/vmware/vmware-go-kcl-v2/logger"
2018-04-11 03:50:18 +00:00
)
2019-11-06 13:53:21 +00:00
// NewKinesisClientLibConfig creates a default KinesisClientLibConfiguration based on the required fields.
2018-04-17 16:25:41 +00:00
func NewKinesisClientLibConfig ( applicationName , streamName , regionName , workerID string ) * KinesisClientLibConfiguration {
2019-03-16 13:11:09 +00:00
return NewKinesisClientLibConfigWithCredentials ( applicationName , streamName , regionName , workerID ,
2019-11-06 13:53:21 +00:00
nil , nil )
2019-03-16 13:11:09 +00:00
}
2019-11-06 13:53:21 +00:00
// NewKinesisClientLibConfigWithCredential creates a default KinesisClientLibConfiguration based on the required fields and unique credentials.
2019-03-16 13:11:09 +00:00
func NewKinesisClientLibConfigWithCredential ( applicationName , streamName , regionName , workerID string ,
2022-01-07 02:13:32 +00:00
creds aws . CredentialsProvider ) * KinesisClientLibConfiguration {
2019-11-06 13:53:21 +00:00
return NewKinesisClientLibConfigWithCredentials ( applicationName , streamName , regionName , workerID , creds , creds )
2019-03-16 13:11:09 +00:00
}
2019-11-06 13:53:21 +00:00
// NewKinesisClientLibConfigWithCredentials creates a default KinesisClientLibConfiguration based on the required fields and specific credentials for each service.
2019-03-16 13:11:09 +00:00
func NewKinesisClientLibConfigWithCredentials ( applicationName , streamName , regionName , workerID string ,
2022-01-07 02:13:32 +00:00
kinesisCreds , dynamodbCreds aws . CredentialsProvider ) * KinesisClientLibConfiguration {
2018-04-13 04:02:30 +00:00
checkIsValueNotEmpty ( "ApplicationName" , applicationName )
checkIsValueNotEmpty ( "StreamName" , streamName )
2018-04-17 16:25:41 +00:00
checkIsValueNotEmpty ( "RegionName" , regionName )
2018-04-11 03:50:18 +00:00
if empty ( workerID ) {
workerID = utils . MustNewUUID ( )
}
// populate the KCL configuration with default values
2021-11-11 22:11:30 +00:00
return & KinesisClientLibConfiguration {
ApplicationName : applicationName ,
KinesisCredentials : kinesisCreds ,
DynamoDBCredentials : dynamodbCreds ,
TableName : applicationName ,
EnhancedFanOutConsumerName : applicationName ,
StreamName : streamName ,
RegionName : regionName ,
WorkerID : workerID ,
InitialPositionInStream : DefaultInitialPositionInStream ,
InitialPositionInStreamExtended : * newInitialPosition ( DefaultInitialPositionInStream ) ,
FailoverTimeMillis : DefaultFailoverTimeMillis ,
LeaseRefreshPeriodMillis : DefaultLeaseRefreshPeriodMillis ,
MaxRecords : DefaultMaxRecords ,
IdleTimeBetweenReadsInMillis : DefaultIdleTimeBetweenReadsMillis ,
CallProcessRecordsEvenForEmptyRecordList : DefaultDontCallProcessRecordsForEmptyRecordList ,
ParentShardPollIntervalMillis : DefaultParentShardPollIntervalMillis ,
ShardSyncIntervalMillis : DefaultShardSyncIntervalMillis ,
CleanupTerminatedShardsBeforeExpiry : DefaultCleanupLeasesUponShardsCompletion ,
TaskBackoffTimeMillis : DefaultTaskBackoffTimeMillis ,
ValidateSequenceNumberBeforeCheckpointing : DefaultValidateSequenceNumberBeforeCheckpointing ,
ShutdownGraceMillis : DefaultShutdownGraceMillis ,
MaxLeasesForWorker : DefaultMaxLeasesForWorker ,
MaxLeasesToStealAtOneTime : DefaultMaxLeasesToStealAtOneTime ,
InitialLeaseTableReadCapacity : DefaultInitialLeaseTableReadCapacity ,
2020-12-23 19:22:01 +00:00
InitialLeaseTableWriteCapacity : DefaultInitialLeaseTableWriteCapacity ,
SkipShardSyncAtWorkerInitializationIfLeasesExist : DefaultSkipShardSyncAtStartupIfLeasesExist ,
2021-06-01 23:18:26 +00:00
EnableLeaseStealing : DefaultEnableLeaseStealing ,
LeaseStealingIntervalMillis : DefaultLeaseStealingIntervalMillis ,
LeaseStealingClaimTimeoutMillis : DefaultLeaseStealingClaimTimeoutMillis ,
LeaseSyncingTimeIntervalMillis : DefaultLeaseSyncingIntervalMillis ,
Logger : logger . GetDefaultLogger ( ) ,
2018-04-11 03:50:18 +00:00
}
}
2019-02-09 16:23:54 +00:00
// WithKinesisEndpoint sets an alternative Kinesis endpoint on the
// configuration and returns the same configuration so calls can be chained.
func (c *KinesisClientLibConfiguration) WithKinesisEndpoint(kinesisEndpoint string) *KinesisClientLibConfiguration {
	c.KinesisEndpoint = kinesisEndpoint
	return c
}
// WithDynamoDBEndpoint sets an alternative DynamoDB endpoint on the
// configuration and returns the same configuration so calls can be chained.
func (c *KinesisClientLibConfiguration) WithDynamoDBEndpoint(dynamoDBEndpoint string) *KinesisClientLibConfiguration {
	c.DynamoDBEndpoint = dynamoDBEndpoint
	return c
}
2018-04-11 03:50:18 +00:00
// WithTableName to provide alternative lease table in DynamoDB
func ( c * KinesisClientLibConfiguration ) WithTableName ( tableName string ) * KinesisClientLibConfiguration {
2018-04-13 04:02:30 +00:00
c . TableName = tableName
2018-04-11 03:50:18 +00:00
return c
}
func ( c * KinesisClientLibConfiguration ) WithInitialPositionInStream ( initialPositionInStream InitialPositionInStream ) * KinesisClientLibConfiguration {
2018-04-13 04:02:30 +00:00
c . InitialPositionInStream = initialPositionInStream
c . InitialPositionInStreamExtended = * newInitialPosition ( initialPositionInStream )
2018-04-11 03:50:18 +00:00
return c
}
func ( c * KinesisClientLibConfiguration ) WithTimestampAtInitialPositionInStream ( timestamp * time . Time ) * KinesisClientLibConfiguration {
2018-04-13 04:02:30 +00:00
c . InitialPositionInStream = AT_TIMESTAMP
c . InitialPositionInStreamExtended = * newInitialPositionAtTimestamp ( timestamp )
2018-04-11 03:50:18 +00:00
return c
}
func ( c * KinesisClientLibConfiguration ) WithFailoverTimeMillis ( failoverTimeMillis int ) * KinesisClientLibConfiguration {
checkIsValuePositive ( "FailoverTimeMillis" , failoverTimeMillis )
2018-04-13 04:02:30 +00:00
c . FailoverTimeMillis = failoverTimeMillis
2018-04-11 03:50:18 +00:00
return c
}
2019-11-13 23:15:33 +00:00
// WithLeaseRefreshPeriodMillis sets LeaseRefreshPeriodMillis.
//
// The value is passed through checkIsValuePositive before it is stored. The
// same configuration is returned so calls can be chained.
func (c *KinesisClientLibConfiguration) WithLeaseRefreshPeriodMillis(leaseRefreshPeriodMillis int) *KinesisClientLibConfiguration {
	checkIsValuePositive("LeaseRefreshPeriodMillis", leaseRefreshPeriodMillis)

	c.LeaseRefreshPeriodMillis = leaseRefreshPeriodMillis
	return c
}
2018-04-11 03:50:18 +00:00
func ( c * KinesisClientLibConfiguration ) WithShardSyncIntervalMillis ( shardSyncIntervalMillis int ) * KinesisClientLibConfiguration {
checkIsValuePositive ( "ShardSyncIntervalMillis" , shardSyncIntervalMillis )
2018-04-13 04:02:30 +00:00
c . ShardSyncIntervalMillis = shardSyncIntervalMillis
2018-04-11 03:50:18 +00:00
return c
}
func ( c * KinesisClientLibConfiguration ) WithMaxRecords ( maxRecords int ) * KinesisClientLibConfiguration {
checkIsValuePositive ( "MaxRecords" , maxRecords )
2018-04-13 04:02:30 +00:00
c . MaxRecords = maxRecords
2018-04-11 03:50:18 +00:00
return c
}
2018-04-17 16:25:41 +00:00
// WithMaxLeasesForWorker configures the maximum number of leases this worker
// can hold, which determines the maximum number of shards it can handle.
//
// n is passed through checkIsValuePositive before it is stored. The same
// configuration is returned so calls can be chained.
func (c *KinesisClientLibConfiguration) WithMaxLeasesForWorker(n int) *KinesisClientLibConfiguration {
	checkIsValuePositive("MaxLeasesForWorker", n)

	c.MaxLeasesForWorker = n
	return c
}
2021-11-11 22:11:30 +00:00
// WithIdleTimeBetweenReadsInMillis
// Controls how long the KCL will sleep if no records are returned from Kinesis
//
// <p>
// This value is only used when no records are returned; if records are returned, the {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask} will
// immediately retrieve the next set of records after the call to
// {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#processRecords(ProcessRecordsInput)}
// has returned. Setting this value to high may result in the KCL being unable to catch up. If you are changing this
// value it's recommended that you enable {@link #withCallProcessRecordsEvenForEmptyRecordList(boolean)}, and
// monitor how far behind the records retrieved are by inspecting
// {@link com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput#getMillisBehindLatest()}, and the
// <a href=
// "http://docs.aws.amazon.com/streams/latest/dev/monitoring-with-cloudwatch.html#kinesis-metrics-stream">CloudWatch
// Metric: GetRecords.MillisBehindLatest</a>
// </p>
//
// @param IdleTimeBetweenReadsInMillis: how long to sleep between GetRecords calls when no records are returned.
// @return KinesisClientLibConfiguration
2018-04-11 03:50:18 +00:00
func ( c * KinesisClientLibConfiguration ) WithIdleTimeBetweenReadsInMillis ( idleTimeBetweenReadsInMillis int ) * KinesisClientLibConfiguration {
checkIsValuePositive ( "IdleTimeBetweenReadsInMillis" , idleTimeBetweenReadsInMillis )
2018-04-13 04:02:30 +00:00
c . IdleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis
2018-04-11 03:50:18 +00:00
return c
}
func ( c * KinesisClientLibConfiguration ) WithCallProcessRecordsEvenForEmptyRecordList ( callProcessRecordsEvenForEmptyRecordList bool ) * KinesisClientLibConfiguration {
2018-04-13 04:02:30 +00:00
c . CallProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList
2018-04-11 03:50:18 +00:00
return c
}
func ( c * KinesisClientLibConfiguration ) WithTaskBackoffTimeMillis ( taskBackoffTimeMillis int ) * KinesisClientLibConfiguration {
2018-04-13 04:02:30 +00:00
checkIsValuePositive ( "TaskBackoffTimeMillis" , taskBackoffTimeMillis )
c . TaskBackoffTimeMillis = taskBackoffTimeMillis
2018-04-11 03:50:18 +00:00
return c
}
2019-10-28 12:08:18 +00:00
// WithLogger sets the logger stored on the configuration.
//
// A nil logger is rejected with log.Panic. The same configuration is returned
// so calls can be chained.
func (c *KinesisClientLibConfiguration) WithLogger(logger logger.Logger) *KinesisClientLibConfiguration {
	// Inside the body the parameter shadows the imported logger package.
	if logger == nil {
		log.Panic("Logger cannot be null")
	}

	c.Logger = logger
	return c
}
2019-11-06 13:53:21 +00:00
// WithMonitoringService sets the monitoring service used to publish metrics
// and returns the same configuration so calls can be chained.
func (c *KinesisClientLibConfiguration) WithMonitoringService(mService metrics.MonitoringService) *KinesisClientLibConfiguration {
	// No nil check here: the nil case is handled further down, at worker
	// creation, and the user may want to pass a nil monitoring service
	// explicitly.
	c.MonitoringService = mService
	return c
}
2021-04-27 15:51:26 +00:00
2021-04-29 02:19:12 +00:00
// WithEnhancedFanOutConsumer sets EnableEnhancedFanOutConsumer. If enhanced fan-out is enabled and ConsumerName is not specified ApplicationName is used as ConsumerName.
2021-04-27 15:51:26 +00:00
// For more info see: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html
// Note: You can register up to twenty consumers per stream to use enhanced fan-out.
2021-04-29 02:19:12 +00:00
func ( c * KinesisClientLibConfiguration ) WithEnhancedFanOutConsumer ( enable bool ) * KinesisClientLibConfiguration {
c . EnableEnhancedFanOutConsumer = enable
return c
}
// WithEnhancedFanOutConsumerName enables enhanced fan-out consumer with the specified name
// For more info see: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html
// Note: You can register up to twenty consumers per stream to use enhanced fan-out.
func ( c * KinesisClientLibConfiguration ) WithEnhancedFanOutConsumerName ( consumerName string ) * KinesisClientLibConfiguration {
2021-04-27 15:51:26 +00:00
checkIsValueNotEmpty ( "EnhancedFanOutConsumerName" , consumerName )
c . EnhancedFanOutConsumerName = consumerName
c . EnableEnhancedFanOutConsumer = true
return c
}
// WithEnhancedFanOutConsumerARN enables the enhanced fan-out consumer and
// identifies it by the given consumer ARN.
//
// consumerARN is passed through checkIsValueNotEmpty before it is stored.
// For more info see: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html
// Note: You can register up to twenty consumers per stream to use enhanced fan-out.
func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumerARN(consumerARN string) *KinesisClientLibConfiguration {
	checkIsValueNotEmpty("EnhancedFanOutConsumerARN", consumerARN)

	c.EnhancedFanOutConsumerARN = consumerARN
	c.EnableEnhancedFanOutConsumer = true
	return c
}
2021-06-01 23:18:26 +00:00
// WithLeaseStealing sets the EnableLeaseStealing flag and returns the same
// configuration so calls can be chained.
func (c *KinesisClientLibConfiguration) WithLeaseStealing(enableLeaseStealing bool) *KinesisClientLibConfiguration {
	c.EnableLeaseStealing = enableLeaseStealing
	return c
}
// WithLeaseStealingIntervalMillis sets LeaseStealingIntervalMillis and returns
// the same configuration so calls can be chained.
//
// NOTE(review): unlike the other *Millis setters on this type, the value is
// stored without a checkIsValuePositive check — confirm whether zero or
// negative intervals are meant to be accepted here.
func (c *KinesisClientLibConfiguration) WithLeaseStealingIntervalMillis(leaseStealingIntervalMillis int) *KinesisClientLibConfiguration {
	c.LeaseStealingIntervalMillis = leaseStealingIntervalMillis
	return c
}
// WithLeaseSyncingIntervalMillis sets LeaseSyncingTimeIntervalMillis and
// returns the same configuration so calls can be chained.
//
// NOTE(review): unlike the other *Millis setters on this type, the value is
// stored without a checkIsValuePositive check — confirm whether zero or
// negative intervals are meant to be accepted here.
func (c *KinesisClientLibConfiguration) WithLeaseSyncingIntervalMillis(leaseSyncingIntervalMillis int) *KinesisClientLibConfiguration {
	c.LeaseSyncingTimeIntervalMillis = leaseSyncingIntervalMillis
	return c
}