X-Git-Url: https://mop.ddnsfree.com/gitweb/?a=blobdiff_plain;f=web%2Fspush%2Fbrisk-spush.php;h=2717b5265f2acc8eb113efee1eb4b37f8080d0da;hb=5c7f81c55f00cc968162449eae3e799f160bfdcf;hp=40eb54e4835c0dad8f6a1fe552a07dfeb1d24558;hpb=90c7420f07b9198d094ae0d297c391ede9adc1d9;p=brisk.git diff --git a/web/spush/brisk-spush.php b/web/spush/brisk-spush.php index 40eb54e..2717b52 100755 --- a/web/spush/brisk-spush.php +++ b/web/spush/brisk-spush.php @@ -22,13 +22,34 @@ * Suite 330, Boston, MA 02111-1307, USA. * * TODO - * problema con getpeer (HOSTADDR) - * setcookie (for tables only) - * keepalive - * chunked - * index_rd.php porting - * index.php auth part - * generic var management from internet + * + * WIP - chunked + * + * - BUG: logout failed + * - BUG: fast loop on stream index_rd_ifra page + * + * - garbage management + * - log_legal address fix + * - from room to table + * - from table to room + * - fwrite other issues + * - manage and test cross forwarder between table and room + * - setcookie (for tables only) + * - keepalive management + * + * DONE/FROZEN - problema con getpeer (HOSTADDR) + * + * DONE - bug: read from a not resource handle (already closed because a new socket substitute it) + * DONE - partial write for normal page management + * DONE - index_rd_ifra: last_clean issue + * DONE - fwrite failed error management (select, buffer where store unsent data, and fwrite check and retry) + * ABRT - index_wr.php::reload - reload is js-only function + * DONE - bug: after restart index_rd.php receive from prev clients a lot of req + * DONE - index_wr.php::chat + * DONE - index_wr.php::exit + * DONE - index_rd.php porting + * DONE - generic var management from internet + * DONE - index.php auth part */ $G_base = "../"; @@ -45,92 +66,7 @@ require_once($G_base."briskin5/Obj/briskin5.phh"); define('SITE_PREFIX', '/brisk/'); -class SPUser { - var $id; - var $sess; - var $cnt; - var $sock; - - function SPUser($id) - { - $this->id = $id; - $this->cnt = -1; - $this->sock = NULL; - } - - function enable($sock, $sess) - { - $this->sess = $sess; - $this->cnt = 0; - $this->sock = $sock; - - return ($this->id); - } - - function disable() - { - $this->cnt = -1; - $this->sock = NULL; - } - - function is_enable() - { - return ($this->cnt < 0 ? FALSE : TRUE); - } - - function sock_get() - { - return $this->sock; - } - - function sock_set($sock) - { - $this->sock = $sock; - } - - function id_get() - { - return $this->id; - } - - function sess_get() - { - return $this->sess; - } - - function cnt_get() - { - return $this->cnt; - } - - function cnt_inc() - { - return $this->cnt++; - } -} - -function user_get_free($user_arr) -{ - foreach ($user_arr as $i => $user) { - if (!$user->is_enable()) { - return ($user); - } - } - return FALSE; -} - -function user_get_sess($user_arr, $sess) -{ - foreach ($user_arr as $i => $user) { - printf("SESS: [%s] cur: [%s]\n", $user->sess_get(), $sess); - if ($user->sess_get() == $sess) { - return ($user); - } - } - return FALSE; -} - -function headers_render($header) +function headers_render($header, $len) { $s = ""; @@ -144,13 +80,20 @@ function headers_render($header) foreach($header as $key => $value) { $s .= sprintf("%s: %s\r\n", $key, $value); } - $s .= "Mop: was/here\r\n"; + if ($len == -1) { + $s .= "Cache-Control: no-cache, must-revalidate\r\n"; + $s .= "Expires: Mon, 26 Jul 1997 05:00:00 GMT\r\n"; + $s .= "Content-Encoding: chunked\r\n"; + $s .= "Transfer-Encoding: chunked\r\n"; + } + else if ($len > 0) { + $s .= sprintf("Content-Length: %d\r\n", $len); + } $s .= "\r\n"; return ($s); } - /* * Caching system using ob php system to cache old style pages * to a var and than send it with more calm @@ -169,6 +112,18 @@ register_shutdown_function('shutta'); */ $shutdown = FALSE; +function chunked_content($content) +{ + $content_l = mb_strlen($content, "ASCII"); + + return (sprintf("%X\r\n%s\r\n", $content_l, $content)); +} + +function chunked_fini() +{ + return sprintf("0\r\n"); +} + function main() { GLOBAL $G_headers; @@ -193,6 +148,7 @@ function main() } $s2u = array(); + $pages_flush = array(); $rndstr = ""; for ($i = 0 ; $i < 4096 ; $i++) { @@ -216,7 +172,9 @@ function main() stream_set_blocking($list, $blocking_mode); # Set the stream to non-blocking while ($main_loop) { - echo "IN LOOP\n"; + $curtime = time(); + printf("IN LOOP: Current opened: %d pages_flush: %d\n", count($socks), count($pages_flush)); + /* Prepare the read array */ if ($shutdown) $read = array_merge(array("$in" => $in), $socks); @@ -229,7 +187,7 @@ function main() } $write = NULL; $except = NULL; - $num_changed_sockets = stream_select($read, $write, $except, 1); // 0, 250000); + $num_changed_sockets = stream_select($read, $write, $except, 0, 250000); if ($num_changed_sockets === FALSE) { printf("No data in 5 secs"); @@ -241,6 +199,11 @@ function main() } /* At least at one of the sockets something interesting happened */ foreach ($read as $i => $sock) { + /* is_resource check is required because there is the possibility that + during new request an old connection is closed */ + if (!is_resource($sock)) { + continue; + } if ($sock === $list) { printf("NUOVA CONNEX\n"); $new_unix = stream_socket_accept($list); @@ -250,6 +213,7 @@ function main() $post = array(); $cookie = array(); if (($new_socket = ancillary_getstream($new_unix, $stream_info)) !== FALSE) { + stream_set_blocking($new_socket, $blocking_mode); // Set the stream to non-blocking printf("RECEIVED HEADER:\n%s", $stream_info); $path = spu_process_info($stream_info, $method, $header, $get, $post, $cookie); printf("PATH: [%s]\n", $path); @@ -272,47 +236,67 @@ function main() index_main($room, $header_out, $addr, $get, $post, $cookie); $content = ob_get_contents(); ob_end_clean(); - // printf("OUT: [%s]\n", $G_content); - fwrite($new_socket, headers_render($header_out).$content); - fclose($new_socket); + + $pgflush = new PageFlush($new_socket, $curtime, 20, $header_out, $content); + + if ($pgflush->try_flush($curtime) == FALSE) { + // Add $pgflush to the pgflush array + array_push($pages_flush, $pgflush); + } + break; case SITE_PREFIX."index_wr.php": $header_out = array(); - $addr = ""; - // $ret = socket_getpeername($new_socket, $addr); - printf("RET: %s\n", $addr); - // exit(123); ob_start(); index_wr_main($room, $addr, $get, $post, $cookie); $content = ob_get_contents(); ob_end_clean(); - - // printf("OUT: [%s]\n", $G_content); - fwrite($new_socket, headers_render($header_out).$content); - fclose($new_socket); + + $pgflush = new PageFlush($new_socket, $curtime, 20, $header_out, $content); + + if ($pgflush->try_flush($curtime) == FALSE) { + // Add $pgflush to the pgflush array + array_push($pages_flush, $pgflush); + } break; case SITE_PREFIX."index_rd_ifra.php": do { - if (!isset($cookie['sess'])) { - fclose($new_socket); - break; - } - if (($user = $room->get_user($cookie['sess'], $idx)) == FALSE) { - fclose($new_socket); + $header_out = array(); + if (!isset($cookie['sess']) + || (($user = $room->get_user($cookie['sess'], $idx)) == FALSE)) { + $content = index_rd_ifra_fini(TRUE); + + $pgflush = new PageFlush($new_socket, $curtime, 20, $header_out, $content); + + if ($pgflush->try_flush($curtime) == FALSE) { + // Add $pgflush to the pgflush array + array_push($pages_flush, $pgflush); + } break; } + // close a previous opened index_read_ifra socket, if exists if (($prev = $user->rd_socket_get()) != NULL) { unset($s2u[intval($user->rd_socket_get())]); unset($socks[intval($user->rd_socket_get())]); fclose($user->rd_socket_get()); + printf("CLOSE AND OPEN AGAIN ON IFRA2\n"); $user->rd_socket_set(NULL); } - $header_out = array(); - $body = ""; - index_rd_ifra_init($room, $user, $header_out, $body, $get, $post, $cookie); - stream_set_blocking($new_socket, $blocking_mode); // Set the stream to non-blocking - fwrite($new_socket, headers_render($header_out).$body); + $content = ""; + index_rd_ifra_init($room, $user, $header_out, $content, $get, $post, $cookie); + + $response = headers_render($header_out, -1).chunked_content($content); + $response_l = mb_strlen($response, "ASCII"); + + $wret = @fwrite($new_socket, $response, $response_l); + if ($wret < $response_l) { + printf("TROUBLES WITH FWRITE: %d\n", $wret); + $user->rd_cache_set(mb_substr($content, $wret, $response_l - $wret, "ASCII")); + } + else { + $user->rd_cache_set(""); + } fflush($new_socket); $s2u[intval($new_socket)] = $idx; @@ -322,42 +306,6 @@ function main() break; } - - - - - if (0 == 1) { - /* TODO: here stuff to decide if it is old or new user */ - if (($user_cur = user_get_sess($user_a, $get['sess'])) != FALSE) { - /* close the previous socket */ - unset($s2u[intval($user_cur->sock_get())]); - unset($socks[intval($user_cur->sock_get())]); - fclose($user_cur->sock_get()); - /* assign the new socket */ - $user_cur->sock_set($new_socket); - $id = $user_cur->id_get(); - $s2u[intval($new_socket)] = $id; - $socks[intval($new_socket)] = $new_socket; - fwrite($new_socket, $rndstr); - fflush($new_socket); - } - else if (($user_cur = user_get_free($user_a)) != FALSE) { - stream_set_blocking($new_socket, $blocking_mode); // Set the stream to non-blocking - $socks[intval($new_socket)] = $new_socket; - - $id = $user_cur->id_get(); - $user_a[$id]->enable($new_socket, $get['sess']); - printf("s2u: ci passo %d\n", intval($new_socket)); - $s2u[intval($new_socket)] = $id; - - fwrite($new_socket, $rndstr); - fflush($new_socket); - } - else { - printf("Too many opened users\n"); - fclose($new_socket); - } - } } else { printf("WARNING: ancillary_getstream failed\n"); @@ -383,6 +331,7 @@ function main() unset($socks[intval($sock)]); unset($s2u[intval($sock)]); fclose($sock); + printf("CLOSE ON READ\n"); } if ($debug > 1) { printf("post unset\n"); @@ -409,25 +358,41 @@ function main() } - + foreach ($pages_flush as $k => $pgflush) { + if ($pgflush->try_flush($curtime) == TRUE) { + unset($pages_flush[$k]); + } + } foreach ($socks as $k => $sock) { if (isset($s2u[intval($sock)])) { - $body = ""; - - - $body = ""; $user = $room->user[$s2u[intval($sock)]]; - index_rd_ifra_main($room, $user, $body); - if ($body == "" && $user->rd_tout_is_expired($curtime)) { - $body = index_rd_ifra_keepalive($user); + $response = $user->rd_cache_get(); + if ($response == "") { + $content = ""; + index_rd_ifra_main($room, $user, $content); + + if ($content == "" && $user->rd_kalive_is_expired($curtime)) { + $content = index_rd_ifra_keepalive($user); + } + if ($content != "") { + $response = chunked_content($content); + } } - if ($body != "") { - echo "SPIA: [".substr($body, 0, 60)."...]\n"; - fwrite($sock, $body); + if ($response != "") { + echo "SPIA: [".substr($response, 0, 60)."...]\n"; + $response_l = mb_strlen($response, "ASCII"); + $wret = @fwrite($sock, $response); + if ($wret < $response_l) { + printf("TROUBLE WITH FWRITE: %d\n", $wret); + $user->rd_cache_set(mb_substr($response, $wret, $response_l - $wret, "ASCII")); + } + else { + $user->rd_cache_set(""); + } fflush($sock); - $user->rd_tout_reset($curtime); + $user->rd_kalive_reset($curtime); } // close socket after a while to prevent client memory consumption @@ -439,6 +404,7 @@ function main() unset($socks[intval($sock)]); unset($s2u[intval($sock)]); fclose($sock); + printf("CLOSE ON LOOP\n"); } } }