BigData TRJ Series ตอนที่ 1: สถาปัตยกรรมระบบประมวลผลวิถีอากาศอัตโนมัติ HYSPLIT (Architecture & Overview)

เจาะลึกสถาปัตยกรรมของระบบ BigData TRJ — Pipeline อัตโนมัติที่ใช้ HYSPLIT, GFS Meteorology Data และ PSCF Analysis เพื่อวิเคราะห์แหล่งกำเนิดฝุ่น PM2.5 ในภาคเหนือ

· 13 min read

Problem

ภาคเหนือของไทยเผชิญปัญหาฝุ่น PM2.5 รุนแรงทุกปี แต่การวิเคราะห์แหล่งกำเนิดฝุ่นด้วยแบบจำลอง HYSPLIT ยังทำแบบ Manual ไม่ต่อเนื่อง ต้องใช้ทรัพยากรบุคคลสูง และไม่มีระบบติดตามผลแบบ Real-time

Solution

ออกแบบและพัฒนา Pipeline อัตโนมัติแบบ 6-Stage ด้วย Python + Docker ที่ดาวน์โหลดข้อมูล GFS อัตโนมัติ รัน Backward/Forward Trajectories พร้อมคำนวณ PSCF โดยไม่ต้องใช้คนดูแลตลอด 24 ชั่วโมง

Impact

ลดเวลาการประมวลผลจากหลายวันเหลือ 5-10 นาทีต่อรอบ รองรับการรันอัตโนมัติทุก 6 ชั่วโมงตลอด 24 ชม. และเชื่อมต่อกับ Dashboard แบบ Real-time

BigData TRJ Series ตอนที่ 1: สถาปัตยกรรมระบบประมวลผลวิถีอากาศอัตโนมัติ HYSPLIT

ในฐานะ ผู้ช่วยนักวิจัย และผู้รับผิดชอบระบบ BigData TRJ ของมหาวิทยาลัยชั้นนำแห่งหนึ่งในภาคเหนือ หนึ่งในความท้าทายที่ใหญ่ที่สุดของผมคือการออกแบบระบบอัตโนมัติที่สามารถประมวลผลข้อมูลอุตุนิยมวิทยาและวิเคราะห์แหล่งกำเนิดฝุ่น PM2.5 ได้ตลอด 24 ชั่วโมง โดยไม่ต้องพึ่งพาทีมงานคอยมอนิเตอร์ตลอดเวลา

บทความนี้จะพาไปดูภาพรวมของระบบ และเหตุผลที่เราเลือกสถาปัตยกรรมแบบนี้

1. ปัญหา: ทำไมต้องใช้ระบบอัตโนมัติ?

ทุกปี ช่วงเดือนมกราคมถึงเมษายน ภาคเหนือของไทยต้องเผชิญกับวิกฤตฝุ่น PM2.5 ที่มีสาเหตุหลักจาก ไฟป่าและการเผาในที่โล่ง การทำความเข้าใจ “ทิศทางและแหล่งกำเนิดของฝุ่น” มีความสำคัญต่อการวางมาตรการป้องกันและแจ้งเตือนประชาชน

ก่อนหน้านี้ การวิเคราะห์ใช้วิธี:

  • ดาวน์โหลดข้อมูล GFS จาก NOAA ด้วยมือ
  • รัน HYSPLIT ทีละสถานี
  • วิเคราะห์ผลลัพธ์แบบ Manual

ซึ่งกินเวลาหลายวันและมีโอกาสผิดพลาดสูง

หมายเหตุ: โค้ด Pipeline หลัก (1.py - 6.py) ได้รับต่อยอดจากนักวิจัยข้อมูลชาวต่างชาติ (Data Analyst) ซึ่งเป็นผู้เขียน Logic หลักในการคำนวณ Trajectory และ PSCF ส่วนบทบาทของผมคือการออกแบบ Infrastructure ใหม่ทั้งหมด — Containerization ด้วย Docker, การทำ Parallel Processing, ระบบ Logging, Pipeline Reliability และระบบ Monitoring ต่างๆ ที่ทำให้โค้ดดั้งเดิมกลายเป็นระบบ Automation ที่พร้อม Deploy จริง

2. ภาพรวมระบบ (System Overview)

ระบบนี้ถูกออกแบบให้ทำงานเป็น Daemon รันวนรอบทุก 6 ชั่วโมง โดยใช้ Python เป็นภาษาหลัก ทำงานบน Docker Container เพื่อความสะดวกในการ Deploy

flowchart LR
    subgraph DS ["Data Sources"]
        A[NOAA GFS 0.25°<br/>Global Forecast System]
        B[FIRMS API<br/>Fire Hotspots]
    end

    subgraph PA ["Pipeline Automation"]
        C[Stage 1: 1.py<br/>GFS Download]
        D[Stage 2: 2.py<br/>Backward TRJ<br/>14 Stations]
        E[Stage 3: 3.py<br/>Hotspot Download<br/>Cache Management]
        F[Stage 4: 4.py<br/>Forward TRJ<br/>From Hotspots]
        G[Stage 5: 5.py<br/>PSCF Province<br/>9 Provinces]
        H[Stage 6: 6.py<br/>PSCF Overall<br/>All Stations]
    end

    subgraph OP ["Output"]
        I[TRJ GeoJSON<br/>Backward Trajectories]
        J[FWT GeoJSON<br/>Forward Trajectories]
        K[PSCF Province<br/>Source Contribution]
        L[PSCF Overall<br/>Regional Analysis]
    end

    A --> C
    C --> D
    B --> E
    E --> F
    D --> F
    F --> G
    G --> H
    D --> I
    F --> J
    G --> K
    H --> L

    style DS stroke:#333,stroke-width:1px,color:#fff
    style PA stroke:#333,stroke-width:1px,color:#fff
    style OP stroke:#333,stroke-width:1px,color:#fff
    style C fill:#334155,color:#fff
    style D fill:#e65100,color:#fff
    style E fill:#d97706,color:#fff
    style F fill:#e65100,color:#fff
    style G fill:#2e7d32,color:#fff
    style H fill:#2e7d32,color:#fff

3. 6-Stage Pipeline — อธิบายทีละขั้นตอน

Stage 1: GFS Download (1.py)

ดาวน์โหลดข้อมูลอุตุนิยมวิทยา GFS (Global Forecast System) จาก NOAA ที่ความละเอียด 0.25 องศา ย้อนหลัง 3 วันล่าสุด ข้อมูลเหล่านี้จำเป็นสำหรับการคำนวณ Trajectory ในขั้นตอนถัดไป

  • Protocol: HTTP Download จาก NOAA NOMADS Server
  • Storage: เก็บใน data/gfs/ พร้อม Cleanup ไฟล์ที่อายุเกิน 30 วัน
  • Retry Logic: หากดาวน์โหลดไม่สำเร็จจะลองใหม่ พร้อมบันทึก Log

Stage 2: Backward Trajectory (2.py)

รัน HYSPLIT เพื่อคำนวณทิศทางลมย้อนหลัง 24 ชั่วโมง จากสถานีตรวจวัดคุณภาพอากาศ 14 แห่งในภาคเหนือ

flowchart LR
    A[GFS Files<br/>Meteorology Data] --> B[ProcessPoolExecutor<br/>Parallel Runs]
    C[14 Stations<br/>Lat/Lon Config] --> B
    B --> D[HYSPLIT hyts_std<br/>Backward 24h]
    D --> E[tdump files<br/>Raw Output]
    E --> F[GeoJSON Generator<br/>Convert Format]
    F --> G[TRJ GeoJSON<br/>Final Output]

    style B fill:#ff9800,color:#fff
    style D fill:#f44336,color:#fff
  • Parallel Execution: ใช้ ProcessPoolExecutor เพื่อรัน HYSPLIT แบบขนาน — ลดเวลาได้มากกว่า 70%
  • Incremental Processing: ข้ามสถานีที่สร้าง GeoJSON ไว้แล้วโดยอัตโนมัติ
  • Future Safety Buffer: ไม่รัน Trajectory สำหรับเวลาที่เกินกว่าปัจจุบัน +6 ชั่วโมง

Stage 3: Hotspot Download (3.py)

ดึงข้อมูลจุดความร้อน (Fire Hotspots) จากระบบ FIRMS (Fire Information for Resource Management System) เพื่อใช้ในการคำนวณ Forward Trajectory

  • Data Format: CSV มีพิกัด Latitude/Longitude, เวลาที่ตรวจพบ, และความเชื่อมั่น (Confidence)
  • Cache Management: เก็บข้อมูลในรูปแบบ NDJSON เพื่อป้องกันการดาวน์โหลดซ้ำ
  • Auto Deduplication: (ถูกเพิ่มในเวอร์ชันหลัง) ตรวจสอบวันที่ซ้ำก่อนเขียน CSV

Stage 4: Forward Trajectory (4.py)

รัน HYSPLIT แบบไปข้างหน้า (Forward) จากจุดความร้อนที่ได้จาก Stage 3 เหมือนเป็นการ “ทำนาย” ว่าฝุ่นจะเคลื่อนที่ไปทางไหนใน 24 ชั่วโมงข้างหน้า

Stage 5 & 6: PSCF Analysis (5.py, 6.py)

คำนวณ Potential Source Contribution Function (PSCF) เพื่อหาความน่าจะเป็นของแหล่งกำเนิดฝุ่น

Stage ไฟล์ รายละเอียด
5 pscf_province/ คำนวณ PSCF แยกรายจังหวัด 9 จังหวัดภาคเหนือ
6 pscf_overall/ คำนวณ PSCF ในภาพรวมของทุกสถานี
sequenceDiagram
    participant 5py as Stage 5: Province PSCF
    participant TRJ as TRJ Data
    participant HOT as Hotspot Data
    participant 6py as Stage 6: Overall PSCF
    participant OUT as Output GeoJSON

    5py->>TRJ: Load Backward Trajectories
    5py->>HOT: Load Fire Hotspots
    5py->>5py: Spatial Join<br/>Grid-based Probability
    5py->>OUT: Province PSCF GeoJSON
    6py->>TRJ: Load All Trajectories
    6py->>HOT: Load All Hotspots
    6py->>6py: Aggregate Analysis
    6py->>OUT: Overall PSCF GeoJSON

กลไกสำคัญ: หากวันไหนไม่มีจุดความร้อน (Zero Hotspots) ระบบจะสร้างไฟล์ Placeholder GeoJSON ที่มีสถานะ skipped แทนการ Error เพื่อให้ Dashboard ทำงานต่อได้

4. สถาปัตยกรรมทางเทคนิค (Technical Architecture)

Tech Stack

flowchart LR
    subgraph INF ["Infrastructure"]
        A[Docker<br/>Alpine/Debian]
        B[Go Task<br/>Taskfile Orchestration]
    end

    subgraph RT ["Runtime"]
        C[Python 3.12+]
        D[HYSPLIT v5.4.2<br/>Linux Binary]
        E[Custom Logger<br/>Daily Rotation]
    end

    subgraph LIB ["Libraries"]
        F[Pandas + GeoPandas]
        G[NumPy]
        H[SMTP Email Alerts]
    end

    A --> C
    A --> D
    C --> F
    C --> G
    C --> H
    C --> E

    style INF stroke:#333,stroke-width:1px,color:#fff
    style RT stroke:#333,stroke-width:1px,color:#fff
    style LIB stroke:#333,stroke-width:1px,color:#fff
    style A fill:#475569,color:#fff
    style D fill:#f44336,color:#fff
  • Language: Python 3.12+ (Pandas, GeoPandas, NumPy)
  • Model Engine: HYSPLIT (Linux Binary จาก NOAA ARL)
  • Container: Docker — เพื่อความ Portability สูงสุด
  • Orchestration: Go Task (Taskfile.yml) — จัดการคำสั่ง Build/Run/Clean
  • Scheduling: Infinite Loop ใน Python + Docker restart: always

สถานีตรวจวัด 14 แห่ง

ระบบเชื่อมต่อกับสถานีตรวจวัดคุณภาพอากาศ 14 แห่งครอบคลุมพื้นที่ภาคเหนือ:

ลำดับ สถานี จังหวัด
1 สถานีเชียงราย เชียงราย
2 สถานีเชียงใหม่ เชียงใหม่
3 สถานีลำปาง ลำปาง
4 สถานีลำพูน ลำพูน
5 สถานีแพร่ แพร่
6 สถานีน่าน น่าน
7 สถานีแม่ฮ่องสอน แม่ฮ่องสอน
8 สถานีตาก ตาก
9 สถานีสุโขทัย สุโขทัย
10 สถานีพิษณุโลก พิษณุโลก
11 สถานีเพชรบูรณ์ เพชรบูรณ์
12 สถานีอุตรดิตถ์ อุตรดิตถ์
13 สถานีพะเยา พะเยา
14 สถานีกำแพงเพชร กำแพงเพชร

5. โครงสร้างระบบ (System Structure)

project-root/
├── src/                    # Source code
│   ├── 1.py ~ 6.py         # Pipeline stages
│   ├── entrypoint.py       # Orchestrator (Daemon loop)
│   ├── config.py           # Centralized config & constants
│   ├── logger.py           # Custom logging (daily rotation)
│   └── alert.py            # Email alert system
├── data/
│   ├── gfs/                # GFS meteorological files
│   ├── firms/              # Hotspot CSV files
│   ├── assets/             # Shapefiles & grid tables
│   ├── output/             # Final GeoJSON results
│   │   ├── trj/            # Backward trajectories
│   │   ├── fwt/            # Forward trajectories
│   │   └── pscf_*/         # PSCF analysis
│   ├── logs/               # Daily log rotation
│   └── runs/               # HYSPLIT temp working dirs
├── Dockerfile
├── Taskfile.yml
└── .env.prod

6. การทำงานแบบ Daemon และ Health Check

ระบบทำงานใน Daemon Mode — รันตลอดเวลาและทำ Pipeline ซ้ำทุก 6 ชั่วโมง พร้อมระบบตรวจสอบสุขภาพ:

sequenceDiagram
    participant Container as Docker Container
    participant Pipeline as Pipeline Engine
    participant Logger as Log System
    participant Alert as Email Alert

    Note over Container: Container starts
    Container->>Pipeline: Start Daemon Loop
    loop Every 6 Hours
        Pipeline->>Pipeline: Run Stages 1-6
        Pipeline->>Logger: Write pipeline.log
        alt Success
            Logger->>Logger: Update health status<br/>Last success = now
        else Error
            Pipeline->>Alert: Send SMTP alert
            Alert->>Alert: Email to operator
        end
    end
    Note over Container: Docker restart: always
  • Health Check: คำสั่ง task check จะตรวจสอบว่ารอบล่าสุดสำเร็จภายใน 12 ชม. ที่ผ่านมาหรือไม่
  • Auto Restart: Docker restart: always ทำให้ระบบกลับ up อัตโนมัติหลัง Server Reboot

ใน ตอนที่ 2 เราจะเจาะลึกบทเรียนจาก Pipeline จริง — Bugs ที่เจอ, การปรับ Performance, และเทคนิคการทำให้ Pipeline ทนทานต่อความผิดพลาด ทุกสถานการณ์แบบ Failure-proof!

ติดตามต่อในตอนที่ 2 ครับ!