diff --git a/extensions/websocket/ws/manager.go b/extensions/websocket/ws/manager.go index 27dcf36..b250f7b 100644 --- a/extensions/websocket/ws/manager.go +++ b/extensions/websocket/ws/manager.go @@ -58,8 +58,7 @@ type SocketManager struct { sockets *xsync.MapOf[string, *xsync.MapOf[string, SocketConnection]] idToRoom *xsync.MapOf[string, string] listeners []chan SocketEvent - listenersQueuedForDelete map[chan SocketEvent]bool - listenerLock sync.Mutex + listenersQueuedForDelete *xsync.MapOf[chan SocketEvent, bool] goroutinesRunning atomic.Int32 opts *opts.ExtensionOpts lock sync.Mutex @@ -131,7 +130,7 @@ func NewSocketManager(opts *opts.ExtensionOpts) *SocketManager { return &SocketManager{ sockets: xsync.NewMapOf[string, *xsync.MapOf[string, SocketConnection]](), idToRoom: xsync.NewMapOf[string, string](), - listenersQueuedForDelete: make(map[chan SocketEvent]bool), + listenersQueuedForDelete: xsync.NewMapOf[chan SocketEvent, bool](), opts: opts, goroutinesRunning: atomic.Int32{}, } @@ -180,20 +179,16 @@ func (manager *SocketManager) RunIntervalWithSocket(socketId string, interval ti } func (manager *SocketManager) Listen(listener chan SocketEvent) { - manager.listenerLock.Lock() if manager.listeners == nil { manager.listeners = make([]chan SocketEvent, 0) } if listener != nil { manager.listeners = append(manager.listeners, listener) } - manager.listenerLock.Unlock() } func (manager *SocketManager) RemoveListener(listener chan SocketEvent) { - manager.listenerLock.Lock() - manager.listenersQueuedForDelete[listener] = true - manager.listenerLock.Unlock() + manager.listenersQueuedForDelete.Store(listener, true) } func (manager *SocketManager) dispatch(event SocketEvent) { @@ -209,16 +204,13 @@ func (manager *SocketManager) dispatch(event SocketEvent) { } }() - manager.listenerLock.Lock() - if len(manager.listenersQueuedForDelete) > 0 { - newListener := make([]chan SocketEvent, 0) - for _, listener := range manager.listeners { - if _, ok := manager.listenersQueuedForDelete[listener]; !ok { - newListener = append(newListener, listener) - } + newListener := make([]chan SocketEvent, 0) + for _, listener := range manager.listeners { + if _, ok := manager.listenersQueuedForDelete.Load(listener); !ok { + newListener = append(newListener, listener) } - manager.listeners = newListener } + manager.listeners = newListener wg := sync.WaitGroup{} for _, listener := range manager.listeners { @@ -230,7 +222,6 @@ func (manager *SocketManager) dispatch(event SocketEvent) { } wg.Wait() - manager.listenerLock.Unlock() done <- struct{}{} }