Websocket Extension - Alpha (#22)
* wip * merge * working again * refactor/make it a bit cleaner * fix to only call cb for session id who initiated the event * support broadcasting events to all clients * refactor * refactor into ws extension * add go mod * rename module * fix naming * refactor * rename * merge * fix manager ws delete, add manager tests * add metric page * fixes, add k6 script * fixes, add k6 script * deploy docker image * cleanup * cleanup * cleanup
This commit is contained in:
parent
c06d728ffb
commit
cda6a4eaec
44 changed files with 2029 additions and 6 deletions
48
.github/workflows/release-ws-test.yml
vendored
Normal file
48
.github/workflows/release-ws-test.yml
vendored
Normal file
|
|
@ -0,0 +1,48 @@
|
||||||
|
name: Build and Deploy ws-test
|
||||||
|
|
||||||
|
on:
|
||||||
|
workflow_run:
|
||||||
|
workflows: [ "Update HTMGO Framework Dependency" ] # The name of the first workflow
|
||||||
|
types:
|
||||||
|
- completed
|
||||||
|
workflow_dispatch:
|
||||||
|
push:
|
||||||
|
branches:
|
||||||
|
- ws-testing
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
build-and-push:
|
||||||
|
runs-on: ubuntu-latest
|
||||||
|
|
||||||
|
steps:
|
||||||
|
- name: Checkout code
|
||||||
|
uses: actions/checkout@v3
|
||||||
|
|
||||||
|
- name: Set up Docker Buildx
|
||||||
|
uses: docker/setup-buildx-action@v2
|
||||||
|
|
||||||
|
- name: Log in to GitHub Container Registry
|
||||||
|
uses: docker/login-action@v2
|
||||||
|
with:
|
||||||
|
registry: ghcr.io
|
||||||
|
username: ${{ github.actor }}
|
||||||
|
password: ${{ secrets.GITHUB_TOKEN }}
|
||||||
|
|
||||||
|
- name: Get short commit hash
|
||||||
|
id: vars
|
||||||
|
run: echo "::set-output name=short_sha::$(echo $GITHUB_SHA | cut -c1-7)"
|
||||||
|
|
||||||
|
- name: Build Docker image
|
||||||
|
run: |
|
||||||
|
cd ./examples/ws-example && docker build -t ghcr.io/${{ github.repository_owner }}/ws-example:${{ steps.vars.outputs.short_sha }} .
|
||||||
|
|
||||||
|
- name: Tag as latest Docker image
|
||||||
|
run: |
|
||||||
|
docker tag ghcr.io/${{ github.repository_owner }}/ws-example:${{ steps.vars.outputs.short_sha }} ghcr.io/${{ github.repository_owner }}/ws-example:latest
|
||||||
|
|
||||||
|
- name: Log in to GitHub Container Registry
|
||||||
|
run: echo "${{ secrets.CR_PAT }}" | docker login ghcr.io -u ${{ github.actor }} --password-stdin
|
||||||
|
|
||||||
|
- name: Push Docker image
|
||||||
|
run: |
|
||||||
|
docker push ghcr.io/${{ github.repository_owner }}/ws-example:latest
|
||||||
|
|
@ -50,7 +50,6 @@ func Handle() http.HandlerFunc {
|
||||||
defer manager.Disconnect(sessionId)
|
defer manager.Disconnect(sessionId)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
fmt.Printf("empting channels\n")
|
|
||||||
for len(writer) > 0 {
|
for len(writer) > 0 {
|
||||||
<-writer
|
<-writer
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -70,16 +70,14 @@ func (manager *SocketManager) Listen(listener chan SocketEvent) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (manager *SocketManager) dispatch(event SocketEvent) {
|
func (manager *SocketManager) dispatch(event SocketEvent) {
|
||||||
fmt.Printf("dispatching event: %s\n", event.Type)
|
|
||||||
done := make(chan struct{}, 1)
|
done := make(chan struct{}, 1)
|
||||||
go func() {
|
go func() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-done:
|
case <-done:
|
||||||
fmt.Printf("dispatched event: %s\n", event.Type)
|
|
||||||
return
|
return
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(5 * time.Second):
|
||||||
fmt.Printf("havent dispatched event after 5s, chan blocked: %s\n", event.Type)
|
fmt.Printf("havent dispatched listener event after 5s, chan blocked: %s\n", event.Type)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
|
||||||
11
examples/ws-example/.dockerignore
Normal file
11
examples/ws-example/.dockerignore
Normal file
|
|
@ -0,0 +1,11 @@
|
||||||
|
# Project exclude paths
|
||||||
|
/tmp/
|
||||||
|
node_modules/
|
||||||
|
dist/
|
||||||
|
js/dist
|
||||||
|
js/node_modules
|
||||||
|
go.work
|
||||||
|
go.work.sum
|
||||||
|
.idea
|
||||||
|
!framework/assets/dist
|
||||||
|
__htmgo
|
||||||
6
examples/ws-example/.gitignore
vendored
Normal file
6
examples/ws-example/.gitignore
vendored
Normal file
|
|
@ -0,0 +1,6 @@
|
||||||
|
/assets/dist
|
||||||
|
tmp
|
||||||
|
node_modules
|
||||||
|
.idea
|
||||||
|
__htmgo
|
||||||
|
dist
|
||||||
38
examples/ws-example/Dockerfile
Normal file
38
examples/ws-example/Dockerfile
Normal file
|
|
@ -0,0 +1,38 @@
|
||||||
|
# Stage 1: Build the Go binary
|
||||||
|
FROM golang:1.23-alpine AS builder
|
||||||
|
|
||||||
|
RUN apk update
|
||||||
|
RUN apk add git
|
||||||
|
RUN apk add curl
|
||||||
|
|
||||||
|
# Set the working directory inside the container
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
# Copy go.mod and go.sum files
|
||||||
|
COPY go.mod go.sum ./
|
||||||
|
|
||||||
|
# Download and cache the Go modules
|
||||||
|
RUN go mod download
|
||||||
|
|
||||||
|
# Copy the source code into the container
|
||||||
|
COPY . .
|
||||||
|
|
||||||
|
# Build the Go binary for Linux
|
||||||
|
RUN GOPRIVATE=github.com/maddalax GOPROXY=direct go run github.com/maddalax/htmgo/cli/htmgo@latest build
|
||||||
|
|
||||||
|
|
||||||
|
# Stage 2: Create the smallest possible image
|
||||||
|
FROM gcr.io/distroless/base-debian11
|
||||||
|
|
||||||
|
# Set the working directory inside the container
|
||||||
|
WORKDIR /app
|
||||||
|
|
||||||
|
# Copy the Go binary from the builder stage
|
||||||
|
COPY --from=builder /app/dist .
|
||||||
|
|
||||||
|
# Expose the necessary port (replace with your server port)
|
||||||
|
EXPOSE 3000
|
||||||
|
|
||||||
|
|
||||||
|
# Command to run the binary
|
||||||
|
CMD ["./ws-example"]
|
||||||
20
examples/ws-example/Taskfile.yml
Normal file
20
examples/ws-example/Taskfile.yml
Normal file
|
|
@ -0,0 +1,20 @@
|
||||||
|
version: '3'
|
||||||
|
|
||||||
|
tasks:
|
||||||
|
run:
|
||||||
|
cmds:
|
||||||
|
- go run github.com/maddalax/htmgo/cli/htmgo@latest run
|
||||||
|
silent: true
|
||||||
|
|
||||||
|
build:
|
||||||
|
cmds:
|
||||||
|
- go run github.com/maddalax/htmgo/cli/htmgo@latest build
|
||||||
|
|
||||||
|
docker:
|
||||||
|
cmds:
|
||||||
|
- docker build .
|
||||||
|
|
||||||
|
watch:
|
||||||
|
cmds:
|
||||||
|
- go run github.com/maddalax/htmgo/cli/htmgo@latest watch
|
||||||
|
silent: true
|
||||||
13
examples/ws-example/assets.go
Normal file
13
examples/ws-example/assets.go
Normal file
|
|
@ -0,0 +1,13 @@
|
||||||
|
//go:build !prod
|
||||||
|
// +build !prod
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io/fs"
|
||||||
|
"ws-example/internal/embedded"
|
||||||
|
)
|
||||||
|
|
||||||
|
func GetStaticAssets() fs.FS {
|
||||||
|
return embedded.NewOsFs()
|
||||||
|
}
|
||||||
3
examples/ws-example/assets/css/input.css
Normal file
3
examples/ws-example/assets/css/input.css
Normal file
|
|
@ -0,0 +1,3 @@
|
||||||
|
@tailwind base;
|
||||||
|
@tailwind components;
|
||||||
|
@tailwind utilities;
|
||||||
16
examples/ws-example/assets_prod.go
Normal file
16
examples/ws-example/assets_prod.go
Normal file
|
|
@ -0,0 +1,16 @@
|
||||||
|
//go:build prod
|
||||||
|
// +build prod
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"embed"
|
||||||
|
"io/fs"
|
||||||
|
)
|
||||||
|
|
||||||
|
//go:embed assets/dist/*
|
||||||
|
var staticAssets embed.FS
|
||||||
|
|
||||||
|
func GetStaticAssets() fs.FS {
|
||||||
|
return staticAssets
|
||||||
|
}
|
||||||
18
examples/ws-example/go.mod
Normal file
18
examples/ws-example/go.mod
Normal file
|
|
@ -0,0 +1,18 @@
|
||||||
|
module ws-example
|
||||||
|
|
||||||
|
go 1.23.0
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/go-chi/chi/v5 v5.1.0
|
||||||
|
github.com/maddalax/htmgo/extensions/websocket v0.0.0-20241104193946-1ddeceaa8286
|
||||||
|
github.com/maddalax/htmgo/framework v1.0.3-0.20241104193946-1ddeceaa8286
|
||||||
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/gobwas/httphead v0.1.0 // indirect
|
||||||
|
github.com/gobwas/pool v0.2.1 // indirect
|
||||||
|
github.com/gobwas/ws v1.4.0 // indirect
|
||||||
|
github.com/google/uuid v1.6.0 // indirect
|
||||||
|
github.com/puzpuzpuz/xsync/v3 v3.4.0 // indirect
|
||||||
|
golang.org/x/sys v0.6.0 // indirect
|
||||||
|
)
|
||||||
28
examples/ws-example/go.sum
Normal file
28
examples/ws-example/go.sum
Normal file
|
|
@ -0,0 +1,28 @@
|
||||||
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
|
||||||
|
github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
|
||||||
|
github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU=
|
||||||
|
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
|
||||||
|
github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
|
||||||
|
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
|
||||||
|
github.com/gobwas/ws v1.4.0 h1:CTaoG1tojrh4ucGPcoJFiAQUAsEWekEWvLy7GsVNqGs=
|
||||||
|
github.com/gobwas/ws v1.4.0/go.mod h1:G3gNqMNtPppf5XUz7O4shetPpcZ1VJ7zt18dlUeakrc=
|
||||||
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
|
github.com/maddalax/htmgo/extensions/websocket v0.0.0-20241104193946-1ddeceaa8286 h1:5Z848JJUQ3OACuKWOX8XbzFb9krKwaiJb7inYSKCRKY=
|
||||||
|
github.com/maddalax/htmgo/extensions/websocket v0.0.0-20241104193946-1ddeceaa8286/go.mod h1:r6/VqntLp7VlAUpIXy3MWZMHs2EkPKJP5rJdDL8lFP4=
|
||||||
|
github.com/maddalax/htmgo/framework v1.0.3-0.20241104193946-1ddeceaa8286 h1:Z7L0W9OZyjrICsnCoLu/GVM33cj4YP7GHlj6/fHPplw=
|
||||||
|
github.com/maddalax/htmgo/framework v1.0.3-0.20241104193946-1ddeceaa8286/go.mod h1:NGGzWVXWksrQJ9kV9SGa/A1F1Bjsgc08cN7ZVb98RqY=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4=
|
||||||
|
github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
|
||||||
|
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||||
|
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||||
|
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
|
||||||
|
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
|
||||||
|
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
|
||||||
|
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
17
examples/ws-example/internal/embedded/os.go
Normal file
17
examples/ws-example/internal/embedded/os.go
Normal file
|
|
@ -0,0 +1,17 @@
|
||||||
|
package embedded
|
||||||
|
|
||||||
|
import (
|
||||||
|
"io/fs"
|
||||||
|
"os"
|
||||||
|
)
|
||||||
|
|
||||||
|
type OsFs struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (receiver OsFs) Open(name string) (fs.File, error) {
|
||||||
|
return os.Open(name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewOsFs() OsFs {
|
||||||
|
return OsFs{}
|
||||||
|
}
|
||||||
0
examples/ws-example/k6.js
Normal file
0
examples/ws-example/k6.js
Normal file
48
examples/ws-example/main.go
Normal file
48
examples/ws-example/main.go
Normal file
|
|
@ -0,0 +1,48 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket"
|
||||||
|
ws2 "github.com/maddalax/htmgo/extensions/websocket/opts"
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/session"
|
||||||
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
|
"github.com/maddalax/htmgo/framework/service"
|
||||||
|
"io/fs"
|
||||||
|
"net/http"
|
||||||
|
"ws-example/__htmgo"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
locator := service.NewLocator()
|
||||||
|
|
||||||
|
h.Start(h.AppOpts{
|
||||||
|
ServiceLocator: locator,
|
||||||
|
LiveReload: true,
|
||||||
|
Register: func(app *h.App) {
|
||||||
|
|
||||||
|
app.Use(func(ctx *h.RequestContext) {
|
||||||
|
session.CreateSession(ctx)
|
||||||
|
})
|
||||||
|
|
||||||
|
websocket.EnableExtension(app, ws2.ExtensionOpts{
|
||||||
|
WsPath: "/ws",
|
||||||
|
RoomName: func(ctx *h.RequestContext) string {
|
||||||
|
return "all"
|
||||||
|
},
|
||||||
|
SessionId: func(ctx *h.RequestContext) string {
|
||||||
|
return ctx.QueryParam("sessionId")
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
sub, err := fs.Sub(GetStaticAssets(), "assets/dist")
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
http.FileServerFS(sub)
|
||||||
|
|
||||||
|
app.Router.Handle("/public/*", http.StripPrefix("/public", http.FileServerFS(sub)))
|
||||||
|
__htmgo.Register(app.Router)
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
57
examples/ws-example/pages/index.go
Normal file
57
examples/ws-example/pages/index.go
Normal file
|
|
@ -0,0 +1,57 @@
|
||||||
|
package pages
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/session"
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/ws"
|
||||||
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
|
"ws-example/partials"
|
||||||
|
)
|
||||||
|
|
||||||
|
func IndexPage(ctx *h.RequestContext) *h.Page {
|
||||||
|
sessionId := session.GetSessionId(ctx)
|
||||||
|
|
||||||
|
return h.NewPage(
|
||||||
|
RootPage(
|
||||||
|
ctx,
|
||||||
|
h.Div(
|
||||||
|
h.Attribute("ws-connect", fmt.Sprintf("/ws?sessionId=%s", sessionId)),
|
||||||
|
h.Class("flex flex-col gap-4 items-center pt-24 min-h-screen bg-neutral-100"),
|
||||||
|
h.H3(
|
||||||
|
h.Id("intro-text"),
|
||||||
|
h.Text("Repeater Example"),
|
||||||
|
h.Class("text-2xl"),
|
||||||
|
),
|
||||||
|
h.Div(
|
||||||
|
h.Id("ws-metrics"),
|
||||||
|
),
|
||||||
|
partials.CounterForm(ctx, partials.CounterProps{Id: "counter-1"}),
|
||||||
|
partials.Repeater(ctx, partials.RepeaterProps{
|
||||||
|
Id: "repeater-1",
|
||||||
|
OnAdd: func(data ws.HandlerData) {
|
||||||
|
//ws.BroadcastServerSideEvent("increment", map[string]any{})
|
||||||
|
},
|
||||||
|
OnRemove: func(data ws.HandlerData, index int) {
|
||||||
|
//ws.BroadcastServerSideEvent("decrement", map[string]any{})
|
||||||
|
},
|
||||||
|
AddButton: h.Button(
|
||||||
|
h.Text("+ Add Item"),
|
||||||
|
),
|
||||||
|
RemoveButton: func(index int, children ...h.Ren) *h.Element {
|
||||||
|
return h.Button(
|
||||||
|
h.Text("Remove"),
|
||||||
|
h.Children(children...),
|
||||||
|
)
|
||||||
|
},
|
||||||
|
Item: func(index int) *h.Element {
|
||||||
|
return h.Input(
|
||||||
|
"text",
|
||||||
|
h.Class("border border-gray-300 rounded p-2"),
|
||||||
|
h.Value(fmt.Sprintf("item %d", index)),
|
||||||
|
)
|
||||||
|
},
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
26
examples/ws-example/pages/root.go
Normal file
26
examples/ws-example/pages/root.go
Normal file
|
|
@ -0,0 +1,26 @@
|
||||||
|
package pages
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
|
)
|
||||||
|
|
||||||
|
func RootPage(ctx *h.RequestContext, children ...h.Ren) h.Ren {
|
||||||
|
return h.Html(
|
||||||
|
h.JoinExtensions(
|
||||||
|
h.HxExtension(
|
||||||
|
h.BaseExtensions(),
|
||||||
|
),
|
||||||
|
h.HxExtension("ws"),
|
||||||
|
),
|
||||||
|
h.Head(
|
||||||
|
h.Link("/public/main.css", "stylesheet"),
|
||||||
|
h.Script("/public/htmgo.js"),
|
||||||
|
),
|
||||||
|
h.Body(
|
||||||
|
h.Div(
|
||||||
|
h.Class("flex flex-col gap-2 bg-white h-full"),
|
||||||
|
h.Fragment(children...),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
129
examples/ws-example/pages/ws/metrics.go
Normal file
129
examples/ws-example/pages/ws/metrics.go
Normal file
|
|
@ -0,0 +1,129 @@
|
||||||
|
package ws
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/session"
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/ws"
|
||||||
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
|
"runtime"
|
||||||
|
"time"
|
||||||
|
"ws-example/pages"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Metrics(ctx *h.RequestContext) *h.Page {
|
||||||
|
|
||||||
|
ws.RunOnConnected(ctx, func() {
|
||||||
|
ws.PushElementCtx(
|
||||||
|
ctx,
|
||||||
|
metricsView(ctx),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
ws.Every(ctx, time.Second, func() bool {
|
||||||
|
return ws.PushElementCtx(
|
||||||
|
ctx,
|
||||||
|
metricsView(ctx),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
return h.NewPage(
|
||||||
|
pages.RootPage(
|
||||||
|
ctx,
|
||||||
|
h.Div(
|
||||||
|
h.Attribute("ws-connect", fmt.Sprintf("/ws?sessionId=%s", session.GetSessionId(ctx))),
|
||||||
|
h.Class("flex flex-col gap-4 items-center min-h-screen max-w-2xl mx-auto mt-8"),
|
||||||
|
h.H3(
|
||||||
|
h.Id("intro-text"),
|
||||||
|
h.Text("Websocket Metrics"),
|
||||||
|
h.Class("text-2xl"),
|
||||||
|
),
|
||||||
|
h.Div(
|
||||||
|
h.Id("ws-metrics"),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func metricsView(ctx *h.RequestContext) *h.Element {
|
||||||
|
metrics := ws.MetricsFromCtx(ctx)
|
||||||
|
|
||||||
|
return h.Div(
|
||||||
|
h.Id("ws-metrics"),
|
||||||
|
List(metrics),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func List(metrics ws.Metrics) *h.Element {
|
||||||
|
return h.Body(
|
||||||
|
h.Div(
|
||||||
|
h.Class("flow-root rounded-lg border border-gray-100 py-3 shadow-sm"),
|
||||||
|
h.Dl(
|
||||||
|
h.Class("-my-3 divide-y divide-gray-100 text-sm"),
|
||||||
|
ListItem("Current Time", time.Now().Format("15:04:05")),
|
||||||
|
ListItem("Seconds Elapsed", fmt.Sprintf("%d", metrics.Manager.SecondsElapsed)),
|
||||||
|
ListItem("Total Messages", fmt.Sprintf("%d", metrics.Manager.TotalMessages)),
|
||||||
|
ListItem("Messages Per Second", fmt.Sprintf("%d", metrics.Manager.MessagesPerSecond)),
|
||||||
|
ListItem("Total Goroutines For ws.Every", fmt.Sprintf("%d", metrics.Manager.RunningGoroutines)),
|
||||||
|
ListItem("Total Goroutines In System", fmt.Sprintf("%d", runtime.NumGoroutine())),
|
||||||
|
ListItem("Sockets", fmt.Sprintf("%d", metrics.Manager.TotalSockets)),
|
||||||
|
ListItem("Rooms", fmt.Sprintf("%d", metrics.Manager.TotalRooms)),
|
||||||
|
ListItem("Session Id To Hashes", fmt.Sprintf("%d", metrics.Handler.SessionIdToHashesCount)),
|
||||||
|
ListItem("Total Handlers", fmt.Sprintf("%d", metrics.Handler.TotalHandlers)),
|
||||||
|
ListItem("Server Event Names To Hash", fmt.Sprintf("%d", metrics.Handler.ServerEventNamesToHashCount)),
|
||||||
|
ListItem("Total Listeners", fmt.Sprintf("%d", metrics.Manager.TotalListeners)),
|
||||||
|
h.IterMap(metrics.Manager.SocketsPerRoom, func(key string, value []string) *h.Element {
|
||||||
|
return ListBlock(
|
||||||
|
fmt.Sprintf("Sockets In Room - %s", key),
|
||||||
|
h.IfElse(
|
||||||
|
len(value) > 100,
|
||||||
|
h.Div(
|
||||||
|
h.Pf("%d total sockets", len(value)),
|
||||||
|
),
|
||||||
|
h.Div(
|
||||||
|
h.List(value, func(item string, index int) *h.Element {
|
||||||
|
return h.Div(
|
||||||
|
h.Pf("%s", item),
|
||||||
|
)
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func ListItem(term, description string) *h.Element {
|
||||||
|
return h.Div(
|
||||||
|
h.Class("grid grid-cols-1 gap-1 p-3 even:bg-gray-50 sm:grid-cols-3 sm:gap-4"),
|
||||||
|
DescriptionTerm(term),
|
||||||
|
DescriptionDetail(description),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func ListBlock(title string, children *h.Element) *h.Element {
|
||||||
|
return h.Div(
|
||||||
|
h.Class("grid grid-cols-1 gap-1 p-3 even:bg-gray-50 sm:grid-cols-3 sm:gap-4"),
|
||||||
|
DescriptionTerm(title),
|
||||||
|
h.Dd(
|
||||||
|
h.Class("text-gray-700 sm:col-span-2"),
|
||||||
|
children,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func DescriptionTerm(term string) *h.Element {
|
||||||
|
return h.Dt(
|
||||||
|
h.Class("font-medium text-gray-900"),
|
||||||
|
h.Text(term),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func DescriptionDetail(detail string) *h.Element {
|
||||||
|
return h.Dd(
|
||||||
|
h.Class("text-gray-700 sm:col-span-2"),
|
||||||
|
h.Text(detail),
|
||||||
|
)
|
||||||
|
}
|
||||||
72
examples/ws-example/partials/index.go
Normal file
72
examples/ws-example/partials/index.go
Normal file
|
|
@ -0,0 +1,72 @@
|
||||||
|
package partials
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/session"
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/ws"
|
||||||
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Counter struct {
|
||||||
|
Count func() int
|
||||||
|
Increment func()
|
||||||
|
Decrement func()
|
||||||
|
}
|
||||||
|
|
||||||
|
func UseCounter(ctx *h.RequestContext, id string) Counter {
|
||||||
|
sessionId := session.GetSessionId(ctx)
|
||||||
|
get, set := session.UseState(sessionId, id, 0)
|
||||||
|
|
||||||
|
var increment = func() {
|
||||||
|
set(get() + 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
var decrement = func() {
|
||||||
|
set(get() - 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
return Counter{
|
||||||
|
Count: get,
|
||||||
|
Increment: increment,
|
||||||
|
Decrement: decrement,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type CounterProps struct {
|
||||||
|
Id string
|
||||||
|
}
|
||||||
|
|
||||||
|
func CounterForm(ctx *h.RequestContext, props CounterProps) *h.Element {
|
||||||
|
if props.Id == "" {
|
||||||
|
props.Id = h.GenId(6)
|
||||||
|
}
|
||||||
|
counter := UseCounter(ctx, props.Id)
|
||||||
|
|
||||||
|
return h.Div(
|
||||||
|
h.Attribute("hx-swap", "none"),
|
||||||
|
h.Class("flex flex-col gap-3 items-center"),
|
||||||
|
h.Id(props.Id),
|
||||||
|
h.P(
|
||||||
|
h.Id("counter-text-"+props.Id),
|
||||||
|
h.AttributePairs(
|
||||||
|
"id", "counter",
|
||||||
|
"class", "text-xl",
|
||||||
|
"name", "count",
|
||||||
|
"text", "count",
|
||||||
|
),
|
||||||
|
h.TextF("Count: %d", counter.Count()),
|
||||||
|
),
|
||||||
|
h.Button(
|
||||||
|
h.Class("bg-rose-400 hover:bg-rose-500 text-white font-bold py-2 px-4 rounded"),
|
||||||
|
h.Type("submit"),
|
||||||
|
h.Text("Increment"),
|
||||||
|
ws.OnServerEvent(ctx, "increment", func(data ws.HandlerData) {
|
||||||
|
counter.Increment()
|
||||||
|
ws.PushElement(data, CounterForm(ctx, props))
|
||||||
|
}),
|
||||||
|
ws.OnServerEvent(ctx, "decrement", func(data ws.HandlerData) {
|
||||||
|
counter.Decrement()
|
||||||
|
ws.PushElement(data, CounterForm(ctx, props))
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
84
examples/ws-example/partials/repeater.go
Normal file
84
examples/ws-example/partials/repeater.go
Normal file
|
|
@ -0,0 +1,84 @@
|
||||||
|
package partials
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/ws"
|
||||||
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RepeaterProps struct {
|
||||||
|
Item func(index int) *h.Element
|
||||||
|
RemoveButton func(index int, children ...h.Ren) *h.Element
|
||||||
|
AddButton *h.Element
|
||||||
|
DefaultItems []*h.Element
|
||||||
|
Id string
|
||||||
|
currentIndex int
|
||||||
|
OnAdd func(data ws.HandlerData)
|
||||||
|
OnRemove func(data ws.HandlerData, index int)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (props *RepeaterProps) itemId(index int) string {
|
||||||
|
return fmt.Sprintf("%s-repeater-item-%d", props.Id, index)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (props *RepeaterProps) addButtonId() string {
|
||||||
|
return fmt.Sprintf("%s-repeater-add-button", props.Id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func repeaterItem(ctx *h.RequestContext, item *h.Element, index int, props *RepeaterProps) *h.Element {
|
||||||
|
id := props.itemId(index)
|
||||||
|
return h.Div(
|
||||||
|
h.Class("flex gap-2 items-center"),
|
||||||
|
h.Id(id),
|
||||||
|
item,
|
||||||
|
props.RemoveButton(
|
||||||
|
index,
|
||||||
|
h.ClassIf(index == 0, "opacity-0 disabled"),
|
||||||
|
h.If(
|
||||||
|
index == 0,
|
||||||
|
h.Disabled(),
|
||||||
|
),
|
||||||
|
ws.OnClick(ctx, func(data ws.HandlerData) {
|
||||||
|
props.OnRemove(data, index)
|
||||||
|
props.currentIndex--
|
||||||
|
ws.PushElement(
|
||||||
|
data,
|
||||||
|
h.Div(
|
||||||
|
h.Attribute("hx-swap-oob", fmt.Sprintf("delete:#%s", id)),
|
||||||
|
h.Div(),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Repeater(ctx *h.RequestContext, props RepeaterProps) *h.Element {
|
||||||
|
if props.Id == "" {
|
||||||
|
props.Id = h.GenId(6)
|
||||||
|
}
|
||||||
|
return h.Div(
|
||||||
|
h.Class("flex flex-col gap-2"),
|
||||||
|
h.List(props.DefaultItems, func(item *h.Element, index int) *h.Element {
|
||||||
|
return repeaterItem(ctx, item, index, &props)
|
||||||
|
}),
|
||||||
|
h.Div(
|
||||||
|
h.Id(props.addButtonId()),
|
||||||
|
h.Class("flex justify-center"),
|
||||||
|
props.AddButton,
|
||||||
|
ws.OnClick(ctx, func(data ws.HandlerData) {
|
||||||
|
props.OnAdd(data)
|
||||||
|
ws.PushElement(
|
||||||
|
data,
|
||||||
|
h.Div(
|
||||||
|
h.Attribute("hx-swap-oob", "beforebegin:#"+props.addButtonId()),
|
||||||
|
repeaterItem(
|
||||||
|
ctx, props.Item(props.currentIndex), props.currentIndex, &props,
|
||||||
|
),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
props.currentIndex++
|
||||||
|
}),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
}
|
||||||
5
examples/ws-example/tailwind.config.js
Normal file
5
examples/ws-example/tailwind.config.js
Normal file
|
|
@ -0,0 +1,5 @@
|
||||||
|
/** @type {import('tailwindcss').Config} */
|
||||||
|
module.exports = {
|
||||||
|
content: ["**/*.go"],
|
||||||
|
plugins: [],
|
||||||
|
};
|
||||||
17
extensions/websocket/go.mod
Normal file
17
extensions/websocket/go.mod
Normal file
|
|
@ -0,0 +1,17 @@
|
||||||
|
module github.com/maddalax/htmgo/extensions/websocket
|
||||||
|
|
||||||
|
go 1.23.0
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/gobwas/ws v1.4.0
|
||||||
|
github.com/maddalax/htmgo/framework v0.0.0-20241014151703-8503dffa4e7d
|
||||||
|
github.com/puzpuzpuz/xsync/v3 v3.4.0
|
||||||
|
)
|
||||||
|
|
||||||
|
require (
|
||||||
|
github.com/go-chi/chi/v5 v5.1.0 // indirect
|
||||||
|
github.com/gobwas/httphead v0.1.0 // indirect
|
||||||
|
github.com/gobwas/pool v0.2.1 // indirect
|
||||||
|
github.com/google/uuid v1.6.0 // indirect
|
||||||
|
golang.org/x/sys v0.6.0 // indirect
|
||||||
|
)
|
||||||
26
extensions/websocket/go.sum
Normal file
26
extensions/websocket/go.sum
Normal file
|
|
@ -0,0 +1,26 @@
|
||||||
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
|
github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw=
|
||||||
|
github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
|
||||||
|
github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU=
|
||||||
|
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
|
||||||
|
github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
|
||||||
|
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
|
||||||
|
github.com/gobwas/ws v1.4.0 h1:CTaoG1tojrh4ucGPcoJFiAQUAsEWekEWvLy7GsVNqGs=
|
||||||
|
github.com/gobwas/ws v1.4.0/go.mod h1:G3gNqMNtPppf5XUz7O4shetPpcZ1VJ7zt18dlUeakrc=
|
||||||
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
|
github.com/maddalax/htmgo/framework v0.0.0-20241014151703-8503dffa4e7d h1:oysEaiKB7/WbvEklkyQ7SEE1xmDeGLrBUvF3BAsBUns=
|
||||||
|
github.com/maddalax/htmgo/framework v0.0.0-20241014151703-8503dffa4e7d/go.mod h1:HYKI49Pb6oyY2opSJdTt145B1vWgfWIDohvlolynv80=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
|
github.com/puzpuzpuz/xsync/v3 v3.4.0 h1:DuVBAdXuGFHv8adVXjWWZ63pJq+NRXOWVXlKDBZ+mJ4=
|
||||||
|
github.com/puzpuzpuz/xsync/v3 v3.4.0/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
|
||||||
|
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
|
||||||
|
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||||
|
golang.org/x/net v0.29.0 h1:5ORfpBpCs4HzDYoodCDBbwHzdR5UrLBZ3sOnUJmFoHo=
|
||||||
|
golang.org/x/net v0.29.0/go.mod h1:gLkgy8jTGERgjzMic6DS9+SP0ajcu6Xu3Orq/SpETg0=
|
||||||
|
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
|
||||||
|
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
31
extensions/websocket/init.go
Normal file
31
extensions/websocket/init.go
Normal file
|
|
@ -0,0 +1,31 @@
|
||||||
|
package websocket
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/internal/wsutil"
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/opts"
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/ws"
|
||||||
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
|
"github.com/maddalax/htmgo/framework/service"
|
||||||
|
)
|
||||||
|
|
||||||
|
func EnableExtension(app *h.App, opts opts.ExtensionOpts) {
|
||||||
|
if app.Opts.ServiceLocator == nil {
|
||||||
|
app.Opts.ServiceLocator = service.NewLocator()
|
||||||
|
}
|
||||||
|
|
||||||
|
if opts.WsPath == "" {
|
||||||
|
panic("websocket: WsPath is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
if opts.SessionId == nil {
|
||||||
|
panic("websocket: SessionId func is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
service.Set[wsutil.SocketManager](app.Opts.ServiceLocator, service.Singleton, func() *wsutil.SocketManager {
|
||||||
|
manager := wsutil.NewSocketManager(&opts)
|
||||||
|
manager.StartMetrics()
|
||||||
|
return manager
|
||||||
|
})
|
||||||
|
ws.StartListener(app.Opts.ServiceLocator)
|
||||||
|
app.Router.Handle(opts.WsPath, wsutil.WsHttpHandler(&opts))
|
||||||
|
}
|
||||||
115
extensions/websocket/internal/wsutil/handler.go
Normal file
115
extensions/websocket/internal/wsutil/handler.go
Normal file
|
|
@ -0,0 +1,115 @@
|
||||||
|
package wsutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"github.com/gobwas/ws"
|
||||||
|
"github.com/gobwas/ws/wsutil"
|
||||||
|
ws2 "github.com/maddalax/htmgo/extensions/websocket/opts"
|
||||||
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
|
"github.com/maddalax/htmgo/framework/service"
|
||||||
|
"log/slog"
|
||||||
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func WsHttpHandler(opts *ws2.ExtensionOpts) http.HandlerFunc {
|
||||||
|
|
||||||
|
if opts.RoomName == nil {
|
||||||
|
opts.RoomName = func(ctx *h.RequestContext) string {
|
||||||
|
return "all"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
cc := r.Context().Value(h.RequestContextKey).(*h.RequestContext)
|
||||||
|
locator := cc.ServiceLocator()
|
||||||
|
manager := service.Get[SocketManager](locator)
|
||||||
|
|
||||||
|
sessionId := opts.SessionId(cc)
|
||||||
|
|
||||||
|
if sessionId == "" {
|
||||||
|
w.WriteHeader(http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, _, _, err := ws.UpgradeHTTP(r, w)
|
||||||
|
if err != nil {
|
||||||
|
slog.Info("failed to upgrade", slog.String("error", err.Error()))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
roomId := opts.RoomName(cc)
|
||||||
|
/*
|
||||||
|
Large buffer in case the client disconnects while we are writing
|
||||||
|
we don't want to block the writer
|
||||||
|
*/
|
||||||
|
done := make(chan bool, 1000)
|
||||||
|
writer := make(WriterChan, 1000)
|
||||||
|
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
|
||||||
|
manager.Add(roomId, sessionId, writer, done)
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This goroutine is responsible for writing messages to the client
|
||||||
|
*/
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer manager.Disconnect(sessionId)
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
for len(writer) > 0 {
|
||||||
|
<-writer
|
||||||
|
}
|
||||||
|
for len(done) > 0 {
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
ticker := time.NewTicker(5 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
fmt.Printf("closing connection: \n")
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
manager.Ping(sessionId)
|
||||||
|
case message := <-writer:
|
||||||
|
err = wsutil.WriteServerMessage(conn, ws.OpText, []byte(message))
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
/*
|
||||||
|
* This goroutine is responsible for reading messages from the client
|
||||||
|
*/
|
||||||
|
go func() {
|
||||||
|
defer conn.Close()
|
||||||
|
for {
|
||||||
|
msg, op, err := wsutil.ReadClientData(conn)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if op != ws.OpText {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m := make(map[string]any)
|
||||||
|
err = json.Unmarshal(msg, &m)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
manager.OnMessage(sessionId, m)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
}
|
||||||
365
extensions/websocket/internal/wsutil/manager.go
Normal file
365
extensions/websocket/internal/wsutil/manager.go
Normal file
|
|
@ -0,0 +1,365 @@
|
||||||
|
package wsutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/opts"
|
||||||
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
|
"github.com/maddalax/htmgo/framework/service"
|
||||||
|
"github.com/puzpuzpuz/xsync/v3"
|
||||||
|
"log/slog"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type EventType string
|
||||||
|
type WriterChan chan string
|
||||||
|
type DoneChan chan bool
|
||||||
|
|
||||||
|
const (
|
||||||
|
ConnectedEvent EventType = "connected"
|
||||||
|
DisconnectedEvent EventType = "disconnected"
|
||||||
|
MessageEvent EventType = "message"
|
||||||
|
)
|
||||||
|
|
||||||
|
type SocketEvent struct {
|
||||||
|
SessionId string
|
||||||
|
RoomId string
|
||||||
|
Type EventType
|
||||||
|
Payload map[string]any
|
||||||
|
}
|
||||||
|
|
||||||
|
type CloseEvent struct {
|
||||||
|
Code int
|
||||||
|
Reason string
|
||||||
|
}
|
||||||
|
|
||||||
|
type SocketConnection struct {
|
||||||
|
Id string
|
||||||
|
RoomId string
|
||||||
|
Done DoneChan
|
||||||
|
Writer WriterChan
|
||||||
|
}
|
||||||
|
|
||||||
|
type ManagerMetrics struct {
|
||||||
|
RunningGoroutines int32
|
||||||
|
TotalSockets int
|
||||||
|
TotalRooms int
|
||||||
|
TotalListeners int
|
||||||
|
SocketsPerRoomCount map[string]int
|
||||||
|
SocketsPerRoom map[string][]string
|
||||||
|
TotalMessages int64
|
||||||
|
MessagesPerSecond int
|
||||||
|
SecondsElapsed int
|
||||||
|
}
|
||||||
|
|
||||||
|
type SocketManager struct {
|
||||||
|
sockets *xsync.MapOf[string, *xsync.MapOf[string, SocketConnection]]
|
||||||
|
idToRoom *xsync.MapOf[string, string]
|
||||||
|
listeners []chan SocketEvent
|
||||||
|
goroutinesRunning atomic.Int32
|
||||||
|
opts *opts.ExtensionOpts
|
||||||
|
lock sync.Mutex
|
||||||
|
totalMessages atomic.Int64
|
||||||
|
messagesPerSecond int
|
||||||
|
secondsElapsed int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) StartMetrics() {
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
manager.lock.Lock()
|
||||||
|
manager.secondsElapsed++
|
||||||
|
totalMessages := manager.totalMessages.Load()
|
||||||
|
manager.messagesPerSecond = int(float64(totalMessages) / float64(manager.secondsElapsed))
|
||||||
|
manager.lock.Unlock()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) Metrics() ManagerMetrics {
|
||||||
|
manager.lock.Lock()
|
||||||
|
defer manager.lock.Unlock()
|
||||||
|
count := manager.goroutinesRunning.Load()
|
||||||
|
metrics := ManagerMetrics{
|
||||||
|
RunningGoroutines: count,
|
||||||
|
TotalSockets: 0,
|
||||||
|
TotalRooms: 0,
|
||||||
|
TotalListeners: len(manager.listeners),
|
||||||
|
SocketsPerRoom: make(map[string][]string),
|
||||||
|
SocketsPerRoomCount: make(map[string]int),
|
||||||
|
TotalMessages: manager.totalMessages.Load(),
|
||||||
|
MessagesPerSecond: manager.messagesPerSecond,
|
||||||
|
SecondsElapsed: manager.secondsElapsed,
|
||||||
|
}
|
||||||
|
|
||||||
|
roomMap := make(map[string]int)
|
||||||
|
|
||||||
|
manager.idToRoom.Range(func(socketId string, roomId string) bool {
|
||||||
|
roomMap[roomId]++
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
metrics.TotalRooms = len(roomMap)
|
||||||
|
|
||||||
|
manager.sockets.Range(func(roomId string, sockets *xsync.MapOf[string, SocketConnection]) bool {
|
||||||
|
metrics.SocketsPerRoomCount[roomId] = sockets.Size()
|
||||||
|
sockets.Range(func(socketId string, conn SocketConnection) bool {
|
||||||
|
if metrics.SocketsPerRoom[roomId] == nil {
|
||||||
|
metrics.SocketsPerRoom[roomId] = []string{}
|
||||||
|
}
|
||||||
|
metrics.SocketsPerRoom[roomId] = append(metrics.SocketsPerRoom[roomId], socketId)
|
||||||
|
metrics.TotalSockets++
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
return metrics
|
||||||
|
}
|
||||||
|
|
||||||
|
func SocketManagerFromCtx(ctx *h.RequestContext) *SocketManager {
|
||||||
|
locator := ctx.ServiceLocator()
|
||||||
|
return service.Get[SocketManager](locator)
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSocketManager(opts *opts.ExtensionOpts) *SocketManager {
|
||||||
|
return &SocketManager{
|
||||||
|
sockets: xsync.NewMapOf[string, *xsync.MapOf[string, SocketConnection]](),
|
||||||
|
idToRoom: xsync.NewMapOf[string, string](),
|
||||||
|
opts: opts,
|
||||||
|
goroutinesRunning: atomic.Int32{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) ForEachSocket(roomId string, cb func(conn SocketConnection)) {
|
||||||
|
sockets, ok := manager.sockets.Load(roomId)
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sockets.Range(func(id string, conn SocketConnection) bool {
|
||||||
|
cb(conn)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) RunIntervalWithSocket(socketId string, interval time.Duration, cb func() bool) {
|
||||||
|
socketIdSlog := slog.String("socketId", socketId)
|
||||||
|
slog.Debug("ws-extension: starting every loop", socketIdSlog, slog.Duration("duration", interval))
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
manager.goroutinesRunning.Add(1)
|
||||||
|
defer manager.goroutinesRunning.Add(-1)
|
||||||
|
tries := 0
|
||||||
|
for {
|
||||||
|
socket := manager.Get(socketId)
|
||||||
|
// This can run before the socket is established, lets try a few times and kill it if socket isn't connected after a bit.
|
||||||
|
if socket == nil {
|
||||||
|
if tries > 200 {
|
||||||
|
slog.Debug("ws-extension: socket disconnected, killing goroutine", socketIdSlog)
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
time.Sleep(time.Millisecond * 15)
|
||||||
|
tries++
|
||||||
|
slog.Debug("ws-extension: socket not connected yet, trying again", socketIdSlog, slog.Int("attempt", tries))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
success := cb()
|
||||||
|
if !success {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
time.Sleep(interval)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) Listen(listener chan SocketEvent) {
|
||||||
|
if manager.listeners == nil {
|
||||||
|
manager.listeners = make([]chan SocketEvent, 0)
|
||||||
|
}
|
||||||
|
if listener != nil {
|
||||||
|
manager.listeners = append(manager.listeners, listener)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) dispatch(event SocketEvent) {
|
||||||
|
done := make(chan struct{}, 1)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
return
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
fmt.Printf("havent dispatched event after 5s, chan blocked: %s\n", event.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
for _, listener := range manager.listeners {
|
||||||
|
listener <- event
|
||||||
|
}
|
||||||
|
done <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) OnMessage(id string, message map[string]any) {
|
||||||
|
socket := manager.Get(id)
|
||||||
|
if socket == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
manager.totalMessages.Add(1)
|
||||||
|
manager.dispatch(SocketEvent{
|
||||||
|
SessionId: id,
|
||||||
|
Type: MessageEvent,
|
||||||
|
Payload: message,
|
||||||
|
RoomId: socket.RoomId,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) Add(roomId string, id string, writer WriterChan, done DoneChan) {
|
||||||
|
manager.idToRoom.Store(id, roomId)
|
||||||
|
|
||||||
|
sockets, ok := manager.sockets.LoadOrCompute(roomId, func() *xsync.MapOf[string, SocketConnection] {
|
||||||
|
return xsync.NewMapOf[string, SocketConnection]()
|
||||||
|
})
|
||||||
|
|
||||||
|
sockets.Store(id, SocketConnection{
|
||||||
|
Id: id,
|
||||||
|
Writer: writer,
|
||||||
|
RoomId: roomId,
|
||||||
|
Done: done,
|
||||||
|
})
|
||||||
|
|
||||||
|
s, ok := sockets.Load(id)
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
manager.dispatch(SocketEvent{
|
||||||
|
SessionId: s.Id,
|
||||||
|
Type: ConnectedEvent,
|
||||||
|
RoomId: s.RoomId,
|
||||||
|
Payload: map[string]any{},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) OnClose(id string) {
|
||||||
|
socket := manager.Get(id)
|
||||||
|
if socket == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
slog.Debug("ws-extension: removing socket from manager", slog.String("socketId", id))
|
||||||
|
manager.dispatch(SocketEvent{
|
||||||
|
SessionId: id,
|
||||||
|
Type: DisconnectedEvent,
|
||||||
|
RoomId: socket.RoomId,
|
||||||
|
Payload: map[string]any{},
|
||||||
|
})
|
||||||
|
roomId, ok := manager.idToRoom.Load(id)
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sockets, ok := manager.sockets.Load(roomId)
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
sockets.Delete(id)
|
||||||
|
manager.idToRoom.Delete(id)
|
||||||
|
slog.Debug("ws-extension: removed socket from manager", slog.String("socketId", id))
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) CloseWithMessage(id string, message string) {
|
||||||
|
conn := manager.Get(id)
|
||||||
|
if conn != nil {
|
||||||
|
defer manager.OnClose(id)
|
||||||
|
manager.writeText(*conn, message)
|
||||||
|
conn.Done <- true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) Disconnect(id string) {
|
||||||
|
conn := manager.Get(id)
|
||||||
|
if conn != nil {
|
||||||
|
manager.OnClose(id)
|
||||||
|
conn.Done <- true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) Get(id string) *SocketConnection {
|
||||||
|
roomId, ok := manager.idToRoom.Load(id)
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
sockets, ok := manager.sockets.Load(roomId)
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
conn, ok := sockets.Load(id)
|
||||||
|
return &conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) Ping(id string) bool {
|
||||||
|
conn := manager.Get(id)
|
||||||
|
if conn != nil {
|
||||||
|
return manager.writeText(*conn, "ping")
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) writeCloseRaw(writer WriterChan, message string) {
|
||||||
|
manager.writeTextRaw(writer, message)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) writeTextRaw(writer WriterChan, message string) {
|
||||||
|
timeout := 3 * time.Second
|
||||||
|
select {
|
||||||
|
case writer <- message:
|
||||||
|
case <-time.After(timeout):
|
||||||
|
fmt.Printf("could not send %s to channel after %s\n", message, timeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) writeText(socket SocketConnection, message string) bool {
|
||||||
|
if socket.Writer == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
manager.writeTextRaw(socket.Writer, message)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) BroadcastText(roomId string, message string, predicate func(conn SocketConnection) bool) {
|
||||||
|
sockets, ok := manager.sockets.Load(roomId)
|
||||||
|
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
sockets.Range(func(id string, conn SocketConnection) bool {
|
||||||
|
if predicate(conn) {
|
||||||
|
manager.writeText(conn, message)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) SendHtml(id string, message string) bool {
|
||||||
|
conn := manager.Get(id)
|
||||||
|
minified := strings.ReplaceAll(message, "\n", "")
|
||||||
|
minified = strings.ReplaceAll(minified, "\t", "")
|
||||||
|
minified = strings.TrimSpace(minified)
|
||||||
|
if conn != nil {
|
||||||
|
return manager.writeText(*conn, minified)
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) SendText(id string, message string) bool {
|
||||||
|
conn := manager.Get(id)
|
||||||
|
if conn != nil {
|
||||||
|
return manager.writeText(*conn, message)
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
202
extensions/websocket/internal/wsutil/manager_test.go
Normal file
202
extensions/websocket/internal/wsutil/manager_test.go
Normal file
|
|
@ -0,0 +1,202 @@
|
||||||
|
package wsutil
|
||||||
|
|
||||||
|
import (
|
||||||
|
ws2 "github.com/maddalax/htmgo/extensions/websocket/opts"
|
||||||
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func createManager() *SocketManager {
|
||||||
|
return NewSocketManager(&ws2.ExtensionOpts{
|
||||||
|
WsPath: "/ws",
|
||||||
|
SessionId: func(ctx *h.RequestContext) string {
|
||||||
|
return "test"
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func addSocket(manager *SocketManager, roomId string, id string) (socketId string, writer WriterChan, done DoneChan) {
|
||||||
|
writer = make(chan string, 10)
|
||||||
|
done = make(chan bool, 10)
|
||||||
|
manager.Add(roomId, id, writer, done)
|
||||||
|
return id, writer, done
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestManager(t *testing.T) {
|
||||||
|
manager := createManager()
|
||||||
|
socketId, _, _ := addSocket(manager, "123", "456")
|
||||||
|
socket := manager.Get(socketId)
|
||||||
|
assert.NotNil(t, socket)
|
||||||
|
assert.Equal(t, socketId, socket.Id)
|
||||||
|
|
||||||
|
manager.OnClose(socketId)
|
||||||
|
socket = manager.Get(socketId)
|
||||||
|
assert.Nil(t, socket)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestManagerForEachSocket(t *testing.T) {
|
||||||
|
manager := createManager()
|
||||||
|
addSocket(manager, "all", "456")
|
||||||
|
addSocket(manager, "all", "789")
|
||||||
|
var count int
|
||||||
|
manager.ForEachSocket("all", func(conn SocketConnection) {
|
||||||
|
count++
|
||||||
|
})
|
||||||
|
assert.Equal(t, 2, count)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSendText(t *testing.T) {
|
||||||
|
manager := createManager()
|
||||||
|
socketId, writer, done := addSocket(manager, "all", "456")
|
||||||
|
manager.SendText(socketId, "hello")
|
||||||
|
assert.Equal(t, "hello", <-writer)
|
||||||
|
manager.SendText(socketId, "hello2")
|
||||||
|
assert.Equal(t, "hello2", <-writer)
|
||||||
|
done <- true
|
||||||
|
assert.Equal(t, true, <-done)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBroadcastText(t *testing.T) {
|
||||||
|
manager := createManager()
|
||||||
|
_, w1, d1 := addSocket(manager, "all", "456")
|
||||||
|
_, w2, d2 := addSocket(manager, "all", "789")
|
||||||
|
manager.BroadcastText("all", "hello", func(conn SocketConnection) bool {
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
assert.Equal(t, "hello", <-w1)
|
||||||
|
assert.Equal(t, "hello", <-w2)
|
||||||
|
d1 <- true
|
||||||
|
d2 <- true
|
||||||
|
assert.Equal(t, true, <-d1)
|
||||||
|
assert.Equal(t, true, <-d2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBroadcastTextWithPredicate(t *testing.T) {
|
||||||
|
manager := createManager()
|
||||||
|
_, w1, _ := addSocket(manager, "all", "456")
|
||||||
|
_, w2, _ := addSocket(manager, "all", "789")
|
||||||
|
manager.BroadcastText("all", "hello", func(conn SocketConnection) bool {
|
||||||
|
return conn.Id != "456"
|
||||||
|
})
|
||||||
|
|
||||||
|
assert.Equal(t, 0, len(w1))
|
||||||
|
assert.Equal(t, 1, len(w2))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestSendHtml(t *testing.T) {
|
||||||
|
manager := createManager()
|
||||||
|
socketId, writer, _ := addSocket(manager, "all", "456")
|
||||||
|
rendered := h.Render(
|
||||||
|
h.Div(
|
||||||
|
h.P(
|
||||||
|
h.Text("hello"),
|
||||||
|
),
|
||||||
|
))
|
||||||
|
manager.SendHtml(socketId, rendered)
|
||||||
|
assert.Equal(t, "<div><p>hello</p></div>", <-writer)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOnMessage(t *testing.T) {
|
||||||
|
manager := createManager()
|
||||||
|
socketId, _, _ := addSocket(manager, "all", "456")
|
||||||
|
|
||||||
|
listener := make(chan SocketEvent, 10)
|
||||||
|
|
||||||
|
manager.Listen(listener)
|
||||||
|
|
||||||
|
manager.OnMessage(socketId, map[string]any{
|
||||||
|
"message": "hello",
|
||||||
|
})
|
||||||
|
|
||||||
|
event := <-listener
|
||||||
|
assert.Equal(t, "hello", event.Payload["message"])
|
||||||
|
assert.Equal(t, "456", event.SessionId)
|
||||||
|
assert.Equal(t, MessageEvent, event.Type)
|
||||||
|
assert.Equal(t, "all", event.RoomId)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOnClose(t *testing.T) {
|
||||||
|
manager := createManager()
|
||||||
|
socketId, _, _ := addSocket(manager, "all", "456")
|
||||||
|
listener := make(chan SocketEvent, 10)
|
||||||
|
manager.Listen(listener)
|
||||||
|
manager.OnClose(socketId)
|
||||||
|
event := <-listener
|
||||||
|
assert.Equal(t, "456", event.SessionId)
|
||||||
|
assert.Equal(t, DisconnectedEvent, event.Type)
|
||||||
|
assert.Equal(t, "all", event.RoomId)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOnAdd(t *testing.T) {
|
||||||
|
manager := createManager()
|
||||||
|
|
||||||
|
listener := make(chan SocketEvent, 10)
|
||||||
|
manager.Listen(listener)
|
||||||
|
|
||||||
|
socketId, _, _ := addSocket(manager, "all", "456")
|
||||||
|
event := <-listener
|
||||||
|
|
||||||
|
assert.Equal(t, socketId, event.SessionId)
|
||||||
|
assert.Equal(t, ConnectedEvent, event.Type)
|
||||||
|
assert.Equal(t, "all", event.RoomId)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCloseWithMessage(t *testing.T) {
|
||||||
|
manager := createManager()
|
||||||
|
socketId, w, _ := addSocket(manager, "all", "456")
|
||||||
|
manager.CloseWithMessage(socketId, "internal error")
|
||||||
|
assert.Equal(t, "internal error", <-w)
|
||||||
|
assert.Nil(t, manager.Get(socketId))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDisconnect(t *testing.T) {
|
||||||
|
manager := createManager()
|
||||||
|
socketId, _, _ := addSocket(manager, "all", "456")
|
||||||
|
manager.Disconnect(socketId)
|
||||||
|
assert.Nil(t, manager.Get(socketId))
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPing(t *testing.T) {
|
||||||
|
manager := createManager()
|
||||||
|
socketId, w, _ := addSocket(manager, "all", "456")
|
||||||
|
manager.Ping(socketId)
|
||||||
|
assert.Equal(t, "ping", <-w)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMultipleRooms(t *testing.T) {
|
||||||
|
manager := createManager()
|
||||||
|
socketId1, _, _ := addSocket(manager, "room1", "456")
|
||||||
|
socketId2, _, _ := addSocket(manager, "room2", "789")
|
||||||
|
|
||||||
|
room1Count := 0
|
||||||
|
room2Count := 0
|
||||||
|
|
||||||
|
manager.ForEachSocket("room1", func(conn SocketConnection) {
|
||||||
|
room1Count++
|
||||||
|
})
|
||||||
|
|
||||||
|
manager.ForEachSocket("room2", func(conn SocketConnection) {
|
||||||
|
room2Count++
|
||||||
|
})
|
||||||
|
|
||||||
|
assert.Equal(t, 1, room1Count)
|
||||||
|
assert.Equal(t, 1, room2Count)
|
||||||
|
|
||||||
|
room1Count = 0
|
||||||
|
room2Count = 0
|
||||||
|
|
||||||
|
manager.OnClose(socketId1)
|
||||||
|
manager.OnClose(socketId2)
|
||||||
|
|
||||||
|
manager.ForEachSocket("room1", func(conn SocketConnection) {
|
||||||
|
room1Count++
|
||||||
|
})
|
||||||
|
|
||||||
|
manager.ForEachSocket("room2", func(conn SocketConnection) {
|
||||||
|
room2Count++
|
||||||
|
})
|
||||||
|
|
||||||
|
assert.Equal(t, 0, room1Count)
|
||||||
|
assert.Equal(t, 0, room2Count)
|
||||||
|
}
|
||||||
9
extensions/websocket/opts/opts.go
Normal file
9
extensions/websocket/opts/opts.go
Normal file
|
|
@ -0,0 +1,9 @@
|
||||||
|
package opts
|
||||||
|
|
||||||
|
import "github.com/maddalax/htmgo/framework/h"
|
||||||
|
|
||||||
|
type ExtensionOpts struct {
|
||||||
|
WsPath string
|
||||||
|
RoomName func(ctx *h.RequestContext) string
|
||||||
|
SessionId func(ctx *h.RequestContext) string
|
||||||
|
}
|
||||||
77
extensions/websocket/session/session.go
Normal file
77
extensions/websocket/session/session.go
Normal file
|
|
@ -0,0 +1,77 @@
|
||||||
|
package session
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
|
"github.com/puzpuzpuz/xsync/v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Id string
|
||||||
|
|
||||||
|
var cache = xsync.NewMapOf[Id, *xsync.MapOf[string, any]]()
|
||||||
|
|
||||||
|
type State struct {
|
||||||
|
SessionId Id
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewState(ctx *h.RequestContext) *State {
|
||||||
|
id := GetSessionId(ctx)
|
||||||
|
cache.Store(id, xsync.NewMapOf[string, any]())
|
||||||
|
return &State{
|
||||||
|
SessionId: id,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func CreateSession(ctx *h.RequestContext) Id {
|
||||||
|
sessionId := fmt.Sprintf("session-id-%s", h.GenId(30))
|
||||||
|
ctx.Set("session-id", sessionId)
|
||||||
|
return Id(sessionId)
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetSessionId(ctx *h.RequestContext) Id {
|
||||||
|
sessionIdRaw := ctx.Get("session-id")
|
||||||
|
sessionId := ""
|
||||||
|
|
||||||
|
if sessionIdRaw == "" || sessionIdRaw == nil {
|
||||||
|
panic("session id is not set, please use session.CreateSession(ctx) in middleware to create a session id")
|
||||||
|
} else {
|
||||||
|
sessionId = sessionIdRaw.(string)
|
||||||
|
}
|
||||||
|
|
||||||
|
return Id(sessionId)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Update[T any](sessionId Id, key string, compute func(prev T) T) T {
|
||||||
|
actual := Get[T](sessionId, key, *new(T))
|
||||||
|
next := compute(actual)
|
||||||
|
Set(sessionId, key, next)
|
||||||
|
return next
|
||||||
|
}
|
||||||
|
|
||||||
|
func Get[T any](sessionId Id, key string, fallback T) T {
|
||||||
|
actual, _ := cache.LoadOrCompute(sessionId, func() *xsync.MapOf[string, any] {
|
||||||
|
return xsync.NewMapOf[string, any]()
|
||||||
|
})
|
||||||
|
value, exists := actual.Load(key)
|
||||||
|
if exists {
|
||||||
|
return value.(T)
|
||||||
|
}
|
||||||
|
return fallback
|
||||||
|
}
|
||||||
|
|
||||||
|
func Set(sessionId Id, key string, value any) {
|
||||||
|
actual, _ := cache.LoadOrCompute(sessionId, func() *xsync.MapOf[string, any] {
|
||||||
|
return xsync.NewMapOf[string, any]()
|
||||||
|
})
|
||||||
|
actual.Store(key, value)
|
||||||
|
}
|
||||||
|
|
||||||
|
func UseState[T any](sessionId Id, key string, initial T) (func() T, func(T)) {
|
||||||
|
var get = func() T {
|
||||||
|
return Get[T](sessionId, key, initial)
|
||||||
|
}
|
||||||
|
var set = func(value T) {
|
||||||
|
Set(sessionId, key, value)
|
||||||
|
}
|
||||||
|
return get, set
|
||||||
|
}
|
||||||
10
extensions/websocket/ws/access.go
Normal file
10
extensions/websocket/ws/access.go
Normal file
|
|
@ -0,0 +1,10 @@
|
||||||
|
package ws
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/internal/wsutil"
|
||||||
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
|
)
|
||||||
|
|
||||||
|
func ManagerFromCtx(ctx *h.RequestContext) *wsutil.SocketManager {
|
||||||
|
return wsutil.SocketManagerFromCtx(ctx)
|
||||||
|
}
|
||||||
20
extensions/websocket/ws/attribute.go
Normal file
20
extensions/websocket/ws/attribute.go
Normal file
|
|
@ -0,0 +1,20 @@
|
||||||
|
package ws
|
||||||
|
|
||||||
|
import "github.com/maddalax/htmgo/framework/h"
|
||||||
|
|
||||||
|
func OnClick(ctx *h.RequestContext, handler Handler) *h.AttributeMapOrdered {
|
||||||
|
return AddClientSideHandler(ctx, "click", handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
func OnClientEvent(ctx *h.RequestContext, eventName string, handler Handler) *h.AttributeMapOrdered {
|
||||||
|
return AddClientSideHandler(ctx, eventName, handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
func OnServerEvent(ctx *h.RequestContext, eventName string, handler Handler) h.Ren {
|
||||||
|
AddServerSideHandler(ctx, eventName, handler)
|
||||||
|
return h.Attribute("data-handler-id", "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func OnMouseOver(ctx *h.RequestContext, handler Handler) *h.AttributeMapOrdered {
|
||||||
|
return AddClientSideHandler(ctx, "mouseover", handler)
|
||||||
|
}
|
||||||
47
extensions/websocket/ws/dispatch.go
Normal file
47
extensions/websocket/ws/dispatch.go
Normal file
|
|
@ -0,0 +1,47 @@
|
||||||
|
package ws
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/internal/wsutil"
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/session"
|
||||||
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
|
"github.com/maddalax/htmgo/framework/service"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PushServerSideEvent sends a server side event this specific session
|
||||||
|
func PushServerSideEvent(data HandlerData, event string, value map[string]any) {
|
||||||
|
serverSideMessageListener <- ServerSideEvent{
|
||||||
|
Event: event,
|
||||||
|
Payload: value,
|
||||||
|
SessionId: data.SessionId,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// BroadcastServerSideEvent sends a server side event to all clients that have a handler for the event, not just the current session
|
||||||
|
func BroadcastServerSideEvent(event string, value map[string]any) {
|
||||||
|
serverSideMessageListener <- ServerSideEvent{
|
||||||
|
Event: event,
|
||||||
|
Payload: value,
|
||||||
|
SessionId: "*",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// PushElement sends an element to the current session and swaps it into the page
|
||||||
|
func PushElement(data HandlerData, el *h.Element) bool {
|
||||||
|
return data.Manager.SendHtml(data.Socket.Id, h.Render(el))
|
||||||
|
}
|
||||||
|
|
||||||
|
// PushElementCtx sends an element to the current session and swaps it into the page
|
||||||
|
func PushElementCtx(ctx *h.RequestContext, el *h.Element) bool {
|
||||||
|
locator := ctx.ServiceLocator()
|
||||||
|
socketManager := service.Get[wsutil.SocketManager](locator)
|
||||||
|
socketId := session.GetSessionId(ctx)
|
||||||
|
socket := socketManager.Get(string(socketId))
|
||||||
|
if socket == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return PushElement(HandlerData{
|
||||||
|
Socket: socket,
|
||||||
|
Manager: socketManager,
|
||||||
|
SessionId: socketId,
|
||||||
|
}, el)
|
||||||
|
}
|
||||||
29
extensions/websocket/ws/every.go
Normal file
29
extensions/websocket/ws/every.go
Normal file
|
|
@ -0,0 +1,29 @@
|
||||||
|
package ws
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/internal/wsutil"
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/session"
|
||||||
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
|
"github.com/maddalax/htmgo/framework/service"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Every executes the given callback every interval, until the socket is disconnected, or the callback returns false.
|
||||||
|
func Every(ctx *h.RequestContext, interval time.Duration, cb func() bool) {
|
||||||
|
socketId := session.GetSessionId(ctx)
|
||||||
|
locator := ctx.ServiceLocator()
|
||||||
|
manager := service.Get[wsutil.SocketManager](locator)
|
||||||
|
manager.RunIntervalWithSocket(string(socketId), interval, cb)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Once(ctx *h.RequestContext, cb func()) {
|
||||||
|
// time is irrelevant, we just need to run the callback once, it will exit after because of the false return
|
||||||
|
Every(ctx, time.Millisecond, func() bool {
|
||||||
|
cb()
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func RunOnConnected(ctx *h.RequestContext, cb func()) {
|
||||||
|
Once(ctx, cb)
|
||||||
|
}
|
||||||
90
extensions/websocket/ws/handler.go
Normal file
90
extensions/websocket/ws/handler.go
Normal file
|
|
@ -0,0 +1,90 @@
|
||||||
|
package ws
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/internal/wsutil"
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/session"
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MessageHandler struct {
|
||||||
|
manager *wsutil.SocketManager
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMessageHandler(manager *wsutil.SocketManager) *MessageHandler {
|
||||||
|
return &MessageHandler{manager: manager}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *MessageHandler) OnServerSideEvent(e ServerSideEvent) {
|
||||||
|
fmt.Printf("received server side event: %s\n", e.Event)
|
||||||
|
hashes, ok := serverEventNamesToHash.Load(e.Event)
|
||||||
|
|
||||||
|
// If we are not broadcasting to everyone, filter it down to just the current session that invoked the event
|
||||||
|
// TODO optimize this
|
||||||
|
if e.SessionId != "*" {
|
||||||
|
hashesForSession, ok2 := sessionIdToHashes.Load(e.SessionId)
|
||||||
|
|
||||||
|
if ok2 {
|
||||||
|
subset := make(map[KeyHash]bool)
|
||||||
|
for hash := range hashes {
|
||||||
|
if _, ok := hashesForSession[hash]; ok {
|
||||||
|
subset[hash] = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
hashes = subset
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if ok {
|
||||||
|
lock.Lock()
|
||||||
|
callingHandler.Store(true)
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
for hash := range hashes {
|
||||||
|
cb, ok := handlers.Load(hash)
|
||||||
|
if ok {
|
||||||
|
wg.Add(1)
|
||||||
|
go func(e ServerSideEvent) {
|
||||||
|
defer wg.Done()
|
||||||
|
sessionId, ok2 := hashesToSessionId.Load(hash)
|
||||||
|
if ok2 {
|
||||||
|
cb(HandlerData{
|
||||||
|
SessionId: sessionId,
|
||||||
|
Socket: h.manager.Get(string(sessionId)),
|
||||||
|
Manager: h.manager,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
callingHandler.Store(false)
|
||||||
|
lock.Unlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *MessageHandler) OnClientSideEvent(handlerId string, sessionId session.Id) {
|
||||||
|
cb, ok := handlers.Load(handlerId)
|
||||||
|
if ok {
|
||||||
|
cb(HandlerData{
|
||||||
|
SessionId: sessionId,
|
||||||
|
Socket: h.manager.Get(string(sessionId)),
|
||||||
|
Manager: h.manager,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *MessageHandler) OnDomElementRemoved(handlerId string) {
|
||||||
|
handlers.Delete(handlerId)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *MessageHandler) OnSocketDisconnected(event wsutil.SocketEvent) {
|
||||||
|
sessionId := session.Id(event.SessionId)
|
||||||
|
hashes, ok := sessionIdToHashes.Load(sessionId)
|
||||||
|
if ok {
|
||||||
|
for hash := range hashes {
|
||||||
|
hashesToSessionId.Delete(hash)
|
||||||
|
handlers.Delete(hash)
|
||||||
|
}
|
||||||
|
sessionIdToHashes.Delete(sessionId)
|
||||||
|
}
|
||||||
|
}
|
||||||
46
extensions/websocket/ws/listener.go
Normal file
46
extensions/websocket/ws/listener.go
Normal file
|
|
@ -0,0 +1,46 @@
|
||||||
|
package ws
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/internal/wsutil"
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/session"
|
||||||
|
"github.com/maddalax/htmgo/framework/service"
|
||||||
|
)
|
||||||
|
|
||||||
|
func StartListener(locator *service.Locator) {
|
||||||
|
manager := service.Get[wsutil.SocketManager](locator)
|
||||||
|
manager.Listen(socketMessageListener)
|
||||||
|
handler := NewMessageHandler(manager)
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
handle(handler)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func handle(handler *MessageHandler) {
|
||||||
|
select {
|
||||||
|
case event := <-serverSideMessageListener:
|
||||||
|
handler.OnServerSideEvent(event)
|
||||||
|
case event := <-socketMessageListener:
|
||||||
|
switch event.Type {
|
||||||
|
case wsutil.DisconnectedEvent:
|
||||||
|
handler.OnSocketDisconnected(event)
|
||||||
|
case wsutil.MessageEvent:
|
||||||
|
|
||||||
|
handlerId, ok := event.Payload["id"].(string)
|
||||||
|
eventName, ok2 := event.Payload["event"].(string)
|
||||||
|
|
||||||
|
if !ok || !ok2 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
sessionId := session.Id(event.SessionId)
|
||||||
|
if eventName == "dom-element-removed" {
|
||||||
|
handler.OnDomElementRemoved(handlerId)
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
handler.OnClientSideEvent(handlerId, sessionId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
19
extensions/websocket/ws/metrics.go
Normal file
19
extensions/websocket/ws/metrics.go
Normal file
|
|
@ -0,0 +1,19 @@
|
||||||
|
package ws
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/internal/wsutil"
|
||||||
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Metrics struct {
|
||||||
|
Manager wsutil.ManagerMetrics
|
||||||
|
Handler HandlerMetrics
|
||||||
|
}
|
||||||
|
|
||||||
|
func MetricsFromCtx(ctx *h.RequestContext) Metrics {
|
||||||
|
manager := ManagerFromCtx(ctx)
|
||||||
|
return Metrics{
|
||||||
|
Manager: manager.Metrics(),
|
||||||
|
Handler: GetHandlerMetics(),
|
||||||
|
}
|
||||||
|
}
|
||||||
92
extensions/websocket/ws/register.go
Normal file
92
extensions/websocket/ws/register.go
Normal file
|
|
@ -0,0 +1,92 @@
|
||||||
|
package ws
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/internal/wsutil"
|
||||||
|
"github.com/maddalax/htmgo/extensions/websocket/session"
|
||||||
|
"github.com/maddalax/htmgo/framework/h"
|
||||||
|
"github.com/puzpuzpuz/xsync/v3"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
)
|
||||||
|
|
||||||
|
type HandlerData struct {
|
||||||
|
SessionId session.Id
|
||||||
|
Socket *wsutil.SocketConnection
|
||||||
|
Manager *wsutil.SocketManager
|
||||||
|
}
|
||||||
|
|
||||||
|
type Handler func(data HandlerData)
|
||||||
|
|
||||||
|
type ServerSideEvent struct {
|
||||||
|
Event string
|
||||||
|
Payload map[string]any
|
||||||
|
SessionId session.Id
|
||||||
|
}
|
||||||
|
type KeyHash = string
|
||||||
|
|
||||||
|
var handlers = xsync.NewMapOf[KeyHash, Handler]()
|
||||||
|
var sessionIdToHashes = xsync.NewMapOf[session.Id, map[KeyHash]bool]()
|
||||||
|
var hashesToSessionId = xsync.NewMapOf[KeyHash, session.Id]()
|
||||||
|
var serverEventNamesToHash = xsync.NewMapOf[string, map[KeyHash]bool]()
|
||||||
|
|
||||||
|
var socketMessageListener = make(chan wsutil.SocketEvent, 100)
|
||||||
|
var serverSideMessageListener = make(chan ServerSideEvent, 100)
|
||||||
|
var lock = sync.Mutex{}
|
||||||
|
var callingHandler = atomic.Bool{}
|
||||||
|
|
||||||
|
type HandlerMetrics struct {
|
||||||
|
TotalHandlers int
|
||||||
|
ServerEventNamesToHashCount int
|
||||||
|
SessionIdToHashesCount int
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetHandlerMetics() HandlerMetrics {
|
||||||
|
metrics := HandlerMetrics{
|
||||||
|
TotalHandlers: handlers.Size(),
|
||||||
|
ServerEventNamesToHashCount: serverEventNamesToHash.Size(),
|
||||||
|
SessionIdToHashesCount: sessionIdToHashes.Size(),
|
||||||
|
}
|
||||||
|
return metrics
|
||||||
|
}
|
||||||
|
|
||||||
|
func makeId() string {
|
||||||
|
return h.GenId(30)
|
||||||
|
}
|
||||||
|
|
||||||
|
func AddServerSideHandler(ctx *h.RequestContext, event string, handler Handler) *h.AttributeMapOrdered {
|
||||||
|
// If we are already in a handler, we don't want to add another handler
|
||||||
|
// this can happen if the handler renders another element that has a handler
|
||||||
|
if callingHandler.Load() {
|
||||||
|
return h.NewAttributeMap()
|
||||||
|
}
|
||||||
|
sessionId := session.GetSessionId(ctx)
|
||||||
|
hash := makeId()
|
||||||
|
handlers.LoadOrStore(hash, handler)
|
||||||
|
m, _ := serverEventNamesToHash.LoadOrCompute(event, func() map[KeyHash]bool {
|
||||||
|
return make(map[KeyHash]bool)
|
||||||
|
})
|
||||||
|
m[hash] = true
|
||||||
|
storeHashForSession(sessionId, hash)
|
||||||
|
storeSessionIdForHash(sessionId, hash)
|
||||||
|
return h.AttributePairs("data-handler-id", hash, "data-handler-event", event)
|
||||||
|
}
|
||||||
|
|
||||||
|
func AddClientSideHandler(ctx *h.RequestContext, event string, handler Handler) *h.AttributeMapOrdered {
|
||||||
|
hash := makeId()
|
||||||
|
handlers.LoadOrStore(hash, handler)
|
||||||
|
sessionId := session.GetSessionId(ctx)
|
||||||
|
storeHashForSession(sessionId, hash)
|
||||||
|
storeSessionIdForHash(sessionId, hash)
|
||||||
|
return h.AttributePairs("data-handler-id", hash, "data-handler-event", event)
|
||||||
|
}
|
||||||
|
|
||||||
|
func storeHashForSession(sessionId session.Id, hash KeyHash) {
|
||||||
|
m, _ := sessionIdToHashes.LoadOrCompute(sessionId, func() map[KeyHash]bool {
|
||||||
|
return make(map[KeyHash]bool)
|
||||||
|
})
|
||||||
|
m[hash] = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func storeSessionIdForHash(sessionId session.Id, hash KeyHash) {
|
||||||
|
hashesToSessionId.Store(hash, sessionId)
|
||||||
|
}
|
||||||
2
framework/assets/dist/htmgo.js
vendored
2
framework/assets/dist/htmgo.js
vendored
File diff suppressed because one or more lines are too long
|
|
@ -7,6 +7,8 @@ import "./htmxextensions/mutation-error";
|
||||||
import "./htmxextensions/livereload"
|
import "./htmxextensions/livereload"
|
||||||
import "./htmxextensions/htmgo";
|
import "./htmxextensions/htmgo";
|
||||||
import "./htmxextensions/sse"
|
import "./htmxextensions/sse"
|
||||||
|
import "./htmxextensions/ws"
|
||||||
|
import "./htmxextensions/ws-event-handler"
|
||||||
|
|
||||||
// @ts-ignore
|
// @ts-ignore
|
||||||
window.htmx = htmx;
|
window.htmx = htmx;
|
||||||
|
|
@ -44,7 +46,6 @@ function onUrlChange(newUrl: string) {
|
||||||
for (let [key, values] of url.searchParams) {
|
for (let [key, values] of url.searchParams) {
|
||||||
let eventName = "qs:" + key;
|
let eventName = "qs:" + key;
|
||||||
if (triggers.includes(eventName)) {
|
if (triggers.includes(eventName)) {
|
||||||
console.log("triggering", eventName);
|
|
||||||
htmx.trigger(element, eventName, null);
|
htmx.trigger(element, eventName, null);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
77
framework/assets/js/htmxextensions/ws-event-handler.ts
Normal file
77
framework/assets/js/htmxextensions/ws-event-handler.ts
Normal file
|
|
@ -0,0 +1,77 @@
|
||||||
|
import {ws} from "./ws";
|
||||||
|
|
||||||
|
window.onload = function () {
|
||||||
|
const elements = document.querySelectorAll("[hx-extension]");
|
||||||
|
for (let element of Array.from(elements)) {
|
||||||
|
const value = element.getAttribute("hx-extension");
|
||||||
|
if(value != null && value.split(" ").includes("ws")) {
|
||||||
|
addWsEventHandlers()
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
function sendWs(message: Record<string, any>) {
|
||||||
|
if(ws != null && ws.readyState === WebSocket.OPEN) {
|
||||||
|
ws.send(JSON.stringify(message));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function walk(node: Node, cb: (node: Node) => void) {
|
||||||
|
cb(node);
|
||||||
|
for (let child of Array.from(node.childNodes)) {
|
||||||
|
walk(child, cb);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function addWsEventHandlers() {
|
||||||
|
const observer = new MutationObserver(register)
|
||||||
|
observer.observe(document.body, {childList: true, subtree: true})
|
||||||
|
|
||||||
|
let added = new Set<string>();
|
||||||
|
|
||||||
|
function register(mutations: MutationRecord[]) {
|
||||||
|
for (let mutation of mutations) {
|
||||||
|
for (let removedNode of Array.from(mutation.removedNodes)) {
|
||||||
|
walk(removedNode, (node) => {
|
||||||
|
if (node instanceof HTMLElement) {
|
||||||
|
const handlerId = node.getAttribute("data-handler-id")
|
||||||
|
if(handlerId) {
|
||||||
|
added.delete(handlerId)
|
||||||
|
sendWs({id: handlerId, event: 'dom-element-removed'})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
let ids = new Set<string>();
|
||||||
|
document.querySelectorAll("[data-handler-id]").forEach(element => {
|
||||||
|
const id = element.getAttribute("data-handler-id");
|
||||||
|
const event = element.getAttribute("data-handler-event");
|
||||||
|
|
||||||
|
if(id == null || event == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ids.add(id);
|
||||||
|
if (added.has(id)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
added.add(id);
|
||||||
|
element.addEventListener(event, (e) => {
|
||||||
|
sendWs({id, event})
|
||||||
|
});
|
||||||
|
})
|
||||||
|
for (let id of added) {
|
||||||
|
if (!ids.has(id)) {
|
||||||
|
added.delete(id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
register([])
|
||||||
|
}
|
||||||
|
|
||||||
87
framework/assets/js/htmxextensions/ws.ts
Normal file
87
framework/assets/js/htmxextensions/ws.ts
Normal file
|
|
@ -0,0 +1,87 @@
|
||||||
|
import htmx from 'htmx.org'
|
||||||
|
import {removeAssociatedScripts} from "./htmgo";
|
||||||
|
|
||||||
|
let api : any = null;
|
||||||
|
let processed = new Set<string>()
|
||||||
|
export let ws: WebSocket | null = null;
|
||||||
|
|
||||||
|
htmx.defineExtension("ws", {
|
||||||
|
init: function (apiRef) {
|
||||||
|
api = apiRef;
|
||||||
|
},
|
||||||
|
// @ts-ignore
|
||||||
|
onEvent: function (name, evt) {
|
||||||
|
const target = evt.target;
|
||||||
|
if(!(target instanceof HTMLElement)) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if(name === 'htmx:beforeCleanupElement') {
|
||||||
|
removeAssociatedScripts(target);
|
||||||
|
}
|
||||||
|
|
||||||
|
if(name === 'htmx:beforeProcessNode') {
|
||||||
|
const elements = document.querySelectorAll('[ws-connect]');
|
||||||
|
for (let element of Array.from(elements)) {
|
||||||
|
const url = element.getAttribute("ws-connect")!;
|
||||||
|
if(url && !processed.has(url)) {
|
||||||
|
connectWs(element, url)
|
||||||
|
processed.add(url)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
function exponentialBackoff(attempt: number, baseDelay = 100, maxDelay = 10000) {
|
||||||
|
// Exponential backoff: baseDelay * (2 ^ attempt) with jitter
|
||||||
|
const jitter = Math.random(); // Adding randomness to prevent collisions
|
||||||
|
return Math.min(baseDelay * Math.pow(2, attempt) * jitter, maxDelay);
|
||||||
|
}
|
||||||
|
|
||||||
|
function connectWs(ele: Element, url: string, attempt: number = 0) {
|
||||||
|
if(!url) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if(!url.startsWith('ws://') && !url.startsWith('wss://')) {
|
||||||
|
const isSecure = window.location.protocol === 'https:'
|
||||||
|
url = (isSecure ? 'wss://' : 'ws://') + window.location.host + url
|
||||||
|
}
|
||||||
|
console.info('connecting to ws', url)
|
||||||
|
|
||||||
|
ws = new WebSocket(url);
|
||||||
|
|
||||||
|
ws.addEventListener("close", function(event) {
|
||||||
|
htmx.trigger(ele, "htmx:wsClose", {event: event});
|
||||||
|
const delay = exponentialBackoff(attempt);
|
||||||
|
setTimeout(() => {
|
||||||
|
connectWs(ele, url, attempt + 1)
|
||||||
|
}, delay)
|
||||||
|
})
|
||||||
|
|
||||||
|
ws.addEventListener("open", function(event) {
|
||||||
|
htmx.trigger(ele, "htmx:wsOpen", {event: event});
|
||||||
|
})
|
||||||
|
|
||||||
|
ws.addEventListener("error", function(event) {
|
||||||
|
htmx.trigger(ele, "htmx:wsError", {event: event});
|
||||||
|
})
|
||||||
|
|
||||||
|
ws.addEventListener("message", function(event) {
|
||||||
|
const settleInfo = api.makeSettleInfo(ele);
|
||||||
|
htmx.trigger(ele, "htmx:wsBeforeMessage", {event: event});
|
||||||
|
const response = event.data
|
||||||
|
const fragment = api.makeFragment(response) as DocumentFragment;
|
||||||
|
const children = Array.from(fragment.children);
|
||||||
|
for (let child of children) {
|
||||||
|
api.oobSwap(api.getAttributeValue(child, 'hx-swap-oob') || 'true', child, settleInfo);
|
||||||
|
// support htmgo eval__ scripts
|
||||||
|
if(child.tagName === 'SCRIPT' && child.id.startsWith("__eval")) {
|
||||||
|
document.body.appendChild(child);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
htmx.trigger(ele, "htmx:wsAfterMessage", {event: event});
|
||||||
|
})
|
||||||
|
|
||||||
|
return ws
|
||||||
|
}
|
||||||
|
|
@ -174,6 +174,16 @@ func (app *App) UseWithContext(h func(w http.ResponseWriter, r *http.Request, co
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (app *App) Use(h func(ctx *RequestContext)) {
|
||||||
|
app.Router.Use(func(handler http.Handler) http.Handler {
|
||||||
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
cc := r.Context().Value(RequestContextKey).(*RequestContext)
|
||||||
|
h(cc)
|
||||||
|
handler.ServeHTTP(w, r)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func GetLogLevel() slog.Level {
|
func GetLogLevel() slog.Level {
|
||||||
// Get the log level from the environment variable
|
// Get the log level from the environment variable
|
||||||
logLevel := os.Getenv("LOG_LEVEL")
|
logLevel := os.Getenv("LOG_LEVEL")
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/maddalax/htmgo/framework/datastructure/orderedmap"
|
"github.com/maddalax/htmgo/framework/datastructure/orderedmap"
|
||||||
"github.com/maddalax/htmgo/framework/hx"
|
"github.com/maddalax/htmgo/framework/hx"
|
||||||
|
"github.com/maddalax/htmgo/framework/internal/util"
|
||||||
"strings"
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -358,3 +359,7 @@ func AriaHidden(value bool) *AttributeR {
|
||||||
func TabIndex(value int) *AttributeR {
|
func TabIndex(value int) *AttributeR {
|
||||||
return Attribute("tabindex", fmt.Sprintf("%d", value))
|
return Attribute("tabindex", fmt.Sprintf("%d", value))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GenId(len int) string {
|
||||||
|
return util.RandSeq(len)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -161,6 +161,18 @@ func Div(children ...Ren) *Element {
|
||||||
return Tag("div", children...)
|
return Tag("div", children...)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Dl(children ...Ren) *Element {
|
||||||
|
return Tag("dl", children...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Dt(children ...Ren) *Element {
|
||||||
|
return Tag("dt", children...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func Dd(children ...Ren) *Element {
|
||||||
|
return Tag("dd", children...)
|
||||||
|
}
|
||||||
|
|
||||||
func Article(children ...Ren) *Element {
|
func Article(children ...Ren) *Element {
|
||||||
return Tag("article", children...)
|
return Tag("article", children...)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue