From fdef1a9688cac4bcd234b9ff44c168d16d4743eb Mon Sep 17 00:00:00 2001 From: maddalax Date: Mon, 4 Nov 2024 10:37:41 -0600 Subject: [PATCH] add metric page --- examples/ws-example/main.go | 3 + examples/ws-example/pages/index.go | 14 +- examples/ws-example/pages/ws/metrics.go | 120 ++++++++++++++++++ extensions/websocket/internal/random.go | 13 -- .../websocket/internal/wsutil/handler.go | 11 +- .../websocket/internal/wsutil/manager.go | 103 ++++++++++++++- extensions/websocket/opts/opts.go | 1 + extensions/websocket/ws/access.go | 10 ++ extensions/websocket/ws/every.go | 45 +++---- extensions/websocket/ws/listener.go | 11 -- extensions/websocket/ws/metrics.go | 19 +++ extensions/websocket/ws/register.go | 15 +++ framework/h/tag.go | 12 ++ 13 files changed, 302 insertions(+), 75 deletions(-) create mode 100644 examples/ws-example/pages/ws/metrics.go delete mode 100644 extensions/websocket/internal/random.go create mode 100644 extensions/websocket/ws/access.go create mode 100644 extensions/websocket/ws/metrics.go diff --git a/examples/ws-example/main.go b/examples/ws-example/main.go index 576d8d9..86c3b0f 100644 --- a/examples/ws-example/main.go +++ b/examples/ws-example/main.go @@ -25,6 +25,9 @@ func main() { websocket.EnableExtension(app, ws2.ExtensionOpts{ WsPath: "/ws", + RoomName: func(ctx *h.RequestContext) string { + return "all" + }, SessionId: func(ctx *h.RequestContext) string { return ctx.QueryParam("sessionId") }, diff --git a/examples/ws-example/pages/index.go b/examples/ws-example/pages/index.go index 2bf090e..caa6d59 100644 --- a/examples/ws-example/pages/index.go +++ b/examples/ws-example/pages/index.go @@ -5,24 +5,12 @@ import ( "github.com/maddalax/htmgo/extensions/websocket/ws" "github.com/maddalax/htmgo/framework/h" "github.com/maddalax/htmgo/framework/session" - "time" "ws-example/partials" ) func IndexPage(ctx *h.RequestContext) *h.Page { sessionId := session.GetSessionId(ctx) - ws.Every(ctx, time.Second, func() bool { - return ws.PushElementCtx( - ctx, - h.Div( - h.Attribute("hx-swap-oob", "true"), - h.Id("current-time"), - h.TextF("Current time: %s", time.Now().Format("15:04:05")), - ), - ) - }) - return h.NewPage( RootPage( ctx, @@ -35,7 +23,7 @@ func IndexPage(ctx *h.RequestContext) *h.Page { h.Class("text-2xl"), ), h.Div( - h.Id("current-time"), + h.Id("ws-metrics"), ), partials.CounterForm(ctx, partials.CounterProps{Id: "counter-1"}), partials.Repeater(ctx, partials.RepeaterProps{ diff --git a/examples/ws-example/pages/ws/metrics.go b/examples/ws-example/pages/ws/metrics.go new file mode 100644 index 0000000..269cf85 --- /dev/null +++ b/examples/ws-example/pages/ws/metrics.go @@ -0,0 +1,120 @@ +package ws + +import ( + "fmt" + "github.com/maddalax/htmgo/extensions/websocket/ws" + "github.com/maddalax/htmgo/framework/h" + "github.com/maddalax/htmgo/framework/session" + "runtime" + "time" + "ws-example/pages" +) + +func Metrics(ctx *h.RequestContext) *h.Page { + + ws.RunOnConnected(ctx, func() { + ws.PushElementCtx( + ctx, + metricsView(ctx), + ) + }) + + ws.Every(ctx, time.Second, func() bool { + return ws.PushElementCtx( + ctx, + metricsView(ctx), + ) + }) + + return h.NewPage( + pages.RootPage( + ctx, + h.Div( + h.Attribute("ws-connect", fmt.Sprintf("/ws?sessionId=%s", session.GetSessionId(ctx))), + h.Class("flex flex-col gap-4 items-center min-h-screen max-w-2xl mx-auto mt-8"), + h.H3( + h.Id("intro-text"), + h.Text("Websocket Metrics"), + h.Class("text-2xl"), + ), + h.Div( + h.Id("ws-metrics"), + ), + ), + ), + ) +} + +func metricsView(ctx *h.RequestContext) *h.Element { + metrics := ws.MetricsFromCtx(ctx) + + return h.Div( + h.Id("ws-metrics"), + List(metrics), + ) +} + +func List(metrics ws.Metrics) *h.Element { + return h.Body( + h.Div( + h.Class("flow-root rounded-lg border border-gray-100 py-3 shadow-sm"), + h.Dl( + h.Class("-my-3 divide-y divide-gray-100 text-sm"), + ListItem("Current Time", time.Now().Format("15:04:05")), + ListItem("Total Goroutines For ws.Every", fmt.Sprintf("%d", metrics.Manager.RunningGoroutines)), + ListItem("Total Goroutines In System", fmt.Sprintf("%d", runtime.NumGoroutine())), + ListItem("Sockets", fmt.Sprintf("%d", metrics.Manager.TotalSockets)), + ListItem("Rooms", fmt.Sprintf("%d", metrics.Manager.TotalRooms)), + ListItem("Session Id To Hashes", fmt.Sprintf("%d", metrics.Handler.SessionIdToHashesCount)), + ListItem("Total Handlers", fmt.Sprintf("%d", metrics.Handler.TotalHandlers)), + ListItem("Server Event Names To Hash", fmt.Sprintf("%d", metrics.Handler.ServerEventNamesToHashCount)), + ListItem("Total Listeners", fmt.Sprintf("%d", metrics.Manager.TotalListeners)), + h.IterMap(metrics.Manager.SocketsPerRoom, func(key string, value []string) *h.Element { + return ListBlock( + fmt.Sprintf("Sockets In Room - %s", key), + h.Div( + h.List(value, func(item string, index int) *h.Element { + return h.Div( + h.Pf("%s", item), + ) + }), + ), + ) + }), + ), + ), + ) +} + +func ListItem(term, description string) *h.Element { + return h.Div( + h.Class("grid grid-cols-1 gap-1 p-3 even:bg-gray-50 sm:grid-cols-3 sm:gap-4"), + DescriptionTerm(term), + DescriptionDetail(description), + ) +} + +func ListBlock(title string, children *h.Element) *h.Element { + return h.Div( + h.Class("grid grid-cols-1 gap-1 p-3 even:bg-gray-50 sm:grid-cols-3 sm:gap-4"), + DescriptionTerm(title), + h.Dd( + h.Class("text-gray-700 sm:col-span-2"), + children, + ), + ) +} + +func DescriptionTerm(term string) *h.Element { + return h.Dt( + h.Class("font-medium text-gray-900"), + h.Text(term), + ) +} + +func DescriptionDetail(detail string) *h.Element { + return h.Dd( + h.Class("text-gray-700 sm:col-span-2"), + h.Text(detail), + ) +} diff --git a/extensions/websocket/internal/random.go b/extensions/websocket/internal/random.go deleted file mode 100644 index cdd5416..0000000 --- a/extensions/websocket/internal/random.go +++ /dev/null @@ -1,13 +0,0 @@ -package internal - -import "math/rand" - -var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") - -func RandSeq(n int) string { - b := make([]rune, n) - for i := range b { - b[i] = letters[rand.Intn(len(letters))] - } - return string(b) -} diff --git a/extensions/websocket/internal/wsutil/handler.go b/extensions/websocket/internal/wsutil/handler.go index 4beb0c3..efa7ece 100644 --- a/extensions/websocket/internal/wsutil/handler.go +++ b/extensions/websocket/internal/wsutil/handler.go @@ -15,6 +15,13 @@ import ( ) func WsHttpHandler(opts *ws2.ExtensionOpts) http.HandlerFunc { + + if opts.RoomName == nil { + opts.RoomName = func(ctx *h.RequestContext) string { + return "all" + } + } + return func(w http.ResponseWriter, r *http.Request) { cc := r.Context().Value(h.RequestContextKey).(*h.RequestContext) locator := cc.ServiceLocator() @@ -32,6 +39,8 @@ func WsHttpHandler(opts *ws2.ExtensionOpts) http.HandlerFunc { slog.Info("failed to upgrade", slog.String("error", err.Error())) return } + + roomId := opts.RoomName(cc) /* Large buffer in case the client disconnects while we are writing we don't want to block the writer @@ -41,7 +50,7 @@ func WsHttpHandler(opts *ws2.ExtensionOpts) http.HandlerFunc { wg := sync.WaitGroup{} - manager.Add("all", sessionId, writer, done) + manager.Add(roomId, sessionId, writer, done) /* * This goroutine is responsible for writing messages to the client diff --git a/extensions/websocket/internal/wsutil/manager.go b/extensions/websocket/internal/wsutil/manager.go index b9eb566..2c6ea78 100644 --- a/extensions/websocket/internal/wsutil/manager.go +++ b/extensions/websocket/internal/wsutil/manager.go @@ -3,9 +3,13 @@ package wsutil import ( "fmt" "github.com/maddalax/htmgo/extensions/websocket/opts" + "github.com/maddalax/htmgo/framework/h" + "github.com/maddalax/htmgo/framework/service" "github.com/puzpuzpuz/xsync/v3" "log/slog" "strings" + "sync" + "sync/atomic" "time" ) @@ -38,18 +42,72 @@ type SocketConnection struct { Writer WriterChan } +type ManagerMetrics struct { + RunningGoroutines int32 + TotalSockets int + TotalRooms int + TotalListeners int + SocketsPerRoomCount map[string]int + SocketsPerRoom map[string][]string +} + type SocketManager struct { - sockets *xsync.MapOf[string, *xsync.MapOf[string, SocketConnection]] - idToRoom *xsync.MapOf[string, string] - listeners []chan SocketEvent - opts *opts.ExtensionOpts + sockets *xsync.MapOf[string, *xsync.MapOf[string, SocketConnection]] + idToRoom *xsync.MapOf[string, string] + listeners []chan SocketEvent + goroutinesRunning atomic.Int32 + opts *opts.ExtensionOpts + lock sync.Mutex +} + +func (manager *SocketManager) Metrics() ManagerMetrics { + manager.lock.Lock() + defer manager.lock.Unlock() + count := manager.goroutinesRunning.Load() + metrics := ManagerMetrics{ + RunningGoroutines: count, + TotalSockets: manager.sockets.Size(), + TotalRooms: 0, + TotalListeners: len(manager.listeners), + SocketsPerRoom: make(map[string][]string), + SocketsPerRoomCount: make(map[string]int), + } + + roomMap := make(map[string]int) + + manager.idToRoom.Range(func(socketId string, roomId string) bool { + roomMap[roomId]++ + return true + }) + + metrics.TotalRooms = len(roomMap) + + manager.sockets.Range(func(roomId string, sockets *xsync.MapOf[string, SocketConnection]) bool { + metrics.SocketsPerRoomCount[roomId] = sockets.Size() + sockets.Range(func(socketId string, conn SocketConnection) bool { + if metrics.SocketsPerRoom[roomId] == nil { + metrics.SocketsPerRoom[roomId] = []string{} + } + metrics.SocketsPerRoom[roomId] = append(metrics.SocketsPerRoom[roomId], socketId) + return true + }) + return true + }) + + return metrics +} + +func SocketManagerFromCtx(ctx *h.RequestContext) *SocketManager { + locator := ctx.ServiceLocator() + return service.Get[SocketManager](locator) } func NewSocketManager(opts *opts.ExtensionOpts) *SocketManager { return &SocketManager{ - sockets: xsync.NewMapOf[string, *xsync.MapOf[string, SocketConnection]](), - idToRoom: xsync.NewMapOf[string, string](), - opts: opts, + sockets: xsync.NewMapOf[string, *xsync.MapOf[string, SocketConnection]](), + idToRoom: xsync.NewMapOf[string, string](), + opts: opts, + goroutinesRunning: atomic.Int32{}, } } @@ -64,6 +122,37 @@ func (manager *SocketManager) ForEachSocket(roomId string, cb func(conn SocketCo }) } +func (manager *SocketManager) RunIntervalWithSocket(socketId string, interval time.Duration, cb func() bool) { + socketIdSlog := slog.String("socketId", socketId) + slog.Debug("ws-extension: starting every loop", socketIdSlog, slog.Duration("duration", interval)) + + go func() { + manager.goroutinesRunning.Add(1) + defer manager.goroutinesRunning.Add(-1) + tries := 0 + for { + socket := manager.Get(socketId) + // This can run before the socket is established, lets try a few times and kill it if socket isn't connected after a bit. + if socket == nil { + if tries > 200 { + slog.Debug("ws-extension: socket disconnected, killing goroutine", socketIdSlog) + return + } else { + time.Sleep(time.Millisecond * 15) + tries++ + slog.Debug("ws-extension: socket not connected yet, trying again", socketIdSlog, slog.Int("attempt", tries)) + continue + } + } + success := cb() + if !success { + return + } + time.Sleep(interval) + } + }() +} + func (manager *SocketManager) Listen(listener chan SocketEvent) { if manager.listeners == nil { manager.listeners = make([]chan SocketEvent, 0) diff --git a/extensions/websocket/opts/opts.go b/extensions/websocket/opts/opts.go index 1019cfa..b59373b 100644 --- a/extensions/websocket/opts/opts.go +++ b/extensions/websocket/opts/opts.go @@ -4,5 +4,6 @@ import "github.com/maddalax/htmgo/framework/h" type ExtensionOpts struct { WsPath string + RoomName func(ctx *h.RequestContext) string SessionId func(ctx *h.RequestContext) string } diff --git a/extensions/websocket/ws/access.go b/extensions/websocket/ws/access.go new file mode 100644 index 0000000..efa63c5 --- /dev/null +++ b/extensions/websocket/ws/access.go @@ -0,0 +1,10 @@ +package ws + +import ( + "github.com/maddalax/htmgo/extensions/websocket/internal/wsutil" + "github.com/maddalax/htmgo/framework/h" +) + +func ManagerFromCtx(ctx *h.RequestContext) *wsutil.SocketManager { + return wsutil.SocketManagerFromCtx(ctx) +} diff --git a/extensions/websocket/ws/every.go b/extensions/websocket/ws/every.go index 1e73fbd..bbe19b7 100644 --- a/extensions/websocket/ws/every.go +++ b/extensions/websocket/ws/every.go @@ -5,40 +5,25 @@ import ( "github.com/maddalax/htmgo/framework/h" "github.com/maddalax/htmgo/framework/service" "github.com/maddalax/htmgo/framework/session" - "log/slog" "time" ) // Every executes the given callback every interval, until the socket is disconnected, or the callback returns false. func Every(ctx *h.RequestContext, interval time.Duration, cb func() bool) { socketId := session.GetSessionId(ctx) - socketIdSlog := slog.String("socketId", string(socketId)) - - slog.Debug("ws-extension: starting every loop", socketIdSlog, slog.Duration("duration", interval)) - - go func() { - tries := 0 - for { - locator := ctx.ServiceLocator() - socketManager := service.Get[wsutil.SocketManager](locator) - socket := socketManager.Get(string(socketId)) - // This can run before the socket is established, lets try a few times and kill it if socket isn't connected after a bit. - if socket == nil { - if tries > 5 { - slog.Debug("ws-extension: socket disconnected, killing goroutine", socketIdSlog) - return - } else { - time.Sleep(time.Second) - tries++ - slog.Debug("ws-extension: socket not connected yet, trying again", socketIdSlog, slog.Int("attempt", tries)) - continue - } - } - success := cb() - if !success { - return - } - time.Sleep(interval) - } - }() + locator := ctx.ServiceLocator() + manager := service.Get[wsutil.SocketManager](locator) + manager.RunIntervalWithSocket(string(socketId), interval, cb) +} + +func Once(ctx *h.RequestContext, cb func()) { + // time is irrelevant, we just need to run the callback once, it will exit after because of the false return + Every(ctx, time.Millisecond, func() bool { + cb() + return false + }) +} + +func RunOnConnected(ctx *h.RequestContext, cb func()) { + Once(ctx, cb) } diff --git a/extensions/websocket/ws/listener.go b/extensions/websocket/ws/listener.go index dff8e1a..f159add 100644 --- a/extensions/websocket/ws/listener.go +++ b/extensions/websocket/ws/listener.go @@ -1,11 +1,9 @@ package ws import ( - "fmt" "github.com/maddalax/htmgo/extensions/websocket/internal/wsutil" "github.com/maddalax/htmgo/framework/service" "github.com/maddalax/htmgo/framework/session" - "time" ) func StartListener(locator *service.Locator) { @@ -13,15 +11,6 @@ func StartListener(locator *service.Locator) { manager.Listen(socketMessageListener) handler := NewMessageHandler(manager) - go func() { - for { - fmt.Printf("total handlers: %d\n", handlers.Size()) - fmt.Printf("total serverEventNamesToHash: %d\n", serverEventNamesToHash.Size()) - fmt.Printf("total sessionIdToHashes: %d\n", sessionIdToHashes.Size()) - time.Sleep(5 * time.Second) - } - }() - go func() { for { select { diff --git a/extensions/websocket/ws/metrics.go b/extensions/websocket/ws/metrics.go new file mode 100644 index 0000000..258493b --- /dev/null +++ b/extensions/websocket/ws/metrics.go @@ -0,0 +1,19 @@ +package ws + +import ( + "github.com/maddalax/htmgo/extensions/websocket/internal/wsutil" + "github.com/maddalax/htmgo/framework/h" +) + +type Metrics struct { + Manager wsutil.ManagerMetrics + Handler HandlerMetrics +} + +func MetricsFromCtx(ctx *h.RequestContext) Metrics { + manager := ManagerFromCtx(ctx) + return Metrics{ + Manager: manager.Metrics(), + Handler: GetHandlerMetics(), + } +} diff --git a/extensions/websocket/ws/register.go b/extensions/websocket/ws/register.go index f712230..7622680 100644 --- a/extensions/websocket/ws/register.go +++ b/extensions/websocket/ws/register.go @@ -34,6 +34,21 @@ var serverSideMessageListener = make(chan ServerSideEvent, 100) var lock = sync.Mutex{} var callingHandler = atomic.Bool{} +type HandlerMetrics struct { + TotalHandlers int + ServerEventNamesToHashCount int + SessionIdToHashesCount int +} + +func GetHandlerMetics() HandlerMetrics { + metrics := HandlerMetrics{ + TotalHandlers: handlers.Size(), + ServerEventNamesToHashCount: serverEventNamesToHash.Size(), + SessionIdToHashesCount: sessionIdToHashes.Size(), + } + return metrics +} + func makeId() string { return h.GenId(30) } diff --git a/framework/h/tag.go b/framework/h/tag.go index c1f6079..93352fd 100644 --- a/framework/h/tag.go +++ b/framework/h/tag.go @@ -161,6 +161,18 @@ func Div(children ...Ren) *Element { return Tag("div", children...) } +func Dl(children ...Ren) *Element { + return Tag("dl", children...) +} + +func Dt(children ...Ren) *Element { + return Tag("dt", children...) +} + +func Dd(children ...Ren) *Element { + return Tag("dd", children...) +} + func Article(children ...Ren) *Element { return Tag("article", children...) }