From 6ea3f77b62c1dcd9dcea59b0f6d07154e397e1f1 Mon Sep 17 00:00:00 2001 From: maddalax Date: Wed, 2 Oct 2024 12:10:04 -0500 Subject: [PATCH] fix issues with channel buffer size --- examples/chat/chat/broadcast.go | 6 ++--- examples/chat/internal/routine/goroutine.go | 25 +++++++++++++++++++++ examples/chat/ws/handler.go | 19 ++++++++++++++-- examples/chat/ws/manager.go | 13 ++++++----- 4 files changed, 53 insertions(+), 10 deletions(-) create mode 100644 examples/chat/internal/routine/goroutine.go diff --git a/examples/chat/chat/broadcast.go b/examples/chat/chat/broadcast.go index 6d68fdf..4c76de4 100644 --- a/examples/chat/chat/broadcast.go +++ b/examples/chat/chat/broadcast.go @@ -33,11 +33,11 @@ func (m *Manager) StartListener() { case event := <-c: switch event.Type { case ws.ConnectedEvent: - go m.OnConnected(event) + m.OnConnected(event) case ws.DisconnectedEvent: - go m.OnDisconnected(event) + m.OnDisconnected(event) case ws.MessageEvent: - go m.onMessage(event) + m.onMessage(event) default: fmt.Printf("Unknown event type: %s\n", event.Type) } diff --git a/examples/chat/internal/routine/goroutine.go b/examples/chat/internal/routine/goroutine.go new file mode 100644 index 0000000..144f68a --- /dev/null +++ b/examples/chat/internal/routine/goroutine.go @@ -0,0 +1,25 @@ +package routine + +import ( + "fmt" + "time" +) + +func DebugLongRunning(name string, f func()) { + now := time.Now() + done := make(chan struct{}, 1) + go func() { + ticker := time.NewTicker(time.Second * 5) + for { + select { + case <-done: + return + case <-ticker.C: + elapsed := time.Since(now).Milliseconds() + fmt.Printf("function %s has not finished after %dms\n", name, elapsed) + } + } + }() + f() + done <- struct{}{} +} diff --git a/examples/chat/ws/handler.go b/examples/chat/ws/handler.go index e6a7060..04a2a8d 100644 --- a/examples/chat/ws/handler.go +++ b/examples/chat/ws/handler.go @@ -31,8 +31,13 @@ func Handle() http.HandlerFunc { } ctx := r.Context() - done := make(chan bool, 1) - writer := make(WriterChan, 1) + + /* + Large buffer in case the client disconnects while we are writing + we don't want to block the writer + */ + done := make(chan bool, 1000) + writer := make(WriterChan, 1000) wg := sync.WaitGroup{} wg.Add(1) @@ -44,6 +49,16 @@ func Handle() http.HandlerFunc { defer wg.Done() defer manager.Disconnect(sessionId) + defer func() { + fmt.Printf("empting channels\n") + for len(writer) > 0 { + <-writer + } + for len(done) > 0 { + <-done + } + }() + ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() diff --git a/examples/chat/ws/manager.go b/examples/chat/ws/manager.go index f4dbb9b..4d653f1 100644 --- a/examples/chat/ws/manager.go +++ b/examples/chat/ws/manager.go @@ -1,6 +1,7 @@ package ws import ( + "chat/internal/routine" "fmt" "github.com/puzpuzpuz/xsync/v3" "time" @@ -186,11 +187,13 @@ func (manager *SocketManager) writeCloseRaw(writer WriterChan, message string) { } func (manager *SocketManager) writeTextRaw(writer WriterChan, event string, message string) { - if event != "" { - writer <- fmt.Sprintf("event: %s\ndata: %s\n\n", event, message) - } else { - writer <- fmt.Sprintf("data: %s\n\n", message) - } + routine.DebugLongRunning("writeTextRaw", func() { + if event != "" { + writer <- fmt.Sprintf("event: %s\ndata: %s\n\n", event, message) + } else { + writer <- fmt.Sprintf("data: %s\n\n", message) + } + }) } func (manager *SocketManager) writeText(socket SocketConnection, event string, message string) {