htmgo/examples/chat/ws/manager.go

175 lines
3.8 KiB
Go
Raw Normal View History

package ws
import (
"context"
"github.com/coder/websocket"
"github.com/puzpuzpuz/xsync/v3"
)
2024-09-30 22:31:09 +00:00
// EventType names the kind of event a socket can produce.
type EventType string

// The event kinds used by this package.
const (
	// ConnectedEvent is dispatched when a socket is added to the manager.
	ConnectedEvent = EventType("connected")
	// DisconnectedEvent is dispatched when a socket is closed.
	DisconnectedEvent = EventType("disconnected")
	// MessageEvent is dispatched when a socket delivers a message.
	MessageEvent = EventType("message")
)
type SocketEvent struct {
Id string
2024-10-01 03:08:52 +00:00
RoomId string
2024-09-30 22:31:09 +00:00
Type EventType
Payload map[string]any
}
2024-10-01 03:08:52 +00:00
// SocketConnection pairs a websocket connection with the socket id and room
// id it was registered under.
type SocketConnection struct {
	Id     string          // identifier the connection was added with
	Conn   *websocket.Conn // underlying websocket connection
	RoomId string          // room the connection was added to
}
type SocketManager struct {
2024-10-01 18:42:14 +00:00
sockets *xsync.MapOf[string, *xsync.MapOf[string, SocketConnection]]
idToRoom *xsync.MapOf[string, string]
2024-09-30 22:31:09 +00:00
listeners []chan SocketEvent
}
func NewSocketManager() *SocketManager {
return &SocketManager{
2024-10-01 18:42:14 +00:00
sockets: xsync.NewMapOf[string, *xsync.MapOf[string, SocketConnection]](),
idToRoom: xsync.NewMapOf[string, string](),
}
}
2024-10-01 03:49:03 +00:00
func (manager *SocketManager) ForEachSocket(roomId string, cb func(conn SocketConnection)) {
2024-10-01 18:42:14 +00:00
sockets, ok := manager.sockets.Load(roomId)
if !ok {
return
}
sockets.Range(func(id string, conn SocketConnection) bool {
cb(conn)
2024-10-01 03:49:03 +00:00
return true
})
}
2024-09-30 22:31:09 +00:00
func (manager *SocketManager) Listen(listener chan SocketEvent) {
if manager.listeners == nil {
2024-09-30 22:31:09 +00:00
manager.listeners = make([]chan SocketEvent, 0)
}
manager.listeners = append(manager.listeners, listener)
}
2024-09-30 22:31:09 +00:00
func (manager *SocketManager) dispatch(event SocketEvent) {
for _, listener := range manager.listeners {
2024-09-30 22:31:09 +00:00
listener <- event
}
}
2024-09-30 22:31:09 +00:00
func (manager *SocketManager) OnMessage(id string, message map[string]any) {
2024-10-01 03:08:52 +00:00
socket := manager.Get(id)
if socket == nil {
return
}
2024-09-30 22:31:09 +00:00
manager.dispatch(SocketEvent{
Id: id,
Type: MessageEvent,
Payload: message,
2024-10-01 03:08:52 +00:00
RoomId: socket.RoomId,
2024-09-30 22:31:09 +00:00
})
}
2024-10-01 03:08:52 +00:00
func (manager *SocketManager) Add(roomId string, id string, conn *websocket.Conn) {
2024-10-01 18:42:14 +00:00
manager.idToRoom.Store(id, roomId)
sockets, ok := manager.sockets.LoadOrCompute(roomId, func() *xsync.MapOf[string, SocketConnection] {
return xsync.NewMapOf[string, SocketConnection]()
})
sockets.Store(id, SocketConnection{
2024-10-01 03:08:52 +00:00
Id: id,
Conn: conn,
RoomId: roomId,
})
2024-10-01 18:42:14 +00:00
s, ok := sockets.Load(id)
2024-10-01 03:49:03 +00:00
if !ok {
return
}
2024-10-01 18:42:14 +00:00
2024-09-30 22:31:09 +00:00
manager.dispatch(SocketEvent{
2024-10-01 03:49:03 +00:00
Id: s.Id,
2024-09-30 22:31:09 +00:00
Type: ConnectedEvent,
2024-10-01 03:49:03 +00:00
RoomId: s.RoomId,
2024-09-30 22:31:09 +00:00
Payload: map[string]any{},
})
}
func (manager *SocketManager) OnClose(id string) {
2024-10-01 03:08:52 +00:00
socket := manager.Get(id)
if socket == nil {
return
}
2024-09-30 22:31:09 +00:00
manager.dispatch(SocketEvent{
Id: id,
Type: DisconnectedEvent,
2024-10-01 03:08:52 +00:00
RoomId: socket.RoomId,
2024-09-30 22:31:09 +00:00
Payload: map[string]any{},
})
manager.sockets.Delete(id)
}
2024-10-01 17:09:22 +00:00
func (manager *SocketManager) CloseWithError(id string, code websocket.StatusCode, message string) {
conn := manager.Get(id)
if conn != nil {
2024-10-01 17:09:22 +00:00
go manager.OnClose(id)
conn.Conn.Close(code, message)
}
}
func (manager *SocketManager) Disconnect(id string) {
conn := manager.Get(id)
if conn != nil {
2024-10-01 17:09:22 +00:00
go manager.OnClose(id)
2024-10-01 03:08:52 +00:00
_ = conn.Conn.CloseNow()
}
}
2024-10-01 03:08:52 +00:00
func (manager *SocketManager) Get(id string) *SocketConnection {
2024-10-01 18:42:14 +00:00
roomId, ok := manager.idToRoom.Load(id)
2024-10-01 03:08:52 +00:00
if !ok {
return nil
}
2024-10-01 18:42:14 +00:00
sockets, ok := manager.sockets.Load(roomId)
if !ok {
return nil
}
conn, ok := sockets.Load(id)
2024-10-01 03:08:52 +00:00
return &conn
}
2024-10-01 18:42:14 +00:00
func (manager *SocketManager) Broadcast(roomId string, message []byte, messageType websocket.MessageType, predicate func(conn SocketConnection) bool) {
ctx := context.Background()
2024-10-01 18:42:14 +00:00
sockets, ok := manager.sockets.Load(roomId)
if !ok {
return
}
sockets.Range(func(id string, conn SocketConnection) bool {
if predicate(conn) {
conn.Conn.Write(ctx, messageType, message)
}
return true
})
}
2024-10-01 18:42:14 +00:00
// BroadcastText sends message as a text frame to every connection in roomId
// accepted by predicate. It is a convenience wrapper around Broadcast.
func (manager *SocketManager) BroadcastText(roomId string, message string, predicate func(conn SocketConnection) bool) {
	payload := []byte(message)
	manager.Broadcast(roomId, payload, websocket.MessageText, predicate)
}
2024-09-30 22:31:09 +00:00
func (manager *SocketManager) SendText(id string, message string) {
conn := manager.Get(id)
if conn != nil {
2024-10-01 03:08:52 +00:00
_ = conn.Conn.Write(context.Background(), websocket.MessageText, []byte(message))
2024-09-30 22:31:09 +00:00
}
}