From e8903530a6cf61a0b06c63bbabed10a813fea097 Mon Sep 17 00:00:00 2001 From: Matteo Nastasi Date: Sat, 11 Apr 2020 17:36:22 +0200 Subject: [PATCH] websocket graceful shutdown and safety queue --- web/Obj/sac-a-push.phh | 61 ++++++++++++++++++++++++++++++------ web/Obj/transports.phh | 66 ++++++++++++++++++++++++++++++++++++--- web/Obj/user.phh | 5 +++ web/commons.js | 11 ++++++- web/usermgmt.php | 71 ++++++++++++++++++++++++++++++++++++++++-- web/xynt-streaming.js | 66 +++++++++++++++++++++++++++++++++++++-- 6 files changed, 259 insertions(+), 21 deletions(-) diff --git a/web/Obj/sac-a-push.phh b/web/Obj/sac-a-push.phh index fdbb968..4d8706b 100644 --- a/web/Obj/sac-a-push.phh +++ b/web/Obj/sac-a-push.phh @@ -430,6 +430,7 @@ class Sac_a_push { var $socks; var $s2u; // user associated with input socket var $s2p; // pending page associated with input socket + var $s2c; // ws sockets in closing phase var $pending_pages; var $is_daemon; @@ -491,6 +492,7 @@ class Sac_a_push { $thiz->socks = array(); $thiz->s2u = array(); $thiz->s2p = array(); + $thiz->s2c = array(); $thiz->pending_pages = array(); $thiz->is_daemon = FALSE; @@ -558,15 +560,17 @@ class Sac_a_push { return ($thiz); } - function socks_set($sock, $user, $pendpage) + function socks_set($sock, $user, $pendpage, $postclose = NULL) { $id = intval($sock); $this->socks[$id] = $sock; if ($user != NULL) - $this->s2u[$id] = $user; + $this->s2u[$id] = $user; if ($pendpage != NULL) - $this->s2p[$id] = $pendpage; + $this->s2p[$id] = $pendpage; + if ($postclose != NULL) + $this->s2c[$id] = $postclose; } function socks_unset($sock) @@ -577,6 +581,8 @@ class Sac_a_push { unset($this->s2u[$id]); if (isset($this->s2p[$id])) unset($this->s2p[$id]); + if (isset($this->s2c[$id])) + unset($this->s2c[$id]); unset($this->socks[$id]); } @@ -641,7 +647,14 @@ class Sac_a_push { foreach ($this->socks as $k => $sock) { $id = intval($sock); - if (isset($this->s2u[$id])) { + if (isset($this->s2c[$id])) { + $postclose = $this->s2c[$id]; + if ($postclose->read('', $this->curtime) == 0) { + fclose($sock); + $this->socks_unset($sock); + } + } + else if (isset($this->s2u[$id])) { $user = $this->s2u[$id]; if ($user->the_end) { if (($user->rd_toflush == FALSE && $user->rd_step == $user->step) @@ -834,6 +847,7 @@ class Sac_a_push { $buf = fread($sock, 4096); // if socket is closed if ($buf == FALSE || feof($sock)) { + $postclose = NULL; // close socket case if ($buf == FALSE) { // printf("INFO: read return false\n"); @@ -858,6 +872,7 @@ class Sac_a_push { if ($this->s2u[$id]->rd_socket_get() != NULL) { // try to send close frame (for websocket) $clo = $this->s2u[$id]->stream_close(); + $postclose = $user->stream_postclose_get($sock, $this->curtime); $clo_l = mb_strlen($clo, "ASCII"); @fwrite($sock, $clo, $clo_l); $this->s2u[$id]->rd_socket_set(NULL); @@ -865,7 +880,13 @@ class Sac_a_push { unset($this->s2u[$id]); } } - fclose($sock); + if ($postclose != NULL) { + // print("POSTCLOSE found!"); + $this->socks_set($sock, NULL, NULL, $postclose); + } + else { + fclose($sock); + } // printf("CLOSE ON READ\n"); if ($this->debug > 1) { @@ -926,7 +947,7 @@ class Sac_a_push { // fprintf(STDERR, 'POST USER'); if ($user && $user->rd_transp && strpos($user->rd_transp->type, "websocket") !== FALSE) { - $clie_cmd = $user->rd_transp->unchunk($buf); + $clie_cmd = $user->rd_transp->unchunk($buf, $sock); $clie_cmd = json_decode($clie_cmd, TRUE); // fprintf(STDERR, "HERE WE ARE INCOMING DATA [%s]\n", print_r($clie_cmd, TRUE)); @@ -949,6 +970,9 @@ class Sac_a_push { ob_end_clean(); } } + else { + fprintf(STDERR, "Unknown page [%s]\n", $wr_addr); + } /* briskin5/index_wr.php @@ -988,6 +1012,17 @@ class Sac_a_push { $manage_page = TRUE; } } + + // postclose case + if (isset($this->s2c[$id])) { + $postclose = $this->s2c[$id]; + // printf("POSTCLOSE: found pc in s2c list\n"); + if ($postclose->read($buf, $this->curtime) == 0) { + printf("POSTCLOSE: received end opcode, close\n"); + fclose($sock); + $this->socks_unset($sock); + } + } } } } @@ -1012,7 +1047,7 @@ class Sac_a_push { } if ($rret == FALSE) { // FIXME: manage 404 !!! - printf("TODO: fix unknown page\n"); + printf("TODO: fix unknown page: %s\n", $path); fclose($new_socket); } } @@ -1095,15 +1130,21 @@ class Sac_a_push { // close socket after a while to prevent client memory consumption if ($user->rd_endtime_is_expired($this->curtime)) { - if ($this->s2u[$id]->rd_socket_get() != NULL) { - $this->s2u[$id]->rd_socket_set(NULL); + $postclose = $user->stream_postclose_get($sock, $this->curtime); + if ($user->rd_socket_get() != NULL) { + $user->rd_socket_set(NULL); } unset($this->socks[$id]); unset($this->s2u[$id]); $clo = $user->stream_close(); $clo_l = mb_strlen($clo, "ASCII"); @fwrite($sock, $clo, $clo_l); - fclose($sock); + if ($postclose) { + $this->socks_set($sock, NULL, NULL, $postclose); + } + else { + fclose($sock); + } // printf("CLOSE ON LOOP\n"); } } // if (isset($this->s2u[$id]... diff --git a/web/Obj/transports.phh b/web/Obj/transports.phh index 0cc5f54..921d15e 100644 --- a/web/Obj/transports.phh +++ b/web/Obj/transports.phh @@ -75,6 +75,11 @@ class Transport_template { { } + function postclose_get($sock, $curtime) + { + return NULL; + } + function chunk($step, $cont) { } @@ -90,6 +95,38 @@ class Transport_template { } } +define("TRANSP_WS_CLOSE_TOUT", 5); + +class Transport_websocket_postclose { + function Transport_websocket_postclose($transp_ws, $sock, $curtime) { + printf("POSTCLOSE: Creation\n"); + $this->transp_ws = $transp_ws; + $this->sock = $sock; + $this->start = $curtime; + // status not required, currently + // $this->status = "begin"; + } + + function read($payload, $curtime) { + if ($this->start + TRANSP_WS_CLOSE_TOUT < $curtime) { + printf("POSTCLOSE: Closing ws (%d) force close by timeout\n", $this->sock); + return 0; + } + if (mb_strlen($payload, "ASCII") > 1) { + $this->transp_ws->unchunk($payload, $this->sock); + } + if ($this->transp_ws->hasSentClose) { + printf("POSTCLOSE: Closing ws gracefully\n"); + return 0; + } + else { + printf("POSTCLOSE: not yet finished\n"); + return 1; + } + } +} + + class Transport_websocket { protected $magicGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; @@ -203,10 +240,10 @@ class Transport_websocket { return $strout . "\n"; } - function unchunk($cont) + function unchunk($cont, $sock) { // fprintf(STDERR, "CHUNK: [%s]\n", $cont); - return $this->deframe($cont); + return $this->deframe($cont, $sock); } function chunk($step, $cont) @@ -281,7 +318,7 @@ class Transport_websocket { return chr($b1) . chr($b2) . $lengthField . $message; } - protected function deframe($message) { + protected function deframe($message, $socket) { //echo $this->strtohex($message); $headers = $this->extractHeaders($message); $pongReply = false; @@ -325,7 +362,8 @@ class Transport_websocket { if ($pongReply) { $reply = $this->frame($payload,$this,'pong'); // TODO FIXME ALL socket_write management - socket_write($user->socket,$reply,mb_strlen($reply, "ASCII")); + // socket_write($user->socket,$reply,mb_strlen($reply, "ASCII")); + @fwrite($socket, $reply, mb_strlen($reply, "ASCII")); return false; } if (extension_loaded('mbstring')) { @@ -476,6 +514,11 @@ class Transport_websocket { return(chr(0x88).chr(0x02).chr(0xe8).chr(0x03)); } + function postclose_get($sock, $curtime) + { + return new Transport_websocket_postclose($this, $sock, $curtime); + } + static function fini($init_string, $base, $blockerr) { return (sprintf('@BEGIN@ %s window.onbeforeunload = null; window.onunload = null; document.location.assign("%sindex.php"); @END@', ($blockerr ? 'xstm.stop(); ' : ''), $base).self::close()); @@ -511,6 +554,11 @@ class Transport_xhr { return ""; } + function postclose_get($sock, $curtime) + { + return NULL; + } + static function fini($init_string, $base, $blockerr) { return (sprintf('@BEGIN@ %s window.onbeforeunload = null; window.onunload = null; document.location.assign("%sindex.php"); @END@', ($blockerr ? 'xstm.stop(); ' : ''), $base)); @@ -568,6 +616,11 @@ window.onload = function () { try { if (xynt_streaming != \"ready\") { xynt_stre return ""; } + function postclose_get($sock, $curtime) + { + return NULL; + } + static function fini($init_string, $base, $blockerr) { $ret = ""; @@ -613,6 +666,11 @@ class Transport_htmlfile extends Transport_iframe { function Transport_htmlfile() { $this->type = 'htmlfile'; } + + function postclose_get($sock, $curtime) + { + return NULL; + } } class Transport { diff --git a/web/Obj/user.phh b/web/Obj/user.phh index 573883c..d6a4395 100644 --- a/web/Obj/user.phh +++ b/web/Obj/user.phh @@ -869,6 +869,11 @@ function stream_close() return ($this->rd_transp->close()); } +function stream_postclose_get($sock, $curtime) +{ + return ($this->rd_transp->postclose_get($sock, $curtime)); +} + static function base_get() { $c = get_called_class(); diff --git a/web/commons.js b/web/commons.js index 62d3330..721b816 100644 --- a/web/commons.js +++ b/web/commons.js @@ -284,7 +284,16 @@ function send_mesg(mesg, content) lang: readCookie("lang") }); // console.log(ws_msg); - xstm.transp.ws.send(ws_msg); + xstm.send(ws_msg); + /* + if (xstm.transp.ws.readyState == 1) { + xstm.transp.ws.send(ws_msg); + } + else { + xstm.transp.out_queue.push(ws_msg); + } + */ + } else { var xhr_wr = createXMLHttpRequest(); diff --git a/web/usermgmt.php b/web/usermgmt.php index 57a6ae1..24948a1 100644 --- a/web/usermgmt.php +++ b/web/usermgmt.php @@ -263,6 +263,73 @@ SELECT usr.*, guar.login AS guar_login } } } // else if ($action == "accept") { + else if ($action == "delete") { + foreach($_POST as $key => $value) { + if (substr($key, 0, 9) != "f_newuser") + continue; + + $id = (int)substr($key, 9); + if ($id <= 0) + continue; + + // check existence of username or email + $is_trans = FALSE; + $res = FALSE; + do { + if (($bdb = BriskDB::create()) == FALSE) + break; + + // retrieve list added users + $usr_sql = sprintf(" +SELECT usr.*, guar.login AS guar_login + FROM %susers AS usr + JOIN %susers AS guar ON guar.code = usr.guar_code + WHERE usr.type & (CAST (X'%x' as integer)) = (CAST (X'%x' as integer)) + AND usr.disa_reas = %d AND usr.code = %d;", + $G_dbpfx, $G_dbpfx, + USER_FLAG_TY_DISABLE, USER_FLAG_TY_DISABLE, + USER_DIS_REA_NU_ADDED, $id); + if (($usr_pg = pg_query($bdb->dbconn->db(), $usr_sql)) == FALSE) { + log_crit("stat-day: select from tournaments failed"); + break; + } + $usr_n = pg_numrows($usr_pg); + if ($usr_n != 1) { + $status .= sprintf("Inconsistency for code %d, returned %d records, skipped.
", + $id, $usr_n); + break; + } + + $usr_obj = pg_fetch_object($usr_pg, 0); + + $bdb->transaction('BEGIN'); + $is_trans = TRUE; + + // retrieve list added users + $usr_sql = sprintf(" + DELETE FROM %susers + WHERE (type & (CAST (X'%x' as integer))) = (CAST (X'%x' as integer)) + AND disa_reas = %d AND code = %d;", + $G_dbpfx, USER_FLAG_TY_DISABLE, USER_FLAG_TY_DISABLE, + USER_DIS_REA_NU_ADDED, $id); + if (($usr_pg = pg_query($bdb->dbconn->db(), $usr_sql)) == FALSE) { + log_crit(sprintf("Delete of user %d failed", $id)); + break; + } + + $status .= sprintf("User %s removed: SUCCESS
", $usr_obj->login); + $bdb->transaction('COMMIT'); + $res = TRUE; + } while(FALSE); + if ($res == FALSE) { + $status .= sprintf("Error occurred during delete action
"); + if ($is_trans) + $bdb->transaction('ROLLBACK'); + break; + } + } + } // else if ($action == "accept") { + do { @@ -519,8 +586,6 @@ SELECT usr.*, guar.login AS guar_login } exit; } - - else if ($action == "delete") { foreach($_POST as $key => $value) { if (substr($key, 0, 9) != "f_newuser") @@ -569,7 +634,7 @@ SELECT usr.*, guar.login AS guar_login $G_dbpfx, $usr_obj->code); if (($del_pg = pg_query($bdb->dbconn->db(), $del_sql)) == FALSE) { - log_crit("stat-day: select from tournaments failed"); + log_crit(sprintf("Delete user %d failed", $usr_obj->code)); break; } diff --git a/web/xynt-streaming.js b/web/xynt-streaming.js index 13eeb7b..009bb01 100644 --- a/web/xynt-streaming.js +++ b/web/xynt-streaming.js @@ -16,6 +16,7 @@ function transport_ws(doc, xynt_streaming, page) else this.name = "WebSocket"; this.ctx_new = ""; + this.out_queue = []; var self = this; this.doc = doc; @@ -25,6 +26,8 @@ function transport_ws(doc, xynt_streaming, page) this.xynt_streaming.log("PAGE: "+page); this.ws = new WebSocket(page); this.ws.onopen = function () { + console.log('WS On open'); + self.xynt_streaming.log("onopen"); if (this.readyState == 1) { // connected @@ -33,14 +36,16 @@ function transport_ws(doc, xynt_streaming, page) } }; this.ws.onmessage = function (msg) { + console.log('WS On message'); self.xynt_streaming.log("onmessage"); // new data in msg.data self.ctx_new += msg.data; }; this.ws.onclose = function (msg) { - this.onopen = null; - this.onclose = null; - this.onerror = null; + console.log('WS On close'); + self.onopen = null; + self.onclose = null; + self.onerror = null; self.xynt_streaming.log("onclose"+self.init_steps); if (self.init_steps == 0) self.ws_cb("error"); @@ -68,6 +73,7 @@ transport_ws.prototype = { name: null, xynt_streaming: "ready", ws: null, + out_queue: null, stopped: true, failed: false, @@ -99,11 +105,56 @@ this.xynt_streaming.log("DEC: "+this.xynt_streaming.transp_fback); } } } + else if (from == "open") { + this.flush_out_queue(); + } + if (this.ws != null && this.ws.readyState > 1) { this.stopped = true; } }, + flush_out_queue: function() { + var l_out = this.out_queue.length; + if (l_out == 0) + return; + + for (var i = 0 ; i < l_out ; i++) { + if (this.ws.readyState != 1) { + break; + } + var item = this.out_queue.shift(); + var sent = true; + try { + this.ws.send(item); + } + catch (ex) { + this.out_queue.unshift(item); + break; + } + } + }, + + send: function(msg) { + console.log('new send'); + if (this.ws && this.ws.readyState == 1) { + try { + console.log('Try send ... '); + this.flush_out_queue(); + this.ws.send(msg); + console.log(' ... done'); + } + catch (ex) { + console.log(' ... catched exception'); + this.flush_out.push(msg); + } + } + else { + console.log('ws not ready: push into flush_out'); + this.flush_out.push(msg); + } + }, + ws_abort: function() { if (this.ws != null) { this.xynt_streaming.log("WSCLOSE"); @@ -934,6 +985,15 @@ xynt_streaming.prototype = { return; }, + send: function(msg) { + if (typeof(this.transp.send) == 'undefined') { + this.log('send not implemented for ' + this.transp_type); + return; + } + + return this.transp.send(msg); + }, + // // moved to xynt-streaming-ifra as push() // -- 2.17.1