add metric page

This commit is contained in:
maddalax 2024-11-04 10:37:41 -06:00
parent c052a83ece
commit fdef1a9688
13 changed files with 302 additions and 75 deletions

View file

@ -25,6 +25,9 @@ func main() {
websocket.EnableExtension(app, ws2.ExtensionOpts{ websocket.EnableExtension(app, ws2.ExtensionOpts{
WsPath: "/ws", WsPath: "/ws",
RoomName: func(ctx *h.RequestContext) string {
return "all"
},
SessionId: func(ctx *h.RequestContext) string { SessionId: func(ctx *h.RequestContext) string {
return ctx.QueryParam("sessionId") return ctx.QueryParam("sessionId")
}, },

View file

@ -5,24 +5,12 @@ import (
"github.com/maddalax/htmgo/extensions/websocket/ws" "github.com/maddalax/htmgo/extensions/websocket/ws"
"github.com/maddalax/htmgo/framework/h" "github.com/maddalax/htmgo/framework/h"
"github.com/maddalax/htmgo/framework/session" "github.com/maddalax/htmgo/framework/session"
"time"
"ws-example/partials" "ws-example/partials"
) )
func IndexPage(ctx *h.RequestContext) *h.Page { func IndexPage(ctx *h.RequestContext) *h.Page {
sessionId := session.GetSessionId(ctx) sessionId := session.GetSessionId(ctx)
ws.Every(ctx, time.Second, func() bool {
return ws.PushElementCtx(
ctx,
h.Div(
h.Attribute("hx-swap-oob", "true"),
h.Id("current-time"),
h.TextF("Current time: %s", time.Now().Format("15:04:05")),
),
)
})
return h.NewPage( return h.NewPage(
RootPage( RootPage(
ctx, ctx,
@ -35,7 +23,7 @@ func IndexPage(ctx *h.RequestContext) *h.Page {
h.Class("text-2xl"), h.Class("text-2xl"),
), ),
h.Div( h.Div(
h.Id("current-time"), h.Id("ws-metrics"),
), ),
partials.CounterForm(ctx, partials.CounterProps{Id: "counter-1"}), partials.CounterForm(ctx, partials.CounterProps{Id: "counter-1"}),
partials.Repeater(ctx, partials.RepeaterProps{ partials.Repeater(ctx, partials.RepeaterProps{

View file

@ -0,0 +1,120 @@
package ws
import (
"fmt"
"github.com/maddalax/htmgo/extensions/websocket/ws"
"github.com/maddalax/htmgo/framework/h"
"github.com/maddalax/htmgo/framework/session"
"runtime"
"time"
"ws-example/pages"
)
func Metrics(ctx *h.RequestContext) *h.Page {
ws.RunOnConnected(ctx, func() {
ws.PushElementCtx(
ctx,
metricsView(ctx),
)
})
ws.Every(ctx, time.Second, func() bool {
return ws.PushElementCtx(
ctx,
metricsView(ctx),
)
})
return h.NewPage(
pages.RootPage(
ctx,
h.Div(
h.Attribute("ws-connect", fmt.Sprintf("/ws?sessionId=%s", session.GetSessionId(ctx))),
h.Class("flex flex-col gap-4 items-center min-h-screen max-w-2xl mx-auto mt-8"),
h.H3(
h.Id("intro-text"),
h.Text("Websocket Metrics"),
h.Class("text-2xl"),
),
h.Div(
h.Id("ws-metrics"),
),
),
),
)
}
func metricsView(ctx *h.RequestContext) *h.Element {
metrics := ws.MetricsFromCtx(ctx)
return h.Div(
h.Id("ws-metrics"),
List(metrics),
)
}
func List(metrics ws.Metrics) *h.Element {
return h.Body(
h.Div(
h.Class("flow-root rounded-lg border border-gray-100 py-3 shadow-sm"),
h.Dl(
h.Class("-my-3 divide-y divide-gray-100 text-sm"),
ListItem("Current Time", time.Now().Format("15:04:05")),
ListItem("Total Goroutines For ws.Every", fmt.Sprintf("%d", metrics.Manager.RunningGoroutines)),
ListItem("Total Goroutines In System", fmt.Sprintf("%d", runtime.NumGoroutine())),
ListItem("Sockets", fmt.Sprintf("%d", metrics.Manager.TotalSockets)),
ListItem("Rooms", fmt.Sprintf("%d", metrics.Manager.TotalRooms)),
ListItem("Session Id To Hashes", fmt.Sprintf("%d", metrics.Handler.SessionIdToHashesCount)),
ListItem("Total Handlers", fmt.Sprintf("%d", metrics.Handler.TotalHandlers)),
ListItem("Server Event Names To Hash", fmt.Sprintf("%d", metrics.Handler.ServerEventNamesToHashCount)),
ListItem("Total Listeners", fmt.Sprintf("%d", metrics.Manager.TotalListeners)),
h.IterMap(metrics.Manager.SocketsPerRoom, func(key string, value []string) *h.Element {
return ListBlock(
fmt.Sprintf("Sockets In Room - %s", key),
h.Div(
h.List(value, func(item string, index int) *h.Element {
return h.Div(
h.Pf("%s", item),
)
}),
),
)
}),
),
),
)
}
func ListItem(term, description string) *h.Element {
return h.Div(
h.Class("grid grid-cols-1 gap-1 p-3 even:bg-gray-50 sm:grid-cols-3 sm:gap-4"),
DescriptionTerm(term),
DescriptionDetail(description),
)
}
func ListBlock(title string, children *h.Element) *h.Element {
return h.Div(
h.Class("grid grid-cols-1 gap-1 p-3 even:bg-gray-50 sm:grid-cols-3 sm:gap-4"),
DescriptionTerm(title),
h.Dd(
h.Class("text-gray-700 sm:col-span-2"),
children,
),
)
}
func DescriptionTerm(term string) *h.Element {
return h.Dt(
h.Class("font-medium text-gray-900"),
h.Text(term),
)
}
func DescriptionDetail(detail string) *h.Element {
return h.Dd(
h.Class("text-gray-700 sm:col-span-2"),
h.Text(detail),
)
}

View file

@ -1,13 +0,0 @@
package internal
import "math/rand"
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
func RandSeq(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}

View file

@ -15,6 +15,13 @@ import (
) )
func WsHttpHandler(opts *ws2.ExtensionOpts) http.HandlerFunc { func WsHttpHandler(opts *ws2.ExtensionOpts) http.HandlerFunc {
if opts.RoomName == nil {
opts.RoomName = func(ctx *h.RequestContext) string {
return "all"
}
}
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
cc := r.Context().Value(h.RequestContextKey).(*h.RequestContext) cc := r.Context().Value(h.RequestContextKey).(*h.RequestContext)
locator := cc.ServiceLocator() locator := cc.ServiceLocator()
@ -32,6 +39,8 @@ func WsHttpHandler(opts *ws2.ExtensionOpts) http.HandlerFunc {
slog.Info("failed to upgrade", slog.String("error", err.Error())) slog.Info("failed to upgrade", slog.String("error", err.Error()))
return return
} }
roomId := opts.RoomName(cc)
/* /*
Large buffer in case the client disconnects while we are writing Large buffer in case the client disconnects while we are writing
we don't want to block the writer we don't want to block the writer
@ -41,7 +50,7 @@ func WsHttpHandler(opts *ws2.ExtensionOpts) http.HandlerFunc {
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
manager.Add("all", sessionId, writer, done) manager.Add(roomId, sessionId, writer, done)
/* /*
* This goroutine is responsible for writing messages to the client * This goroutine is responsible for writing messages to the client

View file

@ -3,9 +3,13 @@ package wsutil
import ( import (
"fmt" "fmt"
"github.com/maddalax/htmgo/extensions/websocket/opts" "github.com/maddalax/htmgo/extensions/websocket/opts"
"github.com/maddalax/htmgo/framework/h"
"github.com/maddalax/htmgo/framework/service"
"github.com/puzpuzpuz/xsync/v3" "github.com/puzpuzpuz/xsync/v3"
"log/slog" "log/slog"
"strings" "strings"
"sync"
"sync/atomic"
"time" "time"
) )
@ -38,18 +42,72 @@ type SocketConnection struct {
Writer WriterChan Writer WriterChan
} }
type ManagerMetrics struct {
RunningGoroutines int32
TotalSockets int
TotalRooms int
TotalListeners int
SocketsPerRoomCount map[string]int
SocketsPerRoom map[string][]string
}
type SocketManager struct { type SocketManager struct {
sockets *xsync.MapOf[string, *xsync.MapOf[string, SocketConnection]] sockets *xsync.MapOf[string, *xsync.MapOf[string, SocketConnection]]
idToRoom *xsync.MapOf[string, string] idToRoom *xsync.MapOf[string, string]
listeners []chan SocketEvent listeners []chan SocketEvent
opts *opts.ExtensionOpts goroutinesRunning atomic.Int32
opts *opts.ExtensionOpts
lock sync.Mutex
}
func (manager *SocketManager) Metrics() ManagerMetrics {
manager.lock.Lock()
defer manager.lock.Unlock()
count := manager.goroutinesRunning.Load()
metrics := ManagerMetrics{
RunningGoroutines: count,
TotalSockets: manager.sockets.Size(),
TotalRooms: 0,
TotalListeners: len(manager.listeners),
SocketsPerRoom: make(map[string][]string),
SocketsPerRoomCount: make(map[string]int),
}
roomMap := make(map[string]int)
manager.idToRoom.Range(func(socketId string, roomId string) bool {
roomMap[roomId]++
return true
})
metrics.TotalRooms = len(roomMap)
manager.sockets.Range(func(roomId string, sockets *xsync.MapOf[string, SocketConnection]) bool {
metrics.SocketsPerRoomCount[roomId] = sockets.Size()
sockets.Range(func(socketId string, conn SocketConnection) bool {
if metrics.SocketsPerRoom[roomId] == nil {
metrics.SocketsPerRoom[roomId] = []string{}
}
metrics.SocketsPerRoom[roomId] = append(metrics.SocketsPerRoom[roomId], socketId)
return true
})
return true
})
return metrics
}
func SocketManagerFromCtx(ctx *h.RequestContext) *SocketManager {
locator := ctx.ServiceLocator()
return service.Get[SocketManager](locator)
} }
func NewSocketManager(opts *opts.ExtensionOpts) *SocketManager { func NewSocketManager(opts *opts.ExtensionOpts) *SocketManager {
return &SocketManager{ return &SocketManager{
sockets: xsync.NewMapOf[string, *xsync.MapOf[string, SocketConnection]](), sockets: xsync.NewMapOf[string, *xsync.MapOf[string, SocketConnection]](),
idToRoom: xsync.NewMapOf[string, string](), idToRoom: xsync.NewMapOf[string, string](),
opts: opts, opts: opts,
goroutinesRunning: atomic.Int32{},
} }
} }
@ -64,6 +122,37 @@ func (manager *SocketManager) ForEachSocket(roomId string, cb func(conn SocketCo
}) })
} }
func (manager *SocketManager) RunIntervalWithSocket(socketId string, interval time.Duration, cb func() bool) {
socketIdSlog := slog.String("socketId", socketId)
slog.Debug("ws-extension: starting every loop", socketIdSlog, slog.Duration("duration", interval))
go func() {
manager.goroutinesRunning.Add(1)
defer manager.goroutinesRunning.Add(-1)
tries := 0
for {
socket := manager.Get(socketId)
// This can run before the socket is established, lets try a few times and kill it if socket isn't connected after a bit.
if socket == nil {
if tries > 200 {
slog.Debug("ws-extension: socket disconnected, killing goroutine", socketIdSlog)
return
} else {
time.Sleep(time.Millisecond * 15)
tries++
slog.Debug("ws-extension: socket not connected yet, trying again", socketIdSlog, slog.Int("attempt", tries))
continue
}
}
success := cb()
if !success {
return
}
time.Sleep(interval)
}
}()
}
func (manager *SocketManager) Listen(listener chan SocketEvent) { func (manager *SocketManager) Listen(listener chan SocketEvent) {
if manager.listeners == nil { if manager.listeners == nil {
manager.listeners = make([]chan SocketEvent, 0) manager.listeners = make([]chan SocketEvent, 0)

View file

@ -4,5 +4,6 @@ import "github.com/maddalax/htmgo/framework/h"
type ExtensionOpts struct { type ExtensionOpts struct {
WsPath string WsPath string
RoomName func(ctx *h.RequestContext) string
SessionId func(ctx *h.RequestContext) string SessionId func(ctx *h.RequestContext) string
} }

View file

@ -0,0 +1,10 @@
package ws
import (
"github.com/maddalax/htmgo/extensions/websocket/internal/wsutil"
"github.com/maddalax/htmgo/framework/h"
)
func ManagerFromCtx(ctx *h.RequestContext) *wsutil.SocketManager {
return wsutil.SocketManagerFromCtx(ctx)
}

View file

@ -5,40 +5,25 @@ import (
"github.com/maddalax/htmgo/framework/h" "github.com/maddalax/htmgo/framework/h"
"github.com/maddalax/htmgo/framework/service" "github.com/maddalax/htmgo/framework/service"
"github.com/maddalax/htmgo/framework/session" "github.com/maddalax/htmgo/framework/session"
"log/slog"
"time" "time"
) )
// Every executes the given callback every interval, until the socket is disconnected, or the callback returns false. // Every executes the given callback every interval, until the socket is disconnected, or the callback returns false.
func Every(ctx *h.RequestContext, interval time.Duration, cb func() bool) { func Every(ctx *h.RequestContext, interval time.Duration, cb func() bool) {
socketId := session.GetSessionId(ctx) socketId := session.GetSessionId(ctx)
socketIdSlog := slog.String("socketId", string(socketId)) locator := ctx.ServiceLocator()
manager := service.Get[wsutil.SocketManager](locator)
slog.Debug("ws-extension: starting every loop", socketIdSlog, slog.Duration("duration", interval)) manager.RunIntervalWithSocket(string(socketId), interval, cb)
}
go func() {
tries := 0 func Once(ctx *h.RequestContext, cb func()) {
for { // time is irrelevant, we just need to run the callback once, it will exit after because of the false return
locator := ctx.ServiceLocator() Every(ctx, time.Millisecond, func() bool {
socketManager := service.Get[wsutil.SocketManager](locator) cb()
socket := socketManager.Get(string(socketId)) return false
// This can run before the socket is established, lets try a few times and kill it if socket isn't connected after a bit. })
if socket == nil { }
if tries > 5 {
slog.Debug("ws-extension: socket disconnected, killing goroutine", socketIdSlog) func RunOnConnected(ctx *h.RequestContext, cb func()) {
return Once(ctx, cb)
} else {
time.Sleep(time.Second)
tries++
slog.Debug("ws-extension: socket not connected yet, trying again", socketIdSlog, slog.Int("attempt", tries))
continue
}
}
success := cb()
if !success {
return
}
time.Sleep(interval)
}
}()
} }

View file

@ -1,11 +1,9 @@
package ws package ws
import ( import (
"fmt"
"github.com/maddalax/htmgo/extensions/websocket/internal/wsutil" "github.com/maddalax/htmgo/extensions/websocket/internal/wsutil"
"github.com/maddalax/htmgo/framework/service" "github.com/maddalax/htmgo/framework/service"
"github.com/maddalax/htmgo/framework/session" "github.com/maddalax/htmgo/framework/session"
"time"
) )
func StartListener(locator *service.Locator) { func StartListener(locator *service.Locator) {
@ -13,15 +11,6 @@ func StartListener(locator *service.Locator) {
manager.Listen(socketMessageListener) manager.Listen(socketMessageListener)
handler := NewMessageHandler(manager) handler := NewMessageHandler(manager)
go func() {
for {
fmt.Printf("total handlers: %d\n", handlers.Size())
fmt.Printf("total serverEventNamesToHash: %d\n", serverEventNamesToHash.Size())
fmt.Printf("total sessionIdToHashes: %d\n", sessionIdToHashes.Size())
time.Sleep(5 * time.Second)
}
}()
go func() { go func() {
for { for {
select { select {

View file

@ -0,0 +1,19 @@
package ws
import (
"github.com/maddalax/htmgo/extensions/websocket/internal/wsutil"
"github.com/maddalax/htmgo/framework/h"
)
type Metrics struct {
Manager wsutil.ManagerMetrics
Handler HandlerMetrics
}
func MetricsFromCtx(ctx *h.RequestContext) Metrics {
manager := ManagerFromCtx(ctx)
return Metrics{
Manager: manager.Metrics(),
Handler: GetHandlerMetics(),
}
}

View file

@ -34,6 +34,21 @@ var serverSideMessageListener = make(chan ServerSideEvent, 100)
var lock = sync.Mutex{} var lock = sync.Mutex{}
var callingHandler = atomic.Bool{} var callingHandler = atomic.Bool{}
type HandlerMetrics struct {
TotalHandlers int
ServerEventNamesToHashCount int
SessionIdToHashesCount int
}
func GetHandlerMetics() HandlerMetrics {
metrics := HandlerMetrics{
TotalHandlers: handlers.Size(),
ServerEventNamesToHashCount: serverEventNamesToHash.Size(),
SessionIdToHashesCount: sessionIdToHashes.Size(),
}
return metrics
}
func makeId() string { func makeId() string {
return h.GenId(30) return h.GenId(30)
} }

View file

@ -161,6 +161,18 @@ func Div(children ...Ren) *Element {
return Tag("div", children...) return Tag("div", children...)
} }
func Dl(children ...Ren) *Element {
return Tag("dl", children...)
}
func Dt(children ...Ren) *Element {
return Tag("dt", children...)
}
func Dd(children ...Ren) *Element {
return Tag("dd", children...)
}
func Article(children ...Ren) *Element { func Article(children ...Ren) *Element {
return Tag("article", children...) return Tag("article", children...)
} }