From aa32018692a341fb130b5632ab2a1a58041de239 Mon Sep 17 00:00:00 2001 From: "Matteo Nastasi (mop)" Date: Tue, 9 Sep 2014 18:40:12 +0200 Subject: [PATCH] working version with timeout and cleanup of exausted handles --- TODO.txt | 9 +++- web/Obj/curl-de-sac.phh | 104 +++++++++++++++++++++++++++---------- webtest/cds_test01.php | 112 +++++++++++++++++++++++++++++----------- 3 files changed, 168 insertions(+), 57 deletions(-) diff --git a/TODO.txt b/TODO.txt index 259ab81..31fbefd 100644 --- a/TODO.txt +++ b/TODO.txt @@ -1,3 +1,8 @@ -- timeout handling -- remove cmd from list when accomplished +TODO LIST + +- move name and timeout class to register function + to be able to use the same class different times + DONE - debugging system +DONE - timeout handling +DONE - remove cmd from list when accomplished diff --git a/web/Obj/curl-de-sac.phh b/web/Obj/curl-de-sac.phh index 4ef3644..01ba78b 100644 --- a/web/Obj/curl-de-sac.phh +++ b/web/Obj/curl-de-sac.phh @@ -27,11 +27,18 @@ $G_curl_de_sac_version = "0.1"; class CDS_cmd { var $cmd_cls; var $ch; + var $tlimit; function CDS_cmd($cmd_cls, $ch) { $this->cmd_cls = $cmd_cls; $this->ch = $ch; + $this->tlimit = time() + $cmd_cls->tout; + } + + function ch_get() + { + return ($this->ch); } function dbg_get() @@ -60,14 +67,16 @@ class CDS_cmd_cls { $this->cds = $cds; } - static function pre_create($url) + static function pre_create($cds, $url) { + if ($cds->dbg_get() > 2) { printf("CURL: curl_init\n"); } if (($ch = curl_init()) == FALSE) return FALSE; curl_setopt($ch, CURLOPT_URL, $url); curl_setopt($ch, CURLOPT_HEADER, 0); curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1); - + curl_setopt($ch, CURLOPT_FORBID_REUSE, true); + curl_setopt($ch, CURLOPT_HTTPHEADER, array('Connection: close')); return ($ch); } @@ -75,24 +84,33 @@ class CDS_cmd_cls { { if ($cds->dbg > 2) { printf("CDS_cmd_cls::create - begin\n"); - print_r($ch); + printf("CURL: curl_multi_add_handle\n"); } if (($ret = curl_multi_add_handle($cds->mh, $ch)) != 0) { // INFO: $ret is a CURLM_XXX errors code return (FALSE); } - if ($cds->dbg > 2) { - printf("CDS_cmd_cls::create - end\n"); - } + if ($cds->dbg > 2) { printf("CDS_cmd_cls::create - end\n"); } return (TRUE); } - function cb($cmd, $ret) + function process($cmd, $ret) { - print "THIS MUST BE IMPLEMENTED"; + fprintf(STDERR, "process MUST BE IMPLEMENTED"); + exit(123); + } + + function timeout($cmd) + { + fprintf(STDERR, "timeout MUST BE IMPLEMENTED"); exit(123); } + + function dbg_get() + { + return $this->cds->dbg; + } } class Curl_de_sac { @@ -102,6 +120,7 @@ class Curl_de_sac { var $dbg; function Curl_de_sac($dbg=0) { + if ($dbg > 2) { printf("CURL: curl_multi_init\n"); } $this->mh = curl_multi_init(); $this->cmd_cls = array(); $this->cmd = array(); @@ -154,6 +173,26 @@ class Curl_de_sac { $this->cmd_cls = array(); } + + function cleanup($key) + { + $cmd = $this->cmd[$key]; + + if ($this->dbg > 2) { + printf("cleanup\n"); + printf("CURL: curl_multi_remove_handle:\n"); + print_r($cmd->ch_get()); + printf("\n"); + } + // return 0 on SUCCESS or CURLM_XXX in other cases + if (($ret = curl_multi_remove_handle($this->mh, $cmd->ch_get())) != 0) { + fprintf(STDERR, "CURL: curl_multi_remove_handle FAILED (%d)\n", $ret); + } + if ($this->dbg > 2) { printf("CURL: curl_close\n"); } + curl_close($cmd->ch_get()); + unset($this->cmd[$key]); + } + function execute() { $args = func_get_args(); @@ -176,46 +215,59 @@ class Curl_de_sac { break; array_push($this->cmd, $inst); - if ($this->dbg > 1) { - printf("CDS_cmd_cls::process - execute push cmd\n"); - print_r($this->cmd); - } + if ($this->dbg > 1) { printf("CDS_cmd_cls::process - execute push cmd\n"); } + if (($this->dbg & 1) == 1) { print_r($this); } + return TRUE; } while (FALSE); return FALSE; } - function process() + function process($curtime=0) { - if ($this->dbg > 1) { - printf("CDS_cmd_cls::process - begin\n"); + if ($curtime == 0) { + $curtime = time(); } + if ($this->dbg > 1) { printf("CDS_cmd_cls::process - begin\n"); } $running = NULL; + + if ($this->dbg > 2) { printf("CURL: curl_multi_exec\n"); } $ret = curl_multi_exec($this->mh, $running); $msgs_in_queue = NULL; do { + if ($this->dbg > 2) { printf("CURL: curl_multi_info_read\n"); } + if ($ret = curl_multi_info_read ($this->mh, $msgs_in_queue)) { - if ($this->dbg > 1) - printf("Info_read miq: %d\n", $msgs_in_queue); - - foreach($this->cmd as $cmd) { - if ($cmd->ch == $ret['handle']) { - $cmd->cmd_cls->cb($cmd, $ret); - break; - } - } + if ($this->dbg > 1) { printf("Info_read miq: %d\n", $msgs_in_queue); } + if ($this->dbg > 2) { printf("CURL: curl_getinfo\n"); } + $info = curl_getinfo($ret['handle']); if ($this->dbg > 1) { printf("Getinfo:\n"); print_r($info); } + + foreach($this->cmd as $key => $cmd) { + if ($cmd->ch == $ret['handle']) { + if ($cmd->cmd_cls->process($cmd, $ret) == TRUE) { + $this->cleanup($key); + } + break; + } + } } } while ($msgs_in_queue > 0); - if ($this->dbg > 1) { - printf("CDS_cmd_cls::process - end (queue: %d)\n", $msgs_in_queue); + foreach ($this->cmd as $key => $cmd) { + if ($this->dbg > 2) { printf("Check tout, curr: %d tlimit %d\n", $curtime, $cmd->tlimit); } + if ($curtime > $cmd->tlimit) { + if ($this->dbg > 2) { printf("TIMEOUT REACHED!\n"); } + $cmd->cmd_cls->timeout($cmd); + $this->cleanup($key); + } } + if ($this->dbg > 1) { printf("CDS_cmd_cls::process - end (queue: %d)\n", $msgs_in_queue); } } } \ No newline at end of file diff --git a/webtest/cds_test01.php b/webtest/cds_test01.php index 6156ef1..fd89bae 100755 --- a/webtest/cds_test01.php +++ b/webtest/cds_test01.php @@ -28,13 +28,13 @@ class short_cmd_cls extends CDS_cmd_cls { } do { - if (($ch = parent::pre_create($url)) == FALSE) + if (($ch = parent::pre_create($cds, $url)) == FALSE) break; if (parent::create($cds, $ch) == FALSE) break; - $cmd = new short_cmd($ch, $this, "none currently"); + $cmd = new short_cmd($this, $ch, "none currently"); return $cmd; } while (FALSE); @@ -42,11 +42,19 @@ class short_cmd_cls extends CDS_cmd_cls { return FALSE; } - function cb() + function process($cmd, $ret) { + printf("CURL: curl_multi_getcontent\n"); + $content = curl_multi_getcontent($cmd->ch_get()); if ($this->dbg_get() > 0) { - printf("short_cb:\n"); + printf("short process: [%s]\n", $content); } + return TRUE; + } + + function timeout($cmd) + { + printf("Short timeout function reached\n"); } } @@ -63,7 +71,7 @@ class long_cmd extends CDS_cmd { class long_cmd_cls extends CDS_cmd_cls { function long_cmd_cls() { - parent::__construct("long", 10); + parent::__construct("long", 5); } function create($cds, $url) @@ -73,13 +81,13 @@ class long_cmd_cls extends CDS_cmd_cls { } do { - if (($ch = parent::pre_create($url)) == FALSE) + if (($ch = parent::pre_create($cds, $url)) == FALSE) break; if (parent::create($cds, $ch) == FALSE) break; - $cmd = new long_cmd($ch, $this, "none currently"); + $cmd = new long_cmd($this, $ch, "none currently"); return $cmd; } while (FALSE); @@ -87,60 +95,74 @@ class long_cmd_cls extends CDS_cmd_cls { return FALSE; } - function cb() + function process($cmd, $ret) { + printf("CURL: curl_multi_getcontent\n"); + $content = curl_multi_getcontent($cmd->ch_get()); if ($this->dbg_get() > 0) { - printf("long_cb:\n"); + printf("long process: [%s]\n", $content); } + + return TRUE; + } + + function timeout($cmd) + { + printf("Long timeout function reached\n"); } } function main() { + $debug = 998; // create cds - $cds = new Curl_de_sac(999); + $cds = new Curl_de_sac($debug); // create cds_cmd 1 - $cmd_cls1 = new short_cmd_cls(); + $short_cls = new short_cmd_cls(); // registrer cds_cmd 1 printf("MAIN: Register CLS1\n"); - if (($cds->cmd_cls_register($cmd_cls1)) == FALSE) { + if (($cds->cmd_cls_register($short_cls)) == FALSE) { fprintf(STDERR, "MAIN: cmd_cls1 registration failed\n"); exit(1); } // create cds_cmd 2 - $cmd_cls2 = new long_cmd_cls(); + $long_cls = new long_cmd_cls(); // register cds_cmd 2 printf("MAIN: Register CLS2\n"); - if (($cds->cmd_cls_register($cmd_cls2)) == FALSE) { + if (($cds->cmd_cls_register($long_cls)) == FALSE) { fprintf(STDERR, "MAIN: cmd_cls2 registration failed\n"); exit(2); } // register cds_cmd 2 (retry) printf("MAIN: Re-register CLS2 (must go wrong)\n"); - if (($cds->cmd_cls_register($cmd_cls2)) != FALSE) { + if (($cds->cmd_cls_register($long_cls)) != FALSE) { fprintf(STDERR, "MAIN: cmd_cls2 re-registration success\n"); exit(3); } printf("MAIN: CDS:\n"); - print_r($cds); + if (($debug & 1) == 1) + print_r($cds); printf("MAIN: Deregister CLS2\n"); - if (($cds->cmd_cls_deregister($cmd_cls2)) == FALSE) { + if (($cds->cmd_cls_deregister($long_cls)) == FALSE) { fprintf(STDERR, "MAIN: cmd_cls2 deregistration failed\n"); exit(4); } - printf("MAIN: CDS:\n"); - print_r($cds); - + printf("MAIN:"); + if (($debug & 1) == 1) { + printf(" CDS:\n"); + print_r($cds); + } + printf("\n"); // re-re-register cds_cmd 2 printf("MAIN: Re-re-register CLS2\n"); - if (($cds->cmd_cls_register($cmd_cls2)) == FALSE) { + if (($cds->cmd_cls_register($long_cls)) == FALSE) { fprintf(STDERR, "MAIN: cmd_cls2 re-re-registration failed\n"); exit(5); } @@ -150,33 +172,65 @@ function main() // registrer cds_cmd 1 printf("MAIN: register CLS1\n"); - if (($cds->cmd_cls_register($cmd_cls1)) == FALSE) { + if (($cds->cmd_cls_register($short_cls, 10)) == FALSE) { fprintf(STDERR, "MAIN: cmd_cls1 registration failed\n"); exit(1); } // register cds_cmd 2 printf("MAIN: register CLS2\n"); - if (($cds->cmd_cls_register($cmd_cls2)) == FALSE) { + if (($cds->cmd_cls_register($long_cls, 4)) == FALSE) { fprintf(STDERR, "MAIN: cmd_cls2 registration failed\n"); exit(2); } - printf("MAIN: CDS:\n"); - print_r($cds); - printf("MAIN: SUCCESS\n"); + printf("MAIN:"); + if (($debug & 1) == 1) { + printf(" CDS:\n"); + print_r($cds); + } + printf("\n"); + + // for ($i = -15 ; $i < 30 ; $i++) { + for ($i = 0 ; $i < 20 ; $i++) { + printf("MAIN: START ITERATION %d\n", $i); - for ($i = 0 ; $i < 10 ; $i++) { - if ($i == 2) { + if ($i == 2) { printf("MAIN: load short\n"); if ($cds->execute("short", WEBURL.'/short.php') == FALSE) { printf("MAIN: push command failed\n"); exit(123); } } + + if ($i == 3) { + printf("MAIN: load short\n"); + if ($cds->execute("short", WEBURL.'/short.php') == FALSE) { + printf("MAIN: push command failed\n"); + exit(123); + } + } + + if ($i == 4) { + printf("MAIN: load long\n"); + if ($cds->execute("long", WEBURL.'/long.php') == FALSE) { + printf("MAIN: push command failed\n"); + exit(123); + } + } + + printf("MAIN:"); + if (($debug & 1) == 1) { + printf(" CDS:\n"); + print_r($cds); + } + printf("\n"); + printf("MAIN: Call process\n"); $cds->process(); - usleep(500000); + sleep(1); } + printf("MAIN: finished, dump cds:\n"); + print_r($cds); // start loop // print status // if input data execute some command -- 2.17.1