From 9b4ce3e614e0636bc76fee81d5c1ec41ae5bd742 Mon Sep 17 00:00:00 2001 From: "Matteo Nastasi (mop)" Date: Sun, 4 Nov 2012 12:02:12 +0100 Subject: [PATCH] gzip compression management added --- test/stream_filter_test.php | 80 ++++++++++++++++++ web/Obj/brisk.phh | 28 +++---- web/Obj/sac-a-push.phh | 58 ++++++++++--- web/Obj/user.phh | 36 ++++++--- web/Obj/zlibstream.phh | 148 ++++++++++++++++++++++++++++++++++ web/briskin5/Obj/briskin5.phh | 14 ++-- web/spush/brisk-spush.phh | 15 ++-- web/spush/brisk-spush.php | 1 + 8 files changed, 330 insertions(+), 50 deletions(-) create mode 100755 test/stream_filter_test.php create mode 100644 web/Obj/zlibstream.phh diff --git a/test/stream_filter_test.php b/test/stream_filter_test.php new file mode 100755 index 0000000..d4fc8ae --- /dev/null +++ b/test/stream_filter_test.php @@ -0,0 +1,80 @@ +#!/usr/bin/php + 6, 'window' => -15, 'memory' => 9); + +if (($filter = stream_filter_append($pipe[1], "zlib.deflate", STREAM_FILTER_READ)) == FALSE) { + printf("filter append fails\n"); +} + +$cont = array( "pippo", "pluto", "paperino"); + +$fwrite_pos = 0; +$fread_pos = 0; + +$head = "\037\213\010\000\000\000\000\000\000\003"; + +if (($fout = fopen("fout.gz", "wb")) == FALSE) { + exit(1); +} + +fwrite($fout, $head); + +for ($i = 0 ; $i < 9 ; $i++) { + fprintf(STDERR, "Start loop\n"); + $s_in = $cont[$i % 3]; + if (($ct = fwrite($pipe[0], $s_in)) == FALSE) { + printf("fwrite fails\n"); + } + if (($s_out = fread($pipe[1], 1024)) != FALSE) { + printf("SUCCESS [%s]\n", $s_out); + } + fwrite($fout, $s_out); + + fprintf(STDERR, "PRE FLUSH\n"); + fflush($pipe[0]); + if (($s_out = fread($pipe[1], 1024)) != FALSE) { + printf("SUCCESS [%s]\n", $s_out); + } + fwrite($fout, $s_out); + + fprintf(STDERR, "POS FLUSH\n"); + fwrite($pipe[0], "1"); + if (($s_out = fread($pipe[1], 1024)) != FALSE) { + printf("SUCCESS [%s]\n", $s_out); + } + fwrite($fout, $s_out); + + fprintf(STDERR, "POS VOID\n"); + // else { + // printf("fread fails\n"); + // } + fprintf(STDERR, "\n"); + sleep(5); +} + +fclose($pipe[0]); +if (($s_out = fread($pipe[1], 1024)) != FALSE) { + printf("SUCCESS [%s]\n", $s_out); +} +fwrite($fout, $s_out); +fclose($pipe[1]); +fclose($fout); + +?> \ No newline at end of file diff --git a/web/Obj/brisk.phh b/web/Obj/brisk.phh index 902bfc3..64739ac 100644 --- a/web/Obj/brisk.phh +++ b/web/Obj/brisk.phh @@ -2133,7 +2133,7 @@ class Room { return ($ret); } - function request_mgr(&$s_a_p, &$header_out, &$new_socket, $path, $addr, $get, $post, $cookie) + function request_mgr(&$s_a_p, $enc, &$header_out, &$new_socket, $path, $addr, $get, $post, $cookie) { printf("NEW_SOCKET (root): %d\n", intval($new_socket)); @@ -2143,21 +2143,21 @@ class Room { case "": case "index.php": ob_start(); - index_main($this, $header_out, $addr, $get, $post, $cookie); - $content = ob_get_contents(); - ob_end_clean(); + index_main($this, $header_out, $addr, $get, $post, $cookie); + $content = ob_get_contents(); + ob_end_clean(); - $s_a_p->pgflush_try_add($new_socket, 20, $header_out, $content); - return TRUE; + $s_a_p->pgflush_try_add($enc, $new_socket, 20, $header_out, $content); + return TRUE; - break; + break; case "index_wr.php": ob_start(); index_wr_main($this, $addr, $get, $post, $cookie); $content = ob_get_contents(); ob_end_clean(); - - $s_a_p->pgflush_try_add($new_socket, 20, $header_out, $content); + + $s_a_p->pgflush_try_add($enc, $new_socket, 20, $header_out, $content); return TRUE; break; @@ -2167,7 +2167,7 @@ class Room { || (($user = $this->get_user($cookie['sess'], $idx)) == FALSE)) { $content = User::stream_fini($s_a_p->rndstr, TRUE); - $s_a_p->pgflush_try_add($new_socket, 20, $header_out, $content); + $s_a_p->pgflush_try_add($enc, $new_socket, 20, $header_out, $content); return TRUE; break; @@ -2179,11 +2179,11 @@ class Room { printf("CLOSE AND OPEN AGAIN ON IFRA2\n"); $user->rd_socket_set(NULL); } - + $content = ""; - $user->stream_init($s_a_p->rndstr, $header_out, $content, $get, $post, $cookie); + $user->stream_init($s_a_p->rndstr, $enc, $header_out, $content, $get, $post, $cookie); - $response = headers_render($header_out, -1).chunked_content($content); + $response = headers_render($header_out, -1).chunked_content($user->rd_zls_get(), $content); $response_l = mb_strlen($response, "ASCII"); $wret = @fwrite($new_socket, $response, $response_l); @@ -2211,7 +2211,7 @@ class Room { $subs = "briskin5/"; $subs_l = strlen($subs); if (!strncmp($path, $subs, $subs_l)) { - $ret = Bin5::request_mgr(&$s_a_p, &$header_out, &$new_socket, substr($path, $subs_l) , $addr, $get, $post, $cookie); + $ret = Bin5::request_mgr(&$s_a_p, $enc, &$header_out, &$new_socket, substr($path, $subs_l) , $addr, $get, $post, $cookie); return ($ret); } break; diff --git a/web/Obj/sac-a-push.phh b/web/Obj/sac-a-push.phh index 04f7c5b..c5faa55 100644 --- a/web/Obj/sac-a-push.phh +++ b/web/Obj/sac-a-push.phh @@ -121,15 +121,17 @@ function headers_render($header, $len) foreach($header as $key => $value) { $s .= sprintf("%s: %s\r\n", $key, $value); } - if ($len == -1) { + if ($len >= 0) { + $s .= sprintf("Content-Length: %d\r\n", $len); + } + else { $s .= "Cache-Control: no-cache, must-revalidate\r\n"; $s .= "Expires: Mon, 26 Jul 1997 05:00:00 GMT\r\n"; - $s .= "Content-Encoding: chunked\r\n"; + if (!isset($header['Content-Encoding'])) { + $s .= "Content-Encoding: chunked\r\n"; + } $s .= "Transfer-Encoding: chunked\r\n"; } - else if ($len > 0) { - $s .= sprintf("Content-Length: %d\r\n", $len); - } $s .= "\r\n"; return ($s); @@ -151,11 +153,18 @@ register_shutdown_function('shutta'); * MAIN */ -function chunked_content($content) +function chunked_content($zls, $content) { - $content_l = mb_strlen($content, "ASCII"); + if ($zls) { + $cont_comp = $zls->compress_chunk($content); + } + else { + $cont_comp = $content; + } + $cont_comp_l = mb_strlen($cont_comp, "ASCII"); + printf("CHUNK: [%s]\n", $content); - return (sprintf("%X\r\n%s\r\n", $content_l, $content)); + return (sprintf("%X\r\n", $cont_comp_l).$cont_comp."\r\n"); } function chunked_fini() @@ -163,6 +172,24 @@ function chunked_fini() return sprintf("0\r\n"); } +function get_encoding($header) +{ + $enc = "plain"; + if (isset($header['Accept-Encoding'])) { + $acc = explode(',', $header['Accept-Encoding']); + + if (array_search('gzip', $acc) !== FALSE) { + $enc = 'gzip'; + } + else if (array_search('deflate', $acc) !== FALSE) { + $enc = 'deflate'; + } + } + + return ($enc); +} + + class Sac_a_push { static $fixed_fd = 2; @@ -208,7 +235,10 @@ class Sac_a_push { $thiz->rndstr = ""; for ($i = 0 ; $i < 4096 ; $i++) { - $thiz->rndstr .= chr(mt_rand(65, 90)); + if (($i % 128) == 0) + $thiz->rndstr .= "\n"; + else + $thiz->rndstr .= chr(mt_rand(65, 90)); } if (file_exists($thiz->file_socket)) { @@ -247,9 +277,9 @@ class Sac_a_push { unset($this->socks[$id]); } - function pgflush_try_add(&$new_socket, $tout, $header_out, $content) + function pgflush_try_add($enc, &$new_socket, $tout, $header_out, $content) { - $pgflush = new PageFlush($new_socket, $this->curtime, $tout, $header_out, $content); + $pgflush = new PageFlush($new_socket, $enc, $this->curtime, $tout, $header_out, $content); if ($pgflush->try_flush($this->curtime) == FALSE) { // Add $pgflush to the pgflush array @@ -351,11 +381,13 @@ class Sac_a_push { $addr = stream_socket_get_name($new_socket, TRUE); $header_out = array(); + $enc = get_encoding($header); + $subs = SITE_PREFIX."briskin5/"; $subs_l = strlen($subs); $rret = FALSE; if (!strncmp($path, SITE_PREFIX, SITE_PREFIX_LEN)) { - $rret = $this->app->request_mgr($this, $header_out, $new_socket, substr($path, SITE_PREFIX_LEN), $addr, $get, $post, $cookie); + $rret = $this->app->request_mgr($this, $enc, $header_out, $new_socket, substr($path, SITE_PREFIX_LEN), $addr, $get, $post, $cookie); } if ($rret == FALSE) { // FIXME: manage 404 !!! @@ -456,7 +488,7 @@ class Sac_a_push { $content = $user->stream_keepalive(FALSE); } if ($content != "") { - $response = chunked_content($content); + $response = chunked_content($user->rd_zls_get(), $content); } } diff --git a/web/Obj/user.phh b/web/Obj/user.phh index 2b28adb..cc4d211 100644 --- a/web/Obj/user.phh +++ b/web/Obj/user.phh @@ -97,6 +97,7 @@ class User { var $rd_scristp; // current script step (for each session) var $rd_kalive; // if no message are sent after RD_KEEPALIVE_TOUT secs we send a keepalive from server var $rd_cache; // place where store failed fwrite data + var $rd_zls; // zlibstream object handle if compressed stream, else FALSE var $comm; // commands array // var $asta_card; // @@ -154,6 +155,7 @@ class User { $thiz->rd_scristp = -1; $thiz->rd_kalive = -1; $thiz->rd_cache = ""; + $thiz->rd_zls = FALSE; $thiz->asta_card = -2; $thiz->asta_pnt = -1; @@ -287,7 +289,7 @@ class User { return ($thiz); } - function rd_data_set($curtime, $stat, $subst, $step, $from) + function rd_data_set($curtime, $enc, $stat, $subst, $step, $from) { $this->rd_endtime = $curtime + RD_ENDTIME_DELTA; $this->rd_stat = $stat; @@ -296,6 +298,7 @@ class User { $this->rd_from = $from; $this->rd_scristp = 0; $this->rd_kalive = $curtime + RD_KEEPALIVE_TOUT; + $this->rd_zls = ZLibStream::create($enc); } function rd_socket_get() { @@ -303,6 +306,12 @@ class User { } function rd_socket_set($sock) { + if ($sock == NULL) { + if ($this->rd_zls) { + $this->rd_zls->destroy(); + $this->rd_zls = FALSE; + } + } $this->rd_socket = $sock; } @@ -343,6 +352,11 @@ class User { $this->rd_cache = $cache; } + function rd_zls_get() + { + return ($this->rd_zls); + } + function idx_get() { return ($this->idx); } @@ -774,20 +788,22 @@ push(\"%s\"); stat step */ -function stream_init($init_string, &$header_out, &$body, $get, $post, $cookie) +function stream_init($init_string, $enc, &$header_out, &$body, $get, $post, $cookie) { $curtime = time(); - + printf("CLASS: [%s] base: [%s]\n", get_class($this), self::base_get()); - + $is_page_streaming = FALSE; // (webservers_exceeded() || stristr($HTTP_USER_AGENT, "Mozilla/5.0 (Windows NT 6.1; rv:5.0)") || stristr($HTTP_USER_AGENT, "MSIE") || stristr($HTTP_USER_AGENT, "CHROME") ? TRUE : FALSE); + if ($enc != 'plain') + $header_out['Content-Encoding'] = $enc; $header_out['Cache-Control'] = 'no-cache, must-revalidate'; // HTTP/1.1 $header_out['Expires'] = 'Mon, 26 Jul 1997 05:00:00 GMT'; // Date in the past $header_out['Content-type'] = 'text/html; charset="utf-8"'; - + log_load("index_rd_ifra_init.php"); - + if (($from = gpcs_var('from', $get, $post, $cookie)) === FALSE) $from = ""; if (($stat = gpcs_var('stat', $get, $post, $cookie)) === FALSE) @@ -797,9 +813,9 @@ function stream_init($init_string, &$header_out, &$body, $get, $post, $cookie) if (($step = gpcs_var('step', $get, $post, $cookie)) === FALSE) unset($step); - $this->rd_data_set($curtime, $stat, $subst, $step, $from); + $this->rd_data_set($curtime, $enc, $stat, $subst, $step, $from); $cc = get_called_class(); - + $body .= sprintf(" @@ -814,9 +830,9 @@ window.onload = function () { if (http_streaming != \"ready\") { http_streaming. "); $body .= sprintf("\n", $init_string); - + return TRUE; -} + } function stream_main(&$body, $get, $post, $cookie) { diff --git a/web/Obj/zlibstream.phh b/web/Obj/zlibstream.phh new file mode 100644 index 0000000..91ac711 --- /dev/null +++ b/web/Obj/zlibstream.phh @@ -0,0 +1,148 @@ +type = $type; + $this->s = array( FALSE, FALSE ); + $this->filter = FALSE; + } + + static function create($type) + { + if ($type == 'plain') + return (FALSE); + + if (($thiz = new ZLibStream($type)) == FALSE) + return (FALSE); + + for ($i = 0 ; $i < 2 ; $i++) + $thiz->s[$i] = FALSE; + if (($thiz->s = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP)) == FALSE) + return (FALSE); + + for ($i = 0 ; $i < 2 ; $i++) + stream_set_blocking ( $thiz->s[$i], 0); // 0 -> not blocking + + if ($type == 'gzip') { + $params = array('level' => 6, 'window' => -15, 'memory' => 9); + + if (($thiz->filter = stream_filter_append($thiz->s[1], "zlib.deflate", STREAM_FILTER_READ, $params)) == FALSE) { + return (FALSE); + } + $thiz->head = "\037\213\010\000\000\000\000\000\000\003"; + } + else if ($type == 'deflate') { + if (($thiz->filter = stream_filter_append($thiz->s[1], "zlib.deflate", STREAM_FILTER_READ)) == FALSE) { + return (FALSE); + } + } + return ($thiz); + } + + function destroy() + { + if ($this->filter != FALSE) { + stream_filter_remove($this->filter); + } + + for ($i = 0 ; $i < 2 ; $i++) { + if ($this->s[$i] != FALSE) + fclose($this->s[$i]); + } + } + + /* + too many actors, an explanation is needed to clarify: + + - fwrite: all data MUST be passed to write with success + - fflush: probably reduntant + - fread: all reads after successfull writes must go well, + + */ + function compress_chunk($s_in) + { + $s_in_l = mb_strlen($s_in, 'ASCII'); + + if ($this->head != FALSE) { + $s_out = $this->head; + $this->head = FALSE; + } + else { + $s_out = ""; + } + + for ($to_be_proc = $s_in_l, $max_fail = 0 ; $to_be_proc > 0 && $max_fail < 2 ; $max_fail++) { + if ($to_be_proc > 0) { + $max_fail = 0; + if (($ct = fwrite($this->s[0], $s_in)) == FALSE) + return FALSE; + + $to_be_proc -= $ct; + } + fflush($this->s[0]); // maybe reduntant but light so ... + + while (($ret = fread($this->s[1], 8192)) != FALSE) { + $s_out .= $ret; + } + } + + if ($max_fail < 2) + return ($s_out); + else + return (FALSE); + } + + static function compress($enc, $s) + { + // fprintf(STDERR, "compress: [%s][%s]\n", $enc, $s); + + if ($enc == 'gzip') { + return (gzencode($s, -1, FORCE_GZIP)); + } + else if ($enc == 'deflate') { + return (gzencode($s, -1, FORCE_DEFLATE)); + } + else + return $s; + } +} // class ZLibStream + + + +function zlibstream_test() +{ + $cont = array( "pippo", "pluto", "paperino"); + + for ($f = 0 ; $f < 2 ; $f++) { + if (($zls = ZLibStream::create('gzip')) == FALSE) { + printf("ZLibStream Creation failed\n"); + exit(1); + } + + if (($fp = fopen("../../test/zlibstream".$f.".gz", "w")) == FALSE) { + printf("ZLibStream test output file failed\n"); + exit(2); + } + + for ($i = 0 ; $i < 9 ; $i++) { + $idx = $i % 3; + + $comp = $zls->compress_chunk($cont[$idx]); + + fwrite($fp, $comp); + fflush($fp); + sleep(3); + } + fclose($fp); + $zls->destroy(); + } +} + +// zlibstream_test(); + +?> \ No newline at end of file diff --git a/web/briskin5/Obj/briskin5.phh b/web/briskin5/Obj/briskin5.phh index b2aae3f..72c9a26 100644 --- a/web/briskin5/Obj/briskin5.phh +++ b/web/briskin5/Obj/briskin5.phh @@ -1387,7 +1387,7 @@ class Bin5 { return ($is_ab); } - static function request_mgr(&$s_a_p, &$header_out, &$new_socket, $path, $addr, $get, $post, $cookie) + static function request_mgr(&$s_a_p, $enc, &$header_out, &$new_socket, $path, $addr, $get, $post, $cookie) { printf("NEW_SOCKET (root): %d\n", intval($new_socket)); @@ -1407,7 +1407,7 @@ class Bin5 { $content = ob_get_contents(); ob_end_clean(); - $s_a_p->pgflush_try_add($new_socket, 20, $header_out, $content); + $s_a_p->pgflush_try_add($enc, $new_socket, 20, $header_out, $content); return TRUE; break; @@ -1418,7 +1418,7 @@ class Bin5 { $content = ob_get_contents(); ob_end_clean(); - $s_a_p->pgflush_try_add($new_socket, 20, $header_out, $content); + $s_a_p->pgflush_try_add($enc, $new_socket, 20, $header_out, $content); return TRUE; break; @@ -1429,9 +1429,9 @@ class Bin5 { || (($user = $bri->get_user($cookie['sess'], $idx)) == FALSE)) { $content = Bin5_user::stream_fini($s_a_p->rndstr, TRUE); - $s_a_p->pgflush_try_add($new_socket, 20, $header_out, $content); + $s_a_p->pgflush_try_add($enc, $new_socket, 20, $header_out, $content); - return TRUE; + return TRUE; break; } // close a previous opened index_read_ifra socket, if exists @@ -1443,8 +1443,8 @@ class Bin5 { } $content = ""; - $user->stream_init($s_a_p->rndstr, $header_out, $content, $get, $post, $cookie); - $response = headers_render($header_out, -1).chunked_content($content); + $user->stream_init($s_a_p->rndstr, $enc, $header_out, $content, $get, $post, $cookie); + $response = headers_render($header_out, -1).chunked_content($user->rd_zls_get(), $content); $response_l = mb_strlen($response, "ASCII"); $wret = @fwrite($new_socket, $response, $response_l); diff --git a/web/spush/brisk-spush.phh b/web/spush/brisk-spush.phh index f0bcfcb..4728b92 100644 --- a/web/spush/brisk-spush.phh +++ b/web/spush/brisk-spush.phh @@ -32,17 +32,20 @@ class PageFlush { var $msg; // place where store failed fwrite data var $msg_sz; // size of content - function PageFlush($socket, $curtime, $kalive, $header_out, $body) + function PageFlush($socket, $enc, $curtime, $kalive, $header_out, $body) { printf("TRY FLUSH CREATE\n"); - // $body_sz = mb_strlen($body, "ASCII"); - // add length to header_out - $hea = headers_render($header_out, 0); + $body_out = ZLibStream::compress($enc, $body); + if ($enc != 'plain') + $header_out['Content-Encoding'] = $enc; + $body_out_sz = mb_strlen($body_out, "ASCII"); + $hea = headers_render($header_out, $body_out_sz); + $hea_sz = mb_strlen($hea, "ASCII"); $this->socket = $socket; $this->kalive = $curtime + $kalive; - $this->msg = $hea.$body; - $this->msg_sz = mb_strlen($this->msg, "ASCII"); + $this->msg = $hea.$body_out; + $this->msg_sz = $hea_sz + $body_out_sz; } /* return TRUE if is removable from it's list */ diff --git a/web/spush/brisk-spush.php b/web/spush/brisk-spush.php index eeeaf41..f4ecacb 100755 --- a/web/spush/brisk-spush.php +++ b/web/spush/brisk-spush.php @@ -70,6 +70,7 @@ require_once("./brisk-spush.phh"); require_once($G_base."Obj/user.phh"); require_once($G_base."Obj/brisk.phh"); require_once($G_base."Obj/auth.phh"); +require_once($G_base."Obj/zlibstream.phh"); // require_once("../Obj/proxyscan.phh"); require_once($G_base."index.php"); require_once($G_base."index_wr.php"); -- 2.17.1