bikes.jobs

High-level jobs of the project.

 1"""High-level jobs of the project."""
 2
 3# %% IMPORTS
 4
 5from bikes.jobs.evaluations import EvaluationsJob
 6from bikes.jobs.explanations import ExplanationsJob
 7from bikes.jobs.inference import InferenceJob
 8from bikes.jobs.promotion import PromotionJob
 9from bikes.jobs.training import TrainingJob
10from bikes.jobs.tuning import TuningJob
11
12# %% TYPES
13
14JobKind = TuningJob | TrainingJob | PromotionJob | InferenceJob | EvaluationsJob | ExplanationsJob
15
16# %% EXPORTS
17
18__all__ = [
19    "TuningJob",
20    "TrainingJob",
21    "PromotionJob",
22    "InferenceJob",
23    "EvaluationsJob",
24    "ExplanationsJob",
25    "JobKind",
26]
class TuningJob(base.Job):
    """Find the best hyperparameters for a model.

    Parameters:
        run_config (services.MlflowService.RunConfig): mlflow run config.
        inputs (datasets.ReaderKind): reader for the inputs data.
        targets (datasets.ReaderKind): reader for the targets data.
        model (models.ModelKind): machine learning model to tune.
        metric (metrics.MetricKind): tuning metric to optimize.
        splitter (splitters.SplitterKind): data sets splitter.
        searcher (searchers.SearcherKind): hparams searcher.
    """

    KIND: T.Literal["TuningJob"] = "TuningJob"

    # Run
    run_config: services.MlflowService.RunConfig = services.MlflowService.RunConfig(name="Tuning")
    # Data
    inputs: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
    targets: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
    # Model
    model: models.ModelKind = pdt.Field(models.BaselineSklearnModel(), discriminator="KIND")
    # Metric
    metric: metrics.MetricKind = pdt.Field(metrics.SklearnMetric(), discriminator="KIND")
    # Splitter
    splitter: splitters.SplitterKind = pdt.Field(
        splitters.TimeSeriesSplitter(), discriminator="KIND"
    )
    # Searcher
    searcher: searchers.SearcherKind = pdt.Field(
        searchers.GridCVSearcher(
            param_grid={
                "max_depth": [3, 5, 7],
            }
        ),
        discriminator="KIND",
    )

    @T.override
    def run(self) -> base.Locals:
        """Run the tuning job in context."""
        # services
        # - logger
        logger = self.logger_service.logger()
        logger.info("With logger: {}", logger)
        with self.mlflow_service.run_context(run_config=self.run_config) as run:
            logger.info("With run context: {}", run.info)
            # data
            # - inputs
            logger.info("Read inputs: {}", self.inputs)
            inputs_ = self.inputs.read()  # unchecked!
            inputs = schemas.InputsSchema.check(inputs_)
            logger.debug("- Inputs shape: {}", inputs.shape)
            # - targets
            logger.info("Read targets: {}", self.targets)
            targets_ = self.targets.read()  # unchecked!
            targets = schemas.TargetsSchema.check(targets_)
            logger.debug("- Targets shape: {}", targets.shape)
            # lineage
            # - inputs
            logger.info("Log lineage: inputs")
            inputs_lineage = self.inputs.lineage(data=inputs, name="inputs")
            mlflow.log_input(dataset=inputs_lineage, context=self.run_config.name)
            logger.debug("- Inputs lineage: {}", inputs_lineage.to_dict())
            # - targets
            logger.info("Log lineage: targets")
            targets_lineage = self.targets.lineage(
                data=targets, name="targets", targets=schemas.TargetsSchema.cnt
            )
            mlflow.log_input(dataset=targets_lineage, context=self.run_config.name)
            logger.debug("- Targets lineage: {}", targets_lineage.to_dict())
            # model
            logger.info("With model: {}", self.model)
            # metric
            logger.info("With metric: {}", self.metric)
            # splitter
            logger.info("With splitter: {}", self.splitter)
            # searcher
            logger.info("Run searcher: {}", self.searcher)
            results, best_score, best_params = self.searcher.search(
                model=self.model,
                metric=self.metric,
                inputs=inputs,
                targets=targets,
                cv=self.splitter,
            )
            logger.debug("- Results: {}", results.shape)
            logger.debug("- Best Score: {}", best_score)
            logger.debug("- Best Params: {}", best_params)
            # notify
            self.alerts_service.notify(
                title="Tuning Job Finished", message=f"Best score: {best_score}"
            )
        return locals()

Find the best hyperparameters for a model.

Arguments:
  • run_config (services.MlflowService.RunConfig): mlflow run config.
  • inputs (datasets.ReaderKind): reader for the inputs data.
  • targets (datasets.ReaderKind): reader for the targets data.
  • model (models.ModelKind): machine learning model to tune.
  • metric (metrics.MetricKind): tuning metric to optimize.
  • splitter (splitters.SplitterKind): data sets splitter.
  • searcher (searchers.SearcherKind): hparams searcher.
KIND: Literal['TuningJob']
@T.override
def run(self) -> base.Locals:
    """Run the tuning job in context."""
    # services
    # - logger
    logger = self.logger_service.logger()
    logger.info("With logger: {}", logger)
    with self.mlflow_service.run_context(run_config=self.run_config) as run:
        logger.info("With run context: {}", run.info)
        # data
        # - inputs
        logger.info("Read inputs: {}", self.inputs)
        inputs_ = self.inputs.read()  # unchecked!
        inputs = schemas.InputsSchema.check(inputs_)
        logger.debug("- Inputs shape: {}", inputs.shape)
        # - targets
        logger.info("Read targets: {}", self.targets)
        targets_ = self.targets.read()  # unchecked!
        targets = schemas.TargetsSchema.check(targets_)
        logger.debug("- Targets shape: {}", targets.shape)
        # lineage
        # - inputs
        logger.info("Log lineage: inputs")
        inputs_lineage = self.inputs.lineage(data=inputs, name="inputs")
        mlflow.log_input(dataset=inputs_lineage, context=self.run_config.name)
        logger.debug("- Inputs lineage: {}", inputs_lineage.to_dict())
        # - targets
        logger.info("Log lineage: targets")
        targets_lineage = self.targets.lineage(
            data=targets, name="targets", targets=schemas.TargetsSchema.cnt
        )
        mlflow.log_input(dataset=targets_lineage, context=self.run_config.name)
        logger.debug("- Targets lineage: {}", targets_lineage.to_dict())
        # model
        logger.info("With model: {}", self.model)
        # metric
        logger.info("With metric: {}", self.metric)
        # splitter
        logger.info("With splitter: {}", self.splitter)
        # searcher
        logger.info("Run searcher: {}", self.searcher)
        results, best_score, best_params = self.searcher.search(
            model=self.model,
            metric=self.metric,
            inputs=inputs,
            targets=targets,
            cv=self.splitter,
        )
        logger.debug("- Results: {}", results.shape)
        logger.debug("- Best Score: {}", best_score)
        logger.debug("- Best Params: {}", best_params)
        # notify
        self.alerts_service.notify(
            title="Tuning Job Finished", message=f"Best score: {best_score}"
        )
    return locals()

Run the tuning job in context.

model_config = {'strict': True, 'frozen': True, 'extra': 'forbid'}
model_fields = {'KIND': FieldInfo(annotation=Literal['TuningJob'], required=False, default='TuningJob'), 'logger_service': FieldInfo(annotation=LoggerService, required=False, default=LoggerService(sink='stderr', level='DEBUG', format='<green>[{time:YYYY-MM-DD HH:mm:ss.SSS}]</green><level>[{level}]</level><cyan>[{name}:{function}:{line}]</cyan> <level>{message}</level>', colorize=True, serialize=False, backtrace=True, diagnose=False, catch=True)), 'alerts_service': FieldInfo(annotation=AlertsService, required=False, default=AlertsService(enable=True, app_name='Bikes', timeout=None)), 'mlflow_service': FieldInfo(annotation=MlflowService, required=False, default=MlflowService(tracking_uri='./mlruns', registry_uri='./mlruns', experiment_name='bikes', registry_name='bikes', autolog_disable=False, autolog_disable_for_unsupported_versions=False, autolog_exclusive=False, autolog_log_input_examples=True, autolog_log_model_signatures=True, autolog_log_models=False, autolog_log_datasets=False, autolog_silent=False)), 'run_config': FieldInfo(annotation=MlflowService.RunConfig, required=False, default=RunConfig(name='Tuning', description=None, tags=None, log_system_metrics=True)), 'inputs': FieldInfo(annotation=ParquetReader, required=True, discriminator='KIND'), 'targets': FieldInfo(annotation=ParquetReader, required=True, discriminator='KIND'), 'model': FieldInfo(annotation=BaselineSklearnModel, required=False, default=BaselineSklearnModel(KIND='BaselineSklearnModel', max_depth=20, n_estimators=200, random_state=42), discriminator='KIND'), 'metric': FieldInfo(annotation=SklearnMetric, required=False, default=SklearnMetric(KIND='SklearnMetric', name='mean_squared_error', greater_is_better=False), discriminator='KIND'), 'splitter': FieldInfo(annotation=Union[TrainTestSplitter, TimeSeriesSplitter], required=False, default=TimeSeriesSplitter(KIND='TimeSeriesSplitter', gap=0, n_splits=4, test_size=1440), discriminator='KIND'), 'searcher': FieldInfo(annotation=GridCVSearcher, 
required=False, default=GridCVSearcher(KIND='GridCVSearcher', param_grid={'max_depth': [3, 5, 7]}, n_jobs=None, refit=True, verbose=3, error_score='raise', return_train_score=False), discriminator='KIND')}
model_computed_fields = {}
Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
bikes.jobs.base.Job
logger_service
alerts_service
mlflow_service
class TrainingJob(base.Job):
    """Train and register a single AI/ML model.

    Parameters:
        run_config (services.MlflowService.RunConfig): mlflow run config.
        inputs (datasets.ReaderKind): reader for the inputs data.
        targets (datasets.ReaderKind): reader for the targets data.
        model (models.ModelKind): machine learning model to train.
        metrics (metrics_.MetricKind): metrics for the reporting.
        splitter (splitters.SplitterKind): data sets splitter.
        saver (registries.SaverKind): model saver.
        signer (signers.SignerKind): model signer.
        registry (registries.RegisterKind): model register.
    """

    KIND: T.Literal["TrainingJob"] = "TrainingJob"

    # Run
    run_config: services.MlflowService.RunConfig = services.MlflowService.RunConfig(name="Training")
    # Data
    inputs: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
    targets: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
    # Model
    model: models.ModelKind = pdt.Field(models.BaselineSklearnModel(), discriminator="KIND")
    # Metrics
    metrics: list[metrics_.MetricKind] = pdt.Field([metrics_.SklearnMetric()], discriminator="KIND")
    # Splitter
    splitter: splitters.SplitterKind = pdt.Field(
        splitters.TrainTestSplitter(), discriminator="KIND"
    )
    # Saver
    saver: registries.SaverKind = pdt.Field(registries.CustomSaver(), discriminator="KIND")
    # Signer
    signer: signers.SignerKind = pdt.Field(signers.InferSigner(), discriminator="KIND")
    # Register
    # - named `registry` to avoid shadowing the pydantic `register` function
    registry: registries.RegisterKind = pdt.Field(registries.MlflowRegister(), discriminator="KIND")

    @T.override
    def run(self) -> base.Locals:
        """Run the training job in context."""
        # services
        # - logger
        logger = self.logger_service.logger()
        logger.info("With logger: {}", logger)
        # - mlflow
        client = self.mlflow_service.client()
        logger.info("With client: {}", client.tracking_uri)
        with self.mlflow_service.run_context(run_config=self.run_config) as run:
            logger.info("With run context: {}", run.info)
            # data
            # - inputs
            logger.info("Read inputs: {}", self.inputs)
            inputs_ = self.inputs.read()  # unchecked!
            inputs = schemas.InputsSchema.check(inputs_)
            logger.debug("- Inputs shape: {}", inputs.shape)
            # - targets
            logger.info("Read targets: {}", self.targets)
            targets_ = self.targets.read()  # unchecked!
            targets = schemas.TargetsSchema.check(targets_)
            logger.debug("- Targets shape: {}", targets.shape)
            # lineage
            # - inputs
            logger.info("Log lineage: inputs")
            inputs_lineage = self.inputs.lineage(data=inputs, name="inputs")
            mlflow.log_input(dataset=inputs_lineage, context=self.run_config.name)
            logger.debug("- Inputs lineage: {}", inputs_lineage.to_dict())
            # - targets
            logger.info("Log lineage: targets")
            targets_lineage = self.targets.lineage(
                data=targets, name="targets", targets=schemas.TargetsSchema.cnt
            )
            mlflow.log_input(dataset=targets_lineage, context=self.run_config.name)
            logger.debug("- Targets lineage: {}", targets_lineage.to_dict())
            # splitter
            logger.info("With splitter: {}", self.splitter)
            # - index
            train_index, test_index = next(self.splitter.split(inputs=inputs, targets=targets))
            # - inputs
            inputs_train = T.cast(schemas.Inputs, inputs.iloc[train_index])
            inputs_test = T.cast(schemas.Inputs, inputs.iloc[test_index])
            logger.debug("- Inputs train shape: {}", inputs_train.shape)
            logger.debug("- Inputs test shape: {}", inputs_test.shape)
            # - targets
            targets_train = T.cast(schemas.Targets, targets.iloc[train_index])
            targets_test = T.cast(schemas.Targets, targets.iloc[test_index])
            logger.debug("- Targets train shape: {}", targets_train.shape)
            logger.debug("- Targets test shape: {}", targets_test.shape)
            # model
            logger.info("Fit model: {}", self.model)
            self.model.fit(inputs=inputs_train, targets=targets_train)
            # outputs
            logger.info("Predict outputs: {}", len(inputs_test))
            outputs_test = self.model.predict(inputs=inputs_test)
            logger.debug("- Outputs test shape: {}", outputs_test.shape)
            # metrics
            for i, metric in enumerate(self.metrics, start=1):
                logger.info("{}. Compute metric: {}", i, metric)
                score = metric.score(targets=targets_test, outputs=outputs_test)
                client.log_metric(run_id=run.info.run_id, key=metric.name, value=score)
                logger.debug("- Metric score: {}", score)
            # signer
            logger.info("Sign model: {}", self.signer)
            model_signature = self.signer.sign(inputs=inputs, outputs=outputs_test)
            logger.debug("- Model signature: {}", model_signature.to_dict())
            # saver
            logger.info("Save model: {}", self.saver)
            model_info = self.saver.save(
                model=self.model, signature=model_signature, input_example=inputs
            )
            logger.debug("- Model URI: {}", model_info.model_uri)
            # register
            logger.info("Register model: {}", self.registry)
            model_version = self.registry.register(
                name=self.mlflow_service.registry_name, model_uri=model_info.model_uri
            )
            logger.debug("- Model version: {}", model_version)
            # notify
            self.alerts_service.notify(
                title="Training Job Finished", message=f"Model version: {model_version.version}"
            )
        return locals()

Train and register a single AI/ML model.

Arguments:
  • run_config (services.MlflowService.RunConfig): mlflow run config.
  • inputs (datasets.ReaderKind): reader for the inputs data.
  • targets (datasets.ReaderKind): reader for the targets data.
  • model (models.ModelKind): machine learning model to train.
  • metrics (metrics_.MetricKind): metrics for the reporting.
  • splitter (splitters.SplitterKind): data sets splitter.
  • saver (registries.SaverKind): model saver.
  • signer (signers.SignerKind): model signer.
  • registry (registries.RegisterKind): model register.
KIND: Literal['TrainingJob']
@T.override
def run(self) -> base.Locals:
    """Run the training job in context."""
    # services
    # - logger
    logger = self.logger_service.logger()
    logger.info("With logger: {}", logger)
    # - mlflow
    client = self.mlflow_service.client()
    logger.info("With client: {}", client.tracking_uri)
    with self.mlflow_service.run_context(run_config=self.run_config) as run:
        logger.info("With run context: {}", run.info)
        # data
        # - inputs
        logger.info("Read inputs: {}", self.inputs)
        inputs_ = self.inputs.read()  # unchecked!
        inputs = schemas.InputsSchema.check(inputs_)
        logger.debug("- Inputs shape: {}", inputs.shape)
        # - targets
        logger.info("Read targets: {}", self.targets)
        targets_ = self.targets.read()  # unchecked!
        targets = schemas.TargetsSchema.check(targets_)
        logger.debug("- Targets shape: {}", targets.shape)
        # lineage
        # - inputs
        logger.info("Log lineage: inputs")
        inputs_lineage = self.inputs.lineage(data=inputs, name="inputs")
        mlflow.log_input(dataset=inputs_lineage, context=self.run_config.name)
        logger.debug("- Inputs lineage: {}", inputs_lineage.to_dict())
        # - targets
        logger.info("Log lineage: targets")
        targets_lineage = self.targets.lineage(
            data=targets, name="targets", targets=schemas.TargetsSchema.cnt
        )
        mlflow.log_input(dataset=targets_lineage, context=self.run_config.name)
        logger.debug("- Targets lineage: {}", targets_lineage.to_dict())
        # splitter
        logger.info("With splitter: {}", self.splitter)
        # - index
        train_index, test_index = next(self.splitter.split(inputs=inputs, targets=targets))
        # - inputs
        inputs_train = T.cast(schemas.Inputs, inputs.iloc[train_index])
        inputs_test = T.cast(schemas.Inputs, inputs.iloc[test_index])
        logger.debug("- Inputs train shape: {}", inputs_train.shape)
        logger.debug("- Inputs test shape: {}", inputs_test.shape)
        # - targets
        targets_train = T.cast(schemas.Targets, targets.iloc[train_index])
        targets_test = T.cast(schemas.Targets, targets.iloc[test_index])
        logger.debug("- Targets train shape: {}", targets_train.shape)
        logger.debug("- Targets test shape: {}", targets_test.shape)
        # model
        logger.info("Fit model: {}", self.model)
        self.model.fit(inputs=inputs_train, targets=targets_train)
        # outputs
        logger.info("Predict outputs: {}", len(inputs_test))
        outputs_test = self.model.predict(inputs=inputs_test)
        logger.debug("- Outputs test shape: {}", outputs_test.shape)
        # metrics
        for i, metric in enumerate(self.metrics, start=1):
            logger.info("{}. Compute metric: {}", i, metric)
            score = metric.score(targets=targets_test, outputs=outputs_test)
            client.log_metric(run_id=run.info.run_id, key=metric.name, value=score)
            logger.debug("- Metric score: {}", score)
        # signer
        logger.info("Sign model: {}", self.signer)
        model_signature = self.signer.sign(inputs=inputs, outputs=outputs_test)
        logger.debug("- Model signature: {}", model_signature.to_dict())
        # saver
        logger.info("Save model: {}", self.saver)
        model_info = self.saver.save(
            model=self.model, signature=model_signature, input_example=inputs
        )
        logger.debug("- Model URI: {}", model_info.model_uri)
        # register
        logger.info("Register model: {}", self.registry)
        model_version = self.registry.register(
            name=self.mlflow_service.registry_name, model_uri=model_info.model_uri
        )
        logger.debug("- Model version: {}", model_version)
        # notify
        self.alerts_service.notify(
            title="Training Job Finished", message=f"Model version: {model_version.version}"
        )
    return locals()

Run the job in context.

Returns:

Locals: local job variables.

model_config = {'strict': True, 'frozen': True, 'extra': 'forbid'}
model_fields = {'KIND': FieldInfo(annotation=Literal['TrainingJob'], required=False, default='TrainingJob'), 'logger_service': FieldInfo(annotation=LoggerService, required=False, default=LoggerService(sink='stderr', level='DEBUG', format='<green>[{time:YYYY-MM-DD HH:mm:ss.SSS}]</green><level>[{level}]</level><cyan>[{name}:{function}:{line}]</cyan> <level>{message}</level>', colorize=True, serialize=False, backtrace=True, diagnose=False, catch=True)), 'alerts_service': FieldInfo(annotation=AlertsService, required=False, default=AlertsService(enable=True, app_name='Bikes', timeout=None)), 'mlflow_service': FieldInfo(annotation=MlflowService, required=False, default=MlflowService(tracking_uri='./mlruns', registry_uri='./mlruns', experiment_name='bikes', registry_name='bikes', autolog_disable=False, autolog_disable_for_unsupported_versions=False, autolog_exclusive=False, autolog_log_input_examples=True, autolog_log_model_signatures=True, autolog_log_models=False, autolog_log_datasets=False, autolog_silent=False)), 'run_config': FieldInfo(annotation=MlflowService.RunConfig, required=False, default=RunConfig(name='Training', description=None, tags=None, log_system_metrics=True)), 'inputs': FieldInfo(annotation=ParquetReader, required=True, discriminator='KIND'), 'targets': FieldInfo(annotation=ParquetReader, required=True, discriminator='KIND'), 'model': FieldInfo(annotation=BaselineSklearnModel, required=False, default=BaselineSklearnModel(KIND='BaselineSklearnModel', max_depth=20, n_estimators=200, random_state=42), discriminator='KIND'), 'metrics': FieldInfo(annotation=list[SklearnMetric], required=False, default=[SklearnMetric(KIND='SklearnMetric', name='mean_squared_error', greater_is_better=False)], discriminator='KIND'), 'splitter': FieldInfo(annotation=Union[TrainTestSplitter, TimeSeriesSplitter], required=False, default=TrainTestSplitter(KIND='TrainTestSplitter', shuffle=False, test_size=1440, random_state=42), discriminator='KIND'), 'saver': 
FieldInfo(annotation=Union[CustomSaver, BuiltinSaver], required=False, default=CustomSaver(KIND='CustomSaver', path='model'), discriminator='KIND'), 'signer': FieldInfo(annotation=InferSigner, required=False, default=InferSigner(KIND='InferSigner'), discriminator='KIND'), 'registry': FieldInfo(annotation=MlflowRegister, required=False, default=MlflowRegister(KIND='MlflowRegister', tags={}), discriminator='KIND')}
model_computed_fields = {}
Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
bikes.jobs.base.Job
logger_service
alerts_service
mlflow_service
class PromotionJob(base.Job):
    """Define a job for promoting a registered model version with an alias.

    https://mlflow.org/docs/latest/model-registry.html#concepts

    Parameters:
        alias (str): the mlflow alias to transition the registered model version.
        version (int | None): the model version to transition (use None for latest).
    """

    KIND: T.Literal["PromotionJob"] = "PromotionJob"

    alias: str = "Champion"
    version: int | None = None

    @T.override
    def run(self) -> base.Locals:
        """Run the promotion job in context."""
        # services
        # - logger
        logger = self.logger_service.logger()
        logger.info("With logger: {}", logger)
        # - mlflow
        client = self.mlflow_service.client()
        logger.info("With client: {}", client)
        name = self.mlflow_service.registry_name
        # version
        if self.version is None:  # use the latest model version
            version = client.search_model_versions(
                f"name='{name}'", max_results=1, order_by=["version_number DESC"]
            )[0].version
        else:
            version = self.version
        logger.info("From version: {}", version)
        # alias
        logger.info("To alias: {}", self.alias)
        # promote
        logger.info("Promote model: {}", name)
        client.set_registered_model_alias(name=name, alias=self.alias, version=version)
        model_version = client.get_model_version_by_alias(name=name, alias=self.alias)
        logger.debug("- Model version: {}", model_version)
        # notify
        self.alerts_service.notify(
            title="Promotion Job Finished",
            message=f"Version: {model_version.version} @ {self.alias}",
        )
        return locals()

Define a job for promoting a registered model version with an alias.

https://mlflow.org/docs/latest/model-registry.html#concepts

Arguments:
  • alias (str): the mlflow alias to transition the registered model version.
  • version (int | None): the model version to transition (use None for latest).
KIND: Literal['PromotionJob']
alias: str
version: int | None
@T.override
def run(self) -> base.Locals:
    """Run the promotion job in context."""
    # services
    # - logger
    logger = self.logger_service.logger()
    logger.info("With logger: {}", logger)
    # - mlflow
    client = self.mlflow_service.client()
    logger.info("With client: {}", client)
    name = self.mlflow_service.registry_name
    # version
    if self.version is None:  # use the latest model version
        version = client.search_model_versions(
            f"name='{name}'", max_results=1, order_by=["version_number DESC"]
        )[0].version
    else:
        version = self.version
    logger.info("From version: {}", version)
    # alias
    logger.info("To alias: {}", self.alias)
    # promote
    logger.info("Promote model: {}", name)
    client.set_registered_model_alias(name=name, alias=self.alias, version=version)
    model_version = client.get_model_version_by_alias(name=name, alias=self.alias)
    logger.debug("- Model version: {}", model_version)
    # notify
    self.alerts_service.notify(
        title="Promotion Job Finished",
        message=f"Version: {model_version.version} @ {self.alias}",
    )
    return locals()

Run the job in context.

Returns:

Locals: local job variables.

model_config = {'strict': True, 'frozen': True, 'extra': 'forbid'}
model_fields = {'KIND': FieldInfo(annotation=Literal['PromotionJob'], required=False, default='PromotionJob'), 'logger_service': FieldInfo(annotation=LoggerService, required=False, default=LoggerService(sink='stderr', level='DEBUG', format='<green>[{time:YYYY-MM-DD HH:mm:ss.SSS}]</green><level>[{level}]</level><cyan>[{name}:{function}:{line}]</cyan> <level>{message}</level>', colorize=True, serialize=False, backtrace=True, diagnose=False, catch=True)), 'alerts_service': FieldInfo(annotation=AlertsService, required=False, default=AlertsService(enable=True, app_name='Bikes', timeout=None)), 'mlflow_service': FieldInfo(annotation=MlflowService, required=False, default=MlflowService(tracking_uri='./mlruns', registry_uri='./mlruns', experiment_name='bikes', registry_name='bikes', autolog_disable=False, autolog_disable_for_unsupported_versions=False, autolog_exclusive=False, autolog_log_input_examples=True, autolog_log_model_signatures=True, autolog_log_models=False, autolog_log_datasets=False, autolog_silent=False)), 'alias': FieldInfo(annotation=str, required=False, default='Champion'), 'version': FieldInfo(annotation=Union[int, NoneType], required=False, default=None)}
model_computed_fields = {}
Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
bikes.jobs.base.Job
logger_service
alerts_service
mlflow_service
class InferenceJob(bikes.jobs.base.Job):
17class InferenceJob(base.Job):
18    """Generate batch predictions from a registered model.
19
20    Parameters:
21        inputs (datasets.ReaderKind): reader for the inputs data.
22        outputs (datasets.WriterKind): writer for the outputs data.
 23        alias_or_version (str | int): alias or version for the model.
24        loader (registries.LoaderKind): registry loader for the model.
25    """
26
27    KIND: T.Literal["InferenceJob"] = "InferenceJob"
28
29    # Inputs
30    inputs: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
31    # Outputs
32    outputs: datasets.WriterKind = pdt.Field(..., discriminator="KIND")
33    # Model
34    alias_or_version: str | int = "Champion"
35    # Loader
36    loader: registries.LoaderKind = pdt.Field(registries.CustomLoader(), discriminator="KIND")
37
38    @T.override
39    def run(self) -> base.Locals:
40        # services
41        logger = self.logger_service.logger()
42        logger.info("With logger: {}", logger)
43        # inputs
44        logger.info("Read inputs: {}", self.inputs)
45        inputs_ = self.inputs.read()  # unchecked!
46        inputs = schemas.InputsSchema.check(inputs_)
47        logger.debug("- Inputs shape: {}", inputs.shape)
48        # model
49        logger.info("With model: {}", self.mlflow_service.registry_name)
50        model_uri = registries.uri_for_model_alias_or_version(
51            name=self.mlflow_service.registry_name, alias_or_version=self.alias_or_version
52        )
53        logger.debug("- Model URI: {}", model_uri)
54        # loader
55        logger.info("Load model: {}", self.loader)
56        model = self.loader.load(uri=model_uri)
57        logger.debug("- Model: {}", model)
58        # outputs
59        logger.info("Predict outputs: {}", len(inputs))
60        outputs = model.predict(inputs=inputs)  # checked
61        logger.debug("- Outputs shape: {}", outputs.shape)
62        # write
63        logger.info("Write outputs: {}", self.outputs)
64        self.outputs.write(data=outputs)
65        # notify
66        self.alerts_service.notify(
67            title="Inference Job Finished", message=f"Outputs Shape: {outputs.shape}"
68        )
69        return locals()

Generate batch predictions from a registered model.

Arguments:
  • inputs (datasets.ReaderKind): reader for the inputs data.
  • outputs (datasets.WriterKind): writer for the outputs data.
  • alias_or_version (str | int): alias or version for the model.
  • loader (registries.LoaderKind): registry loader for the model.
KIND: Literal['InferenceJob']
alias_or_version: str | int
@T.override
def run(self) -> Dict[str, Any]:
38    @T.override
39    def run(self) -> base.Locals:
40        # services
41        logger = self.logger_service.logger()
42        logger.info("With logger: {}", logger)
43        # inputs
44        logger.info("Read inputs: {}", self.inputs)
45        inputs_ = self.inputs.read()  # unchecked!
46        inputs = schemas.InputsSchema.check(inputs_)
47        logger.debug("- Inputs shape: {}", inputs.shape)
48        # model
49        logger.info("With model: {}", self.mlflow_service.registry_name)
50        model_uri = registries.uri_for_model_alias_or_version(
51            name=self.mlflow_service.registry_name, alias_or_version=self.alias_or_version
52        )
53        logger.debug("- Model URI: {}", model_uri)
54        # loader
55        logger.info("Load model: {}", self.loader)
56        model = self.loader.load(uri=model_uri)
57        logger.debug("- Model: {}", model)
58        # outputs
59        logger.info("Predict outputs: {}", len(inputs))
60        outputs = model.predict(inputs=inputs)  # checked
61        logger.debug("- Outputs shape: {}", outputs.shape)
62        # write
63        logger.info("Write outputs: {}", self.outputs)
64        self.outputs.write(data=outputs)
65        # notify
66        self.alerts_service.notify(
67            title="Inference Job Finished", message=f"Outputs Shape: {outputs.shape}"
68        )
69        return locals()

Run the job in context.

Returns:

Locals: local job variables.

model_config = {'strict': True, 'frozen': True, 'extra': 'forbid'}
model_fields = {'KIND': FieldInfo(annotation=Literal['InferenceJob'], required=False, default='InferenceJob'), 'logger_service': FieldInfo(annotation=LoggerService, required=False, default=LoggerService(sink='stderr', level='DEBUG', format='<green>[{time:YYYY-MM-DD HH:mm:ss.SSS}]</green><level>[{level}]</level><cyan>[{name}:{function}:{line}]</cyan> <level>{message}</level>', colorize=True, serialize=False, backtrace=True, diagnose=False, catch=True)), 'alerts_service': FieldInfo(annotation=AlertsService, required=False, default=AlertsService(enable=True, app_name='Bikes', timeout=None)), 'mlflow_service': FieldInfo(annotation=MlflowService, required=False, default=MlflowService(tracking_uri='./mlruns', registry_uri='./mlruns', experiment_name='bikes', registry_name='bikes', autolog_disable=False, autolog_disable_for_unsupported_versions=False, autolog_exclusive=False, autolog_log_input_examples=True, autolog_log_model_signatures=True, autolog_log_models=False, autolog_log_datasets=False, autolog_silent=False)), 'inputs': FieldInfo(annotation=ParquetReader, required=True, discriminator='KIND'), 'outputs': FieldInfo(annotation=ParquetWriter, required=True, discriminator='KIND'), 'alias_or_version': FieldInfo(annotation=Union[str, int], required=False, default='Champion'), 'loader': FieldInfo(annotation=Union[CustomLoader, BuiltinLoader], required=False, default=CustomLoader(KIND='CustomLoader'), discriminator='KIND')}
model_computed_fields = {}
Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
bikes.jobs.base.Job
logger_service
alerts_service
mlflow_service
class EvaluationsJob(bikes.jobs.base.Job):
 20class EvaluationsJob(base.Job):
 21    """Generate evaluations from a registered model and a dataset.
 22
 23    Parameters:
 24        run_config (services.MlflowService.RunConfig): mlflow run config.
 25        inputs (datasets.ReaderKind): reader for the inputs data.
 26        targets (datasets.ReaderKind): reader for the targets data.
 27        model_type (str): model type (e.g. "regressor", "classifier").
 28        alias_or_version (str | int): alias or version for the model.
 29        metrics (list[metrics_.MetricKind]): metrics for the reporting.
 30        evaluators (list[str]): list of evaluators to use.
 31        thresholds (dict[str, metrics_.Threshold]): metric thresholds.
 32    """
 33
 34    KIND: T.Literal["EvaluationsJob"] = "EvaluationsJob"
 35
 36    # Run
 37    run_config: services.MlflowService.RunConfig = services.MlflowService.RunConfig(
 38        name="Evaluations"
 39    )
 40    # Data
 41    inputs: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
 42    targets: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
 43    # Model
 44    model_type: str = "regressor"
 45    alias_or_version: str | int = "Champion"
 46    # Metrics
 47    metrics: list[metrics_.MetricKind] = pdt.Field([metrics_.SklearnMetric()], discriminator="KIND")
 48    # Evaluators
 49    evaluators: list[str] = ["default"]
 50    # Thresholds
 51    thresholds: dict[str, metrics_.Threshold] = {
 52        "r2_score": metrics_.Threshold(threshold=0.5, greater_is_better=True)
 53    }
 54
 55    @T.override
 56    def run(self) -> base.Locals:
 57        # services
 58        # - logger
 59        logger = self.logger_service.logger()
 60        logger.info("With logger: {}", logger)
 61        # - mlflow
 62        client = self.mlflow_service.client()
 63        logger.info("With client: {}", client.tracking_uri)
 64        with self.mlflow_service.run_context(run_config=self.run_config) as run:
 65            logger.info("With run context: {}", run.info)
 66            # data
 67            # - inputs
 68            logger.info("Read inputs: {}", self.inputs)
 69            inputs_ = self.inputs.read()  # unchecked!
 70            inputs = schemas.InputsSchema.check(inputs_)
 71            logger.debug("- Inputs shape: {}", inputs.shape)
 72            # - targets
 73            logger.info("Read targets: {}", self.targets)
 74            targets_ = self.targets.read()  # unchecked!
 75            targets = schemas.TargetsSchema.check(targets_)
 76            logger.debug("- Targets shape: {}", targets.shape)
 77            # lineage
 78            # - inputs
 79            logger.info("Log lineage: inputs")
 80            inputs_lineage = self.inputs.lineage(data=inputs, name="inputs")
 81            mlflow.log_input(dataset=inputs_lineage, context=self.run_config.name)
 82            logger.debug("- Inputs lineage: {}", inputs_lineage.to_dict())
 83            # - targets
 84            logger.info("Log lineage: targets")
 85            targets_lineage = self.targets.lineage(
 86                data=targets, name="targets", targets=schemas.TargetsSchema.cnt
 87            )
 88            mlflow.log_input(dataset=targets_lineage, context=self.run_config.name)
 89            logger.debug("- Targets lineage: {}", targets_lineage.to_dict())
 90            # dataset
 91            logger.info("Create dataset: inputs & targets")
 92            dataset = mlflow.data.from_pandas(
 93                df=pd.concat([inputs, targets], axis="columns"),
 94                name="evaluation",
 95                source=f"{inputs_lineage.source.uri} & {targets_lineage.source.uri}",
 96                targets=schemas.TargetsSchema.cnt,
 97            )
 98            logger.debug("- Dataset: {}", dataset.to_dict())
 99            # model
100            logger.info("With model: {}", self.mlflow_service.registry_name)
101            model_uri = registries.uri_for_model_alias_or_version(
102                name=self.mlflow_service.registry_name, alias_or_version=self.alias_or_version
103            )
104            logger.debug("- Model URI: {}", model_uri)
105            # metrics
106            logger.debug("Convert metrics: {}", self.metrics)
107            extra_metrics = [metric.to_mlflow() for metric in self.metrics]
108            logger.debug("- Extra metrics: {}", extra_metrics)
109            # thresholds
110            logger.info("Convert thresholds: {}", self.thresholds)
111            validation_thresholds = {
112                name: threshold.to_mlflow() for name, threshold in self.thresholds.items()
113            }
114            logger.debug("- Validation thresholds: {}", validation_thresholds)
115            # evaluations
116            logger.info("Compute evaluations: {}", self.model_type)
117            evaluations = mlflow.evaluate(
118                data=dataset,
119                model=model_uri,
120                model_type=self.model_type,
121                evaluators=self.evaluators,
122                extra_metrics=extra_metrics,
123                validation_thresholds=validation_thresholds,
124            )
125            logger.debug("- Evaluations metrics: {}", evaluations.metrics)
126            # notify
127            self.alerts_service.notify(
128                title="Evaluations Job Finished",
129                message=f"Evaluation metrics: {evaluations.metrics}",
130            )
131        return locals()

Generate evaluations from a registered model and a dataset.

Arguments:
  • run_config (services.MlflowService.RunConfig): mlflow run config.
  • inputs (datasets.ReaderKind): reader for the inputs data.
  • targets (datasets.ReaderKind): reader for the targets data.
  • model_type (str): model type (e.g. "regressor", "classifier").
  • alias_or_version (str | int): alias or version for the model.
  • metrics (list[metrics_.MetricKind]): metrics for the reporting.
  • evaluators (list[str]): list of evaluators to use.
  • thresholds (dict[str, metrics_.Threshold]): metric thresholds.
KIND: Literal['EvaluationsJob']
model_type: str
alias_or_version: str | int
evaluators: list[str]
thresholds: dict[str, bikes.core.metrics.Threshold]
@T.override
def run(self) -> Dict[str, Any]:
 55    @T.override
 56    def run(self) -> base.Locals:
 57        # services
 58        # - logger
 59        logger = self.logger_service.logger()
 60        logger.info("With logger: {}", logger)
 61        # - mlflow
 62        client = self.mlflow_service.client()
 63        logger.info("With client: {}", client.tracking_uri)
 64        with self.mlflow_service.run_context(run_config=self.run_config) as run:
 65            logger.info("With run context: {}", run.info)
 66            # data
 67            # - inputs
 68            logger.info("Read inputs: {}", self.inputs)
 69            inputs_ = self.inputs.read()  # unchecked!
 70            inputs = schemas.InputsSchema.check(inputs_)
 71            logger.debug("- Inputs shape: {}", inputs.shape)
 72            # - targets
 73            logger.info("Read targets: {}", self.targets)
 74            targets_ = self.targets.read()  # unchecked!
 75            targets = schemas.TargetsSchema.check(targets_)
 76            logger.debug("- Targets shape: {}", targets.shape)
 77            # lineage
 78            # - inputs
 79            logger.info("Log lineage: inputs")
 80            inputs_lineage = self.inputs.lineage(data=inputs, name="inputs")
 81            mlflow.log_input(dataset=inputs_lineage, context=self.run_config.name)
 82            logger.debug("- Inputs lineage: {}", inputs_lineage.to_dict())
 83            # - targets
 84            logger.info("Log lineage: targets")
 85            targets_lineage = self.targets.lineage(
 86                data=targets, name="targets", targets=schemas.TargetsSchema.cnt
 87            )
 88            mlflow.log_input(dataset=targets_lineage, context=self.run_config.name)
 89            logger.debug("- Targets lineage: {}", targets_lineage.to_dict())
 90            # dataset
 91            logger.info("Create dataset: inputs & targets")
 92            dataset = mlflow.data.from_pandas(
 93                df=pd.concat([inputs, targets], axis="columns"),
 94                name="evaluation",
 95                source=f"{inputs_lineage.source.uri} & {targets_lineage.source.uri}",
 96                targets=schemas.TargetsSchema.cnt,
 97            )
 98            logger.debug("- Dataset: {}", dataset.to_dict())
 99            # model
100            logger.info("With model: {}", self.mlflow_service.registry_name)
101            model_uri = registries.uri_for_model_alias_or_version(
102                name=self.mlflow_service.registry_name, alias_or_version=self.alias_or_version
103            )
104            logger.debug("- Model URI: {}", model_uri)
105            # metrics
106            logger.debug("Convert metrics: {}", self.metrics)
107            extra_metrics = [metric.to_mlflow() for metric in self.metrics]
108            logger.debug("- Extra metrics: {}", extra_metrics)
109            # thresholds
110            logger.info("Convert thresholds: {}", self.thresholds)
111            validation_thresholds = {
112                name: threshold.to_mlflow() for name, threshold in self.thresholds.items()
113            }
114            logger.debug("- Validation thresholds: {}", validation_thresholds)
115            # evaluations
116            logger.info("Compute evaluations: {}", self.model_type)
117            evaluations = mlflow.evaluate(
118                data=dataset,
119                model=model_uri,
120                model_type=self.model_type,
121                evaluators=self.evaluators,
122                extra_metrics=extra_metrics,
123                validation_thresholds=validation_thresholds,
124            )
125            logger.debug("- Evaluations metrics: {}", evaluations.metrics)
126            # notify
127            self.alerts_service.notify(
128                title="Evaluations Job Finished",
129                message=f"Evaluation metrics: {evaluations.metrics}",
130            )
131        return locals()

Run the job in context.

Returns:

Locals: local job variables.

model_config = {'strict': True, 'frozen': True, 'extra': 'forbid'}
model_fields = {'KIND': FieldInfo(annotation=Literal['EvaluationsJob'], required=False, default='EvaluationsJob'), 'logger_service': FieldInfo(annotation=LoggerService, required=False, default=LoggerService(sink='stderr', level='DEBUG', format='<green>[{time:YYYY-MM-DD HH:mm:ss.SSS}]</green><level>[{level}]</level><cyan>[{name}:{function}:{line}]</cyan> <level>{message}</level>', colorize=True, serialize=False, backtrace=True, diagnose=False, catch=True)), 'alerts_service': FieldInfo(annotation=AlertsService, required=False, default=AlertsService(enable=True, app_name='Bikes', timeout=None)), 'mlflow_service': FieldInfo(annotation=MlflowService, required=False, default=MlflowService(tracking_uri='./mlruns', registry_uri='./mlruns', experiment_name='bikes', registry_name='bikes', autolog_disable=False, autolog_disable_for_unsupported_versions=False, autolog_exclusive=False, autolog_log_input_examples=True, autolog_log_model_signatures=True, autolog_log_models=False, autolog_log_datasets=False, autolog_silent=False)), 'run_config': FieldInfo(annotation=MlflowService.RunConfig, required=False, default=RunConfig(name='Evaluations', description=None, tags=None, log_system_metrics=True)), 'inputs': FieldInfo(annotation=ParquetReader, required=True, discriminator='KIND'), 'targets': FieldInfo(annotation=ParquetReader, required=True, discriminator='KIND'), 'model_type': FieldInfo(annotation=str, required=False, default='regressor'), 'alias_or_version': FieldInfo(annotation=Union[str, int], required=False, default='Champion'), 'metrics': FieldInfo(annotation=list[SklearnMetric], required=False, default=[SklearnMetric(KIND='SklearnMetric', name='mean_squared_error', greater_is_better=False)], discriminator='KIND'), 'evaluators': FieldInfo(annotation=list[str], required=False, default=['default']), 'thresholds': FieldInfo(annotation=dict[str, Threshold], required=False, default={'r2_score': Threshold(threshold=0.5, greater_is_better=True)})}
model_computed_fields = {}
Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
bikes.jobs.base.Job
logger_service
alerts_service
mlflow_service
class ExplanationsJob(bikes.jobs.base.Job):
17class ExplanationsJob(base.Job):
18    """Generate explanations from the model and a data sample.
19
20    Parameters:
21        inputs_samples (datasets.ReaderKind): reader for the samples data.
22        models_explanations (datasets.WriterKind): writer for models explanation.
23        samples_explanations (datasets.WriterKind): writer for samples explanation.
 24        alias_or_version (str | int): alias or version for the model.
25        loader (registries.LoaderKind): registry loader for the model.
26    """
27
28    KIND: T.Literal["ExplanationsJob"] = "ExplanationsJob"
29
30    # Samples
31    inputs_samples: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
32    # Explanations
33    models_explanations: datasets.WriterKind = pdt.Field(..., discriminator="KIND")
34    samples_explanations: datasets.WriterKind = pdt.Field(..., discriminator="KIND")
35    # Model
36    alias_or_version: str | int = "Champion"
37    # Loader
38    loader: registries.LoaderKind = pdt.Field(registries.CustomLoader(), discriminator="KIND")
39
40    @T.override
41    def run(self) -> base.Locals:
42        # services
43        logger = self.logger_service.logger()
44        logger.info("With logger: {}", logger)
45        # inputs
46        logger.info("Read samples: {}", self.inputs_samples)
47        inputs_samples = self.inputs_samples.read()  # unchecked!
48        inputs_samples = schemas.InputsSchema.check(inputs_samples)
49        logger.debug("- Inputs samples shape: {}", inputs_samples.shape)
50        # model
51        logger.info("With model: {}", self.mlflow_service.registry_name)
52        model_uri = registries.uri_for_model_alias_or_version(
53            name=self.mlflow_service.registry_name, alias_or_version=self.alias_or_version
54        )
55        logger.debug("- Model URI: {}", model_uri)
56        # loader
57        logger.info("Load model: {}", self.loader)
58        model = self.loader.load(uri=model_uri).model.unwrap_python_model().model
59        logger.debug("- Model: {}", model)
60        # explanations
61        # - models
62        logger.info("Explain model: {}", model)
63        models_explanations = model.explain_model()
64        logger.debug("- Models explanations shape: {}", models_explanations.shape)
65        # - samples
66        logger.info("Explain samples: {}", len(inputs_samples))
67        samples_explanations = model.explain_samples(inputs=inputs_samples)
68        logger.debug("- Samples explanations shape: {}", samples_explanations.shape)
69        # write
70        # - model
71        logger.info("Write models explanations: {}", self.models_explanations)
72        self.models_explanations.write(data=models_explanations)
73        # - samples
74        logger.info("Write samples explanations: {}", self.samples_explanations)
75        self.samples_explanations.write(data=samples_explanations)
76        # notify
77        self.alerts_service.notify(
78            title="Explanations Job Finished", message=f"Features Count: {len(models_explanations)}"
79        )
80        return locals()

Generate explanations from the model and a data sample.

Arguments:
  • inputs_samples (datasets.ReaderKind): reader for the samples data.
  • models_explanations (datasets.WriterKind): writer for models explanation.
  • samples_explanations (datasets.WriterKind): writer for samples explanation.
  • alias_or_version (str | int): alias or version for the model.
  • loader (registries.LoaderKind): registry loader for the model.
KIND: Literal['ExplanationsJob']
models_explanations: bikes.io.datasets.ParquetWriter
samples_explanations: bikes.io.datasets.ParquetWriter
alias_or_version: str | int
@T.override
def run(self) -> Dict[str, Any]:
40    @T.override
41    def run(self) -> base.Locals:
42        # services
43        logger = self.logger_service.logger()
44        logger.info("With logger: {}", logger)
45        # inputs
46        logger.info("Read samples: {}", self.inputs_samples)
47        inputs_samples = self.inputs_samples.read()  # unchecked!
48        inputs_samples = schemas.InputsSchema.check(inputs_samples)
49        logger.debug("- Inputs samples shape: {}", inputs_samples.shape)
50        # model
51        logger.info("With model: {}", self.mlflow_service.registry_name)
52        model_uri = registries.uri_for_model_alias_or_version(
53            name=self.mlflow_service.registry_name, alias_or_version=self.alias_or_version
54        )
55        logger.debug("- Model URI: {}", model_uri)
56        # loader
57        logger.info("Load model: {}", self.loader)
58        model = self.loader.load(uri=model_uri).model.unwrap_python_model().model
59        logger.debug("- Model: {}", model)
60        # explanations
61        # - models
62        logger.info("Explain model: {}", model)
63        models_explanations = model.explain_model()
64        logger.debug("- Models explanations shape: {}", models_explanations.shape)
65        # - samples
66        logger.info("Explain samples: {}", len(inputs_samples))
67        samples_explanations = model.explain_samples(inputs=inputs_samples)
68        logger.debug("- Samples explanations shape: {}", samples_explanations.shape)
69        # write
70        # - model
71        logger.info("Write models explanations: {}", self.models_explanations)
72        self.models_explanations.write(data=models_explanations)
73        # - samples
74        logger.info("Write samples explanations: {}", self.samples_explanations)
75        self.samples_explanations.write(data=samples_explanations)
76        # notify
77        self.alerts_service.notify(
78            title="Explanations Job Finished", message=f"Features Count: {len(models_explanations)}"
79        )
80        return locals()

Run the job in context.

Returns:

Locals: local job variables.

model_config = {'strict': True, 'frozen': True, 'extra': 'forbid'}
model_fields = {'KIND': FieldInfo(annotation=Literal['ExplanationsJob'], required=False, default='ExplanationsJob'), 'logger_service': FieldInfo(annotation=LoggerService, required=False, default=LoggerService(sink='stderr', level='DEBUG', format='<green>[{time:YYYY-MM-DD HH:mm:ss.SSS}]</green><level>[{level}]</level><cyan>[{name}:{function}:{line}]</cyan> <level>{message}</level>', colorize=True, serialize=False, backtrace=True, diagnose=False, catch=True)), 'alerts_service': FieldInfo(annotation=AlertsService, required=False, default=AlertsService(enable=True, app_name='Bikes', timeout=None)), 'mlflow_service': FieldInfo(annotation=MlflowService, required=False, default=MlflowService(tracking_uri='./mlruns', registry_uri='./mlruns', experiment_name='bikes', registry_name='bikes', autolog_disable=False, autolog_disable_for_unsupported_versions=False, autolog_exclusive=False, autolog_log_input_examples=True, autolog_log_model_signatures=True, autolog_log_models=False, autolog_log_datasets=False, autolog_silent=False)), 'inputs_samples': FieldInfo(annotation=ParquetReader, required=True, discriminator='KIND'), 'models_explanations': FieldInfo(annotation=ParquetWriter, required=True, discriminator='KIND'), 'samples_explanations': FieldInfo(annotation=ParquetWriter, required=True, discriminator='KIND'), 'alias_or_version': FieldInfo(annotation=Union[str, int], required=False, default='Champion'), 'loader': FieldInfo(annotation=Union[CustomLoader, BuiltinLoader], required=False, default=CustomLoader(KIND='CustomLoader'), discriminator='KIND')}
model_computed_fields = {}
Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
bikes.jobs.base.Job
logger_service
alerts_service
mlflow_service