Files
bocai/PyModel/aggregate_data_v7.py
2026-01-23 18:04:11 +08:00

129 lines
4.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import os
import pandas as pd
import numpy as np
from datetime import timedelta
# 1. Aggregate raw per-draw JSON files from the data directories into one DataFrame.
# data_dirs = ['data_test_dir', 'data_test_dir1']
data_dirs = ['data_test_dir']
all_records = []
for src_dir in data_dirs:
    print(f"Processing directory: {src_dir}")
    if not os.path.exists(src_dir):
        print(f"Directory {src_dir} does not exist")
        continue
    print(f"Found directory: {src_dir}")
    entries = os.listdir(src_dir)
    print(f"Files in directory: {entries}")
    for entry in entries:
        # Only raw draw dumps; the derived stat_result.json is skipped.
        if not (entry.endswith('.json') and entry != 'stat_result.json'):
            continue
        file_path = os.path.join(src_dir, entry)
        print(f"Processing file: {file_path}")
        try:
            with open(file_path, 'r', encoding='utf-8') as fh:
                data = json.load(fh)
            print(f"Loaded {len(data)} records from {entry}")
            if len(data) > 0:
                print(f"First record keys: {list(data[0].keys())}")
            all_records.extend(data)
        except Exception as e:
            # Best-effort load: one corrupt file must not abort the whole run.
            print(f"Error processing {file_path}: {str(e)}")
            continue
print(f"Total records loaded: {len(all_records)}")
df = pd.DataFrame(all_records)
print(f"DataFrame columns: {list(df.columns)}")
if 'id' in df.columns:
    # Keep the last occurrence so re-fetched records supersede older ones.
    df = df.drop_duplicates(subset=['id'], keep='last')
    print(f"Total unique records after deduplication: {len(df)}")
else:
    print("No 'id' column found in DataFrame")
if 'time' in df.columns:
    df['time'] = pd.to_datetime(df['time'])
    print(f"Total unique records: {len(df)}")
else:
    print("No 'time' column found in DataFrame")
print("Sample of first 5 records:")
print(df.head())
# 2. Fast per-group statistics
def calculate_stats_fast(group):
    """Compute relative-frequency distributions for one group of draw records.

    Parameters:
        group (pd.DataFrame): rows sharing one aggregation key. Expected
            columns: 'winner' (numeric), 'GD2', 'result' (list of 10
            numbers per row) and 'GLH_result' (list of 5 values per row).

    Returns:
        dict: nested mapping of value -> probability; empty dict for an
        empty group.
    """
    if group.empty:
        return {}
    stats = {}
    # Distribution of the raw 'winner' values.
    stats['winner_prob'] = group['winner'].value_counts(normalize=True).to_dict()
    # 冠亚大 ("champ+runner-up big") when winner >= 12, else 冠亚小.
    stats['GD1_prob'] = (group['winner'] >= 12).map({True: '冠亚大', False: '冠亚小'}).value_counts(
        normalize=True).to_dict()
    stats['GD2_prob'] = group['GD2'].value_counts(normalize=True).to_dict()
    # Per-position distributions over the 10-slot 'result' lists.
    res_df = pd.DataFrame(group['result'].tolist())
    pos_probs = {}
    pos_detail_probs = {}
    for col in range(10):
        col_data = res_df[col]
        pos_probs[f'pos_{col}'] = col_data.value_counts(normalize=True).to_dict()
        # BUGFIX: both map targets were the empty string (labels lost to an
        # encoding mishap), collapsing every distribution to {'': 1.0}.
        # Restored as 大/小 (big/small, >= 6 is big) and 单/双 (odd/even),
        # matching the 冠亚大/冠亚小 convention above.
        # NOTE(review): confirm these labels against downstream consumers.
        is_big = (col_data >= 6).map({True: '大', False: '小'})
        is_odd = (col_data % 2 != 0).map({True: '单', False: '双'})
        pos_detail_probs[f'pos_{col}'] = {
            'big_small': is_big.value_counts(normalize=True).to_dict(),
            'odd_even': is_odd.value_counts(normalize=True).to_dict()
        }
    stats['result_pos_prob'] = pos_probs
    stats['result_pos_detail_prob'] = pos_detail_probs
    # Per-position distributions over the 5-slot 'GLH_result' lists.
    glh_df = pd.DataFrame(group['GLH_result'].tolist())
    glh_pos_probs = {}
    for col in range(5):
        glh_pos_probs[f'pos_{col}'] = glh_df[col].value_counts(normalize=True).to_dict()
    stats['GLH_pos_prob'] = glh_pos_probs
    return stats
# 3. Multi-dimensional aggregation keys derived from the draw timestamp.
df['hour_min'] = df['time'].dt.strftime('%H:%M:%S')
df['day_of_month'] = df['time'].dt.day
df['day_of_week'] = df['time'].dt.dayofweek
# Full-history distributions per time-of-day, day-of-month and weekday.
print("Calculating full history stats...")
time_stats = df.groupby('hour_min').apply(calculate_stats_fast).to_dict()
date_stats = df.groupby('day_of_month').apply(calculate_stats_fast).to_dict()
week_stats = df.groupby('day_of_week').apply(calculate_stats_fast).to_dict()
# Recent-window distributions.
print("Calculating recent 100 days stats...")
first_date = df['time'].min()
last_date = df['time'].max()
# Overall prediction hit rate is roughly 0.3241.
# start_date_last_0000_1d = last_date - timedelta(days=int(len(df["time"]) * 0.3241 / 276))
# Window of the most recent x days; including these x days shifts the
# probability distribution by about one part in ten thousand.
window_days = (0.0001 * len(df["time"]) / 276) + 1
start_date_last_0000_2d = last_date - timedelta(days=window_days)
df_0000_2d = df[df['time'] >= start_date_last_0000_2d]
time_stats_0000_2d = df_0000_2d.groupby('hour_min').apply(calculate_stats_fast).to_dict()
# 4. Persist the aggregated distributions.
output_data = {
    'by_time': time_stats,
    'by_time_recent_0000_2d': time_stats_0000_2d,
    'by_date': date_stats,
    'by_week': week_stats,
    'last_updated': last_date.strftime('%Y-%m-%d %H:%M:%S')
}
# Make sure the data_test_predict output directory exists.
predict_dir = 'data_test_predict'
if os.path.exists(predict_dir):
    print(f"Directory {predict_dir} already exists")
else:
    print(f"Creating directory: {predict_dir}")
    os.makedirs(predict_dir)
# Write the aggregated stats next to the other prediction artifacts.
output_file = os.path.join(predict_dir, 'aggregated_stats_v7.json')
print(f"Saving results to: {output_file}")
# NOTE(review): the groupby labels used as dict keys may be numpy scalars,
# which json.dump can reject as keys — confirm serialization on real data.
with open(output_file, 'w') as f:
    json.dump(output_data, f)
print(f"Stats V7 generated with 100-day window. Last data point: {last_date}")