X-Git-Url: https://mop.ddnsfree.com/gitweb/?a=blobdiff_plain;f=web%2FObj%2Fsac-a-push.phh;h=a52b5749bb1d8639c51dbee2af2a561cc6aa5b5a;hb=538a0d6de8e953d27fb8b0f0eb664fc33b297761;hp=cfebe2050eea133e1ab38258670ad6dbd5554608;hpb=8d7cdc537ab4b1329fe0ef8c4d976545c12e14c3;p=brisk.git diff --git a/web/Obj/sac-a-push.phh b/web/Obj/sac-a-push.phh index cfebe20..a52b574 100644 --- a/web/Obj/sac-a-push.phh +++ b/web/Obj/sac-a-push.phh @@ -424,12 +424,13 @@ class Sac_a_push { var $provider_proxy; // list of provider/browser that offer proxy service - var $file_socket; - var $unix_socket; + var $file_socket_pfx; + var $unix_socket_pfx; var $direct_socket; // socket where read direct commands var $socks; var $s2u; // user associated with input socket var $s2p; // pending page associated with input socket + var $s2c; // ws sockets in closing phase var $pending_pages; var $is_daemon; @@ -475,7 +476,7 @@ class Sac_a_push { } } - static function create(&$app, $sockname, $debug, $blocking_mode, $provider_proxy, $argv) + static function create(&$app, $sockname_pfx, $debug, $blocking_mode, $provider_proxy, $argv) { $thiz = new Sac_a_push(); @@ -483,13 +484,15 @@ class Sac_a_push { $thiz->provider_proxy = ProviderProxy::create(); - $thiz->file_socket = $sockname; - $thiz->unix_socket = "unix://$sockname"; - $thiz->direct_socket = "unix://${sockname}2"; + $thiz->file_socket_pfx = $sockname_pfx; + $thiz->unix_socket_pfx = "unix://$sockname_pfx"; + $thiz->direct_socket = "unix://${sockname_pfx}_admin.sock"; $thiz->debug = $debug; + $thiz->list_web = array(); $thiz->socks = array(); $thiz->s2u = array(); $thiz->s2p = array(); + $thiz->s2c = array(); $thiz->pending_pages = array(); $thiz->is_daemon = FALSE; @@ -519,22 +522,32 @@ class Sac_a_push { $thiz->rndstr .= chr(mt_rand(65, 90)); } - if (file_exists($thiz->file_socket)) { - unlink($thiz->file_socket); + for ($i = 0 ; $i < USOCK_POOL_N ; $i++) { + $file_socket = $thiz->file_socket_pfx . sprintf("%d.sock", $i); + if (file_exists($file_socket)) { + unlink($file_socket); + } } - if (file_exists($thiz->file_socket."2")) { - unlink($thiz->file_socket."2"); + $file_socket_admin = $thiz->file_socket_pfx . "_admin.sock"; + if (file_exists($file_socket_admin)) { + unlink($file_socket_admin); } $old_umask = umask(0); - if (($thiz->list_web = stream_socket_server($thiz->unix_socket, $err, $errs)) === FALSE) { - return (FALSE); + for ($i = 0 ; $i < USOCK_POOL_N ; $i++) { + $unix_socket = sprintf("%s%d.sock", $thiz->unix_socket_pfx, $i); + if (($list_sock = stream_socket_server($unix_socket, $err, $errs)) === FALSE) { + return (FALSE); + } + array_push($thiz->list_web, $list_sock); } if (($thiz->list_cmd = stream_socket_server($thiz->direct_socket, $err, $errs)) === FALSE) { return (FALSE); } umask($old_umask); - stream_set_blocking($thiz->list_web, $thiz->blocking_mode); # Set the stream to non-blocking + for ($i = 0 ; $i < USOCK_POOL_N ; $i++) { + stream_set_blocking($thiz->list_web[$i], $thiz->blocking_mode); # Set the stream to non-blocking + } stream_set_blocking($thiz->list_cmd, $thiz->blocking_mode); # Set the stream to non-blocking if (($thiz->in = fopen("php://stdin", "r")) === FALSE) { @@ -547,15 +560,17 @@ class Sac_a_push { return ($thiz); } - function socks_set($sock, $user, $pendpage) + function socks_set($sock, $user, $pendpage, $postclose = NULL) { $id = intval($sock); $this->socks[$id] = $sock; if ($user != NULL) - $this->s2u[$id] = $user; + $this->s2u[$id] = $user; if ($pendpage != NULL) - $this->s2p[$id] = $pendpage; + $this->s2p[$id] = $pendpage; + if ($postclose != NULL) + $this->s2c[$id] = $postclose; } function socks_unset($sock) @@ -566,6 +581,8 @@ class Sac_a_push { unset($this->s2u[$id]); if (isset($this->s2p[$id])) unset($this->s2p[$id]); + if (isset($this->s2c[$id])) + unset($this->s2c[$id]); unset($this->socks[$id]); } @@ -630,7 +647,14 @@ class Sac_a_push { foreach ($this->socks as $k => $sock) { $id = intval($sock); - if (isset($this->s2u[$id])) { + if (isset($this->s2c[$id])) { + $postclose = $this->s2c[$id]; + if ($postclose->read('', $this->curtime) == 0) { + fclose($sock); + $this->socks_unset($sock); + } + } + else if (isset($this->s2u[$id])) { $user = $this->s2u[$id]; if ($user->the_end) { if (($user->rd_toflush == FALSE && $user->rd_step == $user->step) @@ -688,9 +712,16 @@ class Sac_a_push { $this->main_loop = TRUE; + $list_web_arr = array(); + for ($i = 0 ; $i < USOCK_POOL_N ; $i++) { + $list_web_arr[intval($this->list_web[$i])] = $this->list_web[$i]; + } + $lastime = 0; $dump_users = TRUE; + $sock_shard_cur = -1; while ($this->main_loop) { + $sock_shard_cur = ($sock_shard_cur + 1) % SOCK_SHARD_N; $this->app->sess_cur_set(FALSE); $this->curtime = time(); if ($lastime != ($this->curtime >> 2)) { @@ -702,10 +733,10 @@ class Sac_a_push { /* if ($shutdown) */ /* $read = array_merge(array("$in" => $in), $socks); */ /* else */ - $pre_read = array_merge(array(intval($this->list_web) => $this->list_web, - intval($this->list_cmd) => $this->list_cmd, - intval(static::$cnt_slave) => static::$cnt_slave), - $this->socks); + $pre_read = array_merge($list_web_arr, array( + intval($this->list_cmd) => $this->list_cmd, + intval(static::$cnt_slave) => static::$cnt_slave), + $this->socks); if ($this->is_daemon == FALSE) { $read = array_merge($pre_read, array(intval($this->in) => $this->in)); } @@ -741,9 +772,11 @@ class Sac_a_push { if (!is_resource($sock)) { continue; } - if ($sock === $this->list_web) { + $list_web_idx = array_search($sock, $this->list_web, TRUE); + if ($list_web_idx !== FALSE) { + $list_web_usock = $this->list_web[$list_web_idx]; // printf("NUOVA CONNEX\n"); - if (($new_unix = stream_socket_accept($this->list_web)) == FALSE) { + if (($new_unix = stream_socket_accept($list_web_usock)) == FALSE) { printf("SOCKET_ACCEPT FAILED\n"); continue; } @@ -810,16 +843,17 @@ class Sac_a_push { stream_set_blocking($new_unix, $this->blocking_mode); $this->direct_mgmt($new_unix); } // not socket_list nor socket_list_cmd - else { // already opened socket + else if ($id % SOCK_SHARD_N == $sock_shard_cur) { // already opened socket $buf = fread($sock, 4096); // if socket is closed - if ($buf == FALSE || feof($sock)) { + if ($buf == FALSE || mb_strlen($buf, "ASCII") == 0 || feof($sock)) { + $postclose = NULL; // close socket case if ($buf == FALSE) { // printf("INFO: read return false\n"); ; } - if ($sock === $this->list_web) { + if (array_search($sock, $this->list_web, TRUE) !== FALSE) { // printf("Arrivati %d bytes da list\n", mb_strlen($buf, "ASCII")); return(21); } @@ -838,6 +872,7 @@ class Sac_a_push { if ($this->s2u[$id]->rd_socket_get() != NULL) { // try to send close frame (for websocket) $clo = $this->s2u[$id]->stream_close(); + $postclose = $user->stream_postclose_get($sock, $this->curtime); $clo_l = mb_strlen($clo, "ASCII"); @fwrite($sock, $clo, $clo_l); $this->s2u[$id]->rd_socket_set(NULL); @@ -845,7 +880,13 @@ class Sac_a_push { unset($this->s2u[$id]); } } - fclose($sock); + if ($postclose != NULL) { + // print("POSTCLOSE found!"); + $this->socks_set($sock, NULL, NULL, $postclose); + } + else { + fclose($sock); + } // printf("CLOSE ON READ\n"); if ($this->debug > 1) { @@ -857,7 +898,7 @@ class Sac_a_push { if ($this->debug > 1) { print_r($read); } - if ($sock === $this->list_web) { + if (array_search($sock, $this->list_web, TRUE) !== FALSE) { // printf("Arrivati %d bytes da list\n", mb_strlen($buf, "ASCII")); ; } @@ -906,7 +947,7 @@ class Sac_a_push { // fprintf(STDERR, 'POST USER'); if ($user && $user->rd_transp && strpos($user->rd_transp->type, "websocket") !== FALSE) { - $clie_cmd = $user->rd_transp->unchunk($buf); + $clie_cmd = $user->rd_transp->unchunk($buf, $sock); $clie_cmd = json_decode($clie_cmd, TRUE); // fprintf(STDERR, "HERE WE ARE INCOMING DATA [%s]\n", print_r($clie_cmd, TRUE)); @@ -929,6 +970,9 @@ class Sac_a_push { ob_end_clean(); } } + else { + fprintf(STDERR, "Unknown page [%s]\n", $wr_addr); + } /* briskin5/index_wr.php @@ -954,18 +998,6 @@ class Sac_a_push { fprintf(STDERR, "User associated with ID: %s not found\n", $id); } - if (isset($this->s2u[$id])) { - $user = $this->s2u[$id]; - - fprintf(STDERR, 'POST USER'); - if ($user && $user->rd_transp && strpos($user->rd_transp->type, "websocket") !== FALSE) { - fprintf(STDERR, "HERE WE ARE INCOMING DATA [%s]\n", $user->rd_transp->deframe($buf)); - - } - } - else { - fprintf(STDERR, "REC ID: %s\n", $id); - } if (isset($this->s2p[$id])) { $this->s2p[$id]->rest -= mb_strlen($buf, "ASCII"); $this->s2p[$id]->cont .= $buf; @@ -980,6 +1012,17 @@ class Sac_a_push { $manage_page = TRUE; } } + + // postclose case + if (isset($this->s2c[$id])) { + $postclose = $this->s2c[$id]; + // printf("POSTCLOSE: found pc in s2c list\n"); + if ($postclose->read($buf, $this->curtime) == 0) { + printf("POSTCLOSE: received end opcode, close\n"); + fclose($sock); + $this->socks_unset($sock); + } + } } } } @@ -1004,7 +1047,7 @@ class Sac_a_push { } if ($rret == FALSE) { // FIXME: manage 404 !!! - printf("TODO: fix unknown page\n"); + printf("TODO: fix unknown page: %s\n", $path); fclose($new_socket); } } @@ -1087,15 +1130,21 @@ class Sac_a_push { // close socket after a while to prevent client memory consumption if ($user->rd_endtime_is_expired($this->curtime)) { - if ($this->s2u[$id]->rd_socket_get() != NULL) { - $this->s2u[$id]->rd_socket_set(NULL); + $postclose = $user->stream_postclose_get($sock, $this->curtime); + if ($user->rd_socket_get() != NULL) { + $user->rd_socket_set(NULL); } unset($this->socks[$id]); unset($this->s2u[$id]); $clo = $user->stream_close(); $clo_l = mb_strlen($clo, "ASCII"); @fwrite($sock, $clo, $clo_l); - fclose($sock); + if ($postclose) { + $this->socks_set($sock, NULL, NULL, $postclose); + } + else { + fclose($sock); + } // printf("CLOSE ON LOOP\n"); } } // if (isset($this->s2u[$id]... @@ -1197,7 +1246,8 @@ class Sac_a_push { return TRUE; } } - usleep(10000); + // probably not needed + // usleep(10000); } fclose($socket);