From 75831cfec517cd43c1f5a41ca607e6b276683205 Mon Sep 17 00:00:00 2001 From: magiusdarrigo Date: Fri, 25 Nov 2022 17:07:01 -0600 Subject: [PATCH] call checkpointer nil in order to let workers know shard is expired --- clientlibrary/worker/polling-shard-consumer.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/clientlibrary/worker/polling-shard-consumer.go b/clientlibrary/worker/polling-shard-consumer.go index 32703f7..a9934b6 100644 --- a/clientlibrary/worker/polling-shard-consumer.go +++ b/clientlibrary/worker/polling-shard-consumer.go @@ -154,11 +154,12 @@ func (sc *PollingShardConsumer) getRecords() error { sc.processRecords(getRecordsStartTime, getResp.Records, getResp.MillisBehindLatest, recordCheckpointer) - // The shard has been closed, so no new records can be read from it + // The shard has no more records. We can stop polling it. It should expire soon. if getResp.NextShardIterator == nil { - log.Infof("Shard %s closed", sc.shard.ID) + log.Infof("Shard %s has no more records", sc.shard.ID) shutdownInput := &kcl.ShutdownInput{ShutdownReason: kcl.TERMINATE, Checkpointer: recordCheckpointer} sc.recordProcessor.Shutdown(shutdownInput) + recordCheckpointer.Checkpoint(nil) return nil } shardIterator = getResp.NextShardIterator