Merge branch 'sac-a-push' of mop.mine.nu:brisk into sac-a-push
[brisk.git] / web / Obj / sac-a-push.phh
1 <?php
2
3 /*
4  *  brisk - spush/sac-a-push.phh
5  *
6  *  Copyright (C) 2012 Matteo Nastasi
7  *                          mailto: nastasi@alternativeoutput.it 
8  *                                  matteo.nastasi@milug.org
9  *                          web: http://www.alternativeoutput.it
10  *
11  * This program is free software; you can redistribute it and/or modify
12  * it under the terms of the GNU General Public License as published by
13  * the Free Software Foundation; either version 2 of the License, or
14  * (at your option) any later version.
15  *
16  * This program is distributed in the hope that it will be useful, but
17  * WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABLILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19  * General Public License for more details. You should have received a
20  * copy of the GNU General Public License along with this program; if
21  * not, write to the Free Software Foundation, Inc, 59 Temple Place -
22  * Suite 330, Boston, MA 02111-1307, USA.
23  *
24  */
25
26 define('SITE_PREFIX', '/brisk/');
27 define('SITE_PREFIX_LEN', 7);
28
29 function spu_process_info($stream_info, $method, &$header, &$get, &$post, &$cookie)
30 {
31     $check_post = FALSE;
32     $header = array();
33     $get = array();
34     $post = array();
35     foreach(preg_split("/(\r?\n)/", $stream_info) as $line) {
36         // printf("LINE: [%s]\n", $line);
37         if ($check_post) {
38             if (!isset($header['The-Request'])) {
39                 return FALSE;
40             }
41             $req = explode(" ", $header['The-Request']);
42             $method = $req[0];
43
44             if (isset($header['Cookie'])) {
45                 $cookies = explode(";", $header['Cookie']);
46                 for ($i = 0 ; $i < count($cookies) ; $i++) {
47                     $nameval = explode("=", trim($cookies[$i]));
48                     if (count($nameval) != 2) {
49                         printf("WARNING: malformat cookie element [%s]\n", $cookies[$i]);
50                         continue;
51                     }
52                     $cookie[$nameval[0]] = urldecode($nameval[1]);
53                 }
54             }
55             // GET params management
56             $get_vars = explode('?', $req[1], 2);
57             $path =   $get_vars[0];
58             if (count($get_vars) > 1) {
59                 $a = explode('&', $get_vars[1]);
60                 printf("A COUNT: [%s] %d\n", $a[0], count($a));
61                 for ($i = 0 ; $i < count($a) ; $i++) {
62                     $b = explode('=', $a[$i]);
63                     $get[$b[0]] = urldecode($b[1]);
64                 }
65             }
66             // POST params management
67             if ($req[0] == 'POST') {
68                 if ($header['Content-Type'] != 'application/x-www-form-urlencoded' 
69                     || !isset($header['Content-Length'])) {
70                     return FALSE;
71                 }
72                 $post_len = mb_strlen($line, "latin1");
73                 $a = explode('&', $line);
74                 for ($i = 0 ; $i < count($a) ; $i++) {
75                     $b = explode('=', $a[$i]);
76                     $post[$b[0]] = urldecode($b[1]);
77                 }
78                 printf("INFO: postlen: %d\n", $post_len);
79             }
80             break;
81         }
82         if ($line == "") {
83             $check_post = TRUE;
84             continue;
85         }
86         $split = explode(":", $line, 2);
87         $header[$split[0]] = $split[1];        
88     }
89     return $path;
90 }
91
92 function gpcs_var($name, $get, $post, $cookie)
93 {
94     if (isset($GLOBALS[$name])) 
95         return FALSE;
96     else if (isset($cookie[$name])) 
97         return ($cookie[$name]);
98     else if (isset($post[$name])) 
99         return ($post[$name]);
100     else if (isset($get[$name])) 
101         return ($get[$name]);
102
103     return FALSE;
104 }
105
106 function headers_render($header, $len)
107 {
108     $s = "";
109     if (isset($header['Location'])) {
110         return sprintf("HTTP/1.1 302 OK\r\nLocation: %s\r\n\r\n", $header['Location']);
111     }
112     else {
113         $s .= "HTTP/1.1 200 OK\r\n";
114     }
115     if (!isset($header['Date']))
116         $s .= sprintf("Date: %s\r\n", date(DATE_RFC822));
117     if (!isset($header['Connection']))
118         $s .= "Connection: close\r\n";
119     if (!isset($header['Content-Type']))
120         $s .= "Content-Type: text/html\r\n";
121     foreach($header as $key => $value) {
122         $s .= sprintf("%s: %s\r\n", $key, $value);
123     }
124     if ($len >= 0) {
125         $s .= sprintf("Content-Length: %d\r\n", $len);
126     }
127     else {
128         $s .= "Cache-Control: no-cache, must-revalidate\r\n";
129         $s .= "Expires: Mon, 26 Jul 1997 05:00:00 GMT\r\n";
130         if (!isset($header['Content-Encoding'])) {
131             $s .= "Content-Encoding: chunked\r\n";
132         }
133         $s .= "Transfer-Encoding: chunked\r\n";
134     }
135     $s .= "\r\n";
136
137     return ($s);
138 }
139
140 /*
141  *  Caching system using ob php system to cache old style pages
142  *  to a var and than send it with more calm
143  */
144
145 function shutta()
146 {
147   log_rd2("SHUTTA [".connection_status()."] !");
148 }
149
150 register_shutdown_function('shutta');
151
152 /*
153  *  MAIN
154  */
155
156 function chunked_content($zls, $content)
157 {
158     if ($zls) {
159         $cont_comp = $zls->compress_chunk($content);
160     }
161     else {
162         $cont_comp = $content;
163     }
164     $cont_comp_l = mb_strlen($cont_comp, "ASCII");
165     printf("CHUNK: [%s]\n", $content);
166
167     return (sprintf("%X\r\n", $cont_comp_l).$cont_comp."\r\n");
168 }
169
170 function chunked_fini()
171 {
172     return sprintf("0\r\n");
173 }
174
175 function get_encoding($header)
176 {
177     $enc = "plain";
178     if (isset($header['Accept-Encoding'])) {
179         $acc = explode(',', $header['Accept-Encoding']);
180
181         if (array_search('gzip', $acc) !== FALSE) {
182             $enc = 'gzip';
183         }
184         else if (array_search('deflate', $acc) !== FALSE) {
185             $enc = 'deflate';
186         }
187     }
188
189     return ($enc);
190 }
191
192
193 class Sac_a_push {
194     static $fixed_fd = 2;
195     
196     var $file_socket;
197     var $unix_socket;
198     var $socks;
199     var $s2u;
200     var $pages_flush;
201
202     var $list;
203     var $in;
204
205     var $debug;
206     var $blocking_mode;
207
208     var $app;
209     var $bin5;
210
211     var $curtime;
212
213     var $rndstr;
214     var $main_loop;
215
216     function Sac_a_push()
217     {
218     }
219
220     // Sac_a_push::create("/tmp/brisk.sock", 0, 0
221
222     static function create(&$app, $sockname, $debug, $blocking_mode)
223     {        
224         $thiz = new Sac_a_push();
225         
226         $thiz->app = $app;
227         $thiz->file_socket = $sockname;
228         $thiz->unix_socket = "unix://$sockname";
229         $thiz->debug = $debug;
230         $thiz->socks = array();
231         $thiz->s2u  = array();
232         $thiz->pages_flush = array();
233
234         $thiz->blocking_mode = 0; // 0 for non-blocking
235
236         $thiz->rndstr = "";
237         for ($i = 0 ; $i < 4096 ; $i++) {
238             if (($i % 128) == 0)
239                 $thiz->rndstr .= "\n";
240             else
241                 $thiz->rndstr .= chr(mt_rand(65, 90));
242         }
243         
244         if (file_exists($thiz->file_socket)) {
245             unlink($thiz->file_socket);
246         }
247     
248         $old_umask = umask(0);
249         if (($thiz->list = stream_socket_server($thiz->unix_socket, $err, $errs)) === FALSE) {
250             return (FALSE);
251         }
252         umask($old_umask);
253         stream_set_blocking($thiz->list, $thiz->blocking_mode); # Set the stream to non-blocking
254
255         if (($thiz->in = fopen("php://stdin", "r")) === FALSE) {
256             return(FALSE);
257         }
258
259         $thiz->main_loop = FALSE;
260
261         return ($thiz);
262     }
263
264     function socks_set($sock, $user)
265     {
266         $id = intval($sock);
267
268         $this->s2u[$id]   = $user;
269         $this->socks[$id] = $sock;
270     }
271
272     function socks_unset($sock)
273     {
274         $id = intval($sock);
275
276         unset($this->s2u[$id]);
277         unset($this->socks[$id]);
278     }
279
280     function pgflush_try_add($enc, &$new_socket, $tout, $header_out, $content)
281     {
282         $pgflush = new PageFlush($new_socket, $enc, $this->curtime, $tout, $header_out, $content);
283
284         if ($pgflush->try_flush($this->curtime) == FALSE) {
285             // Add $pgflush to the pgflush array
286             $this->pgflush_add($pgflush);
287         }
288     }
289
290     function pgflush_add($pgflush)
291     {
292         array_push($this->pages_flush, $pgflush);
293     }
294
295     function garbage_manager($force)
296     {
297         $this->app->garbage_manager($force);
298
299         foreach ($this->socks as $k => $sock) {
300             if ($this->s2u[intval($sock)]->sess == '') {
301                 if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) {
302                     $this->s2u[intval($sock)]->rd_socket_set(NULL);
303                 }
304                 unset($this->socks[intval($sock)]);
305                 unset($this->s2u[intval($sock)]);
306                 fclose($sock);
307                 printf("CLOSE ON GARBAGE MANAGER\n");
308             }
309         }
310     }
311
312     function run()
313     {
314         GLOBAL $DOCUMENT_ROOT, $HTTP_HOST, $G_with_splash;
315
316         if ($this->main_loop) {
317             return (FALSE);
318         }
319         
320         $this->main_loop = TRUE;
321         
322         while ($this->main_loop) {
323             $this->curtime = time();
324             printf("IN LOOP: Current opened: %d  pages_flush: %d - ", count($this->socks), count($this->pages_flush));
325             
326             /* Prepare the read array */
327             /* // when we manage it ... */
328             /* if ($shutdown)  */
329             /*     $read   = array_merge(array("$in" => $in), $socks); */
330             /* else */
331             $read   = array_merge(array(intval($this->list) => $this->list, intval($this->in) => $this->in),
332                                   $this->socks);
333             
334             if ($this->debug > 1) {
335                 printf("PRE_SELECT\n");
336                 print_r($read);
337             }
338             $write  = NULL;
339             $except = NULL;
340             $num_changed_sockets = stream_select($read, $write, $except, 0, 500000);
341         
342             if ($num_changed_sockets == 0) {
343                 printf(" no data in 5 secs, splash [%d]\n", $G_with_splash);
344             } 
345             else if ($num_changed_sockets > 0) {
346                 printf("num sock %d num_of_socket: %d\n", $num_changed_sockets, count($read));
347                 if ($this->debug > 1) {
348                     print_r($read);
349                 }
350                 /* At least at one of the sockets something interesting happened */
351                 foreach ($read as $i => $sock) {
352                     /* is_resource check is required because there is the possibility that
353                        during new request an old connection is closed */
354                     if (!is_resource($sock)) {
355                         continue;
356                     }
357                     if ($sock === $this->list) {
358                         printf("NUOVA CONNEX\n");
359                         if (($new_unix = stream_socket_accept($this->list)) == FALSE) {
360                             printf("SOCKET_ACCEPT FAILED\n");
361                             continue;
362                         }
363                         $stream_info = "";
364                         $method      = "";
365                         $get         = array();
366                         $post        = array();
367                         $cookie      = array();
368                         if (($new_socket = ancillary_getstream($new_unix, $stream_info)) !== FALSE) {
369                             printf("NEW_SOCKET: %d\n", intval($new_socket));
370                             stream_set_blocking($new_socket, $this->blocking_mode); // Set the stream to non-blocking
371                             printf("RECEIVED HEADER:\n%s", $stream_info);
372                             $path = spu_process_info($stream_info, $method, $header, $get, $post, $cookie);
373                             printf("PATH: [%s]\n", $path);
374                             printf("M: %s\nHEADER:\n", $method);
375                             print_r($header);
376                             printf("GET:\n");
377                             print_r($get);
378                             printf("POST:\n");
379                             print_r($post);
380                             printf("COOKIE:\n");
381                             print_r($cookie);
382                             $addr = stream_socket_get_name($new_socket, TRUE);
383                             $header_out = array();
384
385                             $enc = get_encoding($header);
386
387                             $subs = SITE_PREFIX."briskin5/";
388                             $subs_l = strlen($subs);
389                             $rret = FALSE;
390                             if (!strncmp($path, SITE_PREFIX, SITE_PREFIX_LEN)) {
391                                 $rret = $this->app->request_mgr($this, $enc, $header_out, $new_socket, substr($path, SITE_PREFIX_LEN), $addr, $get, $post, $cookie);
392                             }
393                             if ($rret == FALSE) { 
394                                 // FIXME: manage 404 !!!
395                                 printf("TODO: fix unknown page\n");
396                             }
397                             printf("number of sockets after %d\n", count($this->socks));
398                         }
399                         else {
400                             printf("WARNING: ancillary_getstream failed\n");
401                         }
402                     }
403                     else {
404                         $buf = fread($sock, 512);
405                         // if socket is closed
406                         if ($buf == FALSE || strlen($buf) == 0) {
407                             if ($buf == FALSE) {
408                                 printf("ERROR READING\n");
409                             }
410                             if ($sock === $this->list) {
411                                 printf("Arrivati %d bytes da list\n", strlen($buf));
412                                 exit(21);
413                             }
414                             else if ($sock === $this->in) {
415                                 printf("Arrivati %d bytes da stdin\n", strlen($buf));
416                                 exit(22);
417                             }
418                             else {
419                                 // $user_a[$s2u[intval($sock)]]->disable();
420                                 if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) {
421                                     $this->s2u[intval($sock)]->rd_socket_set(NULL);
422                                 }
423                                 unset($this->socks[intval($sock)]);
424                                 unset($this->s2u[intval($sock)]);
425                             }
426                             fclose($sock);
427                             printf("CLOSE ON READ\n");
428
429                             if ($this->debug > 1) {
430                                 printf("post unset\n");
431                                 print_r($this->socks);
432                             }
433                         }
434                         else {
435                             if ($this->debug > 1) {
436                                 print_r($read);
437                             }
438                             if ($sock === $this->list) {
439                                 printf("Arrivati %d bytes da list\n", strlen($buf));
440                             }
441                             else if ($sock === $this->in) {
442                                 printf("Arrivati %d bytes da stdin\n", strlen($buf));
443                                 $line = trim($buf);
444                                 if ($line == "reload") {
445                                     require("$DOCUMENT_ROOT/Etc/".BRISK_CONF);
446                                 }
447                             }
448                             else {
449                                 $key = array_search("$sock", $this->socks);
450                                 printf("Arrivati %d bytes dalla socket n. %d\n", strlen($buf), $key);
451                             }
452                         }
453                     }
454                 }
455             }
456
457             $this->garbage_manager(FALSE);
458
459             /* manage unfinished pages */
460             foreach ($this->pages_flush as $k => $pgflush) {
461                 if ($pgflush->try_flush($this->curtime) == TRUE) {
462                     unset($this->pages_flush[$k]);
463                 }
464             }
465             
466             /*
467                $response:                        raw stream data not sent
468                $content:                         html consistent data (<script bla bla>)
469                $user->stream_keepalive($w_ping)  ping srv->cli OR srv->cli + cli->srv if $w_ping == TRUE
470             */
471
472             /* manage open streaming */
473             foreach ($this->socks as $k => $sock) {
474                 if (isset($this->s2u[intval($sock)])) {
475                     $user = $this->s2u[intval($sock)];
476                     $response = $user->rd_cache_get();
477                     $do_ping = FALSE;
478                     if (($this->curtime - $user->lacc) > (EXPIRE_TIME_RD / 3)) {
479                         $do_ping = TRUE;
480                     }
481                     else {
482                         $user->ping_req = FALSE;
483                     }
484
485                     if ($response == "") {
486                         $content = "";
487                         $user->stream_main($content, $get, $post, $cookie);
488                         printf("[%s] [%d] [%d]\n", $user->name, $user->lacc, $this->curtime);
489                         if ($do_ping && $user->ping_req == FALSE) {
490                             $content .= $user->stream_keepalive(TRUE);
491                             $user->ping_req = TRUE;
492                         }
493                         else if ($content == "" && $user->rd_kalive_is_expired($this->curtime)) {
494                             $content = $user->stream_keepalive(FALSE);
495                         }
496                         if ($content != "") {
497                             $response = chunked_content($user->rd_zls_get(), $content);
498                         }
499                     }
500                     
501                     if ($response != "") {
502                         // echo "SPIA: [".substr($response, 0, 60)."...]\n";
503                         echo "SPIA: [".$response."]\n";
504                         $response_l = mb_strlen($response, "ASCII");
505                         $wret = @fwrite($sock, $response);
506                         if ($wret < $response_l) {
507                             printf("TROUBLE WITH FWRITE: %d\n", $wret);
508                             $user->rd_cache_set(mb_substr($response, $wret, $response_l - $wret, "ASCII"));
509                         }
510                         else {
511                             $user->rd_cache_set("");
512                         }
513                         fflush($sock);
514                         $user->rd_kalive_reset($this->curtime);
515                     }
516                     
517                     // close socket after a while to prevent client memory consumption
518                     if ($user->rd_endtime_is_expired($this->curtime)) {
519                         if ($this->s2u[intval($sock)]->rd_socket_get() != NULL) {
520                             $this->s2u[intval($sock)]->rd_socket_set(NULL);
521                         }
522                         unset($this->socks[intval($sock)]);
523                         unset($this->s2u[intval($sock)]);
524                         fclose($sock);
525                         printf("CLOSE ON LOOP\n");
526                     }
527                 }
528             }  // foreach ($this->socks...
529             printf("\n");
530         }  // while (...
531     }  // function run(...
532 }
533
534 ?>