fixes for error handling
This commit is contained in:
parent
f06feffb9e
commit
5b60b9e915
6 changed files with 48 additions and 32 deletions
|
|
@ -47,14 +47,14 @@ func (m *Manager) OnConnected(e ws.SocketEvent) {
|
||||||
room, _ := m.service.GetRoom(e.RoomId)
|
room, _ := m.service.GetRoom(e.RoomId)
|
||||||
|
|
||||||
if room == nil {
|
if room == nil {
|
||||||
m.socketManager.CloseWithError(e.Id, 1008, "invalid room")
|
m.socketManager.CloseWithMessage(e.Id, "invalid room")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
user, err := m.queries.GetUserBySessionId(context.Background(), e.Id)
|
user, err := m.queries.GetUserBySessionId(context.Background(), e.Id)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
m.socketManager.CloseWithError(e.Id, 1008, "invalid user")
|
m.socketManager.CloseWithMessage(e.Id, "invalid user")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -28,7 +28,7 @@ func ChatRoom(ctx *h.RequestContext) *h.Page {
|
||||||
|
|
||||||
h.HxOnSseClose(
|
h.HxOnSseClose(
|
||||||
js.EvalJs(fmt.Sprintf(`
|
js.EvalJs(fmt.Sprintf(`
|
||||||
const reason = e.detail.event.reason
|
const reason = e.detail.event.data
|
||||||
if(['invalid room', 'no session', 'invalid user'].includes(reason)) {
|
if(['invalid room', 'no session', 'invalid user'].includes(reason)) {
|
||||||
window.location.href = '/?roomId=%s';
|
window.location.href = '/?roomId=%s';
|
||||||
} else if(e.detail.event.code === 1011) {
|
} else if(e.detail.event.code === 1011) {
|
||||||
|
|
|
||||||
|
|
@ -12,25 +12,32 @@ import (
|
||||||
|
|
||||||
func Handle() http.HandlerFunc {
|
func Handle() http.HandlerFunc {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// Set the necessary headers
|
||||||
|
w.Header().Set("Content-Type", "text/event-stream")
|
||||||
|
w.Header().Set("Cache-Control", "no-cache")
|
||||||
|
w.Header().Set("Connection", "keep-alive")
|
||||||
|
w.Header().Set("Access-Control-Allow-Origin", "*") // Optional for CORS
|
||||||
|
|
||||||
cc := r.Context().Value(h.RequestContextKey).(*h.RequestContext)
|
cc := r.Context().Value(h.RequestContextKey).(*h.RequestContext)
|
||||||
|
locator := cc.ServiceLocator()
|
||||||
|
manager := service.Get[SocketManager](locator)
|
||||||
|
// Flush the headers immediately
|
||||||
|
flusher, ok := w.(http.Flusher)
|
||||||
|
|
||||||
sessionCookie, _ := r.Cookie("session_id")
|
sessionCookie, _ := r.Cookie("session_id")
|
||||||
|
|
||||||
if sessionCookie == nil {
|
if sessionCookie == nil {
|
||||||
slog.Error("session cookie not found")
|
manager.writeCloseRaw(w, flusher, "no session")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
locator := cc.ServiceLocator()
|
|
||||||
manager := service.Get[SocketManager](locator)
|
|
||||||
|
|
||||||
sessionId := sessionCookie.Value
|
sessionId := sessionCookie.Value
|
||||||
|
|
||||||
roomId := chi.URLParam(r, "id")
|
roomId := chi.URLParam(r, "id")
|
||||||
|
|
||||||
if roomId == "" {
|
if roomId == "" {
|
||||||
slog.Error("invalid room", slog.String("room_id", roomId))
|
slog.Error("invalid room", slog.String("room_id", roomId))
|
||||||
manager.CloseWithError(sessionId, 1008, "invalid room")
|
manager.writeCloseRaw(w, flusher, "invalid room")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -43,15 +50,6 @@ func Handle() http.HandlerFunc {
|
||||||
manager.Disconnect(sessionId)
|
manager.Disconnect(sessionId)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// Set the necessary headers
|
|
||||||
w.Header().Set("Content-Type", "text/event-stream")
|
|
||||||
w.Header().Set("Cache-Control", "no-cache")
|
|
||||||
w.Header().Set("Connection", "keep-alive")
|
|
||||||
w.Header().Set("Access-Control-Allow-Origin", "*") // Optional for CORS
|
|
||||||
|
|
||||||
// Flush the headers immediately
|
|
||||||
flusher, ok := w.(http.Flusher)
|
|
||||||
|
|
||||||
if !ok {
|
if !ok {
|
||||||
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
|
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
|
||||||
return
|
return
|
||||||
|
|
|
||||||
|
|
@ -126,12 +126,13 @@ func (manager *SocketManager) OnClose(id string) {
|
||||||
manager.sockets.Delete(id)
|
manager.sockets.Delete(id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (manager *SocketManager) CloseWithError(id string, code int, message string) {
|
func (manager *SocketManager) CloseWithMessage(id string, message string) {
|
||||||
conn := manager.Get(id)
|
conn := manager.Get(id)
|
||||||
if conn != nil {
|
if conn != nil {
|
||||||
go manager.OnClose(id)
|
defer manager.OnClose(id)
|
||||||
|
manager.writeText(*conn, "close", message)
|
||||||
conn.Done <- CloseEvent{
|
conn.Done <- CloseEvent{
|
||||||
Code: code,
|
Code: -1,
|
||||||
Reason: message,
|
Reason: message,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -168,18 +169,33 @@ func (manager *SocketManager) Ping(id string) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) writeCloseRaw(writer http.ResponseWriter, flusher http.Flusher, message string) {
|
||||||
|
err := manager.writeTextRaw(writer, "close", message)
|
||||||
|
if err == nil {
|
||||||
|
flusher.Flush()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (manager *SocketManager) writeTextRaw(writer http.ResponseWriter, event string, message string) error {
|
||||||
|
if writer == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var err error
|
||||||
|
if event != "" {
|
||||||
|
_, err = fmt.Fprintf(writer, "event: %s\ndata: %s\n\n", event, message)
|
||||||
|
} else {
|
||||||
|
_, err = fmt.Fprintf(writer, "data: %s\n\n", message)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (manager *SocketManager) writeText(socket SocketConnection, event string, message string) {
|
func (manager *SocketManager) writeText(socket SocketConnection, event string, message string) {
|
||||||
if socket.Writer == nil {
|
if socket.Writer == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
var err error
|
err := manager.writeTextRaw(socket.Writer, event, message)
|
||||||
if event != "" {
|
if err != nil && event != "close" {
|
||||||
_, err = fmt.Fprintf(socket.Writer, "event: %s\ndata: %s\n\n", event, message)
|
manager.CloseWithMessage(socket.Id, "failed to write message")
|
||||||
} else {
|
|
||||||
_, err = fmt.Fprintf(socket.Writer, "data: %s\n\n", message)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
manager.CloseWithError(socket.Id, 1008, "failed to write message")
|
|
||||||
}
|
}
|
||||||
socket.Flush <- true
|
socket.Flush <- true
|
||||||
}
|
}
|
||||||
|
|
|
||||||
4
framework/assets/dist/htmgo.js
vendored
4
framework/assets/dist/htmgo.js
vendored
File diff suppressed because one or more lines are too long
|
|
@ -39,8 +39,11 @@ function connectEventSource(ele: Element, url: string) {
|
||||||
console.info('Connecting to EventSource', url)
|
console.info('Connecting to EventSource', url)
|
||||||
const eventSource = new EventSource(url);
|
const eventSource = new EventSource(url);
|
||||||
|
|
||||||
|
eventSource.addEventListener("close", function(event) {
|
||||||
|
htmx.trigger(ele, "htmx:sseClose", {event: event});
|
||||||
|
})
|
||||||
|
|
||||||
eventSource.onopen = function(event) {
|
eventSource.onopen = function(event) {
|
||||||
console.log('EventSource open:', event);
|
|
||||||
htmx.trigger(ele, "htmx:sseOpen", {event: event});
|
htmx.trigger(ele, "htmx:sseOpen", {event: event});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -52,7 +55,6 @@ function connectEventSource(ele: Element, url: string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
eventSource.onmessage = function(event) {
|
eventSource.onmessage = function(event) {
|
||||||
console.log('EventSource message:', event.data);
|
|
||||||
htmx.trigger(ele, "htmx:sseBeforeMessage", {event: event});
|
htmx.trigger(ele, "htmx:sseBeforeMessage", {event: event});
|
||||||
const response = event.data
|
const response = event.data
|
||||||
const fragment = api.makeFragment(response) as DocumentFragment;
|
const fragment = api.makeFragment(response) as DocumentFragment;
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue