From 5ef4338a22b617fe3ae69a45d1a2428070421adc Mon Sep 17 00:00:00 2001 From: Tao Jiang Date: Sat, 21 Apr 2018 19:58:51 -0700 Subject: [PATCH] KCL: Update shard sync to remove not existed shard Need to remove shard not longer existed in Kinesis from shardStatus cache. Change-Id: I09b4a4c3c6480b8300fa937e6073dcd578156b29 --- src/clientlibrary/worker/worker.go | 32 +++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/src/clientlibrary/worker/worker.go b/src/clientlibrary/worker/worker.go index 9a7e27c..1603ff4 100644 --- a/src/clientlibrary/worker/worker.go +++ b/src/clientlibrary/worker/worker.go @@ -168,12 +168,6 @@ func (w *Worker) initialize() error { wg := sync.WaitGroup{} w.waitGroup = &wg - err = w.getShardIDs("") - if err != nil { - log.Errorf("Error getting Kinesis shards: %s", err) - return err - } - log.Info("Initialization complete.") return nil @@ -199,12 +193,13 @@ func (w *Worker) newShardConsumer(shard *shardStatus) *ShardConsumer { // eventLoop func (w *Worker) eventLoop() { for { - err := w.getShardIDs("") + err := w.syncShard() if err != nil { log.Errorf("Error getting Kinesis shards: %v", err) // Back-off? time.Sleep(500 * time.Millisecond) } + log.Infof("Found %d shards", len(w.shardStatus)) // Count the number of leases hold by this worker @@ -271,7 +266,9 @@ func (w *Worker) eventLoop() { } // List all ACTIVE shard and store them into shardStatus table -func (w *Worker) getShardIDs(startShardID string) error { +// If shard has been removed, need to exclude it from cached shard status. +func (w *Worker) getShardIDs(startShardID string, shardInfo map[string]bool) error { + // The default pagination limit is 100. args := &kinesis.DescribeStreamInput{ StreamName: aws.String(w.streamName), } @@ -289,6 +286,8 @@ func (w *Worker) getShardIDs(startShardID string) error { var lastShardID string for _, s := range streamDesc.StreamDescription.Shards { + // record avail shardId from fresh reading from Kinesis + shardInfo[*s.ShardId] = true // found new shard if _, ok := w.shardStatus[*s.ShardId]; !ok { log.Debugf("Found shard with id %s", *s.ShardId) @@ -304,7 +303,7 @@ func (w *Worker) getShardIDs(startShardID string) error { } if *streamDesc.StreamDescription.HasMoreShards { - err := w.getShardIDs(lastShardID) + err := w.getShardIDs(lastShardID, shardInfo) if err != nil { return err } @@ -312,3 +311,18 @@ func (w *Worker) getShardIDs(startShardID string) error { return nil } + +// syncShard to sync the cached shard info with actual shard info from Kinesis +func (w *Worker) syncShard() error { + shardInfo := make(map[string]bool) + err := w.getShardIDs("", shardInfo) + + for _, shard := range w.shardStatus { + // The cached shard no longer existed, remove it. + if _, ok := shardInfo[shard.ID]; !ok { + delete(w.shardStatus, shard.ID) + } + } + + return err +}