# fileapp.py contents


# -*- coding: utf-8 -*-
"""
fileapp.py – 智慧行程規劃系統(含在地票價回退)
------------------------------------------------
1. 優先順序:Google 官方票價 → DB 票價 → THSR 對照表 → TRA 分段費率 → 距離估算。
2. THSR_FARES 依 2025 標準車廂票價;TRA_RATE_TABLE 以自強號 / 莒光號 / 區間車分段。
3. 其餘邏輯(Google Directions、Metro 票價、地圖繪製)維持不變。
streamlit run fileapp.py
"""

from __future__ import annotations
import os, math
import urllib.parse
from datetime import datetime
from typing import Optional

import streamlit as st
import googlemaps
import polyline
import folium
from streamlit_folium import st_folium

from tw_router.core import (
    query_thsr_fare_db,
    query_tra_fare_db,
    query_metro_fare_db,
)

# ╔════════════════════ 1. 高鐵票價對照表 ════════════════════╗
# Station names used as lookup keys. A name's position in this list is the
# row/column index used with THSR_MATRIX below.
THSR_STATIONS = [
    "南港", "台北", "板橋", "桃園", "新竹", "苗栗",
    "台中", "彰化", "雲林", "嘉義", "台南", "左營",
]
# Strict upper triangle of the 12x12 station-to-station fare table (TWD).
# Row i lists the fares from THSR_STATIONS[i] to every later station, so the
# fare for i -> j (j > i) is THSR_MATRIX[i][j - i - 1]. The last station has
# no row because nothing follows it. The trailing comment on each row names
# the origin station (romanised) for that row.
THSR_MATRIX = [
    [  40,  70, 200, 330, 480, 750,  870,  970, 1120, 1390, 1530],  # Nangang
    [      40, 160, 290, 430, 700,  820,  930, 1080, 1350, 1490],  # Taipei
    [          130, 260, 400, 670,  790,  900, 1050, 1320, 1460],  # Banqiao
    [               130, 280, 540,  670,  780,  920, 1190, 1330],  # Taoyuan
    [                    140, 410,  540,  640,  790, 1060, 1200],  # Hsinchu
    [                         270,  390,  500,  640,  920, 1060],  # Miaoli
    [                              130,  230,  380,  650,  790],  # Taichung
    [                                   110,  250,  530,  670],  # Changhua
    [                                        150,  420,  560],  # Yunlin
    [                                             280,  410],  # Chiayi
    [                                                  140],  # Tainan
]

def _normalize_thsr_name(name) -> str:
    """Reduce a stop label to the bare station name used in THSR_STATIONS.

    Strips surrounding whitespace, maps the variant character "臺" to "台",
    and removes one leading "台灣高鐵"/"高鐵" and one trailing "車站"/"站", so
    that labels such as "高鐵左營站" or "台北車站" resolve to "左營" / "台北".
    Non-string input yields "" (which never matches a station).
    """
    if not isinstance(name, str):
        return ""
    cleaned = name.strip().replace("臺", "台")
    for prefix in ("台灣高鐵", "高鐵"):
        if cleaned.startswith(prefix):
            cleaned = cleaned[len(prefix):]
            break
    for suffix in ("車站", "站"):
        if cleaned.endswith(suffix):
            cleaned = cleaned[: -len(suffix)]
            break
    return cleaned.strip()


def get_thsr_fare_local(dep: str, arr: str) -> Optional[int]:
    """Look up a THSR fare in the local table by Chinese station name.

    Args:
        dep: Departure station name; decorated forms like "高鐵台中站" are
            accepted as well as the bare names listed in THSR_STATIONS.
        arr: Arrival station name, same rules as ``dep``.

    Returns:
        The fare from THSR_MATRIX, ``0`` when both names resolve to the same
        station, or ``None`` when either name is not a known station. The
        table is treated as symmetric: the fare for B -> A is the A -> B entry.
    """
    try:
        i = THSR_STATIONS.index(_normalize_thsr_name(dep))
        j = THSR_STATIONS.index(_normalize_thsr_name(arr))
    except ValueError:
        return None
    if i == j:
        return 0
    # THSR_MATRIX stores only the upper triangle, so always index with the
    # smaller station index as the row.
    low, high = (i, j) if i < j else (j, i)
    return THSR_MATRIX[low][high - low - 1]

# ╔════════════════════ 2. 臺鐵分段費率 ════════════════════╗
# Distance-banded per-kilometre rates for each TRA train class. Every class
# shares the same band lengths; each list entry is (band_length_km, rate_per_km)
# and the last band is open-ended (math.inf). estimate_tra_fare_by_distance
# consumes the bands in order and rounds the total up to a multiple of 5.
_TRA_BAND_LENGTHS_KM = (50, 50, 100, 100, math.inf)
_TRA_RATES_PER_KM = {
    "區間車": (2.18, 1.92, 1.81, 1.53, 1.42),
    "莒光號": (2.61, 2.30, 2.17, 1.83, 1.70),
    "自強號": (3.39, 2.98, 2.81, 2.37, 2.20),
}
TRA_RATE_TABLE = {
    train_class: list(zip(_TRA_BAND_LENGTHS_KM, rates))
    for train_class, rates in _TRA_RATES_PER_KM.items()
}

def estimate_tra_fare_by_distance(dist_m: int, train_type: str = "自強號") -> int:
    """Estimate a TRA fare (TWD) from the travelled distance.

    The distance is priced band by band using TRA_RATE_TABLE[train_type]:
    each band charges its per-km rate for at most its own length, and the
    walk stops once the whole distance has been billed.

    Args:
        dist_m: Travelled distance in metres.
        train_type: Key into TRA_RATE_TABLE; defaults to "自強號".

    Returns:
        The summed fare rounded UP to the next multiple of 5.

    Raises:
        KeyError: If ``train_type`` is not a key of TRA_RATE_TABLE.
    """
    remaining_km = dist_m / 1000
    fare = 0.0
    for band_km, rate_per_km in TRA_RATE_TABLE[train_type]:
        # Bill whichever is shorter: what is left, or this band's length.
        billed_km = band_km if remaining_km > band_km else remaining_km
        fare += billed_km * rate_per_km
        remaining_km -= billed_km
        if remaining_km <= 0:
            break
    # Ceiling, not nearest: 33.9 becomes 35.
    return 5 * int(math.ceil(fare / 5.0))

# ╔════════════════════ 3. 距離估算:其他交通工具 ════════════╗
# Fallback trip length in metres per vehicle type, used by
# generic_estimate_fare when a step carries no distance. Types that share a
# value are grouped together.
DEFAULT_DIST = {
    vehicle_type: metres
    for vehicle_types, metres in (
        (("HIGH_SPEED_TRAIN", "HIGH_SPEED_RAIL"), 300_000),
        (("HEAVY_RAIL", "RAIL"), 120_000),
        (("COMMUTER_TRAIN",), 40_000),
        (("BUS",), 30_000),
        (("FERRY",), 15_000),
        (("SUBWAY", "METRO_RAIL", "TRAM"), 6_000),
    )
    for vehicle_type in vehicle_types
}

# (base fare, fare per km) per vehicle type; generic_estimate_fare computes
# base + per_km * distance_km from these. Types that share a tariff are
# grouped together.
ESTIMATE_RATE = {
    vehicle_type: tariff
    for vehicle_types, tariff in (
        (("BUS",), (15, 2)),
        (("SUBWAY", "METRO_RAIL", "TRAM"), (20, 3)),
        (("FERRY",), (50, 5)),
        (("HEAVY_RAIL", "RAIL", "COMMUTER_TRAIN"), (20, 2.5)),
        (("HIGH_SPEED_TRAIN", "HIGH_SPEED_RAIL"), (40, 4)),
    )
    for vehicle_type in vehicle_types
}

def generic_estimate_fare(vtype: str, dist_m: Optional[int]) -> int:
    """Rough fare estimate for any vehicle type: base + per-km rate * km.

    Args:
        vtype: Vehicle type name; compared case-insensitively (upper-cased).
        dist_m: Distance in metres. ``None`` falls back to DEFAULT_DIST for
            the type, or 20 km when the type is not listed there.

    Returns:
        The estimate truncated to an int. Types missing from ESTIMATE_RATE
        use a base of 30 and 3 per km.
    """
    mode = vtype.upper()
    metres = DEFAULT_DIST.get(mode, 20_000) if dist_m is None else dist_m
    base_fare, per_km = ESTIMATE_RATE.get(mode, (30, 3))
    return int(base_fare + per_km * (metres / 1000))

# ╔════════════════════ 4. Google Maps 初始化 ════════════════╗
# The key is read from the environment only. A literal key must never be
# committed as a fallback: anything that was previously embedded here should
# be treated as leaked and rotated in the Google Cloud console.
API_KEY = os.getenv("GOOGLE_MAPS_API_KEY", "").strip()
if not API_KEY:
    # googlemaps.Client refuses to be constructed without credentials, so
    # there is nothing useful the app can do; halt this script run cleanly.
    st.error("未設定 GOOGLE_MAPS_API_KEY,無法查詢路線。請設定環境變數後重新啟動。")
    st.stop()

try:
    gmaps = googlemaps.Client(key=API_KEY)
except ValueError as exc:
    # Raised by the client for malformed keys.
    st.error(f"GOOGLE_MAPS_API_KEY 無效:{exc}")
    st.stop()

# ╔════════════════════ 5. Streamlit UI 基本設定 ═════════════╗
st.set_page_config(page_title="🚆 智慧行程規劃系統", layout="wide")
st.title("🚆 智慧行程規劃系統(DB + Local Fallback)")

# Seed the session-state slots once; on later reruns the stored values win.
#   routes     - last Directions result (None until a search succeeds)
#   method     - display mode captured when the search was run
#   timestamp  - when the last search finished
#   query_time - departure/arrival time used for the last search
#   time_type  - whether query_time is a departure or an arrival time
for _state_key, _initial in (
    ("routes", None),
    ("method", "全部路線"),
    ("timestamp", None),
    ("query_time", None),
    ("time_type", None),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial

# ╔════════════════════ 6. 側邊欄 – 使用者輸入 ══════════════╗
with st.sidebar:
    st.header("查詢設定")
    origin = st.text_input("起點地址", "台北車站")
    destination = st.text_input("終點地址", "高鐵左營站")
    method = st.radio(
        "欲顯示方案",
        ("全部路線", "最快速路線", "最便宜路線"),
        index=0,
        key="radio_method",
    )
    time_type = st.radio("時間設定方式", ("出發時間", "抵達時間"))

    # Pickers default to the current time truncated to the minute.
    now_minute = datetime.now().replace(second=0, microsecond=0)
    # Use the combined date-time widget when this Streamlit build has one,
    # otherwise fall back to separate date and time widgets.
    datetime_picker = getattr(st, "datetime_input", None)
    if datetime_picker is not None:
        route_time = datetime_picker("選擇出發 / 抵達時間", value=now_minute, step=300)
    else:
        picked_date = st.date_input("選擇日期", value=now_minute.date())
        picked_time = st.time_input("選擇時間", value=now_minute.time(), step=300)
        route_time = datetime.combine(picked_date, picked_time)

    search_btn = st.button("開始查詢")

# ╔════════════════════ 7. Helper:單段票價計算 ═════════════╗
# Display label shown for each vehicle type code found in a transit step.
VEHICLE_MAP = {
    "BUS": "公車",
    "SUBWAY": "捷運",
    "METRO_RAIL": "捷運",
    "TRAM": "輕軌",
    "HEAVY_RAIL": "臺鐵",
    "RAIL": "臺鐵",
    "COMMUTER_TRAIN": "臺鐵",
    "HIGH_SPEED_TRAIN": "高鐵",
    "HIGH_SPEED_RAIL": "高鐵",
    "FERRY": "渡輪",
}

# Vehicle type codes grouped by the fare source that handles them.
_TRA_TYPES = ("HEAVY_RAIL", "RAIL", "COMMUTER_TRAIN")
_THSR_TYPES = ("HIGH_SPEED_TRAIN", "HIGH_SPEED_RAIL")
_METRO_TYPES = ("SUBWAY", "METRO_RAIL", "TRAM")
# Agency-name fragment -> operator code used to prefix metro fare DB ids.
_METRO_OPERATORS = {
    "台北捷運": "TRTC",
    "高雄捷運": "KRTC",
    "桃園捷運": "TYMC",
    "台中捷運": "TMRT",
}


def calc_step_fare(td: dict, dist_m: Optional[int] = None) -> int:
    """Price one transit step, trying DB, then local tables, then an estimate.

    Args:
        td: The ``transit_details`` dict of a step. ``line.vehicle.type``,
            ``departure_stop`` and ``arrival_stop`` are required; stop ids
            (``stop_id`` or ``station_id``), stop names and ``line.agencies``
            are optional.
        dist_m: Step distance in metres, if known.

    Returns:
        The first fare found in this order:
        1. fare DB lookup (TRA / THSR / metro) - only when both stops carry an id;
        2. local fallback - THSR table by stop name, or the TRA distance
           tariff when ``dist_m`` is truthy;
        3. generic_estimate_fare for the vehicle type.
        NOTE(review): a DB hit is returned as-is; its type depends on the
        tw_router query functions - confirm they return ints.
    """
    line = td["line"]
    vraw = (line["vehicle"]["type"] or "").upper()
    dep_stop = td["departure_stop"]
    arr_stop = td["arrival_stop"]
    dep_id = dep_stop.get("stop_id") or dep_stop.get("station_id")
    arr_id = arr_stop.get("stop_id") or arr_stop.get("station_id")
    dep_name = dep_stop.get("name", "")
    arr_name = arr_stop.get("name", "")

    price = None
    # ---- 1) DB (every lookup needs both stop ids) ----
    if dep_id and arr_id:
        if vraw in _TRA_TYPES:
            price = query_tra_fare_db(f"TRA_{dep_id}", f"TRA_{arr_id}")
        elif vraw in _THSR_TYPES:
            price = query_thsr_fare_db(f"THSR_{dep_id}", f"THSR_{arr_id}")
        elif vraw in _METRO_TYPES:
            # "agencies" may be absent, empty, or hold an entry without a
            # name; treat all of those as "operator unknown".
            agencies = line.get("agencies") or [{}]
            agency = agencies[0].get("name") or ""
            for fragment, op in _METRO_OPERATORS.items():
                if fragment in agency:
                    price = query_metro_fare_db(op, f"{op}_{dep_id}", f"{op}_{arr_id}")
                    break

    # ---- 2) Local fallback ----
    if price is None:
        if vraw in _THSR_TYPES:
            price = get_thsr_fare_local(dep_name, arr_name)
        elif vraw in _TRA_TYPES and dist_m:
            price = estimate_tra_fare_by_distance(dist_m)

    # ---- 3) Generic estimate ----
    if price is None:
        price = generic_estimate_fare(vraw, dist_m)

    return price

# ╔════════════════════ 8. 查詢 Google 路線 ═════════════════╗
def _sum_local_fares(route: dict) -> int:
    """Add up calc_step_fare() over every TRANSIT step of *route*."""
    total = 0
    for leg in route["legs"]:
        for step in leg["steps"]:
            if step["travel_mode"] != "TRANSIT":
                continue
            distance_m = step.get("distance", {}).get("value")
            total += calc_step_fare(step["transit_details"], distance_m)
    return total


# Runs only on the rerun triggered by the search button; results are kept in
# st.session_state so they survive later reruns.
if search_btn:
    if not origin.strip() or not destination.strip():
        st.sidebar.error("請輸入完整地址!")
    else:
        params = {"mode": "transit", "alternatives": True}
        params["departure_time" if time_type == "出發時間" else "arrival_time"] = route_time
        try:
            routes = gmaps.directions(origin, destination, **params)
        except (
            googlemaps.exceptions.ApiError,
            googlemaps.exceptions.TransportError,
            googlemaps.exceptions.Timeout,
        ) as exc:
            # Report API/network failures in the UI instead of crashing the
            # app; the previously stored result stays on screen.
            st.sidebar.error(f"Google 路線查詢失敗:{exc}")
        else:
            if not routes:
                st.sidebar.error("找不到路線,請檢查地址是否正確。")
            else:
                # Attach the locally computed total to each route as
                # "db_fare". The route's own "fare" entry is left untouched
                # so a local estimate is never displayed as a Google fare;
                # the display code already falls back to "db_fare".
                for rt in routes:
                    total = _sum_local_fares(rt)
                    if total:
                        rt["db_fare"] = total
                st.session_state.routes = routes
                st.session_state.method = st.session_state.radio_method
                st.session_state.timestamp = datetime.now()
                st.session_state.query_time = route_time
                st.session_state.time_type = time_type
                st.sidebar.success(f"取得 {len(routes)} 條路線")

# ╔════════════════════ 9. 顯示結果 ═══════════════════════╗
routes = st.session_state.routes
if routes:
    # One summary line per candidate: distance, duration and fare text.
    st.markdown("### Google 提供的候選路線")
    for idx, r in enumerate(routes, 1):
        first_leg = r["legs"][0]
        dist_text = first_leg["distance"]["text"]
        dur_text = first_leg["duration"]["text"]
        route_fare_text = r.get("fare", {}).get("text")
        local_total = r.get("db_fare")
        if local_total:
            local_part = f"≈{local_total} 元"
            # Show both figures when the route has its own fare text,
            # otherwise mark the figure as locally computed.
            if route_fare_text:
                fare_txt = f"{route_fare_text} / {local_part}"
            else:
                fare_txt = f"{local_part} (local)"
        else:
            fare_txt = route_fare_text or "—"
        st.markdown(f"{idx}. **距離** {dist_text} | **時間** {dur_text} | **票價** {fare_txt}")

    st.markdown("---")

    # ---- pick the fastest and the cheapest candidate ----
    def fare_val(r):
        """Return the fare used to rank route *r*.

        Uses ``r["fare"]["value"]`` when the route has a truthy ``fare``
        entry, otherwise ``r["db_fare"]``, and ``math.inf`` when neither is
        present.
        """
        route_fare = r.get("fare")
        if route_fare:
            return route_fare["value"]
        return r.get("db_fare", math.inf)

    def _first_leg_duration(r):
        """Return the numeric duration of the route's first leg."""
        return r["legs"][0]["duration"]["value"]

    fastest = min(routes, key=_first_leg_duration)

    # Routes without any fare information are excluded from the comparison.
    with_cost = [r for r in routes if fare_val(r) != math.inf]
    cheapest = min(with_cost, key=fare_val, default=None)

    def zh_dist(s: str) -> str:
        """Replace the " km" / " m" unit suffixes in *s* with Chinese units."""
        # " km" must be handled before " m".
        for english, chinese in ((" km", " 公里"), (" m", " 公尺")):
            s = s.replace(english, chinese)
        return s

    def zh_time(s: str) -> str:
        """Replace English hour/minute words in *s* with Chinese units."""
        # Plural forms come first so " hours" is not left as "...s".
        substitutions = (
            (" hours", " 小時"),
            (" hour", " 小時"),
            (" mins", " 分"),
            (" min", " 分"),
        )
        for english, chinese in substitutions:
            s = s.replace(english, chinese)
        return s

    def show_route(rt: dict, title: str):
        """Render one route: summary, per-step fares, map and a Google Maps link.

        Args:
            rt: A route dict from the Directions result; only its first leg
                is displayed. An optional ``db_fare`` key carries the locally
                computed total.
            title: Section heading. It is also used to build the map widget
                key, so it has to be unique among the routes shown together.
        """
        leg = rt["legs"][0]
        st.subheader(title)
        st.markdown(f"- **距離**:{zh_dist(leg['distance']['text'])}")
        st.markdown(f"- **時間**:{zh_time(leg['duration']['text'])}")
        if rt.get("fare"):
            st.markdown(f"- **Google 票價**:{rt['fare']['text']}")

        total_local = rt.get("db_fare") or 0
        if total_local:
            st.markdown(f"- **本地計算票價**:≈{total_local} 元")

        # ---- itinerary steps ----
        st.markdown("#### 行程步驟")
        for step in leg["steps"]:
            if step["travel_mode"] == "WALKING":
                # Localise the unit the same way as the route summary above.
                st.markdown(f"- 步行 {zh_dist(step['distance']['text'])}")
                continue
            td = step.get("transit_details")
            if not td:
                # Neither walking nor transit: no stop/line data to show.
                continue
            depart = td["departure_stop"].get("name", "")
            arrive = td["arrival_stop"].get("name", "")
            line_name = td["line"].get("short_name") or td["line"].get("name", "")
            vehicle_type = (td["line"]["vehicle"]["type"] or "").upper()
            vehicle_zh = VEHICLE_MAP.get(vehicle_type, "")
            dist = step.get("distance", {}).get("value")
            fare_step = calc_step_fare(td, dist)
            st.markdown(f"- {depart} → {arrive}({vehicle_zh} {line_name} - ≈{fare_step} 元)")

        # ---- map ----
        coords = polyline.decode(rt["overview_polyline"]["points"])
        if coords:  # an empty polyline has no point to centre the map on
            m = folium.Map(location=coords[0], zoom_start=11)
            folium.PolyLine(coords, weight=6).add_to(m)
            st_folium(m, width=700, height=420, key=f"map_{title}")

        # ---- Google Maps navigation link ----
        start_addr = urllib.parse.quote_plus(leg['start_address'])
        end_addr = urllib.parse.quote_plus(leg['end_address'])
        # The end point of every step except the last becomes a waypoint:
        # the arrival stop for transit steps, the step end otherwise.
        waypoints = []
        for step in leg["steps"][:-1]:
            if step["travel_mode"] == "TRANSIT":
                loc = step["transit_details"]["arrival_stop"]["location"]
            else:
                loc = step["end_location"]
            waypoints.append(f"via:{loc['lat']},{loc['lng']}")
        wp_param = "&waypoints=" + "|".join(waypoints) if waypoints else ""
        nav_url = (
            f"https://www.google.com/maps/dir/?api=1"
            f"&origin={start_addr}&destination={end_addr}"
            f"&travelmode=transit" + wp_param
        )
        # Carry over the time the user searched with, as epoch seconds.
        qtime = st.session_state.get("query_time")
        ttype = st.session_state.get("time_type")
        if qtime and ttype:
            secs = int(qtime.timestamp())
            nav_url += f"&{'departure_time' if ttype == '出發時間' else 'arrival_time'}={secs}"
        nav_url += "&dir_action=navigate"
        st.markdown(
            f"[👉 在 Google Maps 中啟動導航]({nav_url})",
            unsafe_allow_html=True,
        )

    if st.session_state.method == "全部路線":
        for idx, rt in enumerate(routes, 1):
            show_route(rt, f"🛤️ 路線 {idx}")
    elif st.session_state.method == "最快速路線":
        show_route(fastest, "🏃 最快速路線")
    else:
        if cheapest:
            show_route(cheapest, "💰 最便宜路線")
        else:
            show_route(routes[0], "💰 最便宜路線(無票價)")

    if st.session_state.timestamp:
        ts = st.session_state.timestamp.strftime("%Y-%m-%d %H:%M:%S")
        st.caption(f"資料更新時間:{ts}")
else:
    st.info("請在左側輸入地址並按「開始查詢」。")