Sac_a_push class created
[brisk.git] / web / spush / brisk-spush.php
1 #!/usr/bin/php
2 <?php
3 /*
4  *  brisk - spush/brisk-spush.php
5  *
6  *  Copyright (C) 2012 Matteo Nastasi
7  *                          mailto: nastasi@alternativeoutput.it 
8  *                                  matteo.nastasi@milug.org
9  *                          web: http://www.alternativeoutput.it
10  *
11  * This program is free software; you can redistribute it and/or modify
12  * it under the terms of the GNU General Public License as published by
13  * the Free Software Foundation; either version 2 of the License, or
14  * (at your option) any later version.
15  *
16  * This program is distributed in the hope that it will be useful, but
17  * WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19  * General Public License for more details. You should have received a
20  * copy of the GNU General Public License along with this program; if
21  * not, write to the Free Software Foundation, Inc, 59 Temple Place -
22  * Suite 330, Boston, MA 02111-1307, USA.
23  *
24  * TODO
25  *
26  *
27  *   - BUG: logout failed
28  *   - BUG: fast loop on stream index_rd_ifra page
29  *
30  *   - garbage management
31  *   - log_legal address fix
32  *   - from room to table
33  *   - from table to room
34  *   - index_wr other issues
35  *   - manage and test cross forwarder between table and room
36  *   - setcookie (for tables only)
37  *   - keepalive management
38  *
39  *   DONE/FROZEN - problem with getpeer (HOSTADDR)
40  *
41  *   DONE - chunked
42  *   DONE - bug: read from a not resource handle (already closed because a new socket substitute it)
43  *   DONE - partial write for normal page management
44  *   DONE - index_rd_ifra: last_clean issue
45  *   DONE - fwrite failed error management (select, buffer where store unsent data, and fwrite check and retry)
46  *   ABRT - index_wr.php::reload - reload is js-only function
47  *   DONE - bug: after restart index_rd.php receive from prev clients a lot of req
48  *   DONE - index_wr.php::chat
49  *   DONE - index_wr.php::exit
50  *   DONE - index_rd.php porting
51  *   DONE - generic var management from internet
52  *   DONE - index.php auth part
53  */
54
55 $G_base = "../";
56
57 require_once("./sac-a-push.phh");
58 require_once("./brisk-spush.phh");
59 require_once($G_base."Obj/brisk.phh");
60 require_once($G_base."Obj/auth.phh");
61 // require_once("../Obj/proxyscan.phh");
62 require_once($G_base."index.php");
63 require_once($G_base."index_wr.php");
64 require_once($G_base."index_rd_ifra.php");
65 require_once($G_base."briskin5/Obj/briskin5.phh");
66
67 define('SITE_PREFIX', '/brisk/');
68
/*
 *  headers_render - build the head of an HTTP/1.1 "200 OK" response
 *
 *  $header  associative array (name => value) of headers to send; when
 *           'Date', 'Connection' or 'Content-Type' are missing a default
 *           is emitted before the caller's headers
 *  $len     -1   chunked streaming response: anti-cache headers and
 *                "Transfer-Encoding: chunked" are added
 *           > 0  body length in bytes, emitted as Content-Length
 *           else no length information is added
 *
 *  RETURN   the status line and the headers, each terminated by CRLF,
 *           followed by the empty line that ends the head
 */
function headers_render($header, $len)
{
    $s = "HTTP/1.1 200 OK\r\n";

    if (!isset($header['Date'])) {
        // HTTP wants the fixed-length GMT date format (4 digits year),
        // not the RFC 822 one with 2 digits year and local offset
        $s .= sprintf("Date: %s GMT\r\n", gmdate("D, d M Y H:i:s"));
    }
    if (!isset($header['Connection'])) {
        $s .= "Connection: close\r\n";
    }
    if (!isset($header['Content-Type'])) {
        $s .= "Content-Type: text/html\r\n";
    }

    foreach ($header as $key => $value) {
        $s .= sprintf("%s: %s\r\n", $key, $value);
    }

    if ($len == -1) {
        $s .= "Cache-Control: no-cache, must-revalidate\r\n";
        $s .= "Expires: Mon, 26 Jul 1997 05:00:00 GMT\r\n";
        // "chunked" is a transfer-coding only: it must not be announced
        // as Content-Encoding too
        $s .= "Transfer-Encoding: chunked\r\n";
    }
    else if ($len > 0) {
        $s .= sprintf("Content-Length: %d\r\n", $len);
    }
    $s .= "\r\n";

    return ($s);
}
96
97 /*
98  *  Caching system using the PHP ob (output buffering) functions to capture
99  *  old style pages into a variable and then send them at a calmer pace
100  */
101
/*
 *  shutta - logs, through log_rd2(), the value returned by
 *           connection_status() at the moment it is called
 */
function shutta()
{
    $status = connection_status();

    log_rd2(sprintf("SHUTTA [%s] !", $status));
}
106
107 register_shutdown_function('shutta');
108
109 /*
110  *  MAIN
111  */
112
/*
 *  chunked_content - wrap $content as a single chunk of an HTTP
 *                    "Transfer-Encoding: chunked" body
 *
 *  $content  payload of the chunk
 *
 *  RETURN    "<hex length>\r\n<content>\r\n", or an empty string when
 *            $content is empty: a zero-length chunk is the end-of-stream
 *            marker and must be produced only by chunked_fini()
 */
function chunked_content($content)
{
    // length in bytes: "ASCII" makes mb_strlen count one unit per byte
    $content_l = mb_strlen($content, "ASCII");

    if ($content_l == 0) {
        return ("");
    }

    return (sprintf("%X\r\n%s\r\n", $content_l, $content));
}
119
/*
 *  chunked_fini - terminator of an HTTP "Transfer-Encoding: chunked" body
 *
 *  RETURN  the zero-length last chunk ("0\r\n") followed by the CRLF that
 *          closes the (empty) trailer section
 */
function chunked_fini()
{
    return ("0\r\n\r\n");
}
124
/*
 *  Sac_a_push - select() based server loop
 *
 *  Listens on a unix socket; for every connection accepted there it
 *  obtains the client stream and the request text via
 *  ancillary_getstream(), parses the request with spu_process_info() and
 *  dispatches it by path:
 *    - index.php / index_wr.php output is captured and sent through a
 *      PageFlush object (queued in $pages_flush until completely sent)
 *    - index_rd_ifra.php sockets are kept open in $socks and fed with
 *      chunked content at every loop iteration
 */
class Sac_a_push {
    static $fixed_fd = 2;

    var $file_socket;    // filesystem path of the listening unix socket
    var $unix_socket;    // the same path in "unix://..." form
    var $socks;          // streaming client sockets, key: intval(socket)
    var $s2u;            // intval(socket) => user index set by Room::get_user()
    var $pages_flush;    // PageFlush objects not completely sent yet

    var $list;           // listening stream
    var $in;             // php://stdin stream

    var $debug;          // extra dumps are printed when > 1
    var $blocking_mode;  // value passed to stream_set_blocking()

    var $room;
    var $bin5;

    var $rndstr;         // 4096 random uppercase letters
    var $main_loop;      // TRUE while run() is looping

    // old style (class-named) empty constructor, kept as is
    function Sac_a_push()
    {
    }

    /*
     *  create - build and initialize a Sac_a_push instance
     *
     *  $sockname       path of the unix socket to listen on; an already
     *                  existing file with that name is removed
     *  $debug          debug level stored in the instance
     *  $blocking_mode  NOTE(review): accepted but ignored, the instance is
     *                  always set to non-blocking (0) - confirm if wanted
     *
     *  RETURN  the new instance, FALSE if the room, the listening socket
     *          or stdin cannot be opened
     *
     *  example: Sac_a_push::create("/tmp/brisk.sock", 0, 0)
     */
    static function create($sockname, $debug, $blocking_mode)
    {
        $thiz = new Sac_a_push();

        $thiz->file_socket = $sockname;
        $thiz->unix_socket = "unix://$sockname";
        $thiz->debug = $debug;
        $thiz->socks = array();
        $thiz->s2u  = array();
        $thiz->pages_flush = array();

        $thiz->blocking_mode = 0; // 0 for non-blocking

        if (($thiz->room = Room::create()) == FALSE) {
            log_crit("room::create failed");
            return FALSE;
        }

        $thiz->rndstr = "";
        for ($i = 0 ; $i < 4096 ; $i++) {
            $thiz->rndstr .= chr(mt_rand(65, 90));
        }

        if (file_exists($thiz->file_socket)) {
            unlink($thiz->file_socket);
        }

        // the socket file is created with umask 0; the previous umask is
        // restored in the failure case too
        $old_umask = umask(0);
        $thiz->list = stream_socket_server($thiz->unix_socket, $err, $errs);
        umask($old_umask);
        if ($thiz->list === FALSE) {
            return (FALSE);
        }
        stream_set_blocking($thiz->list, $thiz->blocking_mode); # Set the stream to non-blocking

        if (($thiz->in = fopen("php://stdin", "r")) === FALSE) {
            return(FALSE);
        }

        $thiz->main_loop = FALSE;

        return ($thiz);
    }

    /*
     *  page_flush_send - try to send a complete page on $sock; when the
     *                    first try_flush() attempt returns FALSE the page
     *                    is queued in $pages_flush to be retried by run()
     */
    private function page_flush_send($sock, $curtime, $header_out, $content)
    {
        $pgflush = new PageFlush($sock, $curtime, 20, $header_out, $content);

        if ($pgflush->try_flush($curtime) == FALSE) {
            array_push($this->pages_flush, $pgflush);
        }
    }

    /*
     *  rd_response_write - write $response on the streaming socket of $user
     *                      and store the unsent tail, if any, in the user
     *                      read cache
     *
     *  $tag  prefix of the message printed when the write is partial
     */
    private function rd_response_write($user, $sock, $response, $tag)
    {
        $response_l = mb_strlen($response, "ASCII");

        $wret = @fwrite($sock, $response);
        if ($wret === FALSE) {
            // nothing written: the whole response remains to be sent
            $wret = 0;
        }
        if ($wret < $response_l) {
            printf("%s: %d\n", $tag, $wret);
            // the remainder must be taken from what was actually written
            $user->rd_cache_set(mb_substr($response, $wret, $response_l - $wret, "ASCII"));
        }
        else {
            $user->rd_cache_set("");
        }
        fflush($sock);
    }

    /*
     *  rd_socket_close - detach a streaming socket from its user, remove
     *                    it from $socks and $s2u and close it
     *
     *  $msg  message printed after the close
     */
    private function rd_socket_close($sock, $msg)
    {
        $sock_id = intval($sock);
        $user = $this->room->user[$this->s2u[$sock_id]];

        if ($user->rd_socket_get() != NULL) {
            $user->rd_socket_set(NULL);
        }
        unset($this->socks[$sock_id]);
        unset($this->s2u[$sock_id]);
        fclose($sock);
        printf("%s\n", $msg);
    }

    /*
     *  run - main loop
     *
     *  At every iteration: waits up to 250ms for readable streams, serves
     *  new connections and closed/readable sockets, retries the pending
     *  PageFlush objects and pushes new content on the streaming sockets.
     *
     *  RETURN  FALSE if the loop is already running; nothing in the
     *          visible code clears $main_loop, so otherwise it does not
     *          return
     */
    function run()
    {
        if ($this->main_loop) {
            return (FALSE);
        }

        $this->main_loop = TRUE;

        while ($this->main_loop) {
            $curtime = time();
            printf("IN LOOP: Current opened: %d  pages_flush: %d\n", count($this->socks), count($this->pages_flush));

            /* Prepare the read array */
            $read   = array_merge(array(intval($this->list) => $this->list, intval($this->in) => $this->in),
                                  $this->socks);

            if ($this->debug > 1) {
                printf("PRE_SELECT\n");
                print_r($read);
            }
            $write  = NULL;
            $except = NULL;
            $num_changed_sockets = stream_select($read, $write, $except, 0, 250000);

            if ($num_changed_sockets == 0) {
                printf("No data in 5 secs\n");
            }
            else if ($num_changed_sockets > 0) {
                printf("num sock %d num_of_socket: %d\n", $num_changed_sockets, count($read));
                if ($this->debug > 1) {
                    print_r($read);
                }
                /* At least at one of the sockets something interesting happened */
                foreach ($read as $i => $sock) {
                    /* is_resource check is required because there is the possibility that
                       during new request an old connection is closed */
                    if (!is_resource($sock)) {
                        continue;
                    }
                    if ($sock === $this->list) {
                        printf("NUOVA CONNEX\n");
                        $new_unix = stream_socket_accept($this->list);
                        $stream_info = "";
                        $method      = "";
                        $header      = array();
                        $get         = array();
                        $post        = array();
                        $cookie      = array();
                        if (($new_socket = ancillary_getstream($new_unix, $stream_info)) !== FALSE) {
                            stream_set_blocking($new_socket, $this->blocking_mode); // Set the stream to non-blocking
                            printf("RECEIVED HEADER:\n%s", $stream_info);
                            $path = spu_process_info($stream_info, $method, $header, $get, $post, $cookie);
                            printf("PATH: [%s]\n", $path);
                            printf("M: %s\nHEADER:\n", $method);
                            print_r($header);
                            printf("GET:\n");
                            print_r($get);
                            printf("POST:\n");
                            print_r($post);
                            printf("COOKIE:\n");
                            print_r($cookie);

                            $addr = stream_socket_get_name($new_socket, TRUE);
                            $header_out = array();

                            switch ($path) {
                            case SITE_PREFIX:
                            case SITE_PREFIX."index.php":
                                ob_start();
                                index_main($this->room, $header_out, $addr, $get, $post, $cookie);
                                $content = ob_get_contents();
                                ob_end_clean();

                                $this->page_flush_send($new_socket, $curtime, $header_out, $content);
                                break;
                            case SITE_PREFIX."index_wr.php":
                                ob_start();
                                index_wr_main($this->room, $addr, $get, $post, $cookie);
                                $content = ob_get_contents();
                                ob_end_clean();

                                $this->page_flush_send($new_socket, $curtime, $header_out, $content);
                                break;
                            case SITE_PREFIX."index_rd_ifra.php":
                                // without a valid session just send the closing page
                                if (!isset($cookie['sess'])
                                    || (($user = $this->room->get_user($cookie['sess'], $idx)) == FALSE)) {
                                    $content = index_rd_ifra_fini(TRUE);

                                    $this->page_flush_send($new_socket, $curtime, $header_out, $content);
                                    break;
                                }
                                // close a previous opened index_read_ifra socket, if exists
                                if (($prev = $user->rd_socket_get()) != NULL) {
                                    unset($this->s2u[intval($prev)]);
                                    unset($this->socks[intval($prev)]);
                                    fclose($prev);
                                    printf("CLOSE AND OPEN AGAIN ON IFRA2\n");
                                    $user->rd_socket_set(NULL);
                                }

                                $content = "";
                                index_rd_ifra_init($this->room, $user, $header_out, $content, $get, $post, $cookie);

                                $response = headers_render($header_out, -1).chunked_content($content);
                                $this->rd_response_write($user, $new_socket, $response, "TROUBLES WITH FWRITE");

                                // from now on the socket is served by the streaming loop below
                                $this->s2u[intval($new_socket)] = $idx;
                                $this->socks[intval($new_socket)] = $new_socket;
                                $user->rd_socket_set($new_socket);
                                break;

                                /* default: */
                                /*     $cl = strlen(SITE_PREFIX."briskin5/"); */
                                /*     if (!strncmp($this->path, SITE_PREFIX."briskin5/", $cl)) { */
                                /*         Bin5::page_manager($room, $header_out, substr($path,$cl), $method, $addr, $get, $post, $cookie); */
                            }
                        }
                        else {
                            printf("WARNING: ancillary_getstream failed\n");
                        }
                    }
                    else {
                        /* here $sock is stdin or one of the streaming sockets */
                        if (($buf = fread($sock, 512)) === FALSE) {
                            printf("error read\n");
                            exit(123);
                        }
                        else if (strlen($buf) === 0) {
                            if ($sock === $this->in) {
                                printf("Arrivati %d bytes da stdin\n", strlen($buf));
                            }
                            else {
                                // readable with no data: the socket is closed and forgotten
                                $this->rd_socket_close($sock, "CLOSE ON READ");
                            }
                            if ($this->debug > 1) {
                                printf("post unset\n");
                                print_r($this->socks);
                            }
                        }
                        else {
                            if ($this->debug > 1) {
                                print_r($read);
                            }
                            if ($sock === $this->in) {
                                printf("Arrivati %d bytes da stdin\n", strlen($buf));
                            }
                            else {
                                // $socks is keyed by intval(socket)
                                printf("Arrivati %d bytes dalla socket n. %d\n", strlen($buf), intval($sock));
                            }
                        }
                    }
                }
            }

            /* retry the pages not completely sent yet */
            foreach ($this->pages_flush as $k => $pgflush) {
                if ($pgflush->try_flush($curtime) == TRUE) {
                    unset($this->pages_flush[$k]);
                }
            }

            /* push pending or new content on every streaming socket */
            foreach ($this->socks as $k => $sock) {
                if (!isset($this->s2u[intval($sock)])) {
                    continue;
                }
                $user = $this->room->user[$this->s2u[intval($sock)]];
                $response = $user->rd_cache_get();
                if ($response == "") {
                    $content = "";
                    // NOTE(review): $get, $post and $cookie are the ones left by
                    // the last accepted connection, not necessarily by this
                    // user - verify what index_rd_ifra_main() expects
                    index_rd_ifra_main($this->room, $user, $content, $get, $post, $cookie);

                    if ($content == "" && $user->rd_kalive_is_expired($curtime)) {
                        $content = index_rd_ifra_keepalive($user);
                    }
                    if ($content != "") {
                        $response = chunked_content($content);
                    }
                }

                if ($response != "") {
                    echo "SPIA: [".substr($response, 0, 60)."...]\n";
                    $this->rd_response_write($user, $sock, $response, "TROUBLE WITH FWRITE");
                    $user->rd_kalive_reset($curtime);
                }

                // close socket after a while to prevent client memory consumption
                if ($user->rd_endtime_is_expired($curtime)) {
                    $this->rd_socket_close($sock, "CLOSE ON LOOP");
                }
            }
        }
    }
}
449
/*
 *  main - create the Sac_a_push server on /tmp/brisk.sock and run it
 *
 *  exit status: 1 if the creation fails, 0 when run() returns
 */
function main()
{
    $sap = Sac_a_push::create("/tmp/brisk.sock", 0, 0);
    if ($sap === FALSE) {
        exit(1);
    }

    $sap->run();

    exit(0);
}
460
461 main();
462 ?>