BigData TRJ Series ตอนที่ 2: บทเรียนจาก Pipeline จริง — Bugs, Optimization และ Reliability

เล่าเรื่องจริงจากการพัฒนาระบบ Pipeline HYSPLIT อัตโนมัติ ทั้ง Data Duplication Bug, ASCDATA.CFG Configuration Fail, Performance Optimization และเทคนิคการทำให้ Pipeline ทนทานแบบ Failure-proof

· 13 min read

Problem

Pipeline ที่ Deploy แล้วเจอปัญหาทั้ง Data Duplication (ซ้ำ 4 เท่า), Configuration Error ที่ทำให้ HYSPLIT ทำงานไม่ได้ 100%, และ Pipeline หยุดกลางคันเมื่อเจอวันที่ไม่มีข้อมูล Hotspot

Solution

แก้ Data Duplication Bug ด้วย Cache Deduplication, แก้ ASCDATA.CFG Path Normalization, เพิ่ม Parallel Processing (ลดเวลา 70%), และสร้างกลไก Future Safety Buffer + Zero-Hotspot Placeholder เพื่อให้ Pipeline ไม่สะดุด

Impact

Pipeline เสถียรพอให้รันแบบไร้คนดูแล (Unattended) ตลอด 24 ชั่วโมง — CPU Utilization ~80%, Zero Manual Intervention ในรอบ 30 วัน, Error Recovery 100%

BigData TRJ Series ตอนที่ 2: บทเรียนจาก Pipeline จริง — Bugs, Optimization และ Reliability

ในตอนที่แล้วเราได้ดูภาพรวมสถาปัตยกรรมของระบบ BigData TRJ กันไปแล้ว ตอนนี้ผมจะแชร์ประสบการณ์จริงที่เจอระหว่างพัฒนาและ Maintain ระบบ ตั้งแต่เวอร์ชันแรกเริ่มจนถึงเวอร์ชันล่าสุด

นี่คือ War Story ที่เกิดขึ้นจริง — ทั้ง Bugs ที่ทำให้ข้อมูลเพี้ยน 4 เท่า, Configuration ที่พังทั้ง Pipeline, และวิธีการทำให้ระบบอยู่รอดโดยไม่ต้องมีคนมานั่งดูตลอด 24 ชั่วโมง


1. Data Duplication Bug — ข้อมูลซ้ำ 4 เท่า!

ปัญหา

หลังจาก Deploy ระบบขึ้น Production Server (Ubuntu) เจ้าของโครงการเปรียบเทียบผลลัพธ์ระหว่างเครื่องพัฒนาเดิม (Windows) กับ Server (Ubuntu) สำหรับวันที่ 19-21 เมษายน 2569

ผลลัพธ์ที่ได้น่าตกใจ:

หัวข้อเครื่องเดิม (Windows)Server (Ubuntu)ความแตกต่าง
TRJ (Backward)8 เส้น/วัน8 เส้น/วันตรงกัน
PSCF Overallปกติปกติตรงกัน
FWT (Forward)3,488 เส้น13,952 เส้น⚠️ ต่าง 4 เท่า!

การสืบสวน (Root Cause Analysis)

flowchart LR
    subgraph "Pipeline Timeline (24 hours)"
        T1[06:00 - Run 1] --> C1[Cache: Append records]
        T2[12:00 - Run 2] --> C2[Cache: Append again<br/>Same day records]
        T3[18:00 - Run 3] --> C3[Cache: Append again<br/>Same day records]
        T4[00:00 - Run 4] --> C4[Cache: Append again<br/>Same day records]
    end

    C1 --> CSV[CSV Generation<br/>Read ALL cache lines]
    C2 --> CSV
    C3 --> CSV
    C4 --> CSV

    CSV --> DUPE[Result: 4x Duplicate Data!]

    style DUPE fill:#f44336,color:#fff
    style CSV fill:#ff9800,color:#fff

สาเหตุพบว่ามาจากการทำงานของ src/3.py ซึ่งใช้วิธีการเขียน Cache แบบ Append (โหมด "a") ลงไฟล์ NDJSON เดิมทุกครั้งที่ Pipeline รัน (ทุก 6 ชั่วโมง):

# โค้ดต้นฉบับ (เวอร์ชันแรกเริ่ม) ที่เป็นปัญหา — เขียน append ทับทุกครั้ง
with cache_path.open("a", encoding="utf-8") as f:
    for row in new_rows:
        f.write(json.dumps(row) + "\n")

# ตอนสร้าง CSV — อ่านทุกบรรทัดจาก cache โดยไม่ filter duplicate
with cache_path.open("r", encoding="utf-8") as f:
    for line in f:
        obj = json.loads(line)
        if "rows" in obj:
            all_rows.extend(obj["rows"])  # ข้อมูลซ้ำถูกดึงเข้ามาด้วย!

สิ่งที่ค้นพบเพิ่มเติม

จากการเทียบกับ Git History พบว่า Bug นี้มีมาตั้งแต่เวอร์ชันแรกเริ่มแล้ว!

ทำไมเครื่องเดิมไม่เจอ?

  • เครื่องเดิมรันแบบ Manual บน Windows — รันครั้งเดียว เสร็จแล้วลบ Cache ทิ้ง
  • ระบบเรา รันแบบ Automation ทุก 6 ชม. — Cache สะสม 4 รอบ/วัน → ข้อมูลซ้ำ 4 เท่า

วิธีแก้ไข

เพิ่มกลไกตรวจสอบวันที่ซ้ำก่อนเขียน CSV:

# หลังจาก Fix — ตรวจสอบวันที่ก่อนเขียน
def deduplicate_hotspots(cache_entries, target_date):
    seen = set()
    unique = []
    for entry in cache_entries:
        key = (entry["lat"], entry["lon"], entry["datetime"])
        if key not in seen:
            seen.add(key)
            unique.append(entry)
    return unique

2. ASCDATA.CFG Configuration Fail — HYSPLIT เริ่มต้นไม่ได้

ปัญหา

เมื่อ Deploy ครั้งแรกบน Server ใหม่ HYSPLIT ไม่สามารถทำงานได้เพราะ Error:

[Errno 2] No such file or directory: '/opt/hysplit/working/ASCDATA.CFG'

การวิเคราะห์

flowchart TB
    subgraph "Before Fix"
        A[Docker Build] --> B[Copy HYSPLIT binaries]
        B --> C[Missing: bdyfiles/ and ASCDATA.CFG!]
        C --> D[HYSPLIT Run ❌<br/>FileNotFoundError]
    end
    style D fill:#f44336,color:#fff
flowchart TB
    subgraph "After Fix"
        E[Docker Build v2] --> F[Copy bdyfiles/ to /opt/hysplit/bdyfiles/]
        F --> G[sed: Fix ASCDATA.CFG paths<br/>../bdyfiles/ → /opt/hysplit/bdyfiles/]
        G --> H[Hysplit Run ✅]
    end
    style H fill:#4caf50,color:#fff

สาเหตุหลัก:

  1. Dockerfile ไม่ได้ Copy โฟลเดอร์ bdyfiles/ (LANDUSE.ASC, ROUGLEN.ASC, TERRAIN.ASC) เข้า Image
  2. ASCDATA.CFG ใช้ Relative Path (../bdyfiles/) ซึ่งใช้ไม่ได้เมื่อ HYSPLIT ทำงานในโฟลเดอร์ชั่วคราวที่ Python สร้างขึ้น

วิธีแก้ไข

ใช้ Path Normalization ใน Docker Build แทนการแก้ Source Code ของ HYSPLIT (ตามข้อจำกัดที่ห้ามแก้):

# 1. Copy bdyfiles เข้า Image
COPY data/hysplit_linux/*/bdyfiles/ /opt/hysplit/bdyfiles/

# 2. Copy และ normalize path ใน ASCDATA.CFG
COPY data/hysplit_linux/*/bdyfiles/ASCDATA.CFG /opt/hysplit/working/ASCDATA.CFG
RUN sed -i "s|'../bdyfiles/'|'/opt/hysplit/bdyfiles/'|g" /opt/hysplit/working/ASCDATA.CFG

3. Performance Optimization — ลดเวลา 70% ด้วย Parallel Processing

ก่อน Optimize

sequenceDiagram
    participant Stage2 as Stage 2 (Before)
    participant Station1 as Station 1
    participant Station2 as Station 2
    participant StationN as Station N...

    Stage2->>Station1: Run HYSPLIT (Sequential)
    Stage2->>Station2: Run HYSPLIT (Sequential)
    Stage2->>StationN: ...continue one by one
    Note over Stage2,StationN: Total Time = Sum of all stations<br/>~30-60 minutes

หลัง Optimize

sequenceDiagram
    participant Pool as ProcessPoolExecutor
    participant CPU1 as CPU Core 1
    participant CPU2 as CPU Core 2
    participant CPU3 as CPU Core 3

    Pool->>CPU1: Run Station 1
    Pool->>CPU2: Run Station 2
    Pool->>CPU3: Run Station 3
    Note over Pool,CPU3: Parallel Execution<br/>Total Time = Max(stations)<br/>~5-10 minutes (-70%)

การเปลี่ยนแปลงหลักใน src/2.py และ src/4.py:

# ก่อน: Sequential — รันทีละสถานี
for station in config.STATIONS:
    run_hysplit(station)

# หลัง: Parallel — ใช้ ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=config.MAX_WORKERS) as executor:
    futures = [executor.submit(run_hysplit, s) for s in config.STATIONS]
    for future in as_completed(futures):
        result = future.result()

นอกจากนี้ยังเพิ่ม Incremental Execution — ข้ามสถานีที่มี GeoJSON อยู่แล้ว:

def needs_run(station_id: str, date: str) -> bool:
    output_path = DATA_DIR / "output" / "trj" / f"{station_id}_{date}.geojson"
    return not output_path.exists()  # ถ้ามีแล้วข้าม!

4. Pipeline Reliability — ทำให้ระบบ Failure-proof

นี่คือจุดที่ทำให้ระบบเปลี่ยนจาก “Manual-recovery pipeline” เป็น “Unattended 24/7 pipeline”

4.1 Future Safety Buffer

ปัญหา: HYSPLIT พยายามรัน Trajectory สำหรับเวลาที่เกินกว่าปัจจุบัน (Future Time) ซึ่งข้อมูล GFS ยังไม่มีบนเซิร์ฟเวอร์ NOAA — ทำให้ Error และ Pipeline หยุดทั้ง Stage

# Config
FUTURE_BUFFER_HOURS = 6  # Default

# Check ก่อนรัน
def is_safe_to_run(target_time: datetime, now: datetime) -> bool:
    if target_time > now + timedelta(hours=FUTURE_BUFFER_HOURS):
        logger.warning(f"[SKIP] Future date: {target_time}")
        return False
    return True

4.2 Strict GFS Coverage Check

ก่อนเริ่มรัน HYSPLIT ระบบจะตรวจสอบว่าไฟล์ GFS ครอบคลุมช่วงเวลา 24 ชั่วโมงย้อนหลังครบถ้วนหรือไม่:

flowchart LR
    A[Start Stage 2] --> B{Check GFS Files<br/>24h backward}
    B -->|Complete| C[Run HYSPLIT ✅]
    B -->|Missing files| D[Log: Insufficient Coverage]
    D --> E[Skip Stage 2]
    E --> F[Continue Pipeline ✅]
    C --> G[Generate GeoJSON]
    G --> F

4.3 Zero-Hotspot Placeholder

ปัญหา: เมื่อวันที่ไม่มีไฟป่า (Zero Hotspots) — 5.py จะหาจุดความร้อนไม่เจอและ Return Error ทำให้ Pipeline หยุด

วิธีแก้: สร้าง Placeholder GeoJSON แทนการ Error:

def create_zero_hotspot_placeholder(province: str, date: str):
    placeholder = {
        "type": "FeatureCollection",
        "metadata": {
            "status": "skipped",
            "reason": "zero_hotspots",
            "province": province,
            "date": date,
        },
        "features": [],
    }
    # เขียนไฟล์แทนการ return error
    with open(output_path, "w") as f:
        json.dump(placeholder, f)
    logger.info(f"[PLACEHOLDER] Created for {province} on {date}")
flowchart TB
    subgraph "Old Behavior (ก่อนแก้ไข)"
        A[Start PSCF] --> B{Any hotspots?}
        B -->|No| C[Raise Error ❌]
        C --> D[Pipeline Halted!]
    end
    style D fill:#f44336,color:#fff
flowchart TB
    subgraph "New Behavior (หลังแก้ไข)"
        E[Start PSCF] --> F{Any hotspots?}
        F -->|No| G[Create Placeholder GeoJSON<br/>status: skipped]
        F -->|Yes| H[Run Normal PSCF]
        G --> I[Continue Pipeline ✅]
        H --> I
    end
    style I fill:#4caf50,color:#fff

5. สรุป: ผลลัพธ์และบทเรียน

Metric ก่อนปรับปรุง หลังปรับปรุง
Pipeline Run Time ~60 นาที/รอบ ~5-10 นาที/รอบ
CPU Utilization ~10% (Sequential) ~80% (Parallel)
Data Accuracy Duplicated 4x Correct ✅
Error Recovery Manual Intervention Auto Skip/Placeholder
Uptime ต้องมีคนดูแล Unattended 24/7
DevOps Manual Deploy Docker + Taskfile

บทเรียนที่ได้

  1. Test on Production-like Schedule: Bug การซ้ำ 4 เท่าถูกซ่อนอยู่เพราะขาดการทดสอบรันระบบต่อเนื่องนานพอ — หากมีการทดสอบรัน Pipeline ให้ครบ 24 ชม. ก่อน Deploy จริง เราจะพบปัญหานี้ได้ตั้งแต่แรก
  2. Configuration ที่ใช้ Relative Path = Time Bomb: HYSPLIT ทำงานในโฟลเดอร์ชั่วคราวที่เปลี่ยนทุกครั้ง — Relative Path ล้วนพังหมด
  3. อย่าให้ Pipeline หยุดเพราะ Exception: การออกแบบให้ Pipeline ข้าม (Skip) แทนการ Error ทำให้ระบบอยู่รอดได้โดยไม่มีคนดูแล

การสร้างระบบนี้สอนให้รู้ว่า ระบบ Automated Pipeline ที่ดีไม่ใช่แค่ “เขียนโค้ดให้รันได้” แต่คือ “ออกแบบให้ Failure-proof” — ยอมเสียข้อมูลบางส่วน (Skip) ดีกว่าให้ระบบทั้งระบบหยุดทำงาน