///指标数据
QuoteHistoryDay(10, (dic) =>
{
if (dic.Count > 0)
{
foreach (var item in dic.Keys)
{
///获取指标结果
var resp = dic[item].GetDoji(0.1);
Console.WriteLine(resp.ToJson());
}
}
});
from AlgorithmImports import *
class DojiIndicatorStrategy(QCAlgorithm):
def Initialize(self):
self.SetStartDate(2020, 1, 1)
self.SetEndDate(2021, 1, 1)
self.SetCash(100000)
self.symbol = self.AddEquity("SPY", Resolution.Daily).Symbol
# ========== Doji检测参数 ==========
self.doji_threshold = 0.001 # 实体大小阈值(价格范围的0.1%)
self.min_shadow_ratio = 0.3 # 最小影线比率
# 价格历史
self.price_history = []
self.lookback = 10
self.SetWarmUp(self.lookback)
# 设置图表
self.SetupCharts()
def SetupCharts(self):
"""设置图表"""
# 蜡烛图
candle_chart = Chart("Candlestick")
candle_chart.AddSeries(Series("Price", SeriesType.Candle, 0))
self.AddChart(candle_chart)
# Doji信号
signal_chart = Chart("Doji Signals")
signal_chart.AddSeries(Series("Doji", SeriesType.Scatter, 0))
self.AddChart(signal_chart)
def OnData(self, data):
if self.IsWarmingUp:
return
if self.symbol not in data:
return
bar = data[self.symbol]
# 保存价格历史
self.UpdatePriceHistory(bar)
# 检测Doji形态
is_doji = self.IsDoji(bar)
# 如果有Doji信号
if is_doji:
self.ProcessDojiSignal(bar)
# 更新图表
self.UpdateCharts(bar, is_doji)
def UpdatePriceHistory(self, bar):
"""更新价格历史"""
candle_data = {
'open': bar.Open,
'high': bar.High,
'low': bar.Low,
'close': bar.Close,
'time': self.Time
}
self.price_history.append(candle_data)
if len(self.price_history) > self.lookback:
self.price_history.pop(0)
def IsDoji(self, bar):
"""检测Doji形态"""
# 计算蜡烛属性
body_size = abs(bar.Close - bar.Open)
total_range = bar.High - bar.Low
# 避免除零错误
if total_range == 0:
return False
# 1. 实体非常小(小于价格范围的0.1%)
body_ratio = body_size / total_range
# 2. 计算影线比率
upper_shadow = bar.High - max(bar.Open, bar.Close)
lower_shadow = min(bar.Open, bar.Close) - bar.Low
shadow_ratio = (upper_shadow + lower_shadow) / total_range
# Doji条件
is_doji = (body_ratio < self.doji_threshold and
shadow_ratio > self.min_shadow_ratio)
return is_doji
def GetDojiType(self, bar):
"""判断Doji类型"""
if bar.Open == bar.Close and bar.Open == bar.High and bar.Open == bar.Low:
return "four_price" # 四价十字星
upper_shadow = bar.High - max(bar.Open, bar.Close)
lower_shadow = min(bar.Open, bar.Close) - bar.Low
if upper_shadow > lower_shadow * 3:
return "gravestone" # 墓碑十字星
elif lower_shadow > upper_shadow * 3:
return "dragonfly" # 蜻蜓十字星
elif upper_shadow > total_range * 0.3 and lower_shadow > total_range * 0.3:
return "long_legged" # 长腿十字星
else:
return "standard" # 标准十字星
def ProcessDojiSignal(self, bar):
"""处理Doji信号"""
if len(self.price_history) < 3:
return
# 分析趋势
trend = self.AnalyzeTrend()
doji_type = self.GetDojiType(bar)
holdings = self.Portfolio[self.symbol].Quantity
# 根据位置和类型生成信号
if trend == "uptrend":
# 上升趋势中的Doji可能是反转信号
self.Debug(f"上升趋势Doji: {doji_type}, 价格={bar.Close:.2f}")
if holdings > 0:
# 考虑卖出
self.Liquidate(self.symbol)
self.Debug("Doji反转信号:卖出")
elif trend == "downtrend":
# 下降趋势中的Doji可能是反转信号
self.Debug(f"下降趋势Doji: {doji_type}, 价格={bar.Close:.2f}")
if holdings <= 0:
# 考虑买入
self.SetHoldings(self.symbol, 0.5)
self.Debug("Doji反转信号:买入")
def AnalyzeTrend(self):
"""分析趋势"""
if len(self.price_history) < 3:
return "neutral"
# 简单趋势判断
recent_closes = [c['close'] for c in self.price_history[-3:]]
older_closes = [c['close'] for c in self.price_history[-6:-3]]
if len(recent_closes) < 2 or len(older_closes) < 2:
return "neutral"
avg_recent = sum(recent_closes) / len(recent_closes)
avg_older = sum(older_closes) / len(older_closes)
if avg_recent > avg_older * 1.02:
return "uptrend"
elif avg_recent < avg_older * 0.98:
return "downtrend"
else:
return "neutral"
def UpdateCharts(self, bar, is_doji):
"""更新图表"""
# 绘制蜡烛图
self.Plot("Candlestick", "Price", bar.Close)
# 绘制Doji信号
if is_doji:
self.Plot("Doji Signals", "Doji", bar.Close)