2021-06-01 23:18:26 +00:00
|
|
|
package test
|
|
|
|
|
|
|
|
|
|
import (
|
2021-12-23 04:16:06 +00:00
|
|
|
chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint"
|
2021-06-01 23:18:26 +00:00
|
|
|
"testing"
|
|
|
|
|
|
2021-12-21 19:49:47 +00:00
|
|
|
cfg "github.com/vmware/vmware-go-kcl-v2/clientlibrary/config"
|
|
|
|
|
wk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/worker"
|
|
|
|
|
"github.com/vmware/vmware-go-kcl-v2/logger"
|
2021-06-01 23:18:26 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
func TestLeaseStealing(t *testing.T) {
|
|
|
|
|
config := &TestClusterConfig{
|
|
|
|
|
numShards: 4,
|
|
|
|
|
numWorkers: 2,
|
|
|
|
|
appName: appName,
|
|
|
|
|
streamName: streamName,
|
|
|
|
|
regionName: regionName,
|
|
|
|
|
workerIDTemplate: workerID + "-%v",
|
|
|
|
|
}
|
|
|
|
|
test := NewLeaseStealingTest(t, config, newLeaseStealingWorkerFactory(t))
|
|
|
|
|
test.Run(LeaseStealingAssertions{
|
2021-11-08 15:27:29 +00:00
|
|
|
expectedLeasesForInitialWorker: config.numShards,
|
|
|
|
|
expectedLeasesPerWorker: config.numShards / config.numWorkers,
|
2021-06-01 23:18:26 +00:00
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// leaseStealingWorkerFactory creates the KCL configuration and workers used by
// the lease stealing tests.
type leaseStealingWorkerFactory struct {
	// t is the test handle passed to recordProcessorFactory when a worker is
	// created.
	t *testing.T
}
|
|
|
|
|
|
|
|
|
|
func newLeaseStealingWorkerFactory(t *testing.T) *leaseStealingWorkerFactory {
|
|
|
|
|
return &leaseStealingWorkerFactory{t}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (wf *leaseStealingWorkerFactory) CreateKCLConfig(workerID string, config *TestClusterConfig) *cfg.KinesisClientLibConfiguration {
|
|
|
|
|
log := logger.NewLogrusLoggerWithConfig(logger.Configuration{
|
|
|
|
|
EnableConsole: true,
|
|
|
|
|
ConsoleLevel: logger.Error,
|
|
|
|
|
ConsoleJSONFormat: false,
|
|
|
|
|
EnableFile: true,
|
|
|
|
|
FileLevel: logger.Info,
|
|
|
|
|
FileJSONFormat: true,
|
|
|
|
|
Filename: "log.log",
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
log.WithFields(logger.Fields{"worker": workerID})
|
|
|
|
|
|
|
|
|
|
return cfg.NewKinesisClientLibConfig(config.appName, config.streamName, config.regionName, workerID).
|
|
|
|
|
WithInitialPositionInStream(cfg.LATEST).
|
|
|
|
|
WithMaxRecords(10).
|
|
|
|
|
WithShardSyncIntervalMillis(5000).
|
|
|
|
|
WithFailoverTimeMillis(10000).
|
|
|
|
|
WithLeaseStealing(true).
|
|
|
|
|
WithLogger(log)
|
|
|
|
|
}
|
|
|
|
|
|
2021-11-08 15:27:29 +00:00
|
|
|
func (wf *leaseStealingWorkerFactory) CreateWorker(_ string, kclConfig *cfg.KinesisClientLibConfiguration) *wk.Worker {
|
2021-06-01 23:18:26 +00:00
|
|
|
worker := wk.NewWorker(recordProcessorFactory(wf.t), kclConfig)
|
|
|
|
|
return worker
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestLeaseStealingInjectCheckpointer(t *testing.T) {
|
|
|
|
|
config := &TestClusterConfig{
|
|
|
|
|
numShards: 4,
|
|
|
|
|
numWorkers: 2,
|
|
|
|
|
appName: appName,
|
|
|
|
|
streamName: streamName,
|
|
|
|
|
regionName: regionName,
|
|
|
|
|
workerIDTemplate: workerID + "-%v",
|
|
|
|
|
}
|
|
|
|
|
test := NewLeaseStealingTest(t, config, newleaseStealingWorkerFactoryCustomChk(t))
|
|
|
|
|
test.Run(LeaseStealingAssertions{
|
2021-11-08 15:27:29 +00:00
|
|
|
expectedLeasesForInitialWorker: config.numShards,
|
|
|
|
|
expectedLeasesPerWorker: config.numShards / config.numWorkers,
|
2021-06-01 23:18:26 +00:00
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type leaseStealingWorkerFactoryCustom struct {
|
|
|
|
|
*leaseStealingWorkerFactory
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func newleaseStealingWorkerFactoryCustomChk(t *testing.T) *leaseStealingWorkerFactoryCustom {
|
|
|
|
|
return &leaseStealingWorkerFactoryCustom{
|
|
|
|
|
newLeaseStealingWorkerFactory(t),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (wfc *leaseStealingWorkerFactoryCustom) CreateWorker(workerID string, kclConfig *cfg.KinesisClientLibConfiguration) *wk.Worker {
|
|
|
|
|
worker := wfc.leaseStealingWorkerFactory.CreateWorker(workerID, kclConfig)
|
|
|
|
|
checkpointer := chk.NewDynamoCheckpoint(kclConfig)
|
|
|
|
|
return worker.WithCheckpointer(checkpointer)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func TestLeaseStealingWithMaxLeasesForWorker(t *testing.T) {
|
|
|
|
|
config := &TestClusterConfig{
|
|
|
|
|
numShards: 4,
|
|
|
|
|
numWorkers: 2,
|
|
|
|
|
appName: appName,
|
|
|
|
|
streamName: streamName,
|
|
|
|
|
regionName: regionName,
|
|
|
|
|
workerIDTemplate: workerID + "-%v",
|
|
|
|
|
}
|
2021-11-08 15:27:29 +00:00
|
|
|
test := NewLeaseStealingTest(t, config, newLeaseStealingWorkerFactoryMaxLeases(t, config.numShards-1))
|
2021-06-01 23:18:26 +00:00
|
|
|
test.Run(LeaseStealingAssertions{
|
2021-11-08 15:27:29 +00:00
|
|
|
expectedLeasesForInitialWorker: config.numShards - 1,
|
|
|
|
|
expectedLeasesPerWorker: 2,
|
2021-06-01 23:18:26 +00:00
|
|
|
})
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type leaseStealingWorkerFactoryMaxLeases struct {
|
|
|
|
|
maxLeases int
|
|
|
|
|
*leaseStealingWorkerFactory
|
|
|
|
|
}
|
|
|
|
|
|
2021-11-08 15:27:29 +00:00
|
|
|
func newLeaseStealingWorkerFactoryMaxLeases(t *testing.T, maxLeases int) *leaseStealingWorkerFactoryMaxLeases {
|
2021-06-01 23:18:26 +00:00
|
|
|
return &leaseStealingWorkerFactoryMaxLeases{
|
|
|
|
|
maxLeases,
|
|
|
|
|
newLeaseStealingWorkerFactory(t),
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (wfm *leaseStealingWorkerFactoryMaxLeases) CreateKCLConfig(workerID string, config *TestClusterConfig) *cfg.KinesisClientLibConfiguration {
|
|
|
|
|
kclConfig := wfm.leaseStealingWorkerFactory.CreateKCLConfig(workerID, config)
|
|
|
|
|
kclConfig.WithMaxLeasesForWorker(wfm.maxLeases)
|
|
|
|
|
return kclConfig
|
|
|
|
|
}
|