From baf8258298362f90b75a3f0e4674683fdabe1c5b Mon Sep 17 00:00:00 2001 From: Guy A Molinari Date: Sat, 7 Dec 2024 13:54:45 -0600 Subject: [PATCH] Fix issue #167 - Concurrent write/write of in-flight shard map (#168) --- consumer.go | 20 +++++--------------- 1 file changed, 5 insertions(+), 15 deletions(-) diff --git a/consumer.go b/consumer.go index 7243beb..74b59bb 100644 --- a/consumer.go +++ b/consumer.go @@ -313,32 +313,22 @@ func isShardClosed(nextShardIterator, currentShardIterator *string) bool { } type shards struct { - *sync.RWMutex - shardsInProcess map[string]struct{} + shardsInProcess sync.Map } func newShardsInProcess() *shards { - return &shards{ - RWMutex: &sync.RWMutex{}, - shardsInProcess: make(map[string]struct{}), - } + return &shards{} } func (s *shards) addShard(shardId string) { - s.Lock() - defer s.Unlock() - s.shardsInProcess[shardId] = struct{}{} + s.shardsInProcess.Store(shardId, struct{}{}) } func (s *shards) doesShardExist(shardId string) bool { - s.RLock() - defer s.RUnlock() - _, ok := s.shardsInProcess[shardId] + _, ok := s.shardsInProcess.Load(shardId) return ok } func (s *shards) deleteShard(shardId string) { - s.Lock() - defer s.Unlock() - delete(s.shardsInProcess, shardId) + s.shardsInProcess.Delete(shardId) }