# 未分類  (blog category label left over from the web page; not part of the program)

import tkinter as tk

from tkinter import ttk, messagebox, filedialog

import pandas as pd

import pyodbc

import json

import os

import warnings

# Hide UserWarning messages (the original note says this is meant to silence
# warnings raised by pandas).
warnings.filterwarnings("ignore", category=UserWarning)

# JSON file, relative to the current working directory, in which the values of
# all input fields are persisted between runs.
SETTINGS_FILE = "settings.json"

class DatabaseAppPro:
    """Tkinter application that fetches observation/forecast tables and exports them.

    The window has five tabs: shared DB connection + observation query,
    forecast queries 1-5, forecast queries 6-10, fetch/preview, and Excel
    export in the "B-1" layout (one sheet per ``Mscode`` value).

    Attributes:
        root: The Tk root window passed to the constructor.
        settings: Dict of persisted field values loaded from ``SETTINGS_FILE``.
        vars: Maps a setting key to the ``tk.StringVar`` bound to its entry.
        dataframes: Maps a table prefix (``"obs"``, ``"fcst1"`` ...) to the
            DataFrame fetched for it.
        amedas_df: DataFrame read from the AMeDAS CSV, or ``None``.
    """

    # Connection setting keys, in the order used to build the ODBC string.
    _CONNECTION_KEYS = ("driver", "host", "db", "user", "pass")

    # Per-table query fields and the default used when no value is persisted.
    _QUERY_FIELD_DEFAULTS = {
        "table": "",
        "cols": "*",
        "where": "",
        "order": "",
        "limit": "1000",
    }

    # Columns treated as keys rather than as selectable value columns.
    _KEY_COLUMNS = ("Date", "Mscode")

    # Sheet names are truncated to this many characters before writing.
    _MAX_SHEET_NAME = 31

    def __init__(self, root):
        """Configure the root window, load settings, and build all widgets.

        Args:
            root: The ``tk.Tk`` instance that hosts the application.
        """
        self.root = root
        self.root.title("気象データ統合・評価システム Pro")
        # Tk geometry strings require an ASCII "x" between width and height.
        self.root.geometry("1100x850")

        self.settings = self.load_settings()
        self.vars = {}  # StringVar for every input field, keyed by setting name
        self.dataframes = {}  # fetched DataFrames keyed by prefix (obs, fcst1, ...)
        self.amedas_df = None

        # Table styling for the preview Treeview.
        self.style = ttk.Style()
        self.style.theme_use("clam")
        self.style.configure(
            "Treeview", rowheight=25, borderwidth=1, fieldbackground="#ffffff"
        )
        self.style.configure(
            "Treeview.Heading", font=("Helvetica", 9, "bold"), background="#e1e1e1"
        )

        self.create_widgets()

    # ------------------------------------------------------------------
    # Settings persistence
    # ------------------------------------------------------------------
    def load_settings(self):
        """Return the persisted settings dict, or ``{}`` if none can be read.

        A missing, unreadable, or malformed settings file is not an error:
        the application simply starts with defaults.
        """
        try:
            with open(SETTINGS_FILE, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError):
            # OSError: file missing/unreadable.  ValueError: invalid JSON or
            # invalid UTF-8 (JSONDecodeError and UnicodeDecodeError are both
            # ValueError subclasses).
            return {}
        # The rest of the class indexes settings by key, so reject non-dicts.
        return loaded if isinstance(loaded, dict) else {}

    def save_settings(self):
        """Write the current value of every input field to ``SETTINGS_FILE``.

        Raises:
            OSError: If the settings file cannot be written.
        """
        for key, var in self.vars.items():
            self.settings[key] = var.get()
        # NOTE(review): this includes the "pass" field, so the database
        # password is stored in plain text in the settings file.
        with open(SETTINGS_FILE, "w", encoding="utf-8") as f:
            json.dump(self.settings, f, ensure_ascii=False, indent=4)

    def on_closing(self):
        """Persist settings and close the window.

        The window is destroyed even when the settings cannot be saved, so a
        read-only working directory cannot make the application unclosable.
        """
        try:
            self.save_settings()
        except OSError as e:
            messagebox.showwarning("警告", f"設定を保存できませんでした:\n{e}")
        finally:
            self.root.destroy()

    # ------------------------------------------------------------------
    # Widget construction
    # ------------------------------------------------------------------
    def create_widgets(self):
        """Create the notebook and populate its five tabs."""
        self.notebook = ttk.Notebook(self.root)
        self.notebook.pack(fill="both", expand=True, padx=5, pady=5)

        tab1 = ttk.Frame(self.notebook)  # shared settings & observations
        tab2 = ttk.Frame(self.notebook)  # forecasts 1-5
        tab3 = ttk.Frame(self.notebook)  # forecasts 6-10
        tab4 = ttk.Frame(self.notebook)  # preview
        tab5 = ttk.Frame(self.notebook)  # B-1 Excel export

        self.notebook.add(tab1, text=" 1. DB接続 & 観測値・アメダス ")
        self.notebook.add(tab2, text=" 2. 予報設定 (1〜5) ")
        self.notebook.add(tab3, text=" 3. 予報設定 (6〜10) ")
        self.notebook.add(tab4, text=" 4. データ取得 & プレビュー ")
        self.notebook.add(tab5, text=" 5. Excel出力 (B-1形式) ")

        # Tab 1: shared DB settings, observation query, AMeDAS file loader.
        self.setup_db_connection(tab1)
        self.build_sql_frame(tab1, "【実況】観測値テーブル抽出設定", "obs")

        frame_amedas = tk.LabelFrame(
            tab1, text=" アメダスデータ (CSV等) ", padx=10, pady=10
        )
        frame_amedas.pack(fill="x", padx=10, pady=5)
        tk.Button(
            frame_amedas, text="アメダスファイル読込", command=self.load_amedas
        ).pack(side="left")
        self.lbl_amedas = tk.Label(frame_amedas, text="未読込")
        self.lbl_amedas.pack(side="left", padx=10)

        # Tabs 2 and 3: forecast table queries.
        self.build_scrollable_sqls(tab2, ["fcst1", "fcst2", "fcst3", "fcst4", "fcst5"])
        self.build_scrollable_sqls(tab3, ["fcst6", "fcst7", "fcst8", "fcst9", "fcst10"])

        # Tab 4: fetch and preview.
        self.setup_preview_tab(tab4)

        # Tab 5: Excel export.
        self.setup_export_tab(tab5)

    def setup_db_connection(self, parent):
        """Build the shared DB connection frame, including the connection test.

        Registers a ``StringVar`` in ``self.vars`` for each connection key,
        seeding missing settings with defaults.

        Args:
            parent: Container widget that receives the frame.
        """
        frame_db = tk.LabelFrame(
            parent, text=" データベース接続設定 (共通) ", padx=10, pady=5
        )
        frame_db.pack(fill="x", padx=10, pady=5)

        for k in ["host", "db", "user", "pass", "driver"]:
            if k not in self.settings:
                self.settings[k] = (
                    "ODBC Driver 17 for SQL Server" if k == "driver" else ""
                )
            self.vars[k] = tk.StringVar(value=self.settings[k])

        # (label text, settings key, grid row, grid column of the label)
        layout = (
            ("Host:", "host", 0, 0),
            ("DB名:", "db", 0, 2),
            ("ID:", "user", 1, 0),
            ("Pass:", "pass", 1, 2),
            ("Driver:", "driver", 2, 0),
        )
        for label, key, row, col in layout:
            tk.Label(frame_db, text=label).grid(row=row, column=col, sticky="e")
            entry = tk.Entry(frame_db, textvariable=self.vars[key], width=20)
            if key == "pass":
                entry.config(show="*")  # mask the password on screen
            entry.grid(row=row, column=col + 1, sticky="w", padx=2, pady=2)

        btn_test = tk.Button(
            frame_db,
            text="導通点検 (接続テスト)",
            command=self.test_connection,
            bg="#ffd700",
        )
        btn_test.grid(row=0, column=4, rowspan=2, padx=15)

        self.text_test_result = tk.Text(frame_db, width=40, height=3, bg="#f0f0f0")
        self.text_test_result.grid(row=0, column=5, rowspan=3)

    def build_scrollable_sqls(self, parent, prefixes):
        """Fill ``parent`` with a vertically scrollable list of query frames.

        Args:
            parent: Tab frame to populate.
            prefixes: Table prefixes (e.g. ``"fcst1"``); one query frame is
                created per prefix.
        """
        canvas = tk.Canvas(parent)
        scrollbar = ttk.Scrollbar(parent, orient="vertical", command=canvas.yview)
        scrollable_frame = ttk.Frame(canvas)

        # Keep the scroll region in sync with the size of the inner frame.
        scrollable_frame.bind(
            "<Configure>",
            lambda e: canvas.configure(scrollregion=canvas.bbox("all")),
        )
        canvas.create_window((0, 0), window=scrollable_frame, anchor="nw")
        canvas.configure(yscrollcommand=scrollbar.set)

        canvas.pack(side="left", fill="both", expand=True)
        scrollbar.pack(side="right", fill="y")

        for prefix in prefixes:
            title = f"予報テーブル {prefix.replace('fcst', '')} 抽出設定"
            self.build_sql_frame(scrollable_frame, title, prefix)

    def build_sql_frame(self, parent, title, prefix):
        """Build the query-condition inputs for one table.

        Registers ``StringVar`` objects under ``"<prefix>_table"``,
        ``"<prefix>_cols"``, ``"<prefix>_where"``, ``"<prefix>_order"`` and
        ``"<prefix>_limit"`` in ``self.vars``.

        Args:
            parent: Container widget that receives the frame.
            title: Caption of the label frame.
            prefix: Table prefix used to namespace the setting keys.
        """
        frame = tk.LabelFrame(parent, text=f" {title} ", padx=10, pady=5)
        frame.pack(fill="x", padx=10, pady=5)

        for field, default in self._QUERY_FIELD_DEFAULTS.items():
            key = f"{prefix}_{field}"
            if key not in self.settings:
                self.settings[key] = default
            self.vars[key] = tk.StringVar(value=self.settings[key])

        # (label text, field, entry width, grid row, grid column of the label)
        layout = (
            ("テーブル:", "table", 20, 0, 0),
            ("列名:", "cols", 40, 0, 2),
            ("WHERE:", "where", 20, 1, 0),
            ("ORDER:", "order", 20, 1, 2),
            ("LIMIT(TOP):", "limit", 10, 1, 4),
        )
        for label, field, width, row, col in layout:
            tk.Label(frame, text=label).grid(row=row, column=col, sticky="e")
            tk.Entry(
                frame, textvariable=self.vars[f"{prefix}_{field}"], width=width
            ).grid(row=row, column=col + 1, sticky="w", padx=2)

    def setup_preview_tab(self, parent):
        """Build the fetch button, table selector, and preview Treeview.

        Args:
            parent: Tab frame to populate.
        """
        frame_top = tk.Frame(parent, padx=10, pady=10)
        frame_top.pack(fill="x")

        tk.Button(
            frame_top,
            text="▶ 全データを一括取得",
            command=self.fetch_all_data,
            bg="#87ceeb",
            font=("", 10, "bold"),
        ).pack(side="left")

        self.lbl_status = tk.Label(frame_top, text="待機中", fg="gray")
        self.lbl_status.pack(side="left", padx=10)

        tk.Label(frame_top, text="プレビュー表示:").pack(side="left", padx=(20, 0))
        self.combo_preview = ttk.Combobox(frame_top, values=[], state="readonly")
        self.combo_preview.pack(side="left", padx=5)
        self.combo_preview.bind("<<ComboboxSelected>>", self.update_preview)

        frame_tree = tk.Frame(parent, padx=10, pady=5)
        frame_tree.pack(fill="both", expand=True)

        scroll_y = ttk.Scrollbar(frame_tree, orient="vertical")
        scroll_y.pack(side="right", fill="y")
        scroll_x = ttk.Scrollbar(frame_tree, orient="horizontal")
        scroll_x.pack(side="bottom", fill="x")

        self.tree = ttk.Treeview(
            frame_tree, yscrollcommand=scroll_y.set, xscrollcommand=scroll_x.set
        )
        self.tree.pack(fill="both", expand=True)
        scroll_y.config(command=self.tree.yview)
        scroll_x.config(command=self.tree.xview)

        # Alternating row backgrounds.
        self.tree.tag_configure("oddrow", background="#f9f9f9")
        self.tree.tag_configure("evenrow", background="#ffffff")

    def setup_export_tab(self, parent):
        """Build the value-column list box and the B-1 export button.

        Args:
            parent: Tab frame to populate.
        """
        frame_left = tk.LabelFrame(
            parent, text=" B-1行列にする対象列を選択 ", padx=10, pady=10
        )
        frame_left.pack(side="left", fill="y", padx=10, pady=10)

        tk.Label(
            frame_left, text="※降水量や気温などを選択\n(DateとMscodeは自動認識)"
        ).pack()
        self.listbox_cols = tk.Listbox(
            frame_left, selectmode=tk.MULTIPLE, width=25, height=20
        )
        self.listbox_cols.pack(fill="both", expand=True)

        frame_right = tk.Frame(parent, padx=20, pady=20)
        frame_right.pack(side="right", fill="both", expand=True)

        tk.Button(
            frame_right,
            text="選択した列で B-1形式Excel を出力",
            command=self.export_excel,
            bg="#90ee90",
            height=2,
            font=("", 10, "bold"),
        ).pack(fill="x", pady=20)

        tk.Label(
            frame_right,
            text="【B-1形式出力イメージ】\n縦軸: 時間\n横軸: 予報テーブル\n予報値の下に観測値が横並び\nシート右側にアメダス併記",
            justify="left",
            bg="#fffaf0",
            relief="solid",
            borderwidth=1,
            padx=10,
            pady=10,
        ).pack(fill="x")

    # ------------------------------------------------------------------
    # Database access
    # ------------------------------------------------------------------
    def _connection_string(self):
        """Return the ODBC connection string built from the connection fields."""
        driver, host, db, user, pwd = (
            self.vars[k].get() for k in self._CONNECTION_KEYS
        )
        return f"DRIVER={{{driver}}};SERVER={host};DATABASE={db};UID={user};PWD={pwd}"

    def test_connection(self):
        """Try to open a connection and report the outcome in the result box."""
        result = self.text_test_result
        result.delete("1.0", tk.END)
        result.config(bg="#ffffff")

        host, db, driver = (self.vars[k].get() for k in ("host", "db", "driver"))
        if not all([host, db, driver]):
            result.insert(tk.END, "⚠️ Host, DB, Driverを入力してください")
            return

        try:
            result.insert(tk.END, "接続テスト中…\n")
            self.root.update()  # show the progress text before blocking
            conn = pyodbc.connect(self._connection_string(), timeout=3)
            conn.close()
        except Exception as e:
            result.delete("1.0", tk.END)
            # Only the first 50 characters of the driver message are shown.
            result.insert(tk.END, f"❌ 失敗\n{str(e)[:50]}…")
            result.config(bg="#ffe6e6")
        else:
            result.delete("1.0", tk.END)
            result.insert(tk.END, f"✅ 接続成功!\nDB「{db}」にアクセス可")
            result.config(bg="#e6ffe6")

    def load_amedas(self):
        """Ask for a CSV file and load it (Shift_JIS) into ``self.amedas_df``."""
        filepath = filedialog.askopenfilename(
            filetypes=[("CSV files", "*.csv"), ("All files", "*.*")]
        )
        if not filepath:
            return  # dialog cancelled
        try:
            self.amedas_df = pd.read_csv(filepath, encoding="shift_jis")
            self.lbl_amedas.config(text=f"読込完了: {os.path.basename(filepath)}")
        except Exception as e:
            messagebox.showerror("エラー", f"読込失敗:\n{e}")

    def build_query(self, prefix):
        """Assemble the SELECT statement for one table from its input fields.

        Args:
            prefix: Table prefix whose ``<prefix>_*`` fields are read.

        Returns:
            The SQL text, or ``None`` when no table name has been entered.
        """
        # NOTE(review): every fragment is interpolated verbatim, so whoever
        # fills in these fields can run arbitrary SQL.  That is the purpose of
        # the free-form WHERE/ORDER inputs; do not feed them untrusted text.
        def field(name):
            return self.vars[f"{prefix}_{name}"].get().strip()

        table = field("table")
        if not table:
            return None

        parts = ["SELECT"]
        limit = field("limit")
        # isdecimal() admits exactly the digits int() can parse (ASCII and
        # full-width), unlike isdigit(), which also admits e.g. superscripts.
        if limit.isdecimal():
            parts.append(f"TOP {int(limit)}")
        parts.append(f"{field('cols') or '*'} FROM {table}")

        where = field("where")
        if where:
            parts.append(f"WHERE {where}")
        order = field("order")
        if order:
            parts.append(f"ORDER BY {order}")
        return " ".join(parts)

    def _read_tables(self, conn):
        """Run every configured query on ``conn`` and store the results.

        Tables without a table name are skipped.  A ``Date`` column, when
        present, is converted to timezone-aware UTC datetimes.
        """
        prefixes = ["obs"] + [f"fcst{i}" for i in range(1, 11)]
        for p in prefixes:
            sql = self.build_query(p)
            if not sql:
                continue
            df = pd.read_sql(sql, conn)
            if "Date" in df.columns:
                df["Date"] = pd.to_datetime(df["Date"], utc=True)
            self.dataframes[p] = df

    def fetch_all_data(self):
        """Save settings, fetch all configured tables, and refresh the views.

        Any failure is reported in the status label and an error dialog.
        """
        self.lbl_status.config(text="データ取得中…", fg="blue")
        self.root.update()  # show the status before the blocking DB calls

        self.dataframes.clear()
        try:
            self.save_settings()
            conn = pyodbc.connect(self._connection_string())
            try:
                self._read_tables(conn)
            finally:
                # Release the connection even when a query fails.
                conn.close()

            success_count = len(self.dataframes)
            self.lbl_status.config(
                text=f"完了: {success_count}テーブル取得", fg="green"
            )

            # Refresh the preview selector and the export list box.
            self.combo_preview["values"] = list(self.dataframes.keys())
            if self.dataframes:
                self.combo_preview.current(0)
                self.update_preview()
                self.update_listbox()
            else:
                # Nothing fetched: do not leave results of a previous run shown.
                self.combo_preview.set("")
                self.tree.delete(*self.tree.get_children())
                self.listbox_cols.delete(0, tk.END)

            messagebox.showinfo(
                "成功", f"{success_count} 個のテーブルからデータを取得しました。"
            )
        except Exception as e:
            self.lbl_status.config(text="取得失敗", fg="red")
            messagebox.showerror("エラー", str(e))

    # ------------------------------------------------------------------
    # Views
    # ------------------------------------------------------------------
    def update_preview(self, event=None):
        """Show the first 500 rows of the selected table in the Treeview.

        Args:
            event: Tk event when invoked from the combobox binding; unused.
        """
        target = self.combo_preview.get()
        if target not in self.dataframes:
            return
        df = self.dataframes[target]

        self.tree.delete(*self.tree.get_children())
        self.tree["columns"] = list(df.columns)
        self.tree["show"] = "headings"  # hide the implicit tree column
        for col in self.tree["columns"]:
            self.tree.heading(col, text=col)
            self.tree.column(col, width=100)

        rows = df.head(500).itertuples(index=False, name=None)
        for count, row in enumerate(rows):
            tag = "evenrow" if count % 2 == 0 else "oddrow"
            self.tree.insert("", "end", values=row, tags=(tag,))

    def update_listbox(self):
        """Fill the export list box with the value columns and select them all.

        The columns of the first fetched table are used as representative.
        """
        self.listbox_cols.delete(0, tk.END)
        first_df = next(iter(self.dataframes.values()), None)
        if first_df is None:
            return
        for col in first_df.columns:
            if col not in self._KEY_COLUMNS:
                self.listbox_cols.insert(tk.END, col)
        self.listbox_cols.select_set(0, tk.END)

    # ------------------------------------------------------------------
    # Excel export
    # ------------------------------------------------------------------
    @staticmethod
    def _excel_safe_datetimes(series):
        """Return ``series`` with timezone info removed, if it had any.

        pandas refuses to write timezone-aware datetimes to Excel; the values
        keep their wall-clock time in the zone they were in.
        """
        if isinstance(series.dtype, pd.DatetimeTZDtype):
            return series.dt.tz_localize(None)
        return series

    def _build_b1_sheets(self, target_vals):
        """Build one B-1 table per ``Mscode`` value found in the fetched data.

        For each station, rows from all tables are stacked, tagged with
        ``Type`` (forecast/observation) and ``Source`` (which table), and
        pivoted so that each source becomes a column.  Only the first entry
        of ``target_vals`` that exists in the station's data is pivoted.

        Args:
            target_vals: Candidate value column names, in priority order.

        Returns:
            List of ``(sheet_name, DataFrame)`` pairs ordered by station code.
            Stations with no usable data are omitted.
        """
        stations = set()
        for df in self.dataframes.values():
            if "Mscode" in df.columns:
                stations.update(df["Mscode"].dropna().unique())

        sheets = []
        # Sort by text form so the sheet order is stable across runs.
        for ms in sorted(stations, key=str):
            parts = []
            for prefix, df in self.dataframes.items():
                if "Mscode" not in df.columns:
                    continue
                ms_df = df[df["Mscode"] == ms].copy()
                if ms_df.empty:
                    continue
                is_obs = prefix == "obs"
                ms_df["Type"] = "実況値" if is_obs else "予報値"
                ms_df["Source"] = "観測" if is_obs else prefix.replace("fcst", "予報")
                parts.append(ms_df)
            if not parts:
                continue

            all_df = pd.concat(parts, ignore_index=True)
            val_col = next((c for c in target_vals if c in all_df.columns), None)
            if val_col is None or "Date" not in all_df.columns:
                continue  # nothing to pivot for this station

            pivot_df = all_df.pivot_table(
                index=["Date", "Type"],
                columns="Source",
                values=val_col,
                aggfunc="first",
            ).reset_index()

            # Within each timestamp, list the forecast row above the observation.
            pivot_df["Type_Sort"] = pivot_df["Type"].map({"予報値": 0, "実況値": 1})
            pivot_df = pivot_df.sort_values(["Date", "Type_Sort"]).drop(
                "Type_Sort", axis=1
            )
            pivot_df["Date"] = self._excel_safe_datetimes(pivot_df["Date"])

            sheets.append((str(ms)[: self._MAX_SHEET_NAME], pivot_df))
        return sheets

    def export_excel(self):
        """Export the fetched data to an ``.xlsx`` file in the B-1 layout.

        One sheet is written per station; when an AMeDAS file has been
        loaded, it is written to the right of the table on every sheet.
        """
        if not self.dataframes:
            messagebox.showwarning("警告", "出力するデータがありません。")
            return

        selected_indices = self.listbox_cols.curselection()
        if not selected_indices:
            messagebox.showwarning("警告", "行列化する要素(列)を選択してください。")
            return
        target_vals = [self.listbox_cols.get(i) for i in selected_indices]

        filepath = filedialog.asksaveasfilename(
            defaultextension=".xlsx", filetypes=[("Excel files", "*.xlsx")]
        )
        if not filepath:
            return  # dialog cancelled

        try:
            sheets = self._build_b1_sheets(target_vals)
            if not sheets:
                # Saving a workbook without any sheet fails, so stop before
                # the file is created.
                messagebox.showwarning(
                    "警告",
                    "出力対象のデータがありません (Mscode列・Date列・選択列を確認してください)。",
                )
                return

            with pd.ExcelWriter(filepath, engine="openpyxl") as writer:
                for sheet_name, pivot_df in sheets:
                    pivot_df.to_excel(writer, sheet_name=sheet_name, index=False)
                    if self.amedas_df is not None:
                        # Leave a two-column gap before the AMeDAS data.
                        start_col = len(pivot_df.columns) + 2
                        self.amedas_df.to_excel(
                            writer,
                            sheet_name=sheet_name,
                            index=False,
                            startcol=start_col,
                        )
            messagebox.showinfo("成功", f"B-1形式で出力しました!\n{filepath}")
        except Exception as e:
            messagebox.showerror("出力エラー", str(e))

if __name__ == "__main__":
    # Create the Tk root, attach the application, and route the window-close
    # button through on_closing() so the settings are saved before exit.
    root = tk.Tk()
    app = DatabaseAppPro(root)
    root.protocol("WM_DELETE_WINDOW", app.on_closing)
    root.mainloop()

# コメント  (blog comment-section label left over from the web page; not part of the program)