Fix missing init position with AT_TIMESTAMP (#44)

AT_TIMESTAMP start from the record at or after the specified
server-side Timestamp. However, the implementation was
missing. The bug was not notices until recently because most
of users never use this feature.

Signed-off-by: Tao Jiang <taoj@vmware.com>
This commit is contained in:
Tao Jiang 2019-11-01 08:42:04 -05:00
parent 0d91fbd443
commit 971d748195
2 changed files with 42 additions and 5 deletions

View file

@ -95,13 +95,26 @@ func (sc *ShardConsumer) getShardIterator(shard *par.ShardStatus) (*string, erro
// If there isn't any checkpoint for the shard, use the configuration value.
if shard.Checkpoint == "" {
initPos := sc.kclConfig.InitialPositionInStream
shardIteratorType := config.InitalPositionInStreamToShardIteratorType(initPos)
log.Debugf("No checkpoint recorded for shard: %v, starting with: %v", shard.ID,
aws.StringValue(config.InitalPositionInStreamToShardIteratorType(initPos)))
shardIterArgs := &kinesis.GetShardIteratorInput{
ShardId: &shard.ID,
ShardIteratorType: config.InitalPositionInStreamToShardIteratorType(initPos),
StreamName: &sc.streamName,
aws.StringValue(shardIteratorType))
var shardIterArgs *kinesis.GetShardIteratorInput
if initPos == config.AT_TIMESTAMP {
shardIterArgs = &kinesis.GetShardIteratorInput{
ShardId: &shard.ID,
ShardIteratorType: shardIteratorType,
Timestamp: sc.kclConfig.InitialPositionInStreamExtended.Timestamp,
StreamName: &sc.streamName,
}
} else {
shardIterArgs = &kinesis.GetShardIteratorInput{
ShardId: &shard.ID,
ShardIteratorType: shardIteratorType,
StreamName: &sc.streamName,
}
}
iterResp, err := sc.kc.GetShardIterator(shardIterArgs)
if err != nil {
return nil, err

View file

@ -81,6 +81,30 @@ func TestWorker(t *testing.T) {
runTest(kclConfig, false, t)
}
func TestWorkerWithTimestamp(t *testing.T) {
// In order to have precise control over logging. Use logger with config
config := logger.Configuration{
EnableConsole: true,
ConsoleLevel: logger.Debug,
ConsoleJSONFormat: false,
}
// Use logrus logger
log := logger.NewLogrusLoggerWithConfig(config)
ts := time.Now().Add(time.Second * 5)
kclConfig := cfg.NewKinesisClientLibConfig("appName", streamName, regionName, workerID).
WithTimestampAtInitialPositionInStream(&ts).
WithMaxRecords(10).
WithMaxLeasesForWorker(1).
WithShardSyncIntervalMillis(5000).
WithFailoverTimeMillis(300000).
WithMetricsBufferTimeMillis(10000).
WithMetricsMaxQueueSize(20).
WithLogger(log)
runTest(kclConfig, false, t)
}
func TestWorkerWithSigInt(t *testing.T) {
// At miminal. use standard zap logger
//zapLogger, err := zap.NewProduction()