diff --git a/clientlibrary/checkpoint/dynamodb-checkpointer.go b/clientlibrary/checkpoint/dynamodb-checkpointer.go index 091700b..dc7e8cd 100644 --- a/clientlibrary/checkpoint/dynamodb-checkpointer.go +++ b/clientlibrary/checkpoint/dynamodb-checkpointer.go @@ -61,7 +61,7 @@ type DynamoCheckpoint struct { LeaseDuration int svc *dynamodb.Client - kclConfig *config.KinesisClientLibConfiguration + kclConfig *config.KinesisClientLibConfiguration Retries int lastLeaseSync time.Time } diff --git a/clientlibrary/config/kcl-config.go b/clientlibrary/config/kcl-config.go index 45a6a2a..ae4935c 100644 --- a/clientlibrary/config/kcl-config.go +++ b/clientlibrary/config/kcl-config.go @@ -70,31 +70,31 @@ func NewKinesisClientLibConfigWithCredentials(applicationName, streamName, regio } // populate the KCL configuration with default values - return &KinesisClientLibConfiguration { - ApplicationName: applicationName, - KinesisCredentials: kinesisCreds, - DynamoDBCredentials: dynamodbCreds, - TableName: applicationName, - EnhancedFanOutConsumerName: applicationName, - StreamName: streamName, - RegionName: regionName, - WorkerID: workerID, - InitialPositionInStream: DefaultInitialPositionInStream, - InitialPositionInStreamExtended: *newInitialPosition(DefaultInitialPositionInStream), - FailoverTimeMillis: DefaultFailoverTimeMillis, - LeaseRefreshPeriodMillis: DefaultLeaseRefreshPeriodMillis, - MaxRecords: DefaultMaxRecords, - IdleTimeBetweenReadsInMillis: DefaultIdleTimeBetweenReadsMillis, - CallProcessRecordsEvenForEmptyRecordList: DefaultDontCallProcessRecordsForEmptyRecordList, - ParentShardPollIntervalMillis: DefaultParentShardPollIntervalMillis, - ShardSyncIntervalMillis: DefaultShardSyncIntervalMillis, - CleanupTerminatedShardsBeforeExpiry: DefaultCleanupLeasesUponShardsCompletion, - TaskBackoffTimeMillis: DefaultTaskBackoffTimeMillis, - ValidateSequenceNumberBeforeCheckpointing: DefaultValidateSequenceNumberBeforeCheckpointing, - ShutdownGraceMillis: DefaultShutdownGraceMillis, - MaxLeasesForWorker: DefaultMaxLeasesForWorker, - MaxLeasesToStealAtOneTime: DefaultMaxLeasesToStealAtOneTime, - InitialLeaseTableReadCapacity: DefaultInitialLeaseTableReadCapacity, + return &KinesisClientLibConfiguration{ + ApplicationName: applicationName, + KinesisCredentials: kinesisCreds, + DynamoDBCredentials: dynamodbCreds, + TableName: applicationName, + EnhancedFanOutConsumerName: applicationName, + StreamName: streamName, + RegionName: regionName, + WorkerID: workerID, + InitialPositionInStream: DefaultInitialPositionInStream, + InitialPositionInStreamExtended: *newInitialPosition(DefaultInitialPositionInStream), + FailoverTimeMillis: DefaultFailoverTimeMillis, + LeaseRefreshPeriodMillis: DefaultLeaseRefreshPeriodMillis, + MaxRecords: DefaultMaxRecords, + IdleTimeBetweenReadsInMillis: DefaultIdleTimeBetweenReadsMillis, + CallProcessRecordsEvenForEmptyRecordList: DefaultDontCallProcessRecordsForEmptyRecordList, + ParentShardPollIntervalMillis: DefaultParentShardPollIntervalMillis, + ShardSyncIntervalMillis: DefaultShardSyncIntervalMillis, + CleanupTerminatedShardsBeforeExpiry: DefaultCleanupLeasesUponShardsCompletion, + TaskBackoffTimeMillis: DefaultTaskBackoffTimeMillis, + ValidateSequenceNumberBeforeCheckpointing: DefaultValidateSequenceNumberBeforeCheckpointing, + ShutdownGraceMillis: DefaultShutdownGraceMillis, + MaxLeasesForWorker: DefaultMaxLeasesForWorker, + MaxLeasesToStealAtOneTime: DefaultMaxLeasesToStealAtOneTime, + InitialLeaseTableReadCapacity: DefaultInitialLeaseTableReadCapacity, InitialLeaseTableWriteCapacity: DefaultInitialLeaseTableWriteCapacity, SkipShardSyncAtWorkerInitializationIfLeasesExist: DefaultSkipShardSyncAtStartupIfLeasesExist, EnableLeaseStealing: DefaultEnableLeaseStealing, @@ -167,25 +167,24 @@ func (c *KinesisClientLibConfiguration) WithMaxLeasesForWorker(n int) *KinesisCl return c } -/* WithIdleTimeBetweenReadsInMillis - Controls how long the KCL will sleep if no records are returned from Kinesis - -

- This value is only used when no records are returned; if records are returned, the {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask} will - immediately retrieve the next set of records after the call to - {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#processRecords(ProcessRecordsInput)} - has returned. Setting this value to high may result in the KCL being unable to catch up. If you are changing this - value it's recommended that you enable {@link #withCallProcessRecordsEvenForEmptyRecordList(boolean)}, and - monitor how far behind the records retrieved are by inspecting - {@link com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput#getMillisBehindLatest()}, and the - CloudWatch - Metric: GetRecords.MillisBehindLatest -

- - @param IdleTimeBetweenReadsInMillis: how long to sleep between GetRecords calls when no records are returned. - @return KinesisClientLibConfiguration - */ +// WithIdleTimeBetweenReadsInMillis +// Controls how long the KCL will sleep if no records are returned from Kinesis +// +//

+// This value is only used when no records are returned; if records are returned, the {@link com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask} will +// immediately retrieve the next set of records after the call to +// {@link com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor#processRecords(ProcessRecordsInput)} +// has returned. Setting this value to high may result in the KCL being unable to catch up. If you are changing this +// value it's recommended that you enable {@link #withCallProcessRecordsEvenForEmptyRecordList(boolean)}, and +// monitor how far behind the records retrieved are by inspecting +// {@link com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput#getMillisBehindLatest()}, and the +// CloudWatch +// Metric: GetRecords.MillisBehindLatest +//

+// +// @param IdleTimeBetweenReadsInMillis: how long to sleep between GetRecords calls when no records are returned. +// @return KinesisClientLibConfiguration func (c *KinesisClientLibConfiguration) WithIdleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis int) *KinesisClientLibConfiguration { checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis) c.IdleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis diff --git a/internal/deaggregator/deaggregator.go b/internal/deaggregator/deaggregator.go index 6aa8905..91a5ad5 100644 --- a/internal/deaggregator/deaggregator.go +++ b/internal/deaggregator/deaggregator.go @@ -93,4 +93,4 @@ func createUserRecord(partitionKeys []string, aggRec *rec.Record, record types.R PartitionKey: &partitionKey, SequenceNumber: record.SequenceNumber, } -} \ No newline at end of file +} diff --git a/internal/deaggregator/deaggregator_test.go b/internal/deaggregator/deaggregator_test.go index 5bc8be6..a933baa 100644 --- a/internal/deaggregator/deaggregator_test.go +++ b/internal/deaggregator/deaggregator_test.go @@ -61,7 +61,7 @@ func generateKinesisRecord(data []byte) types.Record { encryptionType := types.EncryptionTypeNone partitionKey := "1234" sequenceNumber := "21269319989900637946712965403778482371" - return types.Record { + return types.Record{ ApproximateArrivalTimestamp: ¤tTime, Data: data, EncryptionType: encryptionType, @@ -199,4 +199,4 @@ func TestRecordWithMismatchMd5SumReturnsSingleRecord(t *testing.T) { // A byte record with an MD5 sum that does not match with the md5.Sum(record) // will be marked as a non-aggregate record and return a single record assert.Equal(t, 1, len(dars), "Mismatch md5 sum test should return length of 1.") -} \ No newline at end of file +} diff --git a/internal/records/records.pb.go b/internal/records/records.pb.go index 689a1c8..89abba5 100644 --- a/internal/records/records.pb.go +++ b/internal/records/records.pb.go @@ -212,4 +212,4 @@ var fileDescriptor_6ae0159314830e16 = []byte{ 0x20, 0x1a, 0x0c, 0xa8, 0x78, 0xbe, 0xd0, 0xe7, 0x26, 0xcd, 0x52, 0x81, 0x08, 0x68, 0x47, 0x25, 0x92, 0x94, 0x28, 0x2b, 0xb4, 0x26, 0x6d, 0x3e, 0x03, 0x00, 0x00, 0xff, 0xff, 0x87, 0x3e, 0x63, 0x69, 0x7d, 0x01, 0x00, 0x00, -} \ No newline at end of file +} diff --git a/test/lease_stealing_util_test.go b/test/lease_stealing_util_test.go index aab57ae..cbd01aa 100644 --- a/test/lease_stealing_util_test.go +++ b/test/lease_stealing_util_test.go @@ -20,7 +20,7 @@ import ( type LeaseStealingTest struct { t *testing.T - config *TestClusterConfig + config *TestClusterConfig cluster *TestCluster kc *kinesis.Client dc *dynamodb.Client diff --git a/test/record_publisher_test.go b/test/record_publisher_test.go index 85017a3..5bd061a 100644 --- a/test/record_publisher_test.go +++ b/test/record_publisher_test.go @@ -47,7 +47,7 @@ func NewKinesisClient(t *testing.T, regionName, endpoint string, creds *credenti t.Logf("Creating Kinesis client") resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) { - return aws.Endpoint { + return aws.Endpoint{ PartitionID: "aws", URL: endpoint, SigningRegion: regionName, @@ -73,13 +73,13 @@ func NewKinesisClient(t *testing.T, regionName, endpoint string, creds *credenti t.Fatalf("Failed in loading Kinesis default config for creating Worker: %+v", err) } - return kinesis.NewFromConfig(cfg) + return kinesis.NewFromConfig(cfg) } // NewDynamoDBClient to create a Kinesis Client. func NewDynamoDBClient(t *testing.T, regionName, endpoint string, creds *credentials.StaticCredentialsProvider) *dynamodb.Client { resolver := aws.EndpointResolverFunc(func(service, region string) (aws.Endpoint, error) { - return aws.Endpoint { + return aws.Endpoint{ PartitionID: "aws", URL: endpoint, SigningRegion: regionName, @@ -111,7 +111,7 @@ func continuouslyPublishSomeData(t *testing.T, kc *kinesis.Client) func() { var shards []types.Shard var nextToken *string for { - out, err := kc.ListShards(context.TODO(), &kinesis.ListShardsInput { + out, err := kc.ListShards(context.TODO(), &kinesis.ListShardsInput{ StreamName: aws.String(streamName), NextToken: nextToken, }) @@ -185,7 +185,7 @@ func publishSomeData(t *testing.T, kc *kinesis.Client) { // publishRecord to put a record into Kinesis stream using PutRecord API. func publishRecord(t *testing.T, kc *kinesis.Client, hashKey *string) { - input := &kinesis.PutRecordInput { + input := &kinesis.PutRecordInput{ Data: []byte(specstr), StreamName: aws.String(streamName), PartitionKey: aws.String(utils.RandStringBytesMaskImpr(10)), @@ -207,7 +207,7 @@ func publishRecords(t *testing.T, kc *kinesis.Client) { records := make([]types.PutRecordsRequestEntry, 5) for i := 0; i < 5; i++ { - record := types.PutRecordsRequestEntry { + record := types.PutRecordsRequestEntry{ Data: []byte(specstr), PartitionKey: aws.String(utils.RandStringBytesMaskImpr(10)), } @@ -228,7 +228,7 @@ func publishRecords(t *testing.T, kc *kinesis.Client) { func publishAggregateRecord(t *testing.T, kc *kinesis.Client) { data := generateAggregateRecord(5, specstr) // Use random string as partition key to ensure even distribution across shards - _, err := kc.PutRecord(context.TODO(), &kinesis.PutRecordInput { + _, err := kc.PutRecord(context.TODO(), &kinesis.PutRecordInput{ Data: data, StreamName: aws.String(streamName), PartitionKey: aws.String(utils.RandStringBytesMaskImpr(10)),