chore: Add test for renewLease

Signed-off-by: John Calixto <jcalixto@vmware.com>
This commit is contained in:
John Calixto 2023-04-07 17:43:33 -07:00
parent b12921da23
commit ec7c0b6f61

View file

@ -29,10 +29,15 @@ import (
"github.com/aws/aws-sdk-go-v2/service/kinesis"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
chk "github.com/vmware/vmware-go-kcl-v2/clientlibrary/checkpoint"
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/config"
"github.com/vmware/vmware-go-kcl-v2/clientlibrary/metrics"
par "github.com/vmware/vmware-go-kcl-v2/clientlibrary/partition"
)
var (
testGetRecordsError = errors.New("GetRecords Error")
getLeaseTestFailure = errors.New("GetLease test failure")
)
func TestCallGetRecordsAPI(t *testing.T) {
@ -353,3 +358,141 @@ func TestPollingShardConsumer_checkCoolOffPeriod(t *testing.T) {
// restore original time.Now
rateLimitTimeNow = time.Now
}
func TestPollingShardConsumer_renewLease(t *testing.T) {
type fields struct {
checkpointer chk.Checkpointer
kclConfig *config.KinesisClientLibConfiguration
mService metrics.MonitoringService
}
tests := []struct {
name string
fields fields
testMillis time.Duration
expRenewalCalls int
expRenewals int
expErr error
}{
{
"renew once",
fields{
&mockCheckpointer{},
&config.KinesisClientLibConfiguration{
LeaseRefreshWaitTime: 10,
},
&mockMetrics{},
},
15,
1,
1,
nil,
},
{
"renew some",
fields{
&mockCheckpointer{},
&config.KinesisClientLibConfiguration{
LeaseRefreshWaitTime: 50,
},
&mockMetrics{},
},
50*5 + 10,
5,
5,
nil,
},
{
"renew twice every 2.5 seconds",
fields{
&mockCheckpointer{},
&config.KinesisClientLibConfiguration{
LeaseRefreshWaitTime: 2500,
},
&mockMetrics{},
},
5100,
2,
2,
nil,
},
{
"lease error",
fields{
&mockCheckpointer{fail: true},
&config.KinesisClientLibConfiguration{
LeaseRefreshWaitTime: 500,
},
&mockMetrics{},
},
1100,
1,
0,
getLeaseTestFailure,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
sc := &PollingShardConsumer{
commonShardConsumer: commonShardConsumer{
shard: &par.ShardStatus{},
checkpointer: tt.fields.checkpointer,
kclConfig: tt.fields.kclConfig,
},
mService: tt.fields.mService,
}
ctx, cancel := context.WithCancel(context.Background())
leaseRenewalErrChan := make(chan error, 1)
go func() {
leaseRenewalErrChan <- sc.renewLease(ctx)
}()
time.Sleep(tt.testMillis * time.Millisecond)
cancel()
err := <-leaseRenewalErrChan
assert.Equal(t, tt.expErr, err)
assert.Equal(t, tt.expRenewalCalls, sc.checkpointer.(*mockCheckpointer).getLeaseCalledTimes)
assert.Equal(t, tt.expRenewals, sc.mService.(*mockMetrics).leaseRenewedCalledTimes)
})
}
}
type mockCheckpointer struct {
getLeaseCalledTimes int
fail bool
}
func (m mockCheckpointer) Init() error { return nil }
func (m *mockCheckpointer) GetLease(*par.ShardStatus, string) error {
m.getLeaseCalledTimes++
if m.fail {
return getLeaseTestFailure
}
return nil
}
func (m mockCheckpointer) CheckpointSequence(*par.ShardStatus) error { return nil }
func (m mockCheckpointer) FetchCheckpoint(*par.ShardStatus) error { return nil }
func (m mockCheckpointer) RemoveLeaseInfo(string) error { return nil }
func (m mockCheckpointer) RemoveLeaseOwner(string) error { return nil }
func (m mockCheckpointer) GetLeaseOwner(string) (string, error) { return "", nil }
func (m mockCheckpointer) ListActiveWorkers(map[string]*par.ShardStatus) (map[string][]*par.ShardStatus, error) {
return map[string][]*par.ShardStatus{}, nil
}
func (m mockCheckpointer) ClaimShard(*par.ShardStatus, string) error { return nil }
type mockMetrics struct {
leaseRenewedCalledTimes int
}
func (m mockMetrics) Init(appName, streamName, workerID string) error { return nil }
func (m mockMetrics) Start() error { return nil }
func (m mockMetrics) IncrRecordsProcessed(shard string, count int) {}
func (m mockMetrics) IncrBytesProcessed(shard string, count int64) {}
func (m mockMetrics) MillisBehindLatest(shard string, milliSeconds float64) {}
func (m mockMetrics) DeleteMetricMillisBehindLatest(shard string) {}
func (m mockMetrics) LeaseGained(shard string) {}
func (m mockMetrics) LeaseLost(shard string) {}
func (m *mockMetrics) LeaseRenewed(shard string) {
m.leaseRenewedCalledTimes++
}
func (m mockMetrics) RecordGetRecordsTime(shard string, time float64) {}
func (m mockMetrics) RecordProcessRecordsTime(shard string, time float64) {}
func (m mockMetrics) Shutdown() {}