diff --git a/examples/chat/sse/manager.go b/examples/chat/sse/manager.go index 485afb2..1a1dedd 100644 --- a/examples/chat/sse/manager.go +++ b/examples/chat/sse/manager.go @@ -79,7 +79,7 @@ func (manager *SocketManager) dispatch(event SocketEvent) { fmt.Printf("dispatched event: %s\n", event.Type) return case <-time.After(5 * time.Second): - fmt.Printf("havent dispatched event after 5s, chan blocked: %s\n", event.Type) + fmt.Printf("havent dispatched listener event after 5s, chan blocked: %s\n", event.Type) } } }() diff --git a/examples/ws-example/k6.js b/examples/ws-example/k6.js new file mode 100644 index 0000000..32814c4 --- /dev/null +++ b/examples/ws-example/k6.js @@ -0,0 +1,58 @@ +import ws from 'k6/ws'; +import { check } from 'k6'; +import { sleep } from 'k6'; + +export let options = { + vus: 1000, // Number of users + duration: '10m', // Total duration of the test +}; + + + +function generateRandomString(length) { + const characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789'; + let result = ''; + for (let i = 0; i < length; i++) { + result += characters.charAt(Math.floor(Math.random() * characters.length)); + } + return result; +} + +export default function () { + let sessionId = generateRandomString(20); + const url = 'ws://localhost:3000/ws?sessionId=' + sessionId; + + const response = ws.connect(url, null, function (socket) { + // Check if the connection was successful + check(socket, { 'connected successfully': (s) => s.readyState === 1 }); + + // Event listener for when the WebSocket is open + socket.on('open', () => { + console.log('Connected to WebSocket'); + while (true) { + socket.send(JSON.stringify({ message: 'ping' })); + sleep(1); // Wait for 1 second before sending the next message + } + }); + + + // Event listener for incoming messages + socket.on('message', (msg) => { + console.log(`Received message: ${msg}`); + }); + + // Event listener for errors + socket.on('error', (e) => { + console.error('Error in WebSocket connection:', e); + }); + + // Event listener for close + socket.on('close', () => { + console.log('Disconnected from WebSocket'); + }); + }); + + // Check if the WebSocket handshake was successful + check(response, { 'status is 101 (Switching Protocols)': (r) => r && r.status === 101 }); + +} diff --git a/extensions/websocket/internal/wsutil/manager.go b/extensions/websocket/internal/wsutil/manager.go index 2c6ea78..38dc58f 100644 --- a/extensions/websocket/internal/wsutil/manager.go +++ b/extensions/websocket/internal/wsutil/manager.go @@ -66,7 +66,7 @@ func (manager *SocketManager) Metrics() ManagerMetrics { count := manager.goroutinesRunning.Load() metrics := ManagerMetrics{ RunningGoroutines: count, - TotalSockets: manager.sockets.Size(), + TotalSockets: 0, TotalRooms: 0, TotalListeners: len(manager.listeners), SocketsPerRoom: make(map[string][]string), @@ -89,6 +89,7 @@ func (manager *SocketManager) Metrics() ManagerMetrics { metrics.SocketsPerRoom[roomId] = []string{} } metrics.SocketsPerRoom[roomId] = append(metrics.SocketsPerRoom[roomId], socketId) + metrics.TotalSockets++ return true }) return true @@ -169,7 +170,6 @@ func (manager *SocketManager) dispatch(event SocketEvent) { for { select { case <-done: - fmt.Printf("dispatched event: %s\n", event.Type) return case <-time.After(5 * time.Second): fmt.Printf("havent dispatched event after 5s, chan blocked: %s\n", event.Type) @@ -193,6 +193,11 @@ func (manager *SocketManager) OnMessage(id string, message map[string]any) { Payload: message, RoomId: socket.RoomId, }) + + if message["message"] == "ping" { + manager.Ping(id) + } + } func (manager *SocketManager) Add(roomId string, id string, writer WriterChan, done DoneChan) { diff --git a/extensions/websocket/ws/listener.go b/extensions/websocket/ws/listener.go index f159add..33341ae 100644 --- a/extensions/websocket/ws/listener.go +++ b/extensions/websocket/ws/listener.go @@ -10,28 +10,37 @@ func StartListener(locator *service.Locator) { manager := service.Get[wsutil.SocketManager](locator) manager.Listen(socketMessageListener) handler := NewMessageHandler(manager) - go func() { for { - select { - case event := <-serverSideMessageListener: - handler.OnServerSideEvent(event) - case event := <-socketMessageListener: - switch event.Type { - case wsutil.DisconnectedEvent: - handler.OnSocketDisconnected(event) - case wsutil.MessageEvent: - handlerId := event.Payload["id"].(string) - eventName := event.Payload["event"].(string) - sessionId := session.Id(event.SessionId) - if eventName == "dom-element-removed" { - handler.OnDomElementRemoved(handlerId) - continue - } else { - handler.OnClientSideEvent(handlerId, sessionId) - } - } - } + handle(handler) } }() } + +func handle(handler *MessageHandler) { + select { + case event := <-serverSideMessageListener: + handler.OnServerSideEvent(event) + case event := <-socketMessageListener: + switch event.Type { + case wsutil.DisconnectedEvent: + handler.OnSocketDisconnected(event) + case wsutil.MessageEvent: + + handlerId, ok := event.Payload["id"].(string) + eventName, ok2 := event.Payload["event"].(string) + + if !ok || !ok2 { + return + } + + sessionId := session.Id(event.SessionId) + if eventName == "dom-element-removed" { + handler.OnDomElementRemoved(handlerId) + return + } else { + handler.OnClientSideEvent(handlerId, sessionId) + } + } + } +}