fixes, add k6 script

This commit is contained in:
maddalax 2024-11-04 13:39:46 -06:00
parent e71d875687
commit 1ddeceaa82
6 changed files with 32 additions and 69 deletions

View file

@ -70,13 +70,11 @@ func (manager *SocketManager) Listen(listener chan SocketEvent) {
} }
func (manager *SocketManager) dispatch(event SocketEvent) { func (manager *SocketManager) dispatch(event SocketEvent) {
fmt.Printf("dispatching event: %s\n", event.Type)
done := make(chan struct{}, 1) done := make(chan struct{}, 1)
go func() { go func() {
for { for {
select { select {
case <-done: case <-done:
fmt.Printf("dispatched event: %s\n", event.Type)
return return
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
fmt.Printf("havent dispatched listener event after 5s, chan blocked: %s\n", event.Type) fmt.Printf("havent dispatched listener event after 5s, chan blocked: %s\n", event.Type)

View file

@ -1,58 +0,0 @@
import ws from 'k6/ws';
import { check } from 'k6';
import { sleep } from 'k6';
export let options = {
vus: 1000, // Number of users
duration: '10m', // Total duration of the test
};
function generateRandomString(length) {
const characters = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
let result = '';
for (let i = 0; i < length; i++) {
result += characters.charAt(Math.floor(Math.random() * characters.length));
}
return result;
}
export default function () {
let sessionId = generateRandomString(20);
const url = 'ws://localhost:3000/ws?sessionId=' + sessionId;
const response = ws.connect(url, null, function (socket) {
// Check if the connection was successful
check(socket, { 'connected successfully': (s) => s.readyState === 1 });
// Event listener for when the WebSocket is open
socket.on('open', () => {
console.log('Connected to WebSocket');
while (true) {
socket.send(JSON.stringify({ message: 'ping' }));
sleep(1); // Wait for 1 second before sending the next message
}
});
// Event listener for incoming messages
socket.on('message', (msg) => {
console.log(`Received message: ${msg}`);
});
// Event listener for errors
socket.on('error', (e) => {
console.error('Error in WebSocket connection:', e);
});
// Event listener for close
socket.on('close', () => {
console.log('Disconnected from WebSocket');
});
});
// Check if the WebSocket handshake was successful
check(response, { 'status is 101 (Switching Protocols)': (r) => r && r.status === 101 });
}

View file

@ -29,10 +29,10 @@ func IndexPage(ctx *h.RequestContext) *h.Page {
partials.Repeater(ctx, partials.RepeaterProps{ partials.Repeater(ctx, partials.RepeaterProps{
Id: "repeater-1", Id: "repeater-1",
OnAdd: func(data ws.HandlerData) { OnAdd: func(data ws.HandlerData) {
ws.BroadcastServerSideEvent("increment", map[string]any{}) //ws.BroadcastServerSideEvent("increment", map[string]any{})
}, },
OnRemove: func(data ws.HandlerData, index int) { OnRemove: func(data ws.HandlerData, index int) {
ws.BroadcastServerSideEvent("decrement", map[string]any{}) //ws.BroadcastServerSideEvent("decrement", map[string]any{})
}, },
AddButton: h.Button( AddButton: h.Button(
h.Text("+ Add Item"), h.Text("+ Add Item"),

View file

@ -61,6 +61,9 @@ func List(metrics ws.Metrics) *h.Element {
h.Dl( h.Dl(
h.Class("-my-3 divide-y divide-gray-100 text-sm"), h.Class("-my-3 divide-y divide-gray-100 text-sm"),
ListItem("Current Time", time.Now().Format("15:04:05")), ListItem("Current Time", time.Now().Format("15:04:05")),
ListItem("Seconds Elapsed", fmt.Sprintf("%d", metrics.Manager.SecondsElapsed)),
ListItem("Total Messages", fmt.Sprintf("%d", metrics.Manager.TotalMessages)),
ListItem("Messages Per Second", fmt.Sprintf("%d", metrics.Manager.MessagesPerSecond)),
ListItem("Total Goroutines For ws.Every", fmt.Sprintf("%d", metrics.Manager.RunningGoroutines)), ListItem("Total Goroutines For ws.Every", fmt.Sprintf("%d", metrics.Manager.RunningGoroutines)),
ListItem("Total Goroutines In System", fmt.Sprintf("%d", runtime.NumGoroutine())), ListItem("Total Goroutines In System", fmt.Sprintf("%d", runtime.NumGoroutine())),
ListItem("Sockets", fmt.Sprintf("%d", metrics.Manager.TotalSockets)), ListItem("Sockets", fmt.Sprintf("%d", metrics.Manager.TotalSockets)),

View file

@ -22,7 +22,9 @@ func EnableExtension(app *h.App, opts opts.ExtensionOpts) {
} }
service.Set[wsutil.SocketManager](app.Opts.ServiceLocator, service.Singleton, func() *wsutil.SocketManager { service.Set[wsutil.SocketManager](app.Opts.ServiceLocator, service.Singleton, func() *wsutil.SocketManager {
return wsutil.NewSocketManager(&opts) manager := wsutil.NewSocketManager(&opts)
manager.StartMetrics()
return manager
}) })
ws.StartListener(app.Opts.ServiceLocator) ws.StartListener(app.Opts.ServiceLocator)
app.Router.Handle(opts.WsPath, wsutil.WsHttpHandler(&opts)) app.Router.Handle(opts.WsPath, wsutil.WsHttpHandler(&opts))

View file

@ -49,6 +49,9 @@ type ManagerMetrics struct {
TotalListeners int TotalListeners int
SocketsPerRoomCount map[string]int SocketsPerRoomCount map[string]int
SocketsPerRoom map[string][]string SocketsPerRoom map[string][]string
TotalMessages int64
MessagesPerSecond int
SecondsElapsed int
} }
type SocketManager struct { type SocketManager struct {
@ -58,6 +61,22 @@ type SocketManager struct {
goroutinesRunning atomic.Int32 goroutinesRunning atomic.Int32
opts *opts.ExtensionOpts opts *opts.ExtensionOpts
lock sync.Mutex lock sync.Mutex
totalMessages atomic.Int64
messagesPerSecond int
secondsElapsed int
}
func (manager *SocketManager) StartMetrics() {
go func() {
for {
time.Sleep(time.Second)
manager.lock.Lock()
manager.secondsElapsed++
totalMessages := manager.totalMessages.Load()
manager.messagesPerSecond = int(float64(totalMessages) / float64(manager.secondsElapsed))
manager.lock.Unlock()
}
}()
} }
func (manager *SocketManager) Metrics() ManagerMetrics { func (manager *SocketManager) Metrics() ManagerMetrics {
@ -71,6 +90,9 @@ func (manager *SocketManager) Metrics() ManagerMetrics {
TotalListeners: len(manager.listeners), TotalListeners: len(manager.listeners),
SocketsPerRoom: make(map[string][]string), SocketsPerRoom: make(map[string][]string),
SocketsPerRoomCount: make(map[string]int), SocketsPerRoomCount: make(map[string]int),
TotalMessages: manager.totalMessages.Load(),
MessagesPerSecond: manager.messagesPerSecond,
SecondsElapsed: manager.secondsElapsed,
} }
roomMap := make(map[string]int) roomMap := make(map[string]int)
@ -164,7 +186,6 @@ func (manager *SocketManager) Listen(listener chan SocketEvent) {
} }
func (manager *SocketManager) dispatch(event SocketEvent) { func (manager *SocketManager) dispatch(event SocketEvent) {
fmt.Printf("dispatching event: %s\n", event.Type)
done := make(chan struct{}, 1) done := make(chan struct{}, 1)
go func() { go func() {
for { for {
@ -187,17 +208,14 @@ func (manager *SocketManager) OnMessage(id string, message map[string]any) {
if socket == nil { if socket == nil {
return return
} }
manager.totalMessages.Add(1)
manager.dispatch(SocketEvent{ manager.dispatch(SocketEvent{
SessionId: id, SessionId: id,
Type: MessageEvent, Type: MessageEvent,
Payload: message, Payload: message,
RoomId: socket.RoomId, RoomId: socket.RoomId,
}) })
if message["message"] == "ping" {
manager.Ping(id)
}
} }
func (manager *SocketManager) Add(roomId string, id string, writer WriterChan, done DoneChan) { func (manager *SocketManager) Add(roomId string, id string, writer WriterChan, done DoneChan) {