decoupling brisk / sac-a-push, add methods to sac-a-push to manage add and remove...
authorMatteo Nastasi <nastasi@alternativeoutput.it>
Fri, 3 Aug 2012 16:06:13 +0000 (18:06 +0200)
committerMatteo Nastasi <nastasi@alternativeoutput.it>
Fri, 3 Aug 2012 16:06:13 +0000 (18:06 +0200)
web/Obj/brisk.phh
web/spush/brisk-spush.php

index 52e408a..000722b 100644 (file)
@@ -2169,6 +2169,80 @@ class Room {
       return (FALSE);
   }
 
+  function request_mgr(&$s_a_p, &$header_out, &$new_socket, $path, $addr, $get, $post, $cookie)
+  {
+      printf("NEW_SOCKET (root): %d\n", intval($new_socket));
+
+      switch ($path) {
+      case SITE_PREFIX:
+      case SITE_PREFIX."index.php":
+          ob_start();
+      index_main($this, $header_out, $addr, $get, $post, $cookie);
+      $content = ob_get_contents();
+      ob_end_clean();
+
+      $s_a_p->pgflush_try_add($new_socket, 20, $header_out, $content);
+      return TRUE;
+
+      break;
+      case SITE_PREFIX."index_wr.php":
+          ob_start();
+          index_wr_main($this, $addr, $get, $post, $cookie);
+          $content = ob_get_contents();
+          ob_end_clean();
+
+          $s_a_p->pgflush_try_add($new_socket, 20, $header_out, $content);
+          return TRUE;
+
+          break;
+      case SITE_PREFIX."index_rd_ifra.php":
+          do {
+              if (!isset($cookie['sess'])
+                  || (($user = $this->get_user($cookie['sess'], $idx)) == FALSE)) {
+                  $content = User::stream_fini(TRUE);
+                  
+                  $s_a_p->pgflush_try_add($new_socket, 20, $header_out, $content);
+                  return TRUE;
+
+                  break;
+              }
+              // close a previous opened index_read_ifra socket, if exists
+              if (($prev = $user->rd_socket_get()) != NULL) {
+                  $s_a_p->socks_unset($user->rd_socket_get());
+                  fclose($user->rd_socket_get());
+                  printf("CLOSE AND OPEN AGAIN ON IFRA2\n");
+                  $user->rd_socket_set(NULL);
+              }
+              
+              $content = "";
+              $user->stream_init($header_out, $content, $get, $post, $cookie);
+              
+              $response = headers_render($header_out, -1).chunked_content($content);
+              $response_l = mb_strlen($response, "ASCII");
+              
+              $wret = @fwrite($new_socket, $response, $response_l);
+              if ($wret < $response_l) {
+                  printf("TROUBLES WITH FWRITE: %d\n", $wret);
+                  $user->rd_cache_set(mb_substr($content, $wret, $response_l - $wret, "ASCII"));
+              }
+              else {
+                  $user->rd_cache_set("");
+              }
+              fflush($new_socket);
+              
+              
+              $s_a_p->socks_set($new_socket, $user);
+              $user->rd_socket_set($new_socket);
+              printf(" - qui ci siamo - ");
+              return TRUE;
+          } while (FALSE);
+          
+          return FALSE;
+          break;
+      }
+
+      return (FALSE);
+  }
 
 } // end class Room
 
index 4113394..a48aa1f 100755 (executable)
@@ -141,6 +141,8 @@ class Sac_a_push {
     var $room;
     var $bin5;
 
+    var $curtime;
+
     var $rndstr;
     var $main_loop;
 
@@ -150,10 +152,11 @@ class Sac_a_push {
 
     // Sac_a_push::create("/tmp/brisk.sock", 0, 0
 
-    static function create($sockname, $debug, $blocking_mode)
+    static function create(&$room, $sockname, $debug, $blocking_mode)
     {        
         $thiz = new Sac_a_push();
-
+        
+        $thiz->room = $room;
         $thiz->file_socket = $sockname;
         $thiz->unix_socket = "unix://$sockname";
         $thiz->debug = $debug;
@@ -163,12 +166,6 @@ class Sac_a_push {
 
         $thiz->blocking_mode = 0; // 0 for non-blocking
 
-        if (($thiz->room = Room::create()) == FALSE) {
-            log_crit("room::create failed");
-            return FALSE;
-        }
-
-
         $thiz->rndstr = "";
         for ($i = 0 ; $i < 4096 ; $i++) {
             $thiz->rndstr .= chr(mt_rand(65, 90));
@@ -194,6 +191,37 @@ class Sac_a_push {
         return ($thiz);
     }
 
+    function socks_set($sock, $user)
+    {
+        $id = intval($sock);
+
+        $this->s2u[$id]   = $user;
+        $this->socks[$id] = $sock;
+    }
+
+    function socks_unset($sock)
+    {
+        $id = intval($sock);
+
+        unset($this->s2u[$id]);
+        unset($this->socks[$id]);
+    }
+
+    function pgflush_try_add(&$new_socket, $tout, $header_out, $content)
+    {
+        $pgflush = new PageFlush($new_socket, $this->curtime, $tout, $header_out, $content);
+
+        if ($pgflush->try_flush($this->curtime) == FALSE) {
+            // Add $pgflush to the pgflush array
+            $this->pgflush_add($pgflush);
+        }
+    }
+
+    function pgflush_add($pgflush)
+    {
+        array_push($this->pages_flush, $pgflush);
+    }
+
     function run()
     {
         if ($this->main_loop) {
@@ -203,8 +231,8 @@ class Sac_a_push {
         $this->main_loop = TRUE;
         
         while ($this->main_loop) {
-            $curtime = time();
-            printf("IN LOOP: Current opened: %d  pages_flush: %d\n", count($this->socks), count($this->pages_flush));
+            $this->curtime = time();
+            printf("IN LOOP: Current opened: %d  pages_flush: %d - ", count($this->socks), count($this->pages_flush));
             
             /* Prepare the read array */
             /* // when we manage it ... */
@@ -223,7 +251,7 @@ class Sac_a_push {
             $num_changed_sockets = stream_select($read, $write, $except, 0, 250000);
         
             if ($num_changed_sockets == 0) {
-                printf("No data in 5 secs\n");
+                printf(" no data in 5 secs ");
             } 
             else if ($num_changed_sockets > 0) {
                 printf("num sock %d num_of_socket: %d\n", $num_changed_sockets, count($read));
@@ -246,6 +274,7 @@ class Sac_a_push {
                         $post        = array();
                         $cookie      = array();
                         if (($new_socket = ancillary_getstream($new_unix, $stream_info)) !== FALSE) {
+                            printf("NEW_SOCKET: %d\n", intval($new_socket));
                             stream_set_blocking($new_socket, $this->blocking_mode); // Set the stream to non-blocking
                             printf("RECEIVED HEADER:\n%s", $stream_info);
                             $path = spu_process_info($stream_info, $method, $header, $get, $post, $cookie);
@@ -262,86 +291,8 @@ class Sac_a_push {
                             $addr = stream_socket_get_name($new_socket, TRUE);
                             $header_out = array();
 
-                            switch ($path) {
-                            case SITE_PREFIX:
-                            case SITE_PREFIX."index.php":
-                                ob_start();
-                                index_main($this->room, $header_out, $addr, $get, $post, $cookie);
-                                $content = ob_get_contents();
-                                ob_end_clean();
-
-                                $pgflush = new PageFlush($new_socket, $curtime, 20, $header_out, $content);
-
-                                if ($pgflush->try_flush($curtime) == FALSE) {
-                                    // Add $pgflush to the pgflush array
-                                    array_push($this->pages_flush, $pgflush);
-                                }
-
-                                break;
-                            case SITE_PREFIX."index_wr.php":
-                                ob_start();
-                                index_wr_main($this->room, $addr, $get, $post, $cookie);
-                                $content = ob_get_contents();
-                                ob_end_clean();
-                                
-                                $pgflush = new PageFlush($new_socket, $curtime, 20, $header_out, $content);
-                                
-                                if ($pgflush->try_flush($curtime) == FALSE) {
-                                    // Add $pgflush to the pgflush array
-                                    array_push($this->pages_flush, $pgflush);
-                                }
-                            break;
-                            case SITE_PREFIX."index_rd_ifra.php":
-                                do {
-                                    if (!isset($cookie['sess'])
-                                        || (($user = $this->room->get_user($cookie['sess'], $idx)) == FALSE)) {
-                                        $content = User::stream_fini(TRUE);
-                                        
-                                        $pgflush = new PageFlush($new_socket, $curtime, 20, $header_out, $content);
-                                        
-                                        if ($pgflush->try_flush($curtime) == FALSE) {
-                                            // Add $pgflush to the pgflush array
-                                            array_push($this->pages_flush, $pgflush);
-                                        }
-                                        break;
-                                    }
-                                    // close a previous opened index_read_ifra socket, if exists
-                                    if (($prev = $user->rd_socket_get()) != NULL) {
-                                        unset($this->s2u[intval($user->rd_socket_get())]);
-                                        unset($this->socks[intval($user->rd_socket_get())]);
-                                        fclose($user->rd_socket_get());
-                                        printf("CLOSE AND OPEN AGAIN ON IFRA2\n");
-                                        $user->rd_socket_set(NULL);
-                                    }
-                                    
-                                    $content = "";
-                                    $user->stream_init($header_out, $content, $get, $post, $cookie);
-                                    
-                                    $response = headers_render($header_out, -1).chunked_content($content);
-                                    $response_l = mb_strlen($response, "ASCII");
-
-                                    $wret = @fwrite($new_socket, $response, $response_l);
-                                    if ($wret < $response_l) {
-                                        printf("TROUBLES WITH FWRITE: %d\n", $wret);
-                                        $user->rd_cache_set(mb_substr($content, $wret, $response_l - $wret, "ASCII"));
-                                    }
-                                    else {
-                                        $user->rd_cache_set("");
-                                    }
-                                    fflush($new_socket);
-                                    
-                                    $this->s2u[intval($new_socket)] = $user;
-                                    $this->socks[intval($new_socket)] = $new_socket;                                
-                                    $user->rd_socket_set($new_socket);
-                                } while (FALSE);
-                                
-                                break;
-                                
-                                /* default: */
-                                /*     $cl = strlen(SITE_PREFIX."briskin5/"); */
-                                /*     if (!strncmp($this->path, SITE_PREFIX."briskin5/", $cl)) { */
-                                /*         Bin5::page_manager($room, $header_out, substr($path,$cl), $method, $addr, $get, $post, $cookie); */
-                            }
+                            $this->room->request_mgr($this, $header_out, $new_socket, $path, $addr, $get, $post, $cookie);
+                            printf("number of sockets after %d\n", count($this->socks));
                         }
                         else {
                             printf("WARNING: ancillary_getstream failed\n");
@@ -396,7 +347,7 @@ class Sac_a_push {
 
             /* manage unfinished pages */
             foreach ($this->pages_flush as $k => $pgflush) {
-                if ($pgflush->try_flush($curtime) == TRUE) {
+                if ($pgflush->try_flush($this->curtime) == TRUE) {
                     unset($this->pages_flush[$k]);
                 }
             }
@@ -410,7 +361,7 @@ class Sac_a_push {
                         $content = "";
                         $user->stream_main($content, $get, $post, $cookie);
                         
-                        if ($content == "" && $user->rd_kalive_is_expired($curtime)) {
+                        if ($content == "" && $user->rd_kalive_is_expired($this->curtime)) {
                             $content = $user->stream_keepalive();
                         }
                         if ($content != "") {
@@ -430,12 +381,11 @@ class Sac_a_push {
                             $user->rd_cache_set("");
                         }
                         fflush($sock);
-                        $user->rd_kalive_reset($curtime);
+                        $user->rd_kalive_reset($this->curtime);
                     }
                     
                     // close socket after a while to prevent client memory consumption
-                    if ($user->rd_endtime_is_expired($curtime)) {
-                        // $user_a[$s2u[intval($sock)]]->disable();
+                    if ($user->rd_endtime_is_expired($this->curtime)) {
                         if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) {
                             $this->s2u[intval($sock)]->rd_socket_set(NULL);
                         }
@@ -445,14 +395,20 @@ class Sac_a_push {
                         printf("CLOSE ON LOOP\n");
                     }
                 }
-            }
-        }
-    }
+            }  // foreach ($this->socks...
+            printf("\n");
+        }  // while (...
+    }  // function run(...
 }
 
 function main()
 {
-    if (($s_a_p = Sac_a_push::create("/tmp/brisk.sock", 0, 0)) === FALSE) {
+    if (($room = Room::create()) == FALSE) {
+        log_crit("room::create failed");
+        exit(1);
+    }
+
+    if (($s_a_p = Sac_a_push::create($room, "/tmp/brisk.sock", 0, 0)) === FALSE) {
         exit(1);
     }