diff --git a/clientlibrary/interfaces/record-processor.go b/clientlibrary/interfaces/record-processor.go index 1c41d56..ece0a1d 100644 --- a/clientlibrary/interfaces/record-processor.go +++ b/clientlibrary/interfaces/record-processor.go @@ -61,6 +61,12 @@ type ( */ ProcessRecords(processRecordsInput *ProcessRecordsInput) + // ResetShardIterator + /* + * If true, resets the shard iterator to the last checkpointed sequence number for the shard. + */ + ResetShardIterator() bool + // Shutdown /* * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index a9934b6..2ebdddf 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -127,6 +127,17 @@ func (sc *PollingShardConsumer) getRecords() error { getRecordsStartTime := time.Now() log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.ToString(shardIterator)) + + // check if ResetShardIterator returns true + if sc.recordProcessor.ResetShardIterator() { + // reset shard iterator + shardIterator, err = sc.getShardIterator() + if err != nil { + log.Errorf("Unable to get shard iterator for %s: %v", sc.shard.ID, err) + return err + } + } + getRecordsArgs := &kinesis.GetRecordsInput{ Limit: aws.Int32(int32(sc.kclConfig.MaxRecords)), ShardIterator: shardIterator, diff --git a/test/record_processor_test.go b/test/record_processor_test.go index 3eb274a..aecc03a 100644 --- a/test/record_processor_test.go +++ b/test/record_processor_test.go @@ -57,6 +57,10 @@ func (dd *dumpRecordProcessor) Initialize(input *kc.InitializationInput) { dd.count = 0 } +func (dd *dumpRecordProcessor) ResetShardIterator() bool { + return false +} + func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) { dd.t.Log("Processing Records...")