Downloader类
此类扮演的是下载器的角色,以下列举的是其常用的回调API以及普通API。
注意:由于主机禁止出现敏感IP,请自行将 localhost 替换为 目标IP。
onDownloaderStart
参数
@param object $downloader 代表 downloader 实例
含义
1. 设置Downloader子进程启动时的回调函数,每个子进程启动时都会执行。
2. 回调属性 onDownloaderStart 是在子进程启动时运行的,总共会运行 $downloader->count 次。
举例
<?php
use PHPCreeper\Downloader;
$downloader = new Downloader;
$downloader->onDownloaderStart = function($downloader){
echo "{$downloader->id}号下载器进程启动成功";
};
onDownloaderStop
参数
@param object $downloader 代表 downloader 实例
含义
1. 设置Downloader子进程停止时的回调函数,每个子进程停止时都会执行。
2. 回调属性 onDownloaderStop 是在子进程停止时运行的,总共会运行 $downloader->count 次。
举例
<?php
use PHPCreeper\Downloader;
$downloader = new Downloader;
$downloader->onDownloaderStop = function($downloader){
echo "{$downloader->id}号下载器进程停止成功";
};
onDownloaderReload
参数
@param object $downloader 代表 downloader 实例
含义
此回调属性不常用到,用于设置下载器收到reload信号后执行的回调。
说明:
利用onDownloaderReload回调做很多事情,例如在不需要重启进程的情况下重新加载业务配置文件。
子进程收到reload信号默认的动作是退出重启,以便新进程重新加载业务代码完成代码更新。
所以reload后子进程在执行完onDownloaderReload回调后便立刻退出是正常现象。
如果在收到reload信号后只想让子进程执行onDownloaderReload而不想退出,可以在初始化
Downloader实例时设置对应的Downloader实例的reloadable属性为false。
举例
<?php
use PHPCreeper\Downloader;
$downloader = new Downloader;
//设置reloadable为false,即子进程收到reload信号不执行重启。
$downloader->reloadable = false;
$downloader->onDownloaderReload = function($downloader){
//some code here
};
onDownloaderMessage
参数
@param object $downloader 代表 downloader 实例
@param array $parser_reply 代表 parser 端响应回来的数据
含义
当收到 parser 端的响应数据时触发的回调。
举例
<?php
use PHPCreeper\Downloader;
$downloader = new Downloader;
$downloader->onDownloaderMessage = function($downloader, $parser_reply){
print_r($parser_reply);
};
onDownloaderConnectToParser
参数
@param object $connetion 代表 client connection 实例
含义
当下载器进程成功建立到解析器的异步连接时触发此回调
举例
<?php
use PHPCreeper\Downloader;
$downloader = new Downloader;
$downloader->onDownloaderConnectToParser = function($connection){
//$connection->bufferFull = true;
};
onDownloadBefore
别名
onBeforeDownload
参数
@param object $downloader 代表 downloader 实例
@param array $task 代表任务数组
含义
在任务下载之前触发此回调。
举例
<?php
use PHPCreeper\Downloader;
$downloader = new Downloader;
$downloader->onDownloadBefore = function($downloader, $task){
$downloader->httpClient->setConnectTimeout(5);
$downloader->httpClient->setTransferTimeout(8);
};
onDownloadStart
别名
onStartDownload
参数
@param object $downloader 代表 downloader 实例
@param array $task 代表任务数组
含义
在任务开始下载时触发此回调。
举例
<?php
use PHPCreeper\Downloader;
$downloader = new Downloader;
$downloader->onDownloadStart = function($downloader, $task){
};
onDownloadAfter
别名
onAfterDownload
参数
@param object $downloader 代表 downloader 实例
@param array $task 代表任务数组
含义
在任务下载之后触发此回调。
举例
<?php
use PHPCreeper\Downloader;
$downloader = new Downloader;
$downloader->onDownloadAfter = function($downloader, $task){
$downloader->setTaskCrawlInterval(2);
};
onDownloadFail
别名
onFailDownload
参数
@param object $downloader 代表 downloader 实例
@param array $error 代表错误信息
@param array $task 代表任务数组
含义
在任务下载失败之后触发此回调。
举例
<?php
use PHPCreeper\Downloader;
$downloader = new Downloader;
$downloader->onDownloadFail = function($downloader, $error, $task){
print_r($error);
};
onDownloadTaskEmpty
别名
onTaskEmpty
参数
@param object $downloader 代表 downloader 实例
含义
在任务消费为空之后触发此回调。
举例
<?php
use PHPCreeper\Downloader;
$downloader = new Downloader;
$downloader->onDownloadTaskEmpty = function($downloader){
};
setName ( string $name ) : object
参数
@param string $name 代表downloader实例的名称,默认none。
含义
设置downloader实例的名称,只能是数字或字母或组合,最多15个字符。
举例
<?php
use PHPCreeper\Downloader;
$downloader = new Downloader;
$downloader->setName('downloader1');
setCount ( int $count, boolean $prepfer_by_cpu_cores = false ) : object
参数
@param int $count 代表downloader实例的进程数量,默认1。
@param boolean $prepfer_by_cpu_cores 根据CPU核数自动计算最优进程数,拥有最高优先级。
含义
设置downloader实例的进程数量。
举例
<?php
use PHPCreeper\Downloader;
$downloader = new Downloader;
$downloader->setCount(2); //设置启动2个进程。
$downloader->setCount(1, true); //假如是4核处理器,那么将启动8个进程。
setRouter ( Closure $router ) : object
参数
@param closure $router 路由回调函数
回调函数的参数
@param object $downloader 下载器实例
@param array $connections 下载器与解析器集群之间的长连接集合
$connections = [
'localhost:8888' => [$connection1, $connection2, ...., $connectionN];
'localhost:9999' => [$connection1, $connection2, ...., $connectionN];
......
];
含义
设置下载器到解析器集群之间的路由回调,
既可以路由至具体的解析器服务器,也可以路由至目标服务器上的某些连接。
说明
此方法的缺点是:无法根据业务进行路由,如果需要根据业务路由,可查看方法 routerToParser()。
举例
<?php
use PHPCreeper\Downloader;
$downloader = new Downloader;
//比如:设置路由到特定的 parser 服务器:localhost:8888
$downloader->setRouter(function($downloader, $connections){
$_connections = [];
$target_server = 'localhost:8888';
foreach($connections as $server => $connection)
{
$server == $target_server && $_connections[$target_server] = $connection;
}
return $_connections;
});
routerToParser( string $parser = '', string $algorithm = 'random' ) : string
参数
@param string $parser 目标parser服务器地址 [ip:port]
@param string $algorithm 路由策略,默认策略是random [random | modula]
含义
路由至指定的一台目标parser服务器,或者根据路由策略路由至某一台目标parser服务器
说明
1. 既可直接通过任务配置设定parser项,也可通过回调API调用本方法进行路由。
2. 本方法的优先级高于配置设定的优先级。
3. 设定的parser项如果留空或者类型不符或者是无效的socket地址串,
则引擎默认会根据随机策略随机路由至一台目标parser服务器。
4. 该方法的优点是:可以根据业务种类进行路由,弥补了setRouter()方法的缺陷。
举例
<?php
use PHPCreeper\Downloader;
$downloader = new Downloader;
$downloader->onDownloadBefore = function($downloader, $task){
if('list' == $task['type']){
$task['channel'] = $downloader->routerToParser("localhost:8888");
}elseif('detail' == $task['type']){
$task['channel'] = $downloader->routerToParser("localhost:9999");
}
return $task;
};
setTaskCrawlInterval ( float $interval ) : object
参数
@param float $interval 爬取任务的间隔时间,单位:秒,最小支持0.001秒,默认1秒。
含义
设置消费任务的间隔时间,即:爬取任务的间隔时间。
举例
<?php
use PHPCreeper\Downloader;
$downloader = new Downloader;
//设置爬取任务间隔时间为2秒
$downloader->setTaskCrawlInterval(2);
setSendBufferSize ( int $size ) : object
参数
@param int $size 发送缓冲区大小
含义
设置Downloader进程发送消息到Parser进程时的发送缓冲区大小,单位:字节;
本质上设置的是Downloader进程内所有异步任务连接的发送缓冲区大小,默认10M。
举例
<?php
use PHPCreeper\Downloader;
$downloader = new Downloader;
//设置发送缓冲区大小为1M
$downloader->setSendBufferSize(102400);
setClientSocketAddress ( $address = '', $key = 'parser' ) : object
参数
@param string|array $address 目标socket地址
@param string $key 目标服务器标识
含义
当连接目标 socket 服务器时,根据key设置客户端对应 socket 地址。
举例
<?php
use PHPCreeper\Downloader;
$downloader = new Downloader;
$downloader->setClientSocketAddress([
'ws://localhost:8888',
'ws://localhost:9999',
]);
appendClientSocketAddress ( $address = '', $key = 'parser' ) : object
参数
@param string|array $address 目标socket地址
@param string $key 目标服务器标识
含义
当连接目标 socket 服务器时,根据key追加客户端对应 socket 地址
举例
<?php
use PHPCreeper\Downloader;
$downloader = new Downloader;
$downloader->appendClientSocketAddress([
'ws://localhost:8888',
'ws://localhost:9999',
]);