From: Matteo Nastasi (mop) Date: Tue, 25 Mar 2014 08:05:46 +0000 (+0100) Subject: add listening unix socket for internal services, renamed listen sockets field with... X-Git-Tag: v4.14.0~16 X-Git-Url: http://mop.ddnsfree.com/gitweb/?p=brisk.git;a=commitdiff_plain;h=5afa7681faebc92f25f73074e599b4048bcdd2a0 add listening unix socket for internal services, renamed listen sockets field with more appropriate names --- diff --git a/web/Obj/brisk.phh b/web/Obj/brisk.phh index f9ea0a4..db2c747 100644 --- a/web/Obj/brisk.phh +++ b/web/Obj/brisk.phh @@ -302,6 +302,37 @@ Copyright 2006-2012 Matteo Nasta
version '.$G_brisk_version.'

Copyright 2006-2012
Matteo Nastasi (aka mop)

'); +function cmd_return($val, $desc) +{ + return array('val' => $val, 'desc' => $desc); +} + +function cmd_serialize($attrs) +{ + $ret = ""; + + $sep = ""; + foreach ($attrs as $key => $value) { + $ret .= $sep . $key . '=' . urlencode($value); + $sep = "&"; + } + return $ret; +} + +function cmd_deserialize($cmd) +{ + $ret = array(); + $a = explode('&', $cmd); + $i = 0; + while ($i < count($a)) { + $b = split('=', $a[$i]); + $ret[urldecode($b[0])] = urldecode($b[1]); + $i++; + } + + return $ret; +} + // return values // -1 v1 < v2 // 0 equal diff --git a/web/Obj/sac-a-push.phh b/web/Obj/sac-a-push.phh index f9255c6..433ec45 100644 --- a/web/Obj/sac-a-push.phh +++ b/web/Obj/sac-a-push.phh @@ -25,6 +25,9 @@ define('SITE_PREFIX', '/brisk/'); define('SITE_PREFIX_LEN', 7); +define('DIRECT_ST_READ', 1); +define('DIRECT_ST_WRITE', 2); + declare(ticks = 1); function global_dump() @@ -400,13 +403,15 @@ class Sac_a_push { var $file_socket; var $unix_socket; + var $direct_socket; // socket where read direct commands var $socks; var $s2u; // user associated with input socket var $s2p; // pending page associated with input socket var $pending_pages; var $is_daemon; - var $list; + var $list_web; + var $list_cmd; var $in; var $debug; @@ -454,6 +459,7 @@ class Sac_a_push { $thiz->app = $app; $thiz->file_socket = $sockname; $thiz->unix_socket = "unix://$sockname"; + $thiz->direct_socket = "unix://${sockname}2"; $thiz->debug = $debug; $thiz->socks = array(); $thiz->s2u = array(); @@ -490,13 +496,20 @@ class Sac_a_push { if (file_exists($thiz->file_socket)) { unlink($thiz->file_socket); } + if (file_exists($thiz->file_socket."2")) { + unlink($thiz->file_socket."2"); + } $old_umask = umask(0); - if (($thiz->list = stream_socket_server($thiz->unix_socket, $err, $errs)) === FALSE) { + if (($thiz->list_web = stream_socket_server($thiz->unix_socket, $err, $errs)) === FALSE) { + return (FALSE); + } + if (($thiz->list_cmd = stream_socket_server($thiz->direct_socket, $err, $errs)) === FALSE) { return (FALSE); } umask($old_umask); - stream_set_blocking($thiz->list, $thiz->blocking_mode); # Set the stream to non-blocking + stream_set_blocking($thiz->list_web, $thiz->blocking_mode); # Set the stream to non-blocking + stream_set_blocking($thiz->list_cmd, $thiz->blocking_mode); # Set the stream to non-blocking if (($thiz->in = fopen("php://stdin", "r")) === FALSE) { return(FALSE); @@ -635,7 +648,8 @@ class Sac_a_push { /* if ($shutdown) */ /* $read = array_merge(array("$in" => $in), $socks); */ /* else */ - $pre_read = array_merge(array(intval($this->list) => $this->list, + $pre_read = array_merge(array(intval($this->list_web) => $this->list_web, + intval($this->list_cmd) => $this->list_cmd, intval(static::$cnt_slave) => static::$cnt_slave), $this->socks); if ($this->is_daemon == FALSE) { @@ -670,9 +684,9 @@ class Sac_a_push { if (!is_resource($sock)) { continue; } - if ($sock === $this->list) { + if ($sock === $this->list_web) { printf("NUOVA CONNEX\n"); - if (($new_unix = stream_socket_accept($this->list)) == FALSE) { + if (($new_unix = stream_socket_accept($this->list_web)) == FALSE) { printf("SOCKET_ACCEPT FAILED\n"); continue; } @@ -717,7 +731,16 @@ class Sac_a_push { printf("WARNING: ancillary_getstream failed\n"); } } - else { + else if ($sock === $this->list_cmd) { + printf("NUOVA DIRECT CONNEX\n"); + if (($new_unix = stream_socket_accept($this->list_cmd)) == FALSE) { + printf("SOCKET_ACCEPT FAILED\n"); + continue; + } + stream_set_blocking($new_unix, $this->blocking_mode); + $this->direct_mgmt($new_unix); + } // not socket_list nor socket_list_cmd + else { // already opened socket $buf = fread($sock, 4096); // if socket is closed if ($buf == FALSE || feof($sock)) { @@ -725,10 +748,14 @@ class Sac_a_push { if ($buf == FALSE) { printf("INFO: read return false\n"); } - if ($sock === $this->list) { + if ($sock === $this->list_web) { printf("Arrivati %d bytes da list\n", mb_strlen($buf, "ASCII")); return(21); } + else if ($sock === $this->list_cmd) { + printf("Arrivati %d bytes da list_cmd\n", mb_strlen($buf, "ASCII")); + return(23); + } else if ($sock === $this->in || $sock === static::$cnt_slave) { printf("Arrivati %d bytes da stdin\n", mb_strlen($buf, "ASCII")); return(22); @@ -754,14 +781,17 @@ class Sac_a_push { printf("post unset\n"); print_r($this->socks); } - } - else { + } // if ($buf == FALSE || mb_strlen($buf, "ASCII") == 0) { + else { // data on the socket if ($this->debug > 1) { print_r($read); } - if ($sock === $this->list) { + if ($sock === $this->list_web) { printf("Arrivati %d bytes da list\n", mb_strlen($buf, "ASCII")); } + else if ($sock === $this->list_cmd) { + printf("Arrivati %d bytes da list_cmd\n", mb_strlen($buf, "ASCII")); + } else if ($sock === $this->in || $sock === static::$cnt_slave) { printf("Arrivati %d bytes da stdin\n", mb_strlen($buf, "ASCII")); $line = trim($buf); @@ -779,7 +809,7 @@ class Sac_a_push { } } } - else { + else { // data arrived from not special socket $key = array_search("$sock", $this->socks); fprintf(STDERR, "Arrivati %d bytes dalla socket n. %d\n", mb_strlen($buf, "ASCII"), $key); if (isset($this->s2p[$id])) { @@ -899,11 +929,80 @@ class Sac_a_push { fclose($sock); printf("CLOSE ON LOOP\n"); } - } + } // if (isset($this->s2u[$id]... } // foreach ($this->socks... printf("\n"); } // while (... } // function run(... -} + function direct_command($cmdstr) + { + $cmd = cmd_deserialize($cmdstr); + + if (!isset($cmd['cmd'])) { + return cmd_return(500, 'no cmd found'); + } + // "cmd" => "userauth", "login" => 'mop', 'private' => 'it_must_be_correct', + // 'the_end' => 'true' ); + if ($cmd['cmd'] == 'userauth') { + if (!isset($cmd['login']) || !isset($cmd['private'])) { + return cmd_return(503, 'malformed cmd'); + } + return cmd_return(200, 'success'); + } + + return cmd_return(501, 'no cmd found'); + } + + function direct_mgmt($socket) + { + printf("DIRECT: begin\n"); + $st = DIRECT_ST_READ; + $cmd_all = ""; + $endtime = $this->curtime + 3; + + while(time() <= $endtime) { + printf("DIRECT: init loop %d\n", $st); + if ($st == DIRECT_ST_READ) { + $buf = fread($socket, 4096); + if ($buf == FALSE && feof($socket)) { + break; + } + else if ($buf != FALSE && strlen($buf) > 0) { + $cmd_all .= $buf; + + if (substr(trim($cmd_all), -13) == "&the_end=true") { + $output_arr = $this->direct_command($cmd_all); + $output = cmd_serialize($output_arr); + $output_cur = 0; + $output_len = mb_strlen($output, "ASCII"); + $st = DIRECT_ST_WRITE; + continue; + } + } + } + else if ($st == DIRECT_ST_WRITE) { + $ret = fwrite($socket, $output, $output_len); + if ($ret === FALSE) { + if (feof($socket)) { + break; + } + } + else if ($ret > 0 && $ret < $output_len) { + $output = substr($output, -($output_len - $ret)); + $output_len -= $ret; + continue; + } + else if ($ret == $output_len) { + fclose($socket); + return TRUE; + } + } + usleep(10000); + } + + fclose($socket); + return FALSE; + } +} // class Sac_a_push ?>