Python量化-如何获取实时股票信息

如何获取实时股票信息

股票信息的接口有很多,之前大家常用的是新浪的,但在年初的时候,新浪的接口突然不能使用,给大家造成了很大的困扰,为此网上也有很多教程教大家如何从新浪获取数据,跟着教程弄了半天也不行,索性换到126(也就是网易了),感觉速度都还不错。

首先我们看下接口地址:http://api.money.126.net/data/feed/1000001,money.api

其中的1000001就是股票代码了,跟新浪的不同,他的第一位代表交易所,后面6位是股票代码

  • 0:上交所
  • 1:深交所
  • 2:北交所

先通过浏览器看下数据结构:

_ntes_quote_callback({     "1000001": {         "code": "1000001",         "percent": 0.002113,         "high": 14.25,         "askvol3": 1026758,         "askvol2": 810700,         "askvol5": 290493,         "askvol4": 461100,         "price": 14.23,         "open": 14.2,         "bid5": 14.18,         "bid4": 14.19,         "bid3": 14.2,         "bid2": 14.21,         "bid1": 14.22,         "low": 14.11,         "updown": 0.03,         "type": "SZ",         "bidvol1": 323600,         "status": 0,         "bidvol3": 244200,         "bidvol2": 673474,         "symbol": "000001",         "update": "2022/06/25 17:59:57",         "bidvol5": 343500,         "bidvol4": 145200,         "volume": 86604061,         "askvol1": 817268,         "ask5": 14.27,         "ask4": 14.26,         "ask1": 14.23,         "name": "平安银行",         "ask3": 14.25,         "ask2": 14.24,         "arrow": "↑",         "time": "2022/06/24 16:00:58",         "yestclose": 14.2,         "turnover": 1227798687.09     } }); 

可以看出_ntes_quote_callback()中的就是标准的json数据,我们只要通过正则表达式就可以取出。
我们先定义一个数据结构:

class NetTick:     def __init__(self, dict={}):         self.name = dict.get('name')                # 股票名称         self.yestclose = dict.get('yestclose')      # 昨日收盘价         self.bidvol5 = dict.get('bidvol5')          # 买5数量         self.bidvol4 = dict.get('bidvol4')          # 买4数量         self.bidvol3 = dict.get('bidvol3')          # 买3数量         self.bidvol2 = dict.get('bidvol2')          # 买2数量         self.bidvol1 = dict.get('bidvol1')          # 买1数量         self.bid5 = dict.get('bid5')                # 买5价格         self.bid4 = dict.get('bid4')                # 买4价格         self.bid3 = dict.get('bid3')                # 买3价格         self.bid2 = dict.get('bid2')                # 买2价格         self.bid1 = dict.get('bid1')                # 买1价格         self.askvol5 = dict.get('askvol5')          # 卖5数量         self.askvol4 = dict.get('askvol4')          # 卖4数量         self.askvol3 = dict.get('askvol3')          # 卖3数量         self.askvol2 = dict.get('askvol2')          # 卖2数量         self.askvol1 = dict.get('askvol1')          # 卖1数量         self.ask5 = dict.get('ask5')                # 卖5价格         self.ask4 = dict.get('ask4')                # 卖4价格         self.ask3 = dict.get('ask3')                # 卖3价格         self.ask2 = dict.get('ask2')                # 卖2价格         self.ask1 = dict.get('ask1')                # 卖1价格         self.symbol = dict.get('symbol')            # 股票代码 第一位1:深交所 0:上交所 2北交所         self.volume = dict.get('volume')            # 成交量         self.price = dict.get('price')              # 当前价格         self.open = dict.get('open')                # 开盘价         self.low = dict.get('low')                  # 最低价         self.high = dict.get('high')                # 最高价         self.code = dict.get('code')                # 去除标记为的股票代码         self.turnover = dict.get('turnover')        # 成交额         self.percent = dict.get('percent')          # 涨跌幅         self.updown = dict.get('updown')            # 涨跌金额 

通过研究,我们发现126的接口支持多个股票查询,那我们可以定义两个方法,一个查单个,一个查多个,具体实现如下:

import requests import re from models.nettick import NetTick from utils.packages import *    class NetEaseData:     @staticmethod     def get_realtime_data(symbol):         """         网易的实时数据接口         :param symbol: 股票代码         :return: Tick         """         code = NetEaseData.convert_market(symbol)         try:             response = requests.get("http://api.money.126.net/data/feed/{},money.api".format(code)).text             re_find = NetEaseData.__re_find(response)             if re_find is not None:                 find_stock = re_find.get(code)                 if find_stock is not None:                     return NetTick(find_stock)          except Exception as e:             logger.error('请求网易接口出错,错误信息:{}'.format(e))          return None      @staticmethod     def convert_market(other_market_code=str):         """         转换通用股票代码 sz sh bj开头+股票代码         """         if other_market_code[0:2].lower() == 'sh':             return '0' + other_market_code[2:]         elif other_market_code[0:2].lower() == 'sz':             return '1' + other_market_code[2:]         else:             return '2' + other_market_code[2:]       @staticmethod     def get_realtime_datas(symbols=[]):         """         网易的实时数据接口         :param symbols: 股票代码列表         :return: Ticks列表         """         codes = [NetEaseData.convert_market(code) for code in symbols]         result = []         try:             response = requests.get("http://api.money.126.net/data/feed/{},money.api".format(','.join(codes))).text             re_find = NetEaseData.__re_find(response)             if re_find is not None:                 for code in re_find:                     item = re_find[code]                     result.append(NetTick(item))         except Exception as e:             logger.error('请求网易接口出错,错误信息:{}'.format(e))          return result      @staticmethod     def __re_find(response):         find = re.findall(r"_ntes_quote_callback((.*));", response)         if len(find) >= 1:             return to_obj(find[-1])          return None   if __name__ == '__main__':     ticks = NetEaseData.get_realtime_datas(['sh588000', 'sz000001', 'bj831010'])     [print(tick.symbol, tick.name, tick.price) for tick in ticks]     tick = NetEaseData.get_realtime_data('sz127045')     print(tick.symbol, tick.name, tick.price) 

使用也非常简单

  • NetEaseData.get_realtime_data:获取单个股票
  • NetEaseData.get_realtime_datas : 获取多个股票数据

这里我股票代码用的是兼容原有新浪模式的,你可以自己做下修改。

目前正在升级自己的量化平台,也会将之前的一些代码公布出来,如果喜欢请点个推荐,谢谢

发表评论

相关文章

当前内容话题