X-Git-Url: http://mop.ddnsfree.com/gitweb/?a=blobdiff_plain;f=web%2FObj%2Fsac-a-push.phh;h=a4e463c058d2223966f274c1eec89c4e91157d2f;hb=eeef8a0eacaa2d3d0174dba8561ec860868bc1c6;hp=b931340b629509d865c78d620dc58286cd0f7215;hpb=44fb43fc14cc20e55dbc214f5e533b054e5b159c;p=brisk.git diff --git a/web/Obj/sac-a-push.phh b/web/Obj/sac-a-push.phh index b931340..a4e463c 100644 --- a/web/Obj/sac-a-push.phh +++ b/web/Obj/sac-a-push.phh @@ -93,14 +93,15 @@ function pid_remove() } } -function spu_process_info($stream_info, $method, &$header, &$get, &$post, &$cookie) +function spu_process_info($stream_info, &$method, &$header, &$get, &$post, &$cookie, &$rest) { $check_post = FALSE; $header = array(); $get = array(); $post = array(); + $rest = 0; foreach(preg_split("/(\r?\n)/", $stream_info) as $line) { - // printf("LINE: [%s]\n", $line); + printf("LINE: [%s]\n", $line); if ($check_post) { if (!isset($header['The-Request'])) { return FALSE; @@ -121,7 +122,7 @@ function spu_process_info($stream_info, $method, &$header, &$get, &$post, &$cook } // GET params management $get_vars = explode('?', $req[1], 2); - $path = $get_vars[0]; + $path = $get_vars[0]; if (count($get_vars) > 1) { $a = explode('&', $get_vars[1]); printf("A COUNT: [%s] %d\n", $a[0], count($a)); @@ -132,17 +133,22 @@ function spu_process_info($stream_info, $method, &$header, &$get, &$post, &$cook } // POST params management if ($req[0] == 'POST') { + $conttype_all = explode(";", $header['Content-Type']); + $header['Content-Type'] = $conttype_all[0]; + // $path_all[1-] other things like charset and so on + if ($header['Content-Type'] != 'application/x-www-form-urlencoded' || !isset($header['Content-Length'])) { return FALSE; } - $post_len = mb_strlen($line, "latin1"); + $post_len = mb_strlen($line, "ASCII"); $a = explode('&', $line); for ($i = 0 ; $i < count($a) ; $i++) { $b = explode('=', $a[$i]); $post[$b[0]] = urldecode($b[1]); } printf("INFO: postlen: %d\n", $post_len); + $rest = (int)($header['Content-Length']) - $post_len; } break; } @@ -371,8 +377,9 @@ class Sac_a_push { var $file_socket; var $unix_socket; var $socks; - var $s2u; - var $pages_flush; + var $s2u; // user associated with input socket + var $s2p; // pending page associated with input socket + var $pending_pages; var $list; var $in; @@ -425,7 +432,8 @@ class Sac_a_push { $thiz->debug = $debug; $thiz->socks = array(); $thiz->s2u = array(); - $thiz->pages_flush = array(); + $thiz->s2p = array(); + $thiz->pending_pages = array(); // create a couple of sockets for control management if (($sockpair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, @@ -469,35 +477,54 @@ class Sac_a_push { return ($thiz); } - function socks_set($sock, $user) + function socks_set($sock, $user, $pendpage) { $id = intval($sock); - $this->s2u[$id] = $user; $this->socks[$id] = $sock; + if ($user != NULL) + $this->s2u[$id] = $user; + if ($pendpage != NULL) + $this->s2p[$id] = $pendpage; } function socks_unset($sock) { $id = intval($sock); - unset($this->s2u[$id]); + if (isset($this->s2u[$id])) + unset($this->s2u[$id]); + if (isset($this->s2p[$id])) + unset($this->s2p[$id]); unset($this->socks[$id]); } - function pgflush_try_add($enc, &$new_socket, $tout, $header_out, $content) + function pendpage_try_addflush(&$new_socket, $tout, $enc, $header_out, $content) { - $pgflush = new PageFlush($new_socket, $enc, $this->curtime, $tout, $header_out, $content); + $pendpage = PendingPage::pendingpage_flushing($new_socket, $this->curtime, $tout, $enc, $header_out, $content); - if ($pgflush->try_flush($this->curtime) == FALSE) { - // Add $pgflush to the pgflush array - $this->pgflush_add($pgflush); + if ($pendpage->try_flush($this->curtime) == FALSE) { + // Add $pendpage to the pendpage array + $this->pendpage_add($pendpage); } } - function pgflush_add($pgflush) + function pendpage_add($pendpage) + { + array_push($this->pending_pages, $pendpage); + } + + function pendpage_try_addwait(&$new_socket, $tout, $method, $header, $get, $post, $cookie, $path, $addr, $rest) { - array_push($this->pages_flush, $pgflush); + $pendpage = PendingPage::pendingpage_waiting($new_socket, $this->curtime, $tout, $method, $header, $get, $post, $cookie, $path, $addr, $rest); + /* + if ($pendpage->try_flush($this->curtime) == FALSE) { + // Add $pendpage to the pendpage array + */ + $this->pendpage_add($pendpage); + /* + } + */ } function garbage_manager($force) @@ -505,14 +532,17 @@ class Sac_a_push { $this->app->garbage_manager($force); foreach ($this->socks as $k => $sock) { - if ($this->s2u[intval($sock)]->sess == '') { - if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) { - $this->s2u[intval($sock)]->rd_socket_set(NULL); + $id = intval($sock); + if (isset($this->s2u[$id])) { + if ($this->s2u[$id]->sess == '') { + if ($this->s2u[$id]->rd_socket_get() != NULL) { + $this->s2u[$id]->rd_socket_set(NULL); + } + unset($this->socks[$id]); + unset($this->s2u[$id]); + fclose($sock); + printf("CLOSE ON GARBAGE MANAGER\n"); } - unset($this->socks[intval($sock)]); - unset($this->s2u[intval($sock)]); - fclose($sock); - printf("CLOSE ON GARBAGE MANAGER\n"); } } } @@ -539,7 +569,7 @@ class Sac_a_push { while ($this->main_loop) { $this->curtime = time(); - printf("IN LOOP: Current opened: %d pages_flush: %d - ", count($this->socks), count($this->pages_flush)); + printf("IN LOOP: Current opened: %d pending_pages: %d - ", count($this->socks), count($this->pending_pages)); /* Prepare the read array */ /* // when we manage it ... */ @@ -556,7 +586,7 @@ class Sac_a_push { } $write = NULL; $except = NULL; - $num_changed_sockets = @stream_select($read, $write, $except, 0, 500000); + $num_changed_sockets = @stream_select($read, $write, $except, 5, 500000); if ($num_changed_sockets == 0) { printf(" no data in 5 secs, splash [%d]\n", $G_with_splash); @@ -568,6 +598,8 @@ class Sac_a_push { } /* At least at one of the sockets something interesting happened */ foreach ($read as $i => $sock) { + $id = intval($sock); + $manage_page = FALSE; /* is_resource check is required because there is the possibility that during new request an old connection is closed */ if (!is_resource($sock)) { @@ -584,13 +616,35 @@ class Sac_a_push { $get = array(); $post = array(); $cookie = array(); + $rest = 0; if (($new_socket = ancillary_getstream($new_unix, $stream_info)) !== FALSE) { printf("NEW_SOCKET: %d\n", intval($new_socket)); stream_set_blocking($new_socket, $this->blocking_mode); // Set the stream to non-blocking printf("RECEIVED HEADER:\n%s", $stream_info); - $path = spu_process_info($stream_info, $method, $header, $get, $post, $cookie); + if (($path = spu_process_info($stream_info, $method, $header, $get, $post, $cookie, $rest)) + == FALSE) { + fprintf(STDERR, "TODO: fix wrong header management\n"); + } + $addr = stream_socket_get_name($new_socket, TRUE); printf("PATH: [%s]\n", $path); printf("M: %s\nHEADER:\n", $method); + if ($method == "POST" && $rest > 0) { + fprintf(STDERR, "\nPOSTA DE CHE\n\n"); + $this->pendpage_try_addwait($new_socket, 20, + $method, $header, $get, $post, $cookie, + substr($path, SITE_PREFIX_LEN), $addr, $rest); + + // ADD PUSH INTO FD ARRAY AS WAITING DATA + // Passing all infos from spu_process_info as arguments: + // + // MAYBE: $stream_info, + // $method, $header, $get, $post, $cookie + // $s_a_p (this), $new_socket, substr($path, SITE_PREFIX_LEN), + // $addr + } + else { + $manage_page = TRUE; + } print_r($header); printf("GET:\n"); print_r($get); @@ -598,19 +652,9 @@ class Sac_a_push { print_r($post); printf("COOKIE:\n"); print_r($cookie); - $addr = stream_socket_get_name($new_socket, TRUE); - $header_out = array(); - $subs = SITE_PREFIX."briskin5/"; - $subs_l = strlen($subs); - $rret = FALSE; - if (!strncmp($path, SITE_PREFIX, SITE_PREFIX_LEN)) { - $rret = $this->app->request_mgr($this, $header, $header_out, $new_socket, substr($path, SITE_PREFIX_LEN), $addr, $get, $post, $cookie); - } - if ($rret == FALSE) { - // FIXME: manage 404 !!! - printf("TODO: fix unknown page\n"); - } + + printf("number of sockets after %d\n", count($this->socks)); } else { @@ -621,6 +665,7 @@ class Sac_a_push { $buf = fread($sock, 512); // if socket is closed if ($buf == FALSE || strlen($buf) == 0) { + // close socket case if ($buf == FALSE) { printf("ERROR READING\n"); } @@ -633,12 +678,14 @@ class Sac_a_push { return(22); } else { - // $user_a[$s2u[intval($sock)]]->disable(); - if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) { - $this->s2u[intval($sock)]->rd_socket_set(NULL); + unset($this->socks[$id]); + if (isset($this->s2u[$id])) { + // $user_a[$s2u[$id]]->disable(); + if ($this->s2u[$id]->rd_socket_get() != NULL) { + $this->s2u[$id]->rd_socket_set(NULL); + } + unset($this->s2u[$id]); } - unset($this->socks[intval($sock)]); - unset($this->s2u[intval($sock)]); } fclose($sock); printf("CLOSE ON READ\n"); @@ -678,15 +725,32 @@ class Sac_a_push { } } } + + // TODO: MOVE HERE request_mgr to factorize new_sockets and POST closed + // $rret = $this->app->request_mgr + if ($manage_page == TRUE) { + $header_out = array(); + // TODO: MOVE DOWN request_mgr to factorize new_sockets and POST closed + $rret = FALSE; + if (!strncmp($path, SITE_PREFIX, SITE_PREFIX_LEN)) { + $rret = $this->app->request_mgr($this, $header, $header_out, $new_socket, substr($path, SITE_PREFIX_LEN), $addr, $get, $post, $cookie); + } + fprintf(STDERR, "\n\n DI QUI PASSA [%s]\n\n", $rret); + if ($rret == FALSE) { + // FIXME: manage 404 !!! + printf("TODO: fix unknown page\n"); + } + } } } $this->garbage_manager(FALSE); /* manage unfinished pages */ - foreach ($this->pages_flush as $k => $pgflush) { - if ($pgflush->try_flush($this->curtime) == TRUE) { - unset($this->pages_flush[$k]); + foreach ($this->pending_pages as $k => $pendpage) { + // TODO: try_flush if exists in the class + if ($pendpage->try_flush($this->curtime) == TRUE) { + unset($this->pending_pages[$k]); } } @@ -698,8 +762,9 @@ class Sac_a_push { /* manage open streaming */ foreach ($this->socks as $k => $sock) { - if (isset($this->s2u[intval($sock)])) { - $user = $this->s2u[intval($sock)]; + $id = intval($sock); + if (isset($this->s2u[$id])) { + $user = $this->s2u[$id]; $response = $user->rd_cache_get(); $do_ping = FALSE; if (($this->curtime - $user->lacc) > (EXPIRE_TIME_RD / 3)) { @@ -743,11 +808,11 @@ class Sac_a_push { // close socket after a while to prevent client memory consumption if ($user->rd_endtime_is_expired($this->curtime)) { - if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) { - $this->s2u[intval($sock)]->rd_socket_set(NULL); + if ($this->s2u[$id]->rd_socket_get() != NULL) { + $this->s2u[$id]->rd_socket_set(NULL); } - unset($this->socks[intval($sock)]); - unset($this->s2u[intval($sock)]); + unset($this->socks[$id]); + unset($this->s2u[$id]); fclose($sock); printf("CLOSE ON LOOP\n"); }