POST managed with Expected management and long post data
[brisk.git] / web / Obj / sac-a-push.phh
index b931340..c71cdf1 100644 (file)
@@ -93,14 +93,31 @@ function pid_remove()
     }
 }
 
-function spu_process_info($stream_info, $method, &$header, &$get, &$post, &$cookie)
+function post_manage(&$post, $line)
+{
+    $a = explode('&', $line);
+    for ($i = 0 ; $i < count($a) ; $i++) {
+        $b = explode('=', $a[$i]);
+        if (isset($b[0])) {
+            if (isset($b[1])) {
+                $post[$b[0]] = urldecode($b[1]);
+            }
+            else {
+                $post[$b[0]] = "";
+            }
+        }
+    }
+}
+
+function spu_process_info($stream_info, &$method, &$header, &$get, &$post, &$cookie, &$rest, &$cont)
 {
     $check_post = FALSE;
     $header = array();
     $get = array();
     $post = array();
+    $rest = 0;
     foreach(preg_split("/(\r?\n)/", $stream_info) as $line) {
-        // printf("LINE: [%s]\n", $line);
+        printf("LINE: [%s]\n", $line);
         if ($check_post) {
             if (!isset($header['The-Request'])) {
                 return FALSE;
@@ -121,7 +138,7 @@ function spu_process_info($stream_info, $method, &$header, &$get, &$post, &$cook
             }
             // GET params management
             $get_vars = explode('?', $req[1], 2);
-            $path =   $get_vars[0];
+            $path = $get_vars[0];
             if (count($get_vars) > 1) {
                 $a = explode('&', $get_vars[1]);
                 printf("A COUNT: [%s] %d\n", $a[0], count($a));
@@ -132,17 +149,27 @@ function spu_process_info($stream_info, $method, &$header, &$get, &$post, &$cook
             }
             // POST params management
             if ($req[0] == 'POST') {
+                $conttype_all = explode(";", $header['Content-Type']);
+                $header['Content-Type'] = $conttype_all[0];
+                // $path_all[1-] other things like charset and so on
+
+                // if (content-type is wrong || content-length isn't set)
+                //     return false
+
                 if ($header['Content-Type'] != 'application/x-www-form-urlencoded' 
                     || !isset($header['Content-Length'])) {
                     return FALSE;
                 }
-                $post_len = mb_strlen($line, "latin1");
-                $a = explode('&', $line);
-                for ($i = 0 ; $i < count($a) ; $i++) {
-                    $b = explode('=', $a[$i]);
-                    $post[$b[0]] = urldecode($b[1]);
-                }
+                $post_len = mb_strlen($line, "ASCII");
                 printf("INFO: postlen: %d\n", $post_len);
+                $rest = (int)($header['Content-Length']) - $post_len;
+
+                if ($rest == 0) {
+                    post_manage($post, $line);
+                }
+                else {
+                    $cont = $line;
+                }
             }
             break;
         }
@@ -181,6 +208,9 @@ function headers_render($header, $len)
     if (isset($header['Location'])) {
         $s = sprintf("HTTP/1.1 302 OK\r\n%sLocation: %s\r\n", $cookies, $header['Location']);
     }
+    else if (isset($header['HTTP-Response'])) {
+        $s = sprintf("HTTP/1.1 %s\r\n", $header['HTTP-Response']);
+    }
     else {
         $s = "HTTP/1.1 200 OK\r\n";
 
@@ -371,8 +401,9 @@ class Sac_a_push {
     var $file_socket;
     var $unix_socket;
     var $socks;
-    var $s2u;
-    var $pages_flush;
+    var $s2u;             // user associated with input socket
+    var $s2p;             // pending page associated with input socket
+    var $pending_pages;
 
     var $list;
     var $in;
@@ -425,7 +456,8 @@ class Sac_a_push {
         $thiz->debug = $debug;
         $thiz->socks = array();
         $thiz->s2u  = array();
-        $thiz->pages_flush = array();
+        $thiz->s2p  = array();
+        $thiz->pending_pages = array();
 
         // create a couple of sockets for control management
         if (($sockpair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM,
@@ -469,35 +501,81 @@ class Sac_a_push {
         return ($thiz);
     }
 
-    function socks_set($sock, $user)
+    function socks_set($sock, $user, $pendpage)
     {
         $id = intval($sock);
 
-        $this->s2u[$id]   = $user;
         $this->socks[$id] = $sock;
+        if ($user != NULL)
+            $this->s2u[$id]   = $user;
+        if ($pendpage != NULL)
+            $this->s2p[$id]   = $pendpage;
     }
 
     function socks_unset($sock)
     {
         $id = intval($sock);
 
-        unset($this->s2u[$id]);
+        if (isset($this->s2u[$id]))
+            unset($this->s2u[$id]);
+        if (isset($this->s2p[$id]))
+            unset($this->s2p[$id]);
         unset($this->socks[$id]);
     }
 
-    function pgflush_try_add($enc, &$new_socket, $tout, $header_out, $content)
+    function pendpage_try_addcont(&$new_socket, $tout, $method, $header, $get, $post, $cookie, $path, $addr, $rest, $cont)
+    {
+        $pendpage = PendingPage::pendingpage_continue(&$new_socket, $this->curtime, $tout, $method,
+                                                           $header,           $get, $post, $cookie,
+                                                             $path,          $addr, $rest, $cont);
+
+        $pendpage->try_flush($this->curtime);
+        // Add $pendpage to the pendpage array (in any case)
+        fprintf(STDERR, "IMPORTANT: Pendadd: %d\n", $pendpage->status);
+        $this->pendpage_add($pendpage);
+    }
+
+    function pendpage_try_addflush(&$new_socket, $tout, $enc, $header_out, $content)
+    {
+        $pendpage = PendingPage::pendingpage_flushing($new_socket, $this->curtime, $tout, $enc, $header_out, $content);
+
+        if ($pendpage->try_flush($this->curtime) == FALSE) {
+            // Add $pendpage to the pendpage array
+            $this->pendpage_add($pendpage);
+        }
+    }
+
+    function pendpage_add($pendpage)
     {
-        $pgflush = new PageFlush($new_socket, $enc, $this->curtime, $tout, $header_out, $content);
+        array_push($this->pending_pages, $pendpage);
+        $this->socks_set($pendpage->socket_get(), NULL, $pendpage);
+    }
 
-        if ($pgflush->try_flush($this->curtime) == FALSE) {
-            // Add $pgflush to the pgflush array
-            $this->pgflush_add($pgflush);
+    function pendpage_rem($pendpage)
+    {
+        $sock = $pendpage->socket_get();
+        if (($key = array_search($pendpage, $this->pending_pages)) !== FALSE) {
+            unset($this->pending_pages[$key]);
         }
+        else {
+            fprintf(STDERR, "WARNING: pendpage not found\n");
+        }
+        $this->socks_unset($sock);
+        fprintf(STDERR, "PP_REM: %d\n", intval($sock));
     }
 
-    function pgflush_add($pgflush)
+
+    function pendpage_try_addwait(&$new_socket, $tout, $method, $header, $get, $post, $cookie, $path, $addr, $rest, $cont)
     {
-        array_push($this->pages_flush, $pgflush);
+        $pendpage = PendingPage::pendingpage_waiting($new_socket, $this->curtime, $tout, $method, $header, $get, $post, $cookie, $path, $addr, $rest, $cont);
+        /*
+        if ($pendpage->try_flush($this->curtime) == FALSE) {
+            // Add $pendpage to the pendpage array
+            */
+        $this->pendpage_add($pendpage);
+        /*
+        }
+        */
     }
 
     function garbage_manager($force)
@@ -505,14 +583,17 @@ class Sac_a_push {
         $this->app->garbage_manager($force);
 
         foreach ($this->socks as $k => $sock) {
-            if ($this->s2u[intval($sock)]->sess == '') {
-                if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) {
-                    $this->s2u[intval($sock)]->rd_socket_set(NULL);
+            $id = intval($sock);
+            if (isset($this->s2u[$id])) {
+                if ($this->s2u[$id]->sess == '') {
+                    if ($this->s2u[$id]->rd_socket_get() != NULL) {
+                        $this->s2u[$id]->rd_socket_set(NULL);
+                    }
+                    unset($this->socks[$id]);
+                    unset($this->s2u[$id]);
+                    fclose($sock);
+                    printf("CLOSE ON GARBAGE MANAGER\n");
                 }
-                unset($this->socks[intval($sock)]);
-                unset($this->s2u[intval($sock)]);
-                fclose($sock);
-                printf("CLOSE ON GARBAGE MANAGER\n");
             }
         }
     }
@@ -539,7 +620,7 @@ class Sac_a_push {
         
         while ($this->main_loop) {
             $this->curtime = time();
-            printf("IN LOOP: Current opened: %d  pages_flush: %d - ", count($this->socks), count($this->pages_flush));
+            fprintf(STDERR, "IN LOOP: Current opened: %d  pending_pages: %d\n", count($this->socks), count($this->pending_pages));
             
             /* Prepare the read array */
             /* // when we manage it ... */
@@ -556,7 +637,7 @@ class Sac_a_push {
             }
             $write  = NULL;
             $except = NULL;
-            $num_changed_sockets = @stream_select($read, $write, $except, 0, 500000);
+            $num_changed_sockets = @stream_select($read, $write, $except, 5, 500000);
         
             if ($num_changed_sockets == 0) {
                 printf(" no data in 5 secs, splash [%d]\n", $G_with_splash);
@@ -568,6 +649,8 @@ class Sac_a_push {
                 }
                 /* At least at one of the sockets something interesting happened */
                 foreach ($read as $i => $sock) {
+                    $id = intval($sock);
+                    $manage_page = FALSE;
                     /* is_resource check is required because there is the possibility that
                        during new request an old connection is closed */
                     if (!is_resource($sock)) {
@@ -584,33 +667,36 @@ class Sac_a_push {
                         $get         = array();
                         $post        = array();
                         $cookie      = array();
+                        $rest        = 0;
+                        $cont        = "";
                         if (($new_socket = ancillary_getstream($new_unix, $stream_info)) !== FALSE) {
                             printf("NEW_SOCKET: %d\n", intval($new_socket));
                             stream_set_blocking($new_socket, $this->blocking_mode); // Set the stream to non-blocking
                             printf("RECEIVED HEADER:\n%s", $stream_info);
-                            $path = spu_process_info($stream_info, $method, $header, $get, $post, $cookie);
-                            printf("PATH: [%s]\n", $path);
-                            printf("M: %s\nHEADER:\n", $method);
-                            print_r($header);
-                            printf("GET:\n");
-                            print_r($get);
-                            printf("POST:\n");
-                            print_r($post);
-                            printf("COOKIE:\n");
-                            print_r($cookie);
+                            if (($path = spu_process_info($stream_info, $method, $header,
+                                                          $get, $post, $cookie, $rest, $cont))
+                                == FALSE) {
+                                fprintf(STDERR, "TODO: fix wrong header management\n");
+                            }
                             $addr = stream_socket_get_name($new_socket, TRUE);
-                            $header_out = array();
-
-                            $subs = SITE_PREFIX."briskin5/";
-                            $subs_l = strlen($subs);
-                            $rret = FALSE;
-                            if (!strncmp($path, SITE_PREFIX, SITE_PREFIX_LEN)) {
-                                $rret = $this->app->request_mgr($this, $header, $header_out, $new_socket, substr($path, SITE_PREFIX_LEN), $addr, $get, $post, $cookie);
+                            printf("PATH: [%s]\n", $path);
+                            if ($method == "POST" && $rest > 0) {
+                                if (isset($header['Expect']) && $header['Expect'] == '100-continue') {
+                                    fprintf(STDERR, "\nPOSTA DE CHE\n\n");
+                                    $this->pendpage_try_addcont($new_socket, 20,
+                                                                $method, $header, $get, $post, $cookie,
+                                                                $path, $addr, $rest, $cont);
+                                }
+                                else {
+                                    $this->pendpage_try_addwait($new_socket, 20,
+                                                                $method, $header, $get, $post, $cookie,
+                                                                $path, $addr, $rest, $cont);
+                                }
                             }
-                            if ($rret == FALSE) { 
-                                // FIXME: manage 404 !!!
-                                printf("TODO: fix unknown page\n");
+                            else {
+                                $manage_page = TRUE;
                             }
+
                             printf("number of sockets after %d\n", count($this->socks));
                         }
                         else {
@@ -618,27 +704,30 @@ class Sac_a_push {
                         }
                     }
                     else {
-                        $buf = fread($sock, 512);
+                        $buf = fread($sock, 4096);
                         // if socket is closed
-                        if ($buf == FALSE || strlen($buf) == 0) {
+                        if ($buf == FALSE || mb_strlen($buf, "ASCII") == 0) {
+                            // close socket case
                             if ($buf == FALSE) {
                                 printf("ERROR READING\n");
                             }
                             if ($sock === $this->list) {
-                                printf("Arrivati %d bytes da list\n", strlen($buf));
+                                printf("Arrivati %d bytes da list\n", mb_strlen($buf, "ASCII"));
                                 return(21);
                             }
                             else if ($sock === $this->in || $sock === static::$cnt_slave) {
-                                printf("Arrivati %d bytes da stdin\n", strlen($buf));
+                                printf("Arrivati %d bytes da stdin\n", mb_strlen($buf, "ASCII"));
                                 return(22);
                             }
                             else {
-                                // $user_a[$s2u[intval($sock)]]->disable();
-                                if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) {
-                                    $this->s2u[intval($sock)]->rd_socket_set(NULL);
+                                unset($this->socks[$id]);
+                                if (isset($this->s2u[$id])) {
+                                    // $user_a[$s2u[$id]]->disable();
+                                    if ($this->s2u[$id]->rd_socket_get() != NULL) {
+                                        $this->s2u[$id]->rd_socket_set(NULL);
+                                    }
+                                    unset($this->s2u[$id]);
                                 }
-                                unset($this->socks[intval($sock)]);
-                                unset($this->s2u[intval($sock)]);
                             }
                             fclose($sock);
                             printf("CLOSE ON READ\n");
@@ -653,10 +742,10 @@ class Sac_a_push {
                                 print_r($read);
                             }
                             if ($sock === $this->list) {
-                                printf("Arrivati %d bytes da list\n", strlen($buf));
+                                printf("Arrivati %d bytes da list\n", mb_strlen($buf, "ASCII"));
                             }
                             else if ($sock === $this->in || $sock === static::$cnt_slave) {
-                                printf("Arrivati %d bytes da stdin\n", strlen($buf));
+                                printf("Arrivati %d bytes da stdin\n", mb_strlen($buf, "ASCII"));
                                 $line = trim($buf);
                                 if ($line == "reload") {
                                     require("$DOCUMENT_ROOT/Etc/".BRISK_CONF);
@@ -674,19 +763,57 @@ class Sac_a_push {
                             }
                             else {
                                 $key = array_search("$sock", $this->socks);
-                                printf("Arrivati %d bytes dalla socket n. %d\n", strlen($buf), $key);
+                                fprintf(STDERR, "Arrivati %d bytes dalla socket n. %d\n", mb_strlen($buf, "ASCII"), $key);
+                                if (isset($this->s2p[$id])) {
+                                    $this->s2p[$id]->rest -= mb_strlen($buf, "ASCII");
+                                    $this->s2p[$id]->cont .= $buf;
+                                    if ($this->s2p[$id]->rest <= 0) {
+                                        $header = $new_socket = $path = $addr = $get = $cookie = 0;
+                                        $post = array();
+
+                                        $this->s2p[$id]->context_get($header, $new_socket, $path, $addr, $get, $post, $cookie);
+                                        $this->pendpage_rem($this->s2p[$id]);
+                                        fprintf(STDERR, "SOCKET RUN: %s\n", $new_socket);
+
+                                        $manage_page = TRUE;
+                                    }
+                                }
                             }
                         }
                     }
+
+                    if ($manage_page == TRUE) {
+                        printf("M: %s\nHEADER:\n", $method);
+                        print_r($header);
+                        printf("GET:\n");
+                        print_r($get);
+                        printf("POST:\n");
+                        print_r($post);
+                        printf("COOKIE:\n");
+                        print_r($cookie);
+
+                        $header_out = array();
+                        // TODO: MOVE DOWN request_mgr to factorize new_sockets and POST closed
+                        $rret = FALSE;
+                        if (!strncmp($path, SITE_PREFIX, SITE_PREFIX_LEN)) {
+                            $rret = $this->app->request_mgr($this, $header, $header_out, $new_socket, substr($path, SITE_PREFIX_LEN), $addr, $get, $post, $cookie);
+                        }
+                        fprintf(STDERR, "\n\n DI QUI PASSA [%s]\n\n", $rret);
+                        if ($rret == FALSE) {
+                            // FIXME: manage 404 !!!
+                            printf("TODO: fix unknown page\n");
+                        }
+                    }
                 }
             }
 
             $this->garbage_manager(FALSE);
 
             /* manage unfinished pages */
-            foreach ($this->pages_flush as $k => $pgflush) {
-                if ($pgflush->try_flush($this->curtime) == TRUE) {
-                    unset($this->pages_flush[$k]);
+            foreach ($this->pending_pages as $k => $pendpage) {
+                // TODO: try_flush if exists in the class
+                if ($pendpage->try_flush($this->curtime) == TRUE) {
+                    unset($this->pending_pages[$k]);
                 }
             }
             
@@ -698,8 +825,9 @@ class Sac_a_push {
 
             /* manage open streaming */
             foreach ($this->socks as $k => $sock) {
-                if (isset($this->s2u[intval($sock)])) {
-                    $user = $this->s2u[intval($sock)];
+                $id = intval($sock);
+                if (isset($this->s2u[$id])) {
+                    $user = $this->s2u[$id];
                     $response = $user->rd_cache_get();
                     $do_ping = FALSE;
                     if (($this->curtime - $user->lacc) > (EXPIRE_TIME_RD / 3)) {
@@ -743,11 +871,11 @@ class Sac_a_push {
                     
                     // close socket after a while to prevent client memory consumption
                     if ($user->rd_endtime_is_expired($this->curtime)) {
-                        if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) {
-                            $this->s2u[intval($sock)]->rd_socket_set(NULL);
+                        if ($this->s2u[$id]->rd_socket_get() != NULL) {
+                            $this->s2u[$id]->rd_socket_set(NULL);
                         }
-                        unset($this->socks[intval($sock)]);
-                        unset($this->s2u[intval($sock)]);
+                        unset($this->socks[$id]);
+                        unset($this->s2u[$id]);
                         fclose($sock);
                         printf("CLOSE ON LOOP\n");
                     }