This commit is contained in:
maddalax 2024-10-09 09:57:51 -05:00
parent 9f53e8b2aa
commit e806656ec5
13 changed files with 276 additions and 104 deletions

View file

@ -29,44 +29,94 @@ type ServerSideEvent struct {
SessionId state.SessionId SessionId state.SessionId
} }
var Map = xsync.NewMapOf[string, handlerWrapper]() type Metrics struct {
var ServerSideEventMap = xsync.NewMapOf[string, *xsync.MapOf[string, handlerWrapper]]() Sessions []MetricPerSession
var socketMessageListener = make(chan sse.SocketEvent, 100)
var serverSideMessageListener = make(chan ServerSideEvent, 100)
func AddServerSideHandler(ctx *h.RequestContext, id string, event string, handler Handler) {
sessionId := state.GetSessionId(ctx)
wrapper := handlerWrapper{
handler: handler,
sessionId: sessionId,
}
handlers, ok := ServerSideEventMap.Load(event)
if !ok {
ServerSideEventMap.Store(event, xsync.NewMapOf[string, handlerWrapper]())
}
handlers, _ = ServerSideEventMap.Load(event)
handlers.Store(id, wrapper)
fmt.Printf("added server side handler for %s, %v\n", event, handlers)
} }
func AddHandler(ctx *h.RequestContext, event string, handler Handler) *h.AttributeMapOrdered { type MetricPerSession struct {
handlerId := fmt.Sprintf("event_%s_%s", event, internal.RandSeq(30)) SessionId state.SessionId
for { ClientListeners []MetricListener
_, ok := Map.Load(handlerId) ServerListeners []MetricListener
if ok { }
handlerId = fmt.Sprintf("event_%s_%s", event, internal.RandSeq(30))
} else { type MetricListener struct {
break Event string
} HandlerId string
}
func GetMetrics() *Metrics {
metrics := &Metrics{
Sessions: make([]MetricPerSession, 0),
} }
sessionId := state.GetSessionId(ctx)
Map.Store(handlerId, handlerWrapper{ Map.Range(func(key state.SessionId, value *Events) bool {
clientListeners := make([]MetricListener, 0)
value.client.Range(func(key string, value handlerWrapper) bool {
clientListeners = append(clientListeners, MetricListener{
Event: "",
HandlerId: key,
})
return true
})
serverListeners := make([]MetricListener, 0)
value.server.Range(func(event string, value *xsync.MapOf[string, handlerWrapper]) bool {
value.Range(func(handlerId string, value handlerWrapper) bool {
serverListeners = append(serverListeners, MetricListener{
Event: event,
HandlerId: handlerId,
})
return true
})
return true
})
metrics.Sessions = append(metrics.Sessions, MetricPerSession{
SessionId: key,
ClientListeners: clientListeners,
ServerListeners: serverListeners,
})
return true
})
return metrics
}
type Events struct {
SessionId state.SessionId
server *xsync.MapOf[string, *xsync.MapOf[string, handlerWrapper]]
client *xsync.MapOf[string, handlerWrapper]
}
func NewEvents(sessionId state.SessionId) *Events {
return &Events{
SessionId: sessionId,
server: xsync.NewMapOf[string, *xsync.MapOf[string, handlerWrapper]](),
client: xsync.NewMapOf[string, handlerWrapper](),
}
}
func (e *Events) AddServerSideHandler(event string, id string, handler Handler) {
wrapper := handlerWrapper{
handler: handler, handler: handler,
sessionId: sessionId, sessionId: e.SessionId,
}
if _, ok := e.server.Load(event); !ok {
e.server.Store(event, xsync.NewMapOf[string, handlerWrapper]())
}
handlers, _ := e.server.Load(event)
handlers.Store(id, wrapper)
e.server.Store(event, handlers)
}
func (e *Events) AddClientSideHandler(event string, handler Handler) *h.AttributeMapOrdered {
handlerId := fmt.Sprintf("event_%s_%s", event, internal.RandSeq(30))
fmt.Printf("adding client side handler %s\n", handlerId)
e.client.Store(handlerId, handlerWrapper{
handler: handler,
sessionId: e.SessionId,
}) })
return h.AttributePairs( return h.AttributePairs(
"data-handler-id", handlerId, "data-handler-id", handlerId,
@ -74,6 +124,80 @@ func AddHandler(ctx *h.RequestContext, event string, handler Handler) *h.Attribu
) )
} }
func (e *Events) OnServerSideEvent(manager *sse.SocketManager, eventName string) {
handlers, ok := e.server.Load(eventName)
if ok {
socket := manager.Get(string(e.SessionId))
if socket == nil {
fmt.Printf("socket not found, must be disconnected: %s", e.SessionId)
e.OnSocketDisconnected()
Map.Delete(e.SessionId)
return
}
handlers.Range(func(key string, value handlerWrapper) bool {
go value.handler(HandlerData{
SessionId: e.SessionId,
Socket: socket,
Manager: manager,
})
return true
})
}
}
func (e *Events) OnClientSideEvent(manager *sse.SocketManager, handlerId string) {
handlers, ok := e.client.Load(handlerId)
if ok {
go handlers.handler(HandlerData{
SessionId: e.SessionId,
Socket: manager.Get(string(e.SessionId)),
Manager: manager,
})
}
}
func (e *Events) OnDomElementRemoved(id string) {
e.server.Range(func(key string, value *xsync.MapOf[string, handlerWrapper]) bool {
value.Delete(id)
return true
})
e.client.Delete(id)
}
func (e *Events) OnSocketDisconnected() {
e.client.Clear()
e.server.Clear()
}
var Map = xsync.NewMapOf[state.SessionId, *Events]()
var socketMessageListener = make(chan sse.SocketEvent, 100)
var serverSideMessageListener = make(chan ServerSideEvent, 100)
func AddServerSideHandler(ctx *h.RequestContext, id string, event string, handler Handler) {
sessionId := state.GetSessionId(ctx)
events, ok := Map.Load(sessionId)
if !ok {
events = NewEvents(sessionId)
Map.Store(sessionId, events)
}
events.AddServerSideHandler(event, id, handler)
}
func AddHandler(ctx *h.RequestContext, event string, handler Handler) *h.AttributeMapOrdered {
sessionId := state.GetSessionId(ctx)
events, ok := Map.Load(sessionId)
if !ok {
events = NewEvents(sessionId)
Map.Store(sessionId, events)
}
return events.AddClientSideHandler(event, handler)
}
func PushServerSideEvent(sessionId state.SessionId, event string) { func PushServerSideEvent(sessionId state.SessionId, event string) {
serverSideMessageListener <- ServerSideEvent{ serverSideMessageListener <- ServerSideEvent{
Event: event, Event: event,
@ -89,34 +213,49 @@ func PushElement(data HandlerData, el *h.Element) {
func StartListener(locator *service.Locator) { func StartListener(locator *service.Locator) {
manager := service.Get[sse.SocketManager](locator) manager := service.Get[sse.SocketManager](locator)
manager.Listen(socketMessageListener) manager.Listen(socketMessageListener)
go func() { go func() {
for { for {
select { select {
case sevent := <-serverSideMessageListener: case sevent := <-serverSideMessageListener:
handlers, ok := ServerSideEventMap.Load(sevent.Event) Map.Range(func(key state.SessionId, value *Events) bool {
if ok { value.OnServerSideEvent(
handlers.Range(func(key string, value handlerWrapper) bool { manager,
go value.handler(HandlerData{ sevent.Event,
SessionId: sevent.SessionId, )
Socket: manager.Get(string(sevent.SessionId)), return true
Manager: manager, })
})
return true
})
}
case event := <-socketMessageListener: case event := <-socketMessageListener:
if event.Type == sse.DisconnectedEvent {
sessionId := state.SessionId(event.SessionId)
handler, ok := Map.Load(sessionId)
if ok {
handler.OnSocketDisconnected()
Map.Delete(sessionId)
}
continue
}
if event.Type == sse.MessageEvent { if event.Type == sse.MessageEvent {
handlerId := event.Payload["id"].(string) handlerId := event.Payload["id"].(string)
eventName := event.Payload["event"].(string) eventName := event.Payload["event"].(string)
cb, ok := Map.Load(handlerId) sessionId := state.SessionId(event.SessionId)
if ok {
fmt.Printf("calling %s handler for session: %s\n", eventName, cb.sessionId) fmt.Printf("received eventName: %s, handlerId: %s, sessionId: %s\n", eventName, handlerId, sessionId)
go cb.handler(HandlerData{
SessionId: cb.sessionId, handler, ok := Map.Load(sessionId)
Socket: manager.Get(event.SocketId),
Manager: manager, if !ok {
}) return
} }
if eventName == "dom-element-removed" {
handler.OnDomElementRemoved(handlerId)
continue
}
handler.OnClientSideEvent(manager, handlerId)
} }
} }
} }

View file

@ -1,6 +1,7 @@
package main package main
import ( import (
"encoding/json"
"github.com/maddalax/htmgo/framework/h" "github.com/maddalax/htmgo/framework/h"
"github.com/maddalax/htmgo/framework/service" "github.com/maddalax/htmgo/framework/service"
"io/fs" "io/fs"
@ -33,6 +34,13 @@ func main() {
app.Router.Handle("/public/*", http.StripPrefix("/public", http.FileServerFS(sub))) app.Router.Handle("/public/*", http.StripPrefix("/public", http.FileServerFS(sub)))
app.Router.Handle("/ws/test", sse.HandleWs()) app.Router.Handle("/ws/test", sse.HandleWs())
app.Router.Get("/metrics", func(writer http.ResponseWriter, request *http.Request) {
writer.Header().Set("Content-Type", "application/json")
writer.WriteHeader(http.StatusOK)
metrics := event.GetMetrics()
serialized, _ := json.Marshal(metrics)
_, _ = writer.Write(serialized)
})
__htmgo.Register(app.Router) __htmgo.Register(app.Router)
}, },
}) })

View file

@ -8,11 +8,12 @@ import (
) )
func IndexPage(ctx *h.RequestContext) *h.Page { func IndexPage(ctx *h.RequestContext) *h.Page {
state.NewState(ctx) sessionId := state.GetSessionId(ctx)
return h.NewPage( return h.NewPage(
RootPage( RootPage(
ctx,
h.Div( h.Div(
h.Attribute("ws-connect", fmt.Sprintf("/ws/test")), h.Attribute("ws-connect", fmt.Sprintf("/ws/test?sessionId=%s", sessionId)),
h.Class("flex flex-col gap-4 items-center pt-24 min-h-screen bg-neutral-100"), h.Class("flex flex-col gap-4 items-center pt-24 min-h-screen bg-neutral-100"),
h.H3(h.Id("intro-text"), h.Text("Repeater Example"), h.Class("text-2xl")), h.H3(h.Id("intro-text"), h.Text("Repeater Example"), h.Class("text-2xl")),

View file

@ -2,10 +2,13 @@ package pages
import ( import (
"github.com/maddalax/htmgo/framework/h" "github.com/maddalax/htmgo/framework/h"
"starter-template/state"
) )
func RootPage(children ...h.Ren) h.Ren { func RootPage(ctx *h.RequestContext, children ...h.Ren) h.Ren {
s := state.NewState(ctx)
return h.Html( return h.Html(
h.Attribute("data-session-id", s.SessionId),
h.HxExtension(h.BaseExtensions()), h.HxExtension(h.BaseExtensions()),
h.Head( h.Head(
h.Link("/public/main.css", "stylesheet"), h.Link("/public/main.css", "stylesheet"),

View file

@ -3,15 +3,17 @@ package partials
import ( import (
"github.com/maddalax/htmgo/framework/h" "github.com/maddalax/htmgo/framework/h"
"sse-with-state/event" "sse-with-state/event"
"sse-with-state/internal"
) )
func OnClick(ctx *h.RequestContext, handler event.Handler) *h.AttributeMapOrdered { func OnClick(ctx *h.RequestContext, handler event.Handler) *h.AttributeMapOrdered {
return event.AddHandler(ctx, "click", handler) return event.AddHandler(ctx, "click", handler)
} }
func OnServerSideEvent(ctx *h.RequestContext, id string, eventName string, handler event.Handler) h.Ren { func OnServerSideEvent(ctx *h.RequestContext, eventName string, handler event.Handler) h.Ren {
id := internal.RandSeq(30)
event.AddServerSideHandler(ctx, id, eventName, handler) event.AddServerSideHandler(ctx, id, eventName, handler)
return h.Empty() return h.Attribute("data-handler-id", id)
} }
func OnMouseOver(ctx *h.RequestContext, handler event.Handler) *h.AttributeMapOrdered { func OnMouseOver(ctx *h.RequestContext, handler event.Handler) *h.AttributeMapOrdered {

View file

@ -61,7 +61,7 @@ func CounterForm(ctx *h.RequestContext, props CounterProps) *h.Element {
h.Class("bg-rose-400 hover:bg-rose-500 text-white font-bold py-2 px-4 rounded"), h.Class("bg-rose-400 hover:bg-rose-500 text-white font-bold py-2 px-4 rounded"),
h.Type("submit"), h.Type("submit"),
h.Text("Increment"), h.Text("Increment"),
OnServerSideEvent(ctx, props.Id, "increment", func(data event.HandlerData) { OnServerSideEvent(ctx, "increment", func(data event.HandlerData) {
counter.Increment() counter.Increment()
event.PushElement(data, CounterForm(ctx, props)) event.PushElement(data, CounterForm(ctx, props))
}), }),

View file

@ -37,9 +37,7 @@ func repeaterItem(ctx *h.RequestContext, item *h.Element, index int, props *Repe
event.PushElement(data, event.PushElement(data,
h.Div( h.Div(
h.Attribute("hx-swap-oob", fmt.Sprintf("delete:#%s", id)), h.Attribute("hx-swap-oob", fmt.Sprintf("delete:#%s", id)),
repeaterItem( h.Div(),
ctx, item, index, props,
),
), ),
) )
}), }),

View file

@ -19,12 +19,7 @@ func HandleWs() http.HandlerFunc {
locator := cc.ServiceLocator() locator := cc.ServiceLocator()
manager := service.Get[SocketManager](locator) manager := service.Get[SocketManager](locator)
sessionCookie, _ := r.Cookie("state") sessionId := r.URL.Query().Get("sessionId")
sessionId := ""
if sessionCookie != nil {
sessionId = sessionCookie.Value
}
if sessionId == "" { if sessionId == "" {
w.WriteHeader(http.StatusUnauthorized) w.WriteHeader(http.StatusUnauthorized)

View file

@ -18,10 +18,10 @@ const (
) )
type SocketEvent struct { type SocketEvent struct {
SocketId string SessionId string
RoomId string RoomId string
Type EventType Type EventType
Payload map[string]any Payload map[string]any
} }
type CloseEvent struct { type CloseEvent struct {
@ -95,10 +95,10 @@ func (manager *SocketManager) OnMessage(id string, message map[string]any) {
return return
} }
manager.dispatch(SocketEvent{ manager.dispatch(SocketEvent{
SocketId: id, SessionId: id,
Type: MessageEvent, Type: MessageEvent,
Payload: message, Payload: message,
RoomId: socket.RoomId, RoomId: socket.RoomId,
}) })
} }
@ -122,10 +122,10 @@ func (manager *SocketManager) Add(roomId string, id string, writer WriterChan, d
} }
manager.dispatch(SocketEvent{ manager.dispatch(SocketEvent{
SocketId: s.Id, SessionId: s.Id,
Type: ConnectedEvent, Type: ConnectedEvent,
RoomId: s.RoomId, RoomId: s.RoomId,
Payload: map[string]any{}, Payload: map[string]any{},
}) })
fmt.Printf("User %s connected to %s\n", id, roomId) fmt.Printf("User %s connected to %s\n", id, roomId)
@ -137,10 +137,10 @@ func (manager *SocketManager) OnClose(id string) {
return return
} }
manager.dispatch(SocketEvent{ manager.dispatch(SocketEvent{
SocketId: id, SessionId: id,
Type: DisconnectedEvent, Type: DisconnectedEvent,
RoomId: socket.RoomId, RoomId: socket.RoomId,
Payload: map[string]any{}, Payload: map[string]any{},
}) })
manager.sockets.Delete(id) manager.sockets.Delete(id)
} }

View file

@ -1,10 +1,10 @@
package state package state
import ( import (
"github.com/google/uuid" "fmt"
"github.com/maddalax/htmgo/framework/h" "github.com/maddalax/htmgo/framework/h"
"github.com/puzpuzpuz/xsync/v3" "github.com/puzpuzpuz/xsync/v3"
"net/http" "sse-with-state/internal"
) )
type SessionId string type SessionId string
@ -24,19 +24,15 @@ func NewState(ctx *h.RequestContext) *State {
} }
func GetSessionId(ctx *h.RequestContext) SessionId { func GetSessionId(ctx *h.RequestContext) SessionId {
stateCookie, err := ctx.Request.Cookie("state") sessionIdRaw := ctx.Get("session-id")
sessionId := "" sessionId := ""
if err == nil {
sessionId = stateCookie.Value
} else {
sessionId = uuid.NewString()
}
c := http.Cookie{ if sessionIdRaw == "" || sessionIdRaw == nil {
Name: "state", sessionId = fmt.Sprintf("session-id-%s", internal.RandSeq(30))
Value: sessionId, ctx.Set("session-id", sessionId)
} else {
sessionId = sessionIdRaw.(string)
} }
ctx.Response.Header().Set("Set-Cookie", c.String())
return SessionId(sessionId) return SessionId(sessionId)
} }

File diff suppressed because one or more lines are too long

View file

@ -2,6 +2,19 @@ import {ws} from "./ws";
window.onload = addWsEventHandlers; window.onload = addWsEventHandlers;
function sendWs(message: Record<string, any>) {
if(ws != null && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(message));
}
}
function walk(node: Node, cb: (node: Node) => void) {
cb(node);
for (let child of Array.from(node.childNodes)) {
walk(child, cb);
}
}
export function addWsEventHandlers() { export function addWsEventHandlers() {
console.log('add ws event handlers') console.log('add ws event handlers')
const observer = new MutationObserver(register) const observer = new MutationObserver(register)
@ -9,7 +22,26 @@ export function addWsEventHandlers() {
let added = new Set<string>(); let added = new Set<string>();
function register() { function register(mutations: MutationRecord[]) {
console.log(mutations)
for (let mutation of mutations) {
for (let removedNode of Array.from(mutation.removedNodes)) {
walk(removedNode, (node) => {
if (node instanceof HTMLElement) {
console.log('removing', node.innerHTML)
const handlerId = node.getAttribute("data-handler-id")
if(handlerId) {
added.delete(handlerId)
sendWs({id: handlerId, event: 'dom-element-removed'})
}
}
})
}
}
let ids = new Set<string>(); let ids = new Set<string>();
document.querySelectorAll("[data-handler-id]").forEach(element => { document.querySelectorAll("[data-handler-id]").forEach(element => {
const id = element.getAttribute("data-handler-id"); const id = element.getAttribute("data-handler-id");
@ -25,10 +57,7 @@ export function addWsEventHandlers() {
} }
added.add(id); added.add(id);
element.addEventListener(event, (e) => { element.addEventListener(event, (e) => {
console.log('sending event', id, event, ws) sendWs({id, event})
if(ws != null && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({id, event}));
}
}); });
}) })
for (let id of added) { for (let id of added) {
@ -39,6 +68,6 @@ export function addWsEventHandlers() {
console.log('size', added.size) console.log('size', added.size)
} }
register() register([])
} }

View file

@ -49,6 +49,7 @@ function connectWs(ele: Element, url: string, attempt: number = 0) {
url = (isSecure ? 'wss://' : 'ws://') + window.location.host + url url = (isSecure ? 'wss://' : 'ws://') + window.location.host + url
} }
console.info('connecting to ws', url) console.info('connecting to ws', url)
ws = new WebSocket(url); ws = new WebSocket(url);
ws.addEventListener("close", function(event) { ws.addEventListener("close", function(event) {