centralized get curtime var
[brisk.git] / web / spush / brisk-spush.php
1 #!/usr/bin/php
2 <?php
3 /*
4  *  brisk - spush/brisk-spush.php
5  *
6  *  Copyright (C) 2012 Matteo Nastasi
7  *                          mailto: nastasi@alternativeoutput.it 
8  *                                  matteo.nastasi@milug.org
9  *                          web: http://www.alternativeoutput.it
10  *
11  * This program is free software; you can redistribute it and/or modify
12  * it under the terms of the GNU General Public License as published by
13  * the Free Software Foundation; either version 2 of the License, or
14  * (at your option) any later version.
15  *
16  * This program is distributed in the hope that it will be useful, but
17  * WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABLILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19  * General Public License for more details. You should have received a
20  * copy of the GNU General Public License along with this program; if
21  * not, write to the Free Software Foundation, Inc, 59 Temple Place -
22  * Suite 330, Boston, MA 02111-1307, USA.
23  *
24  * TODO
25  *   index_wr.php::chat
26  *   index_wr.php::reload
27  *   index_wr.php::exit
28  *
29  *   setcookie (for tables only)
30  *   keepalive
31  *   chunked 
32  *   BUG - after restart index_rd.php receive from prev clients a lot of req
33  *   DONE/FROZEN - problema con getpeer (HOSTADDR)
34  *
35  *   DONE - index_rd.php porting
36  *   DONE - generic var management from internet
37  *   DONE - index.php auth part
38  */
39
40 $G_base = "../";
41
42 require_once("./sac-a-push.phh");
43 require_once("./brisk-spush.phh");
44 require_once($G_base."Obj/brisk.phh");
45 require_once($G_base."Obj/auth.phh");
46 // require_once("../Obj/proxyscan.phh");
47 require_once($G_base."index.php");
48 require_once($G_base."index_wr.php");
49 require_once($G_base."index_rd_ifra.php");
50 require_once($G_base."briskin5/Obj/briskin5.phh");
51
52 define('SITE_PREFIX', '/brisk/');
53
/**
 * One slot of the push-user table: binds a numeric slot id to a session
 * string, a socket and a counter.
 *
 * A slot is "disabled" while its counter is negative; enable() resets the
 * counter to 0 and disable() sets it back to -1.
 */
class SPUser {
    /** slot identifier, fixed at construction time */
    var $id;
    /** session string bound by enable(); NULL until the slot is first enabled */
    var $sess = NULL;
    /** counter; a negative value marks the slot as disabled */
    var $cnt = -1;
    /** socket currently attached to the slot, or NULL */
    var $sock = NULL;

    /**
     * Create a disabled slot.
     *
     * A PHP 5 style constructor is used on purpose: a method merely named
     * after the class is no longer treated as a constructor by PHP 8, which
     * would leave every property unset after `new SPUser($id)`.
     *
     * @param mixed $id slot identifier returned by id_get() and enable()
     */
    function __construct($id)
    {
        $this->id = $id;
        $this->sess = NULL;
        $this->cnt = -1;
        $this->sock = NULL;
    }

    /**
     * Attach a socket and a session to the slot and mark it as enabled.
     *
     * @param mixed $sock socket to attach
     * @param mixed $sess session string to bind
     * @return mixed the slot id
     */
    function enable($sock, $sess)
    {
        $this->sess = $sess;
        $this->cnt = 0;
        $this->sock = $sock;

        return ($this->id);
    }

    /**
     * Mark the slot as disabled and forget its socket.
     * The session string is left untouched.
     */
    function disable()
    {
        $this->cnt = -1;
        $this->sock = NULL;
    }

    /**
     * @return bool TRUE when the slot is enabled (counter >= 0)
     */
    function is_enable()
    {
        return ($this->cnt < 0 ? FALSE : TRUE);
    }

    /** @return mixed the attached socket or NULL */
    function sock_get()
    {
        return $this->sock;
    }

    /** @param mixed $sock socket replacing the currently attached one */
    function sock_set($sock)
    {
        $this->sock = $sock;
    }

    /** @return mixed the slot id */
    function id_get()
    {
        return $this->id;
    }

    /** @return mixed the bound session string, NULL if never enabled */
    function sess_get()
    {
        return $this->sess;
    }

    /** @return int the current counter value */
    function cnt_get()
    {
        return $this->cnt;
    }

    /**
     * Increment the counter.
     *
     * @return int the counter value BEFORE the increment (post-increment)
     */
    function cnt_inc()
    {
        return $this->cnt++;
    }
}
117
/**
 * Look for the first slot that is not enabled.
 *
 * @param array $user_arr objects exposing is_enable()
 * @return mixed the first element whose is_enable() is false, or FALSE
 *               when every element is enabled (or the array is empty)
 */
function user_get_free($user_arr)
{
    $found = FALSE;

    foreach ($user_arr as $candidate) {
        if ($candidate->is_enable()) {
            // slot in use, keep scanning
            continue;
        }
        $found = $candidate;
        break;
    }

    return $found;
}
127
/**
 * Look for the slot bound to a given session string.
 *
 * The comparison is a strict string comparison: with a loose `==` a slot
 * that was never enabled (session NULL) would match an empty session, and
 * numeric-looking ids such as "0e123" and "0e456" would be considered equal.
 *
 * A debug line is printed for every slot examined.
 *
 * @param array $user_arr objects exposing sess_get()
 * @param mixed $sess     session string to look for
 * @return mixed the first matching element, or FALSE when none matches
 */
function user_get_sess($user_arr, $sess)
{
    foreach ($user_arr as $i => $user) {
        $cur = $user->sess_get();
        printf("SESS: [%s]  cur: [%s]\n", $cur, $sess);
        // a NULL on either side means "no session": never a match
        if ($cur !== NULL && $sess !== NULL && (string)$cur === (string)$sess) {
            return ($user);
        }
    }
    return FALSE;
}
138
/**
 * Render the status line and the header section of a "200 OK" HTTP response.
 *
 * Date, Connection and Content-Type get a default value unless the caller
 * supplies them (keys are matched case-sensitively); every caller-supplied
 * header is then emitted as "Key: value". A fixed "Mop: was/here" header is
 * always appended, followed by the empty line that ends the header section.
 *
 * @param array $header map of header name => header value
 * @return string the rendered response head, CRLF terminated
 */
function headers_render($header)
{
    $s = "";
    $s .= "HTTP/1.1 200 OK\r\n";
    if (!isset($header['Date'])) {
        // HTTP requires the IMF-fixdate form (four-digit year, always GMT);
        // DATE_RFC822 renders a two-digit year and a numeric local offset.
        $s .= sprintf("Date: %s GMT\r\n", gmdate('D, d M Y H:i:s'));
    }
    if (!isset($header['Connection']))
        $s .= "Connection: close\r\n";
    if (!isset($header['Content-Type']))
        $s .= "Content-Type: text/html\r\n";
    foreach ($header as $key => $value) {
        $s .= sprintf("%s: %s\r\n", $key, $value);
    }
    $s .= "Mop: was/here\r\n";
    $s .= "\r\n";

    return ($s);
}
158
159
/*
 *  Caching system: PHP output buffering (ob_*) is used to capture the
 *  output of the old-style pages into a variable, so that it can be sent
 *  later, at a calmer pace.
 */
$G_headers = "";

/**
 * Shutdown hook: logs the connection status seen when the script terminates.
 */
function shutta()
{
    $status = connection_status();

    log_rd2("SHUTTA [" . $status . "] !");
}

// run shutta() whenever the script ends
register_shutdown_function('shutta');
172
173 /*
174  *  MAIN
175  */
$shutdown = FALSE;

/**
 * Remove a client read socket from the bookkeeping arrays and close it.
 *
 * When the socket is mapped to a room user whose read socket is set, that
 * read socket is reset to NULL first. Safe to call on a socket that has
 * already been closed.
 *
 * @param object   $room  room holding the `user` array indexed by $s2u values
 * @param array    $socks open client sockets, keyed by intval(socket)
 * @param array    $s2u   socket id => user index map
 * @param resource $sock  socket to forget and close
 */
function spush_rd_socket_detach($room, &$socks, &$s2u, $sock)
{
    $sock_id = intval($sock);

    if (isset($s2u[$sock_id])) {
        $user = $room->user[$s2u[$sock_id]];
        if ($user->rd_socket_get() != NULL) {
            $user->rd_socket_set(NULL);
        }
    }
    unset($socks[$sock_id]);
    unset($s2u[$sock_id]);
    if (is_resource($sock)) {
        fclose($sock);
    }
}

/**
 * Daemon entry point.
 *
 * Creates the room, listens on the unix socket /tmp/brisk.sock and, for
 * every connection accepted there, obtains the client stream and the raw
 * request through ancillary_getstream(). The request is dispatched by path:
 *   - SITE_PREFIX and index.php  -> index_main(), answered and closed;
 *   - index_wr.php               -> index_wr_main(), answered and closed;
 *   - index_rd_ifra.php          -> socket kept open and bound to the user
 *                                   identified by the 'sess' cookie;
 *   - anything else              -> closed without an answer.
 * After each select round every open read socket receives the pending
 * output of its user (or a keepalive when its timeout expired) and is
 * closed once its end time is reached.
 *
 * @return mixed FALSE when the room cannot be created; otherwise the
 *               function does not return (it ends with exit())
 */
function main()
{
    GLOBAL $shutdown;
    $main_loop = TRUE;

    /*
     *  INIT
     */

    $FILE_SOCKET = "/tmp/brisk.sock";
    $UNIX_SOCKET = "unix://$FILE_SOCKET";
    $debug = 0;
    $socks = array();   // open read sockets, keyed by intval(socket)
    $s2u   = array();   // intval(socket) => index into $room->user

    $blocking_mode = 0; // 0 for non-blocking

    if (($room = Room::create()) == FALSE) {
        log_crit("load_data failed");
        return FALSE;
    }

    // a stale socket file would make the bind fail
    if (file_exists($FILE_SOCKET)) {
        unlink($FILE_SOCKET);
    }

    // umask 0 while binding, so the socket file is created without
    // permission bits masked out
    $old_umask = umask(0);
    if (($list = stream_socket_server($UNIX_SOCKET, $err, $errs)) === FALSE) {
        log_crit(sprintf("stream_socket_server failed: [%d] %s", $err, $errs));
        exit(11);
    }
    umask($old_umask);

    if (($in = fopen("php://stdin", "r")) === FALSE) {
        log_crit("cannot open stdin");
        exit(11);
    }

    stream_set_blocking($list, $blocking_mode); # Set the stream to non-blocking

    while ($main_loop) {
        $curtime = time();
        printf("IN LOOP: Current opened: %d\n", count($socks));

        /* Prepare the read array: while shutting down no new connection is accepted */
        if ($shutdown)
            $read   = array_merge(array("$in" => $in), $socks);
        else
            $read   = array_merge(array("$list" => $list, "$in" => $in), $socks);

        if ($debug > 1) {
            printf("PRE_SELECT\n");
            print_r($read);
        }
        $write  = NULL;
        $except = NULL;
        $num_changed_sockets = stream_select($read, $write, $except, 1); // 0, 250000);

        if ($num_changed_sockets === FALSE) {
            // FALSE is an error (e.g. interrupted call), a timeout returns 0
            printf("WARNING: stream_select failed\n");
        }
        else if ($num_changed_sockets > 0) {
            printf("num sock %d num_of_socket: %d\n", $num_changed_sockets, count($read));
            if ($debug > 1) {
                print_r($read);
            }
            /* At least at one of the sockets something interesting happened */
            foreach ($read as $i => $sock) {
                if ($sock === $list) {
                    printf("NUOVA CONNEX\n");
                    if (($new_unix = stream_socket_accept($list)) === FALSE) {
                        printf("WARNING: stream_socket_accept failed\n");
                        continue;
                    }
                    $stream_info = "";
                    $method      = "";
                    $header      = array();
                    $get         = array();
                    $post        = array();
                    $cookie      = array();
                    if (($new_socket = ancillary_getstream($new_unix, $stream_info)) !== FALSE) {
                        printf("RECEIVED HEADER:\n%s", $stream_info);
                        $path = spu_process_info($stream_info, $method, $header, $get, $post, $cookie);
                        printf("PATH: [%s]\n", $path);
                        printf("M: %s\nHEADER:\n", $method);
                        print_r($header);
                        printf("GET:\n");
                        print_r($get);
                        printf("POST:\n");
                        print_r($post);
                        printf("COOKIE:\n");
                        print_r($cookie);

                        $addr = stream_socket_get_name($new_socket, TRUE);

                        switch ($path) {
                        case SITE_PREFIX:
                        case SITE_PREFIX."index.php":
                            $header_out = array();
                            // the page echoes its output: capture it and send it in one shot
                            ob_start();
                            index_main($room, $header_out, $addr, $get, $post, $cookie);
                            $content = ob_get_contents();
                            ob_end_clean();
                            fwrite($new_socket, headers_render($header_out).$content);
                            fclose($new_socket);
                            break;
                        case SITE_PREFIX."index_wr.php":
                            $header_out = array();
                            // NOTE(review): the peer address is blanked here on purpose,
                            // presumably related to the "getpeer (HOSTADDR)" item frozen
                            // in the file header TODO - confirm before restoring it
                            $addr = "";
                            printf("RET: %s\n", $addr);
                            ob_start();
                            index_wr_main($room, $addr, $get, $post, $cookie);
                            $content = ob_get_contents();
                            ob_end_clean();

                            fwrite($new_socket, headers_render($header_out).$content);
                            fclose($new_socket);
                            break;
                        case SITE_PREFIX."index_rd_ifra.php":
                            do {
                                if (!isset($cookie['sess'])) {
                                    fclose($new_socket);
                                    break;
                                }
                                if (($user = $room->get_user($cookie['sess'], $idx)) == FALSE) {
                                    fclose($new_socket);
                                    break;
                                }
                                /* the user reconnected: drop its previous read socket */
                                if (($prev = $user->rd_socket_get()) != NULL) {
                                    spush_rd_socket_detach($room, $socks, $s2u, $prev);
                                    $user->rd_socket_set(NULL);
                                }

                                $header_out = array();
                                $body = "";
                                index_rd_ifra_init($room, $user, $header_out, $body, $get, $post, $cookie);
                                stream_set_blocking($new_socket, $blocking_mode); // Set the stream to non-blocking
                                fwrite($new_socket, headers_render($header_out).$body);
                                fflush($new_socket);

                                $s2u[intval($new_socket)] = $idx;
                                $socks[intval($new_socket)] = $new_socket;
                                $user->rd_socket_set($new_socket);
                            } while (FALSE);

                            break;
                        default:
                            /* no handler for this path: do not leave the client hanging */
                            printf("WARNING: unmanaged path [%s]\n", $path);
                            fclose($new_socket);
                            break;
                        }
                    }
                    else {
                        printf("WARNING: ancillary_getstream failed\n");
                    }
                }
                else {
                    /* a read socket replaced by a reconnection earlier in this same
                       round is already closed but still listed in $read: skip it */
                    if (!is_resource($sock)) {
                        continue;
                    }

                    if (($buf = fread($sock, 512)) === FALSE) {
                        printf("error read\n");
                        if ($sock === $in) {
                            exit(123);
                        }
                        /* a failing client must not take the whole daemon down */
                        spush_rd_socket_detach($room, $socks, $s2u, $sock);
                    }
                    else if (strlen($buf) === 0) {
                        if ($sock === $in) {
                            printf("Arrivati %d bytes da stdin\n", strlen($buf));
                        }
                        else {
                            /* empty read on a client socket: the peer went away */
                            spush_rd_socket_detach($room, $socks, $s2u, $sock);
                        }
                        if ($debug > 1) {
                            printf("post unset\n");
                            print_r($socks);
                        }
                    }
                    else {
                        if ($debug > 1) {
                            print_r($read);
                        }
                        if ($sock === $in) {
                            printf("Arrivati %d bytes da stdin\n", strlen($buf));
                        }
                        else {
                            /* $socks is keyed by intval(socket) */
                            $key = intval($sock);
                            printf("Arrivati %d bytes dalla socket n. %d\n", strlen($buf), $key);
                        }
                    }
                }
            }
        }

        /* push pending data (or a keepalive) on every open read socket */
        foreach ($socks as $k => $sock) {
            if (isset($s2u[intval($sock)])) {
                $body = "";
                $user = $room->user[$s2u[intval($sock)]];
                index_rd_ifra_main($room, $user, $body);
                if ($body == "" && $user->rd_tout_is_expired($curtime)) {
                    $body = index_rd_ifra_keepalive($user);
                }

                if ($body != "") {
                    echo "SPIA: [".substr($body, 0, 60)."...]\n";
                    fwrite($sock, $body);
                    fflush($sock);
                    $user->rd_tout_reset($curtime);
                }

                // close socket after a while to prevent client memory consumption
                if ($user->rd_endtime_is_expired($curtime)) {
                    spush_rd_socket_detach($room, $socks, $s2u, $sock);
                }
            }
        }
    }

    exit(0);
}

main();
459 ?>