refactor/make it a bit cleaner
This commit is contained in:
parent
ef54e274f5
commit
64fd4cfeab
6 changed files with 99 additions and 202 deletions
|
|
@ -1,15 +1,18 @@
|
|||
package event
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"github.com/google/uuid"
|
||||
"github.com/maddalax/htmgo/framework/h"
|
||||
"github.com/maddalax/htmgo/framework/service"
|
||||
"github.com/puzpuzpuz/xsync/v3"
|
||||
"runtime"
|
||||
"sse-with-state/internal"
|
||||
"sse-with-state/sse"
|
||||
"sse-with-state/state"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
|
@ -21,207 +24,84 @@ type HandlerData struct {
|
|||
|
||||
type Handler func(data HandlerData)
|
||||
|
||||
type handlerWrapper struct {
|
||||
handler Handler
|
||||
sessionId state.SessionId
|
||||
}
|
||||
|
||||
type ServerSideEvent struct {
|
||||
Event string
|
||||
Payload map[string]any
|
||||
SessionId state.SessionId
|
||||
}
|
||||
|
||||
type Metrics struct {
|
||||
Sessions []MetricPerSession
|
||||
}
|
||||
type KeyHash = string
|
||||
|
||||
type MetricPerSession struct {
|
||||
SessionId state.SessionId
|
||||
ClientListeners []MetricListener
|
||||
ServerListeners []MetricListener
|
||||
}
|
||||
|
||||
type MetricListener struct {
|
||||
Event string
|
||||
HandlerId string
|
||||
}
|
||||
|
||||
func GetMetrics() *Metrics {
|
||||
metrics := &Metrics{
|
||||
Sessions: make([]MetricPerSession, 0),
|
||||
}
|
||||
|
||||
Map.Range(func(key state.SessionId, value *Events) bool {
|
||||
clientListeners := make([]MetricListener, 0)
|
||||
value.client.Range(func(key string, value handlerWrapper) bool {
|
||||
clientListeners = append(clientListeners, MetricListener{
|
||||
Event: "",
|
||||
HandlerId: key,
|
||||
})
|
||||
return true
|
||||
})
|
||||
|
||||
serverListeners := make([]MetricListener, 0)
|
||||
value.server.Range(func(event string, value *xsync.MapOf[string, handlerWrapper]) bool {
|
||||
value.Range(func(handlerId string, value handlerWrapper) bool {
|
||||
serverListeners = append(serverListeners, MetricListener{
|
||||
Event: event,
|
||||
HandlerId: handlerId,
|
||||
})
|
||||
return true
|
||||
})
|
||||
return true
|
||||
})
|
||||
|
||||
metrics.Sessions = append(metrics.Sessions, MetricPerSession{
|
||||
SessionId: key,
|
||||
ClientListeners: clientListeners,
|
||||
ServerListeners: serverListeners,
|
||||
})
|
||||
return true
|
||||
})
|
||||
|
||||
return metrics
|
||||
}
|
||||
|
||||
type Events struct {
|
||||
SessionId state.SessionId
|
||||
server *xsync.MapOf[string, *xsync.MapOf[string, handlerWrapper]]
|
||||
client *xsync.MapOf[string, handlerWrapper]
|
||||
}
|
||||
|
||||
func NewEvents(sessionId state.SessionId) *Events {
|
||||
return &Events{
|
||||
SessionId: sessionId,
|
||||
server: xsync.NewMapOf[string, *xsync.MapOf[string, handlerWrapper]](),
|
||||
client: xsync.NewMapOf[string, handlerWrapper](),
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Events) AddServerSideHandler(event string, id string, handler Handler) {
|
||||
wrapper := handlerWrapper{
|
||||
handler: handler,
|
||||
sessionId: e.SessionId,
|
||||
}
|
||||
|
||||
if _, ok := e.server.Load(event); !ok {
|
||||
e.server.Store(event, xsync.NewMapOf[string, handlerWrapper]())
|
||||
}
|
||||
|
||||
handlers, _ := e.server.Load(event)
|
||||
handlers.Store(id, wrapper)
|
||||
e.server.Store(event, handlers)
|
||||
}
|
||||
|
||||
func (e *Events) AddClientSideHandler(event string, handler Handler) *h.AttributeMapOrdered {
|
||||
handlerId := fmt.Sprintf("event_%s_%s", event, internal.RandSeq(30))
|
||||
fmt.Printf("adding client side handler %s\n", handlerId)
|
||||
e.client.Store(handlerId, handlerWrapper{
|
||||
handler: handler,
|
||||
sessionId: e.SessionId,
|
||||
})
|
||||
return h.AttributePairs(
|
||||
"data-handler-id", handlerId,
|
||||
"data-handler-event", event,
|
||||
)
|
||||
}
|
||||
|
||||
func (e *Events) OnServerSideEvent(manager *sse.SocketManager, eventName string) {
|
||||
handlers, ok := e.server.Load(eventName)
|
||||
if ok {
|
||||
socket := manager.Get(string(e.SessionId))
|
||||
if socket == nil {
|
||||
fmt.Printf("socket not found, must be disconnected: %s", e.SessionId)
|
||||
e.OnSocketDisconnected()
|
||||
Map.Delete(e.SessionId)
|
||||
return
|
||||
}
|
||||
handlers.Range(func(key string, value handlerWrapper) bool {
|
||||
go value.handler(HandlerData{
|
||||
SessionId: e.SessionId,
|
||||
Socket: socket,
|
||||
Manager: manager,
|
||||
})
|
||||
return true
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Events) OnClientSideEvent(manager *sse.SocketManager, handlerId string) {
|
||||
handlers, ok := e.client.Load(handlerId)
|
||||
if ok {
|
||||
go handlers.handler(HandlerData{
|
||||
SessionId: e.SessionId,
|
||||
Socket: manager.Get(string(e.SessionId)),
|
||||
Manager: manager,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Events) OnDomElementRemoved(id string) {
|
||||
e.server.Range(func(key string, value *xsync.MapOf[string, handlerWrapper]) bool {
|
||||
value.Delete(id)
|
||||
return true
|
||||
})
|
||||
e.client.Delete(id)
|
||||
}
|
||||
|
||||
func (e *Events) OnSocketDisconnected() {
|
||||
e.client.Clear()
|
||||
e.server.Clear()
|
||||
}
|
||||
|
||||
var Map = xsync.NewMapOf[state.SessionId, *Events]()
|
||||
var handlers = xsync.NewMapOf[KeyHash, Handler]()
|
||||
var sessionIdToHashes = xsync.NewMapOf[state.SessionId, map[KeyHash]bool]()
|
||||
var serverEventNamesToHash = xsync.NewMapOf[string, map[KeyHash]bool]()
|
||||
|
||||
var socketMessageListener = make(chan sse.SocketEvent, 100)
|
||||
var serverSideMessageListener = make(chan ServerSideEvent, 100)
|
||||
var lock = sync.Mutex{}
|
||||
var callingHandler = atomic.Bool{}
|
||||
|
||||
func getCallerHash() string {
|
||||
now := time.Now()
|
||||
pc := make([]uintptr, 1000) // Adjust the size if you need a deeper stack trace
|
||||
n := runtime.Callers(2, pc) // Skip 2: runtime.Callers() and printCallers()
|
||||
|
||||
frames := runtime.CallersFrames(pc[:n])
|
||||
calls := make([]string, 0)
|
||||
hitBegin := false
|
||||
|
||||
for {
|
||||
frame, more := frames.Next()
|
||||
calls = append(calls, fmt.Sprintf("%s:%d", frame.Function, frame.Line))
|
||||
if strings.Contains(frame.Function, "runtime.goexit") {
|
||||
hitBegin = true
|
||||
}
|
||||
if !more {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
took := time.Since(now)
|
||||
fmt.Printf("took: %dms\n", took.Milliseconds())
|
||||
return strings.Join(calls, ",")
|
||||
if !hitBegin {
|
||||
fmt.Printf("hitBegin: %v\n", hitBegin)
|
||||
panic("unable to add handler, stack trace too deep, maximum 1000, unable to find runtime.goexit")
|
||||
}
|
||||
|
||||
joined := strings.Join(calls, ",")
|
||||
hash := sha256.Sum256([]byte(joined))
|
||||
return fmt.Sprintf("%x", hash)
|
||||
}
|
||||
|
||||
func AddServerSideHandler(ctx *h.RequestContext, event string, handler Handler) {
|
||||
hash := getCallerHash()
|
||||
func AddServerSideHandler(ctx *h.RequestContext, event string, handler Handler) *h.AttributeMapOrdered {
|
||||
// If we are already in a handler, we don't want to add another handler
|
||||
// this can happen if the handler renders another element that has a handler
|
||||
if callingHandler.Load() {
|
||||
return h.NewAttributeMap()
|
||||
}
|
||||
sessionId := state.GetSessionId(ctx)
|
||||
hash := uuid.NewString()
|
||||
fmt.Printf("adding server side handler %s\n", hash)
|
||||
sessionId := state.GetSessionId(ctx)
|
||||
events, ok := Map.Load(sessionId)
|
||||
|
||||
if !ok {
|
||||
events = NewEvents(sessionId)
|
||||
Map.Store(sessionId, events)
|
||||
}
|
||||
|
||||
events.AddServerSideHandler(event, "test", handler)
|
||||
handlers.LoadOrStore(hash, handler)
|
||||
m, _ := serverEventNamesToHash.LoadOrCompute(event, func() map[KeyHash]bool {
|
||||
return make(map[KeyHash]bool)
|
||||
})
|
||||
m[hash] = true
|
||||
storeHashForSession(sessionId, hash)
|
||||
return h.AttributePairs("data-handler-id", hash, "data-handler-event", event)
|
||||
}
|
||||
|
||||
func AddHandler(ctx *h.RequestContext, event string, handler Handler) *h.AttributeMapOrdered {
|
||||
func AddClientSideHandler(ctx *h.RequestContext, event string, handler Handler) *h.AttributeMapOrdered {
|
||||
hash := uuid.NewString()
|
||||
fmt.Printf("adding client side handler %s\n", hash)
|
||||
handlers.LoadOrStore(hash, handler)
|
||||
sessionId := state.GetSessionId(ctx)
|
||||
events, ok := Map.Load(sessionId)
|
||||
storeHashForSession(sessionId, hash)
|
||||
return h.AttributePairs("data-handler-id", hash, "data-handler-event", event)
|
||||
}
|
||||
|
||||
if !ok {
|
||||
events = NewEvents(sessionId)
|
||||
Map.Store(sessionId, events)
|
||||
}
|
||||
|
||||
return events.AddClientSideHandler(event, handler)
|
||||
func storeHashForSession(sessionId state.SessionId, hash KeyHash) {
|
||||
m, _ := sessionIdToHashes.LoadOrCompute(sessionId, func() map[KeyHash]bool {
|
||||
return make(map[KeyHash]bool)
|
||||
})
|
||||
m[hash] = true
|
||||
}
|
||||
|
||||
func PushServerSideEvent(sessionId state.SessionId, event string) {
|
||||
|
|
@ -240,48 +120,71 @@ func StartListener(locator *service.Locator) {
|
|||
manager := service.Get[sse.SocketManager](locator)
|
||||
manager.Listen(socketMessageListener)
|
||||
|
||||
go func() {
|
||||
for {
|
||||
fmt.Printf("total handlers: %d\n", handlers.Size())
|
||||
fmt.Printf("total serverEventNamesToHash: %d\n", serverEventNamesToHash.Size())
|
||||
fmt.Printf("total sessionIdToHashes: %d\n", sessionIdToHashes.Size())
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
}()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case sevent := <-serverSideMessageListener:
|
||||
Map.Range(func(key state.SessionId, value *Events) bool {
|
||||
value.OnServerSideEvent(
|
||||
manager,
|
||||
sevent.Event,
|
||||
)
|
||||
return true
|
||||
})
|
||||
fmt.Printf("received server side event: %s\n", sevent.Event)
|
||||
hashes, ok := serverEventNamesToHash.Load(sevent.Event)
|
||||
if ok {
|
||||
for hash := range hashes {
|
||||
cb, ok := handlers.Load(hash)
|
||||
if ok {
|
||||
lock.Lock()
|
||||
callingHandler.Store(true)
|
||||
cb(HandlerData{
|
||||
SessionId: sevent.SessionId,
|
||||
Socket: manager.Get(string(sevent.SessionId)),
|
||||
Manager: manager,
|
||||
})
|
||||
callingHandler.Store(false)
|
||||
lock.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case event := <-socketMessageListener:
|
||||
|
||||
if event.Type == sse.DisconnectedEvent {
|
||||
sessionId := state.SessionId(event.SessionId)
|
||||
handler, ok := Map.Load(sessionId)
|
||||
fmt.Printf("disconnected sessionId: %s\n", sessionId)
|
||||
hashes, ok := sessionIdToHashes.Load(sessionId)
|
||||
if ok {
|
||||
handler.OnSocketDisconnected()
|
||||
Map.Delete(sessionId)
|
||||
for hash := range hashes {
|
||||
handlers.Delete(hash)
|
||||
}
|
||||
sessionIdToHashes.Delete(sessionId)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if event.Type == sse.MessageEvent {
|
||||
handlerId := event.Payload["id"].(string)
|
||||
eventName := event.Payload["event"].(string)
|
||||
sessionId := state.SessionId(event.SessionId)
|
||||
|
||||
fmt.Printf("received eventName: %s, handlerId: %s, sessionId: %s\n", eventName, handlerId, sessionId)
|
||||
|
||||
handler, ok := Map.Load(sessionId)
|
||||
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if eventName == "dom-element-removed" {
|
||||
handler.OnDomElementRemoved(handlerId)
|
||||
handlers.Delete(handlerId)
|
||||
continue
|
||||
}
|
||||
|
||||
handler.OnClientSideEvent(manager, handlerId)
|
||||
cb, ok := handlers.Load(handlerId)
|
||||
if ok {
|
||||
cb(HandlerData{
|
||||
SessionId: sessionId,
|
||||
Socket: manager.Get(event.SessionId),
|
||||
Manager: manager,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/maddalax/htmgo/framework/h"
|
||||
"github.com/maddalax/htmgo/framework/service"
|
||||
"io/fs"
|
||||
|
|
@ -34,13 +33,6 @@ func main() {
|
|||
|
||||
app.Router.Handle("/public/*", http.StripPrefix("/public", http.FileServerFS(sub)))
|
||||
app.Router.Handle("/ws/test", sse.HandleWs())
|
||||
app.Router.Get("/metrics", func(writer http.ResponseWriter, request *http.Request) {
|
||||
writer.Header().Set("Content-Type", "application/json")
|
||||
writer.WriteHeader(http.StatusOK)
|
||||
metrics := event.GetMetrics()
|
||||
serialized, _ := json.Marshal(metrics)
|
||||
_, _ = writer.Write(serialized)
|
||||
})
|
||||
__htmgo.Register(app.Router)
|
||||
},
|
||||
})
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ import (
|
|||
)
|
||||
|
||||
func OnClick(ctx *h.RequestContext, handler event.Handler) *h.AttributeMapOrdered {
|
||||
return event.AddHandler(ctx, "click", handler)
|
||||
return event.AddClientSideHandler(ctx, "click", handler)
|
||||
}
|
||||
|
||||
func OnServerSideEvent(ctx *h.RequestContext, eventName string, handler event.Handler) h.Ren {
|
||||
|
|
@ -15,5 +15,5 @@ func OnServerSideEvent(ctx *h.RequestContext, eventName string, handler event.Ha
|
|||
}
|
||||
|
||||
func OnMouseOver(ctx *h.RequestContext, handler event.Handler) *h.AttributeMapOrdered {
|
||||
return event.AddHandler(ctx, "mouseover", handler)
|
||||
return event.AddClientSideHandler(ctx, "mouseover", handler)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,6 +43,7 @@ func CounterForm(ctx *h.RequestContext, props CounterProps) *h.Element {
|
|||
props.Id = h.GenId(6)
|
||||
}
|
||||
counter := UseCounter(state.GetSessionId(ctx), props.Id)
|
||||
|
||||
return h.Div(
|
||||
h.Attribute("hx-swap", "none"),
|
||||
h.Class("flex flex-col gap-3 items-center"),
|
||||
|
|
|
|||
4
framework/assets/dist/htmgo.js
vendored
4
framework/assets/dist/htmgo.js
vendored
File diff suppressed because one or more lines are too long
|
|
@ -53,9 +53,11 @@ export function addWsEventHandlers() {
|
|||
|
||||
ids.add(id);
|
||||
if (added.has(id)) {
|
||||
console.debug('already added, skipping', id)
|
||||
return;
|
||||
}
|
||||
added.add(id);
|
||||
console.debug('adding event listener for ws send', id, event)
|
||||
element.addEventListener(event, (e) => {
|
||||
sendWs({id, event})
|
||||
});
|
||||
|
|
@ -65,7 +67,6 @@ export function addWsEventHandlers() {
|
|||
added.delete(id);
|
||||
}
|
||||
}
|
||||
console.log('size', added.size)
|
||||
}
|
||||
|
||||
register([])
|
||||
|
|
|
|||
Loading…
Reference in a new issue