socket2pending (s2p) array added
[brisk.git] / web / Obj / sac-a-push.phh
index 441d2eb..db5456a 100644 (file)
@@ -25,6 +25,7 @@
 define('SITE_PREFIX', '/brisk/');
 define('SITE_PREFIX_LEN', 7);
 
+declare(ticks = 1);
 
 function global_dump()
 {
@@ -120,7 +121,7 @@ function spu_process_info($stream_info, $method, &$header, &$get, &$post, &$cook
             }
             // GET params management
             $get_vars = explode('?', $req[1], 2);
-            $path =   $get_vars[0];
+            $path = $get_vars[0];
             if (count($get_vars) > 1) {
                 $a = explode('&', $get_vars[1]);
                 printf("A COUNT: [%s] %d\n", $a[0], count($a));
@@ -131,11 +132,15 @@ function spu_process_info($stream_info, $method, &$header, &$get, &$post, &$cook
             }
             // POST params management
             if ($req[0] == 'POST') {
+                $conttype_all = explode(";", $header['Content-Type']);
+                $header['Content-Type'] = $conttype_all[0];
+                // $path_all[1-] other things like charset and so on
+
                 if ($header['Content-Type'] != 'application/x-www-form-urlencoded' 
                     || !isset($header['Content-Length'])) {
                     return FALSE;
                 }
-                $post_len = mb_strlen($line, "latin1");
+                $post_len = mb_strlen($line, "ASCII");
                 $a = explode('&', $line);
                 for ($i = 0 ; $i < count($a) ; $i++) {
                     $b = explode('=', $a[$i]);
@@ -362,13 +367,17 @@ class Cookies {
 
 
 class Sac_a_push {
-    static $fixed_fd = 2;
+    // maybe fixed_fd is unuseful
+    static $fixed_fd = 3;
+    static $cnt_master = NULL;
+    static $cnt_slave  = NULL;
     
     var $file_socket;
     var $unix_socket;
     var $socks;
-    var $s2u;
-    var $pages_flush;
+    var $s2u;             // user associated with input socket
+    var $s2p;             // pending page associated with input socket
+    var $pending_pages;
 
     var $list;
     var $in;
@@ -387,7 +396,29 @@ class Sac_a_push {
     {
     }
 
-    // Sac_a_push::create("/tmp/brisk.sock", 0, 0
+    function sig_handler($sig)
+    {
+        switch ($sig) {
+        case SIGINT:
+            exit(1);
+            break;
+        case SIGTERM:
+            if (static::$cnt_master != NULL) {
+                    fwrite(static::$cnt_master, "\nshutdown\n");
+                    fflush(static::$cnt_master);
+            }
+            else {
+                exit(1);
+            }
+            break;
+        case SIGHUP: 
+            if (static::$cnt_master != NULL) {
+                fwrite(static::$cnt_master, "\nreload\n");
+                fflush(static::$cnt_master);
+            }
+            break;
+        }
+    }
 
     static function create(&$app, $sockname, $debug, $blocking_mode)
     {        
@@ -399,7 +430,20 @@ class Sac_a_push {
         $thiz->debug = $debug;
         $thiz->socks = array();
         $thiz->s2u  = array();
-        $thiz->pages_flush = array();
+        $thiz->s2p  = array();
+        $thiz->pending_pages = array();
+
+        // create a couple of sockets for control management
+        if (($sockpair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM,
+                                            STREAM_IPPROTO_IP)) == FALSE) {
+            return FALSE;
+        }
+        static::$cnt_master = $sockpair[0];
+        static::$cnt_slave  = $sockpair[1];
+
+        pcntl_signal(SIGTERM, array("Sac_a_push", "sig_handler"));
+        pcntl_signal(SIGINT, array("Sac_a_push", "sig_handler"));
+        pcntl_signal(SIGHUP, array("Sac_a_push", "sig_handler"));
 
         $thiz->blocking_mode = 0; // 0 for non-blocking
 
@@ -431,35 +475,41 @@ class Sac_a_push {
         return ($thiz);
     }
 
-    function socks_set($sock, $user)
+    function socks_set($sock, $user, $pendpage)
     {
         $id = intval($sock);
 
-        $this->s2u[$id]   = $user;
         $this->socks[$id] = $sock;
+        if ($user != NULL)
+            $this->s2u[$id]   = $user;
+        if ($pendpage != NULL)
+            $this->s2p[$id]   = $pendpage;
     }
 
     function socks_unset($sock)
     {
         $id = intval($sock);
 
-        unset($this->s2u[$id]);
+        if (isset($this->s2u[$id]))
+            unset($this->s2u[$id]);
+        if (isset($this->s2p[$id]))
+            unset($this->s2p[$id]);
         unset($this->socks[$id]);
     }
 
-    function pgflush_try_add($enc, &$new_socket, $tout, $header_out, $content)
+    function pendpage_try_addflush(&$new_socket, $tout, $enc, $header_out, $content)
     {
-        $pgflush = new PageFlush($new_socket, $enc, $this->curtime, $tout, $header_out, $content);
+        $pendpage = PendingPage::pendingpage_flushing($new_socket, $this->curtime, $tout, $enc, $header_out, $content);
 
-        if ($pgflush->try_flush($this->curtime) == FALSE) {
-            // Add $pgflush to the pgflush array
-            $this->pgflush_add($pgflush);
+        if ($pendpage->try_flush($this->curtime) == FALSE) {
+            // Add $pendpage to the pendpage array
+            $this->pendpage_add($pendpage);
         }
     }
 
-    function pgflush_add($pgflush)
+    function pendpage_add($pendpage)
     {
-        array_push($this->pages_flush, $pgflush);
+        array_push($this->pending_pages, $pendpage);
     }
 
     function garbage_manager($force)
@@ -467,14 +517,17 @@ class Sac_a_push {
         $this->app->garbage_manager($force);
 
         foreach ($this->socks as $k => $sock) {
-            if ($this->s2u[intval($sock)]->sess == '') {
-                if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) {
-                    $this->s2u[intval($sock)]->rd_socket_set(NULL);
+            $id = intval($sock);
+            if (isset($this->s2u[$id])) {
+                if ($this->s2u[$id]->sess == '') {
+                    if ($this->s2u[$id]->rd_socket_get() != NULL) {
+                        $this->s2u[$id]->rd_socket_set(NULL);
+                    }
+                    unset($this->socks[$id]);
+                    unset($this->s2u[$id]);
+                    fclose($sock);
+                    printf("CLOSE ON GARBAGE MANAGER\n");
                 }
-                unset($this->socks[intval($sock)]);
-                unset($this->s2u[intval($sock)]);
-                fclose($sock);
-                printf("CLOSE ON GARBAGE MANAGER\n");
             }
         }
     }
@@ -501,14 +554,15 @@ class Sac_a_push {
         
         while ($this->main_loop) {
             $this->curtime = time();
-            printf("IN LOOP: Current opened: %d  pages_flush: %d - ", count($this->socks), count($this->pages_flush));
+            printf("IN LOOP: Current opened: %d  pending_pages: %d - ", count($this->socks), count($this->pending_pages));
             
             /* Prepare the read array */
             /* // when we manage it ... */
             /* if ($shutdown)  */
             /*     $read   = array_merge(array("$in" => $in), $socks); */
             /* else */
-            $read   = array_merge(array(intval($this->list) => $this->list, intval($this->in) => $this->in),
+            $read   = array_merge(array(intval($this->list) => $this->list, intval($this->in) => $this->in,
+                                        intval(static::$cnt_slave) => static::$cnt_slave),
                                   $this->socks);
             
             if ($this->debug > 1) {
@@ -517,7 +571,7 @@ class Sac_a_push {
             }
             $write  = NULL;
             $except = NULL;
-            $num_changed_sockets = stream_select($read, $write, $except, 0, 500000);
+            $num_changed_sockets = @stream_select($read, $write, $except, 5, 500000);
         
             if ($num_changed_sockets == 0) {
                 printf(" no data in 5 secs, splash [%d]\n", $G_with_splash);
@@ -529,6 +583,8 @@ class Sac_a_push {
                 }
                 /* At least at one of the sockets something interesting happened */
                 foreach ($read as $i => $sock) {
+                    $id = intval($sock);
+                    $manage_page = FALSE;
                     /* is_resource check is required because there is the possibility that
                        during new request an old connection is closed */
                     if (!is_resource($sock)) {
@@ -550,8 +606,21 @@ class Sac_a_push {
                             stream_set_blocking($new_socket, $this->blocking_mode); // Set the stream to non-blocking
                             printf("RECEIVED HEADER:\n%s", $stream_info);
                             $path = spu_process_info($stream_info, $method, $header, $get, $post, $cookie);
+                            $addr = stream_socket_get_name($new_socket, TRUE);
                             printf("PATH: [%s]\n", $path);
                             printf("M: %s\nHEADER:\n", $method);
+                            if ($method == "POST") {
+                                // ADD PUSH INTO FD ARRAY AS WAITING DATA
+                                // Passing all infos from spu_process_info as arguments:
+                                //
+                                // MAYBE: $stream_info,
+                                // $method, $header, $get, $post, $cookie
+                                // $s_a_p (this), $new_socket, substr($path, SITE_PREFIX_LEN),
+                                // $addr
+                            }
+                            else {
+                                $manage_page = TRUE;
+                            }
                             print_r($header);
                             printf("GET:\n");
                             print_r($get);
@@ -559,11 +628,9 @@ class Sac_a_push {
                             print_r($post);
                             printf("COOKIE:\n");
                             print_r($cookie);
-                            $addr = stream_socket_get_name($new_socket, TRUE);
                             $header_out = array();
 
-                            $subs = SITE_PREFIX."briskin5/";
-                            $subs_l = strlen($subs);
+                            // TODO: MOVE DOWN request_mgr to factorize new_sockets and POST closed
                             $rret = FALSE;
                             if (!strncmp($path, SITE_PREFIX, SITE_PREFIX_LEN)) {
                                 $rret = $this->app->request_mgr($this, $header, $header_out, $new_socket, substr($path, SITE_PREFIX_LEN), $addr, $get, $post, $cookie);
@@ -582,6 +649,7 @@ class Sac_a_push {
                         $buf = fread($sock, 512);
                         // if socket is closed
                         if ($buf == FALSE || strlen($buf) == 0) {
+                            // close socket case
                             if ($buf == FALSE) {
                                 printf("ERROR READING\n");
                             }
@@ -589,17 +657,19 @@ class Sac_a_push {
                                 printf("Arrivati %d bytes da list\n", strlen($buf));
                                 return(21);
                             }
-                            else if ($sock === $this->in) {
+                            else if ($sock === $this->in || $sock === static::$cnt_slave) {
                                 printf("Arrivati %d bytes da stdin\n", strlen($buf));
                                 return(22);
                             }
                             else {
-                                // $user_a[$s2u[intval($sock)]]->disable();
-                                if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) {
-                                    $this->s2u[intval($sock)]->rd_socket_set(NULL);
+                                unset($this->socks[$id]);
+                                if (isset($this->s2u[$id])) {
+                                    // $user_a[$s2u[$id]]->disable();
+                                    if ($this->s2u[$id]->rd_socket_get() != NULL) {
+                                        $this->s2u[$id]->rd_socket_set(NULL);
+                                    }
+                                    unset($this->s2u[$id]);
                                 }
-                                unset($this->socks[intval($sock)]);
-                                unset($this->s2u[intval($sock)]);
                             }
                             fclose($sock);
                             printf("CLOSE ON READ\n");
@@ -616,7 +686,7 @@ class Sac_a_push {
                             if ($sock === $this->list) {
                                 printf("Arrivati %d bytes da list\n", strlen($buf));
                             }
-                            else if ($sock === $this->in) {
+                            else if ($sock === $this->in || $sock === static::$cnt_slave) {
                                 printf("Arrivati %d bytes da stdin\n", strlen($buf));
                                 $line = trim($buf);
                                 if ($line == "reload") {
@@ -639,15 +709,18 @@ class Sac_a_push {
                             }
                         }
                     }
+                    // TODO: MOVE HERE request_mgr to factorize new_sockets and POST closed
+                    // $rret = $this->app->request_mgr
                 }
             }
 
             $this->garbage_manager(FALSE);
 
             /* manage unfinished pages */
-            foreach ($this->pages_flush as $k => $pgflush) {
+            foreach ($this->pending_pages as $k => $pgflush) {
+                // TODO: try_flush if exists in the class
                 if ($pgflush->try_flush($this->curtime) == TRUE) {
-                    unset($this->pages_flush[$k]);
+                    unset($this->pending_pages[$k]);
                 }
             }
             
@@ -659,8 +732,9 @@ class Sac_a_push {
 
             /* manage open streaming */
             foreach ($this->socks as $k => $sock) {
-                if (isset($this->s2u[intval($sock)])) {
-                    $user = $this->s2u[intval($sock)];
+                $id = intval($sock);
+                if (isset($this->s2u[$id])) {
+                    $user = $this->s2u[$id];
                     $response = $user->rd_cache_get();
                     $do_ping = FALSE;
                     if (($this->curtime - $user->lacc) > (EXPIRE_TIME_RD / 3)) {
@@ -704,11 +778,11 @@ class Sac_a_push {
                     
                     // close socket after a while to prevent client memory consumption
                     if ($user->rd_endtime_is_expired($this->curtime)) {
-                        if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) {
-                            $this->s2u[intval($sock)]->rd_socket_set(NULL);
+                        if ($this->s2u[$id]->rd_socket_get() != NULL) {
+                            $this->s2u[$id]->rd_socket_set(NULL);
                         }
-                        unset($this->socks[intval($sock)]);
-                        unset($this->s2u[intval($sock)]);
+                        unset($this->socks[$id]);
+                        unset($this->s2u[$id]);
                         fclose($sock);
                         printf("CLOSE ON LOOP\n");
                     }