From 65c6a676cf9a655d33fa48441b1c7390492163fa Mon Sep 17 00:00:00 2001 From: Matteo Nastasi Date: Fri, 3 Aug 2012 18:27:27 +0200 Subject: [PATCH] physical separation between brisk and sac-a-push (new file) --- web/Obj/sac-a-push.phh | 439 ++++++++++++++++++++++++++++++++++++++ web/spush/brisk-spush.php | 337 +---------------------------- web/spush/sac-a-push.phh | 79 ------- 3 files changed, 440 insertions(+), 415 deletions(-) create mode 100644 web/Obj/sac-a-push.phh delete mode 100644 web/spush/sac-a-push.phh diff --git a/web/Obj/sac-a-push.phh b/web/Obj/sac-a-push.phh new file mode 100644 index 0000000..3c66c12 --- /dev/null +++ b/web/Obj/sac-a-push.phh @@ -0,0 +1,439 @@ + 1) { + $a = explode('&', $get_vars[1]); + printf("A COUNT: [%s] %d\n", $a[0], count($a)); + for ($i = 0 ; $i < count($a) ; $i++) { + $b = explode('=', $a[$i]); + $get[$b[0]] = urldecode($b[1]); + } + } + // POST params management + if ($req[0] == 'POST') { + if ($header['Content-Type'] != 'application/x-www-form-urlencoded' + || !isset($header['Content-Length'])) { + return FALSE; + } + $post_len = mb_strlen($line, "latin1"); + $a = explode('&', $line); + for ($i = 0 ; $i < count($a) ; $i++) { + $b = explode('=', $a[$i]); + $post[$b[0]] = urldecode($b[1]); + } + printf("INFO: postlen: %d\n", $post_len); + } + break; + } + if ($line == "") { + $check_post = TRUE; + continue; + } + $split = explode(":", $line, 2); + $header[$split[0]] = $split[1]; + } + return $path; +} + +function gpcs_var($name, $get, $post, $cookie) +{ + if (isset($GLOBALS[$name])) + return FALSE; + else if (isset($cookie[$name])) + return ($cookie[$name]); + else if (isset($post[$name])) + return ($post[$name]); + else if (isset($get[$name])) + return ($get[$name]); + + return FALSE; +} + +function headers_render($header, $len) +{ + + $s = ""; + $s .= "HTTP/1.1 200 OK\r\n"; + if (!isset($header['Date'])) + $s .= sprintf("Date: %s\r\n", date(DATE_RFC822)); + if (!isset($header['Connection'])) + $s .= "Connection: close\r\n"; + if (!isset($header['Content-Type'])) + $s .= "Content-Type: text/html\r\n"; + foreach($header as $key => $value) { + $s .= sprintf("%s: %s\r\n", $key, $value); + } + if ($len == -1) { + $s .= "Cache-Control: no-cache, must-revalidate\r\n"; + $s .= "Expires: Mon, 26 Jul 1997 05:00:00 GMT\r\n"; + $s .= "Content-Encoding: chunked\r\n"; + $s .= "Transfer-Encoding: chunked\r\n"; + } + else if ($len > 0) { + $s .= sprintf("Content-Length: %d\r\n", $len); + } + $s .= "\r\n"; + + return ($s); +} + +/* + * Caching system using ob php system to cache old style pages + * to a var and than send it with more calm + */ + +function shutta() +{ + log_rd2("SHUTTA [".connection_status()."] !"); +} + +register_shutdown_function('shutta'); + +/* + * MAIN + */ + +function chunked_content($content) +{ + $content_l = mb_strlen($content, "ASCII"); + + return (sprintf("%X\r\n%s\r\n", $content_l, $content)); +} + +function chunked_fini() +{ + return sprintf("0\r\n"); +} + +class Sac_a_push { + static $fixed_fd = 2; + + var $file_socket; + var $unix_socket; + var $socks; + var $s2u; + var $pages_flush; + + var $list; + var $in; + + var $debug; + var $blocking_mode; + + var $room; + var $bin5; + + var $curtime; + + var $rndstr; + var $main_loop; + + function Sac_a_push() + { + } + + // Sac_a_push::create("/tmp/brisk.sock", 0, 0 + + static function create(&$room, $sockname, $debug, $blocking_mode) + { + $thiz = new Sac_a_push(); + + $thiz->room = $room; + $thiz->file_socket = $sockname; + $thiz->unix_socket = "unix://$sockname"; + $thiz->debug = $debug; + $thiz->socks = array(); + $thiz->s2u = array(); + $thiz->pages_flush = array(); + + $thiz->blocking_mode = 0; // 0 for non-blocking + + $thiz->rndstr = ""; + for ($i = 0 ; $i < 4096 ; $i++) { + $thiz->rndstr .= chr(mt_rand(65, 90)); + } + + if (file_exists($thiz->file_socket)) { + unlink($thiz->file_socket); + } + + $old_umask = umask(0); + if (($thiz->list = stream_socket_server($thiz->unix_socket, $err, $errs)) === FALSE) { + return (FALSE); + } + umask($old_umask); + stream_set_blocking($thiz->list, $thiz->blocking_mode); # Set the stream to non-blocking + + if (($thiz->in = fopen("php://stdin", "r")) === FALSE) { + return(FALSE); + } + + $thiz->main_loop = FALSE; + + return ($thiz); + } + + function socks_set($sock, $user) + { + $id = intval($sock); + + $this->s2u[$id] = $user; + $this->socks[$id] = $sock; + } + + function socks_unset($sock) + { + $id = intval($sock); + + unset($this->s2u[$id]); + unset($this->socks[$id]); + } + + function pgflush_try_add(&$new_socket, $tout, $header_out, $content) + { + $pgflush = new PageFlush($new_socket, $this->curtime, $tout, $header_out, $content); + + if ($pgflush->try_flush($this->curtime) == FALSE) { + // Add $pgflush to the pgflush array + $this->pgflush_add($pgflush); + } + } + + function pgflush_add($pgflush) + { + array_push($this->pages_flush, $pgflush); + } + + function run() + { + if ($this->main_loop) { + return (FALSE); + } + + $this->main_loop = TRUE; + + while ($this->main_loop) { + $this->curtime = time(); + printf("IN LOOP: Current opened: %d pages_flush: %d - ", count($this->socks), count($this->pages_flush)); + + /* Prepare the read array */ + /* // when we manage it ... */ + /* if ($shutdown) */ + /* $read = array_merge(array("$in" => $in), $socks); */ + /* else */ + $read = array_merge(array(intval($this->list) => $this->list, intval($this->in) => $this->in), + $this->socks); + + if ($this->debug > 1) { + printf("PRE_SELECT\n"); + print_r($read); + } + $write = NULL; + $except = NULL; + $num_changed_sockets = stream_select($read, $write, $except, 0, 250000); + + if ($num_changed_sockets == 0) { + printf(" no data in 5 secs "); + } + else if ($num_changed_sockets > 0) { + printf("num sock %d num_of_socket: %d\n", $num_changed_sockets, count($read)); + if ($this->debug > 1) { + print_r($read); + } + /* At least at one of the sockets something interesting happened */ + foreach ($read as $i => $sock) { + /* is_resource check is required because there is the possibility that + during new request an old connection is closed */ + if (!is_resource($sock)) { + continue; + } + if ($sock === $this->list) { + printf("NUOVA CONNEX\n"); + $new_unix = stream_socket_accept($this->list); + $stream_info = ""; + $method = ""; + $get = array(); + $post = array(); + $cookie = array(); + if (($new_socket = ancillary_getstream($new_unix, $stream_info)) !== FALSE) { + printf("NEW_SOCKET: %d\n", intval($new_socket)); + stream_set_blocking($new_socket, $this->blocking_mode); // Set the stream to non-blocking + printf("RECEIVED HEADER:\n%s", $stream_info); + $path = spu_process_info($stream_info, $method, $header, $get, $post, $cookie); + printf("PATH: [%s]\n", $path); + printf("M: %s\nHEADER:\n", $method); + print_r($header); + printf("GET:\n"); + print_r($get); + printf("POST:\n"); + print_r($post); + printf("COOKIE:\n"); + print_r($cookie); + + $addr = stream_socket_get_name($new_socket, TRUE); + $header_out = array(); + + $this->room->request_mgr($this, $header_out, $new_socket, $path, $addr, $get, $post, $cookie); + printf("number of sockets after %d\n", count($this->socks)); + } + else { + printf("WARNING: ancillary_getstream failed\n"); + } + } + else { + if (($buf = fread($sock, 512)) === FALSE) { + printf("error read\n"); + exit(123); + } + else if (strlen($buf) === 0) { + if ($sock === $this->list) { + printf("Arrivati %d bytes da list\n", strlen($buf)); + } + else if ($sock === $this->in) { + printf("Arrivati %d bytes da stdin\n", strlen($buf)); + } + else { + // $user_a[$s2u[intval($sock)]]->disable(); + if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) { + $this->s2u[intval($sock)]->rd_socket_set(NULL); + } + unset($this->socks[intval($sock)]); + unset($this->s2u[intval($sock)]); + fclose($sock); + printf("CLOSE ON READ\n"); + } + if ($this->debug > 1) { + printf("post unset\n"); + print_r($this->socks); + } + } + else { + if ($debug > 1) { + print_r($read); + } + if ($sock === $this->list) { + printf("Arrivati %d bytes da list\n", strlen($buf)); + } + else if ($sock === $this->in) { + printf("Arrivati %d bytes da stdin\n", strlen($buf)); + } + else { + $key = array_search("$sock", $this->socks); + printf("Arrivati %d bytes dalla socket n. %d\n", strlen($buf), $key); + } + } + } + } + } + + + /* manage unfinished pages */ + foreach ($this->pages_flush as $k => $pgflush) { + if ($pgflush->try_flush($this->curtime) == TRUE) { + unset($this->pages_flush[$k]); + } + } + + /* manage open streaming */ + foreach ($this->socks as $k => $sock) { + if (isset($this->s2u[intval($sock)])) { + $user = $this->s2u[intval($sock)]; + $response = $user->rd_cache_get(); + if ($response == "") { + $content = ""; + $user->stream_main($content, $get, $post, $cookie); + + if ($content == "" && $user->rd_kalive_is_expired($this->curtime)) { + $content = $user->stream_keepalive(); + } + if ($content != "") { + $response = chunked_content($content); + } + } + + if ($response != "") { + echo "SPIA: [".substr($response, 0, 60)."...]\n"; + $response_l = mb_strlen($response, "ASCII"); + $wret = @fwrite($sock, $response); + if ($wret < $response_l) { + printf("TROUBLE WITH FWRITE: %d\n", $wret); + $user->rd_cache_set(mb_substr($response, $wret, $response_l - $wret, "ASCII")); + } + else { + $user->rd_cache_set(""); + } + fflush($sock); + $user->rd_kalive_reset($this->curtime); + } + + // close socket after a while to prevent client memory consumption + if ($user->rd_endtime_is_expired($this->curtime)) { + if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) { + $this->s2u[intval($sock)]->rd_socket_set(NULL); + } + unset($this->socks[intval($sock)]); + unset($this->s2u[intval($sock)]); + fclose($sock); + printf("CLOSE ON LOOP\n"); + } + } + } // foreach ($this->socks... + printf("\n"); + } // while (... + } // function run(... +} + +?> \ No newline at end of file diff --git a/web/spush/brisk-spush.php b/web/spush/brisk-spush.php index a48aa1f..ff09ded 100755 --- a/web/spush/brisk-spush.php +++ b/web/spush/brisk-spush.php @@ -54,7 +54,7 @@ $G_base = "../"; -require_once("./sac-a-push.phh"); +require_once($G_base."Obj/sac-a-push.phh"); require_once("./brisk-spush.phh"); require_once($G_base."Obj/user.phh"); require_once($G_base."Obj/brisk.phh"); @@ -65,341 +65,6 @@ require_once($G_base."index_wr.php"); require_once($G_base."index_rd_ifra.php"); require_once($G_base."briskin5/Obj/briskin5.phh"); -define('SITE_PREFIX', '/brisk/'); - -function headers_render($header, $len) -{ - - $s = ""; - $s .= "HTTP/1.1 200 OK\r\n"; - if (!isset($header['Date'])) - $s .= sprintf("Date: %s\r\n", date(DATE_RFC822)); - if (!isset($header['Connection'])) - $s .= "Connection: close\r\n"; - if (!isset($header['Content-Type'])) - $s .= "Content-Type: text/html\r\n"; - foreach($header as $key => $value) { - $s .= sprintf("%s: %s\r\n", $key, $value); - } - if ($len == -1) { - $s .= "Cache-Control: no-cache, must-revalidate\r\n"; - $s .= "Expires: Mon, 26 Jul 1997 05:00:00 GMT\r\n"; - $s .= "Content-Encoding: chunked\r\n"; - $s .= "Transfer-Encoding: chunked\r\n"; - } - else if ($len > 0) { - $s .= sprintf("Content-Length: %d\r\n", $len); - } - $s .= "\r\n"; - - return ($s); -} - -/* - * Caching system using ob php system to cache old style pages - * to a var and than send it with more calm - */ - -function shutta() -{ - log_rd2("SHUTTA [".connection_status()."] !"); -} - -register_shutdown_function('shutta'); - -/* - * MAIN - */ - -function chunked_content($content) -{ - $content_l = mb_strlen($content, "ASCII"); - - return (sprintf("%X\r\n%s\r\n", $content_l, $content)); -} - -function chunked_fini() -{ - return sprintf("0\r\n"); -} - -class Sac_a_push { - static $fixed_fd = 2; - - var $file_socket; - var $unix_socket; - var $socks; - var $s2u; - var $pages_flush; - - var $list; - var $in; - - var $debug; - var $blocking_mode; - - var $room; - var $bin5; - - var $curtime; - - var $rndstr; - var $main_loop; - - function Sac_a_push() - { - } - - // Sac_a_push::create("/tmp/brisk.sock", 0, 0 - - static function create(&$room, $sockname, $debug, $blocking_mode) - { - $thiz = new Sac_a_push(); - - $thiz->room = $room; - $thiz->file_socket = $sockname; - $thiz->unix_socket = "unix://$sockname"; - $thiz->debug = $debug; - $thiz->socks = array(); - $thiz->s2u = array(); - $thiz->pages_flush = array(); - - $thiz->blocking_mode = 0; // 0 for non-blocking - - $thiz->rndstr = ""; - for ($i = 0 ; $i < 4096 ; $i++) { - $thiz->rndstr .= chr(mt_rand(65, 90)); - } - - if (file_exists($thiz->file_socket)) { - unlink($thiz->file_socket); - } - - $old_umask = umask(0); - if (($thiz->list = stream_socket_server($thiz->unix_socket, $err, $errs)) === FALSE) { - return (FALSE); - } - umask($old_umask); - stream_set_blocking($thiz->list, $thiz->blocking_mode); # Set the stream to non-blocking - - if (($thiz->in = fopen("php://stdin", "r")) === FALSE) { - return(FALSE); - } - - $thiz->main_loop = FALSE; - - return ($thiz); - } - - function socks_set($sock, $user) - { - $id = intval($sock); - - $this->s2u[$id] = $user; - $this->socks[$id] = $sock; - } - - function socks_unset($sock) - { - $id = intval($sock); - - unset($this->s2u[$id]); - unset($this->socks[$id]); - } - - function pgflush_try_add(&$new_socket, $tout, $header_out, $content) - { - $pgflush = new PageFlush($new_socket, $this->curtime, $tout, $header_out, $content); - - if ($pgflush->try_flush($this->curtime) == FALSE) { - // Add $pgflush to the pgflush array - $this->pgflush_add($pgflush); - } - } - - function pgflush_add($pgflush) - { - array_push($this->pages_flush, $pgflush); - } - - function run() - { - if ($this->main_loop) { - return (FALSE); - } - - $this->main_loop = TRUE; - - while ($this->main_loop) { - $this->curtime = time(); - printf("IN LOOP: Current opened: %d pages_flush: %d - ", count($this->socks), count($this->pages_flush)); - - /* Prepare the read array */ - /* // when we manage it ... */ - /* if ($shutdown) */ - /* $read = array_merge(array("$in" => $in), $socks); */ - /* else */ - $read = array_merge(array(intval($this->list) => $this->list, intval($this->in) => $this->in), - $this->socks); - - if ($this->debug > 1) { - printf("PRE_SELECT\n"); - print_r($read); - } - $write = NULL; - $except = NULL; - $num_changed_sockets = stream_select($read, $write, $except, 0, 250000); - - if ($num_changed_sockets == 0) { - printf(" no data in 5 secs "); - } - else if ($num_changed_sockets > 0) { - printf("num sock %d num_of_socket: %d\n", $num_changed_sockets, count($read)); - if ($this->debug > 1) { - print_r($read); - } - /* At least at one of the sockets something interesting happened */ - foreach ($read as $i => $sock) { - /* is_resource check is required because there is the possibility that - during new request an old connection is closed */ - if (!is_resource($sock)) { - continue; - } - if ($sock === $this->list) { - printf("NUOVA CONNEX\n"); - $new_unix = stream_socket_accept($this->list); - $stream_info = ""; - $method = ""; - $get = array(); - $post = array(); - $cookie = array(); - if (($new_socket = ancillary_getstream($new_unix, $stream_info)) !== FALSE) { - printf("NEW_SOCKET: %d\n", intval($new_socket)); - stream_set_blocking($new_socket, $this->blocking_mode); // Set the stream to non-blocking - printf("RECEIVED HEADER:\n%s", $stream_info); - $path = spu_process_info($stream_info, $method, $header, $get, $post, $cookie); - printf("PATH: [%s]\n", $path); - printf("M: %s\nHEADER:\n", $method); - print_r($header); - printf("GET:\n"); - print_r($get); - printf("POST:\n"); - print_r($post); - printf("COOKIE:\n"); - print_r($cookie); - - $addr = stream_socket_get_name($new_socket, TRUE); - $header_out = array(); - - $this->room->request_mgr($this, $header_out, $new_socket, $path, $addr, $get, $post, $cookie); - printf("number of sockets after %d\n", count($this->socks)); - } - else { - printf("WARNING: ancillary_getstream failed\n"); - } - } - else { - if (($buf = fread($sock, 512)) === FALSE) { - printf("error read\n"); - exit(123); - } - else if (strlen($buf) === 0) { - if ($sock === $this->list) { - printf("Arrivati %d bytes da list\n", strlen($buf)); - } - else if ($sock === $this->in) { - printf("Arrivati %d bytes da stdin\n", strlen($buf)); - } - else { - // $user_a[$s2u[intval($sock)]]->disable(); - if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) { - $this->s2u[intval($sock)]->rd_socket_set(NULL); - } - unset($this->socks[intval($sock)]); - unset($this->s2u[intval($sock)]); - fclose($sock); - printf("CLOSE ON READ\n"); - } - if ($this->debug > 1) { - printf("post unset\n"); - print_r($this->socks); - } - } - else { - if ($debug > 1) { - print_r($read); - } - if ($sock === $this->list) { - printf("Arrivati %d bytes da list\n", strlen($buf)); - } - else if ($sock === $this->in) { - printf("Arrivati %d bytes da stdin\n", strlen($buf)); - } - else { - $key = array_search("$sock", $this->socks); - printf("Arrivati %d bytes dalla socket n. %d\n", strlen($buf), $key); - } - } - } - } - } - - - /* manage unfinished pages */ - foreach ($this->pages_flush as $k => $pgflush) { - if ($pgflush->try_flush($this->curtime) == TRUE) { - unset($this->pages_flush[$k]); - } - } - - /* manage open streaming */ - foreach ($this->socks as $k => $sock) { - if (isset($this->s2u[intval($sock)])) { - $user = $this->s2u[intval($sock)]; - $response = $user->rd_cache_get(); - if ($response == "") { - $content = ""; - $user->stream_main($content, $get, $post, $cookie); - - if ($content == "" && $user->rd_kalive_is_expired($this->curtime)) { - $content = $user->stream_keepalive(); - } - if ($content != "") { - $response = chunked_content($content); - } - } - - if ($response != "") { - echo "SPIA: [".substr($response, 0, 60)."...]\n"; - $response_l = mb_strlen($response, "ASCII"); - $wret = @fwrite($sock, $response); - if ($wret < $response_l) { - printf("TROUBLE WITH FWRITE: %d\n", $wret); - $user->rd_cache_set(mb_substr($response, $wret, $response_l - $wret, "ASCII")); - } - else { - $user->rd_cache_set(""); - } - fflush($sock); - $user->rd_kalive_reset($this->curtime); - } - - // close socket after a while to prevent client memory consumption - if ($user->rd_endtime_is_expired($this->curtime)) { - if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) { - $this->s2u[intval($sock)]->rd_socket_set(NULL); - } - unset($this->socks[intval($sock)]); - unset($this->s2u[intval($sock)]); - fclose($sock); - printf("CLOSE ON LOOP\n"); - } - } - } // foreach ($this->socks... - printf("\n"); - } // while (... - } // function run(... -} function main() { diff --git a/web/spush/sac-a-push.phh b/web/spush/sac-a-push.phh deleted file mode 100644 index fef79ac..0000000 --- a/web/spush/sac-a-push.phh +++ /dev/null @@ -1,79 +0,0 @@ - 1) { - $a = explode('&', $get_vars[1]); - printf("A COUNT: [%s] %d\n", $a[0], count($a)); - for ($i = 0 ; $i < count($a) ; $i++) { - $b = explode('=', $a[$i]); - $get[$b[0]] = urldecode($b[1]); - } - } - // POST params management - if ($req[0] == 'POST') { - if ($header['Content-Type'] != 'application/x-www-form-urlencoded' - || !isset($header['Content-Length'])) { - return FALSE; - } - $post_len = mb_strlen($line, "latin1"); - $a = explode('&', $line); - for ($i = 0 ; $i < count($a) ; $i++) { - $b = explode('=', $a[$i]); - $post[$b[0]] = urldecode($b[1]); - } - printf("INFO: postlen: %d\n", $post_len); - } - break; - } - if ($line == "") { - $check_post = TRUE; - continue; - } - $split = explode(":", $line, 2); - $header[$split[0]] = $split[1]; - } - return $path; -} - -function gpcs_var($name, $get, $post, $cookie) -{ - if (isset($GLOBALS[$name])) - return FALSE; - else if (isset($cookie[$name])) - return ($cookie[$name]); - else if (isset($post[$name])) - return ($post[$name]); - else if (isset($get[$name])) - return ($get[$name]); - - return FALSE; -} -?> -- 2.17.1