switch to sse

This commit is contained in:
maddalax 2024-10-01 22:26:03 -05:00
parent 5b10aed601
commit 8b816e9566
22 changed files with 306 additions and 517 deletions

View file

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
"github.com/maddalax/htmgo/cli/htmgo/tasks/astgen" "github.com/maddalax/htmgo/cli/htmgo/tasks/astgen"
"github.com/maddalax/htmgo/cli/htmgo/tasks/copyassets"
"github.com/maddalax/htmgo/cli/htmgo/tasks/css" "github.com/maddalax/htmgo/cli/htmgo/tasks/css"
"github.com/maddalax/htmgo/cli/htmgo/tasks/run" "github.com/maddalax/htmgo/cli/htmgo/tasks/run"
"github.com/maddalax/htmgo/cli/htmgo/tasks/util" "github.com/maddalax/htmgo/cli/htmgo/tasks/util"
@ -101,6 +102,12 @@ func OnFileChange(version string, events []*fsnotify.Event) {
hasTask = true hasTask = true
} }
// framework assets changed
if c.HasAnySuffix("assets/dist/htmgo.js") {
copyassets.CopyAssets()
//tasks.Run = true
}
if hasTask { if hasTask {
slog.Info("file changed", slog.String("version", version), slog.String("file", c.Name())) slog.Info("file changed", slog.String("version", version), slog.String("file", c.Name()))
} }

View file

@ -4,10 +4,12 @@ import (
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/maddalax/htmgo/cli/htmgo/internal" "github.com/maddalax/htmgo/cli/htmgo/internal"
"github.com/maddalax/htmgo/cli/htmgo/tasks/module"
"log" "log"
"log/slog" "log/slog"
"os" "os"
"path/filepath" "path/filepath"
"strings"
"time" "time"
) )
@ -36,6 +38,7 @@ func startWatcher(cb func(version string, file []*fsnotify.Event)) {
if !ok { if !ok {
return return
} }
slog.Debug("event:", slog.String("name", event.Name), slog.String("op", event.Op.String()))
if event.Has(fsnotify.Write) || event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) { if event.Has(fsnotify.Write) || event.Has(fsnotify.Remove) || event.Has(fsnotify.Rename) {
events = append(events, &event) events = append(events, &event)
debouncer.Do(func() { debouncer.Do(func() {
@ -61,6 +64,15 @@ func startWatcher(cb func(version string, file []*fsnotify.Event)) {
}() }()
rootDir := "." rootDir := "."
frameworkPath := module.GetDependencyPath("github.com/maddalax/htmgo/framework")
if !strings.HasPrefix(frameworkPath, "github.com/") {
assetPath := filepath.Join(frameworkPath, "assets", "dist")
slog.Debug("Watching directory:", slog.String("path", assetPath))
watcher.Add(assetPath)
}
// Walk through the root directory and add all subdirectories to the watcher // Walk through the root directory and add all subdirectories to the watcher
err = filepath.Walk(rootDir, func(path string, info os.FileInfo, err error) error { err = filepath.Walk(rootDir, func(path string, info os.FileInfo, err error) error {
if err != nil { if err != nil {

13
examples/chat/assets.go Normal file
View file

@ -0,0 +1,13 @@
//go:build !prod
// +build !prod
package main
import (
"chat/internal/embedded"
"io/fs"
)
func GetStaticAssets() fs.FS {
return embedded.NewOsFs()
}

View file

@ -0,0 +1,16 @@
//go:build prod
// +build prod
package main
import (
"embed"
"io/fs"
)
//go:embed assets/dist/*
var staticAssets embed.FS
func GetStaticAssets() fs.FS {
return staticAssets
}

View file

@ -5,7 +5,6 @@ import (
"chat/ws" "chat/ws"
"context" "context"
"fmt" "fmt"
"github.com/coder/websocket"
"github.com/maddalax/htmgo/framework/h" "github.com/maddalax/htmgo/framework/h"
"github.com/maddalax/htmgo/framework/service" "github.com/maddalax/htmgo/framework/service"
"time" "time"
@ -48,14 +47,14 @@ func (m *Manager) OnConnected(e ws.SocketEvent) {
room, _ := m.service.GetRoom(e.RoomId) room, _ := m.service.GetRoom(e.RoomId)
if room == nil { if room == nil {
m.socketManager.CloseWithError(e.Id, websocket.StatusPolicyViolation, "invalid room") m.socketManager.CloseWithError(e.Id, 1008, "invalid room")
return return
} }
user, err := m.queries.GetUserBySessionId(context.Background(), e.Id) user, err := m.queries.GetUserBySessionId(context.Background(), e.Id)
if err != nil { if err != nil {
m.socketManager.CloseWithError(e.Id, websocket.StatusPolicyViolation, "invalid user") m.socketManager.CloseWithError(e.Id, 1008, "invalid user")
return return
} }

View file

@ -10,7 +10,7 @@ import (
func MessageRow(message *Message) *h.Element { func MessageRow(message *Message) *h.Element {
return h.Div( return h.Div(
h.Attribute("hx-swap-oob", "beforeend"), h.Attribute("hx-swap-oob", "beforeend"),
h.Class("flex flex-col gap-4 w-full"), h.Class("flex flex-col gap-4 w-full break-words whitespace-normal"), // Ensure container breaks long words
h.Id("messages"), h.Id("messages"),
h.Div( h.Div(
h.Class("flex flex-col gap-1"), h.Class("flex flex-col gap-1"),
@ -19,7 +19,10 @@ func MessageRow(message *Message) *h.Element {
h.Pf(message.UserName, h.Class("font-bold")), h.Pf(message.UserName, h.Class("font-bold")),
h.Pf(message.CreatedAt.In(time.Local).Format("01/02 03:04 PM")), h.Pf(message.CreatedAt.In(time.Local).Format("01/02 03:04 PM")),
), ),
h.P(h.Text(message.Message)), h.Article(
h.Class("break-words whitespace-normal"), // Ensure message text wraps correctly
h.P(h.Text(message.Message)),
),
), ),
) )
} }

View file

@ -0,0 +1,17 @@
package embedded
import (
"io/fs"
"os"
)
type OsFs struct {
}
func (receiver OsFs) Open(name string) (fs.File, error) {
return os.Open(name)
}
func NewOsFs() OsFs {
return OsFs{}
}

View file

@ -5,16 +5,12 @@ import (
"chat/chat" "chat/chat"
"chat/internal/db" "chat/internal/db"
"chat/ws" "chat/ws"
"embed"
"github.com/maddalax/htmgo/framework/h" "github.com/maddalax/htmgo/framework/h"
"github.com/maddalax/htmgo/framework/service" "github.com/maddalax/htmgo/framework/service"
"io/fs" "io/fs"
"net/http" "net/http"
) )
//go:embed assets/dist/*
var StaticAssets embed.FS
func main() { func main() {
locator := service.NewLocator() locator := service.NewLocator()
@ -30,7 +26,7 @@ func main() {
ServiceLocator: locator, ServiceLocator: locator,
LiveReload: true, LiveReload: true,
Register: func(app *h.App) { Register: func(app *h.App) {
sub, err := fs.Sub(StaticAssets, "assets/dist") sub, err := fs.Sub(GetStaticAssets(), "assets/dist")
if err != nil { if err != nil {
panic(err) panic(err)

View file

@ -2,6 +2,7 @@ package pages
import ( import (
"chat/chat" "chat/chat"
"chat/partials"
"fmt" "fmt"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"github.com/maddalax/htmgo/framework/h" "github.com/maddalax/htmgo/framework/h"
@ -19,13 +20,13 @@ func ChatRoom(ctx *h.RequestContext) *h.Page {
h.HxExtension("ws"), h.HxExtension("ws"),
), ),
h.Attribute("ws-connect", fmt.Sprintf("/ws/chat/%s", roomId)), h.Attribute("sse-connect", fmt.Sprintf("/ws/chat/%s", roomId)),
h.HxOnWsOpen( h.HxOnSseOpen(
js.ConsoleLog("Connected to chat room"), js.ConsoleLog("Connected to chat room"),
), ),
h.HxOnWsClose( h.HxOnSseClose(
js.EvalJs(fmt.Sprintf(` js.EvalJs(fmt.Sprintf(`
const reason = e.detail.event.reason const reason = e.detail.event.reason
if(['invalid room', 'no session', 'invalid user'].includes(reason)) { if(['invalid room', 'no session', 'invalid user'].includes(reason)) {
@ -54,7 +55,7 @@ func ChatRoom(ctx *h.RequestContext) *h.Page {
// Padding to push chat content below the fixed room name // Padding to push chat content below the fixed room name
h.Div(h.Class("pt-[50px]")), h.Div(h.Class("pt-[50px]")),
h.HxAfterWsMessage( h.HxAfterSseMessage(
js.EvalJsOnSibling("#messages", js.EvalJsOnSibling("#messages",
`element.scrollTop = element.scrollHeight;`), `element.scrollTop = element.scrollHeight;`),
), ),
@ -62,7 +63,7 @@ func ChatRoom(ctx *h.RequestContext) *h.Page {
// Chat Messages // Chat Messages
h.Div( h.Div(
h.Id("messages"), h.Id("messages"),
h.Class("flex flex-col gap-4 overflow-auto grow w-full"), h.Class("flex flex-col gap-4 overflow-auto grow w-full mb-4 max-w-[calc(100%-215px)]"),
), ),
// Chat Input at the bottom // Chat Input at the bottom
@ -128,7 +129,7 @@ func MessageInput() *h.Element {
h.Name("message"), h.Name("message"),
h.MaxLength(1000), h.MaxLength(1000),
h.Placeholder("Type a message..."), h.Placeholder("Type a message..."),
h.HxAfterWsSend( h.HxAfterSseMessage(
js.SetValue(""), js.SetValue(""),
), ),
) )
@ -138,6 +139,8 @@ func Form() *h.Element {
return h.Div( return h.Div(
h.Class("flex gap-4 items-center"), h.Class("flex gap-4 items-center"),
h.Form( h.Form(
h.NoSwap(),
h.PostPartial(partials.SendMessage),
h.Attribute("ws-send", ""), h.Attribute("ws-send", ""),
h.Class("flex flex-grow"), h.Class("flex flex-grow"),
MessageInput(), MessageInput(),

View file

@ -0,0 +1,35 @@
package partials
import (
"chat/components"
"chat/ws"
"github.com/maddalax/htmgo/framework/h"
"github.com/maddalax/htmgo/framework/service"
)
func SendMessage(ctx *h.RequestContext) *h.Partial {
locator := ctx.ServiceLocator()
socketManager := service.Get[ws.SocketManager](locator)
sessionCookie, err := ctx.Request.Cookie("session_id")
if err != nil {
return h.SwapPartial(ctx, components.FormError("Session not found"))
}
message := ctx.Request.FormValue("message")
if message == "" {
return h.SwapPartial(ctx, components.FormError("Message is required"))
}
if len(message) > 200 {
return h.SwapPartial(ctx, components.FormError("Message is too long"))
}
socketManager.OnMessage(sessionCookie.Value, map[string]any{
"message": message,
})
return h.EmptyPartial()
}

View file

@ -1,14 +1,13 @@
package ws package ws
import ( import (
"context" "fmt"
"github.com/coder/websocket"
"github.com/coder/websocket/wsjson"
"github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5"
"github.com/maddalax/htmgo/framework/h" "github.com/maddalax/htmgo/framework/h"
"github.com/maddalax/htmgo/framework/service" "github.com/maddalax/htmgo/framework/service"
"log/slog" "log/slog"
"net/http" "net/http"
"time"
) )
func Handle() http.HandlerFunc { func Handle() http.HandlerFunc {
@ -17,18 +16,8 @@ func Handle() http.HandlerFunc {
sessionCookie, _ := r.Cookie("session_id") sessionCookie, _ := r.Cookie("session_id")
c, err := websocket.Accept(w, r, nil)
// 2 mb
c.SetReadLimit(2 * 1024 * 1024)
if err != nil {
return
}
if sessionCookie == nil { if sessionCookie == nil {
slog.Error("session cookie not found") slog.Error("session cookie not found")
c.Close(websocket.StatusPolicyViolation, "no session")
return return
} }
@ -41,27 +30,48 @@ func Handle() http.HandlerFunc {
if roomId == "" { if roomId == "" {
slog.Error("invalid room", slog.String("room_id", roomId)) slog.Error("invalid room", slog.String("room_id", roomId))
manager.CloseWithError(sessionId, websocket.StatusPolicyViolation, "invalid room") manager.CloseWithError(sessionId, 1008, "invalid room")
return return
} }
manager.Add(roomId, sessionId, c) done := make(chan CloseEvent, 50)
flush := make(chan bool, 50)
manager.Add(roomId, sessionId, w, done, flush)
defer func() { defer func() {
manager.Disconnect(sessionId) manager.Disconnect(sessionId)
}() }()
// Set the necessary headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*") // Optional for CORS
// Flush the headers immediately
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
return
}
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for { for {
var v map[string]any select {
err = wsjson.Read(context.Background(), c, &v) case <-ticker.C:
if err != nil { manager.Ping(sessionId)
manager.CloseWithError(sessionId, websocket.StatusInternalError, "failed to read message") case <-flush:
if flusher != nil {
flusher.Flush()
}
case <-done: // Client closed the connection
fmt.Println("Client disconnected")
return return
} }
if v != nil {
manager.OnMessage(sessionId, v)
}
} }
} }
} }

View file

@ -1,9 +1,9 @@
package ws package ws
import ( import (
"context" "fmt"
"github.com/coder/websocket"
"github.com/puzpuzpuz/xsync/v3" "github.com/puzpuzpuz/xsync/v3"
"net/http"
) )
type EventType string type EventType string
@ -21,10 +21,17 @@ type SocketEvent struct {
Payload map[string]any Payload map[string]any
} }
type CloseEvent struct {
Code int
Reason string
}
type SocketConnection struct { type SocketConnection struct {
Id string Id string
Conn *websocket.Conn Writer http.ResponseWriter
RoomId string RoomId string
Done chan CloseEvent
Flush chan bool
} }
type SocketManager struct { type SocketManager struct {
@ -77,7 +84,7 @@ func (manager *SocketManager) OnMessage(id string, message map[string]any) {
}) })
} }
func (manager *SocketManager) Add(roomId string, id string, conn *websocket.Conn) { func (manager *SocketManager) Add(roomId string, id string, writer http.ResponseWriter, done chan CloseEvent, flush chan bool) {
manager.idToRoom.Store(id, roomId) manager.idToRoom.Store(id, roomId)
sockets, ok := manager.sockets.LoadOrCompute(roomId, func() *xsync.MapOf[string, SocketConnection] { sockets, ok := manager.sockets.LoadOrCompute(roomId, func() *xsync.MapOf[string, SocketConnection] {
@ -86,8 +93,10 @@ func (manager *SocketManager) Add(roomId string, id string, conn *websocket.Conn
sockets.Store(id, SocketConnection{ sockets.Store(id, SocketConnection{
Id: id, Id: id,
Conn: conn, Writer: writer,
RoomId: roomId, RoomId: roomId,
Done: done,
Flush: flush,
}) })
s, ok := sockets.Load(id) s, ok := sockets.Load(id)
@ -117,11 +126,14 @@ func (manager *SocketManager) OnClose(id string) {
manager.sockets.Delete(id) manager.sockets.Delete(id)
} }
func (manager *SocketManager) CloseWithError(id string, code websocket.StatusCode, message string) { func (manager *SocketManager) CloseWithError(id string, code int, message string) {
conn := manager.Get(id) conn := manager.Get(id)
if conn != nil { if conn != nil {
go manager.OnClose(id) go manager.OnClose(id)
conn.Conn.Close(code, message) conn.Done <- CloseEvent{
Code: code,
Reason: message,
}
} }
} }
@ -129,7 +141,10 @@ func (manager *SocketManager) Disconnect(id string) {
conn := manager.Get(id) conn := manager.Get(id)
if conn != nil { if conn != nil {
go manager.OnClose(id) go manager.OnClose(id)
_ = conn.Conn.CloseNow() conn.Done <- CloseEvent{
Code: -1,
Reason: "",
}
} }
} }
@ -146,8 +161,30 @@ func (manager *SocketManager) Get(id string) *SocketConnection {
return &conn return &conn
} }
func (manager *SocketManager) Broadcast(roomId string, message []byte, messageType websocket.MessageType, predicate func(conn SocketConnection) bool) { func (manager *SocketManager) Ping(id string) {
ctx := context.Background() conn := manager.Get(id)
if conn != nil {
manager.writeText(*conn, "ping", "")
}
}
func (manager *SocketManager) writeText(socket SocketConnection, event string, message string) {
if socket.Writer == nil {
return
}
var err error
if event != "" {
_, err = fmt.Fprintf(socket.Writer, "event: %s\ndata: %s\n\n", event, message)
} else {
_, err = fmt.Fprintf(socket.Writer, "data: %s\n\n", message)
}
if err != nil {
manager.CloseWithError(socket.Id, 1008, "failed to write message")
}
socket.Flush <- true
}
func (manager *SocketManager) BroadcastText(roomId string, message string, predicate func(conn SocketConnection) bool) {
sockets, ok := manager.sockets.Load(roomId) sockets, ok := manager.sockets.Load(roomId)
if !ok { if !ok {
@ -156,19 +193,15 @@ func (manager *SocketManager) Broadcast(roomId string, message []byte, messageTy
sockets.Range(func(id string, conn SocketConnection) bool { sockets.Range(func(id string, conn SocketConnection) bool {
if predicate(conn) { if predicate(conn) {
conn.Conn.Write(ctx, messageType, message) manager.writeText(conn, "", message)
} }
return true return true
}) })
} }
func (manager *SocketManager) BroadcastText(roomId string, message string, predicate func(conn SocketConnection) bool) {
manager.Broadcast(roomId, []byte(message), websocket.MessageText, predicate)
}
func (manager *SocketManager) SendText(id string, message string) { func (manager *SocketManager) SendText(id string, message string) {
conn := manager.Get(id) conn := manager.Get(id)
if conn != nil { if conn != nil {
_ = conn.Conn.Write(context.Background(), websocket.MessageText, []byte(message)) manager.writeText(*conn, "", message)
} }
} }

File diff suppressed because one or more lines are too long

View file

@ -6,7 +6,7 @@ import "./htmxextensions/response-targets";
import "./htmxextensions/mutation-error"; import "./htmxextensions/mutation-error";
import "./htmxextensions/livereload" import "./htmxextensions/livereload"
import "./htmxextensions/htmgo"; import "./htmxextensions/htmgo";
import "./htmxextensions/ws" import "./htmxextensions/sse"
function watchUrl(callback: (oldUrl: string, newUrl: string) => void) { function watchUrl(callback: (oldUrl: string, newUrl: string) => void) {
let lastUrl = window.location.href; let lastUrl = window.location.href;

View file

@ -0,0 +1,64 @@
import htmx from 'htmx.org'
let api : any = null;
let processed = new Set<string>()
htmx.defineExtension("sse", {
init: function (apiRef) {
api = apiRef;
},
// @ts-ignore
onEvent: function (name, evt) {
const target = evt.target;
if(!(target instanceof HTMLElement)) {
return
}
if(name === 'htmx:beforeProcessNode') {
const elements = document.querySelectorAll('[sse-connect]');
for (let element of Array.from(elements)) {
const url = element.getAttribute("sse-connect")!;
if(url && !processed.has(url)) {
connectEventSource(element, url)
processed.add(url)
}
}
}
}
})
function connectEventSource(ele: Element, url: string) {
if(!url) {
return
}
console.info('Connecting to EventSource', url)
const eventSource = new EventSource(url);
eventSource.onopen = function(event) {
console.log('EventSource open:', event);
htmx.trigger(ele, "htmx:sseOpen", {event: event});
}
eventSource.onerror = function(event) {
htmx.trigger(ele, "htmx:sseError", {event: event});
if (eventSource.readyState == EventSource.CLOSED) {
htmx.trigger(ele, "htmx:sseClose", {event: event});
}
}
eventSource.onmessage = function(event) {
console.log('EventSource message:', event.data);
htmx.trigger(ele, "htmx:sseBeforeMessage", {event: event});
const response = event.data
const fragment = api.makeFragment(response) as DocumentFragment;
const children = Array.from(fragment.children);
for (let child of children) {
api.oobSwap(api.getAttributeValue(child, 'hx-swap-oob') || 'true', child, {tasks: []});
// support htmgo eval__ scripts
if(child.tagName === 'SCRIPT' && child.id.startsWith("__eval")) {
document.body.appendChild(child);
}
}
htmx.trigger(ele, "htmx:sseAfterMessage", {event: event});
}
}

View file

@ -1,418 +0,0 @@
import htmx from 'htmx.org'
import {removeAssociatedScripts} from "./htmgo";
interface HtmxInternalApi {
getInternalData(elt: Element): any;
bodyContains(elt: Element): boolean;
getAttributeValue(elt: Element, name: string): string | null;
triggerEvent(elt: Element, name: string, detail?: any): boolean;
withExtensions(elt: Element, callback: (extension: any) => void): void;
makeSettleInfo(elt: Element): any;
makeFragment(html: string): DocumentFragment;
oobSwap(swapStyle: string, fragment: Element, settleInfo: any): void;
settleImmediately(tasks: any): void;
getClosestMatch(elt: Element, condition: (node: Element) => boolean): Element | null;
getTriggerSpecs(elt: Element): any[];
addTriggerHandler(elt: Element, triggerSpec: any, nodeData: any, handler: (elt: Element, evt: Event) => void): void;
getHeaders(elt: Element, target: Element): any;
getTarget(elt: Element): Element;
getInputValues(elt: Element, verb: string): { errors: any[]; values: any };
getExpressionVars(elt: Element): any;
mergeObjects(obj1: any, obj2: any): any;
filterValues(values: any, elt: Element): any;
triggerErrorEvent(elt?: Element, name?: string, detail?: any): void;
hasAttribute(elt: Element, name: string): boolean;
shouldCancel(evt: Event, elt: Element): boolean;
[key: string]: any;
}
interface WebSocketWrapper {
socket: WebSocket;
events : { [key: string]: ((event: Event) => void)[] };
messageQueue: { message: string; sendElt: Element | null }[];
retryCount: number;
sendImmediately(message: string, sendElt: Element | null): void;
send(message: string, sendElt: Element | null): void;
addEventListener(event: string, handler: (event: Event) => void): void;
handleQueuedMessages(): void;
init(): void;
close(): void;
publicInterface: {
send: (message: string, sendElt: Element | null) => void;
sendImmediately: (message: string, sendElt: Element | null) => void;
queue: { message: string; sendElt: Element | null }[];
};
}
let api: HtmxInternalApi;
function splitOnWhitespace(trigger: string): string[] {
return trigger.trim().split(/\s+/);
}
function getLegacyWebsocketURL(elt: Element): string | undefined {
const legacySSEValue = api.getAttributeValue(elt, 'hx-ws');
if (legacySSEValue) {
const values = splitOnWhitespace(legacySSEValue);
for (let i = 0; i < values.length; i++) {
const value = values[i].split(/:(.+)/);
if (value[0] === 'connect') {
return value[1];
}
}
}
return undefined;
}
function ensureWebSocket(socketElt: HTMLElement): void {
// If the element containing the WebSocket connection no longer exists, then
// do not connect/reconnect the WebSocket.
if (!api.bodyContains(socketElt)) {
return;
}
// Get the source straight from the element's value
let wssSource = api.getAttributeValue(socketElt, 'ws-connect');
if (wssSource == null || wssSource === '') {
const legacySource = getLegacyWebsocketURL(socketElt);
if (legacySource == null) {
return;
} else {
wssSource = legacySource;
}
}
// Guarantee that the wssSource value is a fully qualified URL
if (wssSource.indexOf('/') === 0) {
const base_part = location.hostname + (location.port ? ':' + location.port : '');
if (location.protocol === 'https:') {
wssSource = 'wss://' + base_part + wssSource;
} else if (location.protocol === 'http:') {
wssSource = 'ws://' + base_part + wssSource;
}
}
const socketWrapper = createWebsocketWrapper(socketElt, () => createWebSocket(wssSource));
socketWrapper.addEventListener('message', (event) => {
if (maybeCloseWebSocketSource(socketElt)) {
return;
}
let response = (event as MessageEvent).data;
if (
!api.triggerEvent(socketElt, 'htmx:wsBeforeMessage', {
message: response,
socketWrapper: socketWrapper.publicInterface,
})
) {
return;
}
api.withExtensions(socketElt, (extension) => {
response = extension.transformResponse(response, null, socketElt);
});
const settleInfo = api.makeSettleInfo(socketElt);
const fragment = api.makeFragment(response);
if (fragment.children.length) {
const children = Array.from(fragment.children);
for (let i = 0; i < children.length; i++) {
const child = children[i]
api.oobSwap(api.getAttributeValue(child, 'hx-swap-oob') || 'true', children[i], settleInfo);
// support htmgo eval__ scripts
if(child.tagName === 'SCRIPT' && child.id.startsWith("__eval")) {
document.body.appendChild(child);
}
}
}
api.settleImmediately(settleInfo.tasks);
api.triggerEvent(socketElt, 'htmx:wsAfterMessage', {
message: response,
socketWrapper: socketWrapper.publicInterface,
});
});
// Put the WebSocket into the HTML Element's custom data.
api.getInternalData(socketElt).webSocket = socketWrapper;
}
function createWebsocketWrapper(socketElt: HTMLElement, socketFunc: () => WebSocket): WebSocketWrapper {
const wrapper: WebSocketWrapper = {
socket: null as unknown as WebSocket,
messageQueue: [],
retryCount: 0,
events: {} as { [key: string]: ((event: Event) => void)[] },
addEventListener(event: string, handler: (event: Event) => void) {
if (this.socket) {
this.socket.addEventListener(event, handler);
}
if (!this.events[event]) {
this.events[event] = [];
}
this.events[event].push(handler);
},
sendImmediately(message: string, sendElt: Element | null) {
if (!this.socket) {
api.triggerErrorEvent(socketElt, 'htmx:wsError', { error: 'No socket available' });
}
if (
!sendElt ||
api.triggerEvent(sendElt, 'htmx:wsBeforeSend', {
message,
socketWrapper: this.publicInterface,
})
) {
this.socket.send(message);
if (sendElt) {
api.triggerEvent(sendElt, 'htmx:wsAfterSend', {
message,
socketWrapper: this.publicInterface,
});
}
}
},
send(message: string, sendElt: Element | null) {
if (this.socket.readyState !== this.socket.OPEN) {
this.messageQueue.push({ message, sendElt });
} else {
this.sendImmediately(message, sendElt);
}
},
handleQueuedMessages() {
while (this.messageQueue.length > 0) {
const queuedItem = this.messageQueue[0];
if (this.socket.readyState === this.socket.OPEN) {
this.sendImmediately(queuedItem.message, queuedItem.sendElt);
this.messageQueue.shift();
} else {
break;
}
}
},
init() {
if (this.socket && this.socket.readyState === this.socket.OPEN) {
// Close discarded socket
this.socket.close();
}
// Create a new WebSocket and event handlers
const socket = socketFunc();
// The event.type detail is added for interface conformance with the
// other two lifecycle events (open and close) so a single handler method
// can handle them polymorphically, if required.
api.triggerEvent(socketElt, 'htmx:wsConnecting', { event: { type: 'connecting' } });
this.socket = socket;
socket.onopen = (e) => {
this.retryCount = 0;
api.triggerEvent(socketElt, 'htmx:wsOpen', { event: e, socketWrapper: this.publicInterface });
this.handleQueuedMessages();
};
socket.onclose = (e) => {
// If socket should not be connected, stop further attempts to establish connection
// If Abnormal Closure/Service Restart/Try Again Later, then set a timer to reconnect after a pause.
if (!maybeCloseWebSocketSource(socketElt) && [1006, 1012, 1013].indexOf(e.code) >= 0) {
const delay = getWebSocketReconnectDelay(this.retryCount);
setTimeout(() => {
this.retryCount += 1;
this.init();
}, delay);
}
// Notify client code that connection has been closed. Client code can inspect `event` field
// to determine whether closure has been valid or abnormal
api.triggerEvent(socketElt, 'htmx:wsClose', { event: e, socketWrapper: this.publicInterface });
};
socket.onerror = (e) => {
api.triggerErrorEvent(socketElt, 'htmx:wsError', { error: e, socketWrapper: this });
maybeCloseWebSocketSource(socketElt);
};
const events = this.events;
Object.keys(events).forEach((k) => {
events[k].forEach((e) => {
socket.addEventListener(k, e);
});
});
},
close() {
this.socket.close();
},
publicInterface: {} as any,
};
wrapper.init();
wrapper.publicInterface = {
send: wrapper.send.bind(wrapper),
sendImmediately: wrapper.sendImmediately.bind(wrapper),
queue: wrapper.messageQueue,
};
return wrapper;
}
function ensureWebSocketSend(elt: HTMLElement): void {
const legacyAttribute = api.getAttributeValue(elt, 'hx-ws');
if (legacyAttribute && legacyAttribute !== 'send') {
return;
}
const webSocketParent = api.getClosestMatch(elt, (node) => {
return hasWebSocket(node as HTMLElement);
});
if (webSocketParent) {
processWebSocketSend(webSocketParent as HTMLElement, elt);
}
}
function hasWebSocket(node: HTMLElement): boolean {
return api.getInternalData(node).webSocket != null;
}
function processWebSocketSend(socketElt: HTMLElement, sendElt: HTMLElement): void {
const nodeData = api.getInternalData(sendElt);
const triggerSpecs = api.getTriggerSpecs(sendElt);
triggerSpecs.forEach((ts) => {
api.addTriggerHandler(sendElt, ts, nodeData, (elt: Element, evt: Event) => {
if (maybeCloseWebSocketSource(socketElt)) {
return;
}
const socketWrapper: WebSocketWrapper = api.getInternalData(socketElt).webSocket;
const headers = api.getHeaders(sendElt, api.getTarget(sendElt));
const results = api.getInputValues(sendElt, 'post');
const errors = results.errors;
const rawParameters = Object.assign({}, results.values);
const expressionVars = api.getExpressionVars(sendElt);
const allParameters = api.mergeObjects(rawParameters, expressionVars);
const filteredParameters = api.filterValues(allParameters, sendElt);
const sendConfig = {
parameters: filteredParameters,
unfilteredParameters: allParameters,
headers,
errors,
triggeringEvent: evt,
messageBody: undefined as string | undefined,
socketWrapper: socketWrapper.publicInterface,
};
if (!api.triggerEvent(elt, 'htmx:wsConfigSend', sendConfig)) {
return;
}
if (errors && errors.length > 0) {
api.triggerEvent(elt, 'htmx:validation:halted', errors);
return;
}
let body = sendConfig.messageBody;
if (body === undefined) {
const toSend = Object.assign({}, sendConfig.parameters);
if (sendConfig.headers) {
toSend.HEADERS = headers;
}
body = JSON.stringify(toSend);
}
socketWrapper.send(body, elt as Element);
if (evt && api.shouldCancel(evt, elt as Element)) {
evt.preventDefault();
}
});
});
}
function getWebSocketReconnectDelay(retryCount: number): number {
const exp = Math.min(retryCount, 6);
const maxDelay = 1000 * Math.pow(2, exp);
return maxDelay * Math.random();
}
function maybeCloseWebSocketSource(elt: HTMLElement): boolean {
if (!api.bodyContains(elt)) {
api.getInternalData(elt).webSocket.close();
return true;
}
return false;
}
function createWebSocket(url: string): WebSocket {
const sock = new WebSocket(url, []);
sock.binaryType = (htmx.config.wsBinaryType || 'blob') as unknown as BinaryType;
return sock;
}
function queryAttributeOnThisOrChildren(elt: HTMLElement, attributeName: string): HTMLElement[] {
const result: HTMLElement[] = [];
// If the parent element also contains the requested attribute, then add it to the results too.
if (api.hasAttribute(elt, attributeName) || api.hasAttribute(elt, 'hx-ws')) {
result.push(elt);
}
// Search all child nodes that match the requested attribute
elt.querySelectorAll('[' + attributeName + '], [data-' + attributeName + '], [data-hx-ws], [hx-ws]').forEach((node) => {
result.push(node as HTMLElement);
});
return result;
}
function forEach<T>(arr: T[], func: (item: T) => void): void {
if (arr) {
arr.forEach(func);
}
}
htmx.defineExtension('ws', {
init: (apiRef: HtmxInternalApi) => {
// Store reference to internal API
api = apiRef;
},
// @ts-ignore
onEvent: (name: string, evt: Event) => {
const parent: Element = evt.target as Element || (evt as CustomEvent).detail.elt;
if(!(parent instanceof HTMLElement)) {
return
}
switch (name) {
// Try to close the socket when elements are removed
case 'htmx:beforeCleanupElement':
removeAssociatedScripts(parent);
const internalData = api.getInternalData(parent);
if (internalData.webSocket) {
internalData.webSocket.close();
}
return;
// Try to create websockets when elements are processed
case 'htmx:beforeProcessNode':
if(parent.hasAttribute("ws-connect")) {
ensureWebSocket(parent as HTMLElement);
}
if(parent.hasAttribute("ws-send")) {
ensureWebSocketSend(parent as HTMLElement);
}
}
},
});

View file

@ -53,6 +53,10 @@ func NewAttributeMap(pairs ...string) *AttributeMapOrdered {
return &AttributeMapOrdered{data: m} return &AttributeMapOrdered{data: m}
} }
func NoSwap() *AttributeR {
return Attribute("hx-swap", "none")
}
func Attribute(key string, value string) *AttributeR { func Attribute(key string, value string) *AttributeR {
return &AttributeR{ return &AttributeR{
Name: key, Name: key,

View file

@ -68,6 +68,10 @@ func SwapPartial(ctx *RequestContext, swap *Element) *Partial {
SwapMany(ctx, swap)) SwapMany(ctx, swap))
} }
func EmptyPartial() *Partial {
return NewPartial(Fragment())
}
func SwapManyPartial(ctx *RequestContext, swaps ...*Element) *Partial { func SwapManyPartial(ctx *RequestContext, swaps ...*Element) *Partial {
return NewPartial( return NewPartial(
SwapMany(ctx, swaps...), SwapMany(ctx, swaps...),

View file

@ -3,7 +3,7 @@ package h
import "strings" import "strings"
func BaseExtensions() string { func BaseExtensions() string {
extensions := []string{"path-deps", "response-targets", "mutation-error", "htmgo"} extensions := []string{"path-deps", "response-targets", "mutation-error", "htmgo", "sse"}
if IsDevelopment() { if IsDevelopment() {
extensions = append(extensions, "livereload") extensions = append(extensions, "livereload")
} }

View file

@ -60,16 +60,6 @@ func (l *LifeCycle) HxBeforeRequest(cmd ...Command) *LifeCycle {
return l return l
} }
func (l *LifeCycle) HxBeforeWsSend(cmd ...Command) *LifeCycle {
l.OnEvent(hx.BeforeWsSendEvent, cmd...)
return l
}
func (l *LifeCycle) HxAfterWsSend(cmd ...Command) *LifeCycle {
l.OnEvent(hx.AfterWsSendEvent, cmd...)
return l
}
func HxOnLoad(cmd ...Command) *LifeCycle { func HxOnLoad(cmd ...Command) *LifeCycle {
return NewLifeCycle().OnEvent(hx.LoadEvent, cmd...) return NewLifeCycle().OnEvent(hx.LoadEvent, cmd...)
} }
@ -86,40 +76,32 @@ func OnEvent(event hx.Event, cmd ...Command) *LifeCycle {
return NewLifeCycle().OnEvent(event, cmd...) return NewLifeCycle().OnEvent(event, cmd...)
} }
func HxBeforeWsMessage(cmd ...Command) *LifeCycle { func HxBeforeSseMessage(cmd ...Command) *LifeCycle {
return NewLifeCycle().OnEvent(hx.WsBeforeMessageEvent, cmd...) return NewLifeCycle().OnEvent(hx.SseBeforeMessageEvent, cmd...)
} }
func HxAfterWsMessage(cmd ...Command) *LifeCycle { func HxAfterSseMessage(cmd ...Command) *LifeCycle {
return NewLifeCycle().OnEvent(hx.WsAfterMessageEvent, cmd...) return NewLifeCycle().OnEvent(hx.SseAfterMessageEvent, cmd...)
} }
func OnSubmit(cmd ...Command) *LifeCycle { func OnSubmit(cmd ...Command) *LifeCycle {
return NewLifeCycle().OnEvent(hx.SubmitEvent, cmd...) return NewLifeCycle().OnEvent(hx.SubmitEvent, cmd...)
} }
func HxOnWsError(cmd ...Command) *LifeCycle { func HxOnSseError(cmd ...Command) *LifeCycle {
return NewLifeCycle().OnEvent(hx.WsErrorEvent, cmd...) return NewLifeCycle().OnEvent(hx.SseErrorEvent, cmd...)
} }
func HxOnWsClose(cmd ...Command) *LifeCycle { func HxOnSseClose(cmd ...Command) *LifeCycle {
return NewLifeCycle().OnEvent(hx.WsClosedEvent, cmd...) return NewLifeCycle().OnEvent(hx.SseClosedEvent, cmd...)
} }
func HxOnWsConnecting(cmd ...Command) *LifeCycle { func HxOnSseConnecting(cmd ...Command) *LifeCycle {
return NewLifeCycle().OnEvent(hx.WsConnectingEvent, cmd...) return NewLifeCycle().OnEvent(hx.SseConnectingEvent, cmd...)
} }
func HxOnWsOpen(cmd ...Command) *LifeCycle { func HxOnSseOpen(cmd ...Command) *LifeCycle {
return NewLifeCycle().OnEvent(hx.WsConnectedEvent, cmd...) return NewLifeCycle().OnEvent(hx.SseConnectedEvent, cmd...)
}
func HxBeforeWsSend(cmd ...Command) *LifeCycle {
return NewLifeCycle().HxBeforeWsSend(cmd...)
}
func HxAfterWsSend(cmd ...Command) *LifeCycle {
return NewLifeCycle().HxAfterWsSend(cmd...)
} }
func HxBeforeRequest(cmd ...Command) *LifeCycle { func HxBeforeRequest(cmd ...Command) *LifeCycle {

View file

@ -1,6 +1,9 @@
package h package h
import "github.com/maddalax/htmgo/framework/hx" import (
"github.com/maddalax/htmgo/framework/hx"
"strings"
)
func Get(path string, trigger ...string) *AttributeMapOrdered { func Get(path string, trigger ...string) *AttributeMapOrdered {
return AttributeList(Attribute(hx.GetAttr, path), HxTriggerString(trigger...)) return AttributeList(Attribute(hx.GetAttr, path), HxTriggerString(trigger...))
@ -19,10 +22,18 @@ func GetWithQs(path string, qs *Qs, trigger string) *AttributeMapOrdered {
} }
func PostPartial(partial PartialFunc, triggers ...string) *AttributeMapOrdered { func PostPartial(partial PartialFunc, triggers ...string) *AttributeMapOrdered {
return Post(GetPartialPath(partial), triggers...) path := GetPartialPath(partial)
if !strings.HasPrefix(path, "/") {
path = "/" + path
}
return Post(path, triggers...)
} }
func PostPartialWithQs(partial PartialFunc, qs *Qs, trigger ...string) *AttributeMapOrdered { func PostPartialWithQs(partial PartialFunc, qs *Qs, trigger ...string) *AttributeMapOrdered {
path := GetPartialPathWithQs(partial, qs)
if !strings.HasPrefix(path, "/") {
path = "/" + path
}
return Post(GetPartialPathWithQs(partial, qs), trigger...) return Post(GetPartialPathWithQs(partial, qs), trigger...)
} }

View file

@ -108,14 +108,12 @@ const (
XhrLoadEndEvent Event = "htmx:xhr:loadend" XhrLoadEndEvent Event = "htmx:xhr:loadend"
XhrLoadStartEvent Event = "htmx:xhr:loadstart" XhrLoadStartEvent Event = "htmx:xhr:loadstart"
XhrProgressEvent Event = "htmx:xhr:progress" XhrProgressEvent Event = "htmx:xhr:progress"
BeforeWsSendEvent Event = "htmx:wsBeforeSend" SseConnectedEvent Event = "htmx:sseOpen"
AfterWsSendEvent Event = "htmx:wsAfterSend" SseConnectingEvent Event = "htmx:sseConnecting"
WsConnectedEvent Event = "htmx:wsOpen" SseClosedEvent Event = "htmx:sseClose"
WsConnectingEvent Event = "htmx:wsConnecting" SseErrorEvent Event = "htmx:sseError"
WsClosedEvent Event = "htmx:wsClose" SseBeforeMessageEvent Event = "htmx:sseBeforeMessage"
WsErrorEvent Event = "htmx:wsError" SseAfterMessageEvent Event = "htmx:sseAfterMessage"
WsBeforeMessageEvent Event = "htmx:wsBeforeMessage"
WsAfterMessageEvent Event = "htmx:wsAfterMessage"
// RevealedEvent Misc Events // RevealedEvent Misc Events
RevealedEvent Event = "revealed" RevealedEvent Event = "revealed"