Compare commits

...

1 commit

Author SHA1 Message Date
Guy A Molinari
baf8258298
Fix issue #167 - Concurrent write/write of in-flight shard map (#168) 2024-12-07 11:54:45 -08:00

View file

@ -313,32 +313,22 @@ func isShardClosed(nextShardIterator, currentShardIterator *string) bool {
}
type shards struct {
*sync.RWMutex
shardsInProcess map[string]struct{}
shardsInProcess sync.Map
}
func newShardsInProcess() *shards {
return &shards{
RWMutex: &sync.RWMutex{},
shardsInProcess: make(map[string]struct{}),
}
return &shards{}
}
func (s *shards) addShard(shardId string) {
s.Lock()
defer s.Unlock()
s.shardsInProcess[shardId] = struct{}{}
s.shardsInProcess.Store(shardId, struct{}{})
}
func (s *shards) doesShardExist(shardId string) bool {
s.RLock()
defer s.RUnlock()
_, ok := s.shardsInProcess[shardId]
_, ok := s.shardsInProcess.Load(shardId)
return ok
}
func (s *shards) deleteShard(shardId string) {
s.Lock()
defer s.Unlock()
delete(s.shardsInProcess, shardId)
s.shardsInProcess.Delete(shardId)
}