BigData TRJ Series ตอนที่ 2: บทเรียนจาก Pipeline จริง — Bugs, Optimization และ Reliability
เล่าเรื่องจริงจากการพัฒนาระบบ Pipeline HYSPLIT อัตโนมัติ ทั้ง Data Duplication Bug, ASCDATA.CFG Configuration Fail, Performance Optimization และเทคนิคการทำให้ Pipeline ทนทานแบบ Failure-proof
Problem
Pipeline ที่ Deploy แล้วเจอปัญหาทั้ง Data Duplication (ซ้ำ 4 เท่า), Configuration Error ที่ทำให้ HYSPLIT ทำงานไม่ได้ 100%, และ Pipeline หยุดกลางคันเมื่อเจอวันที่ไม่มีข้อมูล Hotspot
Solution
แก้ Data Duplication Bug ด้วย Cache Deduplication, แก้ ASCDATA.CFG Path Normalization, เพิ่ม Parallel Processing (ลดเวลา 70%), และสร้างกลไก Future Safety Buffer + Zero-Hotspot Placeholder เพื่อให้ Pipeline ไม่สะดุด
Impact
Pipeline เสถียรพอให้รันแบบไร้คนดูแล (Unattended) ตลอด 24 ชั่วโมง — CPU Utilization ~80%, Zero Manual Intervention ในรอบ 30 วัน, Error Recovery 100%
BigData TRJ Series ตอนที่ 2: บทเรียนจาก Pipeline จริง — Bugs, Optimization และ Reliability
ในตอนที่แล้วเราได้ดูภาพรวมสถาปัตยกรรมของระบบ BigData TRJ กันไปแล้ว ตอนนี้ผมจะแชร์ประสบการณ์จริงที่เจอระหว่างพัฒนาและ Maintain ระบบ ตั้งแต่เวอร์ชันแรกเริ่มจนถึงเวอร์ชันล่าสุด
นี่คือ War Story ที่เกิดขึ้นจริง — ทั้ง Bugs ที่ทำให้ข้อมูลเพี้ยน 4 เท่า, Configuration ที่พังทั้ง Pipeline, และวิธีการทำให้ระบบอยู่รอดโดยไม่ต้องมีคนมานั่งดูตลอด 24 ชั่วโมง
1. Data Duplication Bug — ข้อมูลซ้ำ 4 เท่า!
ปัญหา
หลังจาก Deploy ระบบขึ้น Production Server (Ubuntu) เจ้าของโครงการเปรียบเทียบผลลัพธ์ระหว่างเครื่องพัฒนาเดิม (Windows) กับ Server (Ubuntu) สำหรับวันที่ 19-21 เมษายน 2569
ผลลัพธ์ที่ได้น่าตกใจ:
| หัวข้อ | เครื่องเดิม (Windows) | Server (Ubuntu) | ความแตกต่าง |
|---|---|---|---|
| TRJ (Backward) | 8 เส้น/วัน | 8 เส้น/วัน | ✅ ตรงกัน |
| PSCF Overall | ปกติ | ปกติ | ✅ ตรงกัน |
| FWT (Forward) | 3,488 เส้น | 13,952 เส้น | ⚠️ ต่าง 4 เท่า! |
การสืบสวน (Root Cause Analysis)
flowchart LR
subgraph "Pipeline Timeline (24 hours)"
T1[06:00 - Run 1] --> C1[Cache: Append records]
T2[12:00 - Run 2] --> C2[Cache: Append again<br/>Same day records]
T3[18:00 - Run 3] --> C3[Cache: Append again<br/>Same day records]
T4[00:00 - Run 4] --> C4[Cache: Append again<br/>Same day records]
end
C1 --> CSV[CSV Generation<br/>Read ALL cache lines]
C2 --> CSV
C3 --> CSV
C4 --> CSV
CSV --> DUPE[Result: 4x Duplicate Data!]
style DUPE fill:#f44336,color:#fff
style CSV fill:#ff9800,color:#fff
สาเหตุพบว่ามาจากการทำงานของ src/3.py ซึ่งใช้วิธีการเขียน Cache แบบ Append (โหมด "a") ลงไฟล์ NDJSON เดิมทุกครั้งที่ Pipeline รัน (ทุก 6 ชั่วโมง):
# โค้ดต้นฉบับ (เวอร์ชันแรกเริ่ม) ที่เป็นปัญหา — เขียน append ทับทุกครั้ง
with cache_path.open("a", encoding="utf-8") as f:
for row in new_rows:
f.write(json.dumps(row) + "\n")
# ตอนสร้าง CSV — อ่านทุกบรรทัดจาก cache โดยไม่ filter duplicate
with cache_path.open("r", encoding="utf-8") as f:
for line in f:
obj = json.loads(line)
if "rows" in obj:
all_rows.extend(obj["rows"]) # ข้อมูลซ้ำถูกดึงเข้ามาด้วย!
สิ่งที่ค้นพบเพิ่มเติม
จากการเทียบกับ Git History พบว่า Bug นี้มีมาตั้งแต่เวอร์ชันแรกเริ่มแล้ว!
ทำไมเครื่องเดิมไม่เจอ?
- เครื่องเดิมรันแบบ Manual บน Windows — รันครั้งเดียว เสร็จแล้วลบ Cache ทิ้ง
- ระบบเรา รันแบบ Automation ทุก 6 ชม. — Cache สะสม 4 รอบ/วัน → ข้อมูลซ้ำ 4 เท่า
วิธีแก้ไข
เพิ่มกลไกตรวจสอบวันที่ซ้ำก่อนเขียน CSV:
# หลังจาก Fix — ตรวจสอบวันที่ก่อนเขียน
def deduplicate_hotspots(cache_entries, target_date):
seen = set()
unique = []
for entry in cache_entries:
key = (entry["lat"], entry["lon"], entry["datetime"])
if key not in seen:
seen.add(key)
unique.append(entry)
return unique
2. ASCDATA.CFG Configuration Fail — HYSPLIT เริ่มต้นไม่ได้
ปัญหา
เมื่อ Deploy ครั้งแรกบน Server ใหม่ HYSPLIT ไม่สามารถทำงานได้เพราะ Error:
[Errno 2] No such file or directory: '/opt/hysplit/working/ASCDATA.CFG'
การวิเคราะห์
flowchart TB
subgraph "Before Fix"
A[Docker Build] --> B[Copy HYSPLIT binaries]
B --> C[Missing: bdyfiles/ and ASCDATA.CFG!]
C --> D[HYSPLIT Run ❌<br/>FileNotFoundError]
end
style D fill:#f44336,color:#fff
flowchart TB
subgraph "After Fix"
E[Docker Build v2] --> F[Copy bdyfiles/ to /opt/hysplit/bdyfiles/]
F --> G[sed: Fix ASCDATA.CFG paths<br/>../bdyfiles/ → /opt/hysplit/bdyfiles/]
G --> H[Hysplit Run ✅]
end
style H fill:#4caf50,color:#fff
สาเหตุหลัก:
Dockerfileไม่ได้ Copy โฟลเดอร์bdyfiles/(LANDUSE.ASC, ROUGLEN.ASC, TERRAIN.ASC) เข้า ImageASCDATA.CFGใช้ Relative Path (../bdyfiles/) ซึ่งใช้ไม่ได้เมื่อ HYSPLIT ทำงานในโฟลเดอร์ชั่วคราวที่ Python สร้างขึ้น
วิธีแก้ไข
ใช้ Path Normalization ใน Docker Build แทนการแก้ Source Code ของ HYSPLIT (ตามข้อจำกัดที่ห้ามแก้):
# 1. Copy bdyfiles เข้า Image
COPY data/hysplit_linux/*/bdyfiles/ /opt/hysplit/bdyfiles/
# 2. Copy และ normalize path ใน ASCDATA.CFG
COPY data/hysplit_linux/*/bdyfiles/ASCDATA.CFG /opt/hysplit/working/ASCDATA.CFG
RUN sed -i "s|'../bdyfiles/'|'/opt/hysplit/bdyfiles/'|g" /opt/hysplit/working/ASCDATA.CFG
3. Performance Optimization — ลดเวลา 70% ด้วย Parallel Processing
ก่อน Optimize
sequenceDiagram
participant Stage2 as Stage 2 (Before)
participant Station1 as Station 1
participant Station2 as Station 2
participant StationN as Station N...
Stage2->>Station1: Run HYSPLIT (Sequential)
Stage2->>Station2: Run HYSPLIT (Sequential)
Stage2->>StationN: ...continue one by one
Note over Stage2,StationN: Total Time = Sum of all stations<br/>~30-60 minutes
หลัง Optimize
sequenceDiagram
participant Pool as ProcessPoolExecutor
participant CPU1 as CPU Core 1
participant CPU2 as CPU Core 2
participant CPU3 as CPU Core 3
Pool->>CPU1: Run Station 1
Pool->>CPU2: Run Station 2
Pool->>CPU3: Run Station 3
Note over Pool,CPU3: Parallel Execution<br/>Total Time = Max(stations)<br/>~5-10 minutes (-70%)
การเปลี่ยนแปลงหลักใน src/2.py และ src/4.py:
# ก่อน: Sequential — รันทีละสถานี
for station in config.STATIONS:
run_hysplit(station)
# หลัง: Parallel — ใช้ ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=config.MAX_WORKERS) as executor:
futures = [executor.submit(run_hysplit, s) for s in config.STATIONS]
for future in as_completed(futures):
result = future.result()
นอกจากนี้ยังเพิ่ม Incremental Execution — ข้ามสถานีที่มี GeoJSON อยู่แล้ว:
def needs_run(station_id: str, date: str) -> bool:
output_path = DATA_DIR / "output" / "trj" / f"{station_id}_{date}.geojson"
return not output_path.exists() # ถ้ามีแล้วข้าม!
4. Pipeline Reliability — ทำให้ระบบ Failure-proof
นี่คือจุดที่ทำให้ระบบเปลี่ยนจาก “Manual-recovery pipeline” เป็น “Unattended 24/7 pipeline”
4.1 Future Safety Buffer
ปัญหา: HYSPLIT พยายามรัน Trajectory สำหรับเวลาที่เกินกว่าปัจจุบัน (Future Time) ซึ่งข้อมูล GFS ยังไม่มีบนเซิร์ฟเวอร์ NOAA — ทำให้ Error และ Pipeline หยุดทั้ง Stage
# Config
FUTURE_BUFFER_HOURS = 6 # Default
# Check ก่อนรัน
def is_safe_to_run(target_time: datetime, now: datetime) -> bool:
if target_time > now + timedelta(hours=FUTURE_BUFFER_HOURS):
logger.warning(f"[SKIP] Future date: {target_time}")
return False
return True
4.2 Strict GFS Coverage Check
ก่อนเริ่มรัน HYSPLIT ระบบจะตรวจสอบว่าไฟล์ GFS ครอบคลุมช่วงเวลา 24 ชั่วโมงย้อนหลังครบถ้วนหรือไม่:
flowchart LR
A[Start Stage 2] --> B{Check GFS Files<br/>24h backward}
B -->|Complete| C[Run HYSPLIT ✅]
B -->|Missing files| D[Log: Insufficient Coverage]
D --> E[Skip Stage 2]
E --> F[Continue Pipeline ✅]
C --> G[Generate GeoJSON]
G --> F
4.3 Zero-Hotspot Placeholder
ปัญหา: เมื่อวันที่ไม่มีไฟป่า (Zero Hotspots) — 5.py จะหาจุดความร้อนไม่เจอและ Return Error ทำให้ Pipeline หยุด
วิธีแก้: สร้าง Placeholder GeoJSON แทนการ Error:
def create_zero_hotspot_placeholder(province: str, date: str):
placeholder = {
"type": "FeatureCollection",
"metadata": {
"status": "skipped",
"reason": "zero_hotspots",
"province": province,
"date": date,
},
"features": [],
}
# เขียนไฟล์แทนการ return error
with open(output_path, "w") as f:
json.dump(placeholder, f)
logger.info(f"[PLACEHOLDER] Created for {province} on {date}")
flowchart TB
subgraph "Old Behavior (ก่อนแก้ไข)"
A[Start PSCF] --> B{Any hotspots?}
B -->|No| C[Raise Error ❌]
C --> D[Pipeline Halted!]
end
style D fill:#f44336,color:#fff
flowchart TB
subgraph "New Behavior (หลังแก้ไข)"
E[Start PSCF] --> F{Any hotspots?}
F -->|No| G[Create Placeholder GeoJSON<br/>status: skipped]
F -->|Yes| H[Run Normal PSCF]
G --> I[Continue Pipeline ✅]
H --> I
end
style I fill:#4caf50,color:#fff
5. สรุป: ผลลัพธ์และบทเรียน
| Metric | ก่อนปรับปรุง | หลังปรับปรุง |
|---|---|---|
| Pipeline Run Time | ~60 นาที/รอบ | ~5-10 นาที/รอบ |
| CPU Utilization | ~10% (Sequential) | ~80% (Parallel) |
| Data Accuracy | Duplicated 4x | Correct ✅ |
| Error Recovery | Manual Intervention | Auto Skip/Placeholder |
| Uptime | ต้องมีคนดูแล | Unattended 24/7 |
| DevOps | Manual Deploy | Docker + Taskfile |
บทเรียนที่ได้
- Test on Production-like Schedule: Bug การซ้ำ 4 เท่าถูกซ่อนอยู่เพราะขาดการทดสอบรันระบบต่อเนื่องนานพอ — หากมีการทดสอบรัน Pipeline ให้ครบ 24 ชม. ก่อน Deploy จริง เราจะพบปัญหานี้ได้ตั้งแต่แรก
- Configuration ที่ใช้ Relative Path = Time Bomb: HYSPLIT ทำงานในโฟลเดอร์ชั่วคราวที่เปลี่ยนทุกครั้ง — Relative Path ล้วนพังหมด
- อย่าให้ Pipeline หยุดเพราะ Exception: การออกแบบให้ Pipeline ข้าม (Skip) แทนการ Error ทำให้ระบบอยู่รอดได้โดยไม่มีคนดูแล
การสร้างระบบนี้สอนให้รู้ว่า ระบบ Automated Pipeline ที่ดีไม่ใช่แค่ “เขียนโค้ดให้รันได้” แต่คือ “ออกแบบให้ Failure-proof” — ยอมเสียข้อมูลบางส่วน (Skip) ดีกว่าให้ระบบทั้งระบบหยุดทำงาน