support broadcasting events to all clients

This commit is contained in:
maddalax 2024-10-16 14:43:51 -05:00
parent 278161782c
commit 839c24ce59
2 changed files with 43 additions and 15 deletions

View file

@ -31,6 +31,7 @@ type KeyHash = string
var handlers = xsync.NewMapOf[KeyHash, Handler]() var handlers = xsync.NewMapOf[KeyHash, Handler]()
var sessionIdToHashes = xsync.NewMapOf[state.SessionId, map[KeyHash]bool]() var sessionIdToHashes = xsync.NewMapOf[state.SessionId, map[KeyHash]bool]()
var hashesToSessionId = xsync.NewMapOf[KeyHash, state.SessionId]()
var serverEventNamesToHash = xsync.NewMapOf[string, map[KeyHash]bool]() var serverEventNamesToHash = xsync.NewMapOf[string, map[KeyHash]bool]()
var socketMessageListener = make(chan sse.SocketEvent, 100) var socketMessageListener = make(chan sse.SocketEvent, 100)
@ -53,6 +54,7 @@ func AddServerSideHandler(ctx *h.RequestContext, event string, handler Handler)
}) })
m[hash] = true m[hash] = true
storeHashForSession(sessionId, hash) storeHashForSession(sessionId, hash)
storeSessionIdForHash(sessionId, hash)
return h.AttributePairs("data-handler-id", hash, "data-handler-event", event) return h.AttributePairs("data-handler-id", hash, "data-handler-event", event)
} }
@ -62,6 +64,7 @@ func AddClientSideHandler(ctx *h.RequestContext, event string, handler Handler)
handlers.LoadOrStore(hash, handler) handlers.LoadOrStore(hash, handler)
sessionId := state.GetSessionId(ctx) sessionId := state.GetSessionId(ctx)
storeHashForSession(sessionId, hash) storeHashForSession(sessionId, hash)
storeSessionIdForHash(sessionId, hash)
return h.AttributePairs("data-handler-id", hash, "data-handler-event", event) return h.AttributePairs("data-handler-id", hash, "data-handler-event", event)
} }
@ -72,6 +75,11 @@ func storeHashForSession(sessionId state.SessionId, hash KeyHash) {
m[hash] = true m[hash] = true
} }
func storeSessionIdForHash(sessionId state.SessionId, hash KeyHash) {
hashesToSessionId.Store(hash, sessionId)
}
// PushServerSideEvent sends a server side event this specific session
func PushServerSideEvent(data HandlerData, event string, value map[string]any) { func PushServerSideEvent(data HandlerData, event string, value map[string]any) {
serverSideMessageListener <- ServerSideEvent{ serverSideMessageListener <- ServerSideEvent{
Event: event, Event: event,
@ -80,6 +88,15 @@ func PushServerSideEvent(data HandlerData, event string, value map[string]any) {
} }
} }
// BroadcastServerSideEvent sends a server side event to all clients that have a handler for the event, not just the current session
func BroadcastServerSideEvent(event string, value map[string]any) {
serverSideMessageListener <- ServerSideEvent{
Event: event,
Payload: value,
SessionId: "*",
}
}
func PushElement(data HandlerData, el *h.Element) { func PushElement(data HandlerData, el *h.Element) {
data.Manager.SendHtml(data.Socket.Id, h.Render(el)) data.Manager.SendHtml(data.Socket.Id, h.Render(el))
} }
@ -102,32 +119,42 @@ func StartListener(locator *service.Locator) {
select { select {
case sevent := <-serverSideMessageListener: case sevent := <-serverSideMessageListener:
fmt.Printf("received server side event: %s\n", sevent.Event) fmt.Printf("received server side event: %s\n", sevent.Event)
// TODO optimize this
hashes, ok := serverEventNamesToHash.Load(sevent.Event) hashes, ok := serverEventNamesToHash.Load(sevent.Event)
// If we are not broadcasting to everyone, filter it down to just the current session that invoked the event
// TODO optimize this
if sevent.SessionId != "*" {
hashesForSession, ok2 := sessionIdToHashes.Load(sevent.SessionId) hashesForSession, ok2 := sessionIdToHashes.Load(sevent.SessionId)
if ok2 {
subset := make(map[KeyHash]bool) subset := make(map[KeyHash]bool)
for hash := range hashes { for hash := range hashes {
if _, ok := hashesForSession[hash]; ok { if _, ok := hashesForSession[hash]; ok {
subset[hash] = true subset[hash] = true
} }
} }
hashes = subset
}
}
if ok && ok2 { if ok {
lock.Lock() lock.Lock()
callingHandler.Store(true) callingHandler.Store(true)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for hash := range subset { for hash := range hashes {
cb, ok := handlers.Load(hash) cb, ok := handlers.Load(hash)
if ok { if ok {
wg.Add(1) wg.Add(1)
go func(e ServerSideEvent) { go func(e ServerSideEvent) {
defer wg.Done() defer wg.Done()
sessionId, ok2 := hashesToSessionId.Load(hash)
if ok2 {
cb(HandlerData{ cb(HandlerData{
SessionId: e.SessionId, SessionId: sessionId,
Socket: manager.Get(string(e.SessionId)), Socket: manager.Get(string(sessionId)),
Manager: manager, Manager: manager,
}) })
}
}(sevent) }(sevent)
} }
} }
@ -144,6 +171,7 @@ func StartListener(locator *service.Locator) {
hashes, ok := sessionIdToHashes.Load(sessionId) hashes, ok := sessionIdToHashes.Load(sessionId)
if ok { if ok {
for hash := range hashes { for hash := range hashes {
hashesToSessionId.Delete(hash)
handlers.Delete(hash) handlers.Delete(hash)
} }
sessionIdToHashes.Delete(sessionId) sessionIdToHashes.Delete(sessionId)

View file

@ -23,10 +23,10 @@ func IndexPage(ctx *h.RequestContext) *h.Page {
partials.Repeater(ctx, partials.RepeaterProps{ partials.Repeater(ctx, partials.RepeaterProps{
Id: "repeater-1", Id: "repeater-1",
OnAdd: func(data event.HandlerData) { OnAdd: func(data event.HandlerData) {
event.PushServerSideEvent(data, "increment", map[string]any{}) event.BroadcastServerSideEvent("increment", map[string]any{})
}, },
OnRemove: func(data event.HandlerData, index int) { OnRemove: func(data event.HandlerData, index int) {
event.PushServerSideEvent(data, "decrement", map[string]any{}) event.BroadcastServerSideEvent("decrement", map[string]any{})
}, },
AddButton: h.Button( AddButton: h.Button(
h.Text("+ Add Item"), h.Text("+ Add Item"),