var $socks;
var $s2u; // user associated with input socket
var $s2p; // pending page associated with input socket
+ var $s2c; // ws sockets in closing phase
var $pending_pages;
var $is_daemon;
$thiz->socks = array();
$thiz->s2u = array();
$thiz->s2p = array();
+ $thiz->s2c = array();
$thiz->pending_pages = array();
$thiz->is_daemon = FALSE;
return ($thiz);
}
- function socks_set($sock, $user, $pendpage)
+ function socks_set($sock, $user, $pendpage, $postclose = NULL)
{
$id = intval($sock);
$this->socks[$id] = $sock;
if ($user != NULL)
- $this->s2u[$id] = $user;
+ $this->s2u[$id] = $user;
if ($pendpage != NULL)
- $this->s2p[$id] = $pendpage;
+ $this->s2p[$id] = $pendpage;
+ if ($postclose != NULL)
+ $this->s2c[$id] = $postclose;
}
function socks_unset($sock)
unset($this->s2u[$id]);
if (isset($this->s2p[$id]))
unset($this->s2p[$id]);
+ if (isset($this->s2c[$id]))
+ unset($this->s2c[$id]);
unset($this->socks[$id]);
}
foreach ($this->socks as $k => $sock) {
$id = intval($sock);
- if (isset($this->s2u[$id])) {
+ if (isset($this->s2c[$id])) {
+ $postclose = $this->s2c[$id];
+ if ($postclose->read('', $this->curtime) == 0) {
+ fclose($sock);
+ $this->socks_unset($sock);
+ }
+ }
+ else if (isset($this->s2u[$id])) {
$user = $this->s2u[$id];
if ($user->the_end) {
if (($user->rd_toflush == FALSE && $user->rd_step == $user->step)
$buf = fread($sock, 4096);
// if socket is closed
if ($buf == FALSE || feof($sock)) {
+ $postclose = NULL;
// close socket case
if ($buf == FALSE) {
// printf("INFO: read return false\n");
if ($this->s2u[$id]->rd_socket_get() != NULL) {
// try to send close frame (for websocket)
$clo = $this->s2u[$id]->stream_close();
+ $postclose = $user->stream_postclose_get($sock, $this->curtime);
$clo_l = mb_strlen($clo, "ASCII");
@fwrite($sock, $clo, $clo_l);
$this->s2u[$id]->rd_socket_set(NULL);
unset($this->s2u[$id]);
}
}
- fclose($sock);
+ if ($postclose != NULL) {
+ // print("POSTCLOSE found!");
+ $this->socks_set($sock, NULL, NULL, $postclose);
+ }
+ else {
+ fclose($sock);
+ }
// printf("CLOSE ON READ\n");
if ($this->debug > 1) {
// fprintf(STDERR, 'POST USER');
if ($user && $user->rd_transp && strpos($user->rd_transp->type, "websocket") !== FALSE) {
- $clie_cmd = $user->rd_transp->unchunk($buf);
+ $clie_cmd = $user->rd_transp->unchunk($buf, $sock);
$clie_cmd = json_decode($clie_cmd, TRUE);
// fprintf(STDERR, "HERE WE ARE INCOMING DATA [%s]\n", print_r($clie_cmd, TRUE));
ob_end_clean();
}
}
+ else {
+ fprintf(STDERR, "Unknown page [%s]\n", $wr_addr);
+ }
/*
briskin5/index_wr.php
$manage_page = TRUE;
}
}
+
+ // postclose case
+ if (isset($this->s2c[$id])) {
+ $postclose = $this->s2c[$id];
+ // printf("POSTCLOSE: found pc in s2c list\n");
+ if ($postclose->read($buf, $this->curtime) == 0) {
+ printf("POSTCLOSE: received end opcode, close\n");
+ fclose($sock);
+ $this->socks_unset($sock);
+ }
+ }
}
}
}
}
if ($rret == FALSE) {
// FIXME: manage 404 !!!
- printf("TODO: fix unknown page\n");
+ printf("TODO: fix unknown page: %s\n", $path);
fclose($new_socket);
}
}
// close socket after a while to prevent client memory consumption
if ($user->rd_endtime_is_expired($this->curtime)) {
- if ($this->s2u[$id]->rd_socket_get() != NULL) {
- $this->s2u[$id]->rd_socket_set(NULL);
+ $postclose = $user->stream_postclose_get($sock, $this->curtime);
+ if ($user->rd_socket_get() != NULL) {
+ $user->rd_socket_set(NULL);
}
unset($this->socks[$id]);
unset($this->s2u[$id]);
$clo = $user->stream_close();
$clo_l = mb_strlen($clo, "ASCII");
@fwrite($sock, $clo, $clo_l);
- fclose($sock);
+ if ($postclose) {
+ $this->socks_set($sock, NULL, NULL, $postclose);
+ }
+ else {
+ fclose($sock);
+ }
// printf("CLOSE ON LOOP\n");
}
} // if (isset($this->s2u[$id]...
{
}
+ function postclose_get($sock, $curtime)
+ {
+ return NULL;
+ }
+
function chunk($step, $cont)
{
}
}
}
+define("TRANSP_WS_CLOSE_TOUT", 5);
+
+class Transport_websocket_postclose {
+ function Transport_websocket_postclose($transp_ws, $sock, $curtime) {
+ printf("POSTCLOSE: Creation\n");
+ $this->transp_ws = $transp_ws;
+ $this->sock = $sock;
+ $this->start = $curtime;
+ // status not required, currently
+ // $this->status = "begin";
+ }
+
+ function read($payload, $curtime) {
+ if ($this->start + TRANSP_WS_CLOSE_TOUT < $curtime) {
+ printf("POSTCLOSE: Closing ws (%d) force close by timeout\n", $this->sock);
+ return 0;
+ }
+ if (mb_strlen($payload, "ASCII") > 1) {
+ $this->transp_ws->unchunk($payload, $this->sock);
+ }
+ if ($this->transp_ws->hasSentClose) {
+ printf("POSTCLOSE: Closing ws gracefully\n");
+ return 0;
+ }
+ else {
+ printf("POSTCLOSE: not yet finished\n");
+ return 1;
+ }
+ }
+}
+
+
class Transport_websocket {
protected $magicGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
return $strout . "\n";
}
- function unchunk($cont)
+ function unchunk($cont, $sock)
{
// fprintf(STDERR, "CHUNK: [%s]\n", $cont);
- return $this->deframe($cont);
+ return $this->deframe($cont, $sock);
}
function chunk($step, $cont)
return chr($b1) . chr($b2) . $lengthField . $message;
}
- protected function deframe($message) {
+ protected function deframe($message, $socket) {
//echo $this->strtohex($message);
$headers = $this->extractHeaders($message);
$pongReply = false;
if ($pongReply) {
$reply = $this->frame($payload,$this,'pong');
// TODO FIXME ALL socket_write management
- socket_write($user->socket,$reply,mb_strlen($reply, "ASCII"));
+ // socket_write($user->socket,$reply,mb_strlen($reply, "ASCII"));
+ @fwrite($socket, $reply, mb_strlen($reply, "ASCII"));
return false;
}
if (extension_loaded('mbstring')) {
return(chr(0x88).chr(0x02).chr(0xe8).chr(0x03));
}
+ function postclose_get($sock, $curtime)
+ {
+ return new Transport_websocket_postclose($this, $sock, $curtime);
+ }
+
static function fini($init_string, $base, $blockerr)
{
return (sprintf('@BEGIN@ %s window.onbeforeunload = null; window.onunload = null; document.location.assign("%sindex.php"); @END@', ($blockerr ? 'xstm.stop(); ' : ''), $base).self::close());
return "";
}
+ function postclose_get($sock, $curtime)
+ {
+ return NULL;
+ }
+
static function fini($init_string, $base, $blockerr)
{
return (sprintf('@BEGIN@ %s window.onbeforeunload = null; window.onunload = null; document.location.assign("%sindex.php"); @END@', ($blockerr ? 'xstm.stop(); ' : ''), $base));
return "";
}
+ function postclose_get($sock, $curtime)
+ {
+ return NULL;
+ }
+
static function fini($init_string, $base, $blockerr)
{
$ret = "";
function Transport_htmlfile() {
$this->type = 'htmlfile';
}
+
+ function postclose_get($sock, $curtime)
+ {
+ return NULL;
+ }
}
class Transport {
return ($this->rd_transp->close());
}
+function stream_postclose_get($sock, $curtime)
+{
+ return ($this->rd_transp->postclose_get($sock, $curtime));
+}
+
static function base_get()
{
$c = get_called_class();
lang: readCookie("lang")
});
// console.log(ws_msg);
- xstm.transp.ws.send(ws_msg);
+ xstm.send(ws_msg);
+ /*
+ if (xstm.transp.ws.readyState == 1) {
+ xstm.transp.ws.send(ws_msg);
+ }
+ else {
+ xstm.transp.out_queue.push(ws_msg);
+ }
+ */
+
}
else {
var xhr_wr = createXMLHttpRequest();
}
}
} // else if ($action == "accept") {
+ else if ($action == "delete") {
+ foreach($_POST as $key => $value) {
+ if (substr($key, 0, 9) != "f_newuser")
+ continue;
+
+ $id = (int)substr($key, 9);
+ if ($id <= 0)
+ continue;
+
+ // check existence of username or email
+ $is_trans = FALSE;
+ $res = FALSE;
+ do {
+ if (($bdb = BriskDB::create()) == FALSE)
+ break;
+
+ // retrieve list added users
+ $usr_sql = sprintf("
+SELECT usr.*, guar.login AS guar_login
+ FROM %susers AS usr
+ JOIN %susers AS guar ON guar.code = usr.guar_code
+ WHERE usr.type & (CAST (X'%x' as integer)) = (CAST (X'%x' as integer))
+ AND usr.disa_reas = %d AND usr.code = %d;",
+ $G_dbpfx, $G_dbpfx,
+ USER_FLAG_TY_DISABLE, USER_FLAG_TY_DISABLE,
+ USER_DIS_REA_NU_ADDED, $id);
+ if (($usr_pg = pg_query($bdb->dbconn->db(), $usr_sql)) == FALSE) {
+ log_crit("stat-day: select from tournaments failed");
+ break;
+ }
+ $usr_n = pg_numrows($usr_pg);
+ if ($usr_n != 1) {
+ $status .= sprintf("Inconsistency for code %d, returned %d records, skipped.<br>",
+ $id, $usr_n);
+ break;
+ }
+
+ $usr_obj = pg_fetch_object($usr_pg, 0);
+
+ $bdb->transaction('BEGIN');
+ $is_trans = TRUE;
+
+ // retrieve list added users
+ $usr_sql = sprintf("
+ DELETE FROM %susers
+ WHERE (type & (CAST (X'%x' as integer))) = (CAST (X'%x' as integer))
+ AND disa_reas = %d AND code = %d;",
+ $G_dbpfx, USER_FLAG_TY_DISABLE, USER_FLAG_TY_DISABLE,
+ USER_DIS_REA_NU_ADDED, $id);
+ if (($usr_pg = pg_query($bdb->dbconn->db(), $usr_sql)) == FALSE) {
+ log_crit(sprintf("Delete of user %d failed", $id));
+ break;
+ }
+
+ $status .= sprintf("User %s removed: SUCCESS<br>", $usr_obj->login);
+ $bdb->transaction('COMMIT');
+ $res = TRUE;
+ } while(FALSE);
+ if ($res == FALSE) {
+ $status .= sprintf("Error occurred during delete action<br>");
+ if ($is_trans)
+ $bdb->transaction('ROLLBACK');
+ break;
+ }
+ }
+ } // else if ($action == "accept") {
+
do {
}
exit;
}
-
-
else if ($action == "delete") {
foreach($_POST as $key => $value) {
if (substr($key, 0, 9) != "f_newuser")
$G_dbpfx, $usr_obj->code);
if (($del_pg = pg_query($bdb->dbconn->db(), $del_sql)) == FALSE) {
- log_crit("stat-day: select from tournaments failed");
+ log_crit(sprintf("Delete user %d failed", $usr_obj->code));
break;
}
else
this.name = "WebSocket";
this.ctx_new = "";
+ this.out_queue = [];
var self = this;
this.doc = doc;
this.xynt_streaming.log("PAGE: "+page);
this.ws = new WebSocket(page);
this.ws.onopen = function () {
+ console.log('WS On open');
+
self.xynt_streaming.log("onopen");
if (this.readyState == 1) {
// connected
}
};
this.ws.onmessage = function (msg) {
+ console.log('WS On message');
self.xynt_streaming.log("onmessage");
// new data in msg.data
self.ctx_new += msg.data;
};
this.ws.onclose = function (msg) {
- this.onopen = null;
- this.onclose = null;
- this.onerror = null;
+ console.log('WS On close');
+ self.onopen = null;
+ self.onclose = null;
+ self.onerror = null;
self.xynt_streaming.log("onclose"+self.init_steps);
if (self.init_steps == 0)
self.ws_cb("error");
name: null,
xynt_streaming: "ready",
ws: null,
+ out_queue: null,
stopped: true,
failed: false,
}
}
}
+ else if (from == "open") {
+ this.flush_out_queue();
+ }
+
if (this.ws != null && this.ws.readyState > 1) {
this.stopped = true;
}
},
+ flush_out_queue: function() {
+ var l_out = this.out_queue.length;
+ if (l_out == 0)
+ return;
+
+ for (var i = 0 ; i < l_out ; i++) {
+ if (this.ws.readyState != 1) {
+ break;
+ }
+ var item = this.out_queue.shift();
+ var sent = true;
+ try {
+ this.ws.send(item);
+ }
+ catch (ex) {
+ this.out_queue.unshift(item);
+ break;
+ }
+ }
+ },
+
+ send: function(msg) {
+ console.log('new send');
+ if (this.ws && this.ws.readyState == 1) {
+ try {
+ console.log('Try send ... ');
+ this.flush_out_queue();
+ this.ws.send(msg);
+ console.log(' ... done');
+ }
+ catch (ex) {
+ console.log(' ... catched exception');
+ this.flush_out.push(msg);
+ }
+ }
+ else {
+ console.log('ws not ready: push into flush_out');
+ this.flush_out.push(msg);
+ }
+ },
+
ws_abort: function() {
if (this.ws != null) {
this.xynt_streaming.log("WSCLOSE");
return;
},
+ send: function(msg) {
+ if (typeof(this.transp.send) == 'undefined') {
+ this.log('send not implemented for ' + this.transp_type);
+ return;
+ }
+
+ return this.transp.send(msg);
+ },
+
//
// moved to xynt-streaming-ifra as push()
//