X-Git-Url: http://mop.ddnsfree.com/gitweb/?a=blobdiff_plain;f=web%2Fspush%2Fbrisk-spush.php;h=a811a2902f3283c58aa616cac8380084a405a4ca;hb=8c1df4ebf55f715144a422cdb4be7eee48f1b5d2;hp=7d893daa9b4ea56504a66bd70f6df0b0c1dd8704;hpb=d96c30d2cdb27c9314474b8902f70fe69d88119b;p=brisk.git diff --git a/web/spush/brisk-spush.php b/web/spush/brisk-spush.php index 7d893da..a811a29 100755 --- a/web/spush/brisk-spush.php +++ b/web/spush/brisk-spush.php @@ -21,107 +21,77 @@ * not, write to the Free Software Foundation, Inc, 59 Temple Place - * Suite 330, Boston, MA 02111-1307, USA. * + * TODO + * + * - partial write for normal page management + * - log legal address fix + * - from room to table + * - from table to room + * - fwrite other issues + * - manage and test cross forwarder between table and room + * - setcookie (for tables only) + * - keepalive management + * - chunked + * + * DONE/FROZEN - problema con getpeer (HOSTADDR) + * + * DONE - index_rd_ifra: last_clean issue + * DONE - fwrite failed error management (select, buffer where store unsent data, and fwrite check and retry) + * ABRT - index_wr.php::reload - reload is js-only function + * DONE - bug: after restart index_rd.php receive from prev clients a lot of req + * DONE - index_wr.php::chat + * DONE - index_wr.php::exit + * DONE - index_rd.php porting + * DONE - generic var management from internet + * DONE - index.php auth part */ $G_base = "../"; +require_once("./sac-a-push.phh"); require_once("./brisk-spush.phh"); -require_once("../Obj/brisk.phh"); +require_once($G_base."Obj/brisk.phh"); +require_once($G_base."Obj/auth.phh"); // require_once("../Obj/proxyscan.phh"); -require_once("./sac-a-push.phh"); - -class SPUser { - var $id; - var $sess; - var $cnt; - var $sock; - - function SPUser($id) - { - $this->id = $id; - $this->cnt = -1; - $this->sock = NULL; - } - - function enable($sock, $sess) - { - $this->sess = $sess; - $this->cnt = 0; - $this->sock = $sock; - - return ($this->id); - } - - function disable() - { - $this->cnt = -1; - $this->sock = NULL; - } - - function is_enable() - { - return ($this->cnt < 0 ? FALSE : TRUE); - } - - function sock_get() - { - return $this->sock; - } - - function sock_set($sock) - { - $this->sock = $sock; - } - - function id_get() - { - return $this->id; - } - - function sess_get() - { - return $this->sess; - } +require_once($G_base."index.php"); +require_once($G_base."index_wr.php"); +require_once($G_base."index_rd_ifra.php"); +require_once($G_base."briskin5/Obj/briskin5.phh"); - function cnt_get() - { - return $this->cnt; - } - - function cnt_inc() - { - return $this->cnt++; - } -} +define('SITE_PREFIX', '/brisk/'); -function user_get_free($user_arr) +function headers_render($header) { - foreach ($user_arr as $i => $user) { - if (!$user->is_enable()) { - return ($user); - } + + $s = ""; + $s .= "HTTP/1.1 200 OK\r\n"; + if (!isset($header['Date'])) + $s .= sprintf("Date: %s\r\n", date(DATE_RFC822)); + if (!isset($header['Connection'])) + $s .= "Connection: close\r\n"; + if (!isset($header['Content-Type'])) + $s .= "Content-Type: text/html\r\n"; + foreach($header as $key => $value) { + $s .= sprintf("%s: %s\r\n", $key, $value); } - return FALSE; -} + $s .= "Mop: was/here\r\n"; + $s .= "\r\n"; -function user_get_sess($user_arr, $sess) -{ - foreach ($user_arr as $i => $user) { - printf("SESS: [%s] cur: [%s]\n", $user->sess_get(), $sess); - if ($user->sess_get() == $sess) { - return ($user); - } - } - return FALSE; + return ($s); } +/* + * Caching system using ob php system to cache old style pages + * to a var and than send it with more calm + */ +$G_headers = ""; function shutta() { log_rd2("SHUTTA [".connection_status()."] !"); } -register_shutdown_function(shutta); +register_shutdown_function('shutta'); /* * MAIN @@ -130,38 +100,35 @@ $shutdown = FALSE; function main() { - GLOBAL $G_lang, $mlang_indrd, $is_page_streaming; - // GLOBAL $first_loop; - GLOBAL $G_with_splash, $G_splash_content, $G_splash_interval, $G_splash_idx; - GLOBAL $G_splash_w, $G_splash_h, $G_splash_timeout; - $CO_splashdate = "CO_splashdate".$G_splash_idx; - GLOBAL $$CO_splashdate; - - GLOBAL $S_load_stat; + GLOBAL $G_headers; + GLOBAL $shutdown; + $main_loop = TRUE; + /* + * INIT + */ + $FILE_SOCKET = "/tmp/brisk.sock"; + $UNIX_SOCKET = "unix://$FILE_SOCKET"; + $debug = 0; + $fixed_fd = 2; + $socks = array(); - GLOBAL $shutdown; - $main_loop = TRUE; + $blocking_mode = 0; // 0 for non-blocking - $user_a = array(); - $s2u = array(); - for ($i = 0 ; $i < 10 ; $i++) { - $user_a[$i] = new SPUser($i, 0, NULL); + if (($room = Room::create()) == FALSE) { + log_crit("load_data failed"); + return FALSE; } + $s2u = array(); + $pages_flush = array(); + $rndstr = ""; for ($i = 0 ; $i < 4096 ; $i++) { $rndstr .= chr(mt_rand(65, 90)); } - $FILE_SOCKET = "/tmp/test001.sock"; - $UNIX_SOCKET = "unix://$FILE_SOCKET"; - $debug = 0; - $fixed_fd = 2; - - $blocking_mode = 0; // 0 for non-blocking - if (file_exists($FILE_SOCKET)) { unlink($FILE_SOCKET); } @@ -172,8 +139,6 @@ function main() } umask($old_umask); - $socks = array(); - if (($in = fopen("php://stdin", "r")) === FALSE) { exit(11); } @@ -181,7 +146,9 @@ function main() stream_set_blocking($list, $blocking_mode); # Set the stream to non-blocking while ($main_loop) { - echo "IN LOOP\n"; + $curtime = time(); + printf("IN LOOP: Current opened: %d pages_flush: %d\n", count($socks), count($pages_flush)); + /* Prepare the read array */ if ($shutdown) $read = array_merge(array("$in" => $in), $socks); @@ -194,7 +161,7 @@ function main() } $write = NULL; $except = NULL; - $num_changed_sockets = stream_select($read, $write, $except, 5); + $num_changed_sockets = stream_select($read, $write, $except, 0, 250000); if ($num_changed_sockets === FALSE) { printf("No data in 5 secs"); @@ -206,49 +173,98 @@ function main() } /* At least at one of the sockets something interesting happened */ foreach ($read as $i => $sock) { + /* is_resource check is required because there is the possibility that + during new request an old connection is closed */ + if (!is_resource($sock)) { + continue; + } if ($sock === $list) { printf("NUOVA CONNEX\n"); $new_unix = stream_socket_accept($list); $stream_info = ""; + $method = ""; + $get = array(); + $post = array(); + $cookie = array(); if (($new_socket = ancillary_getstream($new_unix, $stream_info)) !== FALSE) { + stream_set_blocking($new_socket, $blocking_mode); // Set the stream to non-blocking printf("RECEIVED HEADER:\n%s", $stream_info); - $m = spu_process_info($stream_info, $header, $get, $post); - printf("M: %s\nHEADER:\n", $m); + $path = spu_process_info($stream_info, $method, $header, $get, $post, $cookie); + printf("PATH: [%s]\n", $path); + printf("M: %s\nHEADER:\n", $method); print_r($header); printf("GET:\n"); print_r($get); printf("POST:\n"); print_r($post); - - /* TODO: here stuff to decide if it is old or new user */ - if (($user_cur = user_get_sess($user_a, $get['sess'])) != FALSE) { - /* close the previous socket */ - unset($s2u[intval($user_cur->sock_get())]); - unset($socks[intval($user_cur->sock_get())]); - fclose($user_cur->sock_get()); - /* assign the new socket */ - $user_cur->sock_set($new_socket); - $id = $user_cur->id_get(); - $s2u[intval($new_socket)] = $id; - $socks[intval($new_socket)] = $new_socket; - fwrite($new_socket, $rndstr); - fflush($new_socket); - } - else if (($user_cur = user_get_free($user_a)) != FALSE) { - stream_set_blocking($new_socket, $blocking_mode); // Set the stream to non-blocking - $socks[intval($new_socket)] = $new_socket; - - $id = $user_cur->id_get(); - $user_a[$id]->enable($new_socket, $get['sess']); - printf("s2u: ci passo %d\n", intval($new_socket)); - $s2u[intval($new_socket)] = $id; + printf("COOKIE:\n"); + print_r($cookie); + + $addr = stream_socket_get_name($new_socket, TRUE); + + switch ($path) { + case SITE_PREFIX: + case SITE_PREFIX."index.php": + $header_out = array(); + ob_start(); + index_main($room, $header_out, $addr, $get, $post, $cookie); + $content = ob_get_contents(); + ob_end_clean(); + + $pgflush = new PageFlush($new_socket, $curtime, 20, $header_out, $content); + + if ($pgflush->try_flush($curtime) == FALSE) { + // Add $pgflush to the pgflush array + array_push($pages_flush, $pgflush); + } + + break; + case SITE_PREFIX."index_wr.php": + $header_out = array(); + $addr = ""; + // $ret = socket_getpeername($new_socket, $addr); + printf("RET: %s\n", $addr); + // exit(123); + ob_start(); + index_wr_main($room, $addr, $get, $post, $cookie); + $content = ob_get_contents(); + ob_end_clean(); - fwrite($new_socket, $rndstr); - fflush($new_socket); - } - else { - printf("Too many opened users\n"); + // printf("OUT: [%s]\n", $G_content); + fwrite($new_socket, headers_render($header_out).$content); fclose($new_socket); + break; + case SITE_PREFIX."index_rd_ifra.php": + do { + $header_out = array(); + if (!isset($cookie['sess']) + || (($user = $room->get_user($cookie['sess'], $idx)) == FALSE)) { + $body = index_rd_ifra_fini(TRUE); + fwrite($new_socket, headers_render($header_out).$body); + fflush($new_socket); + fclose($new_socket); + break; + } + // close a previous opened index_read_ifra socket, if exists + if (($prev = $user->rd_socket_get()) != NULL) { + unset($s2u[intval($user->rd_socket_get())]); + unset($socks[intval($user->rd_socket_get())]); + fclose($user->rd_socket_get()); + printf("CLOSE AND OPEN AGAIN ON IFRA2\n"); + $user->rd_socket_set(NULL); + } + + $body = ""; + index_rd_ifra_init($room, $user, $header_out, $body, $get, $post, $cookie); + fwrite($new_socket, headers_render($header_out).$body); + fflush($new_socket); + + $s2u[intval($new_socket)] = $idx; + $socks[intval($new_socket)] = $new_socket; + $user->rd_socket_set($new_socket); + } while (FALSE); + + break; } } else { @@ -268,10 +284,14 @@ function main() printf("Arrivati %d bytes da stdin\n", strlen($buf)); } else { + // $user_a[$s2u[intval($sock)]]->disable(); + if ($room->user[$s2u[intval($sock)]]->rd_socket_get() != NULL) { + $room->user[$s2u[intval($sock)]]->rd_socket_set(NULL); + } unset($socks[intval($sock)]); - $user_a[$s2u[intval($sock)]]->disable(); unset($s2u[intval($sock)]); fclose($sock); + printf("CLOSE ON READ\n"); } if ($debug > 1) { printf("post unset\n"); @@ -298,15 +318,49 @@ function main() } + foreach ($pages_flush as $k => $pgflush) { + if ($pgflush->try_flush($curtime) == TRUE) { + unset($pages_flush[$k]); + } + } + foreach ($socks as $k => $sock) { + if (isset($s2u[intval($sock)])) { + $user = $room->user[$s2u[intval($sock)]]; + $body = $user->rd_cache_get(); + if ($body == "") + index_rd_ifra_main($room, $user, $body); + if ($body == "" && $user->rd_kalive_is_expired($curtime)) { + $body = index_rd_ifra_keepalive($user); + } + if ($body != "") { + echo "SPIA: [".substr($body, 0, 60)."...]\n"; + $body_l = mb_strlen($body, "ASCII"); + $ret = @fwrite($sock, $body); + if ($ret < $body_l) { + printf("TROUBLE WITH FWRITE: %d\n", $ret); + $user->rd_cache_set(mb_substr($body, $ret, $body_l - $ret, "ASCII")); + } + else { + $user->rd_cache_set(""); + } + fflush($sock); + $user->rd_kalive_reset($curtime); + } - if (0 == 1) { // WRITE PART EXAMPLE - // sleep(3); - foreach ($socks as $k => $sock) { - fwrite($sock, sprintf("DI QUI CI PASSO [%d]\n", $user_a[$s2u[intval($sock)]]->cnt_inc())); - fflush($sock); + // close socket after a while to prevent client memory consumption + if ($user->rd_endtime_is_expired($curtime)) { + // $user_a[$s2u[intval($sock)]]->disable(); + if ($room->user[$s2u[intval($sock)]]->rd_socket_get() != NULL) { + $room->user[$s2u[intval($sock)]]->rd_socket_set(NULL); + } + unset($socks[intval($sock)]); + unset($s2u[intval($sock)]); + fclose($sock); + printf("CLOSE ON LOOP\n"); + } } } } @@ -315,4 +369,4 @@ function main() } main(); -?> \ No newline at end of file +?>