summaryrefslogtreecommitdiff
path: root/wsc.go
diff options
context:
space:
mode:
Diffstat (limited to 'wsc.go')
-rw-r--r--wsc.go41
1 files changed, 41 insertions, 0 deletions
diff --git a/wsc.go b/wsc.go
index 0bb2c7f..2f00302 100644
--- a/wsc.go
+++ b/wsc.go
@@ -24,6 +24,7 @@ import (
"context"
"errors"
"fmt"
+ "log"
"sync"
"sync/atomic"
"time"
@@ -53,6 +54,10 @@ func handleConn(
session string,
userID string,
) (retErr error) {
+ send := make(chan string, config.Perf.SendQ)
+ chanPool.Store(userID, &send)
+ defer chanPool.CompareAndDelete(userID, &send)
+
reportError := makeReportError(ctx, c)
newCtx, newCancel := context.WithCancel(ctx)
@@ -235,6 +240,21 @@ func handleConn(
errContextCancelled,
newCtx.Err(),
)
+ case sendText := <-send:
+ select {
+ case <-newCtx.Done():
+ return fmt.Errorf(
+ "%w: %w",
+ errContextCancelled,
+ newCtx.Err(),
+ )
+ default:
+ }
+
+ err := writeText(newCtx, c, sendText)
+ if err != nil {
+ return err
+ }
case courseID := <-usemParent:
select {
case <-newCtx.Done():
@@ -326,3 +346,24 @@ func handleConn(
}
var cancelPool sync.Map /* string, *context.CancelFunc */
+
+var chanPool sync.Map /* string, *chan string */
+
+func propagate(msg string) {
+ chanPool.Range(func(_userID, _ch interface{}) bool {
+ ch, ok := _ch.(*chan string)
+ if !ok {
+ panic("chanPool has non-\"*chan string\" key")
+ }
+ select {
+ case *ch <- msg:
+ default:
+ userID, ok := _userID.(string)
+ if !ok {
+ panic("chanPool has non-string key")
+ }
+ log.Println("WARNING: SendQ exceeded for " + userID)
+ }
+ return true
+ })
+}