diff --git a/consumer.go b/consumer.go index 7243beb..74b59bb 100644 --- a/consumer.go +++ b/consumer.go @@ -313,32 +313,22 @@ func isShardClosed(nextShardIterator, currentShardIterator *string) bool { } type shards struct { - *sync.RWMutex - shardsInProcess map[string]struct{} + shardsInProcess sync.Map } func newShardsInProcess() *shards { - return &shards{ - RWMutex: &sync.RWMutex{}, - shardsInProcess: make(map[string]struct{}), - } + return &shards{} } func (s *shards) addShard(shardId string) { - s.Lock() - defer s.Unlock() - s.shardsInProcess[shardId] = struct{}{} + s.shardsInProcess.Store(shardId, struct{}{}) } func (s *shards) doesShardExist(shardId string) bool { - s.RLock() - defer s.RUnlock() - _, ok := s.shardsInProcess[shardId] + _, ok := s.shardsInProcess.Load(shardId) return ok } func (s *shards) deleteShard(shardId string) { - s.Lock() - defer s.Unlock() - delete(s.shardsInProcess, shardId) + s.shardsInProcess.Delete(shardId) }