调整py脚本

This commit is contained in:
2026-01-28 12:28:04 +08:00
parent edebf424db
commit d1afc64e5a
9 changed files with 195 additions and 831 deletions

View File

@@ -53,7 +53,7 @@ def get_history_records(data_dirs):
if not os.path.exists(d): continue
for f in os.listdir(d):
if f.endswith('.json') and f != 'stat_result.json':
with open(os.path.join(d, f), 'r') as file:
with open(os.path.join(d, f), 'r', encoding='utf-8') as file:
try:
content = json.load(file)
if isinstance(content, list):
@@ -109,6 +109,7 @@ if __name__ == "__main__":
df = pd.DataFrame(all_records).drop_duplicates(subset=['id']).sort_values('time')
df['time'] = pd.to_datetime(df['time'])
print(main(next_period_time, df))
result = main(next_period_time, df)
print(json.dumps(result, ensure_ascii=False))

View File

@@ -1,475 +0,0 @@
import json
import os
import math
import pandas as pd
from datetime import datetime, timedelta
# 玩法1.2 赔率映射
SUM12_ODDS = {
3: 41.9, 4: 41.9, 18: 41.9, 19: 41.9,
5: 20.38, 6: 20.38, 16: 20.38, 17: 20.38,
7: 13.51, 8: 13.51, 14: 13.51, 15: 13.51,
9: 9.7, 10: 9.7, 12: 9.7, 13: 9.7,
11: 8.1
}
INIT_CONFIG = {
"INIT_GOLD": 100000, # 初始金币betting_predictions_final_500.json
"init_cost": 100000,
"stop_win": 1100000, # 本金止盈点
"stop_loss": 0
}
# 玩法1.1 赔率映射
SIZE_ODDS = {"冠亚大": 2.1, "冠亚小": 1.7}
ODD_EVEN_ODDS = {"冠亚单": 1.7, "冠亚双": 2.1}
NUM_DETAIL_ODDS = 1.9199
# 玩法2.1/2.2/3 固定赔率
RANK_GLH_ODDS = 1.9199 # 玩法2.2
NUM_POS_ODDS = 9.599 # 玩法3
# 定义工具函数判断是否为NaN兼容Python的None和math.nan
def is_nan(val):
return val is None or (isinstance(val, float) and math.isnan(val))
def load_actual_data(actual_dir):
actual_records = []
if not os.path.exists(actual_dir):
return actual_records
# 处理可能的嵌套目录
target_dir = actual_dir
if 'data_test_dir' in os.listdir(actual_dir):
target_dir = os.path.join(actual_dir, 'data_test_dir')
for file in os.listdir(target_dir):
if file.endswith('.json') and file != 'stat_result.json':
with open(os.path.join(target_dir, file), 'r') as f:
try:
data = json.load(f)
actual_records.extend(data)
except:
continue
return actual_records
# 1. 加载实际结果数据示例从JSON字符串加载可替换为文件读取
# with open("data_test_dir1/2026_1_17_data_test.json", "r", encoding="utf-8") as f:
# actual_data = json.load(f)
actual_data = load_actual_data("data_test_dir1")
def get_history_records(data_dirs):
all_records = []
for d in data_dirs:
if not os.path.exists(d): continue
for f in os.listdir(d):
if f.endswith('.json') and f != 'stat_result.json':
with open(os.path.join(d, f), 'r') as file:
try:
content = json.load(file)
if isinstance(content, list):
all_records.extend([item for item in content if isinstance(item, dict)])
except:
continue
return all_records
# with open("data_test_predict/betting_predictions_final_2026-01-11.json", "r", encoding="utf-8") as f:
# with open("data_test_predict/betting_predictions_final_502.json", "r", encoding="utf-8") as f:
# with open("data_test_predict/betting_predictions_final_501.json", "r", encoding="utf-8") as f:
# predict_data = json.load(f)
predict_data = load_actual_data("data_test_predict")
# 标准化时间函数
def standardize_time(time_str):
try:
dt = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
return dt
except ValueError:
return None
# 构建实际数据的时间映射使用datetime对象作为key
actual_time_map = {}
for item in actual_data:
std_dt = standardize_time(item["time"])
if std_dt:
actual_time_map[std_dt] = item
# 定义函数获取时间所属的区间key07:05:00 ~ 次日06:00:00
def get_time_interval_key(dt):
# 确定区间起始时间:如果当前时间 >= 07:05:00起始为当天07:05否则为前一天07:05
start_hour, start_min = 7, 5
if dt.hour > start_hour or (dt.hour == start_hour and dt.minute >= start_min):
interval_start = datetime(dt.year, dt.month, dt.day, start_hour, start_min, 0)
else:
interval_start = datetime(dt.year, dt.month, dt.day, start_hour, start_min, 0) - timedelta(days=1)
# 区间结束时间:起始时间 + 1天 - 1小时5分钟即次日06:00:00
interval_end = interval_start + timedelta(days=1) - timedelta(hours=1, minutes=5)
# 格式化区间key
key = f"{interval_start.strftime('%Y-%m-%d %H:%M:%S')}_{interval_end.strftime('%Y-%m-%d %H:%M:%S')}"
return key, interval_start, interval_end
# ====================== 定义输赢计算函数 ======================
def calculate_bet_result(actual, predict):
bet_result = {
"predict_id": predict["id"],
"time": predict["time"],
"actual_id": actual["id"],
"result_bet": [], # 数字排列押注结果
"result_detail_bet": [], # 数字排列大小单双押注结果
"winner_bet": [], # 冠亚和押注结果
"GD1_bet": [], # 冠亚大小押注结果
"GD2_bet": [], # 冠亚单双押注结果
"GLH_bet": [], # 龙虎押注结果
"total_profit": 0, # 总盈利
"total_cost": 0 # 总投入
}
# 3.1 计算result数字排列押注输赢
actual_result = actual["result"]
for idx, predict_idx_result in predict["result"].items():
_idx = int(idx)
hit = False
total_bet = 0
profit = 0
for predict_idx, bet in predict_idx_result.items():
if is_nan(bet):
continue
total_bet += bet
if actual_result[_idx] == int(predict_idx):
hit = True
profit += bet * NUM_POS_ODDS
bet_result["result_bet"].append({
"num": actual_result[_idx],
"bet": total_bet,
"hit": hit,
"profit": profit,
})
# 3.1.1 计算各位置的大小单双
for idx, predict_detail_result in predict["result_detail"].items():
_idx = int(idx)
bet = 0
profit = 0
big_or_small = "" if actual_result[_idx] >= 6 else ""
single_or_double = "" if (actual_result[_idx] & 1) == 1 else ""
profit1 = predict_detail_result[big_or_small]
profit2 = predict_detail_result[single_or_double]
bet1 = predict_detail_result[""]
bet2 = predict_detail_result[""]
bet3 = predict_detail_result[""]
bet4 = predict_detail_result[""]
bet += (0 if is_nan(bet1) else bet1) + (0 if is_nan(bet2) else bet2) + (0 if is_nan(bet3) else bet3) + (
0 if is_nan(bet4) else bet4)
profit += (0 if is_nan(profit1) else profit1 * NUM_DETAIL_ODDS) + (
0 if is_nan(profit2) else profit2 * NUM_DETAIL_ODDS)
bet_result["result_detail_bet"].append({
"num": actual_result[_idx],
"bet": bet,
"hit": True if profit > 0 else False,
"profit": profit,
"big_or_small": big_or_small,
"single_or_double": single_or_double,
})
# 3.2 计算winner冠亚和押注输赢
actual_winner = actual["winner"]
for winner_str, bet in predict["winner"].items():
if is_nan(bet):
continue
winner_val = int(winner_str)
hit = winner_val == actual_winner
profit = bet * (SUM12_ODDS[winner_val] if hit else 0)
bet_result["winner_bet"].append({
"winner_val": winner_val,
"bet": 0 if is_nan(bet) else bet,
"hit": hit,
"profit": profit,
})
# 3.3 计算GD1冠亚大小押注输赢
actual_gd1 = actual["GD1"]
for gd1_type, bet in predict["GD1"].items():
if is_nan(bet):
continue
hit = gd1_type == actual_gd1
profit = bet * (SIZE_ODDS[actual_gd1] if hit else 0)
bet_result["GD1_bet"].append({
"gd1_type": gd1_type,
"bet": 0 if is_nan(bet) else bet,
"hit": hit,
"profit": profit,
})
# 3.4 计算GD2冠亚单双押注输赢
actual_gd2 = actual["GD2"]
for gd2_type, bet in predict["GD2"].items():
if is_nan(bet):
continue
hit = gd2_type == actual_gd2
profit = bet * (ODD_EVEN_ODDS[gd2_type] if hit else 0)
bet_result["GD2_bet"].append({
"gd2_type": gd2_type,
"bet": 0 if is_nan(bet) else bet,
"hit": hit,
"profit": profit,
})
# 3.5 计算GLH龙虎押注输赢
actual_glh = actual["GLH_result"]
for glh_type, bet in predict["GLH_result"].items():
idx = int(glh_type[4])
if idx >= len(actual_glh):
break
if is_nan(bet):
continue
glh_type = glh_type[-1:]
hit = glh_type == actual_glh[idx]
profit = bet * (RANK_GLH_ODDS if hit else 0)
bet_result["GLH_bet"].append({
"position": idx,
"glh_type": glh_type,
"bet": 0 if is_nan(bet) else bet,
"hit": hit,
"profit": profit,
})
# 3.6 计算本轮总盈利
total_profit = sum([
sum(item["profit"] for item in bet_result["result_bet"]),
sum(item["profit"] for item in bet_result["result_detail_bet"]),
sum(item["profit"] for item in bet_result["winner_bet"]),
sum(item["profit"] for item in bet_result["GD1_bet"]),
sum(item["profit"] for item in bet_result["GD2_bet"]),
sum(item["profit"] for item in bet_result["GLH_bet"])
])
bet_result["total_profit"] = round(total_profit, 6) # 保留6位小数
# 3.7 计算本轮总投入
total_cost = sum([
sum(item["bet"] for item in bet_result["result_bet"]),
sum(item["bet"] for item in bet_result["result_detail_bet"]),
sum(item["bet"] for item in bet_result["winner_bet"]),
sum(item["bet"] for item in bet_result["GD1_bet"]),
sum(item["bet"] for item in bet_result["GD2_bet"]),
sum(item["bet"] for item in bet_result["GLH_bet"])
])
bet_result["total_cost"] = round(total_cost, 6)
return bet_result
def calculate_bet_input_cost(bet_result):
this_round_total_cost = bet_result["total_cost"]
this_round_total_profit = bet_result["total_profit"]
global INIT_CONFIG
INIT_CONFIG["init_cost"] = INIT_CONFIG["init_cost"] - this_round_total_cost + this_round_total_profit
if INIT_CONFIG["init_cost"] <= INIT_CONFIG["stop_loss"] or INIT_CONFIG["init_cost"] >= INIT_CONFIG["stop_win"]:
return False
return True
# ====================== 初始化区间聚合字典 ======================
interval_agg_data = {}
# 定义玩法列表(用于聚合)
PLAY_TYPES = [
"result_bet", # 数字排列
"result_detail_bet", # 数字排列大小单双
"winner_bet", # 冠亚和
"GD1_bet", # 冠亚大小
"GD2_bet", # 冠亚单双
"GLH_bet" # 龙虎
]
# 初始化单个区间的聚合结构
def init_interval_agg():
agg = {
# 各玩法的聚合数据
"play_details": {
play_type: {
"total_input": 0.0, # 总投入
"total_income": 0.0, # 总收益
"input_income_ratio": 0.0, # 投入/收益比例
"profit": 0.0 # 利润(收益-投入)
} for play_type in PLAY_TYPES
},
# 整体聚合数据
"total": {
"total_input": 0.0,
"total_income": 0.0,
"input_income_ratio": 0.0,
"total_profit": 0.0 # 总利润(总收益-总投入)
}
}
return agg
# ====================== 批量匹配并计算结果 ======================
final_results = []
for predict_item in predict_data:
# 标准化预测时间
predict_time_str = predict_item["time"]
predict_dt = standardize_time(predict_time_str)
if not predict_dt:
final_results.append({
"predict_id": predict_item["id"],
"time": predict_time_str,
"error": "时间格式错误,无法解析"
})
continue
# 获取所属区间key
interval_key, _, _ = get_time_interval_key(predict_dt)
# 按时间匹配实际结果
actual_item = actual_time_map.get(predict_dt)
if not actual_item:
final_results.append({
"predict_id": predict_item["id"],
"time": predict_time_str,
"error": "未找到对应时间的实际结果"
})
continue
# 计算单条输赢结果
bet_result = calculate_bet_result(actual_item, predict_item)
_continue = calculate_bet_input_cost(bet_result)
final_results.append(bet_result)
# ====================== 聚合到对应时间区间 ======================
# 初始化区间数据(如果不存在)
if interval_key not in interval_agg_data:
interval_agg_data[interval_key] = init_interval_agg()
# 遍历每个玩法,聚合数据
total_input = 0.0
total_income = 0.0
for play_type in PLAY_TYPES:
# 计算该玩法的总投入和总收益
play_input = sum(item["bet"] for item in bet_result[play_type])
play_income = sum(item["profit"] for item in bet_result[play_type])
# 更新玩法详情
play_detail = interval_agg_data[interval_key]["play_details"][play_type]
play_detail["total_input"] += play_input
play_detail["total_income"] += play_income
# 计算利润
play_detail["profit"] = round(play_detail["total_income"] - play_detail["total_input"], 6)
# 计算投入/收益比例避免除以0
if play_detail["total_income"] != 0:
play_detail["input_income_ratio"] = round(play_detail["total_input"] / play_detail["total_income"], 6)
else:
play_detail["input_income_ratio"] = 0.0
# 累加至总投入/总收益
total_input += play_input
total_income += play_income
# 更新区间整体数据
total_detail = interval_agg_data[interval_key]["total"]
total_detail["total_input"] += total_input
total_detail["total_income"] += total_income
# 总利润(收益-投入)
total_detail["total_profit"] = round(total_detail["total_income"] - total_detail["total_input"], 6)
# 总投入/收益比例
if total_detail["total_income"] != 0:
total_detail["input_income_ratio"] = round(total_detail["total_input"] / total_detail["total_income"], 6)
else:
total_detail["input_income_ratio"] = 0.0
if not _continue:
break
# ====================== 输出区间聚合结果 ======================
print("=" * 100)
print("时间区间聚合盈亏统计")
print("=" * 100)
for interval_key, agg_data in interval_agg_data.items():
print(f"\n【区间】: {interval_key}")
print("-" * 80)
# 输出各玩法详情
print("【各玩法统计】:")
for play_type, play_data in agg_data["play_details"].items():
print(f"\n{play_type}:")
print(f" 总投入: {play_data['total_input']:.6f}")
print(f" 总收益: {play_data['total_income']:.6f}")
print(f" 投入/收益比例: {play_data['input_income_ratio']:.6f}")
print(f" 利润(收益-投入): {play_data['profit']:.6f}")
# 输出整体统计
print("\n【区间整体统计】:")
total_data = agg_data["total"]
print(f" 总投入: {total_data['total_input']:.6f}")
print(f" 总收益: {total_data['total_income']:.6f}")
print(f" 投入/收益比例: {total_data['input_income_ratio']:.6f}")
print(f" 总利润(收益-投入): {total_data['total_profit']:.6f}")
print("-" * 80)
# ====================== 输出原始明细(可选) ======================
print("\n\n" + "=" * 100)
print("原始押注明细")
print("=" * 100)
for res in final_results:
if "error" in res:
print(f"【预测ID: {res['predict_id']}】 时间: {res['time']} | 错误: {res['error']}\n")
continue
print(f"===== 预测ID: {res['predict_id']} | 时间: {res['time']} | 实际ID: {res['actual_id']} =====")
# 输出数字排列押注结果
print("\n--- 数字排列押注 ---")
for item in res["result_bet"]:
status = "命中" if item["hit"] else "未命中"
print(f"数字{item['num']} | 押注{item['bet']}元 | {status} | 盈利{item['profit']}")
# 输出数字排列大小单双押注结果
print("\n--- 数字排列大小单双押注 ---")
for item in res["result_detail_bet"]:
single_or_double = item["single_or_double"]
big_or_small = item["big_or_small"]
print(f"数字{item['num']} | 押注{item['bet']}元 | {single_or_double}{big_or_small} | 盈利{item['profit']}")
# 输出冠亚和押注结果
print("\n--- 冠亚和押注 ---")
for item in res["winner_bet"]:
status = "命中" if item["hit"] else "未命中"
print(f"冠亚和{item['winner_val']} | 押注{item['bet']}元 | {status} | 盈利{item['profit']}")
# 输出冠亚大小押注结果
print("\n--- 冠亚大小押注 ---")
for item in res["GD1_bet"]:
status = "命中" if item["hit"] else "未命中"
print(f"{item['gd1_type']} | 押注{item['bet']}元 | {status} | 盈利{item['profit']}")
# 输出冠亚单双押注结果
print("\n--- 冠亚单双押注 ---")
for item in res["GD2_bet"]:
status = "命中" if item["hit"] else "未命中"
print(f"{item['gd2_type']} | 押注{item['bet']}元 | {status} | 盈利{item['profit']}")
# 输出龙虎押注结果
print("\n--- 龙虎押注 ---")
for item in res["GLH_bet"]:
status = "命中" if item["hit"] else "未命中"
print(f"位置{item['position']}({item['glh_type']}) | 押注{item['bet']}元 | {status} | 盈利{item['profit']}")
print("\n" + "-" * 80 + "\n")
total_input = sum([current_input["total"]["total_input"] for _, current_input in interval_agg_data.items()])
total_income = sum([current_input["total"]["total_income"] for _, current_input in interval_agg_data.items()])
print(f"总投入 {total_input}, 总收益 {total_income}, 最终盈利: {total_income - total_input:.6f}")
# 可选将区间聚合结果保存为JSON文件
with open("interval_agg_result_1_1_2.json", "w", encoding="utf-8") as f:
json.dump(interval_agg_data, f, ensure_ascii=False, indent=4)
print("\n区间聚合结果已保存至 interval_agg_result.json")