bikes.jobs

High-level jobs of the project.

 1"""High-level jobs of the project."""
 2
 3# %% IMPORTS
 4
 5from bikes.jobs.evaluations import EvaluationsJob
 6from bikes.jobs.explanations import ExplanationsJob
 7from bikes.jobs.inference import InferenceJob
 8from bikes.jobs.promotion import PromotionJob
 9from bikes.jobs.training import TrainingJob
10from bikes.jobs.tuning import TuningJob
11
# %% TYPES

# Union of the six concrete job classes imported above.
# NOTE(review): presumably consumed as a pydantic union discriminated on each
# job's `KIND` literal by the settings/CLI layer — confirm against the caller.
JobKind = TuningJob | TrainingJob | PromotionJob | InferenceJob | EvaluationsJob | ExplanationsJob

# %% EXPORTS

# Public API of the package: every job class plus the `JobKind` union.
__all__ = [
    "TuningJob",
    "TrainingJob",
    "PromotionJob",
    "InferenceJob",
    "EvaluationsJob",
    "ExplanationsJob",
    "JobKind",
]
class TuningJob(bikes.jobs.base.Job):
class TuningJob(base.Job):
    """Find the best hyperparameters for a model.

    The job reads and checks the inputs and targets, logs their lineage to
    mlflow, runs the searcher on the model (the splitter is passed as `cv`),
    and sends a notification containing the best score.

    Parameters:
        run_config (services.MlflowService.RunConfig): mlflow run config.
        inputs (datasets.ReaderKind): reader for the inputs data.
        targets (datasets.ReaderKind): reader for the targets data.
        model (models.ModelKind): machine learning model to tune.
        metric (metrics.MetricKind): tuning metric to optimize.
        splitter (splitters.SplitterKind): data sets splitter.
        searcher (searchers.SearcherKind): hparams searcher.
    """

    # Literal tag of this job.
    # NOTE(review): presumably used as a pydantic discriminator, like the
    # `discriminator="KIND"` fields below — confirm.
    KIND: T.Literal["TuningJob"] = "TuningJob"

    # Run
    run_config: services.MlflowService.RunConfig = services.MlflowService.RunConfig(name="Tuning")
    # Data (required: `...` means no default reader is provided)
    inputs: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
    targets: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
    # Model
    model: models.ModelKind = pdt.Field(models.BaselineSklearnModel(), discriminator="KIND")
    # Metric
    metric: metrics.MetricKind = pdt.Field(metrics.SklearnMetric(), discriminator="KIND")
    # Splitter
    splitter: splitters.SplitterKind = pdt.Field(
        splitters.TimeSeriesSplitter(), discriminator="KIND"
    )
    # Searcher (default grid: three candidate values for `max_depth`)
    searcher: searchers.SearcherKind = pdt.Field(
        searchers.GridCVSearcher(
            param_grid={
                "max_depth": [3, 5, 7],
            }
        ),
        discriminator="KIND",
    )

    @T.override
    def run(self) -> base.Locals:
        """Run the tuning job in context.

        Returns:
            base.Locals: local job variables, i.e. the result of `locals()`.
        """
        # NOTE: the method returns `locals()`, so the name of every local
        # variable below (including `run`) is part of the returned mapping.
        # services
        # - logger
        logger = self.logger_service.logger()
        logger.info("With logger: {}", logger)
        with self.mlflow_service.run_context(run_config=self.run_config) as run:
            logger.info("With run context: {}", run.info)
            # data
            # - inputs
            logger.info("Read inputs: {}", self.inputs)
            inputs_ = self.inputs.read()  # unchecked!
            # the trailing-underscore frame is raw; the schema check yields the checked one
            inputs = schemas.InputsSchema.check(inputs_)
            logger.debug("- Inputs shape: {}", inputs.shape)
            # - targets
            logger.info("Read targets: {}", self.targets)
            targets_ = self.targets.read()  # unchecked!
            targets = schemas.TargetsSchema.check(targets_)
            logger.debug("- Targets shape: {}", targets.shape)
            # lineage
            # - inputs
            logger.info("Log lineage: inputs")
            inputs_lineage = self.inputs.lineage(data=inputs, name="inputs")
            # the run config name is reused as the mlflow dataset context
            mlflow.log_input(dataset=inputs_lineage, context=self.run_config.name)
            logger.debug("- Inputs lineage: {}", inputs_lineage.to_dict())
            # - targets
            logger.info("Log lineage: targets")
            # NOTE(review): `TargetsSchema.cnt` presumably names the target column — confirm
            targets_lineage = self.targets.lineage(
                data=targets, name="targets", targets=schemas.TargetsSchema.cnt
            )
            mlflow.log_input(dataset=targets_lineage, context=self.run_config.name)
            logger.debug("- Targets lineage: {}", targets_lineage.to_dict())
            # model
            logger.info("With model: {}", self.model)
            # metric
            logger.info("With metric: {}", self.metric)
            # splitter
            logger.info("With splitter: {}", self.splitter)
            # searcher
            logger.info("Run searcher: {}", self.searcher)
            # the splitter is handed to the searcher as its `cv` argument
            results, best_score, best_params = self.searcher.search(
                model=self.model,
                metric=self.metric,
                inputs=inputs,
                targets=targets,
                cv=self.splitter,
            )
            logger.debug("- Results: {}", results.shape)
            logger.debug("- Best Score: {}", best_score)
            logger.debug("- Best Params: {}", best_params)
            # notify
            self.alerts_service.notify(
                title="Tuning Job Finished", message=f"Best score: {best_score}"
            )
        return locals()

Find the best hyperparameters for a model.

Arguments:
  • run_config (services.MlflowService.RunConfig): mlflow run config.
  • inputs (datasets.ReaderKind): reader for the inputs data.
  • targets (datasets.ReaderKind): reader for the targets data.
  • model (models.ModelKind): machine learning model to tune.
  • metric (metrics.MetricKind): tuning metric to optimize.
  • splitter (splitters.SplitterKind): data sets splitter.
  • searcher (searchers.SearcherKind): hparams searcher.
KIND: Literal['TuningJob']
@T.override
def run(self) -> Dict[str, Any]:
    @T.override
    def run(self) -> base.Locals:
        """Run the tuning job in context.

        Returns:
            base.Locals: local job variables, i.e. the result of `locals()`.
        """
        # NOTE: the method returns `locals()`, so the name of every local
        # variable below (including `run`) is part of the returned mapping.
        # services
        # - logger
        logger = self.logger_service.logger()
        logger.info("With logger: {}", logger)
        with self.mlflow_service.run_context(run_config=self.run_config) as run:
            logger.info("With run context: {}", run.info)
            # data
            # - inputs
            logger.info("Read inputs: {}", self.inputs)
            inputs_ = self.inputs.read()  # unchecked!
            # the trailing-underscore frame is raw; the schema check yields the checked one
            inputs = schemas.InputsSchema.check(inputs_)
            logger.debug("- Inputs shape: {}", inputs.shape)
            # - targets
            logger.info("Read targets: {}", self.targets)
            targets_ = self.targets.read()  # unchecked!
            targets = schemas.TargetsSchema.check(targets_)
            logger.debug("- Targets shape: {}", targets.shape)
            # lineage
            # - inputs
            logger.info("Log lineage: inputs")
            inputs_lineage = self.inputs.lineage(data=inputs, name="inputs")
            # the run config name is reused as the mlflow dataset context
            mlflow.log_input(dataset=inputs_lineage, context=self.run_config.name)
            logger.debug("- Inputs lineage: {}", inputs_lineage.to_dict())
            # - targets
            logger.info("Log lineage: targets")
            # NOTE(review): `TargetsSchema.cnt` presumably names the target column — confirm
            targets_lineage = self.targets.lineage(
                data=targets, name="targets", targets=schemas.TargetsSchema.cnt
            )
            mlflow.log_input(dataset=targets_lineage, context=self.run_config.name)
            logger.debug("- Targets lineage: {}", targets_lineage.to_dict())
            # model
            logger.info("With model: {}", self.model)
            # metric
            logger.info("With metric: {}", self.metric)
            # splitter
            logger.info("With splitter: {}", self.splitter)
            # searcher
            logger.info("Run searcher: {}", self.searcher)
            # the splitter is handed to the searcher as its `cv` argument
            results, best_score, best_params = self.searcher.search(
                model=self.model,
                metric=self.metric,
                inputs=inputs,
                targets=targets,
                cv=self.splitter,
            )
            logger.debug("- Results: {}", results.shape)
            logger.debug("- Best Score: {}", best_score)
            logger.debug("- Best Params: {}", best_params)
            # notify
            self.alerts_service.notify(
                title="Tuning Job Finished", message=f"Best score: {best_score}"
            )
        return locals()

Run the tuning job in context.

model_config: ClassVar[pydantic.config.ConfigDict] = {'strict': True, 'frozen': True, 'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class TrainingJob(bikes.jobs.base.Job):
class TrainingJob(base.Job):
    """Train and register a single AI/ML model.

    The job reads and checks the inputs and targets, logs their lineage to
    mlflow, takes the first train/test split produced by the splitter, fits
    the model on the train part, scores the test predictions with every
    metric, then signs, saves and registers the model before notifying.

    Parameters:
        run_config (services.MlflowService.RunConfig): mlflow run config.
        inputs (datasets.ReaderKind): reader for the inputs data.
        targets (datasets.ReaderKind): reader for the targets data.
        model (models.ModelKind): machine learning model to train.
        metrics (metrics_.MetricsKind): metric list to compute.
        splitter (splitters.SplitterKind): data sets splitter.
        saver (registries.SaverKind): model saver.
        signer (signers.SignerKind): model signer.
        registry (registries.RegisterKind): model register.
    """

    # Literal tag of this job.
    # NOTE(review): presumably used as a pydantic discriminator, like the
    # `discriminator="KIND"` fields below — confirm.
    KIND: T.Literal["TrainingJob"] = "TrainingJob"

    # Run
    run_config: services.MlflowService.RunConfig = services.MlflowService.RunConfig(name="Training")
    # Data (required: `...` means no default reader is provided)
    inputs: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
    targets: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
    # Model
    model: models.ModelKind = pdt.Field(models.BaselineSklearnModel(), discriminator="KIND")
    # Metrics
    metrics: metrics_.MetricsKind = [metrics_.SklearnMetric()]
    # Splitter
    splitter: splitters.SplitterKind = pdt.Field(
        splitters.TrainTestSplitter(), discriminator="KIND"
    )
    # Saver
    saver: registries.SaverKind = pdt.Field(registries.CustomSaver(), discriminator="KIND")
    # Signer
    signer: signers.SignerKind = pdt.Field(signers.InferSigner(), discriminator="KIND")
    # Register
    # - the field is named `registry` to avoid shadowing the pydantic `register` function
    registry: registries.RegisterKind = pdt.Field(registries.MlflowRegister(), discriminator="KIND")

    @T.override
    def run(self) -> base.Locals:
        """Run the training job in context.

        Returns:
            base.Locals: local job variables, i.e. the result of `locals()`.
        """
        # NOTE: the method returns `locals()`, so the name of every local
        # variable below (including `run`, `i`, `metric`, `score`) is part of
        # the returned mapping.
        # services
        # - logger
        logger = self.logger_service.logger()
        logger.info("With logger: {}", logger)
        # - mlflow
        client = self.mlflow_service.client()
        logger.info("With client: {}", client.tracking_uri)
        with self.mlflow_service.run_context(run_config=self.run_config) as run:
            logger.info("With run context: {}", run.info)
            # data
            # - inputs
            logger.info("Read inputs: {}", self.inputs)
            inputs_ = self.inputs.read()  # unchecked!
            # the trailing-underscore frame is raw; the schema check yields the checked one
            inputs = schemas.InputsSchema.check(inputs_)
            logger.debug("- Inputs shape: {}", inputs.shape)
            # - targets
            logger.info("Read targets: {}", self.targets)
            targets_ = self.targets.read()  # unchecked!
            targets = schemas.TargetsSchema.check(targets_)
            logger.debug("- Targets shape: {}", targets.shape)
            # lineage
            # - inputs
            logger.info("Log lineage: inputs")
            inputs_lineage = self.inputs.lineage(data=inputs, name="inputs")
            # the run config name is reused as the mlflow dataset context
            mlflow.log_input(dataset=inputs_lineage, context=self.run_config.name)
            logger.debug("- Inputs lineage: {}", inputs_lineage.to_dict())
            # - targets
            logger.info("Log lineage: targets")
            # NOTE(review): `TargetsSchema.cnt` presumably names the target column — confirm
            targets_lineage = self.targets.lineage(
                data=targets, name="targets", targets=schemas.TargetsSchema.cnt
            )
            mlflow.log_input(dataset=targets_lineage, context=self.run_config.name)
            logger.debug("- Targets lineage: {}", targets_lineage.to_dict())
            # splitter
            logger.info("With splitter: {}", self.splitter)
            # - index: only the first split yielded by the splitter is used
            train_index, test_index = next(self.splitter.split(inputs=inputs, targets=targets))
            # - inputs: positional selection; the cast only informs the type checker
            inputs_train = T.cast(schemas.Inputs, inputs.iloc[train_index])
            inputs_test = T.cast(schemas.Inputs, inputs.iloc[test_index])
            logger.debug("- Inputs train shape: {}", inputs_train.shape)
            logger.debug("- Inputs test shape: {}", inputs_test.shape)
            # - targets
            targets_train = T.cast(schemas.Targets, targets.iloc[train_index])
            targets_test = T.cast(schemas.Targets, targets.iloc[test_index])
            logger.debug("- Targets train shape: {}", targets_train.shape)
            logger.debug("- Targets test shape: {}", targets_test.shape)
            # model
            logger.info("Fit model: {}", self.model)
            self.model.fit(inputs=inputs_train, targets=targets_train)
            # outputs
            logger.info("Predict outputs: {}", len(inputs_test))
            outputs_test = self.model.predict(inputs=inputs_test)
            logger.debug("- Outputs test shape: {}", outputs_test.shape)
            # metrics: each score is logged on the current run under the metric name
            for i, metric in enumerate(self.metrics, start=1):
                logger.info("{}. Compute metric: {}", i, metric)
                score = metric.score(targets=targets_test, outputs=outputs_test)
                client.log_metric(run_id=run.info.run_id, key=metric.name, value=score)
                logger.debug("- Metric score: {}", score)
            # signer
            logger.info("Sign model: {}", self.signer)
            # NOTE(review): the signer gets the full `inputs` but only the test
            # outputs; presumably only the column schema matters — confirm.
            model_signature = self.signer.sign(inputs=inputs, outputs=outputs_test)
            logger.debug("- Model signature: {}", model_signature.to_dict())
            # saver
            logger.info("Save model: {}", self.saver)
            # NOTE(review): the whole checked `inputs` frame is passed as the
            # input example — confirm a smaller sample is not intended.
            model_info = self.saver.save(
                model=self.model, signature=model_signature, input_example=inputs
            )
            logger.debug("- Model URI: {}", model_info.model_uri)
            # register
            logger.info("Register model: {}", self.registry)
            model_version = self.registry.register(
                name=self.mlflow_service.registry_name, model_uri=model_info.model_uri
            )
            logger.debug("- Model version: {}", model_version)
            # notify
            self.alerts_service.notify(
                title="Training Job Finished",
                message=f"Model version: {model_version.version}",
            )
        return locals()

Train and register a single AI/ML model.

Arguments:
  • run_config (services.MlflowService.RunConfig): mlflow run config.
  • inputs (datasets.ReaderKind): reader for the inputs data.
  • targets (datasets.ReaderKind): reader for the targets data.
  • model (models.ModelKind): machine learning model to train.
  • metrics (metrics_.MetricsKind): metric list to compute.
  • splitter (splitters.SplitterKind): data sets splitter.
  • saver (registries.SaverKind): model saver.
  • signer (signers.SignerKind): model signer.
  • registry (registries.RegisterKind): model register.
KIND: Literal['TrainingJob']
metrics: list[typing.Annotated[bikes.core.metrics.SklearnMetric, FieldInfo(annotation=NoneType, required=True, discriminator='KIND')]]
@T.override
def run(self) -> Dict[str, Any]:
    @T.override
    def run(self) -> base.Locals:
        """Run the training job in context.

        Returns:
            base.Locals: local job variables, i.e. the result of `locals()`.
        """
        # NOTE: the method returns `locals()`, so the name of every local
        # variable below (including `run`, `i`, `metric`, `score`) is part of
        # the returned mapping.
        # services
        # - logger
        logger = self.logger_service.logger()
        logger.info("With logger: {}", logger)
        # - mlflow
        client = self.mlflow_service.client()
        logger.info("With client: {}", client.tracking_uri)
        with self.mlflow_service.run_context(run_config=self.run_config) as run:
            logger.info("With run context: {}", run.info)
            # data
            # - inputs
            logger.info("Read inputs: {}", self.inputs)
            inputs_ = self.inputs.read()  # unchecked!
            # the trailing-underscore frame is raw; the schema check yields the checked one
            inputs = schemas.InputsSchema.check(inputs_)
            logger.debug("- Inputs shape: {}", inputs.shape)
            # - targets
            logger.info("Read targets: {}", self.targets)
            targets_ = self.targets.read()  # unchecked!
            targets = schemas.TargetsSchema.check(targets_)
            logger.debug("- Targets shape: {}", targets.shape)
            # lineage
            # - inputs
            logger.info("Log lineage: inputs")
            inputs_lineage = self.inputs.lineage(data=inputs, name="inputs")
            # the run config name is reused as the mlflow dataset context
            mlflow.log_input(dataset=inputs_lineage, context=self.run_config.name)
            logger.debug("- Inputs lineage: {}", inputs_lineage.to_dict())
            # - targets
            logger.info("Log lineage: targets")
            # NOTE(review): `TargetsSchema.cnt` presumably names the target column — confirm
            targets_lineage = self.targets.lineage(
                data=targets, name="targets", targets=schemas.TargetsSchema.cnt
            )
            mlflow.log_input(dataset=targets_lineage, context=self.run_config.name)
            logger.debug("- Targets lineage: {}", targets_lineage.to_dict())
            # splitter
            logger.info("With splitter: {}", self.splitter)
            # - index: only the first split yielded by the splitter is used
            train_index, test_index = next(self.splitter.split(inputs=inputs, targets=targets))
            # - inputs: positional selection; the cast only informs the type checker
            inputs_train = T.cast(schemas.Inputs, inputs.iloc[train_index])
            inputs_test = T.cast(schemas.Inputs, inputs.iloc[test_index])
            logger.debug("- Inputs train shape: {}", inputs_train.shape)
            logger.debug("- Inputs test shape: {}", inputs_test.shape)
            # - targets
            targets_train = T.cast(schemas.Targets, targets.iloc[train_index])
            targets_test = T.cast(schemas.Targets, targets.iloc[test_index])
            logger.debug("- Targets train shape: {}", targets_train.shape)
            logger.debug("- Targets test shape: {}", targets_test.shape)
            # model
            logger.info("Fit model: {}", self.model)
            self.model.fit(inputs=inputs_train, targets=targets_train)
            # outputs
            logger.info("Predict outputs: {}", len(inputs_test))
            outputs_test = self.model.predict(inputs=inputs_test)
            logger.debug("- Outputs test shape: {}", outputs_test.shape)
            # metrics: each score is logged on the current run under the metric name
            for i, metric in enumerate(self.metrics, start=1):
                logger.info("{}. Compute metric: {}", i, metric)
                score = metric.score(targets=targets_test, outputs=outputs_test)
                client.log_metric(run_id=run.info.run_id, key=metric.name, value=score)
                logger.debug("- Metric score: {}", score)
            # signer
            logger.info("Sign model: {}", self.signer)
            # NOTE(review): the signer gets the full `inputs` but only the test
            # outputs; presumably only the column schema matters — confirm.
            model_signature = self.signer.sign(inputs=inputs, outputs=outputs_test)
            logger.debug("- Model signature: {}", model_signature.to_dict())
            # saver
            logger.info("Save model: {}", self.saver)
            # NOTE(review): the whole checked `inputs` frame is passed as the
            # input example — confirm a smaller sample is not intended.
            model_info = self.saver.save(
                model=self.model, signature=model_signature, input_example=inputs
            )
            logger.debug("- Model URI: {}", model_info.model_uri)
            # register
            logger.info("Register model: {}", self.registry)
            model_version = self.registry.register(
                name=self.mlflow_service.registry_name, model_uri=model_info.model_uri
            )
            logger.debug("- Model version: {}", model_version)
            # notify
            self.alerts_service.notify(
                title="Training Job Finished",
                message=f"Model version: {model_version.version}",
            )
        return locals()

Run the job in context.

Returns:

Locals: local job variables.

model_config: ClassVar[pydantic.config.ConfigDict] = {'strict': True, 'frozen': True, 'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class PromotionJob(bikes.jobs.base.Job):
class PromotionJob(base.Job):
    """Define a job for promoting a registered model version with an alias.

    https://mlflow.org/docs/latest/model-registry.html#concepts

    Parameters:
        alias (str): the mlflow alias to transition the registered model version.
        version (int | None): the model version to transition (use None for latest).
    """

    KIND: T.Literal["PromotionJob"] = "PromotionJob"

    alias: str = "Champion"
    version: int | None = None

    @T.override
    def run(self) -> base.Locals:
        """Assign the alias to a registered model version, then notify.

        When `version` is None, the version with the highest version number
        found in the registry for the configured model name is promoted.

        Returns:
            base.Locals: local job variables, i.e. the result of `locals()`.

        Raises:
            IndexError: if `version` is None and no version is registered
                under the configured model name.
        """
        # NOTE: the method returns `locals()`, so the set of local variable
        # names is part of the result; no extra local is introduced here.
        # services
        # - logger
        logger = self.logger_service.logger()
        logger.info("With logger: {}", logger)
        # - mlflow
        client = self.mlflow_service.client()
        logger.info("With client: {}", client)
        name = self.mlflow_service.registry_name
        # version
        if self.version is None:  # use the latest model version
            try:
                version = client.search_model_versions(
                    f"name='{name}'", max_results=1, order_by=["version_number DESC"]
                )[0].version
            except IndexError as error:
                # an empty search result would otherwise surface as a bare
                # "list index out of range"; keep the type, explain the cause
                raise IndexError(
                    f"No registered model version found for name: '{name}'"
                ) from error
        else:
            version = self.version
        logger.info("From version: {}", version)
        # alias
        logger.info("To alias: {}", self.alias)
        # promote
        logger.info("Promote model: {}", name)
        client.set_registered_model_alias(name=name, alias=self.alias, version=version)
        # read the alias back so the log and the notification report what the registry holds
        model_version = client.get_model_version_by_alias(name=name, alias=self.alias)
        logger.debug("- Model version: {}", model_version)
        # notify
        self.alerts_service.notify(
            title="Promotion Job Finished",
            message=f"Version: {model_version.version} @ {self.alias}",
        )
        return locals()

Define a job for promoting a registered model version with an alias.

https://mlflow.org/docs/latest/model-registry.html#concepts

Arguments:
  • alias (str): the mlflow alias to transition the registered model version.
  • version (int | None): the model version to transition (use None for latest).
KIND: Literal['PromotionJob']
alias: str
version: int | None
@T.override
def run(self) -> Dict[str, Any]:
28    @T.override
29    def run(self) -> base.Locals:
30        # services
31        # - logger
32        logger = self.logger_service.logger()
33        logger.info("With logger: {}", logger)
34        # - mlflow
35        client = self.mlflow_service.client()
36        logger.info("With client: {}", client)
37        name = self.mlflow_service.registry_name
38        # version
39        if self.version is None:  # use the latest model version
40            version = client.search_model_versions(
41                f"name='{name}'", max_results=1, order_by=["version_number DESC"]
42            )[0].version
43        else:
44            version = self.version
45        logger.info("From version: {}", version)
46        # alias
47        logger.info("To alias: {}", self.alias)
48        # promote
49        logger.info("Promote model: {}", name)
50        client.set_registered_model_alias(name=name, alias=self.alias, version=version)
51        model_version = client.get_model_version_by_alias(name=name, alias=self.alias)
52        logger.debug("- Model version: {}", model_version)
53        # notify
54        self.alerts_service.notify(
55            title="Promotion Job Finished",
56            message=f"Version: {model_version.version} @ {self.alias}",
57        )
58        return locals()

Run the job in context.

Returns:

Locals: local job variables.

model_config: ClassVar[pydantic.config.ConfigDict] = {'strict': True, 'frozen': True, 'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class InferenceJob(bikes.jobs.base.Job):
class InferenceJob(base.Job):
    """Generate batch predictions from a registered model.

    The job reads and checks the inputs, resolves the model URI from the
    registry name and `alias_or_version`, loads the model, predicts on the
    inputs, writes the outputs and sends a notification with their shape.

    Parameters:
        inputs (datasets.ReaderKind): reader for the inputs data.
        outputs (datasets.WriterKind): writer for the outputs data.
        alias_or_version (str | int): alias or version for the model.
        loader (registries.LoaderKind): registry loader for the model.
    """

    # Literal tag of this job.
    # NOTE(review): presumably used as a pydantic discriminator, like the
    # `discriminator="KIND"` fields below — confirm.
    KIND: T.Literal["InferenceJob"] = "InferenceJob"

    # Inputs (required: `...` means no default reader is provided)
    inputs: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
    # Outputs (required: `...` means no default writer is provided)
    outputs: datasets.WriterKind = pdt.Field(..., discriminator="KIND")
    # Model
    alias_or_version: str | int = "Champion"
    # Loader
    loader: registries.LoaderKind = pdt.Field(registries.CustomLoader(), discriminator="KIND")

    @T.override
    def run(self) -> base.Locals:
        """Run the inference job in context.

        Returns:
            base.Locals: local job variables, i.e. the result of `locals()`.
        """
        # NOTE: the method returns `locals()`, so the name of every local
        # variable below is part of the returned mapping.
        # services
        logger = self.logger_service.logger()
        logger.info("With logger: {}", logger)
        # inputs
        logger.info("Read inputs: {}", self.inputs)
        inputs_ = self.inputs.read()  # unchecked!
        # the trailing-underscore frame is raw; the schema check yields the checked one
        inputs = schemas.InputsSchema.check(inputs_)
        logger.debug("- Inputs shape: {}", inputs.shape)
        # model
        logger.info("With model: {}", self.mlflow_service.registry_name)
        model_uri = registries.uri_for_model_alias_or_version(
            name=self.mlflow_service.registry_name,
            alias_or_version=self.alias_or_version,
        )
        logger.debug("- Model URI: {}", model_uri)
        # loader
        logger.info("Load model: {}", self.loader)
        model = self.loader.load(uri=model_uri)
        logger.debug("- Model: {}", model)
        # outputs
        logger.info("Predict outputs: {}", len(inputs))
        outputs = model.predict(inputs=inputs)  # checked
        logger.debug("- Outputs shape: {}", outputs.shape)
        # write
        logger.info("Write outputs: {}", self.outputs)
        self.outputs.write(data=outputs)
        # notify
        self.alerts_service.notify(
            title="Inference Job Finished", message=f"Outputs Shape: {outputs.shape}"
        )
        return locals()

Generate batch predictions from a registered model.

Arguments:
  • inputs (datasets.ReaderKind): reader for the inputs data.
  • outputs (datasets.WriterKind): writer for the outputs data.
  • alias_or_version (str | int): alias or version for the model.
  • loader (registries.LoaderKind): registry loader for the model.
KIND: Literal['InferenceJob']
alias_or_version: str | int
@T.override
def run(self) -> Dict[str, Any]:
    @T.override
    def run(self) -> base.Locals:
        """Run the inference job in context.

        Returns:
            base.Locals: local job variables, i.e. the result of `locals()`.
        """
        # NOTE: the method returns `locals()`, so the name of every local
        # variable below is part of the returned mapping.
        # services
        logger = self.logger_service.logger()
        logger.info("With logger: {}", logger)
        # inputs
        logger.info("Read inputs: {}", self.inputs)
        inputs_ = self.inputs.read()  # unchecked!
        # the trailing-underscore frame is raw; the schema check yields the checked one
        inputs = schemas.InputsSchema.check(inputs_)
        logger.debug("- Inputs shape: {}", inputs.shape)
        # model
        logger.info("With model: {}", self.mlflow_service.registry_name)
        model_uri = registries.uri_for_model_alias_or_version(
            name=self.mlflow_service.registry_name,
            alias_or_version=self.alias_or_version,
        )
        logger.debug("- Model URI: {}", model_uri)
        # loader
        logger.info("Load model: {}", self.loader)
        model = self.loader.load(uri=model_uri)
        logger.debug("- Model: {}", model)
        # outputs
        logger.info("Predict outputs: {}", len(inputs))
        outputs = model.predict(inputs=inputs)  # checked
        logger.debug("- Outputs shape: {}", outputs.shape)
        # write
        logger.info("Write outputs: {}", self.outputs)
        self.outputs.write(data=outputs)
        # notify
        self.alerts_service.notify(
            title="Inference Job Finished", message=f"Outputs Shape: {outputs.shape}"
        )
        return locals()

Run the job in context.

Returns:

Locals: local job variables.

model_config: ClassVar[pydantic.config.ConfigDict] = {'strict': True, 'frozen': True, 'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class EvaluationsJob(bikes.jobs.base.Job):
class EvaluationsJob(base.Job):
    """Generate evaluations from a registered model and a dataset.

    Parameters:
        run_config (services.MlflowService.RunConfig): mlflow run config.
        inputs (datasets.ReaderKind): reader for the inputs data.
        targets (datasets.ReaderKind): reader for the targets data.
        model_type (str): model type (e.g. "regressor", "classifier").
        alias_or_version (str | int): alias or version for the model.
        loader (registries.LoaderKind): registry loader for the model.
        metrics (metrics_.MetricsKind): metric list to compute.
        evaluators (list[str]): list of evaluators to use.
        thresholds (dict[str, metrics_.Threshold]): metric thresholds.
    """

    KIND: T.Literal["EvaluationsJob"] = "EvaluationsJob"

    # Run
    run_config: services.MlflowService.RunConfig = services.MlflowService.RunConfig(
        name="Evaluations"
    )
    # Data
    inputs: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
    targets: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
    # Model
    model_type: str = "regressor"
    alias_or_version: str | int = "Champion"
    # Loader
    loader: registries.LoaderKind = pdt.Field(registries.CustomLoader(), discriminator="KIND")
    # Metrics
    metrics: metrics_.MetricsKind = [metrics_.SklearnMetric()]
    # Evaluators
    evaluators: list[str] = ["default"]
    # Thresholds (keys are presumably metric names - TODO confirm)
    thresholds: dict[str, metrics_.Threshold] = {
        "r2_score": metrics_.Threshold(threshold=0.5, greater_is_better=True)
    }

    @T.override
    def run(self) -> base.Locals:
        """Evaluate the registered model on the inputs/targets inside an mlflow run.

        Returns:
            base.Locals: the result of `locals()`, i.e. every local variable
                bound in this method (including `self`).
        """
        # NOTE: the method returns `locals()`, so the local variable names
        # below are part of the return value; renaming one changes the result.
        # services
        # - logger
        logger = self.logger_service.logger()
        logger.info("With logger: {}", logger)
        # - mlflow
        client = self.mlflow_service.client()
        logger.info("With client: {}", client.tracking_uri)
        with self.mlflow_service.run_context(run_config=self.run_config) as run:
            logger.info("With run context: {}", run.info)
            # data
            # - inputs
            logger.info("Read inputs: {}", self.inputs)
            inputs_ = self.inputs.read()  # unchecked! raw data, validated on the next line
            inputs = schemas.InputsSchema.check(inputs_)
            logger.debug("- Inputs shape: {}", inputs.shape)
            # - targets
            logger.info("Read targets: {}", self.targets)
            targets_ = self.targets.read()  # unchecked! raw data, validated on the next line
            targets = schemas.TargetsSchema.check(targets_)
            logger.debug("- Targets shape: {}", targets.shape)
            # lineage: log both datasets as mlflow inputs, using the run name as context
            # - inputs
            logger.info("Log lineage: inputs")
            inputs_lineage = self.inputs.lineage(data=inputs, name="inputs")
            mlflow.log_input(dataset=inputs_lineage, context=self.run_config.name)
            logger.debug("- Inputs lineage: {}", inputs_lineage.to_dict())
            # - targets
            logger.info("Log lineage: targets")
            targets_lineage = self.targets.lineage(
                data=targets, name="targets", targets=schemas.TargetsSchema.cnt
            )
            mlflow.log_input(dataset=targets_lineage, context=self.run_config.name)
            logger.debug("- Targets lineage: {}", targets_lineage.to_dict())
            # model: build the registry URI from the registry name and alias/version
            logger.info("With model: {}", self.mlflow_service.registry_name)
            model_uri = registries.uri_for_model_alias_or_version(
                name=self.mlflow_service.registry_name,
                alias_or_version=self.alias_or_version,
            )
            logger.debug("- Model URI: {}", model_uri)
            # loader: fetch the model behind that URI
            logger.info("Load model: {}", self.loader)
            model = self.loader.load(uri=model_uri)
            logger.debug("- Model: {}", model)
            # outputs
            logger.info("Predict outputs: {}", len(inputs))
            # checked (original author's note; presumably validated by the loaded model - TODO confirm)
            outputs = model.predict(inputs=inputs)  # checked
            logger.debug("- Outputs shape: {}", outputs.shape)
            # dataset: a single frame holding features, targets and predictions
            logger.info("Create dataset: inputs & targets & outputs")
            # assumes the three frames share the same index - TODO confirm
            dataset_ = pd.concat([inputs, targets, outputs], axis="columns")
            dataset = mlflow.data.from_pandas(  # type: ignore[attr-defined]
                df=dataset_,
                name="evaluation",
                targets=schemas.TargetsSchema.cnt,
                predictions=schemas.OutputsSchema.prediction,
            )
            logger.debug("- Dataset: {}", dataset.to_dict())
            # metrics: convert each configured metric through its to_mlflow() method
            # NOTE(review): this step header logs at debug while every other
            # step header in this method logs at info - confirm whether intended.
            logger.debug("Convert metrics: {}", self.metrics)
            extra_metrics = [metric.to_mlflow() for metric in self.metrics]
            logger.debug("- Extra metrics: {}", extra_metrics)
            # thresholds: same conversion, keeping the mapping keys
            logger.info("Convert thresholds: {}", self.thresholds)
            validation_thresholds = {
                name: threshold.to_mlflow() for name, threshold in self.thresholds.items()
            }
            logger.debug("- Validation thresholds: {}", validation_thresholds)
            # evaluations: no `model` argument is passed; the dataset built
            # above already declares its targets and predictions columns
            logger.info("Compute evaluations: {}", self.model_type)
            evaluations = mlflow.evaluate(
                data=dataset,
                model_type=self.model_type,
                evaluators=self.evaluators,
                extra_metrics=extra_metrics,
                validation_thresholds=validation_thresholds,
            )
            logger.debug("- Evaluations metrics: {}", evaluations.metrics)
            # notify
            self.alerts_service.notify(
                title="Evaluations Job Finished",
                message=f"Evaluation metrics: {evaluations.metrics}",
            )
        # returned after the mlflow run context has exited
        return locals()

Generate evaluations from a registered model and a dataset.

Arguments:
  • run_config (services.MlflowService.RunConfig): mlflow run config.
  • inputs (datasets.ReaderKind): reader for the inputs data.
  • targets (datasets.ReaderKind): reader for the targets data.
  • model_type (str): model type (e.g. "regressor", "classifier").
  • alias_or_version (str | int): alias or version for the model.
  • loader (registries.LoaderKind): registry loader for the model.
  • metrics (metrics_.MetricsKind): metric list to compute.
  • evaluators (list[str]): list of evaluators to use.
  • thresholds (dict[str, metrics_.Threshold]): metric thresholds.
KIND: Literal['EvaluationsJob']
model_type: str
alias_or_version: str | int
metrics: list[typing.Annotated[bikes.core.metrics.SklearnMetric, FieldInfo(annotation=NoneType, required=True, discriminator='KIND')]]
evaluators: list[str]
thresholds: dict[str, bikes.core.metrics.Threshold]
@T.override
def run(self) -> Dict[str, Any]:
    @T.override
    def run(self) -> base.Locals:
        """Evaluate the registered model on the inputs/targets inside an mlflow run.

        Returns:
            base.Locals: the result of `locals()`, i.e. every local variable
                bound in this method (including `self`).
        """
        # NOTE: the method returns `locals()`, so the local variable names
        # below are part of the return value; renaming one changes the result.
        # services
        # - logger
        logger = self.logger_service.logger()
        logger.info("With logger: {}", logger)
        # - mlflow
        client = self.mlflow_service.client()
        logger.info("With client: {}", client.tracking_uri)
        with self.mlflow_service.run_context(run_config=self.run_config) as run:
            logger.info("With run context: {}", run.info)
            # data
            # - inputs
            logger.info("Read inputs: {}", self.inputs)
            inputs_ = self.inputs.read()  # unchecked! raw data, validated on the next line
            inputs = schemas.InputsSchema.check(inputs_)
            logger.debug("- Inputs shape: {}", inputs.shape)
            # - targets
            logger.info("Read targets: {}", self.targets)
            targets_ = self.targets.read()  # unchecked! raw data, validated on the next line
            targets = schemas.TargetsSchema.check(targets_)
            logger.debug("- Targets shape: {}", targets.shape)
            # lineage: log both datasets as mlflow inputs, using the run name as context
            # - inputs
            logger.info("Log lineage: inputs")
            inputs_lineage = self.inputs.lineage(data=inputs, name="inputs")
            mlflow.log_input(dataset=inputs_lineage, context=self.run_config.name)
            logger.debug("- Inputs lineage: {}", inputs_lineage.to_dict())
            # - targets
            logger.info("Log lineage: targets")
            targets_lineage = self.targets.lineage(
                data=targets, name="targets", targets=schemas.TargetsSchema.cnt
            )
            mlflow.log_input(dataset=targets_lineage, context=self.run_config.name)
            logger.debug("- Targets lineage: {}", targets_lineage.to_dict())
            # model: build the registry URI from the registry name and alias/version
            logger.info("With model: {}", self.mlflow_service.registry_name)
            model_uri = registries.uri_for_model_alias_or_version(
                name=self.mlflow_service.registry_name,
                alias_or_version=self.alias_or_version,
            )
            logger.debug("- Model URI: {}", model_uri)
            # loader: fetch the model behind that URI
            logger.info("Load model: {}", self.loader)
            model = self.loader.load(uri=model_uri)
            logger.debug("- Model: {}", model)
            # outputs
            logger.info("Predict outputs: {}", len(inputs))
            # checked (original author's note; presumably validated by the loaded model - TODO confirm)
            outputs = model.predict(inputs=inputs)  # checked
            logger.debug("- Outputs shape: {}", outputs.shape)
            # dataset: a single frame holding features, targets and predictions
            logger.info("Create dataset: inputs & targets & outputs")
            # assumes the three frames share the same index - TODO confirm
            dataset_ = pd.concat([inputs, targets, outputs], axis="columns")
            dataset = mlflow.data.from_pandas(  # type: ignore[attr-defined]
                df=dataset_,
                name="evaluation",
                targets=schemas.TargetsSchema.cnt,
                predictions=schemas.OutputsSchema.prediction,
            )
            logger.debug("- Dataset: {}", dataset.to_dict())
            # metrics: convert each configured metric through its to_mlflow() method
            # NOTE(review): this step header logs at debug while every other
            # step header in this method logs at info - confirm whether intended.
            logger.debug("Convert metrics: {}", self.metrics)
            extra_metrics = [metric.to_mlflow() for metric in self.metrics]
            logger.debug("- Extra metrics: {}", extra_metrics)
            # thresholds: same conversion, keeping the mapping keys
            logger.info("Convert thresholds: {}", self.thresholds)
            validation_thresholds = {
                name: threshold.to_mlflow() for name, threshold in self.thresholds.items()
            }
            logger.debug("- Validation thresholds: {}", validation_thresholds)
            # evaluations: no `model` argument is passed; the dataset built
            # above already declares its targets and predictions columns
            logger.info("Compute evaluations: {}", self.model_type)
            evaluations = mlflow.evaluate(
                data=dataset,
                model_type=self.model_type,
                evaluators=self.evaluators,
                extra_metrics=extra_metrics,
                validation_thresholds=validation_thresholds,
            )
            logger.debug("- Evaluations metrics: {}", evaluations.metrics)
            # notify
            self.alerts_service.notify(
                title="Evaluations Job Finished",
                message=f"Evaluation metrics: {evaluations.metrics}",
            )
        # returned after the mlflow run context has exited
        return locals()

Run the job in context.

Returns:

Locals: local job variables.

model_config: ClassVar[pydantic.config.ConfigDict] = {'strict': True, 'frozen': True, 'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ExplanationsJob(bikes.jobs.base.Job):
class ExplanationsJob(base.Job):
    """Generate explanations from the model and a data sample.

    Parameters:
        inputs_samples (datasets.ReaderKind): reader for the samples data.
        models_explanations (datasets.WriterKind): writer for models explanation.
        samples_explanations (datasets.WriterKind): writer for samples explanation.
        alias_or_version (str | int): alias or version for the model.
        loader (registries.LoaderKind): registry loader for the model.
    """

    KIND: T.Literal["ExplanationsJob"] = "ExplanationsJob"

    # Samples
    inputs_samples: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
    # Explanations
    models_explanations: datasets.WriterKind = pdt.Field(..., discriminator="KIND")
    samples_explanations: datasets.WriterKind = pdt.Field(..., discriminator="KIND")
    # Model
    alias_or_version: str | int = "Champion"
    # Loader
    loader: registries.LoaderKind = pdt.Field(registries.CustomLoader(), discriminator="KIND")

    @T.override
    def run(self) -> base.Locals:
        """Explain the registered model globally and on the input samples, then write both.

        Returns:
            base.Locals: the result of `locals()`, i.e. every local variable
                bound in this method (including `self`).
        """
        # NOTE: the method returns `locals()`, so the local variable names
        # below are part of the return value; renaming one changes the result.
        # services
        logger = self.logger_service.logger()
        logger.info("With logger: {}", logger)
        # inputs
        logger.info("Read samples: {}", self.inputs_samples)
        # the raw frame is rebound to its validated version under the same
        # name, so only the checked frame ends up in the returned locals
        inputs_samples = self.inputs_samples.read()  # unchecked!
        inputs_samples = schemas.InputsSchema.check(inputs_samples)
        logger.debug("- Inputs samples shape: {}", inputs_samples.shape)
        # model: build the registry URI from the registry name and alias/version
        logger.info("With model: {}", self.mlflow_service.registry_name)
        model_uri = registries.uri_for_model_alias_or_version(
            name=self.mlflow_service.registry_name,
            alias_or_version=self.alias_or_version,
        )
        logger.debug("- Model URI: {}", model_uri)
        # loader
        logger.info("Load model: {}", self.loader)
        # unwrap the loaded object down to the inner model that exposes
        # explain_model() / explain_samples()
        # NOTE(review): this attribute chain presumably matches what the default
        # CustomLoader returns - confirm it holds for other loader kinds.
        model = self.loader.load(uri=model_uri).model.unwrap_python_model().model
        logger.debug("- Model: {}", model)
        # explanations
        # - models
        logger.info("Explain model: {}", model)
        models_explanations = model.explain_model()
        logger.debug("- Models explanations shape: {}", models_explanations.shape)
        # - samples
        logger.info("Explain samples: {}", len(inputs_samples))
        samples_explanations = model.explain_samples(inputs=inputs_samples)
        logger.debug("- Samples explanations shape: {}", samples_explanations.shape)
        # write
        # - model
        logger.info("Write models explanations: {}", self.models_explanations)
        self.models_explanations.write(data=models_explanations)
        # - samples
        logger.info("Write samples explanations: {}", self.samples_explanations)
        self.samples_explanations.write(data=samples_explanations)
        # notify
        # the length of the models explanations is reported as the feature
        # count - presumably one row per feature; TODO confirm
        self.alerts_service.notify(
            title="Explanations Job Finished",
            message=f"Features Count: {len(models_explanations)}",
        )
        return locals()

Generate explanations from the model and a data sample.

Arguments:
  • inputs_samples (datasets.ReaderKind): reader for the samples data.
  • models_explanations (datasets.WriterKind): writer for models explanation.
  • samples_explanations (datasets.WriterKind): writer for samples explanation.
  • alias_or_version (str | int): alias or version for the model.
  • loader (registries.LoaderKind): registry loader for the model.
KIND: Literal['ExplanationsJob']
models_explanations: bikes.io.datasets.ParquetWriter
samples_explanations: bikes.io.datasets.ParquetWriter
alias_or_version: str | int
@T.override
def run(self) -> Dict[str, Any]:
    @T.override
    def run(self) -> base.Locals:
        """Explain the registered model globally and on the input samples, then write both.

        Returns:
            base.Locals: the result of `locals()`, i.e. every local variable
                bound in this method (including `self`).
        """
        # NOTE: the method returns `locals()`, so the local variable names
        # below are part of the return value; renaming one changes the result.
        # services
        logger = self.logger_service.logger()
        logger.info("With logger: {}", logger)
        # inputs
        logger.info("Read samples: {}", self.inputs_samples)
        # the raw frame is rebound to its validated version under the same
        # name, so only the checked frame ends up in the returned locals
        inputs_samples = self.inputs_samples.read()  # unchecked!
        inputs_samples = schemas.InputsSchema.check(inputs_samples)
        logger.debug("- Inputs samples shape: {}", inputs_samples.shape)
        # model: build the registry URI from the registry name and alias/version
        logger.info("With model: {}", self.mlflow_service.registry_name)
        model_uri = registries.uri_for_model_alias_or_version(
            name=self.mlflow_service.registry_name,
            alias_or_version=self.alias_or_version,
        )
        logger.debug("- Model URI: {}", model_uri)
        # loader
        logger.info("Load model: {}", self.loader)
        # unwrap the loaded object down to the inner model that exposes
        # explain_model() / explain_samples()
        # NOTE(review): this attribute chain presumably matches what the default
        # CustomLoader returns - confirm it holds for other loader kinds.
        model = self.loader.load(uri=model_uri).model.unwrap_python_model().model
        logger.debug("- Model: {}", model)
        # explanations
        # - models
        logger.info("Explain model: {}", model)
        models_explanations = model.explain_model()
        logger.debug("- Models explanations shape: {}", models_explanations.shape)
        # - samples
        logger.info("Explain samples: {}", len(inputs_samples))
        samples_explanations = model.explain_samples(inputs=inputs_samples)
        logger.debug("- Samples explanations shape: {}", samples_explanations.shape)
        # write
        # - model
        logger.info("Write models explanations: {}", self.models_explanations)
        self.models_explanations.write(data=models_explanations)
        # - samples
        logger.info("Write samples explanations: {}", self.samples_explanations)
        self.samples_explanations.write(data=samples_explanations)
        # notify
        # the length of the models explanations is reported as the feature
        # count - presumably one row per feature; TODO confirm
        self.alerts_service.notify(
            title="Explanations Job Finished",
            message=f"Features Count: {len(models_explanations)}",
        )
        return locals()

Run the job in context.

Returns:

Locals: local job variables.

model_config: ClassVar[pydantic.config.ConfigDict] = {'strict': True, 'frozen': True, 'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].