diff --git a/src/clientlibrary/common/errors.go b/src/clientlibrary/common/errors.go index adddc26..1a366c1 100644 --- a/src/clientlibrary/common/errors.go +++ b/src/clientlibrary/common/errors.go @@ -35,11 +35,11 @@ const ( LeasingProvisionedThroughputError ErrorCode = 41203 // Misc Errors 41300 - 41400 - // NotImplemented - KinesisClientLibNotImplemented ErrorCode = 41301 + // NotImplemented + KinesisClientLibNotImplemented ErrorCode = 41301 - // Error indicates passing illegal or inappropriate argument - IllegalArgumentError ErrorCode = 41302 + // Error indicates passing illegal or inappropriate argument + IllegalArgumentError ErrorCode = 41302 ) var errorMap = map[ErrorCode]ClientLibraryError{ diff --git a/src/clientlibrary/config/config.go b/src/clientlibrary/config/config.go index e2c82c5..8f3322c 100644 --- a/src/clientlibrary/config/config.go +++ b/src/clientlibrary/config/config.go @@ -211,7 +211,7 @@ type ( SkipShardSyncAtWorkerInitializationIfLeasesExist bool // The max number of threads in the worker thread pool to getRecords. - WorkerThreadPoolSize int + WorkerThreadPoolSize int } ) diff --git a/src/clientlibrary/config/config_test.go b/src/clientlibrary/config/config_test.go index 19f1481..7d72137 100644 --- a/src/clientlibrary/config/config_test.go +++ b/src/clientlibrary/config/config_test.go @@ -19,5 +19,5 @@ func TestConfig(t *testing.T) { WithRegionName("us-west-2") assert.Equal(t, "appName", kclConfig.ApplicationName) - assert.Equal(t, "500", kclConfig.FailoverTimeMillis) + assert.Equal(t, 500, kclConfig.FailoverTimeMillis) } diff --git a/src/clientlibrary/config/kcl-config.go b/src/clientlibrary/config/kcl-config.go index 2f8bc0b..edc63db 100644 --- a/src/clientlibrary/config/kcl-config.go +++ b/src/clientlibrary/config/kcl-config.go @@ -17,11 +17,11 @@ func NewKinesisClientLibConfig(applicationName, streamName, workerID string) *Ki // populate the KCL configuration with default values return &KinesisClientLibConfiguration{ - ApplicationName: applicationName, - TableName: applicationName, - StreamName: streamName, - WorkerID: workerID, - KinesisEndpoint: "", + ApplicationName: applicationName, + TableName: applicationName, + StreamName: streamName, + WorkerID: workerID, + KinesisEndpoint: "", InitialPositionInStream: DEFAULT_INITIAL_POSITION_IN_STREAM, InitialPositionInStreamExtended: *newInitialPosition(DEFAULT_INITIAL_POSITION_IN_STREAM), FailoverTimeMillis: DEFAULT_FAILOVER_TIME_MILLIS, @@ -29,12 +29,12 @@ func NewKinesisClientLibConfig(applicationName, streamName, workerID string) *Ki IdleTimeBetweenReadsInMillis: DEFAULT_IDLETIME_BETWEEN_READS_MILLIS, CallProcessRecordsEvenForEmptyRecordList: DEFAULT_DONT_CALL_PROCESS_RECORDS_FOR_EMPTY_RECORD_LIST, ParentShardPollIntervalMillis: DEFAULT_PARENT_SHARD_POLL_INTERVAL_MILLIS, - ShardSyncIntervalMillis: DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, - CleanupTerminatedShardsBeforeExpiry: DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, - TaskBackoffTimeMillis: DEFAULT_TASK_BACKOFF_TIME_MILLIS, - MetricsBufferTimeMillis: DEFAULT_METRICS_BUFFER_TIME_MILLIS, - MetricsMaxQueueSize: DEFAULT_METRICS_MAX_QUEUE_SIZE, - ValidateSequenceNumberBeforeCheckpointing: DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, + ShardSyncIntervalMillis: DEFAULT_SHARD_SYNC_INTERVAL_MILLIS, + CleanupTerminatedShardsBeforeExpiry: DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION, + TaskBackoffTimeMillis: DEFAULT_TASK_BACKOFF_TIME_MILLIS, + MetricsBufferTimeMillis: DEFAULT_METRICS_BUFFER_TIME_MILLIS, + MetricsMaxQueueSize: DEFAULT_METRICS_MAX_QUEUE_SIZE, + ValidateSequenceNumberBeforeCheckpointing: DEFAULT_VALIDATE_SEQUENCE_NUMBER_BEFORE_CHECKPOINTING, RegionName: "", ShutdownGraceMillis: DEFAULT_SHUTDOWN_GRACE_MILLIS, MaxLeasesForWorker: DEFAULT_MAX_LEASES_FOR_WORKER, @@ -42,7 +42,7 @@ func NewKinesisClientLibConfig(applicationName, streamName, workerID string) *Ki InitialLeaseTableReadCapacity: DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, InitialLeaseTableWriteCapacity: DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, SkipShardSyncAtWorkerInitializationIfLeasesExist: DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, - WorkerThreadPoolSize: 1, + WorkerThreadPoolSize: 1, } } @@ -147,8 +147,7 @@ func (c *KinesisClientLibConfiguration) WithRegionName(regionName string) *Kines // WithWorkerThreadPoolSize configures worker thread pool size func (c *KinesisClientLibConfiguration) WithWorkerThreadPoolSize(n int) *KinesisClientLibConfiguration { - checkIsValuePositive("WorkerThreadPoolSize", n) - c.WorkerThreadPoolSize = n - return c + checkIsValuePositive("WorkerThreadPoolSize", n) + c.WorkerThreadPoolSize = n + return c } - diff --git a/src/clientlibrary/types/inputs.go b/src/clientlibrary/interfaces/inputs.go similarity index 94% rename from src/clientlibrary/types/inputs.go rename to src/clientlibrary/interfaces/inputs.go index 9668e64..1b21999 100644 --- a/src/clientlibrary/types/inputs.go +++ b/src/clientlibrary/interfaces/inputs.go @@ -1,11 +1,9 @@ -package types +package interfaces import ( "time" ks "github.com/aws/aws-sdk-go/service/kinesis" - - . "clientlibrary/interfaces" ) const ( diff --git a/src/clientlibrary/interfaces/record-processor-checkpointer.go b/src/clientlibrary/interfaces/record-processor-checkpointer.go index c752f04..296fd6a 100644 --- a/src/clientlibrary/interfaces/record-processor-checkpointer.go +++ b/src/clientlibrary/interfaces/record-processor-checkpointer.go @@ -2,8 +2,6 @@ package interfaces import ( ks "github.com/aws/aws-sdk-go/service/kinesis" - - . "clientlibrary/types" ) type ( diff --git a/src/clientlibrary/interfaces/record-processor.go b/src/clientlibrary/interfaces/record-processor.go index ab704a2..f704d0e 100644 --- a/src/clientlibrary/interfaces/record-processor.go +++ b/src/clientlibrary/interfaces/record-processor.go @@ -1,9 +1,5 @@ package interfaces -import ( - . "clientlibrary/types" -) - // IRecordProcessor is the interface for some callback functions invoked by KCL will // The main task of using KCL is to provide implementation on IRecordProcessor interface. // Note: This is exactly the same interface as Amazon KCL IRecordProcessor v2 diff --git a/src/clientlibrary/types/sequence-number.go b/src/clientlibrary/interfaces/sequence-number.go similarity index 96% rename from src/clientlibrary/types/sequence-number.go rename to src/clientlibrary/interfaces/sequence-number.go index 0dddb57..80ac68f 100644 --- a/src/clientlibrary/types/sequence-number.go +++ b/src/clientlibrary/interfaces/sequence-number.go @@ -1,4 +1,4 @@ -package types +package interfaces // ExtendedSequenceNumber represents a two-part sequence number for records aggregated by the Kinesis Producer Library. // diff --git a/src/clientlibrary/lib/checkpoint/checkpoint.go b/src/clientlibrary/lib/checkpoint/checkpoint.go index 9f0facc..1f480d8 100644 --- a/src/clientlibrary/lib/checkpoint/checkpoint.go +++ b/src/clientlibrary/lib/checkpoint/checkpoint.go @@ -1,7 +1,7 @@ package checkpoint import ( - . "clientlibrary/types" + . "clientlibrary/interfaces" ) const ( diff --git a/src/leases/impl/kinesis-client-lease.go b/src/leases/impl/kinesis-client-lease.go index 6132a40..abe049a 100644 --- a/src/leases/impl/kinesis-client-lease.go +++ b/src/leases/impl/kinesis-client-lease.go @@ -1,7 +1,7 @@ package impl import ( - . "clientlibrary/types" + . "clientlibrary/interfaces" ) // KinesisClientLease is a Lease subclass containing KinesisClientLibrary related fields for checkpoints. diff --git a/support/scripts/functions.sh b/support/scripts/functions.sh index c76d266..b7265ea 100644 --- a/support/scripts/functions.sh +++ b/support/scripts/functions.sh @@ -14,6 +14,7 @@ local_go_pkgs() { grep -Fv '/tmp/' | \ grep -Fv '/run/' | \ grep -Fv '/tests/' | \ + sed -r 's|(.+)/[^/]+\.go$|\1|g' | \ sort -u }