X-Git-Url: http://mop.ddnsfree.com/gitweb/?a=blobdiff_plain;f=web%2FObj%2Fsac-a-push.phh;h=27ed4c4f5f70fa9c35378aa72d9f384c0aa4cc7f;hb=059f6fcca909267802697e22ac7c8b701b9ed14e;hp=a69ea1dd575395584636e73b1abf69a6e2dee438;hpb=cb0a7d2568702826016699ac2f4d2c2cfdd66dcc;p=brisk.git diff --git a/web/Obj/sac-a-push.phh b/web/Obj/sac-a-push.phh index a69ea1d..27ed4c4 100644 --- a/web/Obj/sac-a-push.phh +++ b/web/Obj/sac-a-push.phh @@ -93,14 +93,31 @@ function pid_remove() } } -function spu_process_info($stream_info, $method, &$header, &$get, &$post, &$cookie) +function post_manage(&$post, $line) +{ + $a = explode('&', $line); + for ($i = 0 ; $i < count($a) ; $i++) { + $b = explode('=', $a[$i]); + if (isset($b[0])) { + if (isset($b[1])) { + $post[$b[0]] = urldecode($b[1]); + } + else { + $post[$b[0]] = ""; + } + } + } +} + +function spu_process_info($stream_info, &$method, &$header, &$get, &$post, &$cookie, &$rest, &$cont) { $check_post = FALSE; $header = array(); $get = array(); $post = array(); + $rest = 0; foreach(preg_split("/(\r?\n)/", $stream_info) as $line) { - // printf("LINE: [%s]\n", $line); + printf("LINE: [%s]\n", $line); if ($check_post) { if (!isset($header['The-Request'])) { return FALSE; @@ -109,14 +126,24 @@ function spu_process_info($stream_info, $method, &$header, &$get, &$post, &$cook $method = $req[0]; if (isset($header['Cookie'])) { - $cookies = explode(";", $header['Cookie']); + + // LINE: [Cookie:sess=50e053a9511ef; CO_splashdate4=1356420646; CO_list=all; table_idx=7; table_token=510d494986925; lang=it; CO_bin5_pref_ring_endauct=false; CO_splashdate5=1358372822; CO_splashdate1=1363203374; CO_splashdate2=1363374826; __utma=43654517.209888411.1356605271.1356605271.1356605271.1; __utmz=43654517.1356605271.1.1.utmcsr=(direct)|utmccn=(direct)|utmcmd=(none)] + + $cookies = explode("; ", $header['Cookie']); for ($i = 0 ; $i < count($cookies) ; $i++) { - $nameval = explode("=", trim($cookies[$i])); - if (count($nameval) != 2) { - printf("WARNING: malformat cookie element [%s]\n", $cookies[$i]); + $name = mb_strstr($cookies[$i], "=", TRUE, 'UTF-8'); + if ($name == FALSE) { + if (mb_strlen($cookies[$i]) > 0) { + $cookie[$cookies[$i]] = ""; + } + else { + printf("WARNING: malformat cookie element [%s]\n", $cookies[$i]); + } continue; } - $cookie[$nameval[0]] = urldecode($nameval[1]); + + $value = mb_substr($cookies[$i], mb_strlen($name)+1, 10140, 'UTF-8'); + $cookie[$name] = urldecode($value); } } // GET params management @@ -136,17 +163,23 @@ function spu_process_info($stream_info, $method, &$header, &$get, &$post, &$cook $header['Content-Type'] = $conttype_all[0]; // $path_all[1-] other things like charset and so on + // if (content-type is wrong || content-length isn't set) + // return false + if ($header['Content-Type'] != 'application/x-www-form-urlencoded' || !isset($header['Content-Length'])) { return FALSE; } $post_len = mb_strlen($line, "ASCII"); - $a = explode('&', $line); - for ($i = 0 ; $i < count($a) ; $i++) { - $b = explode('=', $a[$i]); - $post[$b[0]] = urldecode($b[1]); - } printf("INFO: postlen: %d\n", $post_len); + $rest = (int)($header['Content-Length']) - $post_len; + + if ($rest == 0) { + post_manage($post, $line); + } + else { + $cont = $line; + } } break; } @@ -155,7 +188,8 @@ function spu_process_info($stream_info, $method, &$header, &$get, &$post, &$cook continue; } $split = explode(":", $line, 2); - $header[$split[0]] = $split[1]; + $hea_id = trim(mb_convert_case($split[0], MB_CASE_TITLE, 'UTF-8')); + $header[$hea_id] = $split[1]; } return $path; } @@ -185,6 +219,17 @@ function headers_render($header, $len) if (isset($header['Location'])) { $s = sprintf("HTTP/1.1 302 OK\r\n%sLocation: %s\r\n", $cookies, $header['Location']); } + else if (isset($header['HTTP-Response'])) { + $s = sprintf("HTTP/1.1 %s\r\n", $header['HTTP-Response']); + foreach($header as $key => $value) { + if (strtolower($key) == "http-response") + continue; + $s .= sprintf("%s: %s\r\n", $key, $value); + } + if ($len >= 0) { + $s .= sprintf("Content-Length: %ld\r\n", $len); + } + } else { $s = "HTTP/1.1 200 OK\r\n"; @@ -231,25 +276,6 @@ register_shutdown_function('shutta'); * MAIN */ -function chunked_content($zls, $content) -{ - if ($zls) { - $cont_comp = $zls->compress_chunk($content); - } - else { - $cont_comp = $content; - } - $cont_comp_l = mb_strlen($cont_comp, "ASCII"); - printf("CHUNK: [%s]\n", $content); - - return (sprintf("%X\r\n", $cont_comp_l).$cont_comp."\r\n"); -} - -function chunked_fini() -{ - return sprintf("0\r\n"); -} - function get_encoding($header) { $enc = "plain"; @@ -375,8 +401,10 @@ class Sac_a_push { var $file_socket; var $unix_socket; var $socks; - var $s2u; + var $s2u; // user associated with input socket + var $s2p; // pending page associated with input socket var $pending_pages; + var $is_daemon; var $list; var $in; @@ -419,7 +447,7 @@ class Sac_a_push { } } - static function create(&$app, $sockname, $debug, $blocking_mode) + static function create(&$app, $sockname, $debug, $blocking_mode, $argv) { $thiz = new Sac_a_push(); @@ -429,7 +457,13 @@ class Sac_a_push { $thiz->debug = $debug; $thiz->socks = array(); $thiz->s2u = array(); + $thiz->s2p = array(); $thiz->pending_pages = array(); + $thiz->is_daemon = FALSE; + + if (array_search("-d", $argv) !== FALSE || array_search("--daemon", $argv) !== FALSE) { + $thiz->is_daemon = TRUE; + } // create a couple of sockets for control management if (($sockpair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, @@ -473,22 +507,40 @@ class Sac_a_push { return ($thiz); } - function socks_set($sock, $user) + function socks_set($sock, $user, $pendpage) { $id = intval($sock); - $this->s2u[$id] = $user; $this->socks[$id] = $sock; + if ($user != NULL) + $this->s2u[$id] = $user; + if ($pendpage != NULL) + $this->s2p[$id] = $pendpage; } function socks_unset($sock) { $id = intval($sock); - unset($this->s2u[$id]); + if (isset($this->s2u[$id])) + unset($this->s2u[$id]); + if (isset($this->s2p[$id])) + unset($this->s2p[$id]); unset($this->socks[$id]); } + function pendpage_try_addcont(&$new_socket, $tout, $method, $header, $get, $post, $cookie, $path, $addr, $rest, $cont) + { + $pendpage = PendingPage::pendingpage_continue(&$new_socket, $this->curtime, $tout, $method, + $header, $get, $post, $cookie, + $path, $addr, $rest, $cont); + + $pendpage->try_flush($this->curtime); + // Add $pendpage to the pendpage array (in any case) + fprintf(STDERR, "IMPORTANT: Pendadd: %d\n", $pendpage->status); + $this->pendpage_add($pendpage); + } + function pendpage_try_addflush(&$new_socket, $tout, $enc, $header_out, $content) { $pendpage = PendingPage::pendingpage_flushing($new_socket, $this->curtime, $tout, $enc, $header_out, $content); @@ -502,6 +554,34 @@ class Sac_a_push { function pendpage_add($pendpage) { array_push($this->pending_pages, $pendpage); + $this->socks_set($pendpage->socket_get(), NULL, $pendpage); + } + + function pendpage_rem($pendpage) + { + $sock = $pendpage->socket_get(); + if (($key = array_search($pendpage, $this->pending_pages)) !== FALSE) { + unset($this->pending_pages[$key]); + } + else { + fprintf(STDERR, "WARNING: pendpage not found\n"); + } + $this->socks_unset($sock); + fprintf(STDERR, "PP_REM: %d\n", intval($sock)); + } + + + function pendpage_try_addwait(&$new_socket, $tout, $method, $header, $get, $post, $cookie, $path, $addr, $rest, $cont) + { + $pendpage = PendingPage::pendingpage_waiting($new_socket, $this->curtime, $tout, $method, $header, $get, $post, $cookie, $path, $addr, $rest, $cont); + /* + if ($pendpage->try_flush($this->curtime) == FALSE) { + // Add $pendpage to the pendpage array + */ + $this->pendpage_add($pendpage); + /* + } + */ } function garbage_manager($force) @@ -509,14 +589,17 @@ class Sac_a_push { $this->app->garbage_manager($force); foreach ($this->socks as $k => $sock) { - if ($this->s2u[intval($sock)]->sess == '') { - if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) { - $this->s2u[intval($sock)]->rd_socket_set(NULL); + $id = intval($sock); + if (isset($this->s2u[$id])) { + if ($this->s2u[$id]->sess == '') { + if ($this->s2u[$id]->rd_socket_get() != NULL) { + $this->s2u[$id]->rd_socket_set(NULL); + } + unset($this->socks[$id]); + unset($this->s2u[$id]); + fclose($sock); + printf("CLOSE ON GARBAGE MANAGER\n"); } - unset($this->socks[intval($sock)]); - unset($this->s2u[intval($sock)]); - fclose($sock); - printf("CLOSE ON GARBAGE MANAGER\n"); } } } @@ -534,6 +617,7 @@ class Sac_a_push { GLOBAL $G_splash_w, $G_topbanner, $G_with_donors, $G_with_poll; GLOBAL $G_with_sidebanner, $G_with_sidebanner2, $G_with_splash; GLOBAL $G_with_topbanner; + GLOBAL $G_tos_vers, $G_tos_fname, $G_tos_dtsoft, $G_tos_dthard, $G_tos_idx, $G_doc_path; if ($this->main_loop) { return (FALSE); @@ -542,17 +626,24 @@ class Sac_a_push { $this->main_loop = TRUE; while ($this->main_loop) { + $this->app->sess_cur_set(FALSE); $this->curtime = time(); - printf("IN LOOP: Current opened: %d pending_pages: %d - ", count($this->socks), count($this->pending_pages)); + fprintf(STDERR, "IN LOOP: Current opened: %d pending_pages: %d\n", count($this->socks), count($this->pending_pages)); /* Prepare the read array */ /* // when we manage it ... */ /* if ($shutdown) */ /* $read = array_merge(array("$in" => $in), $socks); */ /* else */ - $read = array_merge(array(intval($this->list) => $this->list, intval($this->in) => $this->in, - intval(static::$cnt_slave) => static::$cnt_slave), - $this->socks); + $pre_read = array_merge(array(intval($this->list) => $this->list, + intval(static::$cnt_slave) => static::$cnt_slave), + $this->socks); + if ($this->is_daemon == FALSE) { + $read = array_merge($pre_read, array(intval($this->in) => $this->in)); + } + else { + $read = $pre_read; + } if ($this->debug > 1) { printf("PRE_SELECT\n"); @@ -560,7 +651,7 @@ class Sac_a_push { } $write = NULL; $except = NULL; - $num_changed_sockets = @stream_select($read, $write, $except, 5, 500000); + $num_changed_sockets = @stream_select($read, $write, $except, 0, 500000); if ($num_changed_sockets == 0) { printf(" no data in 5 secs, splash [%d]\n", $G_with_splash); @@ -572,6 +663,8 @@ class Sac_a_push { } /* At least at one of the sockets something interesting happened */ foreach ($read as $i => $sock) { + $id = intval($sock); + $manage_page = FALSE; /* is_resource check is required because there is the possibility that during new request an old connection is closed */ if (!is_resource($sock)) { @@ -588,41 +681,36 @@ class Sac_a_push { $get = array(); $post = array(); $cookie = array(); + $rest = 0; + $cont = ""; if (($new_socket = ancillary_getstream($new_unix, $stream_info)) !== FALSE) { printf("NEW_SOCKET: %d\n", intval($new_socket)); stream_set_blocking($new_socket, $this->blocking_mode); // Set the stream to non-blocking printf("RECEIVED HEADER:\n%s", $stream_info); - $path = spu_process_info($stream_info, $method, $header, $get, $post, $cookie); + if (($path = spu_process_info($stream_info, $method, $header, + $get, $post, $cookie, $rest, $cont)) + == FALSE) { + fprintf(STDERR, "TODO: fix wrong header management\n"); + } $addr = stream_socket_get_name($new_socket, TRUE); printf("PATH: [%s]\n", $path); - printf("M: %s\nHEADER:\n", $method); - if ($method == "POST") { - // ADD PUSH INTO FD ARRAY AS WAITING DATA - // Passing all infos from spu_process_info as arguments: - // - // MAYBE: $stream_info, - // $method, $header, $get, $post, $cookie - // $s_a_p (this), $new_socket, substr($path, SITE_PREFIX_LEN), - // $addr - } - print_r($header); - printf("GET:\n"); - print_r($get); - printf("POST:\n"); - print_r($post); - printf("COOKIE:\n"); - print_r($cookie); - $header_out = array(); - - // TODO: MOVE DOWN request_mgr to factorize new_sockets and POST closed - $rret = FALSE; - if (!strncmp($path, SITE_PREFIX, SITE_PREFIX_LEN)) { - $rret = $this->app->request_mgr($this, $header, $header_out, $new_socket, substr($path, SITE_PREFIX_LEN), $addr, $get, $post, $cookie); + if ($method == "POST" && $rest > 0) { + if (isset($header['Expect']) && $header['Expect'] == '100-continue') { + fprintf(STDERR, "\nPOSTA DE CHE\n\n"); + $this->pendpage_try_addcont($new_socket, 20, + $method, $header, $get, $post, $cookie, + $path, $addr, $rest, $cont); + } + else { + $this->pendpage_try_addwait($new_socket, 20, + $method, $header, $get, $post, $cookie, + $path, $addr, $rest, $cont); + } } - if ($rret == FALSE) { - // FIXME: manage 404 !!! - printf("TODO: fix unknown page\n"); + else { + $manage_page = TRUE; } + printf("number of sockets after %d\n", count($this->socks)); } else { @@ -630,28 +718,34 @@ class Sac_a_push { } } else { - $buf = fread($sock, 512); + $buf = fread($sock, 4096); // if socket is closed - if ($buf == FALSE || strlen($buf) == 0) { + if ($buf == FALSE || mb_strlen($buf, "ASCII") == 0) { // close socket case if ($buf == FALSE) { - printf("ERROR READING\n"); + printf("INFO: read return false\n"); } if ($sock === $this->list) { - printf("Arrivati %d bytes da list\n", strlen($buf)); + printf("Arrivati %d bytes da list\n", mb_strlen($buf, "ASCII")); return(21); } else if ($sock === $this->in || $sock === static::$cnt_slave) { - printf("Arrivati %d bytes da stdin\n", strlen($buf)); + printf("Arrivati %d bytes da stdin\n", mb_strlen($buf, "ASCII")); return(22); } else { - // $user_a[$s2u[intval($sock)]]->disable(); - if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) { - $this->s2u[intval($sock)]->rd_socket_set(NULL); + unset($this->socks[$id]); + if (isset($this->s2u[$id])) { + // $user_a[$s2u[$id]]->disable(); + if ($this->s2u[$id]->rd_socket_get() != NULL) { + // try to send close frame (for websocket) + $clo = $this->s2u[$id]->stream_close(); + $clo_l = mb_strlen($clo, "ASCII"); + @fwrite($sock, $clo, $clo_l); + $this->s2u[$id]->rd_socket_set(NULL); + } + unset($this->s2u[$id]); } - unset($this->socks[intval($sock)]); - unset($this->s2u[intval($sock)]); } fclose($sock); printf("CLOSE ON READ\n"); @@ -666,17 +760,17 @@ class Sac_a_push { print_r($read); } if ($sock === $this->list) { - printf("Arrivati %d bytes da list\n", strlen($buf)); + printf("Arrivati %d bytes da list\n", mb_strlen($buf, "ASCII")); } else if ($sock === $this->in || $sock === static::$cnt_slave) { - printf("Arrivati %d bytes da stdin\n", strlen($buf)); + printf("Arrivati %d bytes da stdin\n", mb_strlen($buf, "ASCII")); $line = trim($buf); if ($line == "reload") { require("$DOCUMENT_ROOT/Etc/".BRISK_CONF); global_dump(); } - else if ($line == "shutdown") { + else if ($line == "shutdown" || $line == "sd") { if ($this->app->dump_data()) { return(0); } @@ -687,21 +781,55 @@ class Sac_a_push { } else { $key = array_search("$sock", $this->socks); - printf("Arrivati %d bytes dalla socket n. %d\n", strlen($buf), $key); + fprintf(STDERR, "Arrivati %d bytes dalla socket n. %d\n", mb_strlen($buf, "ASCII"), $key); + if (isset($this->s2p[$id])) { + $this->s2p[$id]->rest -= mb_strlen($buf, "ASCII"); + $this->s2p[$id]->cont .= $buf; + if ($this->s2p[$id]->rest <= 0) { + $header = $new_socket = $path = $addr = $get = $cookie = 0; + $post = array(); + + $this->s2p[$id]->context_get($header, $new_socket, $path, $addr, $get, $post, $cookie); + $this->pendpage_rem($this->s2p[$id]); + fprintf(STDERR, "SOCKET RUN: %s\n", $new_socket); + + $manage_page = TRUE; + } + } } } } - // TODO: MOVE HERE request_mgr to factorize new_sockets and POST closed - // $rret = $this->app->request_mgr + + if ($manage_page == TRUE) { + printf("M: %s\nHEADER:\n", $method); + print_r($header); + printf("GET:\n"); + print_r($get); + printf("POST:\n"); + print_r($post); + printf("COOKIE:\n"); + print_r($cookie); + + $header_out = array(); + // TODO: MOVE DOWN request_mgr to factorize new_sockets and POST closed + $rret = FALSE; + if (!strncmp($path, SITE_PREFIX, SITE_PREFIX_LEN)) { + $rret = $this->app->request_mgr($this, $header, $header_out, $new_socket, substr($path, SITE_PREFIX_LEN), $addr, $get, $post, $cookie); + } + if ($rret == FALSE) { + // FIXME: manage 404 !!! + printf("TODO: fix unknown page\n"); + } + } } } $this->garbage_manager(FALSE); /* manage unfinished pages */ - foreach ($this->pending_pages as $k => $pgflush) { + foreach ($this->pending_pages as $k => $pendpage) { // TODO: try_flush if exists in the class - if ($pgflush->try_flush($this->curtime) == TRUE) { + if ($pendpage->try_flush($this->curtime) == TRUE) { unset($this->pending_pages[$k]); } } @@ -714,8 +842,9 @@ class Sac_a_push { /* manage open streaming */ foreach ($this->socks as $k => $sock) { - if (isset($this->s2u[intval($sock)])) { - $user = $this->s2u[intval($sock)]; + $id = intval($sock); + if (isset($this->s2u[$id])) { + $user = $this->s2u[$id]; $response = $user->rd_cache_get(); $do_ping = FALSE; if (($this->curtime - $user->lacc) > (EXPIRE_TIME_RD / 3)) { @@ -737,15 +866,15 @@ class Sac_a_push { $content = $user->stream_keepalive(FALSE); } if ($content != "") { - $response = chunked_content($user->rd_zls_get(), $content); + $response = $user->chunked_content($content); } } if ($response != "") { // echo "SPIA: [".substr($response, 0, 60)."...]\n"; - echo "SPIA: [".$response."]\n"; + // echo "SPIA: [".$response."]\n"; $response_l = mb_strlen($response, "ASCII"); - $wret = @fwrite($sock, $response); + $wret = @fwrite($sock, $response, $response_l); if ($wret < $response_l) { printf("TROUBLE WITH FWRITE: %d\n", $wret); $user->rd_cache_set(mb_substr($response, $wret, $response_l - $wret, "ASCII")); @@ -759,11 +888,14 @@ class Sac_a_push { // close socket after a while to prevent client memory consumption if ($user->rd_endtime_is_expired($this->curtime)) { - if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) { - $this->s2u[intval($sock)]->rd_socket_set(NULL); + if ($this->s2u[$id]->rd_socket_get() != NULL) { + $this->s2u[$id]->rd_socket_set(NULL); } - unset($this->socks[intval($sock)]); - unset($this->s2u[intval($sock)]); + unset($this->socks[$id]); + unset($this->s2u[$id]); + $clo = $user->stream_close(); + $clo_l = mb_strlen($clo, "ASCII"); + @fwrite($sock, $clo, $clo_l); fclose($sock); printf("CLOSE ON LOOP\n"); }