From: Matteo Nastasi (mop) Date: Wed, 18 Jul 2012 22:22:37 +0000 (+0200) Subject: all rd stuff to manage fwrite fail, rd_keepalive and rd_endstream X-Git-Tag: v4.0.0~91 X-Git-Url: http://mop.ddnsfree.com/gitweb/?a=commitdiff_plain;h=a83f493a328cfcb04ab53c5e5f08a530a9b5f290;p=brisk.git all rd stuff to manage fwrite fail, rd_keepalive and rd_endstream --- diff --git a/web/Obj/brisk.phh b/web/Obj/brisk.phh index fb88f57..388838b 100644 --- a/web/Obj/brisk.phh +++ b/web/Obj/brisk.phh @@ -690,7 +690,9 @@ define('USER_FLAG_TY_SUPER', 0x020000); // done define('USER_FLAG_TY_SUSPEND', 0x400000); // done define('USER_FLAG_TY_DISABLE', 0x800000); // done -define('RD_STREAM_TIMEOUT', 4); +// 240 is the right value, 600 is for fwrite error test +define('RD_ENDTIME_DELTA', 240); +define('RD_KEEPALIVE_TOUT', 4); class User { var $idx; // index in the users array when you are in game @@ -714,7 +716,8 @@ class User { var $rd_step; // actual step of push stream var $rd_from; // referer var $rd_scristp; // current script step (for each session) - var $rd_tout; // if no message are sent after RD_STREAM_TIMEOUT secs we send a keepalive from server + var $rd_kalive; // if no message are sent after RD_KEEPALIVE_TOUT secs we send a keepalive from server + var $rd_cache; // store place where failed fwrite data var $comm; // commands array // var $asta_card; // @@ -767,7 +770,8 @@ class User { $thiz->rd_step = -1; $thiz->rd_from = ""; $thiz->rd_scristp = -1; - $thiz->rd_tout = -1; + $thiz->rd_kalive = -1; + $thiz->rd_cache = ""; $thiz->asta_card = -2; $thiz->asta_pnt = -1; @@ -903,13 +907,13 @@ class User { function rd_data_set($curtime, $stat, $subst, $step, $from) { - $this->rd_endtime = $curtime + STREAM_TIMEOUT; + $this->rd_endtime = $curtime + RD_ENDTIME_DELTA; $this->rd_stat = $stat; $this->rd_subst = $subst; $this->rd_step = $step; $this->rd_from = $from; $this->rd_scristp = 0; - $this->rd_tout = $curtime + RD_STREAM_TIMEOUT; + $this->rd_kalive = $curtime + RD_KEEPALIVE_TOUT; } function rd_socket_get() { @@ -920,31 +924,41 @@ class User { $this->rd_socket = $sock; } - function rd_tout_get() + function rd_kalive_get() { - return ($this->rd_tout); + return ($this->rd_kalive); } - function rd_tout_set($tm) + function rd_kalive_set($tm) { - $this->rd_tout = $tm; + $this->rd_kalive = $tm; } - function rd_tout_is_expired($tm) + function rd_kalive_is_expired($tm) { - // printf("rd_tout %d tm %d\n", $this->rd_tout, $tm); - return ($this->rd_tout < $tm); + // printf("rd_kalive %d tm %d\n", $this->rd_kalive, $tm); + return ($this->rd_kalive < $tm); } function rd_endtime_is_expired($tm) { - // printf("rd_endtime %d tm %d\n", $this->rd_tout, $tm); + // printf("rd_endtime %d tm %d\n", $this->rd_kalive, $tm); return ($this->rd_endtime < $tm); } - function rd_tout_reset($tm) + function rd_kalive_reset($tm) { - $this->rd_tout = $tm + RD_STREAM_TIMEOUT; + $this->rd_kalive = $tm + RD_KEEPALIVE_TOUT; + } + + function rd_cache_get() + { + return ($this->rd_cache); + } + + function rd_cache_set($cache) + { + $this->rd_cache = $cache; } function idx_get() { diff --git a/web/index_rd_ifra.php b/web/index_rd_ifra.php index 2f2d393..7546c2c 100644 --- a/web/index_rd_ifra.php +++ b/web/index_rd_ifra.php @@ -130,6 +130,7 @@ function blocking_error($is_unrecoverable) return (sprintf(($is_unrecoverable ? 'hstm.stop(); ' : '').'window.onbeforeunload = null; window.onunload = null; document.location.assign("index.php");')); } +// FIXME TO SUPPORT iframe function page_sync($sess, $page, $table_idx, $table_token) { GLOBAL $is_page_streaming; @@ -491,6 +492,33 @@ function maincheck(&$room, &$user, $cur_stat, $cur_subst, $cur_step, &$new_stat, return ($ret); } +function index_rd_ifra_fini($is_unrecoverable) +{ + GLOBAL $G_four_rnd_string; + + // IF IFRAME THEN: + $body = ""; + $body .= sprintf(" + + + + + +"); + $body .= sprintf("\n", $G_four_rnd_string); + $body .= sprintf("", 0, escpush(blocking_error($is_unrecoverable)) ); + // ELSE IF XHR THEN: + // return (blocking_error($is_unrecoverable)); + return ($body); +} + /* * MAIN */ diff --git a/web/spush/brisk-spush.php b/web/spush/brisk-spush.php index fff12a0..068d46c 100755 --- a/web/spush/brisk-spush.php +++ b/web/spush/brisk-spush.php @@ -151,7 +151,7 @@ function main() } $write = NULL; $except = NULL; - $num_changed_sockets = stream_select($read, $write, $except, 1); // 0, 250000); + $num_changed_sockets = stream_select($read, $write, $except, 0, 250000); if ($num_changed_sockets === FALSE) { printf("No data in 5 secs"); @@ -172,6 +172,7 @@ function main() $post = array(); $cookie = array(); if (($new_socket = ancillary_getstream($new_unix, $stream_info)) !== FALSE) { + stream_set_blocking($new_socket, $blocking_mode); // Set the stream to non-blocking printf("RECEIVED HEADER:\n%s", $stream_info); $path = spu_process_info($stream_info, $method, $header, $get, $post, $cookie); printf("PATH: [%s]\n", $path); @@ -215,25 +216,26 @@ function main() break; case SITE_PREFIX."index_rd_ifra.php": do { - if (!isset($cookie['sess'])) { - fclose($new_socket); - break; - } - if (($user = $room->get_user($cookie['sess'], $idx)) == FALSE) { + $header_out = array(); + if (!isset($cookie['sess']) + || (($user = $room->get_user($cookie['sess'], $idx)) == FALSE)) { + $body = index_rd_ifra_fini(TRUE); + fwrite($new_socket, headers_render($header_out).$body); + fflush($new_socket); fclose($new_socket); break; } + // close a previous opened index_read_ifra socket, if exists if (($prev = $user->rd_socket_get()) != NULL) { unset($s2u[intval($user->rd_socket_get())]); unset($socks[intval($user->rd_socket_get())]); fclose($user->rd_socket_get()); + printf("CLOSE AND OPEN AGAIN ON IFRA2\n"); $user->rd_socket_set(NULL); } - $header_out = array(); $body = ""; index_rd_ifra_init($room, $user, $header_out, $body, $get, $post, $cookie); - stream_set_blocking($new_socket, $blocking_mode); // Set the stream to non-blocking fwrite($new_socket, headers_render($header_out).$body); fflush($new_socket); @@ -269,6 +271,7 @@ function main() unset($socks[intval($sock)]); unset($s2u[intval($sock)]); fclose($sock); + printf("CLOSE ON READ\n"); } if ($debug > 1) { printf("post unset\n"); @@ -294,26 +297,30 @@ function main() } } - - - foreach ($socks as $k => $sock) { if (isset($s2u[intval($sock)])) { - $body = ""; - - - $body = ""; $user = $room->user[$s2u[intval($sock)]]; - index_rd_ifra_main($room, $user, $body); - if ($body == "" && $user->rd_tout_is_expired($curtime)) { + $body = $user->rd_cache_get(); + if ($body == "") + index_rd_ifra_main($room, $user, $body); + + if ($body == "" && $user->rd_kalive_is_expired($curtime)) { $body = index_rd_ifra_keepalive($user); } if ($body != "") { echo "SPIA: [".substr($body, 0, 60)."...]\n"; - fwrite($sock, $body); + $body_l = mb_strlen($body, "LATIN1"); + $ret = @fwrite($sock, $body); + if ($ret < $body_l) { + printf("TROUBLE WITH FWRITE: %d\n", $ret); + $user->rd_cache_set(mb_substr($body, $ret, $body_l - $ret, "LATIN1")); + } + else { + $user->rd_cache_set(""); + } fflush($sock); - $user->rd_tout_reset($curtime); + $user->rd_kalive_reset($curtime); } // close socket after a while to prevent client memory consumption @@ -325,6 +332,7 @@ function main() unset($socks[intval($sock)]); unset($s2u[intval($sock)]); fclose($sock); + printf("CLOSE ON LOOP\n"); } } }