bdb4b593c82b50241498a94b572fb9705a02342b
[brisk.git] / web / Obj / sac-a-push.phh
1 <?php
2
3 /*
4  *  brisk - spush/sac-a-push.phh
5  *
6  *  Copyright (C) 2012 Matteo Nastasi
7  *                          mailto: nastasi@alternativeoutput.it 
8  *                                  matteo.nastasi@milug.org
9  *                          web: http://www.alternativeoutput.it
10  *
11  * This program is free software; you can redistribute it and/or modify
12  * it under the terms of the GNU General Public License as published by
13  * the Free Software Foundation; either version 2 of the License, or
14  * (at your option) any later version.
15  *
16  * This program is distributed in the hope that it will be useful, but
17  * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19  * General Public License for more details. You should have received a
20  * copy of the GNU General Public License along with this program; if
21  * not, write to the Free Software Foundation, Inc, 59 Temple Place -
22  * Suite 330, Boston, MA 02111-1307, USA.
23  *
24  */
25
// Site prefix constant. It is not referenced anywhere in this file;
// presumably it is the URL path under which brisk is served - confirm
// against the code that uses it.
define('SITE_PREFIX', '/brisk/');
27
/*
 *  Decode a string of '&'-separated "name=value" items (query string or
 *  urlencoded POST body) into an associative array.
 *
 *  - empty items (e.g. from "a=1&&b=2" or an empty string) are skipped
 *  - an item without '=' yields an empty string value
 *  - only the first '=' separates name and value, so values may contain '='
 *  - values are urldecoded, names are kept as received
 */
function spu_urlencoded_pairs($str)
{
    $ret = array();
    foreach (explode('&', $str) as $item) {
        if ($item === '') {
            continue;
        }
        $pair = explode('=', $item, 2);
        $ret[$pair[0]] = urldecode(count($pair) > 1 ? $pair[1] : '');
    }

    return ($ret);
}

/*
 *  Parse the textual request description received with a new connection.
 *
 *  $stream_info is a sequence of "Name:value" lines (LF or CRLF separated)
 *  closed by an empty line and followed by one line of body. The request
 *  line is expected in the 'The-Request' pseudo header
 *  ("<method> <uri> <protocol>").
 *
 *  Params:
 *    $stream_info  raw text to parse
 *    $method       assigned locally only: it is passed by value, so the
 *                  caller never sees the parsed method.
 *                  NOTE(review): making it a reference would change the
 *                  signature; left as is - confirm what callers expect.
 *    &$header      OUT: header name => value (surrounding blanks removed)
 *    &$get         OUT: query string parameters
 *    &$post        OUT: urlencoded POST body parameters
 *    &$cookie      IN/OUT: cookies found in the 'Cookie' header are added;
 *                  existing entries are kept
 *
 *  Returns the request path (uri without query string) or FALSE when the
 *  input is not usable: header terminator missing, 'The-Request' missing
 *  or without uri, or a POST that is not urlencoded / has no
 *  Content-Length.
 */
function spu_process_info($stream_info, $method, &$header, &$get, &$post, &$cookie)
{
    $header = array();
    $get    = array();
    $post   = array();
    if (!is_array($cookie)) {
        $cookie = array();
    }

    $lines   = preg_split("/(\r?\n)/", $stream_info);
    $lines_n = count($lines);

    // headers section: stops at the first empty line
    $body_idx = -1;
    for ($i = 0 ; $i < $lines_n ; $i++) {
        if ($lines[$i] == "") {
            $body_idx = $i + 1;
            break;
        }
        $split = explode(":", $lines[$i], 2);
        if (count($split) < 2) {
            // not a "Name:value" line, nothing to store
            continue;
        }
        // blanks around the value are not part of it ("Name: value")
        $header[$split[0]] = trim($split[1]);
    }

    // without terminator (or without anything after it) the request is
    // incomplete
    if ($body_idx < 0 || $body_idx >= $lines_n) {
        return FALSE;
    }
    $body = $lines[$body_idx];

    if (!isset($header['The-Request'])) {
        return FALSE;
    }
    $req = explode(" ", $header['The-Request']);
    if (count($req) < 2) {
        return FALSE;
    }
    $method = $req[0];

    if (isset($header['Cookie'])) {
        foreach (explode(";", $header['Cookie']) as $element) {
            // limit 2: the cookie value itself may contain '='
            $nameval = explode("=", trim($element), 2);
            if (count($nameval) != 2) {
                printf("WARNING: malformat cookie element [%s]\n", $element);
                continue;
            }
            $cookie[$nameval[0]] = urldecode($nameval[1]);
        }
    }

    // GET params management
    $get_vars = explode('?', $req[1], 2);
    $path = $get_vars[0];
    if (count($get_vars) > 1) {
        $a = explode('&', $get_vars[1]);
        printf("A COUNT: [%s] %d\n", $a[0], count($a));
        $get = spu_urlencoded_pairs($get_vars[1]);
    }

    // POST params management
    if ($req[0] == 'POST') {
        if (!isset($header['Content-Type']) || !isset($header['Content-Length'])) {
            return FALSE;
        }
        // compare the media type only: parameters like "; charset=UTF-8"
        // are legal and the type name is case insensitive
        $ctype = explode(';', $header['Content-Type'], 2);
        if (strtolower(trim($ctype[0])) != 'application/x-www-form-urlencoded') {
            return FALSE;
        }
        // byte length, used by the log line below only
        $post_len = strlen($body);
        $post = spu_urlencoded_pairs($body);
        printf("INFO: postlen: %d\n", $post_len);
    }

    return $path;
}
90
/*
 *  Look up a request variable by name.
 *
 *  A name that is already set as a PHP global is never served (FALSE is
 *  returned). Otherwise the first source where the name is set wins, in
 *  this order: $cookie, $post, $get.
 *
 *  Returns the value found or FALSE when the name is not set anywhere.
 */
function gpcs_var($name, $get, $post, $cookie)
{
    if (isset($GLOBALS[$name])) {
        return FALSE;
    }

    // sources listed by decreasing priority
    foreach (array($cookie, $post, $get) as $source) {
        if (isset($source[$name])) {
            return ($source[$name]);
        }
    }

    return FALSE;
}
104
/*
 *  Build the status line and the header section of a "200 OK" response.
 *
 *  Params:
 *    $header  associative array name => value of headers to send; Date,
 *             Connection and Content-Type get a default when not set here
 *    $len     -1: chunked response (no-cache headers and
 *                 "Transfer-Encoding: chunked" are added)
 *             >0: a Content-Length header with this value is added
 *             any other value: no length information is added
 *
 *  Returns the header block, closed by the empty line.
 */
function headers_render($header, $len)
{
    $s = "HTTP/1.1 200 OK\r\n";
    if (!isset($header['Date'])) {
        // HTTP wants the fixed-length GMT format with 4-digit year
        // (DATE_RFC822 gives a 2-digit year and a numeric offset)
        $s .= sprintf("Date: %s\r\n", gmdate('D, d M Y H:i:s') . ' GMT');
    }
    if (!isset($header['Connection'])) {
        $s .= "Connection: close\r\n";
    }
    if (!isset($header['Content-Type'])) {
        $s .= "Content-Type: text/html\r\n";
    }
    foreach ($header as $key => $value) {
        $s .= sprintf("%s: %s\r\n", $key, $value);
    }
    if ($len == -1) {
        $s .= "Cache-Control: no-cache, must-revalidate\r\n";
        $s .= "Expires: Mon, 26 Jul 1997 05:00:00 GMT\r\n";
        // "chunked" is a transfer coding only: announcing it as
        // Content-Encoding too is not valid
        $s .= "Transfer-Encoding: chunked\r\n";
    }
    else if ($len > 0) {
        $s .= sprintf("Content-Length: %d\r\n", $len);
    }
    $s .= "\r\n";

    return ($s);
}
132
/*
 *  Caching system that uses the PHP output buffering (ob) functions to
 *  capture old-style pages into a variable and then send them out calmly.
 */
137
/*
 *  Shutdown hook: logs through log_rd2() the connection status at the
 *  moment the script terminates.
 */
function shutta()
{
    $status = connection_status();

    log_rd2("SHUTTA [" . $status . "] !");
}

// run shutta() when the script ends
register_shutdown_function('shutta');
144
145 /*
146  *  MAIN
147  */
148
/*
 *  Wrap $content as one chunk of a chunked body:
 *  "<size in uppercase hex>\r\n<content>\r\n".
 */
function chunked_content($content)
{
    // "ASCII" makes mb_strlen count one unit per byte
    $size_hex = strtoupper(dechex(mb_strlen($content, "ASCII")));

    return ($size_hex . "\r\n" . $content . "\r\n");
}
155
/*
 *  Return the zero-size chunk line "0\r\n".
 *
 *  NOTE(review): a chunked body is closed by "0\r\n" followed by one more
 *  "\r\n"; that final CRLF is not produced here - verify that callers
 *  append it.
 */
function chunked_fini()
{
    return "0\r\n";
}
160
/*
 *  Sac_a_push: single process select() loop.
 *
 *  It listens on a unix socket; for every connection accepted there it
 *  obtains, through ancillary_getstream(), the stream of the real client
 *  together with the textual description of its request, and passes the
 *  request to the application ($app->request_mgr()). Streams registered
 *  with socks_set() are then fed at every loop with the output of their
 *  user object, and pages not completely sent (PageFlush) are retried.
 */
class Sac_a_push {
    static $fixed_fd = 2;

    var $file_socket;     // filesystem path of the listening socket
    var $unix_socket;     // same path in "unix://..." form
    var $socks;           // open streams, indexed by intval(stream)
    var $s2u;             // user object of each stream, same index as $socks
    var $pages_flush;     // PageFlush objects not completely sent yet

    var $list;            // listening socket
    var $in;              // stdin stream

    var $debug;           // verbosity: extra dumps are printed when > 1
    var $blocking_mode;   // mode applied to the streams (0: non-blocking)

    var $app;             // application object, receives request_mgr() calls
    var $bin5;

    var $curtime;         // time() sampled at the beginning of each loop

    var $rndstr;          // 4096 random uppercase letters
    var $main_loop;       // TRUE while run() is looping

    // old style (PHP 4) constructor, intentionally empty: objects are set
    // up by create()
    function Sac_a_push()
    {
    }

    // Sac_a_push::create("/tmp/brisk.sock", 0, 0

    /*
     *  Factory: builds the object, (re)creates the listening unix socket
     *  $sockname and opens stdin.
     *
     *  $blocking_mode is currently not used: streams are always set to
     *  non-blocking (0).
     *
     *  Returns the new object or FALSE if the socket or stdin cannot be
     *  opened.
     */
    static function create(&$app, $sockname, $debug, $blocking_mode)
    {
        $thiz = new Sac_a_push();

        $thiz->app = $app;
        $thiz->file_socket = $sockname;
        $thiz->unix_socket = "unix://$sockname";
        $thiz->debug = $debug;
        $thiz->socks = array();
        $thiz->s2u  = array();
        $thiz->pages_flush = array();

        $thiz->blocking_mode = 0; // 0 for non-blocking

        $thiz->rndstr = "";
        for ($i = 0 ; $i < 4096 ; $i++) {
            $thiz->rndstr .= chr(mt_rand(65, 90));
        }

        // remove the socket file left by a previous run
        if (file_exists($thiz->file_socket)) {
            unlink($thiz->file_socket);
        }

        // the socket file is created with umask 0; the previous umask is
        // restored in any case, also when the creation fails
        $old_umask = umask(0);
        $thiz->list = stream_socket_server($thiz->unix_socket, $err, $errs);
        umask($old_umask);
        if ($thiz->list === FALSE) {
            return (FALSE);
        }
        stream_set_blocking($thiz->list, $thiz->blocking_mode); # Set the stream to non-blocking

        if (($thiz->in = fopen("php://stdin", "r")) === FALSE) {
            return (FALSE);
        }

        $thiz->main_loop = FALSE;

        return ($thiz);
    }

    // register the stream $sock and associate it with $user
    function socks_set($sock, $user)
    {
        $id = intval($sock);

        $this->s2u[$id]   = $user;
        $this->socks[$id] = $sock;
    }

    // forget the stream $sock and its user association (no close is done)
    function socks_unset($sock)
    {
        $id = intval($sock);

        unset($this->s2u[$id]);
        unset($this->socks[$id]);
    }

    // detach the stream $sock from its user (if any), unregister and
    // close it
    function sock_close($sock)
    {
        $id = intval($sock);

        if (isset($this->s2u[$id]) && $this->s2u[$id]->rd_socket_get() != NULL) {
            $this->s2u[$id]->rd_socket_set(NULL);
        }
        $this->socks_unset($sock);
        fclose($sock);
    }

    // try to send a page immediately; what cannot be sent now is queued
    // and retried by run()
    function pgflush_try_add(&$new_socket, $tout, $header_out, $content)
    {
        $pgflush = new PageFlush($new_socket, $this->curtime, $tout, $header_out, $content);

        if ($pgflush->try_flush($this->curtime) == FALSE) {
            // Add $pgflush to the pgflush array
            $this->pgflush_add($pgflush);
        }
    }

    // queue a PageFlush object for the next loops
    function pgflush_add($pgflush)
    {
        array_push($this->pages_flush, $pgflush);
    }

    /*
     *  Main loop. At every iteration (select timeout: 250 ms):
     *    1) new connections are accepted and passed to the application;
     *       the other readable streams are read (0 bytes read from a
     *       registered stream closes it)
     *    2) pending pages are flushed
     *    3) registered streams are fed with the output of their users
     *
     *  Returns FALSE at once if the loop is already running.
     */
    function run()
    {
        if ($this->main_loop) {
            return (FALSE);
        }

        $this->main_loop = TRUE;

        // request data of the last accepted connection: they are passed
        // to stream_main() too, so they must exist before the first
        // connection arrives
        $header = array();
        $get    = array();
        $post   = array();
        $cookie = array();

        while ($this->main_loop) {
            $this->curtime = time();
            printf("IN LOOP: Current opened: %d  pages_flush: %d - ", count($this->socks), count($this->pages_flush));

            /* Prepare the read array: listening socket, stdin and all
               registered streams (array_merge renumbers the keys) */
            $read   = array_merge(array(intval($this->list) => $this->list, intval($this->in) => $this->in),
                                  $this->socks);

            if ($this->debug > 1) {
                printf("PRE_SELECT\n");
                print_r($read);
            }
            $write  = NULL;
            $except = NULL;
            $num_changed_sockets = stream_select($read, $write, $except, 0, 250000);

            /* a failed select (FALSE) is handled like "nothing to read" */
            if ($num_changed_sockets == 0) {
                printf(" no data in 5 secs ");
            }
            else if ($num_changed_sockets > 0) {
                printf("num sock %d num_of_socket: %d\n", $num_changed_sockets, count($read));
                if ($this->debug > 1) {
                    print_r($read);
                }
                /* At least at one of the sockets something interesting happened */
                foreach ($read as $i => $sock) {
                    /* is_resource check is required because there is the possibility that
                       during new request an old connection is closed */
                    if (!is_resource($sock)) {
                        continue;
                    }
                    if ($sock === $this->list) {
                        printf("NUOVA CONNEX\n");
                        $new_unix = stream_socket_accept($this->list);
                        if ($new_unix === FALSE) {
                            printf("WARNING: stream_socket_accept failed\n");
                            continue;
                        }
                        $stream_info = "";
                        $method      = "";
                        $header      = array();
                        $get         = array();
                        $post        = array();
                        $cookie      = array();
                        if (($new_socket = ancillary_getstream($new_unix, $stream_info)) !== FALSE) {
                            printf("NEW_SOCKET: %d\n", intval($new_socket));
                            stream_set_blocking($new_socket, $this->blocking_mode); // Set the stream to non-blocking
                            printf("RECEIVED HEADER:\n%s", $stream_info);
                            $path = spu_process_info($stream_info, $method, $header, $get, $post, $cookie);
                            printf("PATH: [%s]\n", $path);
                            printf("M: %s\nHEADER:\n", $method);
                            print_r($header);
                            printf("GET:\n");
                            print_r($get);
                            printf("POST:\n");
                            print_r($post);
                            printf("COOKIE:\n");
                            print_r($cookie);

                            $addr = stream_socket_get_name($new_socket, TRUE);
                            $header_out = array();

                            $this->app->request_mgr($this, $header_out, $new_socket, $path, $addr, $get, $post, $cookie);
                            printf("number of sockets after %d\n", count($this->socks));
                        }
                        else {
                            printf("WARNING: ancillary_getstream failed\n");
                        }
                    }
                    else {
                        /* here $sock is stdin or a registered stream */
                        if (($buf = fread($sock, 512)) === FALSE) {
                            printf("error read\n");
                            exit(123);
                        }
                        else if (strlen($buf) === 0) {
                            if ($sock === $this->in) {
                                printf("Arrivati %d bytes da stdin\n", strlen($buf));
                            }
                            else {
                                /* readable with nothing to read: the
                                   stream is dismissed */
                                $this->sock_close($sock);
                                printf("CLOSE ON READ\n");
                            }
                            if ($this->debug > 1) {
                                printf("post unset\n");
                                print_r($this->socks);
                            }
                        }
                        else {
                            if ($this->debug > 1) {
                                print_r($read);
                            }
                            if ($sock === $this->in) {
                                printf("Arrivati %d bytes da stdin\n", strlen($buf));
                            }
                            else {
                                /* strict search: resources are compared by identity */
                                $key = array_search($sock, $this->socks, TRUE);
                                printf("Arrivati %d bytes dalla socket n. %d\n", strlen($buf), $key);
                            }
                        }
                    }
                }
            }


            /* manage unfinished pages */
            foreach ($this->pages_flush as $k => $pgflush) {
                if ($pgflush->try_flush($this->curtime) == TRUE) {
                    unset($this->pages_flush[$k]);
                }
            }

            /* manage open streaming */
            foreach ($this->socks as $k => $sock) {
                if (!isset($this->s2u[intval($sock)])) {
                    continue;
                }
                $user = $this->s2u[intval($sock)];

                /* data left from a previous partial write go first */
                $response = $user->rd_cache_get();
                if ($response == "") {
                    $content = "";
                    $user->stream_main($content, $get, $post, $cookie);

                    if ($content == "" && $user->rd_kalive_is_expired($this->curtime)) {
                        $content = $user->stream_keepalive();
                    }
                    if ($content != "") {
                        $response = chunked_content($content);
                    }
                }

                if ($response != "") {
                    echo "SPIA: [".substr($response, 0, 60)."...]\n";
                    $response_l = mb_strlen($response, "ASCII");
                    $wret = @fwrite($sock, $response);
                    if ($wret === FALSE || $wret < $response_l) {
                        printf("TROUBLE WITH FWRITE: %d\n", $wret);
                        /* a failed write is a write of 0 bytes: what has
                           not been sent is kept for the next loop */
                        $sent = ($wret === FALSE ? 0 : $wret);
                        $user->rd_cache_set(mb_substr($response, $sent, $response_l - $sent, "ASCII"));
                    }
                    else {
                        $user->rd_cache_set("");
                    }
                    fflush($sock);
                    $user->rd_kalive_reset($this->curtime);
                }

                // close socket after a while to prevent client memory consumption
                if ($user->rd_endtime_is_expired($this->curtime)) {
                    $this->sock_close($sock);
                    printf("CLOSE ON LOOP\n");
                }
            }  // foreach ($this->socks...
            printf("\n");
        }  // while (...
    }  // function run(...
}

?>
438
439 ?>