Files
bocai/PyModel/test_789.py
2026-01-28 09:44:21 +08:00

476 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import math
import pandas as pd
from datetime import datetime, timedelta
# Play 1.2: payout odds for a bet on the champion + runner-up sum, listed by
# odds tier (several sums share the same odds).
_SUM12_ODDS_TIERS = (
    (41.9, (3, 4, 18, 19)),
    (20.38, (5, 6, 16, 17)),
    (13.51, (7, 8, 14, 15)),
    (9.7, (9, 10, 12, 13)),
    (8.1, (11,)),
)
SUM12_ODDS = {
    sum_value: odds
    for odds, sum_values in _SUM12_ODDS_TIERS
    for sum_value in sum_values
}

# Bankroll settings for the simulation.
INIT_CONFIG = dict(
    INIT_GOLD=100000,   # initial gold (the original comment also mentions betting_predictions_final_500.json)
    init_cost=100000,   # running balance, updated after every settled round
    stop_win=1100000,   # take-profit threshold on the balance
    stop_loss=0,        # stop-loss threshold on the balance
)

# Play 1.1: odds for the champion + runner-up big/small and odd/even bets.
SIZE_ODDS = {
    "冠亚大": 2.1,
    "冠亚小": 1.7,
}
ODD_EVEN_ODDS = {
    "冠亚单": 1.7,
    "冠亚双": 2.1,
}
# Odds applied to the per-position big/small and odd/even bets.
NUM_DETAIL_ODDS = 1.9199

# Plays 2.1 / 2.2 / 3: fixed odds.
RANK_GLH_ODDS = 1.9199  # play 2.2
NUM_POS_ODDS = 9.599  # play 3
# Helper that treats both Python's None and a float NaN as "missing".
def is_nan(val):
    """Return True when *val* is None or a float NaN, otherwise False."""
    if val is None:
        return True
    # math.isnan is only consulted for floats, so other types never reach it.
    return isinstance(val, float) and math.isnan(val)
def load_actual_data(actual_dir):
    """Load and concatenate the records from every JSON file in a directory.

    If *actual_dir* contains a sub-directory named ``data_test_dir``, the
    files are read from that sub-directory instead. ``stat_result.json`` is
    always skipped.

    Args:
        actual_dir: Path of the directory to scan.

    Returns:
        A list with the dict records of every readable file, in file-name
        order. An empty list is returned when *actual_dir* is not a
        directory.
    """
    if not os.path.isdir(actual_dir):
        return []
    # Handle the possible nested directory layout.
    nested_dir = os.path.join(actual_dir, 'data_test_dir')
    target_dir = nested_dir if os.path.isdir(nested_dir) else actual_dir

    actual_records = []
    # os.listdir returns entries in arbitrary order; sorting keeps the record
    # order (and therefore any order-dependent processing) reproducible.
    for file_name in sorted(os.listdir(target_dir)):
        if not file_name.endswith('.json') or file_name == 'stat_result.json':
            continue
        # Explicit UTF-8 so non-ASCII content does not depend on the platform's
        # default encoding.
        with open(os.path.join(target_dir, file_name), 'r', encoding='utf-8') as f:
            try:
                data = json.load(f)
            except ValueError:
                # Covers json.JSONDecodeError and UnicodeDecodeError: an
                # unreadable file is skipped, as before, but unrelated errors
                # are no longer swallowed.
                continue
        # Only a top-level list of dict records is usable by the callers;
        # extending with a dict would silently add its keys as "records".
        if isinstance(data, list):
            actual_records.extend(item for item in data if isinstance(item, dict))
    return actual_records
# 1. Load the actual-result data. The original example loaded a single JSON
#    document directly; the commented-out lines show that file-based variant.
# with open("data_test_dir1/2026_1_17_data_test.json", "r", encoding="utf-8") as f:
#     actual_data = json.load(f)
# Current behavior: gather the records of every JSON file under "data_test_dir1".
actual_data = load_actual_data("data_test_dir1")
def get_history_records(data_dirs):
    """Collect the dict records from the JSON files of several directories.

    Args:
        data_dirs: Iterable of directory paths. Paths that are not existing
            directories are skipped.

    Returns:
        A list of every dict found in the top-level list of each ``.json``
        file (``stat_result.json`` excluded), in the order of *data_dirs* and
        then by file name. Files that cannot be parsed, or whose top level is
        not a list, contribute nothing.
    """
    all_records = []
    for data_dir in data_dirs:
        if not os.path.isdir(data_dir):
            continue
        # os.listdir returns entries in arbitrary order; sort for a
        # reproducible record order.
        for file_name in sorted(os.listdir(data_dir)):
            if not file_name.endswith('.json') or file_name == 'stat_result.json':
                continue
            # Explicit UTF-8 so non-ASCII content does not depend on the
            # platform's default encoding.
            with open(os.path.join(data_dir, file_name), 'r', encoding='utf-8') as file:
                try:
                    content = json.load(file)
                except ValueError:
                    # json.JSONDecodeError / UnicodeDecodeError: skip the
                    # unreadable file without hiding unrelated errors.
                    continue
            if isinstance(content, list):
                all_records.extend(item for item in content if isinstance(item, dict))
    return all_records
# Alternative single-file prediction inputs, kept commented out:
# with open("data_test_predict/betting_predictions_final_2026-01-11.json", "r", encoding="utf-8") as f:
# with open("data_test_predict/betting_predictions_final_502.json", "r", encoding="utf-8") as f:
# with open("data_test_predict/betting_predictions_final_501.json", "r", encoding="utf-8") as f:
#     predict_data = json.load(f)
# The prediction records are gathered with the same directory loader that is
# used for the actual results.
predict_data = load_actual_data("data_test_predict")
# Time normalisation helper.
def standardize_time(time_str):
    """Parse a ``YYYY-MM-DD HH:MM:SS`` string into a datetime.

    Args:
        time_str: The timestamp text to parse.

    Returns:
        The parsed ``datetime``, or ``None`` when *time_str* does not match
        the expected format or is not a string at all (for example ``None``).
    """
    try:
        return datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S")
    except (ValueError, TypeError):
        # strptime raises TypeError for non-string input; callers already
        # treat a None result as "unparseable time", so report it that way
        # instead of crashing.
        return None
# Index the actual records by their parsed timestamp (datetime objects are the
# keys). Records whose time cannot be parsed are left out; when two records
# share a timestamp, the later one wins.
actual_time_map = {}
for actual_record in actual_data:
    parsed_time = standardize_time(actual_record["time"])
    if not parsed_time:
        continue
    actual_time_map[parsed_time] = actual_record
# Map a timestamp to the key of its interval (07:05:00 ~ 06:00:00 next day).
def get_time_interval_key(dt):
    """Return the interval that *dt* is assigned to.

    The interval starts at 07:05:00 on the same day when *dt* is at or after
    07:05 (compared on hour and minute only), otherwise at 07:05:00 on the
    previous day. It ends 22 hours 55 minutes later, i.e. at 06:00:00.

    Returns:
        A ``(key, interval_start, interval_end)`` tuple, where ``key`` is
        ``"<start>_<end>"`` with both ends formatted as
        ``YYYY-MM-DD HH:MM:SS``.
    """
    opening = (7, 5)
    window_start = datetime(dt.year, dt.month, dt.day, opening[0], opening[1], 0)
    # Before today's opening time the timestamp belongs to yesterday's window.
    if (dt.hour, dt.minute) < opening:
        window_start = window_start - timedelta(days=1)
    # Start + 1 day - 1 hour 5 minutes lands on 06:00:00 of the following day.
    window_end = window_start + timedelta(days=1) - timedelta(hours=1, minutes=5)
    stamp_format = '%Y-%m-%d %H:%M:%S'
    key = "_".join((window_start.strftime(stamp_format), window_end.strftime(stamp_format)))
    return key, window_start, window_end
# ====================== Win/loss settlement ======================
def calculate_bet_result(actual, predict):
    """Settle every bet of one prediction record against one actual record.

    Args:
        actual: Actual draw record. The keys read here are "id", "result"
            and "GLH_result" (both indexed by integer position), "winner",
            "GD1" and "GD2".
        predict: Prediction record. The keys read here are "id", "time",
            "result", "result_detail", "winner", "GD1", "GD2" and
            "GLH_result"; each is iterated with ``.items()`` and yields bet
            amounts, where None/NaN means "no bet".

    Returns:
        A dict holding the ids and time, one list of per-bet entries for each
        play ("result_bet", "result_detail_bet", "winner_bet", "GD1_bet",
        "GD2_bet", "GLH_bet"), and "total_profit" / "total_cost" rounded to
        6 decimal places. An entry's "profit" is bet * odds on a hit and 0
        otherwise; the stake is not subtracted from it.

    No exception is caught here, so a missing key or an out-of-range
    position propagates to the caller.
    """
    bet_result = {
        "predict_id": predict["id"],
        "time": predict["time"],
        "actual_id": actual["id"],
        "result_bet": [],  # per-position number bets
        "result_detail_bet": [],  # per-position big/small and odd/even bets
        "winner_bet": [],  # champion + runner-up sum bets
        "GD1_bet": [],  # champion + runner-up big/small bets
        "GD2_bet": [],  # champion + runner-up odd/even bets
        "GLH_bet": [],  # dragon/tiger bets
        "total_profit": 0,  # sum of all "profit" values (filled in below)
        "total_cost": 0  # sum of all stakes (filled in below)
    }
    # 3.1 Settle the per-position number bets ("result").
    actual_result = actual["result"]
    for idx, predict_idx_result in predict["result"].items():
        # Position keys are converted to int to index the actual result.
        _idx = int(idx)
        hit = False
        total_bet = 0
        profit = 0
        for predict_idx, bet in predict_idx_result.items():
            if is_nan(bet):
                continue
            # Every placed bet counts towards the stake, hit or not.
            total_bet += bet
            if actual_result[_idx] == int(predict_idx):
                hit = True
                profit += bet * NUM_POS_ODDS
        # One summary entry per position.
        bet_result["result_bet"].append({
            "num": actual_result[_idx],
            "bet": total_bet,
            "hit": hit,
            "profit": profit,
        })
    # 3.1.1 Settle the big/small and odd/even bets of each position.
    for idx, predict_detail_result in predict["result_detail"].items():
        _idx = int(idx)
        bet = 0
        profit = 0
        # NOTE(review): every string literal on the next eight lines is empty
        # in this copy of the file, so both branches of each conditional and
        # all four lookups use the same "" key. The labels were presumably
        # lost when the file was extracted -- confirm against the original.
        # The conditions select on "actual number >= 6" and "actual number is odd".
        big_or_small = "" if actual_result[_idx] >= 6 else ""
        single_or_double = "" if (actual_result[_idx] & 1) == 1 else ""
        # Amounts staked on the two outcomes that actually occurred.
        profit1 = predict_detail_result[big_or_small]
        profit2 = predict_detail_result[single_or_double]
        # Amounts staked on each of the four options.
        bet1 = predict_detail_result[""]
        bet2 = predict_detail_result[""]
        bet3 = predict_detail_result[""]
        bet4 = predict_detail_result[""]
        # Missing (None/NaN) amounts count as 0.
        bet += (0 if is_nan(bet1) else bet1) + (0 if is_nan(bet2) else bet2) + (0 if is_nan(bet3) else bet3) + (
            0 if is_nan(bet4) else bet4)
        profit += (0 if is_nan(profit1) else profit1 * NUM_DETAIL_ODDS) + (
            0 if is_nan(profit2) else profit2 * NUM_DETAIL_ODDS)
        bet_result["result_detail_bet"].append({
            "num": actual_result[_idx],
            "bet": bet,
            # A position counts as hit when either winning option had a stake.
            "hit": True if profit > 0 else False,
            "profit": profit,
            "big_or_small": big_or_small,
            "single_or_double": single_or_double,
        })
    # 3.2 Settle the champion + runner-up sum bets ("winner").
    actual_winner = actual["winner"]
    for winner_str, bet in predict["winner"].items():
        if is_nan(bet):
            continue
        winner_val = int(winner_str)
        hit = winner_val == actual_winner
        profit = bet * (SUM12_ODDS[winner_val] if hit else 0)
        bet_result["winner_bet"].append({
            "winner_val": winner_val,
            # NaN bets were already skipped above, so this is always `bet`.
            "bet": 0 if is_nan(bet) else bet,
            "hit": hit,
            "profit": profit,
        })
    # 3.3 Settle the champion + runner-up big/small bets ("GD1").
    actual_gd1 = actual["GD1"]
    for gd1_type, bet in predict["GD1"].items():
        if is_nan(bet):
            continue
        hit = gd1_type == actual_gd1
        # The odds lookup only runs on a hit, where actual_gd1 == gd1_type.
        profit = bet * (SIZE_ODDS[actual_gd1] if hit else 0)
        bet_result["GD1_bet"].append({
            "gd1_type": gd1_type,
            "bet": 0 if is_nan(bet) else bet,
            "hit": hit,
            "profit": profit,
        })
    # 3.4 Settle the champion + runner-up odd/even bets ("GD2").
    actual_gd2 = actual["GD2"]
    for gd2_type, bet in predict["GD2"].items():
        if is_nan(bet):
            continue
        hit = gd2_type == actual_gd2
        profit = bet * (ODD_EVEN_ODDS[gd2_type] if hit else 0)
        bet_result["GD2_bet"].append({
            "gd2_type": gd2_type,
            "bet": 0 if is_nan(bet) else bet,
            "hit": hit,
            "profit": profit,
        })
    # 3.5 Settle the dragon/tiger bets ("GLH_result").
    actual_glh = actual["GLH_result"]
    for glh_type, bet in predict["GLH_result"].items():
        # The fifth character of the key is parsed as the position index.
        # NOTE(review): assumes a single-digit index at that offset -- confirm
        # against the prediction file format.
        idx = int(glh_type[4])
        if idx >= len(actual_glh):
            # NOTE(review): `break` also drops every later entry, not just
            # this out-of-range one -- confirm that is intended.
            break
        if is_nan(bet):
            continue
        # Only the last character of the key is compared with the actual outcome.
        glh_type = glh_type[-1:]
        hit = glh_type == actual_glh[idx]
        profit = bet * (RANK_GLH_ODDS if hit else 0)
        bet_result["GLH_bet"].append({
            "position": idx,
            "glh_type": glh_type,
            "bet": 0 if is_nan(bet) else bet,
            "hit": hit,
            "profit": profit,
        })
    # 3.6 Total "profit" of this round across all plays.
    total_profit = sum([
        sum(item["profit"] for item in bet_result["result_bet"]),
        sum(item["profit"] for item in bet_result["result_detail_bet"]),
        sum(item["profit"] for item in bet_result["winner_bet"]),
        sum(item["profit"] for item in bet_result["GD1_bet"]),
        sum(item["profit"] for item in bet_result["GD2_bet"]),
        sum(item["profit"] for item in bet_result["GLH_bet"])
    ])
    bet_result["total_profit"] = round(total_profit, 6)  # keep 6 decimal places
    # 3.7 Total stake of this round across all plays.
    total_cost = sum([
        sum(item["bet"] for item in bet_result["result_bet"]),
        sum(item["bet"] for item in bet_result["result_detail_bet"]),
        sum(item["bet"] for item in bet_result["winner_bet"]),
        sum(item["bet"] for item in bet_result["GD1_bet"]),
        sum(item["bet"] for item in bet_result["GD2_bet"]),
        sum(item["bet"] for item in bet_result["GLH_bet"])
    ])
    bet_result["total_cost"] = round(total_cost, 6)
    return bet_result
def calculate_bet_input_cost(bet_result):
    """Apply one settled round to the running balance and report whether to go on.

    The balance in ``INIT_CONFIG["init_cost"]`` is reduced by the round's
    "total_cost" and increased by its "total_profit" (the dict is updated in
    place).

    Returns:
        False when the new balance is at or below ``stop_loss`` or at or
        above ``stop_win``; True otherwise.
    """
    round_cost = bet_result["total_cost"]
    round_profit = bet_result["total_profit"]
    balance = INIT_CONFIG["init_cost"] - round_cost + round_profit
    INIT_CONFIG["init_cost"] = balance
    hit_stop_loss = INIT_CONFIG["init_cost"] <= INIT_CONFIG["stop_loss"]
    # The stop-win check is only evaluated when the stop-loss did not trigger.
    should_stop = hit_stop_loss or INIT_CONFIG["init_cost"] >= INIT_CONFIG["stop_win"]
    return not should_stop
# ====================== Interval aggregation state ======================
# Aggregated statistics, keyed by interval key.
interval_agg_data = dict()
# Plays included in the aggregation.
PLAY_TYPES = [
    "result_bet",  # per-position number bets
    "result_detail_bet",  # per-position big/small and odd/even bets
    "winner_bet",  # champion + runner-up sum
    "GD1_bet",  # champion + runner-up big/small
    "GD2_bet",  # champion + runner-up odd/even
    "GLH_bet"  # dragon/tiger
]


# Build the aggregation record of a single interval.
def init_interval_agg():
    """Return a fresh, zero-filled aggregation record for one interval.

    The record has a "play_details" dict with one entry per name in
    ``PLAY_TYPES`` and a "total" entry for the whole interval. Every call
    returns new, independent dicts.
    """
    def zeroed(profit_key):
        # Key order matters: it is the order written to the JSON output.
        return {
            "total_input": 0.0,  # total stake
            "total_income": 0.0,  # total income
            "input_income_ratio": 0.0,  # stake / income ratio
            profit_key: 0.0,  # income minus stake
        }

    play_details = {}
    for play_type in PLAY_TYPES:
        play_details[play_type] = zeroed("profit")
    return {
        "play_details": play_details,
        "total": zeroed("total_profit"),
    }
# ====================== Match predictions in bulk and settle them ======================
def _input_income_ratio(stake, income):
    """Return stake / income rounded to 6 places, or 0.0 when income is zero."""
    return round(stake / income, 6) if income != 0 else 0.0


final_results = []
for predict_item in predict_data:
    # Normalise the prediction time.
    predict_time_str = predict_item["time"]
    predict_dt = standardize_time(predict_time_str)
    if not predict_dt:
        final_results.append({
            "predict_id": predict_item["id"],
            "time": predict_time_str,
            "error": "时间格式错误,无法解析"
        })
        continue

    # Interval this prediction belongs to.
    interval_key, _, _ = get_time_interval_key(predict_dt)

    # Look up the actual result recorded for exactly the same time.
    actual_item = actual_time_map.get(predict_dt)
    if not actual_item:
        final_results.append({
            "predict_id": predict_item["id"],
            "time": predict_time_str,
            "error": "未找到对应时间的实际结果"
        })
        continue

    # Settle this single record and apply it to the running balance.
    bet_result = calculate_bet_result(actual_item, predict_item)
    keep_going = calculate_bet_input_cost(bet_result)
    final_results.append(bet_result)

    # ====================== Aggregate into the matching interval ======================
    # Create the interval record on first use.
    interval_record = interval_agg_data.get(interval_key)
    if interval_record is None:
        interval_record = init_interval_agg()
        interval_agg_data[interval_key] = interval_record

    round_input = 0.0
    round_income = 0.0
    for play_type in PLAY_TYPES:
        entries = bet_result[play_type]
        # Stake and income of this play in this round.
        play_input = sum(entry["bet"] for entry in entries)
        play_income = sum(entry["profit"] for entry in entries)

        play_stats = interval_record["play_details"][play_type]
        play_stats["total_input"] += play_input
        play_stats["total_income"] += play_income
        # Profit = income - stake.
        play_stats["profit"] = round(play_stats["total_income"] - play_stats["total_input"], 6)
        # Stake / income ratio (0.0 when there is no income, avoiding a division by zero).
        play_stats["input_income_ratio"] = _input_income_ratio(
            play_stats["total_input"], play_stats["total_income"])

        # Accumulate into this round's totals.
        round_input += play_input
        round_income += play_income

    # Update the interval-wide totals.
    interval_total = interval_record["total"]
    interval_total["total_input"] += round_input
    interval_total["total_income"] += round_income
    # Total profit (income - stake).
    interval_total["total_profit"] = round(
        interval_total["total_income"] - interval_total["total_input"], 6)
    # Total stake / income ratio.
    interval_total["input_income_ratio"] = _input_income_ratio(
        interval_total["total_input"], interval_total["total_income"])

    # The round that crosses a stop threshold is still recorded; then stop.
    if not keep_going:
        break
# ====================== Print the per-interval aggregates ======================
heavy_rule = "=" * 100
light_rule = "-" * 80
print(heavy_rule)
print("时间区间聚合盈亏统计")
print(heavy_rule)
for interval_key, agg_data in interval_agg_data.items():
    print(f"\n【区间】: {interval_key}")
    print(light_rule)
    # Per-play figures.
    print("【各玩法统计】:")
    play_details = agg_data["play_details"]
    for play_type in play_details:
        play_data = play_details[play_type]
        print(f"\n{play_type}:")
        print(f" 总投入: {play_data['total_input']:.6f}")
        print(f" 总收益: {play_data['total_income']:.6f}")
        print(f" 投入/收益比例: {play_data['input_income_ratio']:.6f}")
        print(f" 利润(收益-投入): {play_data['profit']:.6f}")
    # Interval-wide figures.
    print("\n【区间整体统计】:")
    total_data = agg_data["total"]
    print(f" 总投入: {total_data['total_input']:.6f}")
    print(f" 总收益: {total_data['total_income']:.6f}")
    print(f" 投入/收益比例: {total_data['input_income_ratio']:.6f}")
    print(f" 总利润(收益-投入): {total_data['total_profit']:.6f}")
    print(light_rule)
# ====================== Print the raw per-record details (optional) ======================
def _hit_label(entry):
    """Return the printed hit/miss label of a settled bet entry."""
    return "命中" if entry["hit"] else "未命中"


print("\n\n" + "=" * 100)
print("原始押注明细")
print("=" * 100)
for res in final_results:
    # Records that could not be settled only carry an error message.
    if "error" in res:
        print(f"【预测ID: {res['predict_id']}】 时间: {res['time']} | 错误: {res['error']}\n")
        continue
    print(f"===== 预测ID: {res['predict_id']} | 时间: {res['time']} | 实际ID: {res['actual_id']} =====")

    # Per-position number bets.
    print("\n--- 数字排列押注 ---")
    for item in res["result_bet"]:
        print(f"数字{item['num']} | 押注{item['bet']}元 | {_hit_label(item)} | 盈利{item['profit']}")

    # Per-position big/small and odd/even bets.
    print("\n--- 数字排列大小单双押注 ---")
    for item in res["result_detail_bet"]:
        outcome = f"{item['single_or_double']}{item['big_or_small']}"
        print(f"数字{item['num']} | 押注{item['bet']}元 | {outcome} | 盈利{item['profit']}")

    # Champion + runner-up sum bets.
    print("\n--- 冠亚和押注 ---")
    for item in res["winner_bet"]:
        print(f"冠亚和{item['winner_val']} | 押注{item['bet']}元 | {_hit_label(item)} | 盈利{item['profit']}")

    # Champion + runner-up big/small bets.
    print("\n--- 冠亚大小押注 ---")
    for item in res["GD1_bet"]:
        print(f"{item['gd1_type']} | 押注{item['bet']}元 | {_hit_label(item)} | 盈利{item['profit']}")

    # Champion + runner-up odd/even bets.
    print("\n--- 冠亚单双押注 ---")
    for item in res["GD2_bet"]:
        print(f"{item['gd2_type']} | 押注{item['bet']}元 | {_hit_label(item)} | 盈利{item['profit']}")

    # Dragon/tiger bets.
    print("\n--- 龙虎押注 ---")
    for item in res["GLH_bet"]:
        print(f"位置{item['position']}({item['glh_type']}) | 押注{item['bet']}元 | {_hit_label(item)} | 盈利{item['profit']}")

    print("\n" + "-" * 80 + "\n")
# Grand totals over every interval.
total_input = sum(interval["total"]["total_input"] for interval in interval_agg_data.values())
total_income = sum(interval["total"]["total_income"] for interval in interval_agg_data.values())
print(f"总投入 {total_input}, 总收益 {total_income}, 最终盈利: {total_income - total_input:.6f}")

# Optional: save the per-interval aggregates as a JSON file.
output_path = "interval_agg_result_1_1_2.json"
with open(output_path, "w", encoding="utf-8") as f:
    json.dump(interval_agg_data, f, ensure_ascii=False, indent=4)
# Report the file that was actually written; the previous message named
# "interval_agg_result.json", which is not the path used above.
print(f"\n区间聚合结果已保存至 {output_path}")