fix issues with channel buffer size
This commit is contained in:
parent
d739ef3758
commit
6ea3f77b62
4 changed files with 53 additions and 10 deletions
|
|
@ -33,11 +33,11 @@ func (m *Manager) StartListener() {
|
|||
case event := <-c:
|
||||
switch event.Type {
|
||||
case ws.ConnectedEvent:
|
||||
go m.OnConnected(event)
|
||||
m.OnConnected(event)
|
||||
case ws.DisconnectedEvent:
|
||||
go m.OnDisconnected(event)
|
||||
m.OnDisconnected(event)
|
||||
case ws.MessageEvent:
|
||||
go m.onMessage(event)
|
||||
m.onMessage(event)
|
||||
default:
|
||||
fmt.Printf("Unknown event type: %s\n", event.Type)
|
||||
}
|
||||
|
|
|
|||
25
examples/chat/internal/routine/goroutine.go
Normal file
25
examples/chat/internal/routine/goroutine.go
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
package routine
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
func DebugLongRunning(name string, f func()) {
|
||||
now := time.Now()
|
||||
done := make(chan struct{}, 1)
|
||||
go func() {
|
||||
ticker := time.NewTicker(time.Second * 5)
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
case <-ticker.C:
|
||||
elapsed := time.Since(now).Milliseconds()
|
||||
fmt.Printf("function %s has not finished after %dms\n", name, elapsed)
|
||||
}
|
||||
}
|
||||
}()
|
||||
f()
|
||||
done <- struct{}{}
|
||||
}
|
||||
|
|
@ -31,8 +31,13 @@ func Handle() http.HandlerFunc {
|
|||
}
|
||||
|
||||
ctx := r.Context()
|
||||
done := make(chan bool, 1)
|
||||
writer := make(WriterChan, 1)
|
||||
|
||||
/*
|
||||
Large buffer in case the client disconnects while we are writing
|
||||
we don't want to block the writer
|
||||
*/
|
||||
done := make(chan bool, 1000)
|
||||
writer := make(WriterChan, 1000)
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
|
|
@ -44,6 +49,16 @@ func Handle() http.HandlerFunc {
|
|||
defer wg.Done()
|
||||
defer manager.Disconnect(sessionId)
|
||||
|
||||
defer func() {
|
||||
fmt.Printf("empting channels\n")
|
||||
for len(writer) > 0 {
|
||||
<-writer
|
||||
}
|
||||
for len(done) > 0 {
|
||||
<-done
|
||||
}
|
||||
}()
|
||||
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
package ws
|
||||
|
||||
import (
|
||||
"chat/internal/routine"
|
||||
"fmt"
|
||||
"github.com/puzpuzpuz/xsync/v3"
|
||||
"time"
|
||||
|
|
@ -186,11 +187,13 @@ func (manager *SocketManager) writeCloseRaw(writer WriterChan, message string) {
|
|||
}
|
||||
|
||||
func (manager *SocketManager) writeTextRaw(writer WriterChan, event string, message string) {
|
||||
routine.DebugLongRunning("writeTextRaw", func() {
|
||||
if event != "" {
|
||||
writer <- fmt.Sprintf("event: %s\ndata: %s\n\n", event, message)
|
||||
} else {
|
||||
writer <- fmt.Sprintf("data: %s\n\n", message)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func (manager *SocketManager) writeText(socket SocketConnection, event string, message string) {
|
||||
|
|
|
|||
Loading…
Reference in a new issue