记住用户名密码
最近项目上线一个模块需要获取火币的K线数据,初期我用的Workerman定时任务每秒通过URL请求获取,做出来之后老板感觉数据实时性不强,要优化,没办法我只能继续研究,幸好在GitHub看到一个老哥写的通过WebSocket获取火币数据的,话不多说,下面直接开始上代码。第一次写博客,写的如果不好,还请大家见谅。
URL请求方法
发现这种方法实现起来也有坑,网上大部分都只是贴出接口文档和代码,但是实际操作会发现无法请求火币服务器,为啥,因为人家在国外,偶尔能请求概率也很低,所以代码只能放到外网服务器才能执行,这样一来开发调试就很麻烦。后面我就想了一个办法,找一台外网服务器,布置一个脚本代理请求(非火币的外网请求也可以),这样在国内也可以请求火币接口了,调试什么的方便多了。
下面贴出代理脚本代码:
$url = urldecode($_GET['url']);
if ($url) {
echo curl_get($url);
die();
}else{
echo "How are you";
}
function curl_get($url, $timeout = 5)
{
$ssl = substr($url, 0, 8) == "https://" ? TRUE : FALSE;
$ch = curl_init();
$headers = array(
"Content-Type: application/json charset=utf-8",
'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.122 Safari/537.36',
);
$opt = array(
CURLOPT_URL => $url,
CURLOPT_HEADER => 0,
CURLOPT_CUSTOMREQUEST => strtoupper('GET'),
CURLOPT_RETURNTRANSFER => 1,
CURLOPT_TIMEOUT => $timeout,
CURLOPT_HTTPHEADER => $headers,
);
if ($ssl) {
$opt[CURLOPT_SSL_VERIFYHOST] = false;
$opt[CURLOPT_SSL_VERIFYPEER] = FALSE;
}
curl_setopt_array($ch, $opt);
$result = curl_exec($ch);
curl_close($ch);
return $result;
}
获取数据的代码:
$now = time();
$diff = intval((strtotime(date('Y-m-d H:i:00'), $now) - $find['add_time']) / self::$time_list[$period]);
$size = $diff + 1 > 2000 ? 2000 : $diff + 1;
$url = "https://api.huobipro.com/market/history/kline?period={$period}&size={$size}&symbol={$symbol}";
$log .= ",url:{$url}";
//如果服务器在国内,需要把public目录下的post.php文件部署到外网服务器代理请求火币api
$post_url = 'http://xxx.com/post.php?url='.urlencode($url);
$log .= ",post_url:{$post_url}";
$res = self::curl_get($post_url, 5);
if (!$res) throw new Exception(lang('火币请求失败'));
$res = json_decode($res, true);
if ($res['status'] != 'ok') throw new Exception("火币网返回错误,err-code:{$res['err-code']},err-msg:{$res['err-msg']}");
if (empty($res['data'])) throw new Exception("火币网返回数据为空");
$huobi = $res['data'];
$ids = array_column($huobi,'id');
array_multisort($ids,SORT_ASC,$huobi);
$add_list = [];
$update_list = [];
foreach ($huobi as $key1 => $value1) {
$where1 = $where;
$where1['add_time'] = $value1['id'];
$find1 = (new self)->where($where1)->order('id', 'desc')->find();
if ($find1) {//记录已存在,更新已有记录
$update_list[] = [
'id'=>$find1['id'],
'open_price'=>number_format($value1['open'],6,".",""),
'close_price'=>number_format($value1['close'],6,".",""),
'high_price'=>number_format($value1['high'],6,".",""),
'low_price'=>number_format($value1['low'],6,".",""),
'amount'=>number_format($value1['amount'],6,".",""),
'count'=>number_format($value1['count'],6,".",""),
'vol'=>number_format($value1['vol'],6,".",""),
'ch'=>$res['ch'],
//'add_time'=>$value1['id'],
'update_time'=>time(),
];
}
else {
$add_list[] = [
'period'=>$period,
'symbol'=>$symbol,
'open_price'=>number_format($value1['open'],6,".",""),
'close_price'=>number_format($value1['close'],6,".",""),
'high_price'=>number_format($value1['high'],6,".",""),
'low_price'=>number_format($value1['low'],6,".",""),
'amount'=>number_format($value1['amount'],6,".",""),
'count'=>number_format($value1['count'],6,".",""),
'vol'=>number_format($value1['vol'],6,".",""),
'ch'=>$res['ch'],
'add_time'=>$value1['id'],
'update_time'=>time(),
];
}
}
static function curl_get($url, $timeout = 30)
{
$ssl = substr($url, 0, 8) == "https://" ? TRUE : FALSE;
$ch = curl_init();
$headers = array(
"Content-Type: application/json charset=utf-8",
'User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/80.0.3987.122 Safari/537.36',
);
$opt = array(
CURLOPT_URL => $url,
CURLOPT_HEADER => 0,
CURLOPT_CUSTOMREQUEST => strtoupper('GET'),
CURLOPT_RETURNTRANSFER => 1,
CURLOPT_TIMEOUT => $timeout,
CURLOPT_HTTPHEADER => $headers,
);
if ($ssl) {
$opt[CURLOPT_SSL_VERIFYHOST] = false;
$opt[CURLOPT_SSL_VERIFYPEER] = FALSE;
$opt[CURLOPT_SSLVERSION] = 3;
}
curl_setopt_array($ch, $opt);
$result = curl_exec($ch);
if (!$result) {
$error = curl_error($ch);
$errno = curl_errno($ch);
Log::write("curl_get,url:{$url},error:{$error},error:{$errno}", 'INFO');
}
curl_close($ch);
return $result;
}
WebSocket方法获取
这种方法主要是把服务器当成一个WebSocket Client连接到火币的WebSocket服务器,订阅交易对的不同时间粒度的K线数据,成功之后火币服务器就会在K线数据变化的时候主动推送消息给到服务器,接收到推送之后就可以进行存储等操作,同时服务器也把接收到的推送消息再推送给连接到服务端的WebSocket Client。
因为火币服务器在外网,所以采用WebSocket方法也需要部署一台国外服务器,这台服务器用于向火币订阅数据并接收火币推送,其他国内服务器就可以通过连接这台服务器获取火币K线数据。这种应该也可以分布式部署,大家可以试一下。结构图如下:

连接到火币服务器代码:
$info = "连接到服务器:{$this->host}";
echo "\r\n".$info;
$this->saveLog("huobi", $info);
// 异步建立一个到火币服务器的连接
$con = new AsyncTcpConnection($this->host);
if ($this->flag) {//正式环境
$con->transport = 'ssl';
}
$con->onConnect = function($con)
{
$this->onAsyncConnect($con);
};
// 当服务器连接发来数据时,转发给对应客户端的连接
$con->onMessage = function($con, $message) use($worker)
{
$this->onAsyncMessage($con, $message, $worker);
};
$con->onError = function($con, $err_code, $err_msg)
{
echo "$err_code, $err_msg";
$info = "Async onError err_code:{$err_code},err_msg:{$err_msg}";
echo "\r\n ".$info;
$this->saveLog("huobi", $info);
};
$con->onClose = function($con)
{
$info = "Async onClose";
echo "\r\n ".$info;
$this->saveLog("huobi", $info);
$this->reconnect_num++;//重连次数+1
//重连之前先更新K线数据
$info = "更新K线数据-重连之前-start:".date('Y-m-d H:i:s');
echo "\r\n ".$info;
$this->saveLog("huobi", $info);
foreach ($this->trade_list as $value) {
$symbol = $value;
foreach ($this->time_list as $k => $v) {
$info = "create_kline:{$symbol}-{$k}";
echo "\r\n ".$info;
$this->saveLog("huobi", $info);
$r = \app\common\model\TradeKlineKline::create_kline($symbol, $k);
if ($r['code'] == SUCCESS) {
}
}
};
$info = "更新K线数据-重连之前-end:".date('Y-m-d H:i:s');
echo "\r\n ".$info;
$this->saveLog("huobi", $info);
// 如果连接断开,则在1秒后重连
$con->reConnect(1);
};
// 执行异步连接
$con->connect();
//连接火币成功回调方法
function onAsyncConnect($con)
{
$this->async_message_time = time();
$info = "连接到服务器:{$this->host},成功";
echo "\r\n".$info;
$this->saveLog("huobi", $info);
$info = "开始订阅K线数据";
echo "\r\n".$info;
$this->saveLog("huobi", $info);
//$this->saveLog("huobi", 'onAsyncConnect:'.print_r($con, true));
$this->saveLog("huobi", 'onAsyncConnect,cid:'.$con->id.',reconnect_num:'.$this->reconnect_num);
$make = explode(',', TradeConfig::get_value('trade_kline_symbols', 'btcusdt,ethusdt,eosusdt,ltcusdt,etcusdt'));
$this->huobi_id = $con->id;
foreach ($make as $key => $value) {
$symbol = $value;
foreach ($this->time_list as $k => $v) {
$info = "sub:{$symbol}-{$k}";
echo "\r\n".$info;
$this->saveLog("huobi", $info);
$data = json_encode([ //行情
'sub' => "market." . $symbol . ".kline." . $k,
'id' => "id" . time(),
'freq-ms' => 5000
]);
$con->send($data);
}
}
/*foreach ($this->trade_list as $key => $value) {
$symbol = $key;
foreach ($this->time_list as $k => $v) {
echo "sub:{$symbol}-{$k}\r\n";
$data = json_encode([ //行情
'sub' => "market." . $symbol . ".kline." . $k,
'id' => "id" . time(),
'freq-ms' => 5000
]);
$con->send($data);
}
};*/
}
//接收到火币推送回调方法
function onAsyncMessage($con, $message, $worker)
{
$data = json_decode($message, true);
if (!$data) {//说明采用了GZIP压缩
$data = gzdecode($message);
$this->saveLog("huobi", $data);
$data = json_decode($data, true);
}
else {
$this->saveLog("huobi", $message);
}
if(isset($data['ping'])) {
$this->async_message_time = time();
$con->send(json_encode([
"pong" => $data['ping']
]));
// 给客户端心跳
foreach($this->all_cons as $kk=>$vv){
if (array_key_exists($vv["sid"], $worker->connections)) {
$info = "\r\n sid ".$vv["sid"]." send ping";
echo $info;
$this->saveLog("all", $info);
$worker->connections[$vv["sid"]]->send(json_encode($data));
}
else {
unset($this->all_cons[$kk]);
}
}
} else if (isset($data['ch'])) {
$this->async_message_time = time();
$info = "接收到推送,ch:{$data['ch']}";
echo "\r\n".$info;
$this->saveLog("huobi", $info);
//Log::write(print_r($data, true), 'INFO');
$symbol = $data["ch"];
$info = "\r\n on mess size:".sizeof($this->all_cons)." conn-size: ".sizeof($worker->connections)." symbol:".$symbol;
echo $info;
$this->saveLog("all", $info);
$pieces = explode(".", $data['ch']);
switch ($pieces[2]) {
case "kline": //行情图
$market = $pieces[1]; //火币对
if (in_array($market, $this->symbol_list)) {
$period = $pieces[3];
$tick = $data['tick'];
//tick 说明
//"tick": {
// "id": K线id,
// "amount": 成交量,
// "count": 成交笔数,
// "open": 开盘价,
// "close": 收盘价,当K线为最晚的一根时,是最新成交价
// "low": 最低价,
// "high": 最高价,
// "vol": 成交额, 即 sum(每一笔成交价 * 该笔的成交量)
//}
$id = $tick['id'];
$where = [
'period'=>$period,
'symbol'=>$market,
'add_time'=>$id,
];
$find1 = \app\common\model\TradeKline::where($where)->order('id', 'desc')->find();
if ($find1) {//记录已存在,更新已有记录
if ($find1['open_price'] != $tick['open'] ||
$find1['close_price'] != $tick['close'] ||
$find1['high_price'] != $tick['high'] ||
$find1['low_price'] != $tick['low'] ||
$find1['amount'] != $tick['amount'] ||
$find1['count'] != $tick['count'] ||
$find1['vol'] != $tick['vol']) {//没有数据变化不做更新
$update_list[] = [
'id'=>$find1['id'],
'open_price'=>number_format($tick['open'],6,".",""),
'close_price'=>number_format($tick['close'],6,".",""),
'high_price'=>number_format($tick['high'],6,".",""),
'low_price'=>number_format($tick['low'],6,".",""),
'amount'=>number_format($tick['amount'],6,".",""),
'count'=>number_format($tick['count'],6,".",""),
'vol'=>number_format($tick['vol'],6,".",""),
'update_time'=>time(),
];
$kline = new \app\common\model\TradeKline;
$res2 = $kline->isUpdate()->saveAll($update_list);
if (empty($res2)) {
var_dump(lang('更新记录失败-2').'-in line:'.__LINE__);
//throw new Exception(lang('更新记录失败-2').'-in line:'.__LINE__);
}
}
}
else {
$add_list[] = [
'period'=>$period,
'symbol'=>$market,
'open_price'=>number_format($tick['open'],6,".",""),
'close_price'=>number_format($tick['close'],6,".",""),
'high_price'=>number_format($tick['high'],6,".",""),
'low_price'=>number_format($tick['low'],6,".",""),
'amount'=>number_format($tick['amount'],6,".",""),
'count'=>number_format($tick['count'],6,".",""),
'vol'=>number_format($tick['vol'],6,".",""),
'ch'=>$data['ch'],
'add_time'=>$id,
'update_time'=>time(),
];
$kline = new \app\common\model\TradeKline;
$res1 = $kline->saveAll($add_list);
if (empty($res1)) {
var_dump(lang('插入记录失败').'-in line:'.__LINE__);
//throw new Exception(lang('插入记录失败').'-in line:'.__LINE__);
}
}
}
break;
}
$time_1 = microtime(true);
if (array_key_exists($symbol, $this->all_symbols)) {
foreach ($this->all_symbols[$symbol] as $key => $val) {
$info = " symbol ".$symbol." | ch ".$data["ch"]." sid ".$val." send \r\n";
echo $info;
$this->saveLog("all", $info);
$worker->connections[$val]->send(json_encode($data));
}
}
$time_2 = microtime(true);
$cost = $time_2 - $time_1;
if ($cost > 1) {
$info = " symbol ".$symbol." | ch ".$data["ch"]." cost {$cost} \r\n";
echo $info;
$this->saveLog("all", $info);
}
}
else {
echo "undefind message\r\n";
var_dump($data);
}
}
目前有 0 条留言 其中:访客:0 条, 博主:0 条