This commit is contained in:
maddalax 2024-10-16 15:00:57 -05:00
parent 839c24ce59
commit d3db71f889
6 changed files with 204 additions and 168 deletions

View file

@ -0,0 +1,26 @@
package event
import "github.com/maddalax/htmgo/framework/h"
// PushServerSideEvent sends a server side event this specific session
func PushServerSideEvent(data HandlerData, event string, value map[string]any) {
serverSideMessageListener <- ServerSideEvent{
Event: event,
Payload: value,
SessionId: data.SessionId,
}
}
// BroadcastServerSideEvent sends a server side event to all clients that have a handler for the event, not just the current session
func BroadcastServerSideEvent(event string, value map[string]any) {
serverSideMessageListener <- ServerSideEvent{
Event: event,
Payload: value,
SessionId: "*",
}
}
// PushElement sends an element to the current session and swaps it into the page
func PushElement(data HandlerData, el *h.Element) {
data.Manager.SendHtml(data.Socket.Id, h.Render(el))
}

View file

@ -0,0 +1,90 @@
package event
import (
"fmt"
"sse-with-state/sse"
"sse-with-state/state"
"sync"
)
type MessageHandler struct {
manager *sse.SocketManager
}
func NewMessageHandler(manager *sse.SocketManager) *MessageHandler {
return &MessageHandler{manager: manager}
}
func (h *MessageHandler) OnServerSideEvent(e ServerSideEvent) {
fmt.Printf("received server side event: %s\n", e.Event)
hashes, ok := serverEventNamesToHash.Load(e.Event)
// If we are not broadcasting to everyone, filter it down to just the current session that invoked the event
// TODO optimize this
if e.SessionId != "*" {
hashesForSession, ok2 := sessionIdToHashes.Load(e.SessionId)
if ok2 {
subset := make(map[KeyHash]bool)
for hash := range hashes {
if _, ok := hashesForSession[hash]; ok {
subset[hash] = true
}
}
hashes = subset
}
}
if ok {
lock.Lock()
callingHandler.Store(true)
wg := sync.WaitGroup{}
for hash := range hashes {
cb, ok := handlers.Load(hash)
if ok {
wg.Add(1)
go func(e ServerSideEvent) {
defer wg.Done()
sessionId, ok2 := hashesToSessionId.Load(hash)
if ok2 {
cb(HandlerData{
SessionId: sessionId,
Socket: h.manager.Get(string(sessionId)),
Manager: h.manager,
})
}
}(e)
}
}
wg.Wait()
callingHandler.Store(false)
lock.Unlock()
}
}
func (h *MessageHandler) OnClientSideEvent(handlerId string, sessionId state.SessionId) {
cb, ok := handlers.Load(handlerId)
if ok {
cb(HandlerData{
SessionId: sessionId,
Socket: h.manager.Get(string(sessionId)),
Manager: h.manager,
})
}
}
func (h *MessageHandler) OnDomElementRemoved(handlerId string) {
handlers.Delete(handlerId)
}
func (h *MessageHandler) OnSocketDisconnected(event sse.SocketEvent) {
sessionId := state.SessionId(event.SessionId)
hashes, ok := sessionIdToHashes.Load(sessionId)
if ok {
for hash := range hashes {
hashesToSessionId.Delete(hash)
handlers.Delete(hash)
}
sessionIdToHashes.Delete(sessionId)
}
}

View file

@ -2,108 +2,16 @@ package event
import (
"fmt"
"github.com/google/uuid"
"github.com/maddalax/htmgo/framework/h"
"github.com/maddalax/htmgo/framework/service"
"github.com/puzpuzpuz/xsync/v3"
"sse-with-state/sse"
"sse-with-state/state"
"sync"
"sync/atomic"
"time"
)
type HandlerData struct {
SessionId state.SessionId
Socket *sse.SocketConnection
Manager *sse.SocketManager
}
type Handler func(data HandlerData)
type ServerSideEvent struct {
Event string
Payload map[string]any
SessionId state.SessionId
}
type KeyHash = string
var handlers = xsync.NewMapOf[KeyHash, Handler]()
var sessionIdToHashes = xsync.NewMapOf[state.SessionId, map[KeyHash]bool]()
var hashesToSessionId = xsync.NewMapOf[KeyHash, state.SessionId]()
var serverEventNamesToHash = xsync.NewMapOf[string, map[KeyHash]bool]()
var socketMessageListener = make(chan sse.SocketEvent, 100)
var serverSideMessageListener = make(chan ServerSideEvent, 100)
var lock = sync.Mutex{}
var callingHandler = atomic.Bool{}
func AddServerSideHandler(ctx *h.RequestContext, event string, handler Handler) *h.AttributeMapOrdered {
// If we are already in a handler, we don't want to add another handler
// this can happen if the handler renders another element that has a handler
if callingHandler.Load() {
return h.NewAttributeMap()
}
sessionId := state.GetSessionId(ctx)
hash := uuid.NewString()
fmt.Printf("adding server side handler %s\n", hash)
handlers.LoadOrStore(hash, handler)
m, _ := serverEventNamesToHash.LoadOrCompute(event, func() map[KeyHash]bool {
return make(map[KeyHash]bool)
})
m[hash] = true
storeHashForSession(sessionId, hash)
storeSessionIdForHash(sessionId, hash)
return h.AttributePairs("data-handler-id", hash, "data-handler-event", event)
}
func AddClientSideHandler(ctx *h.RequestContext, event string, handler Handler) *h.AttributeMapOrdered {
hash := uuid.NewString()
fmt.Printf("adding client side handler %s\n", hash)
handlers.LoadOrStore(hash, handler)
sessionId := state.GetSessionId(ctx)
storeHashForSession(sessionId, hash)
storeSessionIdForHash(sessionId, hash)
return h.AttributePairs("data-handler-id", hash, "data-handler-event", event)
}
func storeHashForSession(sessionId state.SessionId, hash KeyHash) {
m, _ := sessionIdToHashes.LoadOrCompute(sessionId, func() map[KeyHash]bool {
return make(map[KeyHash]bool)
})
m[hash] = true
}
func storeSessionIdForHash(sessionId state.SessionId, hash KeyHash) {
hashesToSessionId.Store(hash, sessionId)
}
// PushServerSideEvent sends a server side event this specific session
func PushServerSideEvent(data HandlerData, event string, value map[string]any) {
serverSideMessageListener <- ServerSideEvent{
Event: event,
Payload: value,
SessionId: data.SessionId,
}
}
// BroadcastServerSideEvent sends a server side event to all clients that have a handler for the event, not just the current session
func BroadcastServerSideEvent(event string, value map[string]any) {
serverSideMessageListener <- ServerSideEvent{
Event: event,
Payload: value,
SessionId: "*",
}
}
func PushElement(data HandlerData, el *h.Element) {
data.Manager.SendHtml(data.Socket.Id, h.Render(el))
}
func StartListener(locator *service.Locator) {
manager := service.Get[sse.SocketManager](locator)
manager.Listen(socketMessageListener)
handler := NewMessageHandler(manager)
go func() {
for {
@ -117,85 +25,21 @@ func StartListener(locator *service.Locator) {
go func() {
for {
select {
case sevent := <-serverSideMessageListener:
fmt.Printf("received server side event: %s\n", sevent.Event)
hashes, ok := serverEventNamesToHash.Load(sevent.Event)
// If we are not broadcasting to everyone, filter it down to just the current session that invoked the event
// TODO optimize this
if sevent.SessionId != "*" {
hashesForSession, ok2 := sessionIdToHashes.Load(sevent.SessionId)
if ok2 {
subset := make(map[KeyHash]bool)
for hash := range hashes {
if _, ok := hashesForSession[hash]; ok {
subset[hash] = true
}
}
hashes = subset
}
}
if ok {
lock.Lock()
callingHandler.Store(true)
wg := sync.WaitGroup{}
for hash := range hashes {
cb, ok := handlers.Load(hash)
if ok {
wg.Add(1)
go func(e ServerSideEvent) {
defer wg.Done()
sessionId, ok2 := hashesToSessionId.Load(hash)
if ok2 {
cb(HandlerData{
SessionId: sessionId,
Socket: manager.Get(string(sessionId)),
Manager: manager,
})
}
}(sevent)
}
}
wg.Wait()
callingHandler.Store(false)
lock.Unlock()
}
case event := <-serverSideMessageListener:
handler.OnServerSideEvent(event)
case event := <-socketMessageListener:
if event.Type == sse.DisconnectedEvent {
sessionId := state.SessionId(event.SessionId)
fmt.Printf("disconnected sessionId: %s\n", sessionId)
hashes, ok := sessionIdToHashes.Load(sessionId)
if ok {
for hash := range hashes {
hashesToSessionId.Delete(hash)
handlers.Delete(hash)
}
sessionIdToHashes.Delete(sessionId)
}
}
if event.Type == sse.MessageEvent {
switch event.Type {
case sse.DisconnectedEvent:
handler.OnSocketDisconnected(event)
case sse.MessageEvent:
handlerId := event.Payload["id"].(string)
eventName := event.Payload["event"].(string)
sessionId := state.SessionId(event.SessionId)
fmt.Printf("received eventName: %s, handlerId: %s, sessionId: %s\n", eventName, handlerId, sessionId)
if eventName == "dom-element-removed" {
handlers.Delete(handlerId)
handler.OnDomElementRemoved(handlerId)
continue
}
cb, ok := handlers.Load(handlerId)
if ok {
cb(HandlerData{
SessionId: sessionId,
Socket: manager.Get(event.SessionId),
Manager: manager,
})
} else {
handler.OnClientSideEvent(handlerId, sessionId)
}
}
}

View file

@ -0,0 +1,77 @@
package event
import (
"github.com/maddalax/htmgo/framework/h"
"github.com/puzpuzpuz/xsync/v3"
"sse-with-state/sse"
"sse-with-state/state"
"sync"
"sync/atomic"
)
type HandlerData struct {
SessionId state.SessionId
Socket *sse.SocketConnection
Manager *sse.SocketManager
}
type Handler func(data HandlerData)
type ServerSideEvent struct {
Event string
Payload map[string]any
SessionId state.SessionId
}
type KeyHash = string
var handlers = xsync.NewMapOf[KeyHash, Handler]()
var sessionIdToHashes = xsync.NewMapOf[state.SessionId, map[KeyHash]bool]()
var hashesToSessionId = xsync.NewMapOf[KeyHash, state.SessionId]()
var serverEventNamesToHash = xsync.NewMapOf[string, map[KeyHash]bool]()
var socketMessageListener = make(chan sse.SocketEvent, 100)
var serverSideMessageListener = make(chan ServerSideEvent, 100)
var lock = sync.Mutex{}
var callingHandler = atomic.Bool{}
func makeId() string {
return h.GenId(30)
}
func AddServerSideHandler(ctx *h.RequestContext, event string, handler Handler) *h.AttributeMapOrdered {
// If we are already in a handler, we don't want to add another handler
// this can happen if the handler renders another element that has a handler
if callingHandler.Load() {
return h.NewAttributeMap()
}
sessionId := state.GetSessionId(ctx)
hash := makeId()
handlers.LoadOrStore(hash, handler)
m, _ := serverEventNamesToHash.LoadOrCompute(event, func() map[KeyHash]bool {
return make(map[KeyHash]bool)
})
m[hash] = true
storeHashForSession(sessionId, hash)
storeSessionIdForHash(sessionId, hash)
return h.AttributePairs("data-handler-id", hash, "data-handler-event", event)
}
func AddClientSideHandler(ctx *h.RequestContext, event string, handler Handler) *h.AttributeMapOrdered {
hash := makeId()
handlers.LoadOrStore(hash, handler)
sessionId := state.GetSessionId(ctx)
storeHashForSession(sessionId, hash)
storeSessionIdForHash(sessionId, hash)
return h.AttributePairs("data-handler-id", hash, "data-handler-event", event)
}
func storeHashForSession(sessionId state.SessionId, hash KeyHash) {
m, _ := sessionIdToHashes.LoadOrCompute(sessionId, func() map[KeyHash]bool {
return make(map[KeyHash]bool)
})
m[hash] = true
}
func storeSessionIdForHash(sessionId state.SessionId, hash KeyHash) {
hashesToSessionId.Store(hash, sessionId)
}

File diff suppressed because one or more lines are too long

View file

@ -1,6 +1,5 @@
import htmx from 'htmx.org'
import {removeAssociatedScripts} from "./htmgo";
import {addWsEventHandlers} from "./ws-event-handler";
let api : any = null;
let processed = new Set<string>()