Use application name as default enhanced fan-out consumer name (#91)

* Use ApplicationName as default for EnhancedFanOutConsumerName

Signed-off-by: Ilia Cimpoes <ilia.cimpoes@ellation.com>

* Add tests

Signed-off-by: Ilia Cimpoes <ilia.cimpoes@ellation.com>
This commit is contained in:
Ilia Cimpoes 2021-04-29 05:19:12 +03:00 committed by Tao Jiang
parent ddcc2d0f95
commit 4a642bfa2f
5 changed files with 54 additions and 13 deletions

View file

@ -175,7 +175,7 @@ type (
// Either consumer name or consumer ARN must be specified when Enhanced Fan-Out is enabled. // Either consumer name or consumer ARN must be specified when Enhanced Fan-Out is enabled.
EnableEnhancedFanOutConsumer bool EnableEnhancedFanOutConsumer bool
// EnhancedFanOutConsumerName is the name of the enhanced fan-out consumer to create. // EnhancedFanOutConsumerName is the name of the enhanced fan-out consumer to create. If this isn't set the ApplicationName will be used.
EnhancedFanOutConsumerName string EnhancedFanOutConsumerName string
// EnhancedFanOutConsumerARN is the ARN of an already created enhanced fan-out consumer, if this is set no automatic consumer creation will be attempted // EnhancedFanOutConsumerARN is the ARN of an already created enhanced fan-out consumer, if this is set no automatic consumer creation will be attempted

View file

@ -34,7 +34,7 @@ func TestConfig(t *testing.T) {
WithIdleTimeBetweenReadsInMillis(20). WithIdleTimeBetweenReadsInMillis(20).
WithCallProcessRecordsEvenForEmptyRecordList(true). WithCallProcessRecordsEvenForEmptyRecordList(true).
WithTaskBackoffTimeMillis(10). WithTaskBackoffTimeMillis(10).
WithEnhancedFanOutConsumer("fan-out-consumer") WithEnhancedFanOutConsumerName("fan-out-consumer")
assert.Equal(t, "appName", kclConfig.ApplicationName) assert.Equal(t, "appName", kclConfig.ApplicationName)
assert.Equal(t, 500, kclConfig.FailoverTimeMillis) assert.Equal(t, 500, kclConfig.FailoverTimeMillis)
@ -47,9 +47,17 @@ func TestConfig(t *testing.T) {
contextLogger.Infof("Default logger is awesome") contextLogger.Infof("Default logger is awesome")
} }
func TestConfigDefaultEnhancedFanOutConsumerName(t *testing.T) {
kclConfig := NewKinesisClientLibConfig("appName", "StreamName", "us-west-2", "workerId")
assert.Equal(t, "appName", kclConfig.ApplicationName)
assert.False(t, kclConfig.EnableEnhancedFanOutConsumer)
assert.Equal(t, "appName", kclConfig.EnhancedFanOutConsumerName)
}
func TestEmptyEnhancedFanOutConsumerName(t *testing.T) { func TestEmptyEnhancedFanOutConsumerName(t *testing.T) {
assert.PanicsWithValue(t, "Non-empty value expected for EnhancedFanOutConsumerName, actual: ", func() { assert.PanicsWithValue(t, "Non-empty value expected for EnhancedFanOutConsumerName, actual: ", func() {
NewKinesisClientLibConfig("app", "stream", "us-west-2", "worker").WithEnhancedFanOutConsumer("") NewKinesisClientLibConfig("app", "stream", "us-west-2", "worker").WithEnhancedFanOutConsumerName("")
}) })
} }

View file

@ -73,6 +73,7 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio
KinesisCredentials: kiniesisCreds, KinesisCredentials: kiniesisCreds,
DynamoDBCredentials: dynamodbCreds, DynamoDBCredentials: dynamodbCreds,
TableName: applicationName, TableName: applicationName,
EnhancedFanOutConsumerName: applicationName,
StreamName: streamName, StreamName: streamName,
RegionName: regionName, RegionName: regionName,
WorkerID: workerID, WorkerID: workerID,
@ -213,10 +214,18 @@ func (c *KinesisClientLibConfiguration) WithMonitoringService(mService metrics.M
return c return c
} }
// WithEnhancedFanOutConsumer enables enhanced fan-out consumer with the specified name // WithEnhancedFanOutConsumer sets EnableEnhancedFanOutConsumer. If enhanced fan-out is enabled and ConsumerName is not specified ApplicationName is used as ConsumerName.
// For more info see: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html // For more info see: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html
// Note: You can register up to twenty consumers per stream to use enhanced fan-out. // Note: You can register up to twenty consumers per stream to use enhanced fan-out.
func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumer(consumerName string) *KinesisClientLibConfiguration { func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumer(enable bool) *KinesisClientLibConfiguration {
c.EnableEnhancedFanOutConsumer = enable
return c
}
// WithEnhancedFanOutConsumerName enables enhanced fan-out consumer with the specified name
// For more info see: https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html
// Note: You can register up to twenty consumers per stream to use enhanced fan-out.
func (c *KinesisClientLibConfiguration) WithEnhancedFanOutConsumerName(consumerName string) *KinesisClientLibConfiguration {
checkIsValueNotEmpty("EnhancedFanOutConsumerName", consumerName) checkIsValueNotEmpty("EnhancedFanOutConsumerName", consumerName)
c.EnhancedFanOutConsumerName = consumerName c.EnhancedFanOutConsumerName = consumerName
c.EnableEnhancedFanOutConsumer = true c.EnableEnhancedFanOutConsumer = true

View file

@ -184,19 +184,14 @@ func (w *Worker) initialize() error {
if w.kclConfig.EnableEnhancedFanOutConsumer { if w.kclConfig.EnableEnhancedFanOutConsumer {
log.Debugf("Enhanced fan-out is enabled") log.Debugf("Enhanced fan-out is enabled")
switch { w.consumerARN = w.kclConfig.EnhancedFanOutConsumerARN
case w.kclConfig.EnhancedFanOutConsumerARN != "": if w.consumerARN == "" {
w.consumerARN = w.kclConfig.EnhancedFanOutConsumerARN
case w.kclConfig.EnhancedFanOutConsumerName != "":
var err error var err error
w.consumerARN, err = w.fetchConsumerARNWithRetry() w.consumerARN, err = w.fetchConsumerARNWithRetry()
if err != nil { if err != nil {
log.Errorf("Failed to fetch consumer ARN for: %s, %v", w.kclConfig.EnhancedFanOutConsumerName, err) log.Errorf("Failed to fetch consumer ARN for: %s, %v", w.kclConfig.EnhancedFanOutConsumerName, err)
return err return err
} }
default:
log.Errorf("Consumer Name or ARN were not specified with enhanced fan-out enabled")
return errors.New("Consumer Name or ARN must be specified when enhanced fan-out is enabled")
} }
} }

View file

@ -191,7 +191,36 @@ func TestEnhancedFanOutConsumer(t *testing.T) {
kclConfig := cfg.NewKinesisClientLibConfig(appName, streamName, regionName, workerID). kclConfig := cfg.NewKinesisClientLibConfig(appName, streamName, regionName, workerID).
WithInitialPositionInStream(cfg.LATEST). WithInitialPositionInStream(cfg.LATEST).
WithEnhancedFanOutConsumer(consumerName). WithEnhancedFanOutConsumerName(consumerName).
WithMaxRecords(10).
WithMaxLeasesForWorker(1).
WithShardSyncIntervalMillis(5000).
WithFailoverTimeMillis(300000).
WithLogger(log)
runTest(kclConfig, false, t)
}
func TestEnhancedFanOutConsumerDefaultConsumerName(t *testing.T) {
// At miminal, use standard logrus logger
// log := logger.NewLogrusLogger(logrus.StandardLogger())
//
// In order to have precise control over logging. Use logger with config
config := logger.Configuration{
EnableConsole: true,
ConsoleLevel: logger.Debug,
ConsoleJSONFormat: false,
EnableFile: true,
FileLevel: logger.Info,
FileJSONFormat: true,
Filename: "log.log",
}
// Use logrus logger
log := logger.NewLogrusLoggerWithConfig(config)
kclConfig := cfg.NewKinesisClientLibConfig(appName, streamName, regionName, workerID).
WithInitialPositionInStream(cfg.LATEST).
WithEnhancedFanOutConsumer(true).
WithMaxRecords(10). WithMaxRecords(10).
WithMaxLeasesForWorker(1). WithMaxLeasesForWorker(1).
WithShardSyncIntervalMillis(5000). WithShardSyncIntervalMillis(5000).