websocket graceful shutdown and safety queue
[brisk.git] / web / Obj / transports.phh
index 69fe089..921d15e 100644 (file)
@@ -75,6 +75,11 @@ class Transport_template {
     {
     }
 
+    function postclose_get($sock, $curtime)
+    {
+        return NULL;
+    }
+
     function chunk($step, $cont)
     {
     }
@@ -90,19 +95,53 @@ class Transport_template {
     }
 }
 
+define("TRANSP_WS_CLOSE_TOUT", 5);
+
+class Transport_websocket_postclose {
+    function Transport_websocket_postclose($transp_ws, $sock, $curtime) {
+        printf("POSTCLOSE: Creation\n");
+        $this->transp_ws = $transp_ws;
+        $this->sock = $sock;
+        $this->start =  $curtime;
+        // status not required, currently
+        // $this->status = "begin";
+    }
+
+    function read($payload, $curtime) {
+        if ($this->start + TRANSP_WS_CLOSE_TOUT < $curtime) {
+            printf("POSTCLOSE: Closing ws (%d) force close by timeout\n", $this->sock);
+            return 0;
+        }
+        if (mb_strlen($payload, "ASCII") > 1) {
+            $this->transp_ws->unchunk($payload, $this->sock);
+        }
+        if ($this->transp_ws->hasSentClose) {
+            printf("POSTCLOSE: Closing ws gracefully\n");
+            return 0;
+        }
+        else {
+            printf("POSTCLOSE: not yet finished\n");
+            return 1;
+        }
+    }
+}
+
+
 class Transport_websocket {
     protected $magicGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 
     function Transport_websocket($secure = FALSE) {
+        $this->type = ($secure == FALSE ? "websocket" : "websocketsec");
         $this->headerOriginRequired                 = false;
         $this->headerSecWebSocketProtocolRequired   = false;
         $this->headerSecWebSocketExtensionsRequired = false;
 
         $this->sendingContinuous = false;
-       $this->sendingContinuous = false;
-       $this->partialMessage = "";
 
-       $this->hasSentClose = false;
+        $this->handlingPartialPacket = false;
+        $this->partialMessage = "";
+
+        $this->hasSentClose = false;
     }
 
     protected function extractHeaders($message) {
@@ -201,6 +240,12 @@ class Transport_websocket {
         return $strout . "\n";
     }
 
+    function unchunk($cont, $sock)
+    {
+        // fprintf(STDERR, "CHUNK: [%s]\n", $cont);
+        return $this->deframe($cont, $sock);
+    }
+
     function chunk($step, $cont)
     {
         // fprintf(STDERR, "CHUNK: [%s]\n", $cont);
@@ -273,7 +318,7 @@ class Transport_websocket {
         return chr($b1) . chr($b2) . $lengthField . $message;
     }
 
-    protected function deframe($message) {
+    protected function deframe($message, $socket) {
         //echo $this->strtohex($message);
         $headers = $this->extractHeaders($message);
         $pongReply = false;
@@ -317,7 +362,8 @@ class Transport_websocket {
         if ($pongReply) {
             $reply = $this->frame($payload,$this,'pong');
             // TODO FIXME ALL socket_write management
-            socket_write($user->socket,$reply,mb_strlen($reply, "ASCII"));
+            // socket_write($user->socket,$reply,mb_strlen($reply, "ASCII"));
+            @fwrite($socket, $reply, mb_strlen($reply, "ASCII"));
             return false;
         }
         if (extension_loaded('mbstring')) {
@@ -468,6 +514,11 @@ class Transport_websocket {
         return(chr(0x88).chr(0x02).chr(0xe8).chr(0x03));
     }
 
+    function postclose_get($sock, $curtime)
+    {
+       return new Transport_websocket_postclose($this, $sock, $curtime);
+    }
+
     static function fini($init_string, $base, $blockerr)
     {
         return (sprintf('@BEGIN@ %s window.onbeforeunload = null; window.onunload = null; document.location.assign("%sindex.php"); @END@',  ($blockerr ? 'xstm.stop(); ' : ''), $base).self::close());
@@ -483,6 +534,7 @@ class Transport_websocket {
 class Transport_xhr {
 
     function Transport_xhr() {
+        $this->type = 'xhr';
     }
 
     function init($enc, $header, &$header_out, $init_string, $base, $step)
@@ -502,6 +554,11 @@ class Transport_xhr {
         return "";
     }
 
+    function postclose_get($sock, $curtime)
+    {
+        return NULL;
+    }
+
     static function fini($init_string, $base, $blockerr)
     {
         return (sprintf('@BEGIN@ %s window.onbeforeunload = null; window.onunload = null; document.location.assign("%sindex.php"); @END@',  ($blockerr ? 'xstm.stop(); ' : ''), $base));
@@ -523,6 +580,7 @@ class Transport_xhr {
 class Transport_iframe {
 
     function Transport_iframe() {
+        $this->type = 'iframe';
     }
 
     function init($enc, $header, &$header_out, $init_string, $base, $step)
@@ -558,6 +616,11 @@ window.onload = function () { try { if (xynt_streaming != \"ready\") { xynt_stre
         return "";
     }
 
+    function postclose_get($sock, $curtime)
+    {
+        return NULL;
+    }
+
     static function fini($init_string, $base, $blockerr)
     {
         $ret = "";
@@ -600,6 +663,14 @@ push(\"%s\");\n// -->\n</script>", $step, escpush($cont) );
 }
 
 class Transport_htmlfile extends Transport_iframe {
+    function Transport_htmlfile() {
+        $this->type = 'htmlfile';
+    }
+
+    function postclose_get($sock, $curtime)
+    {
+        return NULL;
+    }
 }
 
 class Transport {
@@ -609,8 +680,8 @@ class Transport {
 
     static function create($transp)
     {
-        if ($transp == 'websocket') {
-            return new Transport_websocket();
+        if ($transp == 'websocket' || $transp == 'websocketsec') {
+            return new Transport_websocket($transp == 'websocketsec');
         }
         else if ($transp == 'xhr') {
             return new Transport_xhr();
@@ -632,4 +703,4 @@ class Transport {
         }
     }
 }
-?>
\ No newline at end of file
+?>