working version with timeout and cleanup of exausted handles
authorMatteo Nastasi (mop) <nastasi@alternativeoutput.it>
Tue, 9 Sep 2014 16:40:12 +0000 (18:40 +0200)
committerMatteo Nastasi (mop) <nastasi@alternativeoutput.it>
Tue, 9 Sep 2014 16:40:12 +0000 (18:40 +0200)
TODO.txt
web/Obj/curl-de-sac.phh
webtest/cds_test01.php

index 259ab81..31fbefd 100644 (file)
--- a/TODO.txt
+++ b/TODO.txt
@@ -1,3 +1,8 @@
-- timeout handling
-- remove cmd from list when accomplished
+TODO LIST
+
+- move name and timeout class to register function
+  to be able to use the same class different times
+
 DONE - debugging system
+DONE - timeout handling
+DONE - remove cmd from list when accomplished
index 4ef3644..01ba78b 100644 (file)
@@ -27,11 +27,18 @@ $G_curl_de_sac_version = "0.1";
 class CDS_cmd {
     var $cmd_cls;
     var $ch;
+    var $tlimit;
 
     function CDS_cmd($cmd_cls, $ch)
     {
         $this->cmd_cls = $cmd_cls;
         $this->ch = $ch;
+        $this->tlimit = time() + $cmd_cls->tout;
+    }
+
+    function ch_get()
+    {
+        return ($this->ch);
     }
 
     function dbg_get()
@@ -60,14 +67,16 @@ class CDS_cmd_cls {
         $this->cds = $cds;
     }
 
-    static function pre_create($url)
+    static function pre_create($cds, $url)
     {
+        if ($cds->dbg_get() > 2) { printf("CURL: curl_init\n"); }
         if (($ch = curl_init()) == FALSE)
             return FALSE;
         curl_setopt($ch, CURLOPT_URL, $url);
         curl_setopt($ch, CURLOPT_HEADER, 0);
         curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
-        
+        curl_setopt($ch, CURLOPT_FORBID_REUSE, true);
+        curl_setopt($ch, CURLOPT_HTTPHEADER, array('Connection: close'));
         return ($ch);
     }
 
@@ -75,24 +84,33 @@ class CDS_cmd_cls {
     {
         if ($cds->dbg > 2) {
             printf("CDS_cmd_cls::create - begin\n");
-            print_r($ch);
+            printf("CURL: curl_multi_add_handle\n");
         }
         if (($ret = curl_multi_add_handle($cds->mh, $ch)) != 0) {
             // INFO: $ret is a CURLM_XXX errors code
             return (FALSE);
         }
-        if ($cds->dbg > 2) {
-            printf("CDS_cmd_cls::create - end\n");
-        }
+        if ($cds->dbg > 2) { printf("CDS_cmd_cls::create - end\n"); }
         return (TRUE);
     }
 
-    function cb($cmd, $ret)
+    function process($cmd, $ret)
     {
         
-        print "THIS MUST BE IMPLEMENTED";
+        fprintf(STDERR, "process MUST BE IMPLEMENTED");
+        exit(123);
+    }
+
+    function timeout($cmd)
+    {
+        fprintf(STDERR, "timeout MUST BE IMPLEMENTED");
         exit(123);
     }
+
+    function dbg_get()
+    {
+        return $this->cds->dbg;
+    }
 }
 
 class Curl_de_sac {
@@ -102,6 +120,7 @@ class Curl_de_sac {
     var $dbg;
 
     function Curl_de_sac($dbg=0) {
+        if ($dbg > 2) { printf("CURL: curl_multi_init\n"); }
         $this->mh = curl_multi_init();
         $this->cmd_cls = array();
         $this->cmd = array();
@@ -154,6 +173,26 @@ class Curl_de_sac {
         $this->cmd_cls = array();
     }
 
+
+    function cleanup($key)
+    {
+        $cmd = $this->cmd[$key];
+
+        if ($this->dbg > 2) {
+            printf("cleanup\n");
+            printf("CURL: curl_multi_remove_handle:\n");
+            print_r($cmd->ch_get());
+            printf("\n");
+        }
+        // return 0 on SUCCESS or CURLM_XXX in other cases
+        if (($ret = curl_multi_remove_handle($this->mh, $cmd->ch_get())) != 0) {
+            fprintf(STDERR, "CURL: curl_multi_remove_handle FAILED (%d)\n", $ret);
+        }
+        if ($this->dbg > 2) { printf("CURL: curl_close\n"); }
+        curl_close($cmd->ch_get());
+        unset($this->cmd[$key]);
+    }
+
     function execute()
     {
         $args = func_get_args();
@@ -176,46 +215,59 @@ class Curl_de_sac {
                 break;
 
             array_push($this->cmd, $inst);
-            if ($this->dbg > 1) {
-                printf("CDS_cmd_cls::process - execute  push cmd\n");
-                print_r($this->cmd);
-            }
+            if ($this->dbg > 1) { printf("CDS_cmd_cls::process - execute  push cmd\n"); }
+            if (($this->dbg & 1) == 1) { print_r($this); }
+
             return TRUE;
         } while (FALSE);
 
         return FALSE;
     }
 
-    function process()
+    function process($curtime=0)
     {
-        if ($this->dbg > 1) {
-             printf("CDS_cmd_cls::process - begin\n");
+        if ($curtime  == 0) {
+            $curtime = time();
         }
+        if ($this->dbg > 1) { printf("CDS_cmd_cls::process - begin\n"); }
         $running = NULL;
+
+        if ($this->dbg > 2) { printf("CURL: curl_multi_exec\n"); }
         $ret = curl_multi_exec($this->mh, $running);
         $msgs_in_queue = NULL;
 
         do {
+            if ($this->dbg > 2) { printf("CURL: curl_multi_info_read\n"); }
+
             if ($ret = curl_multi_info_read ($this->mh, $msgs_in_queue)) {
-                if ($this->dbg > 1)
-                    printf("Info_read miq: %d\n", $msgs_in_queue);
-                
-                foreach($this->cmd as $cmd) {
-                    if ($cmd->ch == $ret['handle']) {
-                        $cmd->cmd_cls->cb($cmd, $ret);
-                        break;
-                    }
-                }
+                if ($this->dbg > 1) { printf("Info_read miq: %d\n", $msgs_in_queue); }
+                if ($this->dbg > 2) { printf("CURL: curl_getinfo\n"); }
+
                 $info = curl_getinfo($ret['handle']);
                 if ($this->dbg > 1) {
                     printf("Getinfo:\n");
                     print_r($info);
                 }
+
+                foreach($this->cmd as $key => $cmd) {
+                    if ($cmd->ch == $ret['handle']) {
+                        if ($cmd->cmd_cls->process($cmd, $ret) == TRUE) {
+                            $this->cleanup($key);
+                        }
+                        break;
+                    }
+                }
             }
         } while ($msgs_in_queue > 0);
-        if ($this->dbg > 1) {
-            printf("CDS_cmd_cls::process - end (queue: %d)\n", $msgs_in_queue);
+        foreach ($this->cmd as $key => $cmd) {
+            if ($this->dbg > 2) { printf("Check tout, curr: %d tlimit %d\n", $curtime, $cmd->tlimit); }
+            if ($curtime > $cmd->tlimit) {
+                if ($this->dbg > 2) { printf("TIMEOUT REACHED!\n"); }
+                $cmd->cmd_cls->timeout($cmd);
+                $this->cleanup($key);
+            }
         }
+        if ($this->dbg > 1) { printf("CDS_cmd_cls::process - end (queue: %d)\n", $msgs_in_queue); }
     }
 
 }
\ No newline at end of file
index 6156ef1..fd89bae 100755 (executable)
@@ -28,13 +28,13 @@ class short_cmd_cls extends CDS_cmd_cls {
         }
 
         do {
-            if (($ch = parent::pre_create($url)) == FALSE)
+            if (($ch = parent::pre_create($cds, $url)) == FALSE)
                 break;
 
             if (parent::create($cds, $ch) == FALSE)
                 break;
 
-            $cmd = new short_cmd($ch, $this, "none currently");
+            $cmd = new short_cmd($this, $ch, "none currently");
 
             return $cmd;
         } while (FALSE);
@@ -42,11 +42,19 @@ class short_cmd_cls extends CDS_cmd_cls {
         return FALSE;
     }
 
-    function cb()
+    function process($cmd, $ret)
     {
+        printf("CURL: curl_multi_getcontent\n");
+        $content = curl_multi_getcontent($cmd->ch_get());
         if ($this->dbg_get() > 0) {
-            printf("short_cb:\n");
+            printf("short process: [%s]\n", $content);
         }
+        return TRUE;
+    }
+
+    function timeout($cmd)
+    {
+        printf("Short timeout function reached\n");
     }
 }
 
@@ -63,7 +71,7 @@ class long_cmd extends CDS_cmd {
 class long_cmd_cls extends CDS_cmd_cls {
     function long_cmd_cls()
     {
-        parent::__construct("long", 10);
+        parent::__construct("long", 5);
     }
 
     function create($cds, $url)
@@ -73,13 +81,13 @@ class long_cmd_cls extends CDS_cmd_cls {
         }
 
         do {
-            if (($ch = parent::pre_create($url)) == FALSE)
+            if (($ch = parent::pre_create($cds, $url)) == FALSE)
                 break;
 
             if (parent::create($cds, $ch) == FALSE)
                 break;
 
-            $cmd = new long_cmd($ch, $this, "none currently");
+            $cmd = new long_cmd($this, $ch, "none currently");
 
             return $cmd;
         } while (FALSE);
@@ -87,60 +95,74 @@ class long_cmd_cls extends CDS_cmd_cls {
         return FALSE;
     }
 
-    function cb()
+    function process($cmd, $ret)
     {
+        printf("CURL: curl_multi_getcontent\n");
+        $content = curl_multi_getcontent($cmd->ch_get());
         if ($this->dbg_get() > 0) {
-            printf("long_cb:\n");
+            printf("long process: [%s]\n", $content);
         }
+
+        return TRUE;
+    }
+
+    function timeout($cmd)
+    {
+        printf("Long timeout function reached\n");
     }
 }
 
 
 function main()
 {
+    $debug = 998;
     // create cds
-    $cds = new Curl_de_sac(999);
+    $cds = new Curl_de_sac($debug);
 
     // create cds_cmd 1
-    $cmd_cls1 = new short_cmd_cls();
+    $short_cls = new short_cmd_cls();
 
     // registrer cds_cmd 1
     printf("MAIN: Register CLS1\n");
-    if (($cds->cmd_cls_register($cmd_cls1)) == FALSE) {
+    if (($cds->cmd_cls_register($short_cls)) == FALSE) {
         fprintf(STDERR, "MAIN: cmd_cls1 registration failed\n");
         exit(1);
     }
 
     // create cds_cmd 2
-    $cmd_cls2 = new long_cmd_cls();
+    $long_cls = new long_cmd_cls();
 
     // register cds_cmd 2
     printf("MAIN: Register CLS2\n");
-    if (($cds->cmd_cls_register($cmd_cls2)) == FALSE) {
+    if (($cds->cmd_cls_register($long_cls)) == FALSE) {
         fprintf(STDERR, "MAIN: cmd_cls2 registration failed\n");
         exit(2);
     }
 
     // register cds_cmd 2 (retry)
     printf("MAIN: Re-register CLS2 (must go wrong)\n");
-    if (($cds->cmd_cls_register($cmd_cls2)) != FALSE) {
+    if (($cds->cmd_cls_register($long_cls)) != FALSE) {
         fprintf(STDERR, "MAIN: cmd_cls2 re-registration success\n");
         exit(3);
     }
 
     printf("MAIN: CDS:\n");
-    print_r($cds);
+    if (($debug & 1) == 1)
+        print_r($cds);
     printf("MAIN: Deregister CLS2\n");
-    if (($cds->cmd_cls_deregister($cmd_cls2)) == FALSE) {
+    if (($cds->cmd_cls_deregister($long_cls)) == FALSE) {
         fprintf(STDERR, "MAIN: cmd_cls2 deregistration failed\n");
         exit(4);
     }
-    printf("MAIN: CDS:\n");
-    print_r($cds);
-
+    printf("MAIN:");
+    if (($debug & 1) == 1) {
+        printf(" CDS:\n");
+        print_r($cds);
+    }
+    printf("\n");
     // re-re-register cds_cmd 2
     printf("MAIN: Re-re-register CLS2\n");
-    if (($cds->cmd_cls_register($cmd_cls2)) == FALSE) {
+    if (($cds->cmd_cls_register($long_cls)) == FALSE) {
         fprintf(STDERR, "MAIN: cmd_cls2 re-re-registration failed\n");
         exit(5);
     }
@@ -150,33 +172,65 @@ function main()
 
     // registrer cds_cmd 1
     printf("MAIN: register CLS1\n");
-    if (($cds->cmd_cls_register($cmd_cls1)) == FALSE) {
+    if (($cds->cmd_cls_register($short_cls, 10)) == FALSE) {
         fprintf(STDERR, "MAIN: cmd_cls1 registration failed\n");
         exit(1);
     }
 
     // register cds_cmd 2
     printf("MAIN: register CLS2\n");
-    if (($cds->cmd_cls_register($cmd_cls2)) == FALSE) {
+    if (($cds->cmd_cls_register($long_cls, 4)) == FALSE) {
         fprintf(STDERR, "MAIN: cmd_cls2 registration failed\n");
         exit(2);
     }
-    printf("MAIN: CDS:\n");
-    print_r($cds);
-    printf("MAIN: SUCCESS\n");
+    printf("MAIN:");
+    if (($debug & 1) == 1) {
+        printf(" CDS:\n");
+        print_r($cds);
+    }
+    printf("\n");
+
+    // for ($i = -15 ; $i < 30 ; $i++) {
+    for ($i = 0 ; $i < 20 ; $i++) {
+        printf("MAIN: START ITERATION %d\n", $i);
 
-    for ($i = 0 ; $i < 10 ; $i++) {
-        if ($i == 2) {
+         if ($i == 2) {
             printf("MAIN: load short\n");
             if ($cds->execute("short", WEBURL.'/short.php') == FALSE) {
                 printf("MAIN: push command failed\n");
                 exit(123);
             }
         }
+
+         if ($i == 3) {
+            printf("MAIN: load short\n");
+            if ($cds->execute("short", WEBURL.'/short.php') == FALSE) {
+                printf("MAIN: push command failed\n");
+                exit(123);
+            }
+        }
+
+        if ($i == 4) {
+            printf("MAIN: load long\n");
+            if ($cds->execute("long", WEBURL.'/long.php') == FALSE) {
+                printf("MAIN: push command failed\n");
+                exit(123);
+            }
+        }
+
+        printf("MAIN:");
+        if (($debug & 1) == 1) {
+            printf(" CDS:\n");
+            print_r($cds);
+        }
+        printf("\n");
+
         printf("MAIN: Call process\n");
         $cds->process();
-        usleep(500000);
+        sleep(1);
     }
+    printf("MAIN: finished, dump cds:\n");
+    print_r($cds);
     // start loop
     //   print status
     //   if input data execute some command