# ==========================================
# [App A] GPV analysis / cache engine (Ver 97.5, modern integrated edition)
# VERSION INFO: 97.5 (full support for old and new GSM, modern UI retained, complete unabridged edition)
# ==========================================
import sys
import os
import glob
import queue
import re
import subprocess
from datetime import datetime
import multiprocessing as mp

# Set before the Qt imports below (original statement order preserved).
os.environ['QT_API'] = 'pyqt6'
from PyQt6.QtWidgets import (QApplication, QWidget, QVBoxLayout, QHBoxLayout,
                             QPushButton, QLabel, QListWidget, QFileDialog, QMessageBox,
                             QSystemTrayIcon, QMenu, QProgressBar)
from PyQt6.QtCore import Qt, QTimer, QThread, pyqtSignal, QSettings
from PyQt6.QtGui import QIcon, QPixmap, QPainter, QAction

# cfgrib is mandatory: exit immediately with a message when it is missing.
try:
    import cfgrib
except ImportError:
    sys.exit("エラー: cfgribモジュールがインストールされていません。")
# ==========================================
# Base paths and constants
# ==========================================
# All paths are resolved relative to the current working directory at import time.
APP_DIR = os.getcwd()
# Default directory that receives the extracted .npz cache files.
DEFAULT_OUTPUT_DIR = os.path.join(APP_DIR, "gpv_cache_npz")
# Location of the bundled wgrib2 executable used to slice GRIB2 files.
WGRIB2_EXE = os.path.join(APP_DIR, "wgrib2_data", "wgrib2.exe")
# Create the default cache directory up front (module-level side effect).
os.makedirs(DEFAULT_OUTPUT_DIR, exist_ok=True)
def create_lightning_icon():
    """Build the lightning-bolt icon at runtime.

    Draws the "⚡" character centred on a transparent 64x64 pixmap using the
    painter's default font at a 50 px pixel size.

    Returns:
        QIcon: icon wrapping the rendered pixmap.
    """
    pixmap = QPixmap(64, 64)
    pixmap.fill(Qt.GlobalColor.transparent)

    painter = QPainter(pixmap)
    font = painter.font()
    font.setPixelSize(50)
    painter.setFont(font)
    painter.drawText(pixmap.rect(), Qt.AlignmentFlag.AlignCenter, "⚡")
    # The painter must be ended before the pixmap is handed to QIcon.
    painter.end()

    return QIcon(pixmap)
# ==========================================
# Background worker: isolated process (memory-leak countermeasure)
# ==========================================
def extract_gpv_worker(model, mode, cache, init, target_fts, f1, f2, wgrib2_path, q):
    """Extract selected GRIB2 fields for each forecast time and cache them as .npz.

    For every forecast time in ``target_fts`` the function slices ``f1`` and
    ``f2`` down to that forecast time with wgrib2, reads the slices with
    cfgrib, collects the recognised variables into a dict and writes it to
    ``<cache>/<model>_[GUID_]<init>_FT<ft>.npz``.

    Args:
        model: Model identifier used in temp and output file names.
        mode: ``"ANAL"`` (match analysis records only), ``"GUID"`` (store
            every variable under its GRIB short name) or anything else
            (surface / pressure-level key mapping).
        cache: Directory that receives both the temporary slices and the
            resulting .npz files.
        init: Initial-time string embedded in the file names.
        target_fts: Iterable of integer forecast hours to process.
        f1: Path of the file handled as surface data, or ``"NONE"``.
        f2: Path of the file handled as pressure-level data, or ``"NONE"``.
        wgrib2_path: Path of the wgrib2 executable.
        q: Queue-like object with ``put``; receives ``("SUCCESS", ft)`` after
            each .npz is written, or a single ``("ERROR", message)`` when an
            exception aborts the run.

    Returns:
        None. Results are reported through ``q`` and the file system.
    """
    try:
        # Imported here so that an import failure is reported through the queue.
        import os
        import sys
        import subprocess
        import warnings
        import numpy as np
        import cfgrib

        warnings.filterwarnings("ignore")
        os.environ["ECCODES_MAX_VALUES"] = "5000000"

        # DLL search path setup for Windows environments.
        if sys.platform == "win32":
            conda_dir = os.path.dirname(sys.executable)
            dll_paths = [os.path.join(conda_dir, "Library", "bin"),
                         os.path.join(conda_dir, "bin")]
            for p in dll_paths:
                if os.path.exists(p):
                    os.environ["PATH"] = f"{p};{os.environ.get('PATH', '')}"
                    try:
                        os.add_dll_directory(p)
                    except (AttributeError, OSError):
                        # Best effort: PATH above is the fallback.
                        pass

        # Hide the console window of child processes on Windows (0 elsewhere).
        creationflags = 0x08000000 if sys.platform == "win32" else 0

        def calculate_vorticity(u, v, lon, lat):
            """Return (dv/dx - du/dy) * 1e5 on a lat/lon grid; zeros on failure.

            NOTE(review): presumably u/v are in m/s, giving 1e-5 s^-1 - confirm.
            """
            try:
                R = 6371000.0
                LON, LAT = np.meshgrid(lon, lat) if lon.ndim == 1 else (lon, lat)
                rad_lat = np.deg2rad(LAT)
                rad_lon = np.deg2rad(LON)
                dy = R * np.gradient(rad_lat, axis=0)
                dx = R * np.cos(rad_lat) * np.gradient(rad_lon, axis=1)
                # Avoid division by zero on degenerate grid spacing.
                dx[dx == 0] = 1e-10
                dy[dy == 0] = 1e-10
                return ((np.gradient(v, axis=1) / dx) - (np.gradient(u, axis=0) / dy)) * 1e5
            except Exception:
                return np.zeros_like(u)

        def discard(path):
            """Delete a temporary file, ignoring a missing or locked file."""
            try:
                os.remove(path)
            except OSError:
                pass

        def slice_with_wgrib2(fin, ft_val, suffix):
            """Write the records of ``fin`` for one forecast time to a temp file.

            Returns the temp file path, or "NONE" when there is no input, the
            wgrib2 run fails, or no record matched.
            """
            if fin == "NONE":
                return "NONE"
            fout = os.path.join(cache, f"temp_{model}_{mode}_{init}_{ft_val}_{suffix}.bin")
            if mode == "ANAL":
                match_str = ":(anl):"
            elif ft_val == 0:
                match_str = ":(anl|0 hour [^:]+|[0-9]+-0 hour [^:]+):"
            else:
                match_str = f":({ft_val} hour [^:]+|[0-9]+-{ft_val} hour [^:]+):"
            cmd = [wgrib2_path, fin, "-match", match_str, "-grib", fout]
            try:
                subprocess.run(cmd, creationflags=creationflags, check=True,
                               stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
                if os.path.exists(fout) and os.path.getsize(fout) > 0:
                    return fout
            except (OSError, ValueError, subprocess.SubprocessError):
                pass
            # Do not leave an empty or partial slice behind in the cache directory.
            if os.path.exists(fout):
                discard(fout)
            return "NONE"

        def process_file(filepath, is_pall, d):
            """Read one sliced GRIB file and store recognised fields in ``d``.

            Reading is best effort: any failure leaves ``d`` with whatever
            was collected so far.
            """
            if filepath == "NONE":
                return
            try:
                dss = cfgrib.open_datasets(filepath, backend_kwargs={'indexpath': ''})
            except Exception:
                return
            try:
                for ds in dss:
                    lon = ds['longitude'].values if 'longitude' in ds.coords else (
                        ds['lon'].values if 'lon' in ds.coords else None)
                    lat = ds['latitude'].values if 'latitude' in ds.coords else (
                        ds['lat'].values if 'lat' in ds.coords else None)
                    for var_name in ds.data_vars:
                        da = ds[var_name]
                        da_step = da.isel(step=0) if 'step' in da.dims else da
                        short_name = str(da.attrs.get('GRIB_shortName', var_name)).lower()

                        # Reduce to 2-D by taking the first index of leading dims.
                        val_2d = da_step.values.copy()
                        while val_2d.ndim > 2:
                            val_2d = val_2d[0]

                        # Key assignment depends on the mode.
                        if mode == "GUID":
                            if short_name != 'unknown':
                                d[short_name] = val_2d
                            if short_name in ['2t', 't2m']:
                                d['t2m'] = val_2d
                            elif short_name in ['tp', 'apcp', 'pr']:
                                d['precip'] = val_2d
                            continue

                        # Keep the coordinates of the first dataset of each kind.
                        if not is_pall and 'lon_surf' not in d:
                            d['lon_surf'] = lon
                            d['lat_surf'] = lat
                        elif is_pall and 'lon_pall' not in d:
                            d['lon_pall'] = lon
                            d['lat_pall'] = lat

                        if not is_pall:
                            if short_name in ['prmsl', 'msl', 'slp']:
                                # Values above 2000 are divided by 100 (presumably Pa -> hPa).
                                d['slp'] = val_2d / 100.0 if np.nanmax(val_2d) > 2000 else val_2d
                            elif short_name in ['tp', 'apcp', 'pr']:
                                d['precip_raw'] = np.nan_to_num(val_2d, nan=0.0)
                            elif short_name in ['10u', 'u10']:
                                d['u10'] = val_2d
                            elif short_name in ['10v', 'v10']:
                                d['v10'] = val_2d
                            elif short_name in ['2t', 't2m']:
                                # Values above 150 get 273.15 subtracted (presumably K -> degC).
                                d['t2m'] = val_2d - 273.15 if np.nanmax(val_2d) > 150 else val_2d
                        else:
                            lvl = int(da.attrs.get('GRIB_level', 0))
                            if lvl > 0 and short_name in ['u', 'v', 't', 'gh', 'r', 'w']:
                                d[f'{short_name}{lvl}'] = val_2d
            except Exception:
                # Deliberate best effort: a broken message must not abort the FT.
                pass
            finally:
                # Always release the datasets, even when processing failed.
                for ds in dss:
                    try:
                        ds.close()
                    except Exception:
                        pass

        for t_ft in target_fts:
            # One dict per forecast time; nothing is retained across iterations.
            d = {}
            f1_mini = f2_mini = "NONE"
            try:
                f1_mini = slice_with_wgrib2(f1, t_ft, "1")
                f2_mini = slice_with_wgrib2(f2, t_ft, "2")

                process_file(f1_mini, False, d)
                process_file(f2_mini, True, d)

                # Derived field: 500 hPa vorticity when both wind components exist.
                if 'vort500' not in d and 'u500' in d and 'v500' in d:
                    lon_arr = d.get('lon_pall', d.get('lon_surf'))
                    lat_arr = d.get('lat_pall', d.get('lat_surf'))
                    if lon_arr is not None:
                        d['vort500'] = calculate_vorticity(d['u500'], d['v500'], lon_arr, lat_arr)

                # Save the result.
                pfx = "GUID_" if mode == "GUID" else ""
                out_file = os.path.join(cache, f"{model}_{pfx}{init}_FT{t_ft:02d}.npz")
                np.savez_compressed(out_file, **d)
                q.put(("SUCCESS", t_ft))
            finally:
                # Remove the temporary slices whether or not this FT succeeded.
                for tmp_path in (f1_mini, f2_mini):
                    if tmp_path != "NONE":
                        discard(tmp_path)
    except Exception as e:
        q.put(("ERROR", str(e)))
# ==========================================
# Monitoring / task-dispatch thread
# ==========================================
class DataParserThread(QThread):
    """Scan the input folders for new GRIB2 data and dispatch extraction jobs.

    For every scan rule the thread finds the newest initial time among the
    matching files, works out which forecast times have no .npz in the output
    directory yet, and runs ``extract_gpv_worker`` in a freshly spawned
    process for each of them, one at a time.

    Signals:
        log_signal(str): human-readable log line.
        progress_signal(int, str): percentage of the current batch and a label.
        finished_signal(bool): emitted once at the end of ``run``; True when
            new data was processed and the run was not aborted.
    """

    log_signal = pyqtSignal(str)
    progress_signal = pyqtSignal(int, str)
    finished_signal = pyqtSignal(bool)

    # 14-digit timestamp delimited by underscores inside the file path.
    _STAMP_RE = re.compile(r'_(\d{14})_')

    # (glob pattern, model id, mode, forecast hours); covers old/new GSM, MSM, ANAL, GUID.
    _SCAN_RULES = (
        ('*MSM*GPV*.bin', 'MSM', 'GPV', range(0, 80)),
        ('*GSM*GPV*Rjp*.bin', 'GSM_JP', 'GPV', range(0, 265, 3)),  # new GSM, direct from JMA
        ('*GSM*GPV*.bin', 'GSM_JP', 'GPV', range(0, 265, 3)),      # old, partially processed GSM
        ('*ANAL_grib2*.bin', 'ANAL', 'ANAL', [0]),
        ('*GSM_GUID*.bin', 'GSM', 'GUID', range(0, 265, 3)),
    )

    def __init__(self, folder_paths, output_dir, last_reported):
        """Store the scan configuration.

        Args:
            folder_paths: Iterable of input directories to scan recursively.
            output_dir: Directory holding the .npz cache files.
            last_reported: Dict shared with the caller, mapping
                ``"<model>_<mode>"`` to the last initial time already
                announced in the log; updated in place.
        """
        super().__init__()
        self.folder_paths = folder_paths
        self.output_dir = output_dir
        self.last_reported = last_reported
        # Set to True from outside to stop after the job currently running.
        self.abort = False

    def run(self):
        """Thread entry point: process every folder and rule, then emit ``finished_signal``."""
        has_new = False
        for folder_path in self.folder_paths:
            if self.abort or not folder_path or not os.path.exists(folder_path):
                continue
            for pattern, model_id, mode, ft_range in self._SCAN_RULES:
                if self.abort:
                    break
                if self._process_rule(folder_path, pattern, model_id, mode, ft_range):
                    has_new = True
        # An aborted run must not be reported as "all new data extracted".
        self.finished_signal.emit(has_new and not self.abort)

    def _process_rule(self, folder_path, pattern, model_id, mode, ft_range):
        """Handle one scan rule; return True when extraction jobs were started."""
        files = glob.glob(os.path.join(folder_path, '**', pattern), recursive=True)
        if not files:
            return False

        # Identify the newest initial time.
        stamps = {m.group(1) for m in map(self._STAMP_RE.search, files) if m}
        if not stamps:
            return False
        latest_it = max(stamps)
        latest_files = [f for f in files if latest_it in f]

        report_key = f"{model_id}_{mode}"
        if self.last_reported.get(report_key) != latest_it:
            self.log_signal.emit(f"✅ {model_id} {mode} 新データを確認: {latest_it}")
            self.last_reported[report_key] = latest_it

        # Forecast times that have no cache file yet.
        pfx = "GUID_" if mode == "GUID" else ""
        tasks = [
            ft for ft in ft_range
            if not os.path.exists(
                os.path.join(self.output_dir, f"{model_id}_{pfx}{latest_it}_FT{ft:02d}.npz"))
        ]
        if not tasks:
            return False

        self.log_signal.emit(f"🚀 {model_id} NPZ抽出処理を開始 ({len(tasks)}件)")
        ctx = mp.get_context('spawn')
        # NOTE(review): only the first matching file is passed (as f1) and f2 is
        # always "NONE", so the worker's pressure-level / vorticity path is never
        # reached from here - confirm whether file selection per FT is intended.
        f1 = latest_files[0]
        for i, ft in enumerate(tasks):
            if self.abort:
                break
            for msg in self._run_worker(ctx, model_id, mode, latest_it, ft, f1):
                if msg[0] == "SUCCESS":
                    prog = int(((i + 1) / len(tasks)) * 100)
                    self.progress_signal.emit(prog, f"{model_id} FT{msg[1]} 完了")
                elif msg[0] == "ERROR":
                    self.log_signal.emit(f"⚠️ エラー発生 FT{ft}: {msg[1]}")
        return True

    def _run_worker(self, ctx, model_id, mode, latest_it, ft, f1):
        """Run one extraction in a child process and return the messages it sent."""
        q = ctx.Queue()
        proc = ctx.Process(
            target=extract_gpv_worker,
            args=(model_id, mode, self.output_dir, latest_it, [ft], f1, "NONE", WGRIB2_EXE, q),
        )
        proc.start()

        messages = []
        # Drain the queue while the child is alive: joining a process that still
        # has queued items can block, so reading must happen before join().
        while proc.is_alive():
            try:
                messages.append(q.get(timeout=0.5))
            except queue.Empty:
                pass
        proc.join()
        # Collect anything that arrived between the last poll and process exit.
        while True:
            try:
                messages.append(q.get_nowait())
            except queue.Empty:
                break

        # A crashed child sends nothing; surface that instead of staying silent.
        if not messages and proc.exitcode != 0:
            messages.append(
                ("ERROR", f"ワーカープロセスが異常終了しました (exit code {proc.exitcode})"))
        return messages
# ==========================================
# GUI application
# ==========================================
class EngineAppWindow(QWidget):
    """Main window of the GPV analysis engine.

    Shows the input/output directories, a progress bar and a log list, and
    drives a ``DataParserThread`` from a 10-second timer while monitoring is
    active. A system tray icon offers "show window" and "quit" actions.
    """

    _STATUS_STYLE = "font-size: 14pt; font-weight: bold; color: {color};"

    def __init__(self):
        """Load persisted settings, build the UI and tray icon, create the timer."""
        super().__init__()
        self.settings = QSettings('WeatherApp', 'EngineAppA')
        self.input_dirs = [self.settings.value('input_dir', os.path.join(os.getcwd(), "grib2_data"))]
        self.output_dir = self.settings.value('output_dir', DEFAULT_OUTPUT_DIR)
        # Shared with the parser thread: last initial time announced per model/mode.
        self.last_reported = {}
        self.worker = None

        self.init_ui()
        self.setup_tray()

        # Periodic monitoring timer (started by start_engine).
        self.monitor_timer = QTimer(self)
        self.monitor_timer.timeout.connect(self.run_parser)

    def init_ui(self):
        """Create and lay out all widgets of the window."""
        self.setWindowTitle("App A: GPV 解析エンジン (Ver 97.5 統合版)")
        self.setFixedSize(650, 580)

        # Modern dark theme.
        self.setStyleSheet("""
            QWidget { background-color: #0A192F; color: #E0E0E0; font-family: 'MS Gothic'; font-size: 11pt; }
            QPushButton { background-color: #1D3557; border: 1px solid #457B9D; padding: 8px; border-radius: 4px; color: white; font-weight: bold; }
            QPushButton:hover { background-color: #457B9D; }
            QListWidget { background-color: #112240; border: 1px solid #444; color: #64FFDA; padding: 5px; }
            QProgressBar { text-align: center; color: white; background-color: #112240; border: 1px solid #457B9D; border-radius: 4px; }
            QProgressBar::chunk { background-color: #27AE60; }
        """)

        layout = QVBoxLayout(self)

        # Status display.
        self.status_label = QLabel("状態: 待機中 (■ 停止)")
        self.status_label.setStyleSheet(self._STATUS_STYLE.format(color="#8892B0"))
        layout.addWidget(self.status_label)

        # Directory settings panel.
        dir_panel = QWidget()
        dir_panel.setStyleSheet("background-color: #112240; border: 1px solid #233554; border-radius: 5px;")
        dir_layout = QVBoxLayout(dir_panel)

        self.lbl_in = QLabel(f"入力元 (GRIB2): {self.input_dirs[0]}")
        btn_in = QPushButton("入力フォルダを変更")
        btn_in.clicked.connect(self.change_input_dir)

        self.lbl_out = QLabel(f"出力先 (NPZ): {self.output_dir}")
        btn_out = QPushButton("出力フォルダを変更")
        btn_out.clicked.connect(self.change_output_dir)

        for widget in (self.lbl_in, btn_in, self.lbl_out, btn_out):
            dir_layout.addWidget(widget)
        layout.addWidget(dir_panel)

        # Progress bar and log.
        self.progress_bar = QProgressBar()
        layout.addWidget(self.progress_bar)
        self.log_list = QListWidget()
        layout.addWidget(self.log_list)

        # Control buttons.
        ctrl_layout = QHBoxLayout()
        self.btn_start = QPushButton("▶ 監視・解析を開始")
        self.btn_start.setStyleSheet("background-color: #27AE60; font-size: 12pt; height: 40px;")
        self.btn_start.clicked.connect(self.start_engine)

        self.btn_stop = QPushButton("⏸ 一時停止")
        self.btn_stop.setStyleSheet("background-color: #F39C12; font-size: 12pt; height: 40px;")
        self.btn_stop.clicked.connect(self.stop_engine)
        self.btn_stop.setEnabled(False)

        ctrl_layout.addWidget(self.btn_start)
        ctrl_layout.addWidget(self.btn_stop)
        layout.addLayout(ctrl_layout)

        # Full-stop button that terminates the whole application.
        self.btn_exit = QPushButton("🛑 システムを完全終了")
        self.btn_exit.setStyleSheet("background-color: #C0392B; color: white; font-weight: bold; font-size: 12pt; height: 40px; margin-top: 10px;")
        self.btn_exit.clicked.connect(self.force_quit)
        layout.addWidget(self.btn_exit)

    def setup_tray(self):
        """Create the system tray icon with its context menu."""
        self.tray_icon = QSystemTrayIcon(self)
        self.tray_icon.setIcon(create_lightning_icon())

        # The tray icon does not take ownership of its menu, so keep a
        # reference (and a parent) or the menu is destroyed on return.
        self.tray_menu = QMenu(self)
        show_action = QAction("画面を表示", self)
        show_action.triggered.connect(self.showNormal)
        quit_action = QAction("完全終了", self)
        quit_action.triggered.connect(self.force_quit)
        self.tray_menu.addAction(show_action)
        self.tray_menu.addAction(quit_action)

        self.tray_icon.setContextMenu(self.tray_menu)
        self.tray_icon.show()

    def log(self, msg):
        """Append ``msg`` to the log list, prefixed with the current HH:MM:SS."""
        time_str = datetime.now().strftime('%H:%M:%S')
        self.log_list.addItem(f"[{time_str}] {msg}")
        self.log_list.scrollToBottom()

    def change_input_dir(self):
        """Let the user pick a new input folder and persist the choice."""
        d = QFileDialog.getExistingDirectory(self, "入力GRIB2フォルダを選択", self.input_dirs[0])
        if d:
            self.input_dirs = [d]
            self.settings.setValue('input_dir', d)
            self.lbl_in.setText(f"入力元 (GRIB2): {d}")

    def change_output_dir(self):
        """Let the user pick a new output folder and persist the choice."""
        d = QFileDialog.getExistingDirectory(self, "出力NPZフォルダを選択", self.output_dir)
        if d:
            self.output_dir = d
            self.settings.setValue('output_dir', d)
            self.lbl_out.setText(f"出力先 (NPZ): {d}")

    def start_engine(self):
        """Switch to the running state, scan immediately, then every 10 seconds."""
        self.btn_start.setEnabled(False)
        self.btn_stop.setEnabled(True)
        self.status_label.setText("状態: 🟢 監視稼働中")
        self.status_label.setStyleSheet(self._STATUS_STYLE.format(color="#2ECC71"))
        self.log("▶ 自動監視・解析エンジンを起動しました。")
        self.run_parser()  # first scan right away
        self.monitor_timer.start(10000)  # monitor every 10 seconds

    def stop_engine(self):
        """Stop the timer, ask a running parser to abort and wait for it to end."""
        self.monitor_timer.stop()
        if self.worker and self.worker.isRunning():
            self.worker.abort = True
            # Blocks until the extraction job currently running has finished.
            self.worker.wait()
        self.btn_start.setEnabled(True)
        self.btn_stop.setEnabled(False)
        self.status_label.setText("状態: 待機中 (■ 停止)")
        self.status_label.setStyleSheet(self._STATUS_STYLE.format(color="#8892B0"))
        self.log("⏸ エンジンを一時停止しました。")

    def force_quit(self):
        """Stop the engine and terminate the application."""
        self.stop_engine()
        self.log("🛑 システムを完全終了します...")
        # Remove the tray icon explicitly so it does not linger after exit.
        self.tray_icon.hide()
        QApplication.quit()

    def run_parser(self):
        """Start a parser thread unless one is still running."""
        if self.worker and self.worker.isRunning():
            return
        self.worker = DataParserThread(self.input_dirs, self.output_dir, self.last_reported)
        self.worker.log_signal.connect(self.log)
        self.worker.progress_signal.connect(self._update_progress)
        self.worker.finished_signal.connect(self.on_parser_finished)
        self.worker.start()

    def _update_progress(self, val, text):
        """Show ``val`` percent and the label ``text`` on the progress bar."""
        self.progress_bar.setValue(val)
        self.progress_bar.setFormat(f"{text} ({val}%)")

    def on_parser_finished(self, has_new):
        """Update the progress bar (and log on success) when a scan ends."""
        if has_new:
            self.progress_bar.setValue(100)
            self.progress_bar.setFormat("全データ抽出完了 (100%)")
            self.log("✅ 新規データの抽出がすべて完了しました。")
        else:
            self.progress_bar.setValue(0)
            self.progress_bar.setFormat("待機中...")
if __name__ == '__main__':
    # Called first in the entry-point guard, before any worker process is spawned.
    mp.freeze_support()
    app = QApplication(sys.argv)
    window = EngineAppWindow()
    window.show()
    # Propagate the Qt event loop's return code as the process exit status.
    sys.exit(app.exec())
