NGED Data API

nged_data.ckan

Generic CKAN client for interacting with NGED's Connected Data portal.

Functions

get_primary_substation_locations(api_key)

Fetch primary substation locations from NGED's CKAN portal. Note that 'Park Lane' appears twice (with different substation numbers).

Source code in packages/nged_data/src/nged_data/ckan.py
def get_primary_substation_locations(api_key: str) -> pt.DataFrame[SubstationLocations]:
    """Note that 'Park Lane' appears twice (with different substation numbers)."""
    if api_key:
        log.info(
            "Fetching substation locations with API key (length: %d, prefix: %s...)",
            len(api_key),
            api_key[:4],
        )
    else:
        log.warning("Fetching substation locations WITHOUT API key")

    url = f"{BASE_CKAN_URL}/api/3/action/resource_search"
    headers = {"Authorization": api_key} if api_key else {}
    params = {"query": "name:Primary Substation Location"}

    try:
        resp = httpx.get(url, headers=headers, params=params, timeout=30)
        resp.raise_for_status()
        data = resp.json()
    except Exception as e:
        log.error("CKAN resource_search failed: %s", e)
        raise

    if not data.get("success"):
        log.error("CKAN resource_search returned success=False: %s", data.get("error"))
        raise RuntimeError(f"CKAN search failed: {data.get('error')}")

    ckan_results = data["result"]["results"]
    ckan_result = find_one_match(lambda result: result["format"].upper() == "CSV", ckan_results)
    url = str(ckan_result["url"])
    http_response = httpx_get_with_auth(url, api_key=api_key)
    locations = pl.read_csv(http_response.content)
    locations = change_dataframe_column_names_to_snake_case(locations)
    locations = locations.filter(
        pl.col("substation_type").str.to_lowercase().str.contains("primary")
    )
    dtypes = cast(Any, SubstationLocations.dtypes)
    locations = locations.cast(dtypes)
    return SubstationLocations.validate(locations, drop_superfluous_columns=True)
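
The response the function unpacks follows CKAN's action-API envelope: a top-level success flag and a nested list of resource dicts, each carrying a format and url. A minimal sketch of that shape (the URLs are invented for illustration), with the CSV-selection predicate applied to it:

```python
# Hypothetical CKAN resource_search payload; the field names follow CKAN's
# action API, but the URLs here are invented for illustration.
data = {
    "success": True,
    "result": {
        "results": [
            {"format": "csv", "url": "https://example.org/primary_locations.csv"},
            {"format": "JSON", "url": "https://example.org/primary_locations.json"},
        ]
    },
}

# Mirror of the CSV-selection predicate passed to find_one_match above.
csv_results = [r for r in data["result"]["results"] if r["format"].upper() == "CSV"]
```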

nged_data.cleaning

Data cleaning logic for substation flows.

This module provides functions to identify and handle problematic telemetry data:

  • "Stuck" values: Values where the rolling standard deviation is below a threshold, indicating a sensor that hasn't changed reading.
  • "Insane" values: Values that fall outside physically plausible min/max bounds.

All operations use backward-looking rolling windows and group by substation to prevent data leakage. Bad values are replaced with null to preserve the temporal grid rather than being removed or imputed.

Functions

clean_substation_flows(df, settings, group_by_cols=None)

Clean substation flows by replacing stuck and insane values with null.

This function identifies problematic telemetry data and replaces these values with null to preserve the temporal grid. It performs the following checks:

  1. Stuck sensor detection: Uses a backward-looking rolling standard deviation (24-hour window) to detect sensors that haven't changed reading. Values are marked as stuck if the rolling standard deviation falls below settings.data_quality.stuck_std_threshold. We use time-based rolling to correctly handle different temporal resolutions and missing data.

  2. Insane value detection: Flags values outside the range [min_mw_threshold, max_mw_threshold] as insane. This captures physically implausible readings (e.g., MW > 100 for primary substations, or extreme negative values).

Both checks are performed per-substation to prevent cross-substation data leakage.

Parameters:

  • df (DataFrame, required): Raw substation flows DataFrame with 'MW' and 'MVA' columns.
  • settings (Settings, required): Configuration containing data quality thresholds.
  • group_by_cols (Sequence[str] | None, default: None): Columns to group by for per-substation operations. Defaults to ["substation_number"].

Returns:

  DataFrame: DataFrame with stuck and insane values replaced with null. The temporal grid is preserved (no rows removed, no imputation).

Example

  >>> from contracts.settings import Settings
  >>> df = clean_substation_flows(raw_df, Settings(...))

Notes
  • All rolling window operations are backward-looking to prevent data leakage.
  • Null values from the original data are preserved as-is.
  • Both 'MW' and 'MVA' columns are checked independently.
  • After cleaning, downstream logic should use pl.coalesce(['MW', 'MVA']) to create the MW_or_MVA column.
Source code in packages/nged_data/src/nged_data/cleaning.py
def clean_substation_flows(
    df: pl.DataFrame, settings: Settings, group_by_cols: Sequence[str] | None = None
) -> pl.DataFrame:
    """Clean substation flows by replacing stuck and insane values with null.

    This function identifies problematic telemetry data and replaces these values
    with null to preserve the temporal grid. It performs the following checks:

    1. **Stuck sensor detection**: Uses a backward-looking rolling standard deviation
       (24-hour window) to detect sensors that haven't changed reading.
       Values are marked as stuck if the rolling standard deviation falls below
       `settings.data_quality.stuck_std_threshold`. We use time-based rolling
       to correctly handle different temporal resolutions and missing data.

    2. **Insane value detection**: Flags values outside the range
       `[min_mw_threshold, max_mw_threshold]` as insane. This captures physically
       implausible readings (e.g., MW > 100 for primary substations, or extreme
       negative values).

    Both checks are performed per-substation to prevent cross-substation data leakage.

    Args:
        df: Raw substation flows DataFrame with 'MW' and 'MVA' columns.
        settings: Configuration containing data quality thresholds.
        group_by_cols: Columns to group by for per-substation operations.
                     Defaults to ["substation_number"].

    Returns:
        DataFrame with stuck and insane values replaced with null. The temporal grid
        is preserved (no rows removed, no imputation).

    Example:
        >>> from contracts.settings import Settings
        >>> df = clean_substation_flows(raw_df, Settings(...))

    Notes:
        - All rolling window operations are backward-looking to prevent data leakage.
        - Null values from the original data are preserved as-is.
        - Both 'MW' and 'MVA' columns are checked independently.
        - After cleaning, downstream logic should use `pl.coalesce(['MW', 'MVA'])`
          to create the `MW_or_MVA` column.
    """

    def _compute_insane_mask(power_col: str, min_thresh: float, max_thresh: float) -> pl.Expr:
        """Compute a boolean expression for insane value detection."""
        return (pl.col(power_col) < min_thresh) | (pl.col(power_col) > max_thresh)

    # Determine the power columns to check
    power_columns = [col for col in ["MW", "MVA"] if col in df.columns]

    if not power_columns:
        # No power columns to check, return early
        return df

    # Determine grouping columns
    group_by = list(group_by_cols) if group_by_cols is not None else ["substation_number"]

    # Sort by timestamp as required by .rolling()
    df_sorted = df.sort("timestamp")

    # Build the mask for stuck sensors using time-based rolling windows.
    # This is more robust than row-based rolling as it handles missing data
    # and different temporal resolutions correctly.
    agg_exprs = []
    rolling_cols = []
    for col in power_columns:
        std_col_name = f"{col}_rolling_std"
        count_col_name = f"{col}_rolling_count"
        rolling_cols.extend([std_col_name, count_col_name])
        agg_exprs.extend(
            [
                pl.col(col).std().alias(std_col_name),
                pl.col(col).count().alias(count_col_name),
            ]
        )

    rolling_df = df_sorted.rolling(
        index_column="timestamp",
        period="24h",
        group_by=group_by,
        closed="right",
    ).agg(agg_exprs)

    df_with_rolling = df_sorted.join(rolling_df, on=["timestamp", *group_by], how="left")

    # Replace bad values with null and drop temporary rolling columns.
    # We evaluate stuck and insane masks independently for each power column.
    # This prevents a stuck MW sensor from unnecessarily nulling out a healthy MVA sensor.
    stuck_window_periods = settings.data_quality.stuck_window_periods
    stuck_std_threshold = settings.data_quality.stuck_std_threshold
    min_mw_threshold = settings.data_quality.min_mw_threshold
    max_mw_threshold = settings.data_quality.max_mw_threshold

    with_columns_exprs = []
    for col in power_columns:
        std_col_name = f"{col}_rolling_std"
        count_col_name = f"{col}_rolling_count"

        # A sensor is stuck if its rolling std is below the threshold.
        # We require a minimum number of periods (e.g. 48 for 24 hours at 30m resolution)
        # to avoid false positives from short-term constant values.
        stuck_mask = (pl.col(count_col_name) >= stuck_window_periods) & (
            pl.col(std_col_name).fill_null(float("inf")) < stuck_std_threshold
        )

        # A value is insane if it falls outside physically plausible bounds.
        insane_mask = _compute_insane_mask(col, min_mw_threshold, max_mw_threshold)

        # Combine masks: a value is bad if it's stuck OR insane
        bad_mask = stuck_mask | insane_mask

        with_columns_exprs.append(
            pl.when(bad_mask).then(pl.lit(None)).otherwise(pl.col(col)).alias(col)
        )

    df_cleaned = df_with_rolling.with_columns(with_columns_exprs).drop(rolling_cols)

    return df_cleaned

nged_data.process_flows

Classes

MilfordHavenParser

Parser for Milford Haven format (unit=MW).

Source code in packages/nged_data/src/nged_data/process_flows.py
class MilfordHavenParser:
    """Parser for Milford Haven format (unit=MW)."""

    def can_parse(self, df: pl.DataFrame) -> bool:
        return "unit" in df.columns and "value" in df.columns and (df["unit"] == "MW").all()

    def parse(self, df: pl.DataFrame) -> pl.DataFrame:
        return df.rename({"value": "MW", "time": "timestamp"}, strict=False)

ParserStrategy

Bases: Protocol

Protocol for CSV parsing strategies.

Source code in packages/nged_data/src/nged_data/process_flows.py
class ParserStrategy(Protocol):
    """Protocol for CSV parsing strategies."""

    def can_parse(self, df: pl.DataFrame) -> bool:
        """Return True if this strategy can parse the given DataFrame."""
        ...

    def parse(self, df: pl.DataFrame) -> pl.DataFrame:
        """Parse the DataFrame and return it with standardized column names."""
        ...
Functions
can_parse(df)

Return True if this strategy can parse the given DataFrame.

Source code in packages/nged_data/src/nged_data/process_flows.py
def can_parse(self, df: pl.DataFrame) -> bool:
    """Return True if this strategy can parse the given DataFrame."""
    ...
parse(df)

Parse the DataFrame and return it with standardized column names.

Source code in packages/nged_data/src/nged_data/process_flows.py
def parse(self, df: pl.DataFrame) -> pl.DataFrame:
    """Parse the DataFrame and return it with standardized column names."""
    ...

RegentStreetParser

Parser for Regent Street format (unit=MVA).

Source code in packages/nged_data/src/nged_data/process_flows.py
class RegentStreetParser:
    """Parser for Regent Street format (unit=MVA)."""

    def can_parse(self, df: pl.DataFrame) -> bool:
        return "unit" in df.columns and "value" in df.columns and (df["unit"] == "MVA").all()

    def parse(self, df: pl.DataFrame) -> pl.DataFrame:
        return df.rename({"value": "MVA", "time": "timestamp"}, strict=False)

StandardFormatParser

Parser for standard NGED formats with various column names.

Source code in packages/nged_data/src/nged_data/process_flows.py
class StandardFormatParser:
    """Parser for standard NGED formats with various column names."""

    def can_parse(self, df: pl.DataFrame) -> bool:
        standard_cols = [
            "ValueDate",
            "Timestamp",
            "MW Inst",
            "MVA Inst",
            "Derived MVA",
            "Amps",
            "Volts",
        ]
        return any(col in df.columns for col in standard_cols)

    def parse(self, df: pl.DataFrame) -> pl.DataFrame:
        return df.rename(
            {
                "time": "timestamp",
                "ValueDate": "timestamp",
                "Timestamp": "timestamp",
                "MW Inst": "MW",
                "MVAr Inst": "MVAr",
                "MVA Inst": "MVA",
                "Derived MVA": "MVA",
            },
            strict=False,
        )

Functions

process_live_primary_substation_power_flows(csv_data)

Read a primary substation CSV and validate it against the schema.

Source code in packages/nged_data/src/nged_data/process_flows.py
def process_live_primary_substation_power_flows(
    csv_data: bytes,
) -> pt.DataFrame[SubstationPowerFlows]:
    """Read a primary substation CSV and validate it against the schema."""
    df = pl.read_csv(csv_data)
    first_orig_rows = df.head()

    # Define available parsing strategies
    parsers: list[ParserStrategy] = [
        RegentStreetParser(),
        MilfordHavenParser(),
        StandardFormatParser(),
    ]

    # Try each parser until one succeeds
    parsed = False
    for parser in parsers:
        if parser.can_parse(df):
            df = parser.parse(df)
            parsed = True
            break

    if not parsed and ("unit" in df.columns and "value" in df.columns):
        raise ValueError(f"Unexpected unit in CSV: {df['unit'].unique().to_list()}")

    # Compute MVA if missing
    df = _compute_missing_mva(df)

    # Ensure MW, MVA, MVAr, and ingested_at are present before validation.
    # If they are missing from the source CSV, fill them with null.
    for col in ["MW", "MVA", "MVAr"]:
        if col not in df.columns:
            df = df.with_columns(pl.lit(None).cast(pl.Float32).alias(col))

    if "ingested_at" not in df.columns:
        df = df.with_columns(pl.lit(None).cast(UTC_DATETIME_DTYPE).alias("ingested_at"))

    columns = [col for col in SubstationPowerFlows.columns if col in df.columns]
    df = df.select(columns)
    df = df.cast({col: SubstationPowerFlows.dtypes[col] for col in columns})
    df = df.sort("timestamp")

    try:
        return SubstationPowerFlows.validate(df, allow_missing_columns=True)
    except Exception as e:
        e.add_note(f"First rows of CSV data, before processing: {first_orig_rows}")
        raise

nged_data.schemas

nged_data.substation_names.align

Functions

join_location_table_to_live_primaries(locations, live_primaries)

Joins the substation locations dataset to the list of live primary flow resources.

This function performs a "dance" of normalization because NGED uses slightly different naming conventions in different datasets. For example:

  • The locations dataset might use "Abington 33/11kv".
  • The live flows dataset might use "Abington Primary Transformer Flows".

We normalize these by stripping out common suffixes and voltage information to create a "simple_name" that can be used for joining. We also use a manual mapping file for cases where the automated normalization isn't enough.

Returns a DataFrame that can be validated against SubstationMetadata.

Source code in packages/nged_data/src/nged_data/substation_names/align.py
def join_location_table_to_live_primaries(
    locations: pt.DataFrame[SubstationLocations],
    live_primaries: list[CkanResource],
) -> pt.DataFrame[SubstationMetadata]:
    """
    Joins the substation locations dataset to the list of live primary flow resources.

    This function performs a "dance" of normalization because NGED uses slightly different
    naming conventions in different datasets. For example:
    - The locations dataset might use "Abington 33/11kv".
    - The live flows dataset might use "Abington Primary Transformer Flows".

    We normalize these by stripping out common suffixes and voltage information to create
    a "simple_name" that can be used for joining. We also use a manual mapping file for
    cases where the automated normalization isn't enough.

    Returns a DataFrame that can be validated against SubstationMetadata.
    """
    live_primaries_df = pl.DataFrame([r.model_dump(mode="json") for r in live_primaries])

    # Append a "simple_name" column to each dataframe:
    live_primaries_df = live_primaries_df.with_columns(simple_name=simplify_substation_name("name"))
    locations = locations.with_columns(simple_name=simplify_substation_name("substation_name"))

    # Load manual mapping
    csv_filename = (
        Path(__file__).parent
        / "map_substation_names_in_live_primary_flows_to_substation_names_in_location_table.csv"
    )
    name_mapping = pl.read_csv(csv_filename)

    # Replace any simple names in the live primaries dataframe with the name from the manual
    # mapping, if such a mapping exists.
    live_primaries_df = live_primaries_df.join(
        name_mapping,
        how="left",
        left_on="simple_name",
        right_on="simplified_substation_name_in_live_flows",
    )
    live_primaries_df = live_primaries_df.with_columns(
        simple_name=pl.coalesce("simplified_substation_name_in_location_table", "simple_name")
    )

    # Rename the name columns to match SubstationMetadata
    live_primaries_df = live_primaries_df.rename({"name": "substation_name_in_live_primaries"})
    locations_renamed_col = locations.rename(
        {"substation_name": "substation_name_in_location_table"}
    )

    # Join and select relevant columns for SubstationMetadata
    joined = live_primaries_df.join(locations_renamed_col, on="simple_name")

    # Ensure the URL is a string
    joined = joined.cast({"url": pl.String})

    # Select only the columns defined in the SubstationMetadata schema
    columns_to_select = [col for col in SubstationMetadata.columns if col in joined.columns]
    df = joined.select(columns_to_select).sort(by="substation_number")

    return SubstationMetadata.validate(df, allow_missing_columns=True)

nged_data.utils

Functions

ensure_utc_timestamp_lazy(lf)

Ensures the timestamp column is UTC timezone-aware lazily.

This handles cases where Delta tables lose timezone metadata or Polars scans them as naive. It also handles non-UTC timezones by converting them.

Parameters:

  • lf (LazyFrame, required): Polars LazyFrame with a 'timestamp' column.

Returns:

  LazyFrame: LazyFrame with UTC-aware 'timestamp' column.
Source code in packages/nged_data/src/nged_data/utils.py
def ensure_utc_timestamp_lazy(lf: pl.LazyFrame) -> pl.LazyFrame:
    """Ensures the timestamp column is UTC timezone-aware lazily.

    This handles cases where Delta tables lose timezone metadata or Polars scans
    them as naive. It also handles non-UTC timezones by converting them.

    Args:
        lf: Polars LazyFrame with a 'timestamp' column.

    Returns:
        LazyFrame with UTC-aware 'timestamp' column.
    """
    schema = lf.collect_schema()
    # If the timestamp column is not present, we return early to avoid errors.
    # This is useful when scanning tables that might not have a timestamp column.
    if "timestamp" not in schema:
        return lf

    timestamp_dtype = schema["timestamp"]

    if isinstance(timestamp_dtype, pl.Datetime) and timestamp_dtype.time_zone is None:
        return lf.with_columns(pl.col("timestamp").dt.replace_time_zone("UTC"))
    elif isinstance(timestamp_dtype, pl.Datetime) and timestamp_dtype.time_zone != "UTC":
        return lf.with_columns(pl.col("timestamp").dt.convert_time_zone("UTC"))
    return lf

find_one_match(predicate, haystack)

Find exactly one match in haystack. Raise a ValueError if haystack is empty, or if the number of matches is not exactly 1.

Source code in packages/nged_data/src/nged_data/utils.py
def find_one_match[T](predicate: Callable[[T], bool], haystack: list[T]) -> T:
    """Find exactly one match in haystack. Raise a ValueError if haystack is empty, or if there is
    more than 1 match."""
    if len(haystack) == 0:
        raise ValueError("haystack is empty!")
    filtered = list(filter(predicate, haystack))
    if len(filtered) != 1:
        raise ValueError(f"Found {len(filtered)} matches when we were expecting exactly 1!")
    return filtered[0]
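
A usage sketch; the function is restated here without the generic annotation so the example is self-contained.

```python
# Mirrors utils.find_one_match, minus the [T] type parameter.
def find_one_match(predicate, haystack):
    if len(haystack) == 0:
        raise ValueError("haystack is empty!")
    filtered = list(filter(predicate, haystack))
    if len(filtered) != 1:
        raise ValueError(f"Found {len(filtered)} matches when we were expecting exactly 1!")
    return filtered[0]

resources = [{"format": "JSON"}, {"format": "CSV"}]
csv_resource = find_one_match(lambda r: r["format"] == "CSV", resources)

# Zero matches in a non-empty haystack also raises.
try:
    find_one_match(lambda r: r["format"] == "XML", resources)
    raised = False
except ValueError:
    raised = True
```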

get_partition_window(partition_key, lookback_days=0)

Calculates the partition start, end, and lookback start times.

Parameters:

  • partition_key (str, required): The Dagster partition key (ISO format date string).
  • lookback_days (int, default: 0): Number of days to look back from the partition start.

Returns:

  tuple[datetime, datetime, datetime]: A tuple of (partition_start, partition_end, lookback_start).

Source code in packages/nged_data/src/nged_data/utils.py
def get_partition_window(
    partition_key: str, lookback_days: int = 0
) -> tuple[datetime, datetime, datetime]:
    """Calculates the partition start, end, and lookback start times.

    Args:
        partition_key: The Dagster partition key (ISO format date string).
        lookback_days: Number of days to look back from the partition start.

    Returns:
        A tuple of (partition_start, partition_end, lookback_start).
    """
    # We use fromisoformat to handle the partition key, which is expected to be a date string.
    # We ensure the resulting datetime is UTC-aware.
    partition_start = datetime.fromisoformat(partition_key).replace(tzinfo=timezone.utc)
    partition_end = partition_start + timedelta(days=1)
    lookback_start = partition_start - timedelta(days=lookback_days)
    return partition_start, partition_end, lookback_start
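
The arithmetic can be walked through by hand for a daily partition (lookback_days=2 is an arbitrary illustrative choice):

```python
from datetime import datetime, timedelta, timezone

# A daily partition keyed by its start date, made UTC-aware.
partition_key = "2024-06-15"
partition_start = datetime.fromisoformat(partition_key).replace(tzinfo=timezone.utc)
partition_end = partition_start + timedelta(days=1)       # exclusive end of the day
lookback_start = partition_start - timedelta(days=2)      # lookback_days=2
```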

scan_delta_table(path)

Scans a Delta table and ensures the timestamp column is UTC-aware.

Parameters:

  • path (str | Path, required): Path to the Delta table.

Returns:

  LazyFrame: Polars LazyFrame with UTC-aware 'timestamp' column (if present).

Source code in packages/nged_data/src/nged_data/utils.py
def scan_delta_table(path: str | Path) -> pl.LazyFrame:
    """Scans a Delta table and ensures the timestamp column is UTC-aware.

    Args:
        path: Path to the Delta table.

    Returns:
        Polars LazyFrame with UTC-aware 'timestamp' column (if present).
    """
    return ensure_utc_timestamp_lazy(pl.scan_delta(str(path)))