ML Core API

ml_core

Unified ML model interface and shared utilities for substation forecasting.

Purpose

This package provides the abstract base classes and shared logic required to implement the "Tiny Wrapper" ML pipeline. It prioritizes readability, explicit dependencies, and native framework integrations.

Why this package exists

To enable rapid experimentation with different model architectures (XGBoost, GNNs, etc.), we need a common interface that shields the model developer from the underlying data engineering.

Key Components

  • model.py: Base classes for forecasters (BaseForecaster). The BaseForecaster abstract base class defines a standard interface for all forecasting models, requiring them to implement train(), predict(), and log_model() methods. This allows Dagster to orchestrate any model uniformly.
  • utils.py: Shared MLOps utilities for training (train_and_log_model) and evaluation (evaluate_and_save_model).
  • features.py: Shared feature engineering logic (e.g., cyclical time features).
  • data.py: Shared data splitting and loading logic.
  • scaling.py: Shared normalization and scaling utilities.

Advanced Forecasting Features

The BaseForecaster interface supports several advanced forecasting features:

  1. Multi-NWP Support: Models can ingest forecasts from multiple Numerical Weather Prediction (NWP) providers simultaneously. Secondary NWP features are prefixed with their model name (e.g., gfs_temperature_2m), and all NWPs are joined using a 3-hour availability delay.
  2. Dynamic Seasonal Lags: Prevents lookahead bias by calculating autoregressive lags dynamically based on the forecast lead time. The model always uses the most recent available historical data for a given lead time (e.g., lag_days = max(1, ceil(lead_time_days / 7)) * 7).
  3. Rigorous Backtesting: Supports simulating real-time inference via the collapse_lead_times parameter. When enabled, it filters NWP data to keep only the latest available forecast for each valid time, enforcing the 3-hour availability delay.
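
The dynamic seasonal lag rule in (2) reduces to a one-line computation. A minimal sketch in plain Python (the function name dynamic_seasonal_lag_days is illustrative, not part of the package):

```python
import math

def dynamic_seasonal_lag_days(lead_time_days: float) -> int:
    """Smallest multiple of 7 days that is >= the forecast lead time.

    Using a whole number of weeks preserves weekly seasonality, and
    requiring lag >= lead time guarantees the lagged value is already
    observed at forecast time (no lookahead bias).
    """
    return max(1, math.ceil(lead_time_days / 7)) * 7
```

For a 3-day lead time the model lags by one week; for a 10-day lead time it must reach back two weeks, since the 7-day lag would not yet be observed.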

ml_core.model

Base classes for ML model training and inference.

Classes

BaseForecaster

Bases: ABC

Abstract base class for all ML model forecasters.

A Forecaster handles the full lifecycle of an ML model: training and production inference. It operates on eager DataFrames and is designed to be used within Dagster assets or standalone scripts.

Subclasses should override the train and predict methods with explicit, strictly-typed keyword arguments for the specific data they require.

Source code in packages/ml_core/src/ml_core/model.py
class BaseForecaster(ABC):
    """Abstract base class for all ML model forecasters.

    A Forecaster handles the full lifecycle of an ML model: training and
    production inference. It handles eager `DataFrames` and is designed to
    be used within Dagster assets or standalone scripts.

    Subclasses should override the `train` and `predict` methods with explicit,
    strictly-typed keyword arguments for the specific data they require.
    """

    @abstractmethod
    def train(
        self,
        config: ModelConfig,
        flows_30m: pl.LazyFrame,
        substation_metadata: pt.DataFrame[SubstationMetadata],
        nwps: Mapping[NwpModel, pl.LazyFrame] | None = None,
    ) -> Any:
        """Train the model.

        Args:
            config: Model configuration object.
            flows_30m: Historical power flow data downsampled to 30m.
            substation_metadata: The substation metadata.
            nwps: A dictionary of weather forecast dataframes.

        Returns:
            The trained native model object (e.g., XGBRegressor).
        """
        pass

    @abstractmethod
    def predict(
        self,
        substation_metadata: pt.DataFrame[SubstationMetadata],
        inference_params: InferenceParams,
        flows_30m: pl.LazyFrame,
        nwps: Mapping[NwpModel, pl.LazyFrame] | None = None,
        collapse_lead_times: bool = False,
    ) -> pt.DataFrame[PowerForecast]:
        """Generate power forecasts.

        Args:
            substation_metadata: The substation metadata.
            inference_params: Parameters for inference.
            flows_30m: Historical power flow data downsampled to 30m (for lags).
            nwps: A dictionary of weather forecast dataframes.
            collapse_lead_times: Whether to collapse lead times (used in backtesting).

        Returns:
            A Patito DataFrame containing the model's predictions.
        """
        pass

    @abstractmethod
    def log_model(self, model_name: str) -> None:
        """Log the model to MLflow.

        Args:
            model_name: The name to register the model under.
        """
        pass

Functions
log_model(model_name) abstractmethod

Log the model to MLflow.

Parameters:

Name Type Description Default
model_name str

The name to register the model under.

required
Source code in packages/ml_core/src/ml_core/model.py
@abstractmethod
def log_model(self, model_name: str) -> None:
    """Log the model to MLflow.

    Args:
        model_name: The name to register the model under.
    """
    pass

predict(substation_metadata, inference_params, flows_30m, nwps=None, collapse_lead_times=False) abstractmethod

Generate power forecasts.

Parameters:

Name Type Description Default
substation_metadata DataFrame[SubstationMetadata]

The substation metadata.

required
inference_params InferenceParams

Parameters for inference.

required
flows_30m LazyFrame

Historical power flow data downsampled to 30m (for lags).

required
nwps Mapping[NwpModel, LazyFrame] | None

A dictionary of weather forecast dataframes.

None
collapse_lead_times bool

Whether to collapse lead times (used in backtesting).

False

Returns:

Type Description
DataFrame[PowerForecast]

A Patito DataFrame containing the model's predictions.

Source code in packages/ml_core/src/ml_core/model.py
@abstractmethod
def predict(
    self,
    substation_metadata: pt.DataFrame[SubstationMetadata],
    inference_params: InferenceParams,
    flows_30m: pl.LazyFrame,
    nwps: Mapping[NwpModel, pl.LazyFrame] | None = None,
    collapse_lead_times: bool = False,
) -> pt.DataFrame[PowerForecast]:
    """Generate power forecasts.

    Args:
        substation_metadata: The substation metadata.
        inference_params: Parameters for inference.
        flows_30m: Historical power flow data downsampled to 30m (for lags).
        nwps: A dictionary of weather forecast dataframes.
        collapse_lead_times: Whether to collapse lead times (used in backtesting).

    Returns:
        A Patito DataFrame containing the model's predictions.
    """
    pass

train(config, flows_30m, substation_metadata, nwps=None) abstractmethod

Train the model.

Parameters:

Name Type Description Default
config ModelConfig

Model configuration object.

required
flows_30m LazyFrame

Historical power flow data downsampled to 30m.

required
substation_metadata DataFrame[SubstationMetadata]

The substation metadata.

required
nwps Mapping[NwpModel, LazyFrame] | None

A dictionary of weather forecast dataframes.

None

Returns:

Type Description
Any

The trained native model object (e.g., XGBRegressor).

Source code in packages/ml_core/src/ml_core/model.py
@abstractmethod
def train(
    self,
    config: ModelConfig,
    flows_30m: pl.LazyFrame,
    substation_metadata: pt.DataFrame[SubstationMetadata],
    nwps: Mapping[NwpModel, pl.LazyFrame] | None = None,
) -> Any:
    """Train the model.

    Args:
        config: Model configuration object.
        flows_30m: Historical power flow data downsampled to 30m.
        substation_metadata: The substation metadata.
        nwps: A dictionary of weather forecast dataframes.

    Returns:
        The trained native model object (e.g., XGBRegressor).
    """
    pass
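
A concrete subclass follows the pattern above. The sketch below is a simplified, dependency-free illustration: the toy PersistenceForecaster and its plain-list arguments stand in for the real Polars LazyFrame and Patito DataFrame types, and are not part of the package.

```python
from abc import ABC, abstractmethod

class BaseForecaster(ABC):
    """Dependency-free stand-in mirroring the abstract interface above."""

    @abstractmethod
    def train(self, config, flows_30m, substation_metadata, nwps=None): ...

    @abstractmethod
    def predict(self, substation_metadata, inference_params, flows_30m,
                nwps=None, collapse_lead_times=False): ...

    @abstractmethod
    def log_model(self, model_name: str) -> None: ...

class PersistenceForecaster(BaseForecaster):
    """Toy model: forecasts the last observed value at every horizon."""

    def train(self, config, flows_30m, substation_metadata, nwps=None):
        # flows_30m is a plain list here; the real method takes a LazyFrame.
        self.last_value = flows_30m[-1]
        return self.last_value

    def predict(self, substation_metadata, inference_params, flows_30m,
                nwps=None, collapse_lead_times=False):
        # inference_params is just a horizon count in this sketch.
        return [self.last_value] * inference_params

    def log_model(self, model_name: str) -> None:
        pass  # a real subclass would register the model with MLflow here
```

Because all three methods are abstract, Dagster-facing utilities can accept any BaseForecaster and rely on the same call signature regardless of the underlying architecture.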

ml_core.utils

Classes

Functions

evaluate_and_save_model(context, model_name, forecaster, config, **kwargs)

Universal utility to handle temporal slicing, inference, and storage.

Parameters:

Name Type Description Default
context Union[AssetExecutionContext, OpExecutionContext]

Dagster execution context (Asset or Op).

required
model_name str

Name of the model.

required
forecaster

An object with a predict(**kwargs) method.

required
config TrainingConfig

Training configuration object.

required
**kwargs

Input LazyFrames to be temporally sliced and collected.

{}

Returns:

Type Description

A Polars DataFrame containing the predictions.

Source code in packages/ml_core/src/ml_core/utils.py
def evaluate_and_save_model(
    context: Union[dg.AssetExecutionContext, dg.OpExecutionContext],
    model_name: str,
    forecaster,
    config: TrainingConfig,
    **kwargs,
):
    """Universal utility to handle temporal slicing, inference, and storage.

    Args:
        context: Dagster execution context (Asset or Op).
        model_name: Name of the model.
        forecaster: An object with a `predict(**kwargs)` method.
        config: Training configuration object.
        **kwargs: Input LazyFrames to be temporally sliced and collected.

    Returns:
        A Polars DataFrame containing the predictions.
    """
    # 1. Universal Temporal Slicing for Test Set
    test_start = config.data_split.test_start
    test_end = config.data_split.test_end

    sliced_data = {}
    for key, val in kwargs.items():
        if key == "substation_metadata":
            sliced_data[key] = val
            continue

        time_col = "timestamp" if "power_flows" in key else "valid_time"

        # Add a configurable lookback for autoregressive features
        slice_start = test_start
        if "power_flows" in key or "nwps" in key:
            lookback = getattr(config.model, "required_lookback_days", 14)
            slice_start = test_start - timedelta(days=lookback)

        sliced_data[key] = _slice_temporal_data(val, slice_start, test_end, time_col)

    # 2. Call the Model-Specific Inference
    # Extract the actual init_time from the provided nwps data
    forecast_time = datetime.now(timezone.utc)
    if "nwps" in sliced_data:
        nwps_data = sliced_data["nwps"]
        first_nwp = None
        if isinstance(nwps_data, dict) and nwps_data:
            first_nwp = next(iter(nwps_data.values()))
        elif isinstance(nwps_data, list) and nwps_data:
            first_nwp = nwps_data[0]
        elif isinstance(nwps_data, pl.LazyFrame):
            first_nwp = nwps_data

        if isinstance(first_nwp, pl.LazyFrame):
            df = cast(pl.DataFrame, first_nwp.select(pl.col("init_time").max()).collect())
            if not df.is_empty() and df.item() is not None:
                delay_hours = getattr(config.model, "nwp_availability_delay_hours", 3)
                forecast_time = df.item() + timedelta(hours=delay_hours)

    inference_params = InferenceParams(
        forecast_time=forecast_time,
        power_fcst_model_name=model_name,
    )

    # Downsample power flows to 30m for inference (lags)
    if "substation_power_flows" in sliced_data:
        flows = sliced_data.pop("substation_power_flows")
        # Use the provided target_map for consistency
        target_map = kwargs.get("target_map")
        if target_map is None:
            # Fallback if no target_map is available on the forecaster
            # In production, the forecaster should always have one.
            if hasattr(forecaster, "target_map") and forecaster.target_map is not None:
                target_map = forecaster.target_map
            else:
                raise ValueError(
                    "target_map must be provided in kwargs or present on the forecaster to downsample power flows."
                )

        target_map_lf = target_map.lazy() if isinstance(target_map, pl.DataFrame) else target_map
        flows_30m = downsample_power_flows(flows, target_map=target_map_lf)
        sliced_data["flows_30m"] = flows_30m

    results_df = forecaster.predict(
        inference_params=inference_params, collapse_lead_times=False, **sliced_data
    )

    # 3. Calculate Metrics per lead_time
    if "flows_30m" in sliced_data:
        actuals = cast(
            pl.DataFrame,
            sliced_data["flows_30m"].collect(),
        )

        # Join predictions with actuals
        eval_df = results_df.join(
            actuals.rename({"timestamp": "valid_time", "MW_or_MVA": "actual"}),
            on=["valid_time", "substation_number"],
            how="inner",
        )

        # Join the peak_capacity_MW_or_MVA from the forecaster's target_map
        if hasattr(forecaster, "target_map") and forecaster.target_map is not None:
            target_map_df = forecaster.target_map
            if isinstance(target_map_df, pl.LazyFrame):
                target_map_df = target_map_df.collect()

            # We cast to pl.DataFrame here because eval_df and target_map_df are
            # different Patito models. Polars' join method on Patito subclasses
            # enforces that both sides have the same type, which would fail here.
            eval_df = pl.DataFrame(eval_df).join(
                pl.DataFrame(target_map_df).select(
                    ["substation_number", "peak_capacity_MW_or_MVA"]
                ),
                on="substation_number",
                how="left",
            )
            # Fill missing peak_capacity_MW_or_MVA with 1.0 to avoid division by zero
            eval_df = eval_df.with_columns(pl.col("peak_capacity_MW_or_MVA").fill_null(1.0))
        else:
            # Fallback if no target_map is available
            eval_df = eval_df.with_columns(peak_capacity_MW_or_MVA=pl.lit(1.0))

        if not eval_df.is_empty():
            # Filter out the lookback period to avoid data leakage in evaluation
            if isinstance(test_start, date) and not isinstance(test_start, datetime):
                test_start_dt = datetime.combine(test_start, datetime.min.time()).replace(
                    tzinfo=timezone.utc
                )
            else:
                test_start_dt = test_start

            eval_df = eval_df.filter(pl.col("valid_time") >= test_start_dt)

            # Calculate lead_time_hours
            if "nwp_init_time" in eval_df.columns:
                eval_df = eval_df.with_columns(
                    lead_time_hours=(
                        pl.col("valid_time") - pl.col("nwp_init_time")
                    ).dt.total_minutes()
                    / 60.0
                )
            else:
                # Fallback if nwp_init_time is not available
                eval_df = eval_df.with_columns(
                    lead_time_hours=(
                        pl.col("valid_time") - pl.lit(forecast_time)
                    ).dt.total_minutes()
                    / 60.0
                )

            # Group by lead_time_hours and calculate metrics
            metrics = (
                eval_df.group_by("lead_time_hours")
                .agg(
                    [
                        (pl.col("MW_or_MVA") - pl.col("actual")).abs().mean().alias("MAE"),
                        ((pl.col("MW_or_MVA") - pl.col("actual")) ** 2).mean().sqrt().alias("RMSE"),
                        (
                            (pl.col("MW_or_MVA") - pl.col("actual")).abs()
                            / pl.col("peak_capacity_MW_or_MVA")
                        )
                        .mean()
                        .alias("nMAE"),
                    ]
                )
                .sort("lead_time_hours")
            )

            # Log metrics to MLflow
            mlflow.set_experiment(model_name)
            with mlflow.start_run(run_name=f"{model_name}_eval"):
                # 1. Metric Thinning: Log only key operational horizons
                key_horizons = {24.0, 48.0, 72.0, 168.0, 336.0}
                for row in metrics.iter_rows(named=True):
                    lt = round(float(row["lead_time_hours"]), 1)
                    if lt in key_horizons:
                        if row["MAE"] is not None:
                            mlflow.log_metric(f"MAE_LT_{lt}h", row["MAE"])
                        if row["RMSE"] is not None:
                            mlflow.log_metric(f"RMSE_LT_{lt}h", row["RMSE"])
                        if row["nMAE"] is not None:
                            mlflow.log_metric(f"nMAE_LT_{lt}h", row["nMAE"])

                # 2. Log global aggregate metrics
                mae_global = eval_df.select(
                    (pl.col("MW_or_MVA") - pl.col("actual")).abs().mean()
                ).item()
                if mae_global is not None:
                    mlflow.log_metric("mean_mae_all_horizons", mae_global)
                    # Keep MAE_global for backward compatibility
                    mlflow.log_metric("MAE_global", mae_global)

                rmse_global = eval_df.select(
                    ((pl.col("MW_or_MVA") - pl.col("actual")) ** 2).mean().sqrt()
                ).item()
                if rmse_global is not None:
                    mlflow.log_metric("RMSE_global", rmse_global)

                nmae_global = eval_df.select(
                    (
                        (pl.col("MW_or_MVA") - pl.col("actual")).abs()
                        / pl.col("peak_capacity_MW_or_MVA")
                    ).mean()
                ).item()
                if nmae_global is not None:
                    mlflow.log_metric("nMAE_global", nmae_global)

                # 3. Save full granular evaluation dataframe as Parquet artifact
                with tempfile.TemporaryDirectory() as tmpdir:
                    eval_df_path = os.path.join(tmpdir, "evaluation_granular.parquet")
                    eval_df.write_parquet(eval_df_path)
                    mlflow.log_artifact(eval_df_path)

    # 4. Trigger Dynamic Partition
    if hasattr(context, "instance") and context.instance:
        context.instance.add_dynamic_partitions("model_partitions", [model_name])

    if hasattr(context, "add_output_metadata"):
        context.add_output_metadata(
            {
                "num_rows": len(results_df),
                "power_fcst_model_name": model_name,
            }
        )

    return results_df
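
The per-lead-time metrics computed in step 3 reduce to straightforward arithmetic. A pure-Python sketch for a single lead-time bucket with one shared peak capacity (in the real code peak_capacity_MW_or_MVA varies per substation, so nMAE is averaged row-wise rather than divided once):

```python
import math

def horizon_metrics(preds, actuals, peak_capacity):
    """MAE, RMSE and nMAE for one lead-time bucket."""
    errors = [p - a for p, a in zip(preds, actuals)]
    mae = sum(abs(e) for e in errors) / len(errors)
    rmse = math.sqrt(sum(e * e for e in errors) / len(errors))
    # With a single shared capacity, mean(|e| / peak) == MAE / peak.
    nmae = mae / peak_capacity
    return {"MAE": mae, "RMSE": rmse, "nMAE": nmae}
```

Normalising by peak capacity makes nMAE comparable across substations of very different sizes, which is why the utility falls back to 1.0 (no-op normalisation) only when no target map is available.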

train_and_log_model(context, model_name, trainer, config, **kwargs)

Universal utility to handle temporal slicing and MLflow logging for training.

Parameters:

Name Type Description Default
context Union[AssetExecutionContext, OpExecutionContext]

Dagster execution context (Asset or Op).

required
model_name str

Name of the model (for MLflow run name).

required
trainer

An object with a train(config, **kwargs) method.

required
config TrainingConfig

Training configuration object.

required
**kwargs

Input LazyFrames to be temporally sliced.

{}

Returns:

Type Description

The trained model object.

Source code in packages/ml_core/src/ml_core/utils.py
def train_and_log_model(
    context: Union[dg.AssetExecutionContext, dg.OpExecutionContext],
    model_name: str,
    trainer,
    config: TrainingConfig,
    **kwargs,
):
    """Universal utility to handle temporal slicing and MLflow logging for training.

    Args:
        context: Dagster execution context (Asset or Op).
        model_name: Name of the model (for MLflow run name).
        trainer: An object with a `train(config, **kwargs)` method.
        config: Training configuration object.
        **kwargs: Input LazyFrames to be temporally sliced.

    Returns:
        The trained model object.
    """
    # 1. Universal Temporal Slicing
    train_start = config.data_split.train_start
    train_end = config.data_split.train_end

    sliced_data = {}
    for key, val in kwargs.items():
        if key == "substation_metadata":
            sliced_data[key] = val
            continue

        time_col = "timestamp" if "power_flows" in key else "valid_time"

        # Add a configurable lookback for autoregressive features
        slice_start = train_start
        if "power_flows" in key or "nwps" in key:
            lookback = getattr(config.model, "required_lookback_days", 14)
            slice_start = train_start - timedelta(days=lookback)

        sliced_data[key] = _slice_temporal_data(val, slice_start, train_end, time_col)

    # 2. Call the Model-Specific Math
    # The trainer is responsible for joining and feature engineering.
    # We downsample power flows to 30m and use the provided target map
    # to ensure consistency across models.
    if "substation_power_flows" in sliced_data:
        flows = sliced_data.pop("substation_power_flows")
        target_map = kwargs.get("target_map")
        if target_map is None:
            raise ValueError("target_map must be passed in kwargs to downsample power flows.")

        flows_30m = downsample_power_flows(
            flows,
            target_map=target_map.lazy()
            if isinstance(target_map, pl.DataFrame)
            else target_map,
        )
        sliced_data["flows_30m"] = flows_30m

        # Store target_map on the trainer if it supports it
        if hasattr(trainer, "target_map"):
            trainer.target_map = target_map

    # Remove target_map from sliced_data so it's not passed to trainer.train()
    sliced_data.pop("target_map", None)

    model = trainer.train(config=config.model, **sliced_data)

    # 3. Universal MLflow Logging
    mlflow.set_experiment(model_name)
    with mlflow.start_run(run_name=model_name) as run:
        mlflow.log_params(config.model_dump(mode="json"))
        trainer.log_model(model_name)
        if hasattr(context, "add_output_metadata"):
            context.add_output_metadata({"mlflow_run_id": run.info.run_id})

    return model
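
The temporal-slicing rule in step 1 (shared with evaluate_and_save_model) can be summarised as follows; the helper name slice_window is illustrative only:

```python
from datetime import date, timedelta

def slice_window(start: date, end: date, key: str,
                 required_lookback_days: int = 14):
    """Window a keyword input is sliced to before training/inference.

    Autoregressive inputs (power flows, NWPs) get extra lookback so
    lag features can be built for the very first rows of the window;
    everything else is sliced to the window as-is.
    """
    if "power_flows" in key or "nwps" in key:
        start = start - timedelta(days=required_lookback_days)
    return start, end
```

Substation metadata is exempt from slicing entirely, since it has no time column.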

ml_core.data

Shared data processing logic for ML models.

Classes

Functions

calculate_target_map(flows)

Calculate the target map (power_col and peak_capacity) for each substation.

This function analyzes historical power flows to determine whether MW or MVA is the more reliable target variable (based on data availability) and calculates the peak capacity for normalization.

POTENTIAL DATA LEAKAGE: The 90-day dead sensor rule uses the entire history, introducing temporal leakage, but the user has consciously accepted this to simplify the global decision.

Parameters:

Name Type Description Default
flows LazyFrame[SubstationPowerFlows] | DataFrame[SubstationPowerFlows]

Historical power flow data (LazyFrame or DataFrame).

required

Returns:

Type Description
DataFrame[SubstationTargetMap]

A Patito DataFrame containing the target map for each substation.

Source code in packages/ml_core/src/ml_core/data.py
def calculate_target_map(
    flows: pt.LazyFrame[SubstationPowerFlows] | pt.DataFrame[SubstationPowerFlows],
) -> pt.DataFrame[SubstationTargetMap]:
    """Calculate the target map (power_col and peak_capacity) for each substation.

    This function analyzes historical power flows to determine whether MW or MVA
    is the more reliable target variable (based on data availability) and
    calculates the peak capacity for normalization.

    POTENTIAL DATA LEAKAGE: The 90-day dead sensor rule uses the entire history,
    introducing temporal leakage, but the user has consciously accepted this to
    simplify the global decision.

    Args:
        flows: Historical power flow data (LazyFrame or DataFrame).

    Returns:
        A Patito DataFrame containing the target map for each substation.
    """
    flows_lazy = flows.lazy() if isinstance(flows, pl.DataFrame) else flows

    # Calculate valid counts, last seen timestamps, and peak capacity
    stats = flows_lazy.group_by("substation_number").agg(
        valid_count_mw=pl.col(POWER_MW).is_not_null().sum(),
        valid_count_mva=pl.col(POWER_MVA).is_not_null().sum(),
        last_seen_mw=pl.col("timestamp").filter(pl.col(POWER_MW).is_not_null()).max(),
        last_seen_mva=pl.col("timestamp").filter(pl.col(POWER_MVA).is_not_null()).max(),
        max_timestamp=pl.col("timestamp").max(),
        peak_capacity_MW_or_MVA=pl.max_horizontal(
            pl.col(POWER_MW).cast(pl.Float32).abs().max(),
            pl.col(POWER_MVA).cast(pl.Float32).abs().max(),
        )
        .fill_null(1.0)
        .clip(lower_bound=1.0),
    )

    # Determine initial choice based on volume (Priority: MW > MVA if equal counts)
    stats = stats.with_columns(
        preferred_power_col=pl.when(pl.col("valid_count_mw") >= pl.col("valid_count_mva"))
        .then(pl.lit(POWER_MW))
        .otherwise(pl.lit(POWER_MVA)),
    )

    # Apply "Dead Sensor" Exception (90 day rule)
    # If the preferred column's last seen is > 90 days before the total max_timestamp,
    # we switch to the alternative.
    dead_sensor_threshold = pl.duration(days=90)

    # Check if each sensor is dead (no data in the last 90 days)
    is_mw_dead = (pl.col("max_timestamp") - pl.col("last_seen_mw")) > dead_sensor_threshold
    is_mva_dead = (pl.col("max_timestamp") - pl.col("last_seen_mva")) > dead_sensor_threshold

    # Determine if we need to switch from the initially preferred column
    switch_to_mva = (
        (pl.col("preferred_power_col") == POWER_MW) & is_mw_dead & (pl.col("valid_count_mva") > 0)
    )
    switch_to_mw = (
        (pl.col("preferred_power_col") == POWER_MVA) & is_mva_dead & (pl.col("valid_count_mw") > 0)
    )

    pref_col_expr = (
        pl.when(switch_to_mva)
        .then(pl.lit(POWER_MVA))
        .when(switch_to_mw)
        .then(pl.lit(POWER_MW))
        .otherwise(pl.col("preferred_power_col"))
    )

    # To avoid type overload issues, we compute the target map entirely lazily
    # and only collect at the very end.
    target_map_lazy = stats.with_columns(
        preferred_power_col=pref_col_expr,
        peak_capacity_MW_or_MVA=pl.col("peak_capacity_MW_or_MVA"),
    ).select(["substation_number", "preferred_power_col", "peak_capacity_MW_or_MVA"])

    target_map_df = cast(pl.DataFrame, target_map_lazy.collect()).cast(
        {
            "substation_number": pl.Int32,
            "peak_capacity_MW_or_MVA": pl.Float32,
        }
    )

    return SubstationTargetMap.validate(target_map_df)
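
The MW-vs-MVA selection plus the 90-day dead-sensor exception boils down to the decision rule below. This is a pure-Python sketch with illustrative names; the real implementation works over lazy Polars expressions and the POWER_MW/POWER_MVA column constants.

```python
from datetime import datetime, timedelta

DEAD_SENSOR = timedelta(days=90)

def choose_power_col(valid_count_mw: int, valid_count_mva: int,
                     last_seen_mw: datetime, last_seen_mva: datetime,
                     max_timestamp: datetime) -> str:
    # Initial choice by data volume; MW wins ties.
    preferred = "MW" if valid_count_mw >= valid_count_mva else "MVA"
    # Dead-sensor exception: switch if the preferred sensor has been
    # silent for more than 90 days and the alternative has any data.
    if (preferred == "MW" and max_timestamp - last_seen_mw > DEAD_SENSOR
            and valid_count_mva > 0):
        return "MVA"
    if (preferred == "MVA" and max_timestamp - last_seen_mva > DEAD_SENSOR
            and valid_count_mw > 0):
        return "MW"
    return preferred
```

Note the switch only happens when the alternative column has at least one valid reading; a substation with a dead MW sensor and no MVA history keeps MW.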

downsample_power_flows(flows, target_map)

Downsample power flows to 30m using period-ending semantics.

We assume that NWP data represents the average (or accumulated) value for the period ending at valid_time. For example, a weather forecast for 10:00 describes the weather from 09:00 to 10:00.

To align our targets with these features, we downsample power flows using closed="right", label="right". This ensures that power readings from 09:30 to 10:00 are aggregated and labeled as 10:00.

Parameters:

Name Type Description Default
flows DataFrame[SubstationPowerFlows] | LazyFrame[SubstationPowerFlows]

Historical power flow data.

required
target_map DataFrame[SubstationTargetMap] | LazyFrame[SubstationTargetMap]

Map of substation_number to target_col (MW or MVA).

required

Returns:

Type Description
LazyFrame[SimplifiedSubstationPowerFlows]

Downsampled power flows.

Source code in packages/ml_core/src/ml_core/data.py
def downsample_power_flows(
    flows: pt.DataFrame[SubstationPowerFlows] | pt.LazyFrame[SubstationPowerFlows],
    target_map: pt.DataFrame[SubstationTargetMap] | pt.LazyFrame[SubstationTargetMap],
) -> pt.LazyFrame[SimplifiedSubstationPowerFlows]:
    """Downsample power flows to 30m using period-ending semantics.

    We assume that NWP data represents the average (or accumulated) value for the
    period *ending* at `valid_time`. For example, a weather forecast for 10:00
    describes the weather from 09:00 to 10:00.

    To align our targets with these features, we downsample power flows using
    `closed="right", label="right"`. This ensures that power readings from
    09:30 to 10:00 are aggregated and labeled as `10:00`.

    Args:
        flows: Historical power flow data.
        target_map: Map of substation_number to target_col (MW or MVA).

    Returns:
        Downsampled power flows.
    """
    flows_lazy = flows.lazy() if isinstance(flows, pl.DataFrame) else flows
    target_map_lazy = target_map.lazy() if isinstance(target_map, pl.DataFrame) else target_map

    # Select the correct column before downsampling to avoid expensive operations on both MW and MVA
    flows_lazy = (
        flows_lazy.join(
            target_map_lazy.select(["substation_number", "preferred_power_col"]),
            on="substation_number",
            how="left",
        )
        .with_columns(
            pl.when(pl.col("preferred_power_col") == POWER_MVA)
            .then(pl.col(POWER_MVA))
            .otherwise(pl.col(POWER_MW))
            .alias(POWER_MW_OR_MVA)
        )
        .select(["timestamp", "substation_number", POWER_MW_OR_MVA])
    )

    # Downsample the single MW_or_MVA column
    return cast(
        pt.LazyFrame[SimplifiedSubstationPowerFlows],
        (
            flows_lazy.sort("timestamp")
            .group_by_dynamic(
                "timestamp",
                every="30m",
                group_by="substation_number",
                closed="right",
                label="right",
            )
            .agg(pl.col(POWER_MW_OR_MVA).mean())
        ),
    )
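
The period-ending semantics of `closed="right", label="right"` can be illustrated for a single timestamp; period_ending_label below is a standalone sketch, not a package function:

```python
import math
from datetime import datetime, timedelta

def period_ending_label(ts: datetime, every_minutes: int = 30) -> datetime:
    """Right edge of the bin a reading falls into (closed="right").

    A bin covers (label - 30m, label], so readings at 09:31..10:00 all
    label as 10:00, while a reading at exactly 09:30 labels as 09:30.
    """
    midnight = ts.replace(hour=0, minute=0, second=0, microsecond=0)
    minutes = (ts - midnight).total_seconds() / 60.0
    bins = math.ceil(minutes / every_minutes)
    return midnight + timedelta(minutes=bins * every_minutes)
```

This matches the NWP convention described above: the 10:00 weather value summarises 09:00 to 10:00, so the 10:00 power target summarises 09:30 to 10:00.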

ml_core.features

Shared feature engineering logic for ML models.

Functions

add_cyclical_temporal_features(df, time_col='valid_time')

Add cyclical and standard temporal features.

Parameters:

Name Type Description Default
df T

DataFrame or LazyFrame with a time column.

required
time_col str

Name of the time column.

'valid_time'

Returns:

Type Description
T

DataFrame or LazyFrame with added temporal features (hour_sin, hour_cos,

T

day_of_year_sin, day_of_year_cos, day_of_week).

Source code in packages/ml_core/src/ml_core/features.py
def add_cyclical_temporal_features(df: T, time_col: str = "valid_time") -> T:
    """Add cyclical and standard temporal features.

    Args:
        df: DataFrame or LazyFrame with a time column.
        time_col: Name of the time column.

    Returns:
        DataFrame or LazyFrame with added temporal features (hour_sin, hour_cos,
        day_of_year_sin, day_of_year_cos, day_of_week).
    """
    return cast(
        T,
        cast(Any, df)
        .with_columns(
            # Convert to local time (Europe/London) before extracting temporal features
            # to ensure diurnal patterns align with human activity and DST shifts.
            local_time=pl.col(time_col).dt.convert_time_zone("Europe/London")
        )
        .with_columns(
            # Cyclical Hour (24h)
            hour_sin=(
                (pl.col("local_time").dt.hour() + pl.col("local_time").dt.minute() / 60.0)
                * 2
                * np.pi
                / 24
            )
            .sin()
            .cast(pl.Float32),
            hour_cos=(
                (pl.col("local_time").dt.hour() + pl.col("local_time").dt.minute() / 60.0)
                * 2
                * np.pi
                / 24
            )
            .cos()
            .cast(pl.Float32),
            # Cyclical Day of Year (365.25d)
            day_of_year_sin=(pl.col("local_time").dt.ordinal_day() * 2 * np.pi / 365.25)
            .sin()
            .cast(pl.Float32),
            day_of_year_cos=(pl.col("local_time").dt.ordinal_day() * 2 * np.pi / 365.25)
            .cos()
            .cast(pl.Float32),
            # Day of week (1-7)
            day_of_week=pl.col("local_time").dt.weekday().cast(pl.Int8),
        )
        .drop("local_time"),
    )
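
The cyclical encoding above maps the hour onto the unit circle so that 23:30 and 00:00 end up close in feature space, which a raw 0-23 integer would not achieve. A minimal sketch of the hour features (the real function also converts to Europe/London local time first and includes the minutes/60 fraction):

```python
import math

def cyclical_hour(hour_of_day: float) -> tuple:
    """sin/cos encoding of time-of-day on the unit circle."""
    angle = hour_of_day * 2 * math.pi / 24
    return math.sin(angle), math.cos(angle)
```

The same construction with a period of 365.25 days yields day_of_year_sin/day_of_year_cos.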

ml_core.scaling

Shared scaling and normalization utilities for ML models.

Classes

Functions

uint8_to_physical_unit(params)

Convert uint8 columns back to physical units (Float32).

Parameters:

Name Type Description Default
params DataFrame[ScalingParams]

Patito DataFrame with scaling parameters.

required

Returns:

Type Description
list[Expr]

List of Polars expressions for the conversion.

Source code in packages/ml_core/src/ml_core/scaling.py
def uint8_to_physical_unit(params: pt.DataFrame[ScalingParams]) -> list[pl.Expr]:
    """Convert uint8 columns back to physical units (Float32).

    Args:
        params: Patito DataFrame with scaling parameters.

    Returns:
        List of Polars expressions for the conversion.
    """
    exprs = []
    for row in params.iter_rows(named=True):
        col = row["col_name"]
        b_min = row["buffered_min"]
        b_range = row["buffered_range"]

        # UInt8 -> Raw (Float32)
        expr = ((pl.col(col).cast(pl.Float32) / 255 * b_range) + b_min).alias(col)
        exprs.append(expr)

    return exprs
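
The conversion is the inverse of a min-max quantisation to 256 levels over a buffered range. The sketch below pairs the decode expression above with an assumed forward quantiser (physical_to_uint8 is illustrative; the actual encoder lives elsewhere in the pipeline), showing the round-trip error is bounded by half a quantisation step:

```python
def physical_to_uint8(x: float, b_min: float, b_range: float) -> int:
    # Assumed forward quantisation: nearest of 256 levels over
    # [b_min, b_min + b_range].
    return round((x - b_min) / b_range * 255)

def uint8_to_physical(u: int, b_min: float, b_range: float) -> float:
    # Mirrors the expression in uint8_to_physical_unit above.
    return u / 255 * b_range + b_min
```

The worst-case reconstruction error is b_range / 255 / 2, i.e. about 0.2% of the buffered range per column.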