From 1ddeceaa828655ea302c2d3cad0ddc13545195ce Mon Sep 17 00:00:00 2001 From: maddalax Date: Mon, 4 Nov 2024 13:39:46 -0600 Subject: [PATCH] fixes, add k6 script --- examples/chat/sse/manager.go | 2 - examples/ws-example/k6.js | 58 ------------------- examples/ws-example/pages/index.go | 4 +- examples/ws-example/pages/ws/metrics.go | 3 + extensions/websocket/init.go | 4 +- .../websocket/internal/wsutil/manager.go | 30 ++++++++-- 6 files changed, 32 insertions(+), 69 deletions(-) diff --git a/examples/chat/sse/manager.go b/examples/chat/sse/manager.go index 1a1dedd..d812c6d 100644 --- a/examples/chat/sse/manager.go +++ b/examples/chat/sse/manager.go @@ -70,13 +70,11 @@ func (manager *SocketManager) Listen(listener chan SocketEvent) { } func (manager *SocketManager) dispatch(event SocketEvent) { - fmt.Printf("dispatching event: %s\n", event.Type) done := make(chan struct{}, 1) go func() { for { select { case <-done: - fmt.Printf("dispatched event: %s\n", event.Type) return case <-time.After(5 * time.Second): fmt.Printf("havent dispatched listener event after 5s, chan blocked: %s\n", event.Type) diff --git a/examples/ws-example/k6.js b/examples/ws-example/k6.js index 32814c4..e69de29 100644 --- a/examples/ws-example/k6.js +++ b/examples/ws-example/k6.js @@ -1,58 +0,0 @@ -import ws from 'k6/ws'; -import { check } from 'k6'; -import { sleep } from 'k6'; - -export let options = { - vus: 1000, // Number of users - duration: '10m', // Total duration of the test -}; - - - -function generateRandomString(length) { - const characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'; - let result = ''; - for (let i = 0; i < length; i++) { - result += characters.charAt(Math.floor(Math.random() * characters.length)); - } - return result; -} - -export default function () { - let sessionId = generateRandomString(20); - const url = 'ws://localhost:3000/ws?sessionId=' + sessionId; - - const response = ws.connect(url, null, function (socket) { - // Check if the connection was successful - check(socket, { 'connected successfully': (s) => s.readyState === 1 }); - - // Event listener for when the WebSocket is open - socket.on('open', () => { - console.log('Connected to WebSocket'); - while (true) { - socket.send(JSON.stringify({ message: 'ping' })); - sleep(1); // Wait for 1 second before sending the next message - } - }); - - - // Event listener for incoming messages - socket.on('message', (msg) => { - console.log(`Received message: ${msg}`); - }); - - // Event listener for errors - socket.on('error', (e) => { - console.error('Error in WebSocket connection:', e); - }); - - // Event listener for close - socket.on('close', () => { - console.log('Disconnected from WebSocket'); - }); - }); - - // Check if the WebSocket handshake was successful - check(response, { 'status is 101 (Switching Protocols)': (r) => r && r.status === 101 }); - -} diff --git a/examples/ws-example/pages/index.go b/examples/ws-example/pages/index.go index caa6d59..ad1b666 100644 --- a/examples/ws-example/pages/index.go +++ b/examples/ws-example/pages/index.go @@ -29,10 +29,10 @@ func IndexPage(ctx *h.RequestContext) *h.Page { partials.Repeater(ctx, partials.RepeaterProps{ Id: "repeater-1", OnAdd: func(data ws.HandlerData) { - ws.BroadcastServerSideEvent("increment", map[string]any{}) + //ws.BroadcastServerSideEvent("increment", map[string]any{}) }, OnRemove: func(data ws.HandlerData, index int) { - ws.BroadcastServerSideEvent("decrement", map[string]any{}) + //ws.BroadcastServerSideEvent("decrement", map[string]any{}) }, AddButton: h.Button( h.Text("+ Add Item"), diff --git a/examples/ws-example/pages/ws/metrics.go b/examples/ws-example/pages/ws/metrics.go index 269cf85..ba58ea6 100644 --- a/examples/ws-example/pages/ws/metrics.go +++ b/examples/ws-example/pages/ws/metrics.go @@ -61,6 +61,9 @@ func List(metrics ws.Metrics) *h.Element { h.Dl( h.Class("-my-3 divide-y divide-gray-100 text-sm"), ListItem("Current Time", time.Now().Format("15:04:05")), + ListItem("Seconds Elapsed", fmt.Sprintf("%d", metrics.Manager.SecondsElapsed)), + ListItem("Total Messages", fmt.Sprintf("%d", metrics.Manager.TotalMessages)), + ListItem("Messages Per Second", fmt.Sprintf("%d", metrics.Manager.MessagesPerSecond)), ListItem("Total Goroutines For ws.Every", fmt.Sprintf("%d", metrics.Manager.RunningGoroutines)), ListItem("Total Goroutines In System", fmt.Sprintf("%d", runtime.NumGoroutine())), ListItem("Sockets", fmt.Sprintf("%d", metrics.Manager.TotalSockets)), diff --git a/extensions/websocket/init.go b/extensions/websocket/init.go index 54143df..9261063 100644 --- a/extensions/websocket/init.go +++ b/extensions/websocket/init.go @@ -22,7 +22,9 @@ func EnableExtension(app *h.App, opts opts.ExtensionOpts) { } service.Set[wsutil.SocketManager](app.Opts.ServiceLocator, service.Singleton, func() *wsutil.SocketManager { - return wsutil.NewSocketManager(&opts) + manager := wsutil.NewSocketManager(&opts) + manager.StartMetrics() + return manager }) ws.StartListener(app.Opts.ServiceLocator) app.Router.Handle(opts.WsPath, wsutil.WsHttpHandler(&opts)) diff --git a/extensions/websocket/internal/wsutil/manager.go b/extensions/websocket/internal/wsutil/manager.go index 38dc58f..905d1de 100644 --- a/extensions/websocket/internal/wsutil/manager.go +++ b/extensions/websocket/internal/wsutil/manager.go @@ -49,6 +49,9 @@ type ManagerMetrics struct { TotalListeners int SocketsPerRoomCount map[string]int SocketsPerRoom map[string][]string + TotalMessages int64 + MessagesPerSecond int + SecondsElapsed int } type SocketManager struct { @@ -58,6 +61,22 @@ type SocketManager struct { goroutinesRunning atomic.Int32 opts *opts.ExtensionOpts lock sync.Mutex + totalMessages atomic.Int64 + messagesPerSecond int + secondsElapsed int +} + +func (manager *SocketManager) StartMetrics() { + go func() { + for { + time.Sleep(time.Second) + manager.lock.Lock() + manager.secondsElapsed++ + totalMessages := manager.totalMessages.Load() + manager.messagesPerSecond = int(float64(totalMessages) / float64(manager.secondsElapsed)) + manager.lock.Unlock() + } + }() } func (manager *SocketManager) Metrics() ManagerMetrics { @@ -71,6 +90,9 @@ func (manager *SocketManager) Metrics() ManagerMetrics { TotalListeners: len(manager.listeners), SocketsPerRoom: make(map[string][]string), SocketsPerRoomCount: make(map[string]int), + TotalMessages: manager.totalMessages.Load(), + MessagesPerSecond: manager.messagesPerSecond, + SecondsElapsed: manager.secondsElapsed, } roomMap := make(map[string]int) @@ -164,7 +186,6 @@ func (manager *SocketManager) Listen(listener chan SocketEvent) { } func (manager *SocketManager) dispatch(event SocketEvent) { - fmt.Printf("dispatching event: %s\n", event.Type) done := make(chan struct{}, 1) go func() { for { @@ -187,17 +208,14 @@ func (manager *SocketManager) OnMessage(id string, message map[string]any) { if socket == nil { return } + + manager.totalMessages.Add(1) manager.dispatch(SocketEvent{ SessionId: id, Type: MessageEvent, Payload: message, RoomId: socket.RoomId, }) - - if message["message"] == "ping" { - manager.Ping(id) - } - } func (manager *SocketManager) Add(roomId string, id string, writer WriterChan, done DoneChan) {