diff options
Diffstat (limited to '')
-rw-r--r-- | wsc.go | 41 |
1 files changed, 41 insertions, 0 deletions
@@ -24,6 +24,7 @@ import ( "context" "errors" "fmt" + "log" "sync" "sync/atomic" "time" @@ -53,6 +54,10 @@ func handleConn( session string, userID string, ) (retErr error) { + send := make(chan string, config.Perf.SendQ) + chanPool.Store(userID, &send) + defer chanPool.CompareAndDelete(userID, &send) + reportError := makeReportError(ctx, c) newCtx, newCancel := context.WithCancel(ctx) @@ -235,6 +240,21 @@ func handleConn( errContextCancelled, newCtx.Err(), ) + case sendText := <-send: + select { + case <-newCtx.Done(): + return fmt.Errorf( + "%w: %w", + errContextCancelled, + newCtx.Err(), + ) + default: + } + + err := writeText(newCtx, c, sendText) + if err != nil { + return err + } case courseID := <-usemParent: select { case <-newCtx.Done(): @@ -326,3 +346,24 @@ func handleConn( } var cancelPool sync.Map /* string, *context.CancelFunc */ + +var chanPool sync.Map /* string, *chan string */ + +func propagate(msg string) { + chanPool.Range(func(_userID, _ch interface{}) bool { + ch, ok := _ch.(*chan string) + if !ok { + panic("chanPool has non-\"*chan string\" key") + } + select { + case *ch <- msg: + default: + userID, ok := _userID.(string) + if !ok { + panic("chanPool has non-string key") + } + log.Println("WARNING: SendQ exceeded for " + userID) + } + return true + }) +} |