From 839c24ce59e0d36a1b8fec9aa37b5baa394610c9 Mon Sep 17 00:00:00 2001 From: maddalax Date: Wed, 16 Oct 2024 14:43:51 -0500 Subject: [PATCH] support broadcasting events to all clients --- examples/sse-with-state/event/listener.go | 54 +++++++++++++++++------ examples/sse-with-state/pages/index.go | 4 +- 2 files changed, 43 insertions(+), 15 deletions(-) diff --git a/examples/sse-with-state/event/listener.go b/examples/sse-with-state/event/listener.go index bbf84eb..f1adc3f 100644 --- a/examples/sse-with-state/event/listener.go +++ b/examples/sse-with-state/event/listener.go @@ -31,6 +31,7 @@ type KeyHash = string var handlers = xsync.NewMapOf[KeyHash, Handler]() var sessionIdToHashes = xsync.NewMapOf[state.SessionId, map[KeyHash]bool]() +var hashesToSessionId = xsync.NewMapOf[KeyHash, state.SessionId]() var serverEventNamesToHash = xsync.NewMapOf[string, map[KeyHash]bool]() var socketMessageListener = make(chan sse.SocketEvent, 100) @@ -53,6 +54,7 @@ func AddServerSideHandler(ctx *h.RequestContext, event string, handler Handler) }) m[hash] = true storeHashForSession(sessionId, hash) + storeSessionIdForHash(sessionId, hash) return h.AttributePairs("data-handler-id", hash, "data-handler-event", event) } @@ -62,6 +64,7 @@ func AddClientSideHandler(ctx *h.RequestContext, event string, handler Handler) handlers.LoadOrStore(hash, handler) sessionId := state.GetSessionId(ctx) storeHashForSession(sessionId, hash) + storeSessionIdForHash(sessionId, hash) return h.AttributePairs("data-handler-id", hash, "data-handler-event", event) } @@ -72,6 +75,11 @@ func storeHashForSession(sessionId state.SessionId, hash KeyHash) { m[hash] = true } +func storeSessionIdForHash(sessionId state.SessionId, hash KeyHash) { + hashesToSessionId.Store(hash, sessionId) +} + +// PushServerSideEvent sends a server side event this specific session func PushServerSideEvent(data HandlerData, event string, value map[string]any) { serverSideMessageListener <- ServerSideEvent{ Event: event, @@ -80,6 +88,15 @@ func PushServerSideEvent(data HandlerData, event string, value map[string]any) { } } +// BroadcastServerSideEvent sends a server side event to all clients that have a handler for the event, not just the current session +func BroadcastServerSideEvent(event string, value map[string]any) { + serverSideMessageListener <- ServerSideEvent{ + Event: event, + Payload: value, + SessionId: "*", + } +} + func PushElement(data HandlerData, el *h.Element) { data.Manager.SendHtml(data.Socket.Id, h.Render(el)) } @@ -102,32 +119,42 @@ func StartListener(locator *service.Locator) { select { case sevent := <-serverSideMessageListener: fmt.Printf("received server side event: %s\n", sevent.Event) - // TODO optimize this hashes, ok := serverEventNamesToHash.Load(sevent.Event) - hashesForSession, ok2 := sessionIdToHashes.Load(sevent.SessionId) - subset := make(map[KeyHash]bool) - for hash := range hashes { - if _, ok := hashesForSession[hash]; ok { - subset[hash] = true + // If we are not broadcasting to everyone, filter it down to just the current session that invoked the event + // TODO optimize this + if sevent.SessionId != "*" { + hashesForSession, ok2 := sessionIdToHashes.Load(sevent.SessionId) + + if ok2 { + subset := make(map[KeyHash]bool) + for hash := range hashes { + if _, ok := hashesForSession[hash]; ok { + subset[hash] = true + } + } + hashes = subset } } - if ok && ok2 { + if ok { lock.Lock() callingHandler.Store(true) wg := sync.WaitGroup{} - for hash := range subset { + for hash := range hashes { cb, ok := handlers.Load(hash) if ok { wg.Add(1) go func(e ServerSideEvent) { defer wg.Done() - cb(HandlerData{ - SessionId: e.SessionId, - Socket: manager.Get(string(e.SessionId)), - Manager: manager, - }) + sessionId, ok2 := hashesToSessionId.Load(hash) + if ok2 { + cb(HandlerData{ + SessionId: sessionId, + Socket: manager.Get(string(sessionId)), + Manager: manager, + }) + } }(sevent) } } @@ -144,6 +171,7 @@ func StartListener(locator *service.Locator) { hashes, ok := sessionIdToHashes.Load(sessionId) if ok { for hash := range hashes { + hashesToSessionId.Delete(hash) handlers.Delete(hash) } sessionIdToHashes.Delete(sessionId) diff --git a/examples/sse-with-state/pages/index.go b/examples/sse-with-state/pages/index.go index 552aa2a..4dfe81a 100644 --- a/examples/sse-with-state/pages/index.go +++ b/examples/sse-with-state/pages/index.go @@ -23,10 +23,10 @@ func IndexPage(ctx *h.RequestContext) *h.Page { partials.Repeater(ctx, partials.RepeaterProps{ Id: "repeater-1", OnAdd: func(data event.HandlerData) { - event.PushServerSideEvent(data, "increment", map[string]any{}) + event.BroadcastServerSideEvent("increment", map[string]any{}) }, OnRemove: func(data event.HandlerData, index int) { - event.PushServerSideEvent(data, "decrement", map[string]any{}) + event.BroadcastServerSideEvent("decrement", map[string]any{}) }, AddButton: h.Button( h.Text("+ Add Item"),