X-Git-Url: https://mop.ddnsfree.com/gitweb/?a=blobdiff_plain;f=web%2FObj%2Fsac-a-push.phh;h=1f9a70c2e953c0ebccd5dcbe86a0fdd1df6a12e1;hb=65b3ab569e06032a81b52a88ae109757bedc1b97;hp=10f4f70a57a48278ab3fdaeb060ff1e1c4df9def;hpb=8b0992347fdfdb40715268bb07481d35fc5191fb;p=brisk.git diff --git a/web/Obj/sac-a-push.phh b/web/Obj/sac-a-push.phh index 10f4f70..1f9a70c 100644 --- a/web/Obj/sac-a-push.phh +++ b/web/Obj/sac-a-push.phh @@ -40,7 +40,7 @@ $_globals_list = array( 'G_PG_vow_n', 'G_poll_entries', 'G_poll_name', 'G_poll_title', 'G_provider_proxy', 'G_proxy_white_list', 'G_room_about', 'G_room_help', 'G_room_passwdhowto', 'G_room_roadmap', -'G_selfreg_mask', 'G_selfreg_tout', 'G_shutdown', 'G_sidebanner', +'G_selfreg_mask', 'G_selfreg_tout', 'G_shutdown', 'G_profile', 'G_sidebanner', 'G_sidebanner_idx', 'G_splash_content', 'G_splash_contents', 'G_splash_cont_idx', 'G_splash_h', 'G_splash_idx', 'G_splash_interval', 'G_splash_timeout', 'G_splash_w', 'G_topbanner', 'G_tos_dthard', @@ -51,7 +51,7 @@ function global_dump() GLOBAL $G_alarm_passwd, $G_ban_list, $G_black_list, $G_btrace_pref_sub, $G_dbauth; GLOBAL $G_dbpfx, $G_donors_all, $G_donors_cur, $G_is_local, $G_lang; GLOBAL $G_poll_entries, $G_poll_name, $G_poll_title, $G_proxy_white_list; - GLOBAL $G_room_roadmap, $G_shutdown; + GLOBAL $G_room_roadmap, $G_shutdown, $G_profile; GLOBAL $G_splash_content, $G_splash_contents, $G_splash_cont_idx; GLOBAL $G_splash_h, $G_splash_idx, $G_splash_interval, $G_splash_timeout; GLOBAL $G_splash_w, $G_topbanner, $G_with_donors, $G_with_poll; @@ -74,6 +74,7 @@ function global_dump() fprintf(STDERR, "G_proxy_white_list = [%s]\n", print_r($G_proxy_white_list, TRUE)); fprintf(STDERR, "G_room_roadmap = [%s]\n", print_r($G_room_roadmap, TRUE)); fprintf(STDERR, "G_shutdown = [%s]\n", print_r($G_shutdown, TRUE)); + fprintf(STDERR, "G_profile = [%s]\n", print_r($G_profile, TRUE)); fprintf(STDERR, "G_splash_content = [%s]\n", print_r($G_splash_content, TRUE)); fprintf(STDERR, "G_splash_contents = [%s]\n", print_r($G_splash_contents, TRUE)); fprintf(STDERR, "G_splash_cont_idx = [%s]\n", print_r($G_splash_cont_idx, TRUE)); @@ -424,12 +425,13 @@ class Sac_a_push { var $provider_proxy; // list of provider/browser that offer proxy service - var $file_socket; - var $unix_socket; + var $file_socket_pfx; + var $unix_socket_pfx; var $direct_socket; // socket where read direct commands var $socks; var $s2u; // user associated with input socket var $s2p; // pending page associated with input socket + var $s2c; // ws sockets in closing phase var $pending_pages; var $is_daemon; @@ -475,7 +477,7 @@ class Sac_a_push { } } - static function create(&$app, $sockname, $debug, $blocking_mode, $provider_proxy, $argv) + static function create(&$app, $sockname_pfx, $debug, $blocking_mode, $provider_proxy, $argv) { $thiz = new Sac_a_push(); @@ -483,13 +485,15 @@ class Sac_a_push { $thiz->provider_proxy = ProviderProxy::create(); - $thiz->file_socket = $sockname; - $thiz->unix_socket = "unix://$sockname"; - $thiz->direct_socket = "unix://${sockname}2"; + $thiz->file_socket_pfx = $sockname_pfx; + $thiz->unix_socket_pfx = "unix://$sockname_pfx"; + $thiz->direct_socket = "unix://${sockname_pfx}_admin.sock"; $thiz->debug = $debug; + $thiz->list_web = array(); $thiz->socks = array(); $thiz->s2u = array(); $thiz->s2p = array(); + $thiz->s2c = array(); $thiz->pending_pages = array(); $thiz->is_daemon = FALSE; @@ -519,22 +523,32 @@ class Sac_a_push { $thiz->rndstr .= chr(mt_rand(65, 90)); } - if (file_exists($thiz->file_socket)) { - unlink($thiz->file_socket); + for ($i = 0 ; $i < USOCK_POOL_N ; $i++) { + $file_socket = $thiz->file_socket_pfx . sprintf("%d.sock", $i); + if (file_exists($file_socket)) { + unlink($file_socket); + } } - if (file_exists($thiz->file_socket."2")) { - unlink($thiz->file_socket."2"); + $file_socket_admin = $thiz->file_socket_pfx . "_admin.sock"; + if (file_exists($file_socket_admin)) { + unlink($file_socket_admin); } $old_umask = umask(0); - if (($thiz->list_web = stream_socket_server($thiz->unix_socket, $err, $errs)) === FALSE) { - return (FALSE); + for ($i = 0 ; $i < USOCK_POOL_N ; $i++) { + $unix_socket = sprintf("%s%d.sock", $thiz->unix_socket_pfx, $i); + if (($list_sock = stream_socket_server($unix_socket, $err, $errs)) === FALSE) { + return (FALSE); + } + array_push($thiz->list_web, $list_sock); } if (($thiz->list_cmd = stream_socket_server($thiz->direct_socket, $err, $errs)) === FALSE) { return (FALSE); } umask($old_umask); - stream_set_blocking($thiz->list_web, $thiz->blocking_mode); # Set the stream to non-blocking + for ($i = 0 ; $i < USOCK_POOL_N ; $i++) { + stream_set_blocking($thiz->list_web[$i], $thiz->blocking_mode); # Set the stream to non-blocking + } stream_set_blocking($thiz->list_cmd, $thiz->blocking_mode); # Set the stream to non-blocking if (($thiz->in = fopen("php://stdin", "r")) === FALSE) { @@ -547,15 +561,17 @@ class Sac_a_push { return ($thiz); } - function socks_set($sock, $user, $pendpage) + function socks_set($sock, $user, $pendpage, $postclose = NULL) { $id = intval($sock); $this->socks[$id] = $sock; if ($user != NULL) - $this->s2u[$id] = $user; + $this->s2u[$id] = $user; if ($pendpage != NULL) - $this->s2p[$id] = $pendpage; + $this->s2p[$id] = $pendpage; + if ($postclose != NULL) + $this->s2c[$id] = $postclose; } function socks_unset($sock) @@ -566,6 +582,8 @@ class Sac_a_push { unset($this->s2u[$id]); if (isset($this->s2p[$id])) unset($this->s2p[$id]); + if (isset($this->s2c[$id])) + unset($this->s2c[$id]); unset($this->socks[$id]); } @@ -630,7 +648,14 @@ class Sac_a_push { foreach ($this->socks as $k => $sock) { $id = intval($sock); - if (isset($this->s2u[$id])) { + if (isset($this->s2c[$id])) { + $postclose = $this->s2c[$id]; + if ($postclose->read('', $this->curtime) == 0) { + fclose($sock); + $this->socks_unset($sock); + } + } + else if (isset($this->s2u[$id])) { $user = $this->s2u[$id]; if ($user->the_end) { if (($user->rd_toflush == FALSE && $user->rd_step == $user->step) @@ -669,7 +694,7 @@ class Sac_a_push { GLOBAL $G_btrace_pref_sub, $G_dbauth; GLOBAL $G_dbpfx, $G_donors_all, $G_donors_cur, $G_is_local, $G_lang; GLOBAL $G_poll_entries, $G_poll_name, $G_poll_title, $G_proxy_white_list; - GLOBAL $G_room_roadmap, $G_shutdown; + GLOBAL $G_room_roadmap, $G_shutdown, $G_profile; GLOBAL $G_splash_content, $G_splash_contents, $G_splash_cont_idx; GLOBAL $G_splash_h, $G_splash_idx, $G_splash_interval, $G_splash_timeout; GLOBAL $G_splash_w, $G_topbanner, $G_with_donors, $G_with_poll; @@ -688,12 +713,50 @@ class Sac_a_push { $this->main_loop = TRUE; + $list_web_arr = array(); + for ($i = 0 ; $i < USOCK_POOL_N ; $i++) { + $list_web_arr[intval($this->list_web[$i])] = $this->list_web[$i]; + } + + log_legal(time(), '127.0.0.1', FALSE, 'MAIN', 'LOG:START'); + $lastime = 0; $dump_users = TRUE; + $sock_shard_cur = -1; + + $mtime_start = microtime(TRUE); + $mtsum_idle = 0.0; + $mtsum_read = 0.0; + $mtsum_garb = 0.0; + $mtsum_unfi = 0.0; + $mtsum_mana = 0.0; + while ($this->main_loop) { + $mtime_begin = microtime(TRUE); + $sock_shard_cur = ($sock_shard_cur + 1) % SOCK_SHARD_N; $this->app->sess_cur_set(FALSE); $this->curtime = time(); if ($lastime != ($this->curtime >> 2)) { + if ($G_profile == TRUE) { + $mtime_finish = microtime(TRUE); + $mtime_diff = ($mtime_finish - $mtime_start); + $mtime_idle = 100.0 * $mtsum_idle / $mtime_diff; + $mtime_read = 100.0 * $mtsum_read / $mtime_diff; + $mtime_garb = 100.0 * $mtsum_garb / $mtime_diff; + $mtime_unfi = 100.0 * $mtsum_unfi / $mtime_diff; + $mtime_mana = 100.0 * $mtsum_mana / $mtime_diff; + + log_legal(time(), '127.0.0.1', FALSE, 'MAIN', + sprintf('LOG:PROF: tot(%.3f) idle: %.2f read: %.2f garb: %.2f unfi: %.2f mana: %.2f', + $mtime_diff, $mtime_idle, $mtime_read, $mtime_garb, $mtime_unfi, $mtime_mana + )); + $mtsum_idle = 0.0; + $mtsum_read = 0.0; + $mtsum_garb = 0.0; + $mtsum_unfi = 0.0; + $mtsum_mana = 0.0; + $mtime_start = $mtime_finish; + } fprintf(STDERR, "\nIN LOOP: Current opened: %d pending_pages: %d\n", count($this->socks), count($this->pending_pages)); } @@ -702,10 +765,10 @@ class Sac_a_push { /* if ($shutdown) */ /* $read = array_merge(array("$in" => $in), $socks); */ /* else */ - $pre_read = array_merge(array(intval($this->list_web) => $this->list_web, - intval($this->list_cmd) => $this->list_cmd, - intval(static::$cnt_slave) => static::$cnt_slave), - $this->socks); + $pre_read = array_merge($list_web_arr, array( + intval($this->list_cmd) => $this->list_cmd, + intval(static::$cnt_slave) => static::$cnt_slave), + $this->socks); if ($this->is_daemon == FALSE) { $read = array_merge($pre_read, array(intval($this->in) => $this->in)); } @@ -721,6 +784,10 @@ class Sac_a_push { $except = NULL; $num_changed_sockets = @stream_select($read, $write, $except, 0, 500000); + if ($G_profile) { + $mtime_idle = microtime(TRUE); + $mtsum_idle += $mtime_idle - $mtime_begin; + } if ($num_changed_sockets == 0) { // printf(" no data in 5 secs, splash [%d]\n", $G_with_splash); ; @@ -741,9 +808,11 @@ class Sac_a_push { if (!is_resource($sock)) { continue; } - if ($sock === $this->list_web) { + $list_web_idx = array_search($sock, $this->list_web, TRUE); + if ($list_web_idx !== FALSE) { + $list_web_usock = $this->list_web[$list_web_idx]; // printf("NUOVA CONNEX\n"); - if (($new_unix = stream_socket_accept($this->list_web)) == FALSE) { + if (($new_unix = stream_socket_accept($list_web_usock)) == FALSE) { printf("SOCKET_ACCEPT FAILED\n"); continue; } @@ -810,16 +879,17 @@ class Sac_a_push { stream_set_blocking($new_unix, $this->blocking_mode); $this->direct_mgmt($new_unix); } // not socket_list nor socket_list_cmd - else { // already opened socket + else if ($id % SOCK_SHARD_N == $sock_shard_cur) { // already opened socket $buf = fread($sock, 4096); // if socket is closed - if ($buf == FALSE || feof($sock)) { + if ($buf == FALSE || mb_strlen($buf, "ASCII") == 0 || feof($sock)) { + $postclose = NULL; // close socket case if ($buf == FALSE) { // printf("INFO: read return false\n"); ; } - if ($sock === $this->list_web) { + if (array_search($sock, $this->list_web, TRUE) !== FALSE) { // printf("Arrivati %d bytes da list\n", mb_strlen($buf, "ASCII")); return(21); } @@ -838,6 +908,7 @@ class Sac_a_push { if ($this->s2u[$id]->rd_socket_get() != NULL) { // try to send close frame (for websocket) $clo = $this->s2u[$id]->stream_close(); + $postclose = $user->stream_postclose_get($sock, $this->curtime); $clo_l = mb_strlen($clo, "ASCII"); @fwrite($sock, $clo, $clo_l); $this->s2u[$id]->rd_socket_set(NULL); @@ -845,7 +916,13 @@ class Sac_a_push { unset($this->s2u[$id]); } } - fclose($sock); + if ($postclose != NULL) { + // print("POSTCLOSE found!"); + $this->socks_set($sock, NULL, NULL, $postclose); + } + else { + fclose($sock); + } // printf("CLOSE ON READ\n"); if ($this->debug > 1) { @@ -857,7 +934,7 @@ class Sac_a_push { if ($this->debug > 1) { print_r($read); } - if ($sock === $this->list_web) { + if (array_search($sock, $this->list_web, TRUE) !== FALSE) { // printf("Arrivati %d bytes da list\n", mb_strlen($buf, "ASCII")); ; } @@ -896,41 +973,67 @@ class Sac_a_push { // fprintf(STDERR, "Arrivati %d bytes dalla socket n. %d\n", mb_strlen($buf, "ASCII"), $key); if (isset($this->s2u[$id])) { + // + // TODO: + // fix $addr + // fix $this->pendpage_try_addflush below (probably not required) + // + $addr = "127.0.0.1"; $user = $this->s2u[$id]; // fprintf(STDERR, 'POST USER'); if ($user && $user->rd_transp && strpos($user->rd_transp->type, "websocket") !== FALSE) { - $clie_cmd = $user->rd_transp->unchunk($buf); + $clie_cmd = $user->rd_transp->unchunk($buf, $sock); $clie_cmd = json_decode($clie_cmd, TRUE); // fprintf(STDERR, "HERE WE ARE INCOMING DATA [%s]\n", print_r($clie_cmd, TRUE)); - ob_start(); - // complete: index_wr_main($this->app, $addr, $get, $post, $cookie); - index_wr_main($this->app, $addr, $clie_cmd, NULL, NULL); - $content = ob_get_contents(); - ob_end_clean(); - - // $this->pendpage_try_addflush($new_socket, 20, $enc, $header_out, $content); - // FIXME ^ - // fprintf(STDERR, "RETURNED CONTENT [%s]\n", $content); + $wr_addr = substr(parse_url($clie_cmd["target"])["path"], strlen(SITE_PREFIX)); + if ($wr_addr == "index_wr.php") { + ob_start(); + // complete: index_wr_main($this->app, $addr, $get, $post, $cookie); + index_wr_main($this->app, $addr, $clie_cmd, NULL, NULL); + $content = ob_get_contents(); + ob_end_clean(); + } + else if ($wr_addr == "briskin5/index_wr.php") { + $table_idx = $clie_cmd['table_idx']; + $table_token = $clie_cmd['table_token']; + + if (($bri = $this->app->match_get($table_idx, $table_token)) != FALSE) { + ob_start(); + bin5_index_wr_main($bri, $addr, $clie_cmd, NULL, NULL); + $content = ob_get_contents(); + ob_end_clean(); + } + } + else { + fprintf(STDERR, "Unknown page [%s]\n", $wr_addr); + } + /* + briskin5/index_wr.php + + if (isset($table_idx) && isset($table_token)) { + if (($bri = $s_a_p->app->match_get($table_idx, $table_token)) != FALSE) { + ob_start(); + bin5_index_wr_main($bri, $addr, $get, $post, $cookie); + $content = ob_get_contents(); + ob_end_clean(); + } + else { + $content = "Bin5 Load data error"; + } + } + else { + $content = "Bin5 Load data error"; + } + $s_a_p->pendpage_try_addflush($new_socket, 20, $enc, $header_out, $content); + */ } } else { fprintf(STDERR, "User associated with ID: %s not found\n", $id); } - if (isset($this->s2u[$id])) { - $user = $this->s2u[$id]; - - fprintf(STDERR, 'POST USER'); - if ($user && $user->rd_transp && strpos($user->rd_transp->type, "websocket") !== FALSE) { - fprintf(STDERR, "HERE WE ARE INCOMING DATA [%s]\n", $user->rd_transp->deframe($buf)); - - } - } - else { - fprintf(STDERR, "REC ID: %s\n", $id); - } if (isset($this->s2p[$id])) { $this->s2p[$id]->rest -= mb_strlen($buf, "ASCII"); $this->s2p[$id]->cont .= $buf; @@ -945,6 +1048,17 @@ class Sac_a_push { $manage_page = TRUE; } } + + // postclose case + if (isset($this->s2c[$id])) { + $postclose = $this->s2c[$id]; + // printf("POSTCLOSE: found pc in s2c list\n"); + if ($postclose->read($buf, $this->curtime) == 0) { + printf("POSTCLOSE: received end opcode, close\n"); + fclose($sock); + $this->socks_unset($sock); + } + } } } } @@ -969,15 +1083,24 @@ class Sac_a_push { } if ($rret == FALSE) { // FIXME: manage 404 !!! - printf("TODO: fix unknown page\n"); + printf("TODO: fix unknown page: %s\n", $path); fclose($new_socket); } } } } + if ($G_profile) { + $mtime_read = microtime(TRUE); + $mtsum_read += $mtime_read - $mtime_idle; + } $this->garbage_manager(FALSE); + if ($G_profile) { + $mtime_garb = microtime(TRUE); + $mtsum_garb += $mtime_garb - $mtime_read; + } + /* manage unfinished pages */ foreach ($this->pending_pages as $k => $pendpage) { // TODO: try_flush if exists in the class @@ -986,6 +1109,11 @@ class Sac_a_push { } } + if ($G_profile) { + $mtime_unfi = microtime(TRUE); + $mtsum_unfi += $mtime_unfi - $mtime_garb; + } + /* $response: raw stream data not sent $content: html consistent data (