htmgo/examples/chat/sse/manager.go
maddalax 34e816ff7c
Websocket Extension - Alpha (#22)
* wip

* merge

* working again

* refactor/make it a bit cleaner

* fix to only call cb for session id who initiated the event

* support broadcasting events to all clients

* refactor

* refactor into ws extension

* add go mod

* rename module

* fix naming

* refactor

* rename

* merge

* fix manager ws delete, add manager tests

* add metric page

* fixes, add k6 script

* fixes, add k6 script

* deploy docker image

* cleanup

* cleanup

* cleanup
2024-11-09 12:05:53 -06:00

231 lines
4.9 KiB
Go

package sse
import (
	"fmt"
	"strings"
	"time"

	"chat/internal/routine"

	"github.com/puzpuzpuz/xsync/v3"
)
// Named types used by the socket manager and its callers.
type (
	// EventType names the kind of a SocketEvent.
	EventType string

	// WriterChan carries formatted event-stream text destined for one
	// connection; the manager only ever sends on it.
	WriterChan chan string

	// DoneChan is sent true by the manager when a connection is being
	// closed or disconnected.
	DoneChan chan bool
)

// Kinds of events the manager dispatches to its listeners.
const (
	// ConnectedEvent is dispatched by Add once a connection is stored.
	ConnectedEvent EventType = "connected"

	// DisconnectedEvent is dispatched by OnClose.
	DisconnectedEvent EventType = "disconnected"

	// MessageEvent is dispatched by OnMessage and carries the message as
	// the event payload.
	MessageEvent EventType = "message"
)
// SocketEvent is the value the manager sends to every registered listener
// channel when a connection is added, closed, or produces a message.
type SocketEvent struct {
// Id is the id of the socket the event is about.
Id string
// RoomId is the room recorded on that socket's connection.
RoomId string
// Type is one of ConnectedEvent, DisconnectedEvent or MessageEvent.
Type EventType
// Payload is the message passed to OnMessage for MessageEvent, and an
// empty map for connect and disconnect events.
Payload map[string]any
}
// CloseEvent pairs a numeric code with a reason string.
// NOTE(review): nothing in the visible part of this file uses this type;
// presumably it describes why a connection was closed - confirm against
// callers before relying on that.
type CloseEvent struct {
Code int
Reason string
}
// SocketConnection is the manager's record of one registered connection.
// It is stored by value, so lookups hand out copies; the channels inside
// are shared between those copies.
type SocketConnection struct {
// Id is the socket id the connection was registered under.
Id string
// RoomId is the room the connection was added to.
RoomId string
// Done is sent true by CloseWithMessage and Disconnect.
Done DoneChan
// Writer receives formatted event text; writeText skips the send when
// it is nil.
Writer WriterChan
}
// SocketManager tracks connections grouped by room and forwards connect,
// disconnect and message events to registered listener channels.
type SocketManager struct {
// sockets maps a room id to that room's connections, keyed by socket id.
sockets *xsync.MapOf[string, *xsync.MapOf[string, SocketConnection]]
// idToRoom maps a socket id to the id of the room it was added to.
idToRoom *xsync.MapOf[string, string]
// listeners receive every event passed to dispatch. Listen appends to
// this slice without any locking.
listeners []chan SocketEvent
}
// NewSocketManager returns a manager with empty room and id lookup maps
// and no listeners registered.
func NewSocketManager() *SocketManager {
	manager := new(SocketManager)
	manager.sockets = xsync.NewMapOf[string, *xsync.MapOf[string, SocketConnection]]()
	manager.idToRoom = xsync.NewMapOf[string, string]()
	return manager
}
// ForEachSocket invokes cb once for every connection currently stored in
// the room identified by roomId. It does nothing when the room is unknown.
func (manager *SocketManager) ForEachSocket(roomId string, cb func(conn SocketConnection)) {
	room, found := manager.sockets.Load(roomId)
	if found {
		room.Range(func(_ string, conn SocketConnection) bool {
			cb(conn)
			// Always continue so every connection in the room is visited.
			return true
		})
	}
}
// Listen registers listener to receive every event the manager dispatches.
// A nil listener is ignored. No locking is performed here.
func (manager *SocketManager) Listen(listener chan SocketEvent) {
	if manager.listeners == nil {
		manager.listeners = []chan SocketEvent{}
	}
	if listener == nil {
		return
	}
	manager.listeners = append(manager.listeners, listener)
}
// dispatch sends event to every registered listener, in registration
// order, blocking on each send until the listener accepts it.
//
// While the sends are in progress a watchdog goroutine logs a line every
// five seconds so a blocked listener channel is visible in the output.
func (manager *SocketManager) dispatch(event SocketEvent) {
	done := make(chan struct{})
	// Closing in a defer stops the watchdog on every exit path, including
	// a panicking send (for example on a closed listener channel) that is
	// recovered further up the stack.
	defer close(done)

	go func() {
		// One ticker for the watchdog's lifetime; stopping it releases the
		// timer as soon as dispatch finishes instead of after it fires.
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-done:
				return
			case <-ticker.C:
				fmt.Printf("havent dispatched listener event after 5s, chan blocked: %s\n", event.Type)
			}
		}
	}()

	for _, listener := range manager.listeners {
		listener <- event
	}
}
// OnMessage dispatches a MessageEvent carrying message on behalf of the
// socket with the given id. Nothing happens when Get returns nil for id.
func (manager *SocketManager) OnMessage(id string, message map[string]any) {
	if conn := manager.Get(id); conn != nil {
		event := SocketEvent{
			Id:      id,
			RoomId:  conn.RoomId,
			Type:    MessageEvent,
			Payload: message,
		}
		manager.dispatch(event)
	}
}
// Add registers a connection under roomId with the given socket id,
// creating the room on first use, then dispatches a ConnectedEvent and
// logs the connection.
//
// writer and done are stored on the connection as its Writer and Done
// channels.
func (manager *SocketManager) Add(roomId string, id string, writer WriterChan, done DoneChan) {
	manager.idToRoom.Store(id, roomId)

	newRoom := func() *xsync.MapOf[string, SocketConnection] {
		return xsync.NewMapOf[string, SocketConnection]()
	}
	room, _ := manager.sockets.LoadOrCompute(roomId, newRoom)

	room.Store(id, SocketConnection{
		Id:     id,
		RoomId: roomId,
		Done:   done,
		Writer: writer,
	})

	// Read the entry back; if it has already disappeared there is nothing
	// to announce.
	stored, found := room.Load(id)
	if !found {
		return
	}

	connected := SocketEvent{
		Id:      stored.Id,
		RoomId:  stored.RoomId,
		Type:    ConnectedEvent,
		Payload: map[string]any{},
	}
	manager.dispatch(connected)
	fmt.Printf("User %s connected to %s\n", id, roomId)
}
// OnClose dispatches a DisconnectedEvent for the socket with the given id
// and then removes the socket from the manager. It does nothing when Get
// returns nil for id.
func (manager *SocketManager) OnClose(id string) {
	socket := manager.Get(id)
	if socket == nil {
		return
	}
	manager.dispatch(SocketEvent{
		Id:      id,
		Type:    DisconnectedEvent,
		RoomId:  socket.RoomId,
		Payload: map[string]any{},
	})

	// The outer map is keyed by room id, so the connection has to be
	// removed from its room's inner map; deleting id from the outer map
	// would leave the connection registered. The (possibly empty) room
	// itself is kept so a concurrent Add cannot end up storing into a room
	// map that was just detached.
	if roomId, ok := manager.idToRoom.Load(id); ok {
		if room, ok := manager.sockets.Load(roomId); ok {
			room.Delete(id)
		}
	}
	manager.idToRoom.Delete(id)
}
// CloseWithMessage writes message to the socket as an "error" event,
// signals its Done channel, and finally runs OnClose for it. It does
// nothing when Get returns nil for id.
func (manager *SocketManager) CloseWithMessage(id string, message string) {
	conn := manager.Get(id)
	if conn == nil {
		return
	}
	// Deferred so the close handling runs after the error text has been
	// written and Done has been signalled.
	defer manager.OnClose(id)

	manager.writeText(*conn, "error", message)
	conn.Done <- true
}
// Disconnect runs OnClose for the socket with the given id and then
// signals its Done channel. It does nothing when Get returns nil for id.
func (manager *SocketManager) Disconnect(id string) {
	conn := manager.Get(id)
	if conn == nil {
		return
	}
	done := conn.Done
	manager.OnClose(id)
	done <- true
}
// Get looks up the connection registered under the given socket id.
//
// It returns nil when the id has no recorded room, when that room does not
// exist, or when the room holds no connection for the id. The returned
// pointer refers to a copy of the stored connection.
func (manager *SocketManager) Get(id string) *SocketConnection {
	roomId, ok := manager.idToRoom.Load(id)
	if !ok {
		return nil
	}
	sockets, ok := manager.sockets.Load(roomId)
	if !ok {
		return nil
	}
	conn, ok := sockets.Load(id)
	if !ok {
		// Without this check callers would receive a pointer to a zero
		// SocketConnection and go on to use its nil channels.
		return nil
	}
	return &conn
}
// Ping writes a "ping" event with an empty message to the socket with the
// given id. It does nothing when Get returns nil for id.
func (manager *SocketManager) Ping(id string) {
	conn := manager.Get(id)
	if conn == nil {
		return
	}
	manager.writeText(*conn, "ping", "")
}
// writeCloseRaw writes message to writer as a "close" event.
func (manager *SocketManager) writeCloseRaw(writer WriterChan, message string) {
	const closeEvent = "close"
	manager.writeTextRaw(writer, closeEvent, message)
}
// writeTextRaw formats message as one event-stream frame and sends it on
// writer, giving up after three seconds and logging the frame that could
// not be delivered.
//
// event is the event name; when it is empty the frame carries only data
// lines. The work runs inside routine.DebugLongRunning (presumably a
// helper that reports slow callbacks - confirm in chat/internal/routine).
func (manager *SocketManager) writeTextRaw(writer WriterChan, event string, message string) {
	routine.DebugLongRunning("writeTextRaw", func() {
		timeout := 3 * time.Second
		data := sseFrame(event, message)
		select {
		case writer <- data:
		case <-time.After(timeout):
			fmt.Printf("could not send %s to channel after %s\n", data, timeout)
		}
	})
}

// sseFrame renders a single server-sent-events frame. Every line of
// message gets its own "data: " prefix so a message containing line breaks
// stays inside one frame instead of being truncated or read as extra
// fields. For a message without line breaks the output is exactly
// "event: <event>\ndata: <message>\n\n" (or without the event line when
// event is empty).
func sseFrame(event string, message string) string {
	// CRLF and bare CR are line terminators in the event-stream format
	// too, so fold them into LF before splitting.
	normalized := strings.NewReplacer("\r\n", "\n", "\r", "\n").Replace(message)

	var frame strings.Builder
	if event != "" {
		frame.WriteString("event: ")
		frame.WriteString(event)
		frame.WriteByte('\n')
	}
	for _, line := range strings.Split(normalized, "\n") {
		frame.WriteString("data: ")
		frame.WriteString(line)
		frame.WriteByte('\n')
	}
	// The blank line terminates the frame.
	frame.WriteByte('\n')
	return frame.String()
}
// writeText writes message to socket's Writer channel under the given
// event name. Connections without a Writer are skipped.
func (manager *SocketManager) writeText(socket SocketConnection, event string, message string) {
	if writer := socket.Writer; writer != nil {
		manager.writeTextRaw(writer, event, message)
	}
}
// BroadcastText writes message, with no event name, to every connection in
// the room identified by roomId for which predicate returns true. It does
// nothing when the room is unknown.
func (manager *SocketManager) BroadcastText(roomId string, message string, predicate func(conn SocketConnection) bool) {
	manager.ForEachSocket(roomId, func(conn SocketConnection) {
		if !predicate(conn) {
			return
		}
		manager.writeText(conn, "", message)
	})
}
// SendText writes message, with no event name, to the socket with the
// given id. It does nothing when Get returns nil for id.
func (manager *SocketManager) SendText(id string, message string) {
	conn := manager.Get(id)
	if conn == nil {
		return
	}
	manager.writeText(*conn, "", message)
}