websocket graceful shutdown and safety queue
authorMatteo Nastasi <nastasi@alternativeoutput.it>
Sat, 11 Apr 2020 15:36:22 +0000 (17:36 +0200)
committerMatteo Nastasi <nastasi@alternativeoutput.it>
Sat, 11 Apr 2020 15:36:22 +0000 (17:36 +0200)
web/Obj/sac-a-push.phh
web/Obj/transports.phh
web/Obj/user.phh
web/commons.js
web/usermgmt.php
web/xynt-streaming.js

index fdbb968..4d8706b 100644 (file)
@@ -430,6 +430,7 @@ class Sac_a_push {
     var $socks;
     var $s2u;             // user associated with input socket
     var $s2p;             // pending page associated with input socket
+    var $s2c;             // ws sockets in closing phase
     var $pending_pages;
     var $is_daemon;
 
@@ -491,6 +492,7 @@ class Sac_a_push {
         $thiz->socks = array();
         $thiz->s2u  = array();
         $thiz->s2p  = array();
+        $thiz->s2c  = array();
         $thiz->pending_pages = array();
         $thiz->is_daemon = FALSE;
 
@@ -558,15 +560,17 @@ class Sac_a_push {
         return ($thiz);
     }
 
-    function socks_set($sock, $user, $pendpage)
+    function socks_set($sock, $user, $pendpage, $postclose = NULL)
     {
         $id = intval($sock);
 
         $this->socks[$id] = $sock;
         if ($user != NULL)
-            $this->s2u[$id]   = $user;
+            $this->s2u[$id] = $user;
         if ($pendpage != NULL)
-            $this->s2p[$id]   = $pendpage;
+            $this->s2p[$id] = $pendpage;
+        if ($postclose != NULL)
+            $this->s2c[$id] = $postclose;
     }
 
     function socks_unset($sock)
@@ -577,6 +581,8 @@ class Sac_a_push {
             unset($this->s2u[$id]);
         if (isset($this->s2p[$id]))
             unset($this->s2p[$id]);
+        if (isset($this->s2c[$id]))
+            unset($this->s2c[$id]);
         unset($this->socks[$id]);
     }
 
@@ -641,7 +647,14 @@ class Sac_a_push {
 
         foreach ($this->socks as $k => $sock) {
             $id = intval($sock);
-            if (isset($this->s2u[$id])) {
+            if (isset($this->s2c[$id])) {
+                $postclose = $this->s2c[$id];
+                if ($postclose->read('', $this->curtime) == 0) {
+                    fclose($sock);
+                    $this->socks_unset($sock);
+                }
+            }
+            else if (isset($this->s2u[$id])) {
                 $user = $this->s2u[$id];
                 if ($user->the_end) {
                     if (($user->rd_toflush == FALSE && $user->rd_step == $user->step)
@@ -834,6 +847,7 @@ class Sac_a_push {
                         $buf = fread($sock, 4096);
                         // if socket is closed
                         if ($buf == FALSE || feof($sock)) {
+                            $postclose = NULL;
                             // close socket case
                             if ($buf == FALSE) {
                                 // printf("INFO: read return false\n");
@@ -858,6 +872,7 @@ class Sac_a_push {
                                     if ($this->s2u[$id]->rd_socket_get() != NULL) {
                                         // try to send close frame (for websocket)
                                         $clo = $this->s2u[$id]->stream_close();
+                                        $postclose = $user->stream_postclose_get($sock, $this->curtime);
                                         $clo_l = mb_strlen($clo, "ASCII");
                                         @fwrite($sock, $clo, $clo_l);
                                         $this->s2u[$id]->rd_socket_set(NULL);
@@ -865,7 +880,13 @@ class Sac_a_push {
                                     unset($this->s2u[$id]);
                                 }
                             }
-                            fclose($sock);
+                            if ($postclose != NULL) {
+                                // print("POSTCLOSE found!");
+                                $this->socks_set($sock, NULL, NULL, $postclose);
+                            }
+                            else {
+                                fclose($sock);
+                            }
                             // printf("CLOSE ON READ\n");
 
                             if ($this->debug > 1) {
@@ -926,7 +947,7 @@ class Sac_a_push {
 
                                     // fprintf(STDERR, 'POST USER');
                                     if ($user && $user->rd_transp && strpos($user->rd_transp->type, "websocket") !== FALSE) {
-                                        $clie_cmd = $user->rd_transp->unchunk($buf);
+                                        $clie_cmd = $user->rd_transp->unchunk($buf, $sock);
                                         $clie_cmd = json_decode($clie_cmd, TRUE);
                                         // fprintf(STDERR, "HERE WE ARE INCOMING DATA [%s]\n", print_r($clie_cmd, TRUE));
 
@@ -949,6 +970,9 @@ class Sac_a_push {
                                                 ob_end_clean();
                                             }
                                         }
+                                        else {
+                                            fprintf(STDERR, "Unknown page [%s]\n", $wr_addr);
+                                        }
                                         /*
                                           briskin5/index_wr.php
 
@@ -988,6 +1012,17 @@ class Sac_a_push {
                                         $manage_page = TRUE;
                                     }
                                 }
+
+                                // postclose case
+                                if (isset($this->s2c[$id])) {
+                                    $postclose = $this->s2c[$id];
+                                    // printf("POSTCLOSE: found pc in s2c list\n");
+                                    if ($postclose->read($buf, $this->curtime) == 0) {
+                                        printf("POSTCLOSE: received end opcode, close\n");
+                                        fclose($sock);
+                                        $this->socks_unset($sock);
+                                    }
+                                }
                             }
                         }
                     }
@@ -1012,7 +1047,7 @@ class Sac_a_push {
                         }
                         if ($rret == FALSE) {
                             // FIXME: manage 404 !!!
-                            printf("TODO: fix unknown page\n");
+                            printf("TODO: fix unknown page: %s\n", $path);
                             fclose($new_socket);
                         }
                     }
@@ -1095,15 +1130,21 @@ class Sac_a_push {
 
                     // close socket after a while to prevent client memory consumption
                     if ($user->rd_endtime_is_expired($this->curtime)) {
-                        if ($this->s2u[$id]->rd_socket_get() != NULL) {
-                            $this->s2u[$id]->rd_socket_set(NULL);
+                        $postclose = $user->stream_postclose_get($sock, $this->curtime);
+                        if ($user->rd_socket_get() != NULL) {
+                            $user->rd_socket_set(NULL);
                         }
                         unset($this->socks[$id]);
                         unset($this->s2u[$id]);
                         $clo = $user->stream_close();
                         $clo_l = mb_strlen($clo, "ASCII");
                         @fwrite($sock, $clo, $clo_l);
-                        fclose($sock);
+                        if ($postclose) {
+                            $this->socks_set($sock, NULL, NULL, $postclose);
+                        }
+                        else {
+                            fclose($sock);
+                        }
                         // printf("CLOSE ON LOOP\n");
                     }
                 }  // if (isset($this->s2u[$id]...
index 0cc5f54..921d15e 100644 (file)
@@ -75,6 +75,11 @@ class Transport_template {
     {
     }
 
+    function postclose_get($sock, $curtime)
+    {
+        return NULL;
+    }
+
     function chunk($step, $cont)
     {
     }
@@ -90,6 +95,38 @@ class Transport_template {
     }
 }
 
+define("TRANSP_WS_CLOSE_TOUT", 5);
+
+class Transport_websocket_postclose {
+    function Transport_websocket_postclose($transp_ws, $sock, $curtime) {
+        printf("POSTCLOSE: Creation\n");
+        $this->transp_ws = $transp_ws;
+        $this->sock = $sock;
+        $this->start =  $curtime;
+        // status not required, currently
+        // $this->status = "begin";
+    }
+
+    function read($payload, $curtime) {
+        if ($this->start + TRANSP_WS_CLOSE_TOUT < $curtime) {
+            printf("POSTCLOSE: Closing ws (%d) force close by timeout\n", $this->sock);
+            return 0;
+        }
+        if (mb_strlen($payload, "ASCII") > 1) {
+            $this->transp_ws->unchunk($payload, $this->sock);
+        }
+        if ($this->transp_ws->hasSentClose) {
+            printf("POSTCLOSE: Closing ws gracefully\n");
+            return 0;
+        }
+        else {
+            printf("POSTCLOSE: not yet finished\n");
+            return 1;
+        }
+    }
+}
+
+
 class Transport_websocket {
     protected $magicGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 
@@ -203,10 +240,10 @@ class Transport_websocket {
         return $strout . "\n";
     }
 
-    function unchunk($cont)
+    function unchunk($cont, $sock)
     {
         // fprintf(STDERR, "CHUNK: [%s]\n", $cont);
-        return $this->deframe($cont);
+        return $this->deframe($cont, $sock);
     }
 
     function chunk($step, $cont)
@@ -281,7 +318,7 @@ class Transport_websocket {
         return chr($b1) . chr($b2) . $lengthField . $message;
     }
 
-    protected function deframe($message) {
+    protected function deframe($message, $socket) {
         //echo $this->strtohex($message);
         $headers = $this->extractHeaders($message);
         $pongReply = false;
@@ -325,7 +362,8 @@ class Transport_websocket {
         if ($pongReply) {
             $reply = $this->frame($payload,$this,'pong');
             // TODO FIXME ALL socket_write management
-            socket_write($user->socket,$reply,mb_strlen($reply, "ASCII"));
+            // socket_write($user->socket,$reply,mb_strlen($reply, "ASCII"));
+            @fwrite($socket, $reply, mb_strlen($reply, "ASCII"));
             return false;
         }
         if (extension_loaded('mbstring')) {
@@ -476,6 +514,11 @@ class Transport_websocket {
         return(chr(0x88).chr(0x02).chr(0xe8).chr(0x03));
     }
 
+    function postclose_get($sock, $curtime)
+    {
+       return new Transport_websocket_postclose($this, $sock, $curtime);
+    }
+
     static function fini($init_string, $base, $blockerr)
     {
         return (sprintf('@BEGIN@ %s window.onbeforeunload = null; window.onunload = null; document.location.assign("%sindex.php"); @END@',  ($blockerr ? 'xstm.stop(); ' : ''), $base).self::close());
@@ -511,6 +554,11 @@ class Transport_xhr {
         return "";
     }
 
+    function postclose_get($sock, $curtime)
+    {
+        return NULL;
+    }
+
     static function fini($init_string, $base, $blockerr)
     {
         return (sprintf('@BEGIN@ %s window.onbeforeunload = null; window.onunload = null; document.location.assign("%sindex.php"); @END@',  ($blockerr ? 'xstm.stop(); ' : ''), $base));
@@ -568,6 +616,11 @@ window.onload = function () { try { if (xynt_streaming != \"ready\") { xynt_stre
         return "";
     }
 
+    function postclose_get($sock, $curtime)
+    {
+        return NULL;
+    }
+
     static function fini($init_string, $base, $blockerr)
     {
         $ret = "";
@@ -613,6 +666,11 @@ class Transport_htmlfile extends Transport_iframe {
     function Transport_htmlfile() {
         $this->type = 'htmlfile';
     }
+
+    function postclose_get($sock, $curtime)
+    {
+        return NULL;
+    }
 }
 
 class Transport {
index 573883c..d6a4395 100644 (file)
@@ -869,6 +869,11 @@ function stream_close()
     return ($this->rd_transp->close());
 }
 
+function stream_postclose_get($sock, $curtime)
+{
+    return ($this->rd_transp->postclose_get($sock, $curtime));
+}
+
 static function base_get()
 {
     $c = get_called_class();
index 62d3330..721b816 100644 (file)
@@ -284,7 +284,16 @@ function send_mesg(mesg, content)
             lang: readCookie("lang")
             });
         // console.log(ws_msg);
-        xstm.transp.ws.send(ws_msg);
+        xstm.send(ws_msg);
+        /*
+        if (xstm.transp.ws.readyState == 1) {
+            xstm.transp.ws.send(ws_msg);
+        }
+        else {
+            xstm.transp.out_queue.push(ws_msg);
+        }
+        */
+
     }
     else {
     var xhr_wr = createXMLHttpRequest();
index 57a6ae1..24948a1 100644 (file)
@@ -263,6 +263,73 @@ SELECT usr.*, guar.login AS guar_login
                 }
             }
         } // else if ($action == "accept") {
+        else if ($action == "delete") {
+            foreach($_POST as $key => $value) {
+                if (substr($key, 0, 9) != "f_newuser")
+                    continue;
+
+                $id = (int)substr($key, 9);
+                if ($id <= 0)
+                    continue;
+
+                // check existence of username or email
+                $is_trans = FALSE;
+                $res = FALSE;
+                do {
+                    if (($bdb = BriskDB::create()) == FALSE)
+                        break;
+
+                    // retrieve list added users
+                    $usr_sql = sprintf("
+SELECT usr.*, guar.login AS guar_login
+     FROM %susers AS usr
+     JOIN %susers AS guar ON guar.code = usr.guar_code
+     WHERE usr.type & (CAST (X'%x' as integer)) = (CAST (X'%x' as integer))
+         AND usr.disa_reas = %d AND usr.code = %d;",
+                               $G_dbpfx, $G_dbpfx,
+                               USER_FLAG_TY_DISABLE, USER_FLAG_TY_DISABLE,
+                               USER_DIS_REA_NU_ADDED, $id);
+                    if (($usr_pg = pg_query($bdb->dbconn->db(), $usr_sql)) == FALSE) {
+                        log_crit("stat-day: select from tournaments failed");
+                        break;
+                    }
+                    $usr_n = pg_numrows($usr_pg);
+                    if ($usr_n != 1) {
+                        $status .= sprintf("Inconsistency for code %d, returned %d records, skipped.<br>",
+                                          $id, $usr_n);
+                        break;
+                    }
+
+                    $usr_obj = pg_fetch_object($usr_pg, 0);
+
+                    $bdb->transaction('BEGIN');
+                    $is_trans = TRUE;
+
+                    // retrieve list added users
+                    $usr_sql = sprintf("
+                         DELETE FROM %susers
+                             WHERE (type & (CAST (X'%x' as integer))) = (CAST (X'%x' as integer))
+                               AND disa_reas = %d AND code = %d;",
+                               $G_dbpfx, USER_FLAG_TY_DISABLE, USER_FLAG_TY_DISABLE,
+                               USER_DIS_REA_NU_ADDED, $id);
+                    if (($usr_pg = pg_query($bdb->dbconn->db(), $usr_sql)) == FALSE) {
+                        log_crit(sprintf("Delete of user %d failed", $id));
+                        break;
+                    }
+
+                    $status .= sprintf("User %s removed: SUCCESS<br>", $usr_obj->login);
+                    $bdb->transaction('COMMIT');
+                    $res = TRUE;
+                } while(FALSE);
+                if ($res == FALSE) {
+                    $status .= sprintf("Error occurred during delete action<br>");
+                    if ($is_trans)
+                        $bdb->transaction('ROLLBACK');
+                    break;
+                }
+            }
+        } // else if ($action == "accept") {
+
 
 
         do {
@@ -519,8 +586,6 @@ SELECT usr.*, guar.login AS guar_login
             }
             exit;
         }
-
-
         else if ($action == "delete") {
             foreach($_POST as $key => $value) {
                 if (substr($key, 0, 9) != "f_newuser")
@@ -569,7 +634,7 @@ SELECT usr.*, guar.login AS guar_login
                                        $G_dbpfx, $usr_obj->code);
 
                     if (($del_pg = pg_query($bdb->dbconn->db(), $del_sql)) == FALSE) {
-                        log_crit("stat-day: select from tournaments failed");
+                        log_crit(sprintf("Delete user %d failed", $usr_obj->code));
                         break;
                     }
 
index 13eeb7b..009bb01 100644 (file)
@@ -16,6 +16,7 @@ function transport_ws(doc, xynt_streaming, page)
     else
         this.name = "WebSocket";
     this.ctx_new = "";
+    this.out_queue = [];
     var self = this;
 
     this.doc = doc;
@@ -25,6 +26,8 @@ function transport_ws(doc, xynt_streaming, page)
         this.xynt_streaming.log("PAGE: "+page);
         this.ws = new WebSocket(page);
         this.ws.onopen = function () {
+            console.log('WS On open');
+
             self.xynt_streaming.log("onopen");
             if (this.readyState == 1) {
                 // connected
@@ -33,14 +36,16 @@ function transport_ws(doc, xynt_streaming, page)
             }
         };
         this.ws.onmessage = function (msg) {
+            console.log('WS On message');
             self.xynt_streaming.log("onmessage");
             // new data in msg.data
             self.ctx_new += msg.data;
         };
         this.ws.onclose = function (msg) {
-            this.onopen  = null;
-            this.onclose = null;
-            this.onerror = null;
+            console.log('WS On close');
+            self.onopen  = null;
+            self.onclose = null;
+            self.onerror = null;
             self.xynt_streaming.log("onclose"+self.init_steps);
             if (self.init_steps == 0)
                 self.ws_cb("error");
@@ -68,6 +73,7 @@ transport_ws.prototype = {
     name: null,
     xynt_streaming: "ready",
     ws: null,
+    out_queue: null,
     stopped: true,
     failed: false,
 
@@ -99,11 +105,56 @@ this.xynt_streaming.log("DEC: "+this.xynt_streaming.transp_fback);
                 }
             }
         }
+        else if (from == "open") {
+            this.flush_out_queue();
+        }
+
         if (this.ws != null && this.ws.readyState > 1) {
            this.stopped = true;
         }
     },
 
+    flush_out_queue: function() {
+        var l_out = this.out_queue.length;
+        if (l_out == 0)
+            return;
+
+        for (var i = 0 ; i < l_out ; i++) {
+            if (this.ws.readyState != 1) {
+                break;
+            }
+            var item = this.out_queue.shift();
+            var sent = true;
+            try {
+                this.ws.send(item);
+            }
+            catch (ex) {
+                this.out_queue.unshift(item);
+                break;
+            }
+        }
+    },
+
+    send: function(msg) {
+        console.log('new send');
+        if (this.ws && this.ws.readyState == 1) {
+            try {
+                console.log('Try send ... ');
+                this.flush_out_queue();
+                this.ws.send(msg);
+                console.log(' ... done');
+            }
+            catch (ex) {
+                console.log(' ... catched exception');
+                this.flush_out.push(msg);
+            }
+        }
+        else {
+            console.log('ws not ready: push into flush_out');
+            this.flush_out.push(msg);
+        }
+    },
+
     ws_abort: function() {
         if (this.ws != null) {
 this.xynt_streaming.log("WSCLOSE");
@@ -934,6 +985,15 @@ xynt_streaming.prototype = {
         return;
     },
 
+    send: function(msg) {
+        if (typeof(this.transp.send) == 'undefined') {
+            this.log('send not implemented for ' + this.transp_type);
+            return;
+        }
+
+        return this.transp.send(msg);
+    },
+
     //
     // moved to xynt-streaming-ifra as push()
     //