websocket graceful shutdown and safety queue
[brisk.git] / web / xynt-streaming.js
index 13eeb7b..009bb01 100644 (file)
@@ -16,6 +16,7 @@ function transport_ws(doc, xynt_streaming, page)
     else
         this.name = "WebSocket";
     this.ctx_new = "";
+    this.out_queue = [];
     var self = this;
 
     this.doc = doc;
@@ -25,6 +26,8 @@ function transport_ws(doc, xynt_streaming, page)
         this.xynt_streaming.log("PAGE: "+page);
         this.ws = new WebSocket(page);
         this.ws.onopen = function () {
+            console.log('WS On open');
+
             self.xynt_streaming.log("onopen");
             if (this.readyState == 1) {
                 // connected
@@ -33,14 +36,16 @@ function transport_ws(doc, xynt_streaming, page)
             }
         };
         this.ws.onmessage = function (msg) {
+            console.log('WS On message');
             self.xynt_streaming.log("onmessage");
             // new data in msg.data
             self.ctx_new += msg.data;
         };
         this.ws.onclose = function (msg) {
-            this.onopen  = null;
-            this.onclose = null;
-            this.onerror = null;
+            console.log('WS On close');
+            self.onopen  = null;
+            self.onclose = null;
+            self.onerror = null;
             self.xynt_streaming.log("onclose"+self.init_steps);
             if (self.init_steps == 0)
                 self.ws_cb("error");
@@ -68,6 +73,7 @@ transport_ws.prototype = {
     name: null,
     xynt_streaming: "ready",
     ws: null,
+    out_queue: null,
     stopped: true,
     failed: false,
 
@@ -99,11 +105,56 @@ this.xynt_streaming.log("DEC: "+this.xynt_streaming.transp_fback);
                 }
             }
         }
+        else if (from == "open") {
+            this.flush_out_queue();
+        }
+
         if (this.ws != null && this.ws.readyState > 1) {
            this.stopped = true;
         }
     },
 
+    flush_out_queue: function() {
+        var l_out = this.out_queue.length;
+        if (l_out == 0)
+            return;
+
+        for (var i = 0 ; i < l_out ; i++) {
+            if (this.ws.readyState != 1) {
+                break;
+            }
+            var item = this.out_queue.shift();
+            var sent = true;
+            try {
+                this.ws.send(item);
+            }
+            catch (ex) {
+                this.out_queue.unshift(item);
+                break;
+            }
+        }
+    },
+
+    send: function(msg) {
+        console.log('new send');
+        if (this.ws && this.ws.readyState == 1) {
+            try {
+                console.log('Try send ... ');
+                this.flush_out_queue();
+                this.ws.send(msg);
+                console.log(' ... done');
+            }
+            catch (ex) {
+                console.log(' ... catched exception');
+                this.flush_out.push(msg);
+            }
+        }
+        else {
+            console.log('ws not ready: push into flush_out');
+            this.flush_out.push(msg);
+        }
+    },
+
     ws_abort: function() {
         if (this.ws != null) {
 this.xynt_streaming.log("WSCLOSE");
@@ -934,6 +985,15 @@ xynt_streaming.prototype = {
         return;
     },
 
+    send: function(msg) {
+        if (typeof(this.transp.send) == 'undefined') {
+            this.log('send not implemented for ' + this.transp_type);
+            return;
+        }
+
+        return this.transp.send(msg);
+    },
+
     //
     // moved to xynt-streaming-ifra as push()
     //