From 971d748195422a2579426e7565467b6d2ef467c1 Mon Sep 17 00:00:00 2001 From: Tao Jiang Date: Fri, 1 Nov 2019 08:42:04 -0500 Subject: [PATCH] Fix missing init position with AT_TIMESTAMP (#44) AT_TIMESTAMP start from the record at or after the specified server-side Timestamp. However, the implementation was missing. The bug was not notices until recently because most of users never use this feature. Signed-off-by: Tao Jiang --- clientlibrary/worker/shard-consumer.go | 23 ++++++++++++++++++----- test/worker_test.go | 24 ++++++++++++++++++++++++ 2 files changed, 42 insertions(+), 5 deletions(-) diff --git a/clientlibrary/worker/shard-consumer.go b/clientlibrary/worker/shard-consumer.go index 04bae9f..2b44922 100644 --- a/clientlibrary/worker/shard-consumer.go +++ b/clientlibrary/worker/shard-consumer.go @@ -95,13 +95,26 @@ func (sc *ShardConsumer) getShardIterator(shard *par.ShardStatus) (*string, erro // If there isn't any checkpoint for the shard, use the configuration value. if shard.Checkpoint == "" { initPos := sc.kclConfig.InitialPositionInStream + shardIteratorType := config.InitalPositionInStreamToShardIteratorType(initPos) log.Debugf("No checkpoint recorded for shard: %v, starting with: %v", shard.ID, - aws.StringValue(config.InitalPositionInStreamToShardIteratorType(initPos))) - shardIterArgs := &kinesis.GetShardIteratorInput{ - ShardId: &shard.ID, - ShardIteratorType: config.InitalPositionInStreamToShardIteratorType(initPos), - StreamName: &sc.streamName, + aws.StringValue(shardIteratorType)) + + var shardIterArgs *kinesis.GetShardIteratorInput + if initPos == config.AT_TIMESTAMP { + shardIterArgs = &kinesis.GetShardIteratorInput{ + ShardId: &shard.ID, + ShardIteratorType: shardIteratorType, + Timestamp: sc.kclConfig.InitialPositionInStreamExtended.Timestamp, + StreamName: &sc.streamName, + } + } else { + shardIterArgs = &kinesis.GetShardIteratorInput{ + ShardId: &shard.ID, + ShardIteratorType: shardIteratorType, + StreamName: &sc.streamName, + } } + iterResp, err := sc.kc.GetShardIterator(shardIterArgs) if err != nil { return nil, err diff --git a/test/worker_test.go b/test/worker_test.go index 22f7ccb..e83966a 100644 --- a/test/worker_test.go +++ b/test/worker_test.go @@ -81,6 +81,30 @@ func TestWorker(t *testing.T) { runTest(kclConfig, false, t) } +func TestWorkerWithTimestamp(t *testing.T) { + // In order to have precise control over logging. Use logger with config + config := logger.Configuration{ + EnableConsole: true, + ConsoleLevel: logger.Debug, + ConsoleJSONFormat: false, + } + // Use logrus logger + log := logger.NewLogrusLoggerWithConfig(config) + + ts := time.Now().Add(time.Second * 5) + kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID). + WithTimestampAtInitialPositionInStream(&ts). + WithMaxRecords(10). + WithMaxLeasesForWorker(1). + WithShardSyncIntervalMillis(5000). + WithFailoverTimeMillis(300000). + WithMetricsBufferTimeMillis(10000). + WithMetricsMaxQueueSize(20). + WithLogger(log) + + runTest(kclConfig, false, t) +} + func TestWorkerWithSigInt(t *testing.T) { // At miminal. use standard zap logger //zapLogger, err := zap.NewProduction()