Bỏ qua

Data Pipeline

Python
GeoJSON
SUMO
ETL

Tổng Quan

Data Pipeline của GreenMap xử lý dữ liệu từ nhiều nguồn khác nhau và chuẩn hóa về định dạng thống nhất.

graph LR
    subgraph Sources
        OSM[OpenStreetMap]
        OW[OpenWeather]
        OAQ[OpenAQ]
        SUMO[SUMO Simulator]
    end

    subgraph ETL
        Extract[Extract]
        Transform[Transform]
        Load[Load]
    end

    subgraph Storage
        PG[(PostgreSQL)]
        Broker[Orion-LD]
    end

    OSM --> Extract
    OW --> Extract
    OAQ --> Extract
    SUMO --> Extract

    Extract --> Transform
    Transform --> Load

    Load --> PG
    Load --> Broker

Thu Thập Dữ Liệu OSM

Overpass API Query

import requests

# Bounding box Hà Nội
BBOX = (20.57, 105.28, 21.39, 106.02)

def query_osm(element_type, tags):
    """Query OSM via Overpass API"""
    query = f"""
    [out:json][timeout:60];
    (
        node["{tags}"]{BBOX};
        way["{tags}"]{BBOX};
    );
    out center;
    """

    response = requests.post(
        "https://overpass-api.de/api/interpreter",
        data={"data": query}
    )
    return response.json()

Chuyển Đổi Sang GeoJSON

def osm_to_geojson(osm_data):
    """Convert OSM data to GeoJSON format"""
    features = []

    for element in osm_data.get("elements", []):
        if element["type"] == "node":
            geometry = {
                "type": "Point",
                "coordinates": [element["lon"], element["lat"]]
            }
        elif element["type"] == "way" and "center" in element:
            geometry = {
                "type": "Point",
                "coordinates": [element["center"]["lon"], element["center"]["lat"]]
            }
        else:
            continue

        feature = {
            "type": "Feature",
            "properties": element.get("tags", {}),
            "geometry": geometry
        }
        features.append(feature)

    return {
        "type": "FeatureCollection",
        "features": features
    }

SUMO Traffic Simulation

Cấu Trúc Dữ Liệu

{
  "timestamp": "2025-12-10T14:30:00Z",
  "vehicles": [
    {
      "id": "veh_001",
      "type": "car",
      "position": [105.8542, 21.0285],
      "speed": 35.5,
      "heading": 90,
      "emissions": {
        "CO2": 0.15,
        "NOx": 0.02
      }
    }
  ],
  "road_segments": [
    {
      "id": "edge_abc",
      "density": 0.75,
      "average_speed": 28.3,
      "congestion_level": "moderate"
    }
  ]
}

Processing Script

# process_simulation.py

import json
from datetime import datetime

def process_simulation_file(file_path):
    """Process SUMO simulation output"""
    with open(file_path, 'r') as f:
        data = json.load(f)

    # Aggregate traffic data
    traffic_entities = []
    for segment in data.get("road_segments", []):
        entity = {
            "id": f"urn:ngsi-ld:TrafficFlow:{segment['id']}",
            "type": "TrafficFlow",
            "density": {
                "type": "Property",
                "value": segment["density"]
            },
            "averageSpeed": {
                "type": "Property",
                "value": segment["average_speed"],
                "unitCode": "KMH"
            },
            "congestionLevel": {
                "type": "Property",
                "value": segment["congestion_level"]
            },
            "dateObserved": {
                "type": "Property",
                "value": datetime.utcnow().isoformat()
            }
        }
        traffic_entities.append(entity)

    return traffic_entities

NGSI-LD Entity Format

AirQualityObserved

{
  "id": "urn:ngsi-ld:AirQualityObserved:HN-001",
  "type": "AirQualityObserved",
  "@context": [
    "https://raw.githubusercontent.com/smart-data-models/dataModel.Environment/master/context.jsonld"
  ],
  "location": {
    "type": "GeoProperty",
    "value": {
      "type": "Point",
      "coordinates": [105.8542, 21.0285]
    }
  },
  "airQualityIndex": {
    "type": "Property",
    "value": 85
  },
  "PM2.5": {
    "type": "Property",
    "value": 35.2,
    "unitCode": "GQ"
  },
  "PM10": {
    "type": "Property",
    "value": 58.1,
    "unitCode": "GQ"
  },
  "dateObserved": {
    "type": "Property",
    "value": "2025-12-10T14:30:00Z"
  }
}

Utility Scripts

merge_json.py

"""Merge multiple JSON files into one"""
import json
import glob

def merge_json_files(pattern, output_file):
    merged_data = []

    for file_path in glob.glob(pattern):
        with open(file_path, 'r') as f:
            data = json.load(f)
            if isinstance(data, list):
                merged_data.extend(data)
            else:
                merged_data.append(data)

    with open(output_file, 'w') as f:
        json.dump(merged_data, f, indent=2)

    print(f"Merged {len(merged_data)} records to {output_file}")

split_json.py

"""Split large JSON file into smaller parts"""
import json
import math

def split_json_file(input_file, chunk_size=1000):
    with open(input_file, 'r') as f:
        data = json.load(f)

    num_chunks = math.ceil(len(data) / chunk_size)

    for i in range(num_chunks):
        chunk = data[i * chunk_size : (i + 1) * chunk_size]
        output_file = f"{input_file.replace('.json', '')}_part{i+1}.json"

        with open(output_file, 'w') as f:
            json.dump(chunk, f, indent=2)

        print(f"Created {output_file} with {len(chunk)} records")