Compare commits
5 commits
master
...
ws-extensi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
519be8771d | ||
|
|
3a3039609f | ||
|
|
79b15fe194 | ||
|
|
c593c466a3 | ||
|
|
3fda1039a9 |
1 changed files with 36 additions and 14 deletions
|
|
@ -55,15 +55,16 @@ type ManagerMetrics struct {
|
|||
}
|
||||
|
||||
type SocketManager struct {
|
||||
sockets *xsync.MapOf[string, *xsync.MapOf[string, SocketConnection]]
|
||||
idToRoom *xsync.MapOf[string, string]
|
||||
listeners []chan SocketEvent
|
||||
goroutinesRunning atomic.Int32
|
||||
opts *opts.ExtensionOpts
|
||||
lock sync.Mutex
|
||||
totalMessages atomic.Int64
|
||||
messagesPerSecond int
|
||||
secondsElapsed int
|
||||
sockets *xsync.MapOf[string, *xsync.MapOf[string, SocketConnection]]
|
||||
idToRoom *xsync.MapOf[string, string]
|
||||
listeners []chan SocketEvent
|
||||
listenersQueuedForDelete *xsync.MapOf[chan SocketEvent, bool]
|
||||
goroutinesRunning atomic.Int32
|
||||
opts *opts.ExtensionOpts
|
||||
lock sync.Mutex
|
||||
totalMessages atomic.Int64
|
||||
messagesPerSecond int
|
||||
secondsElapsed int
|
||||
}
|
||||
|
||||
func (manager *SocketManager) StartMetrics() {
|
||||
|
|
@ -127,10 +128,11 @@ func SocketManagerFromCtx(ctx *h.RequestContext) *SocketManager {
|
|||
|
||||
func NewSocketManager(opts *opts.ExtensionOpts) *SocketManager {
|
||||
return &SocketManager{
|
||||
sockets: xsync.NewMapOf[string, *xsync.MapOf[string, SocketConnection]](),
|
||||
idToRoom: xsync.NewMapOf[string, string](),
|
||||
opts: opts,
|
||||
goroutinesRunning: atomic.Int32{},
|
||||
sockets: xsync.NewMapOf[string, *xsync.MapOf[string, SocketConnection]](),
|
||||
idToRoom: xsync.NewMapOf[string, string](),
|
||||
listenersQueuedForDelete: xsync.NewMapOf[chan SocketEvent, bool](),
|
||||
opts: opts,
|
||||
goroutinesRunning: atomic.Int32{},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -185,6 +187,10 @@ func (manager *SocketManager) Listen(listener chan SocketEvent) {
|
|||
}
|
||||
}
|
||||
|
||||
func (manager *SocketManager) RemoveListener(listener chan SocketEvent) {
|
||||
manager.listenersQueuedForDelete.Store(listener, true)
|
||||
}
|
||||
|
||||
func (manager *SocketManager) dispatch(event SocketEvent) {
|
||||
done := make(chan struct{}, 1)
|
||||
go func() {
|
||||
|
|
@ -197,9 +203,25 @@ func (manager *SocketManager) dispatch(event SocketEvent) {
|
|||
}
|
||||
}
|
||||
}()
|
||||
|
||||
newListener := make([]chan SocketEvent, 0)
|
||||
for _, listener := range manager.listeners {
|
||||
listener <- event
|
||||
if _, ok := manager.listenersQueuedForDelete.Load(listener); !ok {
|
||||
newListener = append(newListener, listener)
|
||||
}
|
||||
}
|
||||
manager.listeners = newListener
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
for _, listener := range manager.listeners {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
listener <- event
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
done <- struct{}{}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue