move socket manager
This commit is contained in:
parent
257def3b53
commit
423fd3f429
11 changed files with 21 additions and 27 deletions
|
|
@ -21,8 +21,8 @@ func EnableExtension(app *h.App, opts opts.ExtensionOpts) {
|
||||||
panic("websocket: SessionId func is required")
|
panic("websocket: SessionId func is required")
|
||||||
}
|
}
|
||||||
|
|
||||||
service.Set[wsutil.SocketManager](app.Opts.ServiceLocator, service.Singleton, func() *wsutil.SocketManager {
|
service.Set[ws.SocketManager](app.Opts.ServiceLocator, service.Singleton, func() *ws.SocketManager {
|
||||||
manager := wsutil.NewSocketManager(&opts)
|
manager := ws.NewSocketManager(&opts)
|
||||||
manager.StartMetrics()
|
manager.StartMetrics()
|
||||||
return manager
|
return manager
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"github.com/gobwas/ws"
|
"github.com/gobwas/ws"
|
||||||
"github.com/gobwas/ws/wsutil"
|
"github.com/gobwas/ws/wsutil"
|
||||||
ws2 "github.com/maddalax/htmgo/extensions/websocket/opts"
|
ws2 "github.com/maddalax/htmgo/extensions/websocket/opts"
|
||||||
|
ws3 "github.com/maddalax/htmgo/extensions/websocket/ws"
|
||||||
"github.com/maddalax/htmgo/framework/h"
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
"github.com/maddalax/htmgo/framework/service"
|
"github.com/maddalax/htmgo/framework/service"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
|
@ -25,7 +26,7 @@ func WsHttpHandler(opts *ws2.ExtensionOpts) http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
cc := r.Context().Value(h.RequestContextKey).(*h.RequestContext)
|
cc := r.Context().Value(h.RequestContextKey).(*h.RequestContext)
|
||||||
locator := cc.ServiceLocator()
|
locator := cc.ServiceLocator()
|
||||||
manager := service.Get[SocketManager](locator)
|
manager := service.Get[ws3.SocketManager](locator)
|
||||||
|
|
||||||
sessionId := opts.SessionId(cc)
|
sessionId := opts.SessionId(cc)
|
||||||
|
|
||||||
|
|
@ -46,7 +47,7 @@ func WsHttpHandler(opts *ws2.ExtensionOpts) http.HandlerFunc {
|
||||||
we don't want to block the writer
|
we don't want to block the writer
|
||||||
*/
|
*/
|
||||||
done := make(chan bool, 1000)
|
done := make(chan bool, 1000)
|
||||||
writer := make(WriterChan, 1000)
|
writer := make(ws3.WriterChan, 1000)
|
||||||
|
|
||||||
wg := sync.WaitGroup{}
|
wg := sync.WaitGroup{}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,9 @@
|
||||||
package ws
|
package ws
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/maddalax/htmgo/extensions/websocket/internal/wsutil"
|
|
||||||
"github.com/maddalax/htmgo/framework/h"
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ManagerFromCtx(ctx *h.RequestContext) *wsutil.SocketManager {
|
func ManagerFromCtx(ctx *h.RequestContext) *SocketManager {
|
||||||
return wsutil.SocketManagerFromCtx(ctx)
|
return SocketManagerFromCtx(ctx)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,6 @@
|
||||||
package ws
|
package ws
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/maddalax/htmgo/extensions/websocket/internal/wsutil"
|
|
||||||
"github.com/maddalax/htmgo/extensions/websocket/session"
|
"github.com/maddalax/htmgo/extensions/websocket/session"
|
||||||
"github.com/maddalax/htmgo/framework/h"
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
"github.com/maddalax/htmgo/framework/service"
|
"github.com/maddalax/htmgo/framework/service"
|
||||||
|
|
@ -33,7 +32,7 @@ func PushElement(data HandlerData, el *h.Element) bool {
|
||||||
// PushElementCtx sends an element to the current session and swaps it into the page
|
// PushElementCtx sends an element to the current session and swaps it into the page
|
||||||
func PushElementCtx(ctx *h.RequestContext, el *h.Element) bool {
|
func PushElementCtx(ctx *h.RequestContext, el *h.Element) bool {
|
||||||
locator := ctx.ServiceLocator()
|
locator := ctx.ServiceLocator()
|
||||||
socketManager := service.Get[wsutil.SocketManager](locator)
|
socketManager := service.Get[SocketManager](locator)
|
||||||
socketId := session.GetSessionId(ctx)
|
socketId := session.GetSessionId(ctx)
|
||||||
socket := socketManager.Get(string(socketId))
|
socket := socketManager.Get(string(socketId))
|
||||||
if socket == nil {
|
if socket == nil {
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,6 @@
|
||||||
package ws
|
package ws
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/maddalax/htmgo/extensions/websocket/internal/wsutil"
|
|
||||||
"github.com/maddalax/htmgo/extensions/websocket/session"
|
"github.com/maddalax/htmgo/extensions/websocket/session"
|
||||||
"github.com/maddalax/htmgo/framework/h"
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
"github.com/maddalax/htmgo/framework/service"
|
"github.com/maddalax/htmgo/framework/service"
|
||||||
|
|
@ -12,7 +11,7 @@ import (
|
||||||
func Every(ctx *h.RequestContext, interval time.Duration, cb func() bool) {
|
func Every(ctx *h.RequestContext, interval time.Duration, cb func() bool) {
|
||||||
socketId := session.GetSessionId(ctx)
|
socketId := session.GetSessionId(ctx)
|
||||||
locator := ctx.ServiceLocator()
|
locator := ctx.ServiceLocator()
|
||||||
manager := service.Get[wsutil.SocketManager](locator)
|
manager := service.Get[SocketManager](locator)
|
||||||
manager.RunIntervalWithSocket(string(socketId), interval, cb)
|
manager.RunIntervalWithSocket(string(socketId), interval, cb)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,16 +2,15 @@ package ws
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/maddalax/htmgo/extensions/websocket/internal/wsutil"
|
|
||||||
"github.com/maddalax/htmgo/extensions/websocket/session"
|
"github.com/maddalax/htmgo/extensions/websocket/session"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type MessageHandler struct {
|
type MessageHandler struct {
|
||||||
manager *wsutil.SocketManager
|
manager *SocketManager
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMessageHandler(manager *wsutil.SocketManager) *MessageHandler {
|
func NewMessageHandler(manager *SocketManager) *MessageHandler {
|
||||||
return &MessageHandler{manager: manager}
|
return &MessageHandler{manager: manager}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -77,7 +76,7 @@ func (h *MessageHandler) OnDomElementRemoved(handlerId string) {
|
||||||
handlers.Delete(handlerId)
|
handlers.Delete(handlerId)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *MessageHandler) OnSocketDisconnected(event wsutil.SocketEvent) {
|
func (h *MessageHandler) OnSocketDisconnected(event SocketEvent) {
|
||||||
sessionId := session.Id(event.SessionId)
|
sessionId := session.Id(event.SessionId)
|
||||||
hashes, ok := sessionIdToHashes.Load(sessionId)
|
hashes, ok := sessionIdToHashes.Load(sessionId)
|
||||||
if ok {
|
if ok {
|
||||||
|
|
|
||||||
|
|
@ -1,13 +1,12 @@
|
||||||
package ws
|
package ws
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/maddalax/htmgo/extensions/websocket/internal/wsutil"
|
|
||||||
"github.com/maddalax/htmgo/extensions/websocket/session"
|
"github.com/maddalax/htmgo/extensions/websocket/session"
|
||||||
"github.com/maddalax/htmgo/framework/service"
|
"github.com/maddalax/htmgo/framework/service"
|
||||||
)
|
)
|
||||||
|
|
||||||
func StartListener(locator *service.Locator) {
|
func StartListener(locator *service.Locator) {
|
||||||
manager := service.Get[wsutil.SocketManager](locator)
|
manager := service.Get[SocketManager](locator)
|
||||||
manager.Listen(socketMessageListener)
|
manager.Listen(socketMessageListener)
|
||||||
handler := NewMessageHandler(manager)
|
handler := NewMessageHandler(manager)
|
||||||
go func() {
|
go func() {
|
||||||
|
|
@ -23,9 +22,9 @@ func handle(handler *MessageHandler) {
|
||||||
handler.OnServerSideEvent(event)
|
handler.OnServerSideEvent(event)
|
||||||
case event := <-socketMessageListener:
|
case event := <-socketMessageListener:
|
||||||
switch event.Type {
|
switch event.Type {
|
||||||
case wsutil.DisconnectedEvent:
|
case DisconnectedEvent:
|
||||||
handler.OnSocketDisconnected(event)
|
handler.OnSocketDisconnected(event)
|
||||||
case wsutil.MessageEvent:
|
case MessageEvent:
|
||||||
|
|
||||||
handlerId, ok := event.Payload["id"].(string)
|
handlerId, ok := event.Payload["id"].(string)
|
||||||
eventName, ok2 := event.Payload["event"].(string)
|
eventName, ok2 := event.Payload["event"].(string)
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
package wsutil
|
package ws
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
package wsutil
|
package ws
|
||||||
|
|
||||||
import (
|
import (
|
||||||
ws2 "github.com/maddalax/htmgo/extensions/websocket/opts"
|
ws2 "github.com/maddalax/htmgo/extensions/websocket/opts"
|
||||||
|
|
@ -1,12 +1,11 @@
|
||||||
package ws
|
package ws
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/maddalax/htmgo/extensions/websocket/internal/wsutil"
|
|
||||||
"github.com/maddalax/htmgo/framework/h"
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Metrics struct {
|
type Metrics struct {
|
||||||
Manager wsutil.ManagerMetrics
|
Manager ManagerMetrics
|
||||||
Handler HandlerMetrics
|
Handler HandlerMetrics
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,6 @@
|
||||||
package ws
|
package ws
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/maddalax/htmgo/extensions/websocket/internal/wsutil"
|
|
||||||
"github.com/maddalax/htmgo/extensions/websocket/session"
|
"github.com/maddalax/htmgo/extensions/websocket/session"
|
||||||
"github.com/maddalax/htmgo/framework/h"
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
"github.com/puzpuzpuz/xsync/v3"
|
"github.com/puzpuzpuz/xsync/v3"
|
||||||
|
|
@ -11,8 +10,8 @@ import (
|
||||||
|
|
||||||
type HandlerData struct {
|
type HandlerData struct {
|
||||||
SessionId session.Id
|
SessionId session.Id
|
||||||
Socket *wsutil.SocketConnection
|
Socket *SocketConnection
|
||||||
Manager *wsutil.SocketManager
|
Manager *SocketManager
|
||||||
}
|
}
|
||||||
|
|
||||||
type Handler func(data HandlerData)
|
type Handler func(data HandlerData)
|
||||||
|
|
@ -29,7 +28,7 @@ var sessionIdToHashes = xsync.NewMapOf[session.Id, map[KeyHash]bool]()
|
||||||
var hashesToSessionId = xsync.NewMapOf[KeyHash, session.Id]()
|
var hashesToSessionId = xsync.NewMapOf[KeyHash, session.Id]()
|
||||||
var serverEventNamesToHash = xsync.NewMapOf[string, map[KeyHash]bool]()
|
var serverEventNamesToHash = xsync.NewMapOf[string, map[KeyHash]bool]()
|
||||||
|
|
||||||
var socketMessageListener = make(chan wsutil.SocketEvent, 100)
|
var socketMessageListener = make(chan SocketEvent, 100)
|
||||||
var serverSideMessageListener = make(chan ServerSideEvent, 100)
|
var serverSideMessageListener = make(chan ServerSideEvent, 100)
|
||||||
var lock = sync.Mutex{}
|
var lock = sync.Mutex{}
|
||||||
var callingHandler = atomic.Bool{}
|
var callingHandler = atomic.Bool{}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue