some cleanup

This commit is contained in:
maddalax 2024-10-04 11:15:57 -05:00
parent 7503e1b2c1
commit 1e51b6b8e8
9 changed files with 27 additions and 75 deletions

View file

@ -2,7 +2,7 @@ package chat
import ( import (
"chat/internal/db" "chat/internal/db"
"chat/ws" "chat/sse"
"context" "context"
"fmt" "fmt"
"github.com/maddalax/htmgo/framework/h" "github.com/maddalax/htmgo/framework/h"
@ -11,32 +11,32 @@ import (
) )
type Manager struct { type Manager struct {
socketManager *ws.SocketManager socketManager *sse.SocketManager
queries *db.Queries queries *db.Queries
service *Service service *Service
} }
func NewManager(locator *service.Locator) *Manager { func NewManager(locator *service.Locator) *Manager {
return &Manager{ return &Manager{
socketManager: service.Get[ws.SocketManager](locator), socketManager: service.Get[sse.SocketManager](locator),
queries: service.Get[db.Queries](locator), queries: service.Get[db.Queries](locator),
service: NewService(locator), service: NewService(locator),
} }
} }
func (m *Manager) StartListener() { func (m *Manager) StartListener() {
c := make(chan ws.SocketEvent, 1) c := make(chan sse.SocketEvent, 1)
m.socketManager.Listen(c) m.socketManager.Listen(c)
for { for {
select { select {
case event := <-c: case event := <-c:
switch event.Type { switch event.Type {
case ws.ConnectedEvent: case sse.ConnectedEvent:
m.OnConnected(event) m.OnConnected(event)
case ws.DisconnectedEvent: case sse.DisconnectedEvent:
m.OnDisconnected(event) m.OnDisconnected(event)
case ws.MessageEvent: case sse.MessageEvent:
m.onMessage(event) m.onMessage(event)
default: default:
fmt.Printf("Unknown event type: %s\n", event.Type) fmt.Printf("Unknown event type: %s\n", event.Type)
@ -45,12 +45,12 @@ func (m *Manager) StartListener() {
} }
} }
func (m *Manager) dispatchConnectedUsers(roomId string, predicate func(conn ws.SocketConnection) bool) { func (m *Manager) dispatchConnectedUsers(roomId string, predicate func(conn sse.SocketConnection) bool) {
connectedUsers := make([]db.User, 0) connectedUsers := make([]db.User, 0)
// backfill all existing clients to the connected client // backfill all existing clients to the connected client
m.socketManager.ForEachSocket(roomId, func(conn ws.SocketConnection) { m.socketManager.ForEachSocket(roomId, func(conn sse.SocketConnection) {
if !predicate(conn) { if !predicate(conn) {
return return
} }
@ -61,12 +61,12 @@ func (m *Manager) dispatchConnectedUsers(roomId string, predicate func(conn ws.S
connectedUsers = append(connectedUsers, user) connectedUsers = append(connectedUsers, user)
}) })
m.socketManager.ForEachSocket(roomId, func(conn ws.SocketConnection) { m.socketManager.ForEachSocket(roomId, func(conn sse.SocketConnection) {
m.socketManager.SendText(conn.Id, h.Render(ConnectedUsers(connectedUsers, conn.Id))) m.socketManager.SendText(conn.Id, h.Render(ConnectedUsers(connectedUsers, conn.Id)))
}) })
} }
func (m *Manager) OnConnected(e ws.SocketEvent) { func (m *Manager) OnConnected(e sse.SocketEvent) {
room, _ := m.service.GetRoom(e.RoomId) room, _ := m.service.GetRoom(e.RoomId)
if room == nil { if room == nil {
@ -83,14 +83,14 @@ func (m *Manager) OnConnected(e ws.SocketEvent) {
fmt.Printf("User %s connected to %s\n", user.Name, e.RoomId) fmt.Printf("User %s connected to %s\n", user.Name, e.RoomId)
m.dispatchConnectedUsers(e.RoomId, func(conn ws.SocketConnection) bool { m.dispatchConnectedUsers(e.RoomId, func(conn sse.SocketConnection) bool {
return true return true
}) })
m.backFill(e.Id, e.RoomId) m.backFill(e.Id, e.RoomId)
} }
func (m *Manager) OnDisconnected(e ws.SocketEvent) { func (m *Manager) OnDisconnected(e sse.SocketEvent) {
user, err := m.queries.GetUserBySessionId(context.Background(), e.Id) user, err := m.queries.GetUserBySessionId(context.Background(), e.Id)
if err != nil { if err != nil {
return return
@ -100,7 +100,7 @@ func (m *Manager) OnDisconnected(e ws.SocketEvent) {
return return
} }
fmt.Printf("User %s disconnected from %s\n", user.Name, room.ID) fmt.Printf("User %s disconnected from %s\n", user.Name, room.ID)
m.dispatchConnectedUsers(e.RoomId, func(conn ws.SocketConnection) bool { m.dispatchConnectedUsers(e.RoomId, func(conn sse.SocketConnection) bool {
return conn.Id != e.Id return conn.Id != e.Id
}) })
} }
@ -123,7 +123,7 @@ func (m *Manager) backFill(socketId string, roomId string) {
} }
} }
func (m *Manager) onMessage(e ws.SocketEvent) { func (m *Manager) onMessage(e sse.SocketEvent) {
message := e.Payload["message"].(string) message := e.Payload["message"].(string)
if message == "" { if message == "" {
@ -147,7 +147,7 @@ func (m *Manager) onMessage(e ws.SocketEvent) {
m.socketManager.BroadcastText( m.socketManager.BroadcastText(
e.RoomId, e.RoomId,
h.Render(MessageRow(saved)), h.Render(MessageRow(saved)),
func(conn ws.SocketConnection) bool { func(conn sse.SocketConnection) bool {
return true return true
}, },
) )

View file

@ -4,7 +4,7 @@ import (
"chat/__htmgo" "chat/__htmgo"
"chat/chat" "chat/chat"
"chat/internal/db" "chat/internal/db"
"chat/ws" "chat/sse"
"fmt" "fmt"
"github.com/maddalax/htmgo/framework/h" "github.com/maddalax/htmgo/framework/h"
"github.com/maddalax/htmgo/framework/service" "github.com/maddalax/htmgo/framework/service"
@ -18,8 +18,8 @@ func main() {
locator := service.NewLocator() locator := service.NewLocator()
service.Set[db.Queries](locator, service.Singleton, db.Provide) service.Set[db.Queries](locator, service.Singleton, db.Provide)
service.Set[ws.SocketManager](locator, service.Singleton, func() *ws.SocketManager { service.Set[sse.SocketManager](locator, service.Singleton, func() *sse.SocketManager {
return ws.NewSocketManager() return sse.NewSocketManager()
}) })
chatManager := chat.NewManager(locator) chatManager := chat.NewManager(locator)
@ -46,7 +46,7 @@ func main() {
http.FileServerFS(sub) http.FileServerFS(sub)
app.Router.Handle("/public/*", http.StripPrefix("/public", http.FileServerFS(sub))) app.Router.Handle("/public/*", http.StripPrefix("/public", http.FileServerFS(sub)))
app.Router.Handle("/ws/chat/{id}", ws.Handle()) app.Router.Handle("/sse/chat/{id}", sse.Handle())
__htmgo.Register(app.Router) __htmgo.Register(app.Router)
}, },

View file

@ -18,7 +18,7 @@ func ChatRoom(ctx *h.RequestContext) *h.Page {
h.Div( h.Div(
h.TriggerChildren(), h.TriggerChildren(),
h.Attribute("sse-connect", fmt.Sprintf("/ws/chat/%s", roomId)), h.Attribute("sse-connect", fmt.Sprintf("/sse/chat/%s", roomId)),
h.HxOnSseOpen( h.HxOnSseOpen(
js.ConsoleLog("Connected to chat room"), js.ConsoleLog("Connected to chat room"),
@ -152,7 +152,6 @@ func Form() *h.Element {
h.Form( h.Form(
h.NoSwap(), h.NoSwap(),
h.PostPartial(partials.SendMessage), h.PostPartial(partials.SendMessage),
h.Attribute("ws-send", ""),
h.Class("flex flex-grow"), h.Class("flex flex-grow"),
MessageInput(), MessageInput(),
), ),

View file

@ -2,14 +2,14 @@ package partials
import ( import (
"chat/components" "chat/components"
"chat/ws" "chat/sse"
"github.com/maddalax/htmgo/framework/h" "github.com/maddalax/htmgo/framework/h"
"github.com/maddalax/htmgo/framework/service" "github.com/maddalax/htmgo/framework/service"
) )
func SendMessage(ctx *h.RequestContext) *h.Partial { func SendMessage(ctx *h.RequestContext) *h.Partial {
locator := ctx.ServiceLocator() locator := ctx.ServiceLocator()
socketManager := service.Get[ws.SocketManager](locator) socketManager := service.Get[sse.SocketManager](locator)
sessionCookie, err := ctx.Request.Cookie("session_id") sessionCookie, err := ctx.Request.Cookie("session_id")

View file

@ -1,4 +1,4 @@
package ws package sse
import ( import (
"fmt" "fmt"

View file

@ -1,4 +1,4 @@
package ws package sse
import ( import (
"chat/internal/routine" "chat/internal/routine"

File diff suppressed because one or more lines are too long

View file

@ -1,5 +1,4 @@
import htmx from "htmx.org"; import htmx from "htmx.org";
import {createWebSocketClient} from "../util/ws";
let lastVersion = ""; let lastVersion = "";
@ -48,4 +47,4 @@ htmx.defineExtension("livereload", {
function reload() { function reload() {
window.location.reload() window.location.reload()
} }

View file

@ -1,46 +0,0 @@
type WsOpts = {
url: string;
reconnectInterval?: number;
onOpen?: () => void;
onMessage: (message: string) => void;
onError?: (error: Event) => void;
onClose?: () => void;
}
export function createWebSocketClient(opts: WsOpts) {
let socket: WebSocket | null = null;
const connect = (tries: number) => {
console.log('connecting to ws', opts.url, 'attempt', tries)
socket = new WebSocket(opts.url);
// Handle incoming messages
socket.onmessage = (event) => {
opts.onMessage(event.data)
};
// Handle connection errors
socket.onerror = (error) => {
try {
socket?.close()
} catch(ex) {
// noop
}
socket = null
let interval = tries * (opts.reconnectInterval || 1000);
setTimeout(() => connect(tries + 1), interval);
};
// Handle connection close and attempt reconnection
socket.onclose = () => {
socket = null;
let interval = tries * (opts.reconnectInterval || 1000);
setTimeout(() => connect(tries + 1), interval);
};
};
connect(1);
const sendMessage = (message: string) => {
if (socket && socket.readyState === WebSocket.OPEN) {
socket.send(message);
} else {
setTimeout(() => sendMessage(message), 100);
}
};
return { sendMessage };
}