ResetShardIterator added to rp interface (#2)

This commit is contained in:
Matteo Agius 2023-02-07 20:25:45 -05:00 committed by GitHub
parent 72d3b6f328
commit c89ce3608b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 21 additions and 0 deletions

View file

@ -61,6 +61,12 @@ type (
*/ */
ProcessRecords(processRecordsInput *ProcessRecordsInput) ProcessRecords(processRecordsInput *ProcessRecordsInput)
// ResetShardIterator
/*
* If true, resets the shard iterator to the last checkpointed sequence number for the shard.
*/
ResetShardIterator() bool
// Shutdown // Shutdown
/* /*
* Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this * Invoked by the Amazon Kinesis Client Library to indicate it will no longer send data records to this

View file

@ -127,6 +127,17 @@ func (sc *PollingShardConsumer) getRecords() error {
getRecordsStartTime := time.Now() getRecordsStartTime := time.Now()
log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.ToString(shardIterator)) log.Debugf("Trying to read %d record from iterator: %v", sc.kclConfig.MaxRecords, aws.ToString(shardIterator))
// check if ResetShardIterator returns true
if sc.recordProcessor.ResetShardIterator() {
// reset shard iterator
shardIterator, err = sc.getShardIterator()
if err != nil {
log.Errorf("Unable to get shard iterator for %s: %v", sc.shard.ID, err)
return err
}
}
getRecordsArgs := &kinesis.GetRecordsInput{ getRecordsArgs := &kinesis.GetRecordsInput{
Limit: aws.Int32(int32(sc.kclConfig.MaxRecords)), Limit: aws.Int32(int32(sc.kclConfig.MaxRecords)),
ShardIterator: shardIterator, ShardIterator: shardIterator,

View file

@ -57,6 +57,10 @@ func (dd *dumpRecordProcessor) Initialize(input *kc.InitializationInput) {
dd.count = 0 dd.count = 0
} }
func (dd *dumpRecordProcessor) ResetShardIterator() bool {
return false
}
func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) { func (dd *dumpRecordProcessor) ProcessRecords(input *kc.ProcessRecordsInput) {
dd.t.Log("Processing Records...") dd.t.Log("Processing Records...")