gzip compression management added
authorMatteo Nastasi (mop) <nastasi@alternativeoutput.it>
Sun, 4 Nov 2012 11:02:12 +0000 (12:02 +0100)
committerMatteo Nastasi (mop) <nastasi@alternativeoutput.it>
Sun, 4 Nov 2012 11:02:12 +0000 (12:02 +0100)
test/stream_filter_test.php [new file with mode: 0755]
web/Obj/brisk.phh
web/Obj/sac-a-push.phh
web/Obj/user.phh
web/Obj/zlibstream.phh [new file with mode: 0644]
web/briskin5/Obj/briskin5.phh
web/spush/brisk-spush.phh
web/spush/brisk-spush.php

diff --git a/test/stream_filter_test.php b/test/stream_filter_test.php
new file mode 100755 (executable)
index 0000000..d4fc8ae
--- /dev/null
@@ -0,0 +1,80 @@
+#!/usr/bin/php
+<?php
+$max = 1024 * 1024 * 2;
+/* if (($fp = gzopen("php://memory/maxmemory:$max", "wb")) == FALSE) { */
+/*     printf("Open file failed\n"); */
+/* } */
+
+/* printf("Open ok\n"); */
+/* exit(123); */
+
+
+
+print_r(stream_get_filters());
+
+$pipe = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
+
+for ($i = 0 ; $i < 2 ; $i++)
+    stream_set_blocking  ( $pipe[$i], 0);
+
+$params = array('level' => 6, 'window' => -15, 'memory' => 9);
+
+if (($filter = stream_filter_append($pipe[1], "zlib.deflate", STREAM_FILTER_READ)) == FALSE) {
+    printf("filter append fails\n");
+}
+
+$cont = array( "pippo", "pluto", "paperino");
+
+$fwrite_pos = 0;
+$fread_pos = 0;
+
+$head = "\037\213\010\000\000\000\000\000\000\003";
+
+if (($fout = fopen("fout.gz", "wb")) == FALSE) {
+    exit(1);
+}
+
+fwrite($fout, $head);
+
+for ($i = 0 ; $i < 9 ; $i++) {
+    fprintf(STDERR, "Start loop\n");
+    $s_in = $cont[$i % 3];    
+    if (($ct = fwrite($pipe[0], $s_in)) == FALSE) {
+        printf("fwrite fails\n");
+    }
+    if (($s_out = fread($pipe[1], 1024)) != FALSE) { 
+        printf("SUCCESS [%s]\n", $s_out);
+    }
+    fwrite($fout, $s_out);
+
+    fprintf(STDERR, "PRE FLUSH\n");
+    fflush($pipe[0]);
+    if (($s_out = fread($pipe[1], 1024)) != FALSE) { 
+        printf("SUCCESS [%s]\n", $s_out);
+    }
+    fwrite($fout, $s_out);
+
+    fprintf(STDERR, "POS FLUSH\n");
+    fwrite($pipe[0], "1");
+    if (($s_out = fread($pipe[1], 1024)) != FALSE) { 
+        printf("SUCCESS [%s]\n", $s_out);
+    }
+    fwrite($fout, $s_out);
+
+    fprintf(STDERR, "POS VOID\n");
+    // else {
+    // printf("fread fails\n");
+    // }
+    fprintf(STDERR, "\n");
+    sleep(5);
+}
+
+fclose($pipe[0]);
+if (($s_out = fread($pipe[1], 1024)) != FALSE) { 
+    printf("SUCCESS [%s]\n", $s_out);
+}
+fwrite($fout, $s_out);
+fclose($pipe[1]);
+fclose($fout);
+
+?>
\ No newline at end of file
index 902bfc3..64739ac 100644 (file)
@@ -2133,7 +2133,7 @@ class Room {
     return ($ret);
   }
 
-  function request_mgr(&$s_a_p, &$header_out, &$new_socket, $path, $addr, $get, $post, $cookie)
+  function request_mgr(&$s_a_p, $enc, &$header_out, &$new_socket, $path, $addr, $get, $post, $cookie)
   {
       printf("NEW_SOCKET (root): %d\n", intval($new_socket));
 
@@ -2143,21 +2143,21 @@ class Room {
       case "":
       case "index.php":
           ob_start();
-      index_main($this, $header_out, $addr, $get, $post, $cookie);
-      $content = ob_get_contents();
-      ob_end_clean();
+          index_main($this, $header_out, $addr, $get, $post, $cookie);
+          $content = ob_get_contents();
+          ob_end_clean();
 
-      $s_a_p->pgflush_try_add($new_socket, 20, $header_out, $content);
-      return TRUE;
+          $s_a_p->pgflush_try_add($enc, $new_socket, 20, $header_out, $content);
+          return TRUE;
 
-      break;
+          break;
       case "index_wr.php":
           ob_start();
           index_wr_main($this, $addr, $get, $post, $cookie);
           $content = ob_get_contents();
           ob_end_clean();
-
-          $s_a_p->pgflush_try_add($new_socket, 20, $header_out, $content);
+          
+          $s_a_p->pgflush_try_add($enc, $new_socket, 20, $header_out, $content);
           return TRUE;
 
           break;
@@ -2167,7 +2167,7 @@ class Room {
                   || (($user = $this->get_user($cookie['sess'], $idx)) == FALSE)) {
                   $content = User::stream_fini($s_a_p->rndstr, TRUE);
                   
-                  $s_a_p->pgflush_try_add($new_socket, 20, $header_out, $content);
+                  $s_a_p->pgflush_try_add($enc, $new_socket, 20, $header_out, $content);
                   return TRUE;
 
                   break;
@@ -2179,11 +2179,11 @@ class Room {
                   printf("CLOSE AND OPEN AGAIN ON IFRA2\n");
                   $user->rd_socket_set(NULL);
               }
-              
+
               $content = "";
-              $user->stream_init($s_a_p->rndstr, $header_out, $content, $get, $post, $cookie);
+              $user->stream_init($s_a_p->rndstr, $enc, $header_out, $content, $get, $post, $cookie);
               
-              $response = headers_render($header_out, -1).chunked_content($content);
+              $response = headers_render($header_out, -1).chunked_content($user->rd_zls_get(), $content);
               $response_l = mb_strlen($response, "ASCII");
               
               $wret = @fwrite($new_socket, $response, $response_l);
@@ -2211,7 +2211,7 @@ class Room {
           $subs = "briskin5/";
           $subs_l = strlen($subs);
           if (!strncmp($path, $subs, $subs_l)) {
-              $ret = Bin5::request_mgr(&$s_a_p, &$header_out, &$new_socket, substr($path, $subs_l) , $addr, $get, $post, $cookie);
+              $ret = Bin5::request_mgr(&$s_a_p, $enc, &$header_out, &$new_socket, substr($path, $subs_l) , $addr, $get, $post, $cookie);
               return ($ret);
           }
           break;
index 04f7c5b..c5faa55 100644 (file)
@@ -121,15 +121,17 @@ function headers_render($header, $len)
     foreach($header as $key => $value) {
         $s .= sprintf("%s: %s\r\n", $key, $value);
     }
-    if ($len == -1) {
+    if ($len >= 0) {
+        $s .= sprintf("Content-Length: %d\r\n", $len);
+    }
+    else {
         $s .= "Cache-Control: no-cache, must-revalidate\r\n";
         $s .= "Expires: Mon, 26 Jul 1997 05:00:00 GMT\r\n";
-        $s .= "Content-Encoding: chunked\r\n";
+        if (!isset($header['Content-Encoding'])) {
+            $s .= "Content-Encoding: chunked\r\n";
+        }
         $s .= "Transfer-Encoding: chunked\r\n";
     }
-    else if ($len > 0) {
-        $s .= sprintf("Content-Length: %d\r\n", $len);
-    }
     $s .= "\r\n";
 
     return ($s);
@@ -151,11 +153,18 @@ register_shutdown_function('shutta');
  *  MAIN
  */
 
-function chunked_content($content)
+function chunked_content($zls, $content)
 {
-    $content_l = mb_strlen($content, "ASCII");
+    if ($zls) {
+        $cont_comp = $zls->compress_chunk($content);
+    }
+    else {
+        $cont_comp = $content;
+    }
+    $cont_comp_l = mb_strlen($cont_comp, "ASCII");
+    printf("CHUNK: [%s]\n", $content);
 
-    return (sprintf("%X\r\n%s\r\n", $content_l, $content));
+    return (sprintf("%X\r\n", $cont_comp_l).$cont_comp."\r\n");
 }
 
 function chunked_fini()
@@ -163,6 +172,24 @@ function chunked_fini()
     return sprintf("0\r\n");
 }
 
+function get_encoding($header)
+{
+    $enc = "plain";
+    if (isset($header['Accept-Encoding'])) {
+        $acc = explode(',', $header['Accept-Encoding']);
+
+        if (array_search('gzip', $acc) !== FALSE) {
+            $enc = 'gzip';
+        }
+        else if (array_search('deflate', $acc) !== FALSE) {
+            $enc = 'deflate';
+        }
+    }
+
+    return ($enc);
+}
+
+
 class Sac_a_push {
     static $fixed_fd = 2;
     
@@ -208,7 +235,10 @@ class Sac_a_push {
 
         $thiz->rndstr = "";
         for ($i = 0 ; $i < 4096 ; $i++) {
-            $thiz->rndstr .= chr(mt_rand(65, 90));
+            if (($i % 128) == 0)
+                $thiz->rndstr .= "\n";
+            else
+                $thiz->rndstr .= chr(mt_rand(65, 90));
         }
         
         if (file_exists($thiz->file_socket)) {
@@ -247,9 +277,9 @@ class Sac_a_push {
         unset($this->socks[$id]);
     }
 
-    function pgflush_try_add(&$new_socket, $tout, $header_out, $content)
+    function pgflush_try_add($enc, &$new_socket, $tout, $header_out, $content)
     {
-        $pgflush = new PageFlush($new_socket, $this->curtime, $tout, $header_out, $content);
+        $pgflush = new PageFlush($new_socket, $enc, $this->curtime, $tout, $header_out, $content);
 
         if ($pgflush->try_flush($this->curtime) == FALSE) {
             // Add $pgflush to the pgflush array
@@ -351,11 +381,13 @@ class Sac_a_push {
                             $addr = stream_socket_get_name($new_socket, TRUE);
                             $header_out = array();
 
+                            $enc = get_encoding($header);
+
                             $subs = SITE_PREFIX."briskin5/";
                             $subs_l = strlen($subs);
                             $rret = FALSE;
                             if (!strncmp($path, SITE_PREFIX, SITE_PREFIX_LEN)) {
-                                $rret = $this->app->request_mgr($this, $header_out, $new_socket, substr($path, SITE_PREFIX_LEN), $addr, $get, $post, $cookie);
+                                $rret = $this->app->request_mgr($this, $enc, $header_out, $new_socket, substr($path, SITE_PREFIX_LEN), $addr, $get, $post, $cookie);
                             }
                             if ($rret == FALSE) { 
                                 // FIXME: manage 404 !!!
@@ -456,7 +488,7 @@ class Sac_a_push {
                             $content = $user->stream_keepalive(FALSE);
                         }
                         if ($content != "") {
-                            $response = chunked_content($content);
+                            $response = chunked_content($user->rd_zls_get(), $content);
                         }
                     }
                     
index 2b28adb..cc4d211 100644 (file)
@@ -97,6 +97,7 @@ class User {
   var $rd_scristp; // current script step (for each session) 
   var $rd_kalive;  // if no message are sent after RD_KEEPALIVE_TOUT secs we send a keepalive from server
   var $rd_cache;   // place where store failed fwrite data
+  var $rd_zls;     // zlibstream object handle if compressed stream, else FALSE
 
   var $comm;       // commands array
   // var $asta_card;  // 
@@ -154,6 +155,7 @@ class User {
     $thiz->rd_scristp = -1;
     $thiz->rd_kalive  = -1;
     $thiz->rd_cache   = "";
+    $thiz->rd_zls     = FALSE;
 
     $thiz->asta_card  = -2;
     $thiz->asta_pnt   = -1;
@@ -287,7 +289,7 @@ class User {
     return ($thiz);
   }
 
-  function rd_data_set($curtime, $stat, $subst, $step, $from)
+  function rd_data_set($curtime, $enc, $stat, $subst, $step, $from)
   {
       $this->rd_endtime = $curtime + RD_ENDTIME_DELTA;
       $this->rd_stat    = $stat;
@@ -296,6 +298,7 @@ class User {
       $this->rd_from    = $from;
       $this->rd_scristp = 0;
       $this->rd_kalive  = $curtime + RD_KEEPALIVE_TOUT;
+      $this->rd_zls     = ZLibStream::create($enc);
   }
 
   function rd_socket_get() {
@@ -303,6 +306,12 @@ class User {
   }
 
   function rd_socket_set($sock) {
+      if ($sock == NULL) {
+          if ($this->rd_zls) {
+              $this->rd_zls->destroy();
+              $this->rd_zls = FALSE;
+          }
+      }
       $this->rd_socket = $sock;
   }
 
@@ -343,6 +352,11 @@ class User {
       $this->rd_cache = $cache;
   }
 
+  function rd_zls_get()
+  {
+      return ($this->rd_zls);
+  }
+
   function idx_get() {
       return ($this->idx);
   }
@@ -774,20 +788,22 @@ push(\"%s\");
    stat
    step
 */
-function stream_init($init_string, &$header_out, &$body, $get, $post, $cookie)
+function stream_init($init_string, $enc, &$header_out, &$body, $get, $post, $cookie)
 {
     $curtime = time();
-
+    
     printf("CLASS: [%s] base: [%s]\n", get_class($this), self::base_get());
-
+    
     $is_page_streaming = FALSE; // (webservers_exceeded() || stristr($HTTP_USER_AGENT, "Mozilla/5.0 (Windows NT 6.1; rv:5.0)") || stristr($HTTP_USER_AGENT, "MSIE") || stristr($HTTP_USER_AGENT, "CHROME") ? TRUE : FALSE);
 
+    if ($enc != 'plain')
+        $header_out['Content-Encoding'] = $enc;
     $header_out['Cache-Control'] = 'no-cache, must-revalidate';     // HTTP/1.1
     $header_out['Expires']       = 'Mon, 26 Jul 1997 05:00:00 GMT'; // Date in the past
     $header_out['Content-type']  = 'text/html; charset="utf-8"';
-
+    
     log_load("index_rd_ifra_init.php");
-
+    
     if (($from  = gpcs_var('from', $get, $post, $cookie)) === FALSE)
         $from = "";
     if (($stat  = gpcs_var('stat', $get, $post, $cookie)) === FALSE) 
@@ -797,9 +813,9 @@ function stream_init($init_string, &$header_out, &$body, $get, $post, $cookie)
     if (($step  = gpcs_var('step', $get, $post, $cookie)) === FALSE) 
         unset($step);
     
-    $this->rd_data_set($curtime, $stat, $subst, $step, $from);
+    $this->rd_data_set($curtime, $enc, $stat, $subst, $step, $from);
     $cc = get_called_class();
-
+    
     $body .= sprintf("<html>
 <head>
 <script type=\"text/javascript\" src=\"%scommons.js\"></script>
@@ -814,9 +830,9 @@ window.onload = function () { if (http_streaming != \"ready\") { http_streaming.
 </head>
 <body>");
     $body .= sprintf("<!-- \n%s -->\n", $init_string);
-
+    
     return TRUE;
-}
+  }
 
 function stream_main(&$body, $get, $post, $cookie)
 {
diff --git a/web/Obj/zlibstream.phh b/web/Obj/zlibstream.phh
new file mode 100644 (file)
index 0000000..91ac711
--- /dev/null
@@ -0,0 +1,148 @@
+<?php
+class ZLibStream {
+    var $s;
+    var $head;
+    var $type;
+    var $filter;
+
+    function ZLibStream($type)
+    {
+        $this->type = $type;
+        $this->s = array( FALSE, FALSE );
+        $this->filter = FALSE;
+    }
+
+    static function create($type)
+    {
+        if ($type == 'plain')
+            return (FALSE);
+
+        if (($thiz = new ZLibStream($type)) == FALSE)
+            return (FALSE);
+
+        for ($i = 0 ; $i < 2 ; $i++)
+            $thiz->s[$i] = FALSE;
+        if (($thiz->s = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP)) == FALSE)
+            return (FALSE);
+
+        for ($i = 0 ; $i < 2 ; $i++)
+            stream_set_blocking  ( $thiz->s[$i], 0); // 0 -> not blocking
+        
+        if ($type == 'gzip') {
+            $params = array('level' => 6, 'window' => -15, 'memory' => 9);
+            
+            if (($thiz->filter = stream_filter_append($thiz->s[1], "zlib.deflate", STREAM_FILTER_READ, $params)) == FALSE) {
+                return (FALSE);
+            }
+            $thiz->head = "\037\213\010\000\000\000\000\000\000\003";
+        }
+        else if ($type  == 'deflate') {
+            if (($thiz->filter = stream_filter_append($thiz->s[1], "zlib.deflate", STREAM_FILTER_READ)) == FALSE) {
+                return (FALSE);
+            }
+        }
+        return ($thiz);
+    }
+
+    function destroy()
+    {
+        if ($this->filter != FALSE) {
+            stream_filter_remove($this->filter);
+        }
+
+        for ($i = 0 ; $i < 2 ; $i++) {
+            if ($this->s[$i] != FALSE)
+                fclose($this->s[$i]);
+        }
+    }
+
+        /*
+          too many actors, an explanation is needed to clarify:
+
+          - fwrite: all data MUST be passed to write with success
+          - fflush: probably reduntant
+          - fread: all reads after successfull writes must go well, 
+          
+         */
+    function compress_chunk($s_in)
+    {
+        $s_in_l = mb_strlen($s_in, 'ASCII');
+
+        if ($this->head != FALSE) {
+            $s_out = $this->head;
+            $this->head = FALSE;
+        }
+        else {
+            $s_out = "";
+        }
+
+        for ($to_be_proc = $s_in_l, $max_fail = 0 ; $to_be_proc > 0 && $max_fail < 2 ; $max_fail++) {
+            if ($to_be_proc > 0) {
+                $max_fail = 0;
+                if (($ct = fwrite($this->s[0], $s_in)) == FALSE) 
+                    return FALSE;
+                
+                $to_be_proc -= $ct;
+            }
+            fflush($this->s[0]); // maybe reduntant but light so ...
+        
+            while (($ret = fread($this->s[1],  8192)) != FALSE) {
+                $s_out .= $ret;
+            }
+        }
+
+        if ($max_fail < 2)
+            return ($s_out);
+        else
+            return (FALSE);
+    }
+
+    static function compress($enc, $s)
+    {
+        // fprintf(STDERR, "compress: [%s][%s]\n", $enc, $s);
+
+        if ($enc == 'gzip') {
+            return (gzencode($s, -1, FORCE_GZIP));
+        }
+        else if ($enc == 'deflate') {
+            return (gzencode($s, -1, FORCE_DEFLATE));
+        }
+        else
+            return $s;
+    }
+} // class ZLibStream 
+
+
+
+function zlibstream_test()
+{
+    $cont = array( "pippo", "pluto", "paperino");
+
+    for ($f = 0 ; $f < 2 ; $f++) {
+        if (($zls = ZLibStream::create('gzip')) == FALSE) {
+            printf("ZLibStream Creation failed\n");
+            exit(1);
+        }
+        
+        if (($fp = fopen("../../test/zlibstream".$f.".gz", "w")) == FALSE) {
+            printf("ZLibStream test output file failed\n");
+            exit(2);
+        }   
+        
+        for ($i = 0 ; $i < 9 ; $i++) {
+            $idx = $i % 3;
+            
+            $comp = $zls->compress_chunk($cont[$idx]);
+            
+            fwrite($fp, $comp);
+            fflush($fp);
+            sleep(3);
+        }
+        fclose($fp);
+        $zls->destroy();
+    }
+}
+
+// zlibstream_test();
+
+?>
\ No newline at end of file
index b2aae3f..72c9a26 100644 (file)
@@ -1387,7 +1387,7 @@ class Bin5 {
         return ($is_ab);
     }
 
-    static function request_mgr(&$s_a_p, &$header_out, &$new_socket, $path, $addr, $get, $post, $cookie)
+    static function request_mgr(&$s_a_p, $enc, &$header_out, &$new_socket, $path, $addr, $get, $post, $cookie)
     {
         printf("NEW_SOCKET (root): %d\n", intval($new_socket));
         
@@ -1407,7 +1407,7 @@ class Bin5 {
             $content = ob_get_contents();
             ob_end_clean();
         
-            $s_a_p->pgflush_try_add($new_socket, 20, $header_out, $content);
+            $s_a_p->pgflush_try_add($enc, $new_socket, 20, $header_out, $content);
             return TRUE;
         
         break;
@@ -1418,7 +1418,7 @@ class Bin5 {
             $content = ob_get_contents();
             ob_end_clean();
             
-            $s_a_p->pgflush_try_add($new_socket, 20, $header_out, $content);
+            $s_a_p->pgflush_try_add($enc, $new_socket, 20, $header_out, $content);
             return TRUE;
             
             break;
@@ -1429,9 +1429,9 @@ class Bin5 {
                     || (($user = $bri->get_user($cookie['sess'], $idx)) == FALSE)) {
 
                     $content = Bin5_user::stream_fini($s_a_p->rndstr, TRUE);
-                    $s_a_p->pgflush_try_add($new_socket, 20, $header_out, $content);
+                    $s_a_p->pgflush_try_add($enc, $new_socket, 20, $header_out, $content);
 
-                    return TRUE;                    
+                    return TRUE;
                     break;
                 }
                 // close a previous opened index_read_ifra socket, if exists
@@ -1443,8 +1443,8 @@ class Bin5 {
                 }
                 
                 $content = "";
-                $user->stream_init($s_a_p->rndstr, $header_out, $content, $get, $post, $cookie);
-                $response = headers_render($header_out, -1).chunked_content($content);
+                $user->stream_init($s_a_p->rndstr, $enc, $header_out, $content, $get, $post, $cookie);
+                $response = headers_render($header_out, -1).chunked_content($user->rd_zls_get(), $content);
                 $response_l = mb_strlen($response, "ASCII");
                 
                 $wret = @fwrite($new_socket, $response, $response_l);
index f0bcfcb..4728b92 100644 (file)
@@ -32,17 +32,20 @@ class PageFlush {
   var $msg;    // place where store failed fwrite data
   var $msg_sz; // size of content
 
-  function PageFlush($socket, $curtime, $kalive, $header_out, $body)
+  function PageFlush($socket, $enc, $curtime, $kalive, $header_out, $body)
   {
       printf("TRY FLUSH CREATE\n");
-      // $body_sz = mb_strlen($body, "ASCII");
-      // add length to header_out 
-      $hea = headers_render($header_out, 0);
+      $body_out = ZLibStream::compress($enc, $body);
+      if ($enc != 'plain')
+          $header_out['Content-Encoding'] = $enc;
+      $body_out_sz = mb_strlen($body_out, "ASCII");
+      $hea = headers_render($header_out, $body_out_sz);
+      $hea_sz = mb_strlen($hea, "ASCII");
 
       $this->socket = $socket;
       $this->kalive = $curtime + $kalive;
-      $this->msg    = $hea.$body;
-      $this->msg_sz = mb_strlen($this->msg, "ASCII");
+      $this->msg    = $hea.$body_out;
+      $this->msg_sz = $hea_sz + $body_out_sz;
   }
 
   /* return TRUE if is removable from it's list */
index eeeaf41..f4ecacb 100755 (executable)
@@ -70,6 +70,7 @@ require_once("./brisk-spush.phh");
 require_once($G_base."Obj/user.phh");
 require_once($G_base."Obj/brisk.phh");
 require_once($G_base."Obj/auth.phh");
+require_once($G_base."Obj/zlibstream.phh");
 // require_once("../Obj/proxyscan.phh");
 require_once($G_base."index.php");
 require_once($G_base."index_wr.php");