public override void Initialize()
{
///系统设置 必须设置 因为此方法会获取策略ID 且必须是第一句
SetConfig(1, 5);
Print("初始化配置");
SetStartDate(BeginTime); // 设置回测开始日期
SetEndDate(EndTime); // 设置回测结束日期
//设置平安银行股票
var stock = AddStock("000001.XSHE", Resolution.Minute);
SymbolPool.Add(stock);
}
/// <summary>
/// 每天数据到达时调用
/// </summary>
/// <param name="slice"></param>
public override void OnData(Slice slice)
{
//每天10.00分打印所有股票此刻价格
if (Time.Hour == 10 && Time.Minute == 0)
{
foreach (var item in SymbolPool)
{
Console.WriteLine($"股票:{item.Value},价格:{Securities[item].Price}");
}
}
}
from AlgorithmLogic import *
from MysqlHelper import *
import config
import RedisHelper
class MyTestAlgorithm(AlgorithmLogic):
stock_list=[]
def init(self):
#初始化T+1 取消时间为120秒
self.SetConfig(1,120)
logger.info("进入MyTestAlgorithm init方法:")
#添加单个指数 分钟为Resolution.Minute
self.index300 = self.AddIndex("000300.XSHG", Resolution.Daily).symbol
#添加指数下的所有 分钟为Resolution.Minute
stock_list=self.FillStocks([],"000300.XSHG",Resolution.Daily)
#设置基准
self.set_benchmark(self.index300)
#数据来了触发事件
def ondata(self,slice):
#logger.info("进入MyTestAlgorithm ondata方法:")
#logger.info(fr"Holdings:{len(config.Holdings)}")
#logger.info(config.Holdings)
_symbol=self.get_security_by_symbol("000001.XSHE")
aapl_record = self.get_first_by_symbol(config.Holdings, "000001.XSHE")
if aapl_record:
#检测当前价格是否上下浮动10% 卖出
price=self.Securities[aapl_record.symbol].Price
if aapl_record.price*1.1<price :
self.Sell(aapl_record.symbol,aapl_record.quantity,self.SellFun)
if aapl_record.price*0.9<price :
self.Sell(aapl_record.symbol,aapl_record.quantity,self.SellFun)
else:
#没有就买入
cash=self.GetAvailableInvestmentCapital()
self.Buy(_symbol.Symbol,cash,self.BuyFun)
#for symbol, data in slice.items():
#输出所有数据
#logger.info(f"{symbol.Value}:T={data.time} O={data.Open}, H={data.High}, L={data.Low}, C={data.Close}")
#Sell 回调方法
def SellFun(self,symbol,order_ticket):
logger.info(fr"Sell Symbol:{symbol} Price: {order_ticket.AverageFillPrice}")
#Buy 回调方法
def BuyFun(self,symbol,order_ticket):
logger.info(fr"Buy Symbol:{symbol} Price: {order_ticket.AverageFillPrice}")
# 方法1:使用 next() 获取第一个匹配对象
def get_first_by_symbol(self,records: List[config.TradeRecord], symbol: str):
"""获取第一个匹配symbol的记录"""
logger.info(records)
return next((record for record in records if record.symbol.Value == symbol), None)