2026/1/2 14:26:26
网站建设
项目流程
娱乐网站建设ppt,装修照片,佛山哪里做网站,郑州市建网站前言
上一个教程已经完成了历史数据的创建#xff0c;这个教程完成每日自动更新日K
Baostock数据更新时间
当前交易日17:30#xff0c;完成日K线数据入库
创建每天18点自动更新日K
根据Baostock的规则#xff0c;我们设定每天18:00的定时任务#xff0c;当你下班回家当日数…前言上一个教程已经完成了历史数据的创建这个教程完成每日自动更新日KBaostock数据更新时间当前交易日17:30完成日K线数据入库创建每天18点自动更新日K根据Baostock的规则我们设定每天18:00的定时任务当你下班回家当日数据已经给你整理得巴巴适适我们使用系统自带的crontab完成定时任务crontab-e添加定时任务脚本路径根据自己需求存储日志位置按个人喜好存储018* * * /bin/bash /home/zero/scripts/run.sh/var/log/stock_sync.log21创建run.sh脚本因为python 是使用的虚拟环境所以需要先激活虚拟环境再调用脚本#!/bin/bash# 项目路径根据您提供的信息设置SCRIPT_DIR/rootPROJECT_DIR/root/venvSCRIPT_NAMEdaily_stock_sync.py# 检查项目目录是否存在if[!-d$PROJECT_DIR];thenecho错误项目目录$PROJECT_DIR不存在exit1fi# 检查虚拟环境是否存在VENV_ACTIVATE$PROJECT_DIR/bin/activateif[!-f$VENV_ACTIVATE];thenecho错误虚拟环境激活脚本不存在请确认路径是否正确echo预期路径$VENV_ACTIVATEexit1fi# 检查脚本文件是否存在SCRIPT_PATH$SCRIPT_DIR/$SCRIPT_NAMEif[!-f$SCRIPT_PATH];thenecho错误脚本文件$SCRIPT_PATH不存在exit1fi# 激活虚拟环境并执行脚本cd$PROJECT_DIR||exit1source$VENV_ACTIVATEpython$SCRIPT_PATH运行脚本时先使用接口query_trade_dates判断当天是否是交易日如果是遍历获取当天的日K数据importbaostockasbsimportpandasaspdimporttimefromdatetimeimportdatetimefromsqlalchemyimportcreate_enginefromsqlalchemy.excimportIntegrityError# 配置信息 MYSQL_CONFIG{host:,# 数据库地址port:,# 端口user:,# 用户名password:,# 密码db:,# 数据库名charset:utf8mb4}enginecreate_engine(fmysqlpymysql://{MYSQL_CONFIG[user]}:{MYSQL_CONFIG[password]}{MYSQL_CONFIG[host]}:{MYSQL_CONFIG[port]}/{MYSQL_CONFIG[db]}?charset{MYSQL_CONFIG[charset]})# 核心函数 defis_trading_day()-bool:判断当天是否为交易日todaydatetime.now().strftime(%Y-%m-%d)rsbs.query_trade_dates(start_datetoday,end_datetoday)ifrs.error_code!0:print(f交易日查询失败{rs.error_msg})returnFalsetrading_days[]whilers.next():trading_days.append(rs.get_row_data())returnlen(trading_days)0andtrading_days[0][1]1# 1交易日defget_stock_codes()-list:获取需要更新的股票代码列表示例从数据库stock_basic表读取querySELECT code FROM stock_basic WHERE status 上市# 假设存在基础信息表codespd.read_sql(query,engine)[code].tolist()print(f共获取{len(codes)}只上市股票代码)returncodesdefget_daily_k_data(code:str,trade_date:str)-pd.DataFrame:获取单只股票的当日K线数据# Baostock日线接口参数说明# adjustflag复权类型1后复权2前复权3不复权与数据库字段对应fieldsdate,code,open,high,low,close,preclose,volume,amount,adjustflag,turn,tradestatus,pctChg,peTTM,pbMRQ,psTTM,pcfNcfTTMrsbs.query_history_k_data_plus(codecode,fieldsfields,start_datetrade_date,end_datetrade_date,frequencyd,# 日线数据adjustflag2# 前复权根据需求调整为1/2/3)ifrs.error_code!0:print(f股票{code}数据查询失败{rs.error_msg})returnpd.DataFrame()data_list[]whilers.next():datars.get_row_data()data_dict{trade_date:data[0],# 交易日期code:data[1],# 股票代码open:float(data[2])ifdata[2]elseNone,high:float(data[3])ifdata[3]elseNone,low:float(data[4])ifdata[4]elseNone,close:float(data[5])ifdata[5]elseNone,pre_close:float(data[6])ifdata[6]elseNone,volume:int(data[7])ifdata[7]elseNone,amount:float(data[8])ifdata[8]elseNone,adjustflag:data[9],# 复权类型turn:float(data[10])ifdata[10]elseNone,tradestatus:data[11],# 交易状态1正常0停牌pctChg:float(data[12])ifdata[12]elseNone,peTTM:float(data[13])ifdata[13]elseNone,pbMRQ:float(data[14])ifdata[14]elseNone,psTTM:float(data[15])ifdata[15]elseNone,pcfNcfTTM:float(data[16])ifdata[16]elseNone,change_amount:round(float(data[5])-float(data[6]),4)if(data[5]anddata[6])elseNone,# 计算涨跌额isST:1ifSTincodeelse0# 简单判断是否ST股需根据实际股票名称优化}data_list.append(data_dict)returnpd.DataFrame(data_list)defbatch_save_k_data_to_db(df:pd.DataFrame):批量写入K线数据到数据库ifdf.empty:print(无数据可写入)returntry:# 使用replace避免重复写入利用唯一索引idx_code_trade_datedf.to_sql(namestock_quote_daily,conengine,if_existsappend,indexFalse,chunksize1000)print(f成功写入{len(df)}条K线数据{datetime.now()})exceptIntegrityError:# 若触发唯一索引冲突说明数据已存在跳过写入print(f部分数据已存在跳过重复写入{datetime.now()})exceptExceptionase:print(f写入数据库失败{str(e)})# 主执行逻辑 defmain():todaydatetime.now().strftime(%Y-%m-%d)print(f 开始执行{today}K线数据更新任务 )# 1. 登录Baostocklgbs.login()iflg.error_code!0:print(fBaostock登录失败{lg.error_msg})returntry:# 2. 判断是否为交易日ifnotis_trading_day():print(f今日{today}非交易日任务结束)return# 3. 获取股票代码列表codesget_stock_codes()ifnotcodes:print(未获取到股票代码列表任务结束)return# 4. 批量获取K线数据并写入数据库batch_size50# 每批处理50只股票避免接口请求过于频繁all_data[]foriinrange(0,len(codes),batch_size):batch_codescodes[i:ibatch_size]print(f处理批次{i//batch_size1}/{(len(codes)batch_size-1)//batch_size}{len(batch_codes)}只股票)forcodeinbatch_codes:dfget_daily_k_data(code,today)ifnotdf.empty:all_data.append(df)time.sleep(0.5)# 控制请求频率Baostock限制每秒2次# 每批数据合并后写入数据库ifall_data:batch_dfpd.concat(all_data,ignore_indexTrue)batch_save_k_data_to_db(batch_df)all_data[]# 清空列表准备下一批finally:# 5. 登出Baostockbs.logout()print(f{today}K线数据更新任务结束 )if__name____main__:main()