X-Git-Url: https://mop.ddnsfree.com/gitweb/?a=blobdiff_plain;f=web%2FObj%2Fsac-a-push.phh;h=6efc241629b4443a1d38ec139ab2a16eb5a2a14c;hb=db5d6355c39327ba7f8052f360f1e846ba4ad01c;hp=527d9b78d18d109f7671e38edb4ad195029699ab;hpb=31c53d65779a84c86cb833bfe13be8fe3e4dae1d;p=brisk.git diff --git a/web/Obj/sac-a-push.phh b/web/Obj/sac-a-push.phh index 527d9b7..6efc241 100644 --- a/web/Obj/sac-a-push.phh +++ b/web/Obj/sac-a-push.phh @@ -1,9 +1,8 @@ 0) { + $cookie[$cookies[$i]] = ""; + } + else { + printf("WARNING: malformat cookie element [%s]\n", $cookies[$i]); + } continue; } - $cookie[$nameval[0]] = urldecode($nameval[1]); + + $value = mb_substr($cookies[$i], mb_strlen($name)+1, 10140, 'UTF-8'); + $cookie[$name] = urldecode($value); } } // GET params management $get_vars = explode('?', $req[1], 2); - $path = $get_vars[0]; + $path = $get_vars[0]; if (count($get_vars) > 1) { $a = explode('&', $get_vars[1]); - printf("A COUNT: [%s] %d\n", $a[0], count($a)); + // printf("A COUNT: [%s] %d\n", $a[0], count($a)); for ($i = 0 ; $i < count($a) ; $i++) { $b = explode('=', $a[$i]); + if ($b[0] == "") + continue; $get[$b[0]] = urldecode($b[1]); } } // POST params management if ($req[0] == 'POST') { + $conttype_all = explode(";", $header['Content-Type']); + $header['Content-Type'] = $conttype_all[0]; + // $path_all[1-] other things like charset and so on + + // if (content-type is wrong || content-length isn't set) + // return false + if ($header['Content-Type'] != 'application/x-www-form-urlencoded' || !isset($header['Content-Length'])) { return FALSE; } - $post_len = mb_strlen($line, "latin1"); - $a = explode('&', $line); - for ($i = 0 ; $i < count($a) ; $i++) { - $b = explode('=', $a[$i]); - $post[$b[0]] = urldecode($b[1]); + $post_len = mb_strlen($line, "ASCII"); + // printf("INFO: postlen: %d\n", $post_len); + $rest = (int)($header['Content-Length']) - $post_len; + + if ($rest == 0) { + post_manage($post, $line); + } + else { + $cont = $line; } - printf("INFO: postlen: %d\n", $post_len); } break; } @@ -84,7 +212,8 @@ function spu_process_info($stream_info, $method, &$header, &$get, &$post, &$cook continue; } $split = explode(":", $line, 2); - $header[$split[0]] = $split[1]; + $hea_id = trim(mb_convert_case($split[0], MB_CASE_TITLE, 'UTF-8')); + $header[$hea_id] = $split[1]; } return $path; } @@ -105,32 +234,50 @@ function gpcs_var($name, $get, $post, $cookie) function headers_render($header, $len) { - $s = ""; - if (isset($header['Location'])) { - return sprintf("HTTP/1.1 302 OK\r\nLocation: %s\r\n\r\n", $header['Location']); - } - else { - $s .= "HTTP/1.1 200 OK\r\n"; + $cookies = ""; + + if (isset($header['cookies'])) { + $cookies = $header['cookies']->render(); + unset($header['cookies']); } - if (!isset($header['Date'])) - $s .= sprintf("Date: %s\r\n", date(DATE_RFC822)); - if (!isset($header['Connection'])) - $s .= "Connection: close\r\n"; - if (!isset($header['Content-Type'])) - $s .= "Content-Type: text/html\r\n"; - foreach($header as $key => $value) { - $s .= sprintf("%s: %s\r\n", $key, $value); + if (isset($header['Location'])) { + $s = sprintf("HTTP/1.1 302 OK\r\n%sLocation: %s\r\n", $cookies, $header['Location']); } - if ($len >= 0) { - $s .= sprintf("Content-Length: %d\r\n", $len); + else if (isset($header['HTTP-Response'])) { + $s = sprintf("HTTP/1.1 %s\r\n", $header['HTTP-Response']); + foreach($header as $key => $value) { + if (strtolower($key) == "http-response") + continue; + $s .= sprintf("%s: %s\r\n", $key, $value); + } + if ($len >= 0) { + $s .= sprintf("Content-Length: %ld\r\n", $len); + } } else { - $s .= "Cache-Control: no-cache, must-revalidate\r\n"; - $s .= "Expires: Mon, 26 Jul 1997 05:00:00 GMT\r\n"; - if (!isset($header['Content-Encoding'])) { - $s .= "Content-Encoding: chunked\r\n"; + $s = "HTTP/1.1 200 OK\r\n"; + + if (!isset($header['Date'])) + $s .= sprintf("Date: %s\r\n", date(DATE_RFC822)); + if (!isset($header['Connection'])) + $s .= "Connection: close\r\n"; + if (!isset($header['Content-Type'])) + $s .= "Content-Type: text/html\r\n"; + foreach($header as $key => $value) { + $s .= sprintf("%s: %s\r\n", $key, $value); + } + if ($len >= 0) { + $s .= sprintf("Content-Length: %d\r\n", $len); + } + else { + $s .= "Cache-Control: no-cache, must-revalidate\r\n"; + $s .= "Expires: Mon, 26 Jul 1997 05:00:00 GMT\r\n"; + if (!isset($header['Content-Encoding'])) { + $s .= "Content-Encoding: chunked\r\n"; + } + $s .= "Transfer-Encoding: chunked\r\n"; } - $s .= "Transfer-Encoding: chunked\r\n"; + $s .= $cookies; } $s .= "\r\n"; @@ -153,25 +300,6 @@ register_shutdown_function('shutta'); * MAIN */ -function chunked_content($zls, $content) -{ - if ($zls) { - $cont_comp = $zls->compress_chunk($content); - } - else { - $cont_comp = $content; - } - $cont_comp_l = mb_strlen($cont_comp, "ASCII"); - printf("CHUNK: [%s]\n", $content); - - return (sprintf("%X\r\n", $cont_comp_l).$cont_comp."\r\n"); -} - -function chunked_fini() -{ - return sprintf("0\r\n"); -} - function get_encoding($header) { $enc = "plain"; @@ -189,24 +317,130 @@ function get_encoding($header) return ($enc); } +class Cookie { + var $attr; + // Set-Cookie: reg_fb_gate=deleted; Expires=Thu, 01-Jan-1970 00:00:01 GMT; Path=/; Domain=.foo.com; HttpOnly + // string $name [, string $value [, int $expire = 0 [, string $path [, string $domain [, bool $secure = false [, bool $httponly = false ]]]]]] ) + function Cookie() + { + $this->attr = array(); + } + + static function create($name) + { + $thiz = new Cookie(); + + $thiz->attr[$name] = ""; + + $argc = func_num_args(); + for ($i = 1 ; $i < $argc ; $i++) { + $arg = func_get_arg($i); + switch ($i) { + case 1: + $thiz->attr[$name] = urlencode($arg); + break; + case 2: + $thiz->attr['Expires'] = gmdate('D, d M Y H:i:s \G\M\T', $arg); // RFC 1211 format + break; + case 3: + $thiz->attr['Path'] = $arg; + break; + case 4: + $thiz->attr['Domain'] = $arg; + break; + case 5: + if ($arg == TRUE) { + $thiz->attr['Secure'] = NULL; + } + break; + case 6: + if ($arg == TRUE) { + $thiz->attr['HttpOnly'] = NULL; + } + break; + default: + return FALSE; + } + } + + return $thiz; + } + + function render() + { + $r = "Set-Cookie: "; + $isfirst = TRUE; + + foreach ($this->attr as $k => $v) { + if ($v == NULL) { + $r .= sprintf("%s%s", ($isfirst ? "" : "; "), $k); + } + else { + $r .= sprintf("%s%s=%s", ($isfirst ? "" : "; "), $k, $v); + } + $isfirst = FALSE; + } + $r .= "\r\n"; + + return $r; + } +} + +class Cookies { + var $cookies; + + function Cookies() + { + $this->cookies = array(); + } + + function add($name) + { + if (($cookie = call_user_func_array("Cookie::create", func_get_args())) == FALSE) + return (FALSE); + + array_push($this->cookies, $cookie); + + return (TRUE); + } + + function render() + { + $r = ""; + foreach ($this->cookies as $cookie) { + $r .= $cookie->render(); + } + + return ($r); + } +} + class Sac_a_push { - static $fixed_fd = 2; + // maybe fixed_fd is unuseful + static $fixed_fd = 3; + static $cnt_master = NULL; + static $cnt_slave = NULL; + + var $provider_proxy; // list of provider/browser that offer proxy service var $file_socket; var $unix_socket; + var $direct_socket; // socket where read direct commands var $socks; - var $s2u; - var $pages_flush; + var $s2u; // user associated with input socket + var $s2p; // pending page associated with input socket + var $pending_pages; + var $is_daemon; - var $list; + var $list_web; + var $list_cmd; var $in; var $debug; var $blocking_mode; var $app; - var $bin5; var $curtime; @@ -217,26 +451,70 @@ class Sac_a_push { { } - // Sac_a_push::create("/tmp/brisk.sock", 0, 0 + function sig_handler($sig) + { + switch ($sig) { + case SIGINT: + exit(1); + break; + case SIGTERM: + if (static::$cnt_master != NULL) { + fwrite(static::$cnt_master, "\nshutdown\n"); + fflush(static::$cnt_master); + } + else { + exit(1); + } + break; + case SIGHUP: + if (static::$cnt_master != NULL) { + fwrite(static::$cnt_master, "\nreload\n"); + fflush(static::$cnt_master); + } + break; + } + } - static function create(&$app, $sockname, $debug, $blocking_mode) + static function create(&$app, $sockname, $debug, $blocking_mode, $provider_proxy, $argv) { $thiz = new Sac_a_push(); $thiz->app = $app; + + $thiz->provider_proxy = ProviderProxy::create(); + $thiz->file_socket = $sockname; $thiz->unix_socket = "unix://$sockname"; + $thiz->direct_socket = "unix://${sockname}2"; $thiz->debug = $debug; $thiz->socks = array(); $thiz->s2u = array(); - $thiz->pages_flush = array(); + $thiz->s2p = array(); + $thiz->pending_pages = array(); + $thiz->is_daemon = FALSE; + + if (array_search("-d", $argv) !== FALSE || array_search("--daemon", $argv) !== FALSE) { + $thiz->is_daemon = TRUE; + } + + // create a couple of sockets for control management + if (($sockpair = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, + STREAM_IPPROTO_IP)) == FALSE) { + return FALSE; + } + static::$cnt_master = $sockpair[0]; + static::$cnt_slave = $sockpair[1]; + + pcntl_signal(SIGTERM, array("Sac_a_push", "sig_handler")); + pcntl_signal(SIGINT, array("Sac_a_push", "sig_handler")); + pcntl_signal(SIGHUP, array("Sac_a_push", "sig_handler")); $thiz->blocking_mode = 0; // 0 for non-blocking $thiz->rndstr = ""; for ($i = 0 ; $i < 4096 ; $i++) { if (($i % 128) == 0) - $thiz->rndstr .= "\n"; + $thiz->rndstr .= " "; else $thiz->rndstr .= chr(mt_rand(65, 90)); } @@ -244,52 +522,106 @@ class Sac_a_push { if (file_exists($thiz->file_socket)) { unlink($thiz->file_socket); } + if (file_exists($thiz->file_socket."2")) { + unlink($thiz->file_socket."2"); + } $old_umask = umask(0); - if (($thiz->list = stream_socket_server($thiz->unix_socket, $err, $errs)) === FALSE) { + if (($thiz->list_web = stream_socket_server($thiz->unix_socket, $err, $errs)) === FALSE) { + return (FALSE); + } + if (($thiz->list_cmd = stream_socket_server($thiz->direct_socket, $err, $errs)) === FALSE) { return (FALSE); } umask($old_umask); - stream_set_blocking($thiz->list, $thiz->blocking_mode); # Set the stream to non-blocking + stream_set_blocking($thiz->list_web, $thiz->blocking_mode); # Set the stream to non-blocking + stream_set_blocking($thiz->list_cmd, $thiz->blocking_mode); # Set the stream to non-blocking if (($thiz->in = fopen("php://stdin", "r")) === FALSE) { return(FALSE); } $thiz->main_loop = FALSE; + $thiz->reload(TRUE, $provider_proxy); return ($thiz); } - function socks_set($sock, $user) + function socks_set($sock, $user, $pendpage) { $id = intval($sock); - $this->s2u[$id] = $user; $this->socks[$id] = $sock; + if ($user != NULL) + $this->s2u[$id] = $user; + if ($pendpage != NULL) + $this->s2p[$id] = $pendpage; } function socks_unset($sock) { $id = intval($sock); - unset($this->s2u[$id]); + if (isset($this->s2u[$id])) + unset($this->s2u[$id]); + if (isset($this->s2p[$id])) + unset($this->s2p[$id]); unset($this->socks[$id]); } - function pgflush_try_add($enc, &$new_socket, $tout, $header_out, $content) + function pendpage_try_addcont(&$new_socket, $tout, $method, $header, $get, $post, $cookie, $path, $addr, $rest, $cont) { - $pgflush = new PageFlush($new_socket, $enc, $this->curtime, $tout, $header_out, $content); + $pendpage = PendingPage::pendingpage_continue( $new_socket, $this->curtime, $tout, $method, + $header, $get, $post, $cookie, + $path, $addr, $rest, $cont); - if ($pgflush->try_flush($this->curtime) == FALSE) { - // Add $pgflush to the pgflush array - $this->pgflush_add($pgflush); + $pendpage->try_flush($this->curtime); + // Add $pendpage to the pendpage array (in any case) + // fprintf(STDERR, "IMPORTANT: Pendadd: %d\n", $pendpage->status); + $this->pendpage_add($pendpage); + } + + function pendpage_try_addflush(&$new_socket, $tout, $enc, $header_out, $content) + { + $pendpage = PendingPage::pendingpage_flushing($new_socket, $this->curtime, $tout, $enc, $header_out, $content); + + if ($pendpage->try_flush($this->curtime) == FALSE) { + // Add $pendpage to the pendpage array + $this->pendpage_add($pendpage); } } - function pgflush_add($pgflush) + function pendpage_add($pendpage) { - array_push($this->pages_flush, $pgflush); + array_push($this->pending_pages, $pendpage); + $this->socks_set($pendpage->socket_get(), NULL, $pendpage); + } + + function pendpage_rem($pendpage) + { + $sock = $pendpage->socket_get(); + if (($key = array_search($pendpage, $this->pending_pages)) !== FALSE) { + unset($this->pending_pages[$key]); + } + else { + fprintf(STDERR, "WARNING: pendpage not found\n"); + } + $this->socks_unset($sock); + fprintf(STDERR, "PP_REM: %d\n", intval($sock)); + } + + + function pendpage_try_addwait(&$new_socket, $tout, $method, $header, $get, $post, $cookie, $path, $addr, $rest, $cont) + { + $pendpage = PendingPage::pendingpage_waiting($new_socket, $this->curtime, $tout, $method, $header, $get, $post, $cookie, $path, $addr, $rest, $cont); + /* + if ($pendpage->try_flush($this->curtime) == FALSE) { + // Add $pendpage to the pendpage array + */ + $this->pendpage_add($pendpage); + /* + } + */ } function garbage_manager($force) @@ -297,21 +629,58 @@ class Sac_a_push { $this->app->garbage_manager($force); foreach ($this->socks as $k => $sock) { - if ($this->s2u[intval($sock)]->sess == '') { - if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) { - $this->s2u[intval($sock)]->rd_socket_set(NULL); + $id = intval($sock); + if (isset($this->s2u[$id])) { + $user = $this->s2u[$id]; + if ($user->the_end) { + if (($user->rd_toflush == FALSE && $user->rd_step == $user->step) + || $user->rd_endtime_is_expired($this->curtime)) { + if ($user->rd_socket_get() != NULL) { + $user->rd_socket_set(NULL); + } + unset($this->socks[$id]); + unset($this->s2u[$id]); + fclose($sock); + // printf("CLOSE ON GARBAGE MANAGER\n"); + } } - unset($this->socks[intval($sock)]); - unset($this->s2u[intval($sock)]); - fclose($sock); - printf("CLOSE ON GARBAGE MANAGER\n"); } } + $this->app->users_cleanup(); + } + + function check_globals() + { + GLOBAL $_globals_list; + foreach ($_globals_list as $g) { + if (!array_search($g, $GLOBALS) || !isset($GLOBALS[$g])) { + error_log(sprintf("Global [%s] not declared", $g)); + return FALSE; + } + } + return TRUE; } function run() { - GLOBAL $DOCUMENT_ROOT, $HTTP_HOST, $G_with_splash; + GLOBAL $DOCUMENT_ROOT, $HTTP_HOST; + + GLOBAL $G_alarm_passwd, $G_ban_list, $G_black_list, $G_cloud_smasher, $G_provider_proxy; + GLOBAL $G_btrace_pref_sub, $G_dbauth; + GLOBAL $G_dbpfx, $G_donors_all, $G_donors_cur, $G_is_local, $G_lang; + GLOBAL $G_poll_entries, $G_poll_name, $G_poll_title, $G_proxy_white_list; + GLOBAL $G_room_roadmap, $G_shutdown; + GLOBAL $G_splash_content, $G_splash_contents, $G_splash_cont_idx; + GLOBAL $G_splash_h, $G_splash_idx, $G_splash_interval, $G_splash_timeout; + GLOBAL $G_splash_w, $G_topbanner, $G_with_donors, $G_with_poll; + GLOBAL $G_with_splash, $G_sidebanner, $G_sidebanner_idx; + GLOBAL $G_with_topbanner; + GLOBAL $G_tos_vers, $G_tos_fname, $G_tos_dtsoft, $G_tos_dthard, $G_tos_idx, $G_doc_path; + + if (!$this->check_globals()) { + fprintf(STDERR, "Take a look to the phplog file, GLOBALS missing!\n"); + sleep(10); + } if ($this->main_loop) { return (FALSE); @@ -319,17 +688,30 @@ class Sac_a_push { $this->main_loop = TRUE; + $lastime = 0; + $dump_users = TRUE; while ($this->main_loop) { + $this->app->sess_cur_set(FALSE); $this->curtime = time(); - printf("IN LOOP: Current opened: %d pages_flush: %d - ", count($this->socks), count($this->pages_flush)); + if ($lastime != ($this->curtime >> 2)) { + fprintf(STDERR, "\nIN LOOP: Current opened: %d pending_pages: %d\n", count($this->socks), count($this->pending_pages)); + } /* Prepare the read array */ /* // when we manage it ... */ /* if ($shutdown) */ /* $read = array_merge(array("$in" => $in), $socks); */ /* else */ - $read = array_merge(array(intval($this->list) => $this->list, intval($this->in) => $this->in), - $this->socks); + $pre_read = array_merge(array(intval($this->list_web) => $this->list_web, + intval($this->list_cmd) => $this->list_cmd, + intval(static::$cnt_slave) => static::$cnt_slave), + $this->socks); + if ($this->is_daemon == FALSE) { + $read = array_merge($pre_read, array(intval($this->in) => $this->in)); + } + else { + $read = $pre_read; + } if ($this->debug > 1) { printf("PRE_SELECT\n"); @@ -337,26 +719,31 @@ class Sac_a_push { } $write = NULL; $except = NULL; - $num_changed_sockets = stream_select($read, $write, $except, 0, 500000); + $num_changed_sockets = @stream_select($read, $write, $except, 0, 500000); if ($num_changed_sockets == 0) { - printf(" no data in 5 secs, splash [%d]\n", $G_with_splash); + // printf(" no data in 5 secs, splash [%d]\n", $G_with_splash); + ; } else if ($num_changed_sockets > 0) { - printf("num sock %d num_of_socket: %d\n", $num_changed_sockets, count($read)); + if ($lastime != ($this->curtime >> 2)) { + printf("num sock %d num_of_socket: %d\n", $num_changed_sockets, count($read)); + } if ($this->debug > 1) { print_r($read); } /* At least at one of the sockets something interesting happened */ foreach ($read as $i => $sock) { + $id = intval($sock); + $manage_page = FALSE; /* is_resource check is required because there is the possibility that during new request an old connection is closed */ if (!is_resource($sock)) { continue; } - if ($sock === $this->list) { - printf("NUOVA CONNEX\n"); - if (($new_unix = stream_socket_accept($this->list)) == FALSE) { + if ($sock === $this->list_web) { + // printf("NUOVA CONNEX\n"); + if (($new_unix = stream_socket_accept($this->list_web)) == FALSE) { printf("SOCKET_ACCEPT FAILED\n"); continue; } @@ -365,101 +752,200 @@ class Sac_a_push { $get = array(); $post = array(); $cookie = array(); + $rest = 0; + $cont = ""; if (($new_socket = ancillary_getstream($new_unix, $stream_info)) !== FALSE) { - printf("NEW_SOCKET: %d\n", intval($new_socket)); + // printf("NEW_SOCKET: %d\n", intval($new_socket)); stream_set_blocking($new_socket, $this->blocking_mode); // Set the stream to non-blocking - printf("RECEIVED HEADER:\n%s", $stream_info); - $path = spu_process_info($stream_info, $method, $header, $get, $post, $cookie); - printf("PATH: [%s]\n", $path); - printf("M: %s\nHEADER:\n", $method); - print_r($header); - printf("GET:\n"); - print_r($get); - printf("POST:\n"); - print_r($post); - printf("COOKIE:\n"); - print_r($cookie); - $addr = stream_socket_get_name($new_socket, TRUE); - $header_out = array(); - - $enc = get_encoding($header); - - $subs = SITE_PREFIX."briskin5/"; - $subs_l = strlen($subs); - $rret = FALSE; - if (!strncmp($path, SITE_PREFIX, SITE_PREFIX_LEN)) { - $rret = $this->app->request_mgr($this, $enc, $header_out, $new_socket, substr($path, SITE_PREFIX_LEN), $addr, $get, $post, $cookie); + // error_log(sprintf("RECEIVED HEADER:\n%s", $stream_info)); + if (($path = spu_process_info($stream_info, $method, $header, + $get, $post, $cookie, $rest, $cont)) + == FALSE) { + fprintf(STDERR, "TODO: fix wrong header management\n"); + } + + // We try to get real IP from header (passed by proxy) and then fallback to direct connection IP + // error_log(sprintf("addr: [%s]", $addr)); + // error_log(sprintf("X-Real-Ip: [%s]", array_key_exists('X-Real-Ip', $header) ? $header['X-Real-Ip'] : "Not exists")); + if (array_key_exists('X-Real-Ip', $header)) { + $addr = $header['X-Real-Ip']; + } + else { + $addr = addrtoipv4(stream_socket_get_name($new_socket, TRUE)); + } + + // FOR TEST $header['X-Forwarded-For'] = '154.155.22.33'; + $addr = $this->pproxy_realip($header, $addr); + + // printf("PATH: [%s] [%s]\n", $path, print_r($header, TRUE)); + if ($method == "POST" && $rest > 0) { + if (isset($header['Expect']) && $header['Expect'] == '100-continue') { + // fprintf(STDERR, "\nPOSTA DE CHE\n\n"); + $this->pendpage_try_addcont($new_socket, 20, + $method, $header, $get, $post, $cookie, + $path, $addr, $rest, $cont); + } + else { + $this->pendpage_try_addwait($new_socket, 20, + $method, $header, $get, $post, $cookie, + $path, $addr, $rest, $cont); + } } - if ($rret == FALSE) { - // FIXME: manage 404 !!! - printf("TODO: fix unknown page\n"); + else { + $manage_page = TRUE; } - printf("number of sockets after %d\n", count($this->socks)); + + // printf("number of sockets after %d\n", count($this->socks)); } else { printf("WARNING: ancillary_getstream failed\n"); } } - else { - $buf = fread($sock, 512); + else if ($sock === $this->list_cmd) { + // printf("NUOVA DIRECT CONNEX\n"); + if (($new_unix = stream_socket_accept($this->list_cmd)) == FALSE) { + printf("SOCKET_ACCEPT FAILED\n"); + continue; + } + stream_set_blocking($new_unix, $this->blocking_mode); + $this->direct_mgmt($new_unix); + } // not socket_list nor socket_list_cmd + else { // already opened socket + $buf = fread($sock, 4096); // if socket is closed - if ($buf == FALSE || strlen($buf) == 0) { + if ($buf == FALSE || feof($sock)) { + // close socket case if ($buf == FALSE) { - printf("ERROR READING\n"); + // printf("INFO: read return false\n"); + ; + } + if ($sock === $this->list_web) { + // printf("Arrivati %d bytes da list\n", mb_strlen($buf, "ASCII")); + return(21); } - if ($sock === $this->list) { - printf("Arrivati %d bytes da list\n", strlen($buf)); - exit(21); + else if ($sock === $this->list_cmd) { + // printf("Arrivati %d bytes da list_cmd\n", mb_strlen($buf, "ASCII")); + return(23); } - else if ($sock === $this->in) { - printf("Arrivati %d bytes da stdin\n", strlen($buf)); - exit(22); + else if ($sock === $this->in || $sock === static::$cnt_slave) { + // printf("Arrivati %d bytes da stdin\n", mb_strlen($buf, "ASCII")); + return(22); } else { - // $user_a[$s2u[intval($sock)]]->disable(); - if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) { - $this->s2u[intval($sock)]->rd_socket_set(NULL); + unset($this->socks[$id]); + if (isset($this->s2u[$id])) { + // $user_a[$s2u[$id]]->disable(); + if ($this->s2u[$id]->rd_socket_get() != NULL) { + // try to send close frame (for websocket) + $clo = $this->s2u[$id]->stream_close(); + $clo_l = mb_strlen($clo, "ASCII"); + @fwrite($sock, $clo, $clo_l); + $this->s2u[$id]->rd_socket_set(NULL); + } + unset($this->s2u[$id]); } - unset($this->socks[intval($sock)]); - unset($this->s2u[intval($sock)]); } fclose($sock); - printf("CLOSE ON READ\n"); + // printf("CLOSE ON READ\n"); if ($this->debug > 1) { printf("post unset\n"); print_r($this->socks); } - } - else { + } // if ($buf == FALSE || mb_strlen($buf, "ASCII") == 0) { + else { // data on the socket if ($this->debug > 1) { print_r($read); } - if ($sock === $this->list) { - printf("Arrivati %d bytes da list\n", strlen($buf)); + if ($sock === $this->list_web) { + // printf("Arrivati %d bytes da list\n", mb_strlen($buf, "ASCII")); + ; + } + else if ($sock === $this->list_cmd) { + // printf("Arrivati %d bytes da list_cmd\n", mb_strlen($buf, "ASCII")); + ; } - else if ($sock === $this->in) { - printf("Arrivati %d bytes da stdin\n", strlen($buf)); + else if ($sock === $this->in || $sock === static::$cnt_slave) { + // printf("Arrivati %d bytes da stdin\n", mb_strlen($buf, "ASCII")); $line = trim($buf); if ($line == "reload") { require("$DOCUMENT_ROOT/Etc/".BRISK_CONF); + $this->reload(FALSE, $G_provider_proxy); + $this->app->reload(FALSE, $G_ban_list, $G_black_list, + $G_cloud_smasher); + if (!$this->check_globals()) { + fprintf(STDERR, "Take a look to the phplog file, GLOBALS missing!\n"); + sleep(10); + } + global_dump(); + } + else if ($line == "dump") { + $dump_users = TRUE; + } + else if ($line == "shutdown" || $line == "sd") { + if ($this->app->dump_data()) { + return(0); + } + else { + return(1); + } } } - else { + else { // data arrived from not special socket $key = array_search("$sock", $this->socks); - printf("Arrivati %d bytes dalla socket n. %d\n", strlen($buf), $key); + // fprintf(STDERR, "Arrivati %d bytes dalla socket n. %d\n", mb_strlen($buf, "ASCII"), $key); + if (isset($this->s2p[$id])) { + $this->s2p[$id]->rest -= mb_strlen($buf, "ASCII"); + $this->s2p[$id]->cont .= $buf; + if ($this->s2p[$id]->rest <= 0) { + $header = $new_socket = $path = $addr = $get = $cookie = 0; + $post = array(); + + $this->s2p[$id]->context_get($header, $new_socket, $path, $addr, $get, $post, $cookie); + $this->pendpage_rem($this->s2p[$id]); + fprintf(STDERR, "SOCKET RUN: %s\n", $new_socket); + + $manage_page = TRUE; + } + } } } } + + if ($manage_page == TRUE) { + /* + printf("M: %s\nHEADER:\n", $method); + print_r($header); + printf("GET:\n"); + print_r($get); + printf("POST:\n"); + print_r($post); + printf("COOKIE:\n"); + print_r($cookie); + */ + + $header_out = array(); + // TODO: MOVE DOWN request_mgr to factorize new_sockets and POST closed + $rret = FALSE; + if (!strncmp($path, SITE_PREFIX, SITE_PREFIX_LEN)) { + $rret = $this->app->request_mgr($this, $header, $header_out, $new_socket, substr($path, SITE_PREFIX_LEN), $addr, $get, $post, $cookie); + } + if ($rret == FALSE) { + // FIXME: manage 404 !!! + printf("TODO: fix unknown page\n"); + fclose($new_socket); + } + } } } $this->garbage_manager(FALSE); /* manage unfinished pages */ - foreach ($this->pages_flush as $k => $pgflush) { - if ($pgflush->try_flush($this->curtime) == TRUE) { - unset($this->pages_flush[$k]); + foreach ($this->pending_pages as $k => $pendpage) { + // TODO: try_flush if exists in the class + if ($pendpage->try_flush($this->curtime) == TRUE) { + unset($this->pending_pages[$k]); } } @@ -471,8 +957,16 @@ class Sac_a_push { /* manage open streaming */ foreach ($this->socks as $k => $sock) { - if (isset($this->s2u[intval($sock)])) { - $user = $this->s2u[intval($sock)]; + $id = intval($sock); + if (isset($this->s2u[$id])) { + $user = $this->s2u[$id]; + + if ($user->rd_toflush) { + if (fflush($sock) == FALSE) + continue; + else + $user->rd_toflush = FALSE; + } $response = $user->rd_cache_get(); $do_ping = FALSE; if (($this->curtime - $user->lacc) > (EXPIRE_TIME_RD / 3)) { @@ -485,7 +979,9 @@ class Sac_a_push { if ($response == "") { $content = ""; $user->stream_main($content, $get, $post, $cookie); - printf("[%s] [%d] [%d]\n", $user->name, $user->lacc, $this->curtime); + if ($dump_users) { + printf("[%s] [%d] [%d]\n", $user->name, $user->lacc, $this->curtime); + } if ($do_ping && $user->ping_req == FALSE) { $content .= $user->stream_keepalive(TRUE); $user->ping_req = TRUE; @@ -494,15 +990,15 @@ class Sac_a_push { $content = $user->stream_keepalive(FALSE); } if ($content != "") { - $response = chunked_content($user->rd_zls_get(), $content); + $response = $user->chunked_content($content); } } if ($response != "") { // echo "SPIA: [".substr($response, 0, 60)."...]\n"; - echo "SPIA: [".$response."]\n"; + // echo "SPIA: [".$response."]\n"; $response_l = mb_strlen($response, "ASCII"); - $wret = @fwrite($sock, $response); + $wret = @fwrite($sock, $response, $response_l); if ($wret < $response_l) { printf("TROUBLE WITH FWRITE: %d\n", $wret); $user->rd_cache_set(mb_substr($response, $wret, $response_l - $wret, "ASCII")); @@ -510,25 +1006,130 @@ class Sac_a_push { else { $user->rd_cache_set(""); } - fflush($sock); + if (fflush($sock) == FALSE) { + $user->rd_toflush = TRUE; + continue; + } $user->rd_kalive_reset($this->curtime); } // close socket after a while to prevent client memory consumption if ($user->rd_endtime_is_expired($this->curtime)) { - if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) { - $this->s2u[intval($sock)]->rd_socket_set(NULL); + if ($this->s2u[$id]->rd_socket_get() != NULL) { + $this->s2u[$id]->rd_socket_set(NULL); } - unset($this->socks[intval($sock)]); - unset($this->s2u[intval($sock)]); + unset($this->socks[$id]); + unset($this->s2u[$id]); + $clo = $user->stream_close(); + $clo_l = mb_strlen($clo, "ASCII"); + @fwrite($sock, $clo, $clo_l); fclose($sock); - printf("CLOSE ON LOOP\n"); + // printf("CLOSE ON LOOP\n"); } - } + } // if (isset($this->s2u[$id]... } // foreach ($this->socks... - printf("\n"); + $dump_users = FALSE; + printf("#"); + if (defined('CURL_DE_SAC_VERS')) { + $this->app->cds->process(); + } + $lastime = ($this->curtime >> 2); } // while (... } // function run(... -} + function pproxy_realip(&$header, $ip_str) + { + return ($this->provider_proxy->realip($header, $ip_str)); + } + + function reload($is_first, $prov_proxy) + { + fprintf(STDERR, "SAP RELOAD STUFF (%d)\n", count($prov_proxy)); + + $this->provider_proxy->update($prov_proxy); + } + + function direct_command($cmdstr) + { + GLOBAL $G_alarm_passwd; + + $cmd = cmd_deserialize($cmdstr); + + if (!isset($cmd['cmd'])) { + return cmd_return(500, 'no cmd found'); + } + // "cmd" => "userauth", "sess" => 'xxxxxxxxxxx', 'private' => 'it_must_be_correct', + // 'the_end' => 'true' ); + // cmd=userauth&sess=52d796ac08c47&private=yourpasswd192.168.122.152d796ac08c47&the_end=true + if ($cmd['cmd'] == 'userauth') { + if (!isset($cmd['sess']) || !isset($cmd['private'])) { + return cmd_return(401, 'malformed cmd'); + } + $idx = -1; + if (($user = $this->app->get_user($cmd['sess'], $idx)) == FALSE) + return cmd_return(402, 'user not found'); + + if (($user->flags & USER_FLAG_TY_ADMIN) == 0x00) + return cmd_return(403, 'permission denied'); + + if (md5($G_alarm_passwd.$user->ip.$user->sess) != $cmd['private']) + return cmd_return(404, 'authentication failed ['.$cmd['private'].']['.$G_alarm_passwd.$user->ip.$user->sess.']'); + + return cmd_return(200, 'success'); + } + + return cmd_return(501, 'no cmd found'); + } + + function direct_mgmt($socket) + { + printf("DIRECT: begin\n"); + $st = DIRECT_ST_READ; + $cmd_all = ""; + $endtime = $this->curtime + 3; + + while(time() <= $endtime) { + printf("DIRECT: init loop %d\n", $st); + if ($st == DIRECT_ST_READ) { + $buf = fread($socket, 4096); + if ($buf == FALSE && feof($socket)) { + break; + } + else if ($buf != FALSE && strlen($buf) > 0) { + $cmd_all .= $buf; + + if (substr(trim($cmd_all), -13) == "&the_end=true") { + $output_arr = $this->direct_command($cmd_all); + $output = cmd_serialize($output_arr); + $output_cur = 0; + $output_len = mb_strlen($output, "ASCII"); + $st = DIRECT_ST_WRITE; + continue; + } + } + } + else if ($st == DIRECT_ST_WRITE) { + $ret = fwrite($socket, $output, $output_len); + if ($ret === FALSE) { + if (feof($socket)) { + break; + } + } + else if ($ret > 0 && $ret < $output_len) { + $output = substr($output, -($output_len - $ret)); + $output_len -= $ret; + continue; + } + else if ($ret == $output_len) { + fclose($socket); + return TRUE; + } + } + usleep(10000); + } + + fclose($socket); + return FALSE; + } +} // class Sac_a_push ?>