From bcdb74ceefc0946436095706835f6a5256c38d92 Mon Sep 17 00:00:00 2001 From: Matteo Nastasi Date: Fri, 3 Aug 2012 09:00:18 +0200 Subject: [PATCH] Sac_a_push class created --- web/spush/brisk-spush.php | 523 +++++++++++++++++++++----------------- 1 file changed, 285 insertions(+), 238 deletions(-) diff --git a/web/spush/brisk-spush.php b/web/spush/brisk-spush.php index 6b84d9d..8716dcb 100755 --- a/web/spush/brisk-spush.php +++ b/web/spush/brisk-spush.php @@ -122,292 +122,339 @@ function chunked_fini() return sprintf("0\r\n"); } -function main() -{ - GLOBAL $G_headers; - GLOBAL $shutdown; - $main_loop = TRUE; +class Sac_a_push { + static $fixed_fd = 2; + + var $file_socket; + var $unix_socket; + var $socks; + var $s2u; + var $pages_flush; - /* - * INIT - */ + var $list; + var $in; - $FILE_SOCKET = "/tmp/brisk.sock"; - $UNIX_SOCKET = "unix://$FILE_SOCKET"; - $debug = 0; - $fixed_fd = 2; - $socks = array(); + var $debug; + var $blocking_mode; - $blocking_mode = 0; // 0 for non-blocking + var $room; + var $bin5; - if (($room = Room::create()) == FALSE) { - log_crit("load_data failed"); - return FALSE; + var $rndstr; + var $main_loop; + + function Sac_a_push() + { } - $s2u = array(); - $pages_flush = array(); + // Sac_a_push::create("/tmp/brisk.sock", 0, 0 - $rndstr = ""; - for ($i = 0 ; $i < 4096 ; $i++) { - $rndstr .= chr(mt_rand(65, 90)); - } + static function create($sockname, $debug, $blocking_mode) + { + $thiz = new Sac_a_push(); - if (file_exists($FILE_SOCKET)) { - unlink($FILE_SOCKET); - } - - $old_umask = umask(0); - if (($list = stream_socket_server($UNIX_SOCKET, $err, $errs)) === FALSE) { - exit(11); - } - umask($old_umask); + $thiz->file_socket = $sockname; + $thiz->unix_socket = "unix://$sockname"; + $thiz->debug = $debug; + $thiz->socks = array(); + $thiz->s2u = array(); + $thiz->pages_flush = array(); - if (($in = fopen("php://stdin", "r")) === FALSE) { - exit(11); - } + $thiz->blocking_mode = 0; // 0 for non-blocking - stream_set_blocking($list, $blocking_mode); # Set the stream to non-blocking + if (($thiz->room = Room::create()) == FALSE) { + log_crit("room::create failed"); + return FALSE; + } - while ($main_loop) { - $curtime = time(); - printf("IN LOOP: Current opened: %d pages_flush: %d\n", count($socks), count($pages_flush)); - /* Prepare the read array */ - if ($shutdown) - $read = array_merge(array("$in" => $in), $socks); - else - $read = array_merge(array("$list" => $list, "$in" => $in), $socks); + $thiz->rndstr = ""; + for ($i = 0 ; $i < 4096 ; $i++) { + $thiz->rndstr .= chr(mt_rand(65, 90)); + } + + if (file_exists($thiz->file_socket)) { + unlink($thiz->file_socket); + } + + $old_umask = umask(0); + if (($thiz->list = stream_socket_server($thiz->unix_socket, $err, $errs)) === FALSE) { + return (FALSE); + } + umask($old_umask); + stream_set_blocking($thiz->list, $thiz->blocking_mode); # Set the stream to non-blocking - if ($debug > 1) { - printf("PRE_SELECT\n"); - print_r($read); + if (($thiz->in = fopen("php://stdin", "r")) === FALSE) { + return(FALSE); } - $write = NULL; - $except = NULL; - $num_changed_sockets = stream_select($read, $write, $except, 0, 250000); + + $thiz->main_loop = FALSE; + + return ($thiz); + } + + function run() + { + if ($this->main_loop) { + return (FALSE); + } + + $this->main_loop = TRUE; - if ($num_changed_sockets === FALSE) { - printf("No data in 5 secs"); - } - else if ($num_changed_sockets > 0) { - printf("num sock %d num_of_socket: %d\n", $num_changed_sockets, count($read)); - if ($debug > 1) { + while ($this->main_loop) { + $curtime = time(); + printf("IN LOOP: Current opened: %d pages_flush: %d\n", count($this->socks), count($this->pages_flush)); + + /* Prepare the read array */ + /* // when we manage it ... */ + /* if ($shutdown) */ + /* $read = array_merge(array("$in" => $in), $socks); */ + /* else */ + $read = array_merge(array(intval($this->list) => $this->list, intval($this->in) => $this->in), + $this->socks); + + if ($this->debug > 1) { + printf("PRE_SELECT\n"); print_r($read); } - /* At least at one of the sockets something interesting happened */ - foreach ($read as $i => $sock) { - /* is_resource check is required because there is the possibility that - during new request an old connection is closed */ - if (!is_resource($sock)) { - continue; + $write = NULL; + $except = NULL; + $num_changed_sockets = stream_select($read, $write, $except, 0, 250000); + + if ($num_changed_sockets == 0) { + printf("No data in 5 secs\n"); + } + else if ($num_changed_sockets > 0) { + printf("num sock %d num_of_socket: %d\n", $num_changed_sockets, count($read)); + if ($this->debug > 1) { + print_r($read); } - if ($sock === $list) { - printf("NUOVA CONNEX\n"); - $new_unix = stream_socket_accept($list); - $stream_info = ""; - $method = ""; - $get = array(); - $post = array(); - $cookie = array(); - if (($new_socket = ancillary_getstream($new_unix, $stream_info)) !== FALSE) { - stream_set_blocking($new_socket, $blocking_mode); // Set the stream to non-blocking - printf("RECEIVED HEADER:\n%s", $stream_info); - $path = spu_process_info($stream_info, $method, $header, $get, $post, $cookie); - printf("PATH: [%s]\n", $path); - printf("M: %s\nHEADER:\n", $method); - print_r($header); - printf("GET:\n"); - print_r($get); - printf("POST:\n"); - print_r($post); - printf("COOKIE:\n"); - print_r($cookie); - - $addr = stream_socket_get_name($new_socket, TRUE); - - switch ($path) { - case SITE_PREFIX: - case SITE_PREFIX."index.php": + /* At least at one of the sockets something interesting happened */ + foreach ($read as $i => $sock) { + /* is_resource check is required because there is the possibility that + during new request an old connection is closed */ + if (!is_resource($sock)) { + continue; + } + if ($sock === $this->list) { + printf("NUOVA CONNEX\n"); + $new_unix = stream_socket_accept($this->list); + $stream_info = ""; + $method = ""; + $get = array(); + $post = array(); + $cookie = array(); + if (($new_socket = ancillary_getstream($new_unix, $stream_info)) !== FALSE) { + stream_set_blocking($new_socket, $this->blocking_mode); // Set the stream to non-blocking + printf("RECEIVED HEADER:\n%s", $stream_info); + $path = spu_process_info($stream_info, $method, $header, $get, $post, $cookie); + printf("PATH: [%s]\n", $path); + printf("M: %s\nHEADER:\n", $method); + print_r($header); + printf("GET:\n"); + print_r($get); + printf("POST:\n"); + print_r($post); + printf("COOKIE:\n"); + print_r($cookie); + + $addr = stream_socket_get_name($new_socket, TRUE); $header_out = array(); - ob_start(); - index_main($room, $header_out, $addr, $get, $post, $cookie); - $content = ob_get_contents(); - ob_end_clean(); - $pgflush = new PageFlush($new_socket, $curtime, 20, $header_out, $content); + switch ($path) { + case SITE_PREFIX: + case SITE_PREFIX."index.php": + ob_start(); + index_main($this->room, $header_out, $addr, $get, $post, $cookie); + $content = ob_get_contents(); + ob_end_clean(); - if ($pgflush->try_flush($curtime) == FALSE) { - // Add $pgflush to the pgflush array - array_push($pages_flush, $pgflush); - } - - break; - case SITE_PREFIX."index_wr.php": - $header_out = array(); - ob_start(); - index_wr_main($room, $addr, $get, $post, $cookie); - $content = ob_get_contents(); - ob_end_clean(); + $pgflush = new PageFlush($new_socket, $curtime, 20, $header_out, $content); - $pgflush = new PageFlush($new_socket, $curtime, 20, $header_out, $content); + if ($pgflush->try_flush($curtime) == FALSE) { + // Add $pgflush to the pgflush array + array_push($this->pages_flush, $pgflush); + } - if ($pgflush->try_flush($curtime) == FALSE) { - // Add $pgflush to the pgflush array - array_push($pages_flush, $pgflush); - } + break; + case SITE_PREFIX."index_wr.php": + ob_start(); + index_wr_main($this->room, $addr, $get, $post, $cookie); + $content = ob_get_contents(); + ob_end_clean(); + + $pgflush = new PageFlush($new_socket, $curtime, 20, $header_out, $content); + + if ($pgflush->try_flush($curtime) == FALSE) { + // Add $pgflush to the pgflush array + array_push($this->pages_flush, $pgflush); + } break; - case SITE_PREFIX."index_rd_ifra.php": - do { - $header_out = array(); - if (!isset($cookie['sess']) - || (($user = $room->get_user($cookie['sess'], $idx)) == FALSE)) { - $content = index_rd_ifra_fini(TRUE); + case SITE_PREFIX."index_rd_ifra.php": + do { + if (!isset($cookie['sess']) + || (($user = $this->room->get_user($cookie['sess'], $idx)) == FALSE)) { + $content = index_rd_ifra_fini(TRUE); + + $pgflush = new PageFlush($new_socket, $curtime, 20, $header_out, $content); + + if ($pgflush->try_flush($curtime) == FALSE) { + // Add $pgflush to the pgflush array + array_push($this->pages_flush, $pgflush); + } + break; + } + // close a previous opened index_read_ifra socket, if exists + if (($prev = $user->rd_socket_get()) != NULL) { + unset($this->s2u[intval($user->rd_socket_get())]); + unset($this->socks[intval($user->rd_socket_get())]); + fclose($user->rd_socket_get()); + printf("CLOSE AND OPEN AGAIN ON IFRA2\n"); + $user->rd_socket_set(NULL); + } - $pgflush = new PageFlush($new_socket, $curtime, 20, $header_out, $content); + $content = ""; + index_rd_ifra_init($this->room, $user, $header_out, $content, $get, $post, $cookie); + + $response = headers_render($header_out, -1).chunked_content($content); + $response_l = mb_strlen($response, "ASCII"); - if ($pgflush->try_flush($curtime) == FALSE) { - // Add $pgflush to the pgflush array - array_push($pages_flush, $pgflush); + $wret = @fwrite($new_socket, $response, $response_l); + if ($wret < $response_l) { + printf("TROUBLES WITH FWRITE: %d\n", $wret); + $user->rd_cache_set(mb_substr($content, $wret, $response_l - $wret, "ASCII")); } - break; - } - // close a previous opened index_read_ifra socket, if exists - if (($prev = $user->rd_socket_get()) != NULL) { - unset($s2u[intval($user->rd_socket_get())]); - unset($socks[intval($user->rd_socket_get())]); - fclose($user->rd_socket_get()); - printf("CLOSE AND OPEN AGAIN ON IFRA2\n"); - $user->rd_socket_set(NULL); - } - - $content = ""; - index_rd_ifra_init($room, $user, $header_out, $content, $get, $post, $cookie); - - $response = headers_render($header_out, -1).chunked_content($content); - $response_l = mb_strlen($response, "ASCII"); - - $wret = @fwrite($new_socket, $response, $response_l); - if ($wret < $response_l) { - printf("TROUBLES WITH FWRITE: %d\n", $wret); - $user->rd_cache_set(mb_substr($content, $wret, $response_l - $wret, "ASCII")); - } - else { - $user->rd_cache_set(""); - } - fflush($new_socket); - - $s2u[intval($new_socket)] = $idx; - $socks[intval($new_socket)] = $new_socket; - $user->rd_socket_set($new_socket); - } while (FALSE); - - break; + else { + $user->rd_cache_set(""); + } + fflush($new_socket); + + $this->s2u[intval($new_socket)] = $idx; + $this->socks[intval($new_socket)] = $new_socket; + $user->rd_socket_set($new_socket); + } while (FALSE); + + break; + + /* default: */ + /* $cl = strlen(SITE_PREFIX."briskin5/"); */ + /* if (!strncmp($this->path, SITE_PREFIX."briskin5/", $cl)) { */ + /* Bin5::page_manager($room, $header_out, substr($path,$cl), $method, $addr, $get, $post, $cookie); */ + } + } + else { + printf("WARNING: ancillary_getstream failed\n"); } } else { - printf("WARNING: ancillary_getstream failed\n"); - } - } - else { - if (($buf = fread($sock, 512)) === FALSE) { - printf("error read\n"); - exit(123); - } - else if (strlen($buf) === 0) { - if ($sock === $list) { - printf("Arrivati %d bytes da list\n", strlen($buf)); + if (($buf = fread($sock, 512)) === FALSE) { + printf("error read\n"); + exit(123); } - else if ($sock === $in) { - printf("Arrivati %d bytes da stdin\n", strlen($buf)); + else if (strlen($buf) === 0) { + if ($sock === $this->list) { + printf("Arrivati %d bytes da list\n", strlen($buf)); + } + else if ($sock === $this->in) { + printf("Arrivati %d bytes da stdin\n", strlen($buf)); + } + else { + // $user_a[$s2u[intval($sock)]]->disable(); + if ($this->room->user[$this->s2u[intval($sock)]]->rd_socket_get() != NULL) { + $this->room->user[$this->s2u[intval($sock)]]->rd_socket_set(NULL); + } + unset($this->socks[intval($sock)]); + unset($this->s2u[intval($sock)]); + fclose($sock); + printf("CLOSE ON READ\n"); + } + if ($this->debug > 1) { + printf("post unset\n"); + print_r($this->socks); + } } else { - // $user_a[$s2u[intval($sock)]]->disable(); - if ($room->user[$s2u[intval($sock)]]->rd_socket_get() != NULL) { - $room->user[$s2u[intval($sock)]]->rd_socket_set(NULL); + if ($debug > 1) { + print_r($read); + } + if ($sock === $this->list) { + printf("Arrivati %d bytes da list\n", strlen($buf)); + } + else if ($sock === $this->in) { + printf("Arrivati %d bytes da stdin\n", strlen($buf)); + } + else { + $key = array_search("$sock", $this->socks); + printf("Arrivati %d bytes dalla socket n. %d\n", strlen($buf), $key); } - unset($socks[intval($sock)]); - unset($s2u[intval($sock)]); - fclose($sock); - printf("CLOSE ON READ\n"); - } - if ($debug > 1) { - printf("post unset\n"); - print_r($socks); } } - else { - if ($debug > 1) { - print_r($read); + } + } + + + foreach ($this->pages_flush as $k => $pgflush) { + if ($pgflush->try_flush($curtime) == TRUE) { + unset($this->pages_flush[$k]); + } + } + + foreach ($this->socks as $k => $sock) { + if (isset($this->s2u[intval($sock)])) { + $user = $this->room->user[$this->s2u[intval($sock)]]; + $response = $user->rd_cache_get(); + if ($response == "") { + $content = ""; + index_rd_ifra_main($this->room, $user, $content, $get, $post, $cookie); + + if ($content == "" && $user->rd_kalive_is_expired($curtime)) { + $content = index_rd_ifra_keepalive($user); } - if ($sock === $list) { - printf("Arrivati %d bytes da list\n", strlen($buf)); + if ($content != "") { + $response = chunked_content($content); } - else if ($sock === $in) { - printf("Arrivati %d bytes da stdin\n", strlen($buf)); + } + + if ($response != "") { + echo "SPIA: [".substr($response, 0, 60)."...]\n"; + $response_l = mb_strlen($response, "ASCII"); + $wret = @fwrite($sock, $response); + if ($wret < $response_l) { + printf("TROUBLE WITH FWRITE: %d\n", $wret); + $user->rd_cache_set(mb_substr($response, $wret, $response_l - $wret, "ASCII")); } else { - $key = array_search("$sock", $socks); - printf("Arrivati %d bytes dalla socket n. %d\n", strlen($buf), $key); + $user->rd_cache_set(""); } + fflush($sock); + $user->rd_kalive_reset($curtime); + } + + // close socket after a while to prevent client memory consumption + if ($user->rd_endtime_is_expired($curtime)) { + // $user_a[$s2u[intval($sock)]]->disable(); + if ($this->room->user[$this->s2u[intval($sock)]]->rd_socket_get() != NULL) { + $this->room->user[$this->s2u[intval($sock)]]->rd_socket_set(NULL); + } + unset($this->socks[intval($sock)]); + unset($this->s2u[intval($sock)]); + fclose($sock); + printf("CLOSE ON LOOP\n"); } } } } + } +} +function main() +{ + if (($sap = Sac_a_push::create("/tmp/brisk.sock", 0, 0)) === FALSE) { + exit(1); + } - foreach ($pages_flush as $k => $pgflush) { - if ($pgflush->try_flush($curtime) == TRUE) { - unset($pages_flush[$k]); - } - } - - foreach ($socks as $k => $sock) { - if (isset($s2u[intval($sock)])) { - $user = $room->user[$s2u[intval($sock)]]; - $response = $user->rd_cache_get(); - if ($response == "") { - $content = ""; - index_rd_ifra_main($room, $user, $content, $get, $post, $cookie); - - if ($content == "" && $user->rd_kalive_is_expired($curtime)) { - $content = index_rd_ifra_keepalive($user); - } - if ($content != "") { - $response = chunked_content($content); - } - } - - if ($response != "") { - echo "SPIA: [".substr($response, 0, 60)."...]\n"; - $response_l = mb_strlen($response, "ASCII"); - $wret = @fwrite($sock, $response); - if ($wret < $response_l) { - printf("TROUBLE WITH FWRITE: %d\n", $wret); - $user->rd_cache_set(mb_substr($response, $wret, $response_l - $wret, "ASCII")); - } - else { - $user->rd_cache_set(""); - } - fflush($sock); - $user->rd_kalive_reset($curtime); - } + $sap->run(); - // close socket after a while to prevent client memory consumption - if ($user->rd_endtime_is_expired($curtime)) { - // $user_a[$s2u[intval($sock)]]->disable(); - if ($room->user[$s2u[intval($sock)]]->rd_socket_get() != NULL) { - $room->user[$s2u[intval($sock)]]->rd_socket_set(NULL); - } - unset($socks[intval($sock)]); - unset($s2u[intval($sock)]); - fclose($sock); - printf("CLOSE ON LOOP\n"); - } - } - } - } - exit(0); } -- 2.17.1