websocket graceful shutdown and safety queue
[brisk.git] / web / xynt-streaming.js
index 507490f..009bb01 100644 (file)
@@ -5,16 +5,29 @@
 //
 function transport_ws(doc, xynt_streaming, page)
 {
+    // if four arguments manage if WS or WSS connection
+    if (arguments.length > 3)
+        this.is_secure = arguments[3];
+    else
+        this.is_secure = false;
+
+    if (this.is_secure)
+        this.name = "WebSocketSecure";
+    else
+        this.name = "WebSocket";
     this.ctx_new = "";
+    this.out_queue = [];
     var self = this;
 
     this.doc = doc;
     this.failed = false;
     this.xynt_streaming = xynt_streaming;
     try {
-this.xynt_streaming.log("PAGE: "+page);
+        this.xynt_streaming.log("PAGE: "+page);
         this.ws = new WebSocket(page);
         this.ws.onopen = function () {
+            console.log('WS On open');
+
             self.xynt_streaming.log("onopen");
             if (this.readyState == 1) {
                 // connected
@@ -23,14 +36,16 @@ this.xynt_streaming.log("PAGE: "+page);
             }
         };
         this.ws.onmessage = function (msg) {
+            console.log('WS On message');
             self.xynt_streaming.log("onmessage");
             // new data in msg.data
             self.ctx_new += msg.data;
         };
         this.ws.onclose = function (msg) {
-            this.onopen  = null;
-            this.onclose = null;
-            this.onerror = null;
+            console.log('WS On close');
+            self.onopen  = null;
+            self.onclose = null;
+            self.onerror = null;
             self.xynt_streaming.log("onclose"+self.init_steps);
             if (self.init_steps == 0)
                 self.ws_cb("error");
@@ -55,8 +70,10 @@ this.xynt_streaming.log("PAGE: "+page);
 
 transport_ws.prototype = {
     doc: null,
+    name: null,
     xynt_streaming: "ready",
     ws: null,
+    out_queue: null,
     stopped: true,
     failed: false,
 
@@ -88,11 +105,56 @@ this.xynt_streaming.log("DEC: "+this.xynt_streaming.transp_fback);
                 }
             }
         }
+        else if (from == "open") {
+            this.flush_out_queue();
+        }
+
         if (this.ws != null && this.ws.readyState > 1) {
            this.stopped = true;
         }
     },
 
+    flush_out_queue: function() {
+        var l_out = this.out_queue.length;
+        if (l_out == 0)
+            return;
+
+        for (var i = 0 ; i < l_out ; i++) {
+            if (this.ws.readyState != 1) {
+                break;
+            }
+            var item = this.out_queue.shift();
+            var sent = true;
+            try {
+                this.ws.send(item);
+            }
+            catch (ex) {
+                this.out_queue.unshift(item);
+                break;
+            }
+        }
+    },
+
+    send: function(msg) {
+        console.log('new send');
+        if (this.ws && this.ws.readyState == 1) {
+            try {
+                console.log('Try send ... ');
+                this.flush_out_queue();
+                this.ws.send(msg);
+                console.log(' ... done');
+            }
+            catch (ex) {
+                console.log(' ... catched exception');
+                this.flush_out.push(msg);
+            }
+        }
+        else {
+            console.log('ws not ready: push into flush_out');
+            this.flush_out.push(msg);
+        }
+    },
+
     ws_abort: function() {
         if (this.ws != null) {
 this.xynt_streaming.log("WSCLOSE");
@@ -161,6 +223,7 @@ this.xynt_streaming.log("WSCLOSE");
 //
 function transport_xhr(doc, xynt_streaming, page)
 {
+    this.name = "XHR";
     this.doc = doc;
     this.xynt_streaming = xynt_streaming;
     this.xhr = createXMLHttpRequest();
@@ -175,6 +238,7 @@ function transport_xhr(doc, xynt_streaming, page)
 
 transport_xhr.prototype = {
     doc: null,
+    name: null,
     xynt_streaming: "ready",
     xhr: null,
     stopped: true,
@@ -290,6 +354,7 @@ transport_xhr.prototype = {
 //
 function transport_htmlfile(doc, xynt_streaming, page)
 {
+    this.name = "HTMLFile";
     this.doc = doc;
     this.xynt_streaming = xynt_streaming;
     this.transfdoc = new ActiveXObject("htmlfile");
@@ -304,6 +369,7 @@ function transport_htmlfile(doc, xynt_streaming, page)
 
 transport_htmlfile.prototype = {
     doc: null,
+    name: null,
     xynt_streaming: null,
     stopped: true,
     ifra: null,
@@ -393,6 +459,7 @@ transport_htmlfile.prototype = {
 //
 function transport_iframe(doc, xynt_streaming, page)
 {
+    this.name = "IFRAME";
     this.doc = doc;
     this.xynt_streaming = xynt_streaming;
     this.ifra = doc.createElement("iframe");
@@ -404,6 +471,7 @@ function transport_iframe(doc, xynt_streaming, page)
 
 transport_iframe.prototype = {
     doc: null,
+    name: null,
     xynt_streaming: null,
     stopped: true,
     ifra: null,
@@ -527,6 +595,8 @@ xynt_streaming.prototype = {
     win:               null,
     transp_type:       null,
     transp_port:         80,
+    transp_type_cur:   null,
+    transp_port_cur:     80,
     transp_fback:         0,
     transp:            null,
     console:           null,
@@ -614,24 +684,44 @@ xynt_streaming.prototype = {
         // page arrangement
         this.page = url_complete(this.win.location.href, this.page);
 
+        // DEFAULT TRANSPORT PROTOCOL HERE websocketsec, websocket
         if (this.transp_fback > 0) {
-            transp_type = "websocket";
-            transp_port = (this.transp_fback == 2 ? 80 : 8080);
+            if (location.protocol == 'https:') {
+                transp_type = "websocketsec";
+                transp_port = 443;
+            }
+            else {
+                transp_type = "websocket";
+                transp_port = (this.transp_fback == 2 ? 80 : 8080);
+            }
+
         }
         else {
             transp_type = this.transp_type;
             transp_port = this.transp_port;
         }
 
-        if (transp_type == "websocket") {
-            var end_proto, first_slash;
+        this.transp_type_cur = transp_type;
+        this.transp_port_cur = transp_port;
+
+        if (transp_type == "websocket" || transp_type == "websocketsec") {
+            var end_proto, first_slash, newpage;
 
             // change protocol
             this.log("precha ["+this.page+"]");
-            end_proto = this.page.indexOf("://");
-            first_slash = this.page.substring(end_proto+3).indexOf("/");
+            if (transp_type == "websocketsec") {
+                newpage = this.page.replace(/\.php$/g, "_wss.php").replace(/\.php\?/g, "_wss.php?");
+                }
+            else {
+                newpage = this.page;
+                }
+            end_proto = newpage.indexOf("://");
+            first_slash = newpage.substring(end_proto+3).indexOf("/");
 
-            page = "ws://" + this.page.substring(end_proto+3, end_proto+3+first_slash) + ":" + transp_port + this.page.substring(end_proto+3 + first_slash);
+            page = (transp_type == "websocketsec" ? "wss://" : "ws://")
+                + newpage.substring(end_proto+3, end_proto+3 + first_slash) + ":"
+                + transp_port + newpage.substring(end_proto+3 + first_slash);
+            // this.log("MOP WS: " + page);
         }
         else {
             page = this.page;
@@ -644,7 +734,11 @@ xynt_streaming.prototype = {
 
         try {
             // transport instantiation
-            if (transp_type == "websocket") {
+            if (transp_type == "websocketsec") {
+                page = url_append_args(page, "transp", "websocketsec");
+                this.transp = new transport_ws(this.doc, this, page, true);
+            }
+            else if (transp_type == "websocket") {
                 page = url_append_args(page, "transp", "websocket");
                 this.transp = new transport_ws(this.doc, this, page);
             }
@@ -725,8 +819,7 @@ xynt_streaming.prototype = {
         var ctx_new_len;
 
         if (this.sandbox != null) {
-            // from old: var zug = "POLL sess = "+sess+" stat = "+stat+" subst = "+subst+" step = "+this.gst.st+" step_loc = "+this.gst.st_loc+" step_loc_new = "+this.gst.st_loc_new+" STOP: "+this.stopped;
-            var zug = "WATCHDOG  sess = ["+this.sess+"]  step = "+this.gst.st+" step_loc = "+this.gst.st_loc+" step_loc_new = "+this.gst.st_loc_new;
+            var zug = "WATCHDOG  sess = ["+this.sess+"]  step = "+this.gst.st+" step_loc = "+this.gst.st_loc+" step_loc_new = "+this.gst.st_loc_new+"Transport: "+this.transp.name;
             if (zug != this.sandbox.innerHTML)
                this.sandbox.innerHTML = zug;
         }
@@ -869,7 +962,12 @@ xynt_streaming.prototype = {
                //xx this.hbit("+");
 
                 // alert("SINGLE: ["+singlecomm+"]");
+                // window.console.log("["+singlecomm+"]");
                this.cmdproc(singlecomm);
+                if (this.transp_type_cur) {
+                    this.transp_type = this.transp_type_cur;
+                    this.transp_port = this.transp_port_cur;
+                }
                again = 1;
            }
         } while (again);
@@ -887,6 +985,15 @@ xynt_streaming.prototype = {
         return;
     },
 
+    send: function(msg) {
+        if (typeof(this.transp.send) == 'undefined') {
+            this.log('send not implemented for ' + this.transp_type);
+            return;
+        }
+
+        return this.transp.send(msg);
+    },
+
     //
     // moved to xynt-streaming-ifra as push()
     //