htmgo/examples/chat/sse/handler.go

113 lines
2.3 KiB
Go
Raw Normal View History

2024-10-04 16:15:57 +00:00
package sse
import (
2024-10-02 03:26:03 +00:00
"fmt"
2024-10-01 03:08:52 +00:00
"github.com/go-chi/chi/v5"
"github.com/maddalax/htmgo/framework/h"
"github.com/maddalax/htmgo/framework/service"
2024-10-01 17:09:22 +00:00
"log/slog"
"net/http"
"sync"
2024-10-02 03:26:03 +00:00
"time"
)
func Handle() http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
2024-10-02 04:04:04 +00:00
// Set the necessary headers
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*") // Optional for CORS
cc := r.Context().Value(h.RequestContextKey).(*h.RequestContext)
2024-10-02 04:04:04 +00:00
locator := cc.ServiceLocator()
manager := service.Get[SocketManager](locator)
2024-10-01 03:08:52 +00:00
2024-10-01 17:09:22 +00:00
sessionCookie, _ := r.Cookie("session_id")
sessionId := ""
2024-10-01 03:08:52 +00:00
if sessionCookie != nil {
sessionId = sessionCookie.Value
2024-10-01 17:09:22 +00:00
}
2024-10-01 03:08:52 +00:00
ctx := r.Context()
2024-10-02 17:10:04 +00:00
/*
Large buffer in case the client disconnects while we are writing
we don't want to block the writer
*/
done := make(chan bool, 1000)
writer := make(WriterChan, 1000)
2024-10-01 03:08:52 +00:00
wg := sync.WaitGroup{}
wg.Add(1)
2024-10-01 03:08:52 +00:00
/*
* This goroutine is responsible for writing messages to the client
*/
go func() {
defer wg.Done()
defer manager.Disconnect(sessionId)
2024-10-02 03:26:03 +00:00
2024-10-02 17:10:04 +00:00
defer func() {
fmt.Printf("empting channels\n")
for len(writer) > 0 {
<-writer
}
for len(done) > 0 {
<-done
}
}()
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
2024-10-02 16:06:00 +00:00
case <-done:
fmt.Printf("closing connection: \n")
return
case <-ticker.C:
manager.Ping(sessionId)
case message := <-writer:
_, err := fmt.Fprintf(w, message)
if err != nil {
2024-10-02 16:06:00 +00:00
done <- true
} else {
flusher, ok := w.(http.Flusher)
if ok {
flusher.Flush()
}
}
}
}
}()
/**
* This goroutine is responsible for adding the client to the room
*/
wg.Add(1)
go func() {
defer wg.Done()
if sessionId == "" {
manager.writeCloseRaw(writer, "no session")
return
}
2024-10-02 03:26:03 +00:00
roomId := chi.URLParam(r, "id")
2024-10-02 03:26:03 +00:00
if roomId == "" {
slog.Error("invalid room", slog.String("room_id", roomId))
manager.writeCloseRaw(writer, "invalid room")
return
}
manager.Add(roomId, sessionId, writer, done)
}()
wg.Wait()
}
}