websocket graceful shutdown and safety queue
[brisk.git] / web / Obj / transports.phh
index 0cc5f54..921d15e 100644 (file)
@@ -75,6 +75,11 @@ class Transport_template {
     {
     }
 
+    function postclose_get($sock, $curtime)
+    {
+        return NULL;
+    }
+
     function chunk($step, $cont)
     {
     }
@@ -90,6 +95,38 @@ class Transport_template {
     }
 }
 
+define("TRANSP_WS_CLOSE_TOUT", 5);
+
+class Transport_websocket_postclose {
+    function Transport_websocket_postclose($transp_ws, $sock, $curtime) {
+        printf("POSTCLOSE: Creation\n");
+        $this->transp_ws = $transp_ws;
+        $this->sock = $sock;
+        $this->start =  $curtime;
+        // status not required, currently
+        // $this->status = "begin";
+    }
+
+    function read($payload, $curtime) {
+        if ($this->start + TRANSP_WS_CLOSE_TOUT < $curtime) {
+            printf("POSTCLOSE: Closing ws (%d) force close by timeout\n", $this->sock);
+            return 0;
+        }
+        if (mb_strlen($payload, "ASCII") > 1) {
+            $this->transp_ws->unchunk($payload, $this->sock);
+        }
+        if ($this->transp_ws->hasSentClose) {
+            printf("POSTCLOSE: Closing ws gracefully\n");
+            return 0;
+        }
+        else {
+            printf("POSTCLOSE: not yet finished\n");
+            return 1;
+        }
+    }
+}
+
+
 class Transport_websocket {
     protected $magicGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
 
@@ -203,10 +240,10 @@ class Transport_websocket {
         return $strout . "\n";
     }
 
-    function unchunk($cont)
+    function unchunk($cont, $sock)
     {
         // fprintf(STDERR, "CHUNK: [%s]\n", $cont);
-        return $this->deframe($cont);
+        return $this->deframe($cont, $sock);
     }
 
     function chunk($step, $cont)
@@ -281,7 +318,7 @@ class Transport_websocket {
         return chr($b1) . chr($b2) . $lengthField . $message;
     }
 
-    protected function deframe($message) {
+    protected function deframe($message, $socket) {
         //echo $this->strtohex($message);
         $headers = $this->extractHeaders($message);
         $pongReply = false;
@@ -325,7 +362,8 @@ class Transport_websocket {
         if ($pongReply) {
             $reply = $this->frame($payload,$this,'pong');
             // TODO FIXME ALL socket_write management
-            socket_write($user->socket,$reply,mb_strlen($reply, "ASCII"));
+            // socket_write($user->socket,$reply,mb_strlen($reply, "ASCII"));
+            @fwrite($socket, $reply, mb_strlen($reply, "ASCII"));
             return false;
         }
         if (extension_loaded('mbstring')) {
@@ -476,6 +514,11 @@ class Transport_websocket {
         return(chr(0x88).chr(0x02).chr(0xe8).chr(0x03));
     }
 
+    function postclose_get($sock, $curtime)
+    {
+       return new Transport_websocket_postclose($this, $sock, $curtime);
+    }
+
     static function fini($init_string, $base, $blockerr)
     {
         return (sprintf('@BEGIN@ %s window.onbeforeunload = null; window.onunload = null; document.location.assign("%sindex.php"); @END@',  ($blockerr ? 'xstm.stop(); ' : ''), $base).self::close());
@@ -511,6 +554,11 @@ class Transport_xhr {
         return "";
     }
 
+    function postclose_get($sock, $curtime)
+    {
+        return NULL;
+    }
+
     static function fini($init_string, $base, $blockerr)
     {
         return (sprintf('@BEGIN@ %s window.onbeforeunload = null; window.onunload = null; document.location.assign("%sindex.php"); @END@',  ($blockerr ? 'xstm.stop(); ' : ''), $base));
@@ -568,6 +616,11 @@ window.onload = function () { try { if (xynt_streaming != \"ready\") { xynt_stre
         return "";
     }
 
+    function postclose_get($sock, $curtime)
+    {
+        return NULL;
+    }
+
     static function fini($init_string, $base, $blockerr)
     {
         $ret = "";
@@ -613,6 +666,11 @@ class Transport_htmlfile extends Transport_iframe {
     function Transport_htmlfile() {
         $this->type = 'htmlfile';
     }
+
+    function postclose_get($sock, $curtime)
+    {
+        return NULL;
+    }
 }
 
 class Transport {