From fce0ad98358c7884408699613ad33c92851d45b4 Mon Sep 17 00:00:00 2001 From: Matteo Nastasi Date: Fri, 3 Aug 2012 18:06:13 +0200 Subject: [PATCH] decoupling brisk / sac-a-push, add methods to sac-a-push to manage add and remove of sockets from the new room method request_mgr --- web/Obj/brisk.phh | 74 ++++++++++++++++++ web/spush/brisk-spush.php | 156 ++++++++++++++------------------------ 2 files changed, 130 insertions(+), 100 deletions(-) diff --git a/web/Obj/brisk.phh b/web/Obj/brisk.phh index 52e408a..000722b 100644 --- a/web/Obj/brisk.phh +++ b/web/Obj/brisk.phh @@ -2169,6 +2169,80 @@ class Room { return (FALSE); } + function request_mgr(&$s_a_p, &$header_out, &$new_socket, $path, $addr, $get, $post, $cookie) + { + printf("NEW_SOCKET (root): %d\n", intval($new_socket)); + + switch ($path) { + case SITE_PREFIX: + case SITE_PREFIX."index.php": + ob_start(); + index_main($this, $header_out, $addr, $get, $post, $cookie); + $content = ob_get_contents(); + ob_end_clean(); + + $s_a_p->pgflush_try_add($new_socket, 20, $header_out, $content); + return TRUE; + + break; + case SITE_PREFIX."index_wr.php": + ob_start(); + index_wr_main($this, $addr, $get, $post, $cookie); + $content = ob_get_contents(); + ob_end_clean(); + + $s_a_p->pgflush_try_add($new_socket, 20, $header_out, $content); + return TRUE; + + break; + case SITE_PREFIX."index_rd_ifra.php": + do { + if (!isset($cookie['sess']) + || (($user = $this->get_user($cookie['sess'], $idx)) == FALSE)) { + $content = User::stream_fini(TRUE); + + $s_a_p->pgflush_try_add($new_socket, 20, $header_out, $content); + return TRUE; + + break; + } + // close a previous opened index_read_ifra socket, if exists + if (($prev = $user->rd_socket_get()) != NULL) { + $s_a_p->socks_unset($user->rd_socket_get()); + fclose($user->rd_socket_get()); + printf("CLOSE AND OPEN AGAIN ON IFRA2\n"); + $user->rd_socket_set(NULL); + } + + $content = ""; + $user->stream_init($header_out, $content, $get, $post, $cookie); + + $response = headers_render($header_out, -1).chunked_content($content); + $response_l = mb_strlen($response, "ASCII"); + + $wret = @fwrite($new_socket, $response, $response_l); + if ($wret < $response_l) { + printf("TROUBLES WITH FWRITE: %d\n", $wret); + $user->rd_cache_set(mb_substr($content, $wret, $response_l - $wret, "ASCII")); + } + else { + $user->rd_cache_set(""); + } + fflush($new_socket); + + + $s_a_p->socks_set($new_socket, $user); + $user->rd_socket_set($new_socket); + printf(" - qui ci siamo - "); + return TRUE; + } while (FALSE); + + return FALSE; + break; + } + + return (FALSE); + } } // end class Room diff --git a/web/spush/brisk-spush.php b/web/spush/brisk-spush.php index 4113394..a48aa1f 100755 --- a/web/spush/brisk-spush.php +++ b/web/spush/brisk-spush.php @@ -141,6 +141,8 @@ class Sac_a_push { var $room; var $bin5; + var $curtime; + var $rndstr; var $main_loop; @@ -150,10 +152,11 @@ class Sac_a_push { // Sac_a_push::create("/tmp/brisk.sock", 0, 0 - static function create($sockname, $debug, $blocking_mode) + static function create(&$room, $sockname, $debug, $blocking_mode) { $thiz = new Sac_a_push(); - + + $thiz->room = $room; $thiz->file_socket = $sockname; $thiz->unix_socket = "unix://$sockname"; $thiz->debug = $debug; @@ -163,12 +166,6 @@ class Sac_a_push { $thiz->blocking_mode = 0; // 0 for non-blocking - if (($thiz->room = Room::create()) == FALSE) { - log_crit("room::create failed"); - return FALSE; - } - - $thiz->rndstr = ""; for ($i = 0 ; $i < 4096 ; $i++) { $thiz->rndstr .= chr(mt_rand(65, 90)); @@ -194,6 +191,37 @@ class Sac_a_push { return ($thiz); } + function socks_set($sock, $user) + { + $id = intval($sock); + + $this->s2u[$id] = $user; + $this->socks[$id] = $sock; + } + + function socks_unset($sock) + { + $id = intval($sock); + + unset($this->s2u[$id]); + unset($this->socks[$id]); + } + + function pgflush_try_add(&$new_socket, $tout, $header_out, $content) + { + $pgflush = new PageFlush($new_socket, $this->curtime, $tout, $header_out, $content); + + if ($pgflush->try_flush($this->curtime) == FALSE) { + // Add $pgflush to the pgflush array + $this->pgflush_add($pgflush); + } + } + + function pgflush_add($pgflush) + { + array_push($this->pages_flush, $pgflush); + } + function run() { if ($this->main_loop) { @@ -203,8 +231,8 @@ class Sac_a_push { $this->main_loop = TRUE; while ($this->main_loop) { - $curtime = time(); - printf("IN LOOP: Current opened: %d pages_flush: %d\n", count($this->socks), count($this->pages_flush)); + $this->curtime = time(); + printf("IN LOOP: Current opened: %d pages_flush: %d - ", count($this->socks), count($this->pages_flush)); /* Prepare the read array */ /* // when we manage it ... */ @@ -223,7 +251,7 @@ class Sac_a_push { $num_changed_sockets = stream_select($read, $write, $except, 0, 250000); if ($num_changed_sockets == 0) { - printf("No data in 5 secs\n"); + printf(" no data in 5 secs "); } else if ($num_changed_sockets > 0) { printf("num sock %d num_of_socket: %d\n", $num_changed_sockets, count($read)); @@ -246,6 +274,7 @@ class Sac_a_push { $post = array(); $cookie = array(); if (($new_socket = ancillary_getstream($new_unix, $stream_info)) !== FALSE) { + printf("NEW_SOCKET: %d\n", intval($new_socket)); stream_set_blocking($new_socket, $this->blocking_mode); // Set the stream to non-blocking printf("RECEIVED HEADER:\n%s", $stream_info); $path = spu_process_info($stream_info, $method, $header, $get, $post, $cookie); @@ -262,86 +291,8 @@ class Sac_a_push { $addr = stream_socket_get_name($new_socket, TRUE); $header_out = array(); - switch ($path) { - case SITE_PREFIX: - case SITE_PREFIX."index.php": - ob_start(); - index_main($this->room, $header_out, $addr, $get, $post, $cookie); - $content = ob_get_contents(); - ob_end_clean(); - - $pgflush = new PageFlush($new_socket, $curtime, 20, $header_out, $content); - - if ($pgflush->try_flush($curtime) == FALSE) { - // Add $pgflush to the pgflush array - array_push($this->pages_flush, $pgflush); - } - - break; - case SITE_PREFIX."index_wr.php": - ob_start(); - index_wr_main($this->room, $addr, $get, $post, $cookie); - $content = ob_get_contents(); - ob_end_clean(); - - $pgflush = new PageFlush($new_socket, $curtime, 20, $header_out, $content); - - if ($pgflush->try_flush($curtime) == FALSE) { - // Add $pgflush to the pgflush array - array_push($this->pages_flush, $pgflush); - } - break; - case SITE_PREFIX."index_rd_ifra.php": - do { - if (!isset($cookie['sess']) - || (($user = $this->room->get_user($cookie['sess'], $idx)) == FALSE)) { - $content = User::stream_fini(TRUE); - - $pgflush = new PageFlush($new_socket, $curtime, 20, $header_out, $content); - - if ($pgflush->try_flush($curtime) == FALSE) { - // Add $pgflush to the pgflush array - array_push($this->pages_flush, $pgflush); - } - break; - } - // close a previous opened index_read_ifra socket, if exists - if (($prev = $user->rd_socket_get()) != NULL) { - unset($this->s2u[intval($user->rd_socket_get())]); - unset($this->socks[intval($user->rd_socket_get())]); - fclose($user->rd_socket_get()); - printf("CLOSE AND OPEN AGAIN ON IFRA2\n"); - $user->rd_socket_set(NULL); - } - - $content = ""; - $user->stream_init($header_out, $content, $get, $post, $cookie); - - $response = headers_render($header_out, -1).chunked_content($content); - $response_l = mb_strlen($response, "ASCII"); - - $wret = @fwrite($new_socket, $response, $response_l); - if ($wret < $response_l) { - printf("TROUBLES WITH FWRITE: %d\n", $wret); - $user->rd_cache_set(mb_substr($content, $wret, $response_l - $wret, "ASCII")); - } - else { - $user->rd_cache_set(""); - } - fflush($new_socket); - - $this->s2u[intval($new_socket)] = $user; - $this->socks[intval($new_socket)] = $new_socket; - $user->rd_socket_set($new_socket); - } while (FALSE); - - break; - - /* default: */ - /* $cl = strlen(SITE_PREFIX."briskin5/"); */ - /* if (!strncmp($this->path, SITE_PREFIX."briskin5/", $cl)) { */ - /* Bin5::page_manager($room, $header_out, substr($path,$cl), $method, $addr, $get, $post, $cookie); */ - } + $this->room->request_mgr($this, $header_out, $new_socket, $path, $addr, $get, $post, $cookie); + printf("number of sockets after %d\n", count($this->socks)); } else { printf("WARNING: ancillary_getstream failed\n"); @@ -396,7 +347,7 @@ class Sac_a_push { /* manage unfinished pages */ foreach ($this->pages_flush as $k => $pgflush) { - if ($pgflush->try_flush($curtime) == TRUE) { + if ($pgflush->try_flush($this->curtime) == TRUE) { unset($this->pages_flush[$k]); } } @@ -410,7 +361,7 @@ class Sac_a_push { $content = ""; $user->stream_main($content, $get, $post, $cookie); - if ($content == "" && $user->rd_kalive_is_expired($curtime)) { + if ($content == "" && $user->rd_kalive_is_expired($this->curtime)) { $content = $user->stream_keepalive(); } if ($content != "") { @@ -430,12 +381,11 @@ class Sac_a_push { $user->rd_cache_set(""); } fflush($sock); - $user->rd_kalive_reset($curtime); + $user->rd_kalive_reset($this->curtime); } // close socket after a while to prevent client memory consumption - if ($user->rd_endtime_is_expired($curtime)) { - // $user_a[$s2u[intval($sock)]]->disable(); + if ($user->rd_endtime_is_expired($this->curtime)) { if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) { $this->s2u[intval($sock)]->rd_socket_set(NULL); } @@ -445,14 +395,20 @@ class Sac_a_push { printf("CLOSE ON LOOP\n"); } } - } - } - } + } // foreach ($this->socks... + printf("\n"); + } // while (... + } // function run(... } function main() { - if (($s_a_p = Sac_a_push::create("/tmp/brisk.sock", 0, 0)) === FALSE) { + if (($room = Room::create()) == FALSE) { + log_crit("room::create failed"); + exit(1); + } + + if (($s_a_p = Sac_a_push::create($room, "/tmp/brisk.sock", 0, 0)) === FALSE) { exit(1); } -- 2.17.1