From 79b15fe1945452044dc5b8631b5bcb5d0ba7742a Mon Sep 17 00:00:00 2001 From: maddalax Date: Tue, 12 Nov 2024 15:47:06 -0600 Subject: [PATCH] lock listeners --- extensions/websocket/ws/manager.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/extensions/websocket/ws/manager.go b/extensions/websocket/ws/manager.go index 8d5616b..27dcf36 100644 --- a/extensions/websocket/ws/manager.go +++ b/extensions/websocket/ws/manager.go @@ -59,6 +59,7 @@ type SocketManager struct { idToRoom *xsync.MapOf[string, string] listeners []chan SocketEvent listenersQueuedForDelete map[chan SocketEvent]bool + listenerLock sync.Mutex goroutinesRunning atomic.Int32 opts *opts.ExtensionOpts lock sync.Mutex @@ -179,16 +180,20 @@ func (manager *SocketManager) RunIntervalWithSocket(socketId string, interval ti } func (manager *SocketManager) Listen(listener chan SocketEvent) { + manager.listenerLock.Lock() if manager.listeners == nil { manager.listeners = make([]chan SocketEvent, 0) } if listener != nil { manager.listeners = append(manager.listeners, listener) } + manager.listenerLock.Unlock() } func (manager *SocketManager) RemoveListener(listener chan SocketEvent) { + manager.listenerLock.Lock() manager.listenersQueuedForDelete[listener] = true + manager.listenerLock.Unlock() } func (manager *SocketManager) dispatch(event SocketEvent) { @@ -204,6 +209,7 @@ func (manager *SocketManager) dispatch(event SocketEvent) { } }() + manager.listenerLock.Lock() if len(manager.listenersQueuedForDelete) > 0 { newListener := make([]chan SocketEvent, 0) for _, listener := range manager.listeners { @@ -224,6 +230,7 @@ func (manager *SocketManager) dispatch(event SocketEvent) { } wg.Wait() + manager.listenerLock.Unlock() done <- struct{}{} }