Compare commits

...

5 commits

Author SHA1 Message Date
maddalax
519be8771d Merge remote-tracking branch 'origin/master' into ws-extension-updates 2024-11-18 11:28:49 -06:00
maddalax
3a3039609f remove listener lock 2024-11-12 16:12:42 -06:00
maddalax
79b15fe194 lock listeners 2024-11-12 15:47:06 -06:00
maddalax
c593c466a3 better code for deleting the listener 2024-11-12 13:40:31 -06:00
maddalax
3fda1039a9 remove listener 2024-11-12 13:09:33 -06:00

View file

@ -58,6 +58,7 @@ type SocketManager struct {
sockets *xsync.MapOf[string, *xsync.MapOf[string, SocketConnection]]
idToRoom *xsync.MapOf[string, string]
listeners []chan SocketEvent
listenersQueuedForDelete *xsync.MapOf[chan SocketEvent, bool]
goroutinesRunning atomic.Int32
opts *opts.ExtensionOpts
lock sync.Mutex
@ -129,6 +130,7 @@ func NewSocketManager(opts *opts.ExtensionOpts) *SocketManager {
return &SocketManager{
sockets: xsync.NewMapOf[string, *xsync.MapOf[string, SocketConnection]](),
idToRoom: xsync.NewMapOf[string, string](),
listenersQueuedForDelete: xsync.NewMapOf[chan SocketEvent, bool](),
opts: opts,
goroutinesRunning: atomic.Int32{},
}
@ -185,6 +187,10 @@ func (manager *SocketManager) Listen(listener chan SocketEvent) {
}
}
func (manager *SocketManager) RemoveListener(listener chan SocketEvent) {
manager.listenersQueuedForDelete.Store(listener, true)
}
func (manager *SocketManager) dispatch(event SocketEvent) {
done := make(chan struct{}, 1)
go func() {
@ -197,9 +203,25 @@ func (manager *SocketManager) dispatch(event SocketEvent) {
}
}
}()
newListener := make([]chan SocketEvent, 0)
for _, listener := range manager.listeners {
listener <- event
if _, ok := manager.listenersQueuedForDelete.Load(listener); !ok {
newListener = append(newListener, listener)
}
}
manager.listeners = newListener
wg := sync.WaitGroup{}
for _, listener := range manager.listeners {
wg.Add(1)
go func() {
defer wg.Done()
listener <- event
}()
}
wg.Wait()
done <- struct{}{}
}