lock listeners
This commit is contained in:
parent
c593c466a3
commit
79b15fe194
1 changed files with 7 additions and 0 deletions
|
|
@ -59,6 +59,7 @@ type SocketManager struct {
|
||||||
idToRoom *xsync.MapOf[string, string]
|
idToRoom *xsync.MapOf[string, string]
|
||||||
listeners []chan SocketEvent
|
listeners []chan SocketEvent
|
||||||
listenersQueuedForDelete map[chan SocketEvent]bool
|
listenersQueuedForDelete map[chan SocketEvent]bool
|
||||||
|
listenerLock sync.Mutex
|
||||||
goroutinesRunning atomic.Int32
|
goroutinesRunning atomic.Int32
|
||||||
opts *opts.ExtensionOpts
|
opts *opts.ExtensionOpts
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
|
|
@ -179,16 +180,20 @@ func (manager *SocketManager) RunIntervalWithSocket(socketId string, interval ti
|
||||||
}
|
}
|
||||||
|
|
||||||
func (manager *SocketManager) Listen(listener chan SocketEvent) {
|
func (manager *SocketManager) Listen(listener chan SocketEvent) {
|
||||||
|
manager.listenerLock.Lock()
|
||||||
if manager.listeners == nil {
|
if manager.listeners == nil {
|
||||||
manager.listeners = make([]chan SocketEvent, 0)
|
manager.listeners = make([]chan SocketEvent, 0)
|
||||||
}
|
}
|
||||||
if listener != nil {
|
if listener != nil {
|
||||||
manager.listeners = append(manager.listeners, listener)
|
manager.listeners = append(manager.listeners, listener)
|
||||||
}
|
}
|
||||||
|
manager.listenerLock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (manager *SocketManager) RemoveListener(listener chan SocketEvent) {
|
func (manager *SocketManager) RemoveListener(listener chan SocketEvent) {
|
||||||
|
manager.listenerLock.Lock()
|
||||||
manager.listenersQueuedForDelete[listener] = true
|
manager.listenersQueuedForDelete[listener] = true
|
||||||
|
manager.listenerLock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (manager *SocketManager) dispatch(event SocketEvent) {
|
func (manager *SocketManager) dispatch(event SocketEvent) {
|
||||||
|
|
@ -204,6 +209,7 @@ func (manager *SocketManager) dispatch(event SocketEvent) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
manager.listenerLock.Lock()
|
||||||
if len(manager.listenersQueuedForDelete) > 0 {
|
if len(manager.listenersQueuedForDelete) > 0 {
|
||||||
newListener := make([]chan SocketEvent, 0)
|
newListener := make([]chan SocketEvent, 0)
|
||||||
for _, listener := range manager.listeners {
|
for _, listener := range manager.listeners {
|
||||||
|
|
@ -224,6 +230,7 @@ func (manager *SocketManager) dispatch(event SocketEvent) {
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
manager.listenerLock.Unlock()
|
||||||
done <- struct{}{}
|
done <- struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue