refactor into ws extension
This commit is contained in:
parent
d3db71f889
commit
84b94e6fc8
18 changed files with 116 additions and 174 deletions
|
|
@ -3,6 +3,7 @@ module sse-with-state
|
||||||
go 1.23.0
|
go 1.23.0
|
||||||
|
|
||||||
require github.com/maddalax/htmgo/framework v0.0.0-20241006162137-150c87b4560b
|
require github.com/maddalax/htmgo/framework v0.0.0-20241006162137-150c87b4560b
|
||||||
|
require github.com/maddalax/htmgo/extensions/ws v0.0.0-20241006162137-150c87b4560b
|
||||||
|
|
||||||
require (
|
require (
|
||||||
github.com/go-chi/chi/v5 v5.1.0 // indirect
|
github.com/go-chi/chi/v5 v5.1.0 // indirect
|
||||||
|
|
|
||||||
|
|
@ -1,28 +1,25 @@
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
websocket "github.com/maddalax/htmgo/extensions/ws"
|
||||||
"github.com/maddalax/htmgo/framework/h"
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
"github.com/maddalax/htmgo/framework/service"
|
"github.com/maddalax/htmgo/framework/service"
|
||||||
"io/fs"
|
"io/fs"
|
||||||
"net/http"
|
"net/http"
|
||||||
"sse-with-state/__htmgo"
|
"sse-with-state/__htmgo"
|
||||||
"sse-with-state/event"
|
|
||||||
"sse-with-state/sse"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
locator := service.NewLocator()
|
locator := service.NewLocator()
|
||||||
|
|
||||||
service.Set[sse.SocketManager](locator, service.Singleton, func() *sse.SocketManager {
|
|
||||||
return sse.NewSocketManager()
|
|
||||||
})
|
|
||||||
|
|
||||||
event.StartListener(locator)
|
|
||||||
|
|
||||||
h.Start(h.AppOpts{
|
h.Start(h.AppOpts{
|
||||||
ServiceLocator: locator,
|
ServiceLocator: locator,
|
||||||
LiveReload: true,
|
LiveReload: true,
|
||||||
Register: func(app *h.App) {
|
Register: func(app *h.App) {
|
||||||
|
websocket.EnableExtension(app, websocket.WsExtensionOpts{
|
||||||
|
WsPath: "/ws",
|
||||||
|
})
|
||||||
|
|
||||||
sub, err := fs.Sub(GetStaticAssets(), "assets/dist")
|
sub, err := fs.Sub(GetStaticAssets(), "assets/dist")
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
@ -32,7 +29,6 @@ func main() {
|
||||||
http.FileServerFS(sub)
|
http.FileServerFS(sub)
|
||||||
|
|
||||||
app.Router.Handle("/public/*", http.StripPrefix("/public", http.FileServerFS(sub)))
|
app.Router.Handle("/public/*", http.StripPrefix("/public", http.FileServerFS(sub)))
|
||||||
app.Router.Handle("/ws/test", sse.HandleWs())
|
|
||||||
__htmgo.Register(app.Router)
|
__htmgo.Register(app.Router)
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -2,10 +2,10 @@ package pages
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/maddalax/htmgo/extensions/ws/state"
|
||||||
|
"github.com/maddalax/htmgo/extensions/ws/ws"
|
||||||
"github.com/maddalax/htmgo/framework/h"
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
"sse-with-state/event"
|
|
||||||
"sse-with-state/partials"
|
"sse-with-state/partials"
|
||||||
"sse-with-state/state"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func IndexPage(ctx *h.RequestContext) *h.Page {
|
func IndexPage(ctx *h.RequestContext) *h.Page {
|
||||||
|
|
@ -22,11 +22,11 @@ func IndexPage(ctx *h.RequestContext) *h.Page {
|
||||||
|
|
||||||
partials.Repeater(ctx, partials.RepeaterProps{
|
partials.Repeater(ctx, partials.RepeaterProps{
|
||||||
Id: "repeater-1",
|
Id: "repeater-1",
|
||||||
OnAdd: func(data event.HandlerData) {
|
OnAdd: func(data ws.HandlerData) {
|
||||||
event.BroadcastServerSideEvent("increment", map[string]any{})
|
ws.BroadcastServerSideEvent("increment", map[string]any{})
|
||||||
},
|
},
|
||||||
OnRemove: func(data event.HandlerData, index int) {
|
OnRemove: func(data ws.HandlerData, index int) {
|
||||||
event.BroadcastServerSideEvent("decrement", map[string]any{})
|
ws.BroadcastServerSideEvent("decrement", map[string]any{})
|
||||||
},
|
},
|
||||||
AddButton: h.Button(
|
AddButton: h.Button(
|
||||||
h.Text("+ Add Item"),
|
h.Text("+ Add Item"),
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,8 @@
|
||||||
package pages
|
package pages
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/maddalax/htmgo/extensions/ws/state"
|
||||||
"github.com/maddalax/htmgo/framework/h"
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
"sse-with-state/state"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func RootPage(ctx *h.RequestContext, children ...h.Ren) h.Ren {
|
func RootPage(ctx *h.RequestContext, children ...h.Ren) h.Ren {
|
||||||
|
|
|
||||||
|
|
@ -1,19 +0,0 @@
|
||||||
package partials
|
|
||||||
|
|
||||||
import (
|
|
||||||
"github.com/maddalax/htmgo/framework/h"
|
|
||||||
"sse-with-state/event"
|
|
||||||
)
|
|
||||||
|
|
||||||
func OnClick(ctx *h.RequestContext, handler event.Handler) *h.AttributeMapOrdered {
|
|
||||||
return event.AddClientSideHandler(ctx, "click", handler)
|
|
||||||
}
|
|
||||||
|
|
||||||
func OnServerSideEvent(ctx *h.RequestContext, eventName string, handler event.Handler) h.Ren {
|
|
||||||
event.AddServerSideHandler(ctx, eventName, handler)
|
|
||||||
return h.Attribute("data-handler-id", "")
|
|
||||||
}
|
|
||||||
|
|
||||||
func OnMouseOver(ctx *h.RequestContext, handler event.Handler) *h.AttributeMapOrdered {
|
|
||||||
return event.AddClientSideHandler(ctx, "mouseover", handler)
|
|
||||||
}
|
|
||||||
|
|
@ -1,21 +1,11 @@
|
||||||
package partials
|
package partials
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/maddalax/htmgo/extensions/ws/state"
|
||||||
|
"github.com/maddalax/htmgo/extensions/ws/ws"
|
||||||
"github.com/maddalax/htmgo/framework/h"
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
"sse-with-state/event"
|
|
||||||
"sse-with-state/state"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func UseState[T any](sessionId state.SessionId, key string, initial T) (func() T, func(T)) {
|
|
||||||
var get = func() T {
|
|
||||||
return state.Get[T](sessionId, key, initial)
|
|
||||||
}
|
|
||||||
var set = func(value T) {
|
|
||||||
state.Set(sessionId, key, value)
|
|
||||||
}
|
|
||||||
return get, set
|
|
||||||
}
|
|
||||||
|
|
||||||
type Counter struct {
|
type Counter struct {
|
||||||
Count func() int
|
Count func() int
|
||||||
Increment func()
|
Increment func()
|
||||||
|
|
@ -23,7 +13,7 @@ type Counter struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func UseCounter(sessionId state.SessionId, id string) Counter {
|
func UseCounter(sessionId state.SessionId, id string) Counter {
|
||||||
get, set := UseState(sessionId, id, 0)
|
get, set := state.Use(sessionId, id, 0)
|
||||||
|
|
||||||
var increment = func() {
|
var increment = func() {
|
||||||
set(get() + 1)
|
set(get() + 1)
|
||||||
|
|
@ -68,13 +58,13 @@ func CounterForm(ctx *h.RequestContext, props CounterProps) *h.Element {
|
||||||
h.Class("bg-rose-400 hover:bg-rose-500 text-white font-bold py-2 px-4 rounded"),
|
h.Class("bg-rose-400 hover:bg-rose-500 text-white font-bold py-2 px-4 rounded"),
|
||||||
h.Type("submit"),
|
h.Type("submit"),
|
||||||
h.Text("Increment"),
|
h.Text("Increment"),
|
||||||
OnServerSideEvent(ctx, "increment", func(data event.HandlerData) {
|
ws.OnServerSideEvent(ctx, "increment", func(data ws.HandlerData) {
|
||||||
counter.Increment()
|
counter.Increment()
|
||||||
event.PushElement(data, CounterForm(ctx, props))
|
ws.PushElement(data, CounterForm(ctx, props))
|
||||||
}),
|
}),
|
||||||
OnServerSideEvent(ctx, "decrement", func(data event.HandlerData) {
|
ws.OnServerSideEvent(ctx, "decrement", func(data ws.HandlerData) {
|
||||||
counter.Decrement()
|
counter.Decrement()
|
||||||
event.PushElement(data, CounterForm(ctx, props))
|
ws.PushElement(data, CounterForm(ctx, props))
|
||||||
}),
|
}),
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -2,8 +2,8 @@ package partials
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/maddalax/htmgo/extensions/ws/ws"
|
||||||
"github.com/maddalax/htmgo/framework/h"
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
"sse-with-state/event"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type RepeaterProps struct {
|
type RepeaterProps struct {
|
||||||
|
|
@ -13,8 +13,8 @@ type RepeaterProps struct {
|
||||||
DefaultItems []*h.Element
|
DefaultItems []*h.Element
|
||||||
Id string
|
Id string
|
||||||
currentIndex int
|
currentIndex int
|
||||||
OnAdd func(data event.HandlerData)
|
OnAdd func(data ws.HandlerData)
|
||||||
OnRemove func(data event.HandlerData, index int)
|
OnRemove func(data ws.HandlerData, index int)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (props *RepeaterProps) itemId(index int) string {
|
func (props *RepeaterProps) itemId(index int) string {
|
||||||
|
|
@ -34,10 +34,10 @@ func repeaterItem(ctx *h.RequestContext, item *h.Element, index int, props *Repe
|
||||||
props.RemoveButton(index,
|
props.RemoveButton(index,
|
||||||
h.ClassIf(index == 0, "opacity-0 disabled"),
|
h.ClassIf(index == 0, "opacity-0 disabled"),
|
||||||
h.If(index == 0, h.Disabled()),
|
h.If(index == 0, h.Disabled()),
|
||||||
OnClick(ctx, func(data event.HandlerData) {
|
ws.OnClick(ctx, func(data ws.HandlerData) {
|
||||||
props.OnRemove(data, index)
|
props.OnRemove(data, index)
|
||||||
props.currentIndex--
|
props.currentIndex--
|
||||||
event.PushElement(data,
|
ws.PushElement(data,
|
||||||
h.Div(
|
h.Div(
|
||||||
h.Attribute("hx-swap-oob", fmt.Sprintf("delete:#%s", id)),
|
h.Attribute("hx-swap-oob", fmt.Sprintf("delete:#%s", id)),
|
||||||
h.Div(),
|
h.Div(),
|
||||||
|
|
@ -61,9 +61,9 @@ func Repeater(ctx *h.RequestContext, props RepeaterProps) *h.Element {
|
||||||
h.Id(props.addButtonId()),
|
h.Id(props.addButtonId()),
|
||||||
h.Class("flex justify-center"),
|
h.Class("flex justify-center"),
|
||||||
props.AddButton,
|
props.AddButton,
|
||||||
OnClick(ctx, func(data event.HandlerData) {
|
ws.OnClick(ctx, func(data ws.HandlerData) {
|
||||||
props.OnAdd(data)
|
props.OnAdd(data)
|
||||||
event.PushElement(data,
|
ws.PushElement(data,
|
||||||
h.Div(
|
h.Div(
|
||||||
h.Attribute("hx-swap-oob", "beforebegin:#"+props.addButtonId()),
|
h.Attribute("hx-swap-oob", "beforebegin:#"+props.addButtonId()),
|
||||||
repeaterItem(
|
repeaterItem(
|
||||||
|
|
|
||||||
17
extensions/websocket/go.mod
Normal file
17
extensions/websocket/go.mod
Normal file
|
|
@ -0,0 +1,17 @@
|
||||||
|
module github.com/maddalax/htmgo/extensions/ws
|
||||||
|
|
||||||
|
go 1.23.0
|
||||||
|
|
||||||
|
|
||||||
|
require (
|
||||||
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/maddalax/htmgo/framework v0.0.0-20241014151703-8503dffa4e7d
|
||||||
|
github.com/go-chi/chi/v5 v5.1.0 // indirect
|
||||||
|
github.com/gobwas/httphead v0.1.0 // indirect
|
||||||
|
github.com/gobwas/pool v0.2.1 // indirect
|
||||||
|
github.com/gobwas/ws v1.4.0 // indirect
|
||||||
|
github.com/google/uuid v1.6.0 // indirect
|
||||||
|
golang.org/x/sys v0.6.0 // indirect
|
||||||
|
)
|
||||||
23
extensions/websocket/init.go
Normal file
23
extensions/websocket/init.go
Normal file
|
|
@ -0,0 +1,23 @@
|
||||||
|
package websocket
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/maddalax/htmgo/extensions/ws/internal/wsutil"
|
||||||
|
"github.com/maddalax/htmgo/extensions/ws/ws"
|
||||||
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
|
"github.com/maddalax/htmgo/framework/service"
|
||||||
|
)
|
||||||
|
|
||||||
|
type WsExtensionOpts struct {
|
||||||
|
WsPath string
|
||||||
|
}
|
||||||
|
|
||||||
|
func EnableExtension(app *h.App, opts WsExtensionOpts) {
|
||||||
|
if app.Opts.ServiceLocator == nil {
|
||||||
|
app.Opts.ServiceLocator = service.NewLocator()
|
||||||
|
}
|
||||||
|
service.Set[wsutil.SocketManager](app.Opts.ServiceLocator, service.Singleton, func() *wsutil.SocketManager {
|
||||||
|
return wsutil.NewSocketManager()
|
||||||
|
})
|
||||||
|
ws.StartListener(app.Opts.ServiceLocator)
|
||||||
|
app.Router.Handle(opts.WsPath, wsutil.WsHttpHandler())
|
||||||
|
}
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
package sse
|
package wsutil
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
@ -13,7 +13,7 @@ import (
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func HandleWs() http.HandlerFunc {
|
func WsHttpHandler() http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
cc := r.Context().Value(h.RequestContextKey).(*h.RequestContext)
|
cc := r.Context().Value(h.RequestContextKey).(*h.RequestContext)
|
||||||
locator := cc.ServiceLocator()
|
locator := cc.ServiceLocator()
|
||||||
|
|
@ -104,95 +104,3 @@ func HandleWs() http.HandlerFunc {
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func Handle() http.HandlerFunc {
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
// Set the necessary headers
|
|
||||||
w.Header().Set("Content-Type", "text/event-stream")
|
|
||||||
w.Header().Set("Cache-Control", "no-cache")
|
|
||||||
w.Header().Set("Connection", "keep-alive")
|
|
||||||
w.Header().Set("Access-Control-Allow-Origin", "*") // Optional for CORS
|
|
||||||
|
|
||||||
cc := r.Context().Value(h.RequestContextKey).(*h.RequestContext)
|
|
||||||
locator := cc.ServiceLocator()
|
|
||||||
manager := service.Get[SocketManager](locator)
|
|
||||||
|
|
||||||
sessionCookie, _ := r.Cookie("state")
|
|
||||||
sessionId := ""
|
|
||||||
|
|
||||||
if sessionCookie != nil {
|
|
||||||
sessionId = sessionCookie.Value
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx := r.Context()
|
|
||||||
|
|
||||||
/*
|
|
||||||
Large buffer in case the client disconnects while we are writing
|
|
||||||
we don't want to block the writer
|
|
||||||
*/
|
|
||||||
done := make(chan bool, 1000)
|
|
||||||
writer := make(WriterChan, 1000)
|
|
||||||
|
|
||||||
wg := sync.WaitGroup{}
|
|
||||||
wg.Add(1)
|
|
||||||
|
|
||||||
/*
|
|
||||||
* This goroutine is responsible for writing messages to the client
|
|
||||||
*/
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
defer manager.Disconnect(sessionId)
|
|
||||||
|
|
||||||
defer func() {
|
|
||||||
fmt.Printf("empting channels\n")
|
|
||||||
for len(writer) > 0 {
|
|
||||||
<-writer
|
|
||||||
}
|
|
||||||
for len(done) > 0 {
|
|
||||||
<-done
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
ticker := time.NewTicker(5 * time.Second)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case <-done:
|
|
||||||
fmt.Printf("closing connection: \n")
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
manager.Ping(sessionId)
|
|
||||||
case message := <-writer:
|
|
||||||
_, err := fmt.Fprintf(w, message)
|
|
||||||
if err != nil {
|
|
||||||
done <- true
|
|
||||||
} else {
|
|
||||||
flusher, ok := w.(http.Flusher)
|
|
||||||
if ok {
|
|
||||||
flusher.Flush()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This goroutine is responsible for adding the client to the room
|
|
||||||
*/
|
|
||||||
wg.Add(1)
|
|
||||||
go func() {
|
|
||||||
defer wg.Done()
|
|
||||||
if sessionId == "" {
|
|
||||||
manager.writeCloseRaw(writer, "no session")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
manager.Add("all", sessionId, writer, done)
|
|
||||||
}()
|
|
||||||
|
|
||||||
wg.Wait()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
package sse
|
package wsutil
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
@ -2,9 +2,9 @@ package state
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/maddalax/htmgo/extensions/ws/internal"
|
||||||
"github.com/maddalax/htmgo/framework/h"
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
"github.com/puzpuzpuz/xsync/v3"
|
"github.com/puzpuzpuz/xsync/v3"
|
||||||
"sse-with-state/internal"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type SessionId string
|
type SessionId string
|
||||||
|
|
@ -61,3 +61,13 @@ func Set(sessionId SessionId, key string, value any) {
|
||||||
})
|
})
|
||||||
actual.Store(key, value)
|
actual.Store(key, value)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Use[T any](sessionId SessionId, key string, initial T) (func() T, func(T)) {
|
||||||
|
var get = func() T {
|
||||||
|
return Get[T](sessionId, key, initial)
|
||||||
|
}
|
||||||
|
var set = func(value T) {
|
||||||
|
Set(sessionId, key, value)
|
||||||
|
}
|
||||||
|
return get, set
|
||||||
|
}
|
||||||
16
extensions/websocket/ws/attribute.go
Normal file
16
extensions/websocket/ws/attribute.go
Normal file
|
|
@ -0,0 +1,16 @@
|
||||||
|
package ws
|
||||||
|
|
||||||
|
import "github.com/maddalax/htmgo/framework/h"
|
||||||
|
|
||||||
|
func OnClick(ctx *h.RequestContext, handler Handler) *h.AttributeMapOrdered {
|
||||||
|
return AddClientSideHandler(ctx, "click", handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
func OnServerSideEvent(ctx *h.RequestContext, eventName string, handler Handler) h.Ren {
|
||||||
|
AddServerSideHandler(ctx, eventName, handler)
|
||||||
|
return h.Attribute("data-handler-id", "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func OnMouseOver(ctx *h.RequestContext, handler Handler) *h.AttributeMapOrdered {
|
||||||
|
return AddClientSideHandler(ctx, "mouseover", handler)
|
||||||
|
}
|
||||||
|
|
@ -1,4 +1,4 @@
|
||||||
package event
|
package ws
|
||||||
|
|
||||||
import "github.com/maddalax/htmgo/framework/h"
|
import "github.com/maddalax/htmgo/framework/h"
|
||||||
|
|
||||||
|
|
@ -1,17 +1,17 @@
|
||||||
package event
|
package ws
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sse-with-state/sse"
|
"github.com/maddalax/htmgo/extensions/ws/internal/wsutil"
|
||||||
"sse-with-state/state"
|
"github.com/maddalax/htmgo/extensions/ws/state"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type MessageHandler struct {
|
type MessageHandler struct {
|
||||||
manager *sse.SocketManager
|
manager *wsutil.SocketManager
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMessageHandler(manager *sse.SocketManager) *MessageHandler {
|
func NewMessageHandler(manager *wsutil.SocketManager) *MessageHandler {
|
||||||
return &MessageHandler{manager: manager}
|
return &MessageHandler{manager: manager}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -77,7 +77,7 @@ func (h *MessageHandler) OnDomElementRemoved(handlerId string) {
|
||||||
handlers.Delete(handlerId)
|
handlers.Delete(handlerId)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *MessageHandler) OnSocketDisconnected(event sse.SocketEvent) {
|
func (h *MessageHandler) OnSocketDisconnected(event wsutil.SocketEvent) {
|
||||||
sessionId := state.SessionId(event.SessionId)
|
sessionId := state.SessionId(event.SessionId)
|
||||||
hashes, ok := sessionIdToHashes.Load(sessionId)
|
hashes, ok := sessionIdToHashes.Load(sessionId)
|
||||||
if ok {
|
if ok {
|
||||||
|
|
@ -1,15 +1,15 @@
|
||||||
package event
|
package ws
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/maddalax/htmgo/extensions/ws/internal/wsutil"
|
||||||
|
"github.com/maddalax/htmgo/extensions/ws/state"
|
||||||
"github.com/maddalax/htmgo/framework/service"
|
"github.com/maddalax/htmgo/framework/service"
|
||||||
"sse-with-state/sse"
|
|
||||||
"sse-with-state/state"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
func StartListener(locator *service.Locator) {
|
func StartListener(locator *service.Locator) {
|
||||||
manager := service.Get[sse.SocketManager](locator)
|
manager := service.Get[wsutil.SocketManager](locator)
|
||||||
manager.Listen(socketMessageListener)
|
manager.Listen(socketMessageListener)
|
||||||
handler := NewMessageHandler(manager)
|
handler := NewMessageHandler(manager)
|
||||||
|
|
||||||
|
|
@ -29,9 +29,9 @@ func StartListener(locator *service.Locator) {
|
||||||
handler.OnServerSideEvent(event)
|
handler.OnServerSideEvent(event)
|
||||||
case event := <-socketMessageListener:
|
case event := <-socketMessageListener:
|
||||||
switch event.Type {
|
switch event.Type {
|
||||||
case sse.DisconnectedEvent:
|
case wsutil.DisconnectedEvent:
|
||||||
handler.OnSocketDisconnected(event)
|
handler.OnSocketDisconnected(event)
|
||||||
case sse.MessageEvent:
|
case wsutil.MessageEvent:
|
||||||
handlerId := event.Payload["id"].(string)
|
handlerId := event.Payload["id"].(string)
|
||||||
eventName := event.Payload["event"].(string)
|
eventName := event.Payload["event"].(string)
|
||||||
sessionId := state.SessionId(event.SessionId)
|
sessionId := state.SessionId(event.SessionId)
|
||||||
|
|
@ -1,18 +1,18 @@
|
||||||
package event
|
package ws
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/maddalax/htmgo/extensions/ws/internal/wsutil"
|
||||||
|
"github.com/maddalax/htmgo/extensions/ws/state"
|
||||||
"github.com/maddalax/htmgo/framework/h"
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
"github.com/puzpuzpuz/xsync/v3"
|
"github.com/puzpuzpuz/xsync/v3"
|
||||||
"sse-with-state/sse"
|
|
||||||
"sse-with-state/state"
|
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
)
|
)
|
||||||
|
|
||||||
type HandlerData struct {
|
type HandlerData struct {
|
||||||
SessionId state.SessionId
|
SessionId state.SessionId
|
||||||
Socket *sse.SocketConnection
|
Socket *wsutil.SocketConnection
|
||||||
Manager *sse.SocketManager
|
Manager *wsutil.SocketManager
|
||||||
}
|
}
|
||||||
|
|
||||||
type Handler func(data HandlerData)
|
type Handler func(data HandlerData)
|
||||||
|
|
@ -29,7 +29,7 @@ var sessionIdToHashes = xsync.NewMapOf[state.SessionId, map[KeyHash]bool]()
|
||||||
var hashesToSessionId = xsync.NewMapOf[KeyHash, state.SessionId]()
|
var hashesToSessionId = xsync.NewMapOf[KeyHash, state.SessionId]()
|
||||||
var serverEventNamesToHash = xsync.NewMapOf[string, map[KeyHash]bool]()
|
var serverEventNamesToHash = xsync.NewMapOf[string, map[KeyHash]bool]()
|
||||||
|
|
||||||
var socketMessageListener = make(chan sse.SocketEvent, 100)
|
var socketMessageListener = make(chan wsutil.SocketEvent, 100)
|
||||||
var serverSideMessageListener = make(chan ServerSideEvent, 100)
|
var serverSideMessageListener = make(chan ServerSideEvent, 100)
|
||||||
var lock = sync.Mutex{}
|
var lock = sync.Mutex{}
|
||||||
var callingHandler = atomic.Bool{}
|
var callingHandler = atomic.Bool{}
|
||||||
Loading…
Reference in a new issue