bikes.jobs
High-level jobs of the project.
1"""High-level jobs of the project.""" 2 3# %% IMPORTS 4 5from bikes.jobs.evaluations import EvaluationsJob 6from bikes.jobs.explanations import ExplanationsJob 7from bikes.jobs.inference import InferenceJob 8from bikes.jobs.promotion import PromotionJob 9from bikes.jobs.training import TrainingJob 10from bikes.jobs.tuning import TuningJob 11 12# %% TYPES 13 14JobKind = TuningJob | TrainingJob | PromotionJob | InferenceJob | EvaluationsJob | ExplanationsJob 15 16# %% EXPORTS 17 18__all__ = [ 19 "TuningJob", 20 "TrainingJob", 21 "PromotionJob", 22 "InferenceJob", 23 "EvaluationsJob", 24 "ExplanationsJob", 25 "JobKind", 26]
class TuningJob(bikes.jobs.base.Job):
class TuningJob(base.Job):
    """Find the best hyperparameters for a model.

    Parameters:
        run_config (services.MlflowService.RunConfig): mlflow run config.
        inputs (datasets.ReaderKind): reader for the inputs data.
        targets (datasets.ReaderKind): reader for the targets data.
        model (models.ModelKind): machine learning model to tune.
        metric (metrics.MetricKind): tuning metric to optimize.
        splitter (splitters.SplitterKind): data sets splitter.
        searcher (searchers.SearcherKind): hparams searcher.
    """

    KIND: T.Literal["TuningJob"] = "TuningJob"

    # Run
    run_config: services.MlflowService.RunConfig = services.MlflowService.RunConfig(name="Tuning")
    # Data
    inputs: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
    targets: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
    # Model
    model: models.ModelKind = pdt.Field(models.BaselineSklearnModel(), discriminator="KIND")
    # Metric
    metric: metrics.MetricKind = pdt.Field(metrics.SklearnMetric(), discriminator="KIND")
    # splitter
    splitter: splitters.SplitterKind = pdt.Field(
        splitters.TimeSeriesSplitter(), discriminator="KIND"
    )
    # Searcher
    searcher: searchers.SearcherKind = pdt.Field(
        searchers.GridCVSearcher(
            param_grid={
                "max_depth": [3, 5, 7],
            }
        ),
        discriminator="KIND",
    )

    @T.override
    def run(self) -> base.Locals:
        """Run the tuning job in context."""
        # services
        # - logger
        logger = self.logger_service.logger()
        logger.info("With logger: {}", logger)
        with self.mlflow_service.run_context(run_config=self.run_config) as run:
            logger.info("With run context: {}", run.info)
            # data
            # - inputs
            logger.info("Read inputs: {}", self.inputs)
            inputs_ = self.inputs.read()  # unchecked!
            inputs = schemas.InputsSchema.check(inputs_)
            logger.debug("- Inputs shape: {}", inputs.shape)
            # - targets
            logger.info("Read targets: {}", self.targets)
            targets_ = self.targets.read()  # unchecked!
            targets = schemas.TargetsSchema.check(targets_)
            logger.debug("- Targets shape: {}", targets.shape)
            # lineage
            # - inputs
            logger.info("Log lineage: inputs")
            inputs_lineage = self.inputs.lineage(data=inputs, name="inputs")
            mlflow.log_input(dataset=inputs_lineage, context=self.run_config.name)
            logger.debug("- Inputs lineage: {}", inputs_lineage.to_dict())
            # - targets
            logger.info("Log lineage: targets")
            targets_lineage = self.targets.lineage(
                data=targets, name="targets", targets=schemas.TargetsSchema.cnt
            )
            mlflow.log_input(dataset=targets_lineage, context=self.run_config.name)
            logger.debug("- Targets lineage: {}", targets_lineage.to_dict())
            # model
            logger.info("With model: {}", self.model)
            # metric
            logger.info("With metric: {}", self.metric)
            # splitter
            logger.info("With splitter: {}", self.splitter)
            # searcher
            logger.info("Run searcher: {}", self.searcher)
            results, best_score, best_params = self.searcher.search(
                model=self.model,
                metric=self.metric,
                inputs=inputs,
                targets=targets,
                cv=self.splitter,
            )
            logger.debug("- Results: {}", results.shape)
            logger.debug("- Best Score: {}", best_score)
            logger.debug("- Best Params: {}", best_params)
            # notify
            self.alerts_service.notify(
                title="Tuning Job Finished", message=f"Best score: {best_score}"
            )
            return locals()
Find the best hyperparameters for a model.
Arguments:
- run_config (services.MlflowService.RunConfig): mlflow run config.
- inputs (datasets.ReaderKind): reader for the inputs data.
- targets (datasets.ReaderKind): reader for the targets data.
- model (models.ModelKind): machine learning model to tune.
- metric (metrics.MetricKind): tuning metric to optimize.
- splitter (splitters.SplitterKind): data sets splitter.
- searcher (searchers.SearcherKind): hyperparameters searcher.
run_config: bikes.io.services.MlflowService.RunConfig
inputs: bikes.io.datasets.ParquetReader
targets: bikes.io.datasets.ParquetReader
metric: bikes.core.metrics.SklearnMetric
searcher: bikes.utils.searchers.GridCVSearcher
@T.override
def run(self) -> Dict[str, Any]:
Run the tuning job in context.
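For illustration, a minimal way to configure and launch this job could look like the sketch below. The parquet paths are hypothetical, ParquetReader is assumed to accept a path field, and the base Job is assumed to start and stop its services as a context manager; the grid keys depend on the configured model.

from bikes import jobs
from bikes.io import datasets
from bikes.utils import searchers

job = jobs.TuningJob(
    inputs=datasets.ParquetReader(path="data/inputs_train.parquet"),    # hypothetical path
    targets=datasets.ParquetReader(path="data/targets_train.parquet"),  # hypothetical path
    searcher=searchers.GridCVSearcher(param_grid={"max_depth": [3, 5, 7]}),
)
with job as runner:  # assumption: the base Job manages its services in __enter__/__exit__
    results = runner.run()  # returns the job locals (results, best_score, best_params, ...)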
class TrainingJob(bikes.jobs.base.Job):
class TrainingJob(base.Job):
    """Train and register a single AI/ML model.

    Parameters:
        run_config (services.MlflowService.RunConfig): mlflow run config.
        inputs (datasets.ReaderKind): reader for the inputs data.
        targets (datasets.ReaderKind): reader for the targets data.
        model (models.ModelKind): machine learning model to train.
        metrics (metrics_.MetricsKind): metric list to compute.
        splitter (splitters.SplitterKind): data sets splitter.
        saver (registries.SaverKind): model saver.
        signer (signers.SignerKind): model signer.
        registry (registries.RegisterKind): model register.
    """

    KIND: T.Literal["TrainingJob"] = "TrainingJob"

    # Run
    run_config: services.MlflowService.RunConfig = services.MlflowService.RunConfig(name="Training")
    # Data
    inputs: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
    targets: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
    # Model
    model: models.ModelKind = pdt.Field(models.BaselineSklearnModel(), discriminator="KIND")
    # Metrics
    metrics: metrics_.MetricsKind = [metrics_.SklearnMetric()]
    # Splitter
    splitter: splitters.SplitterKind = pdt.Field(
        splitters.TrainTestSplitter(), discriminator="KIND"
    )
    # Saver
    saver: registries.SaverKind = pdt.Field(registries.CustomSaver(), discriminator="KIND")
    # Signer
    signer: signers.SignerKind = pdt.Field(signers.InferSigner(), discriminator="KIND")
    # Register
    # - avoid shadowing the pydantic `register` function
    registry: registries.RegisterKind = pdt.Field(registries.MlflowRegister(), discriminator="KIND")

    @T.override
    def run(self) -> base.Locals:
        # services
        # - logger
        logger = self.logger_service.logger()
        logger.info("With logger: {}", logger)
        # - mlflow
        client = self.mlflow_service.client()
        logger.info("With client: {}", client.tracking_uri)
        with self.mlflow_service.run_context(run_config=self.run_config) as run:
            logger.info("With run context: {}", run.info)
            # data
            # - inputs
            logger.info("Read inputs: {}", self.inputs)
            inputs_ = self.inputs.read()  # unchecked!
            inputs = schemas.InputsSchema.check(inputs_)
            logger.debug("- Inputs shape: {}", inputs.shape)
            # - targets
            logger.info("Read targets: {}", self.targets)
            targets_ = self.targets.read()  # unchecked!
            targets = schemas.TargetsSchema.check(targets_)
            logger.debug("- Targets shape: {}", targets.shape)
            # lineage
            # - inputs
            logger.info("Log lineage: inputs")
            inputs_lineage = self.inputs.lineage(data=inputs, name="inputs")
            mlflow.log_input(dataset=inputs_lineage, context=self.run_config.name)
            logger.debug("- Inputs lineage: {}", inputs_lineage.to_dict())
            # - targets
            logger.info("Log lineage: targets")
            targets_lineage = self.targets.lineage(
                data=targets, name="targets", targets=schemas.TargetsSchema.cnt
            )
            mlflow.log_input(dataset=targets_lineage, context=self.run_config.name)
            logger.debug("- Targets lineage: {}", targets_lineage.to_dict())
            # splitter
            logger.info("With splitter: {}", self.splitter)
            # - index
            train_index, test_index = next(self.splitter.split(inputs=inputs, targets=targets))
            # - inputs
            inputs_train = T.cast(schemas.Inputs, inputs.iloc[train_index])
            inputs_test = T.cast(schemas.Inputs, inputs.iloc[test_index])
            logger.debug("- Inputs train shape: {}", inputs_train.shape)
            logger.debug("- Inputs test shape: {}", inputs_test.shape)
            # - targets
            targets_train = T.cast(schemas.Targets, targets.iloc[train_index])
            targets_test = T.cast(schemas.Targets, targets.iloc[test_index])
            logger.debug("- Targets train shape: {}", targets_train.shape)
            logger.debug("- Targets test shape: {}", targets_test.shape)
            # model
            logger.info("Fit model: {}", self.model)
            self.model.fit(inputs=inputs_train, targets=targets_train)
            # outputs
            logger.info("Predict outputs: {}", len(inputs_test))
            outputs_test = self.model.predict(inputs=inputs_test)
            logger.debug("- Outputs test shape: {}", outputs_test.shape)
            # metrics
            for i, metric in enumerate(self.metrics, start=1):
                logger.info("{}. Compute metric: {}", i, metric)
                score = metric.score(targets=targets_test, outputs=outputs_test)
                client.log_metric(run_id=run.info.run_id, key=metric.name, value=score)
                logger.debug("- Metric score: {}", score)
            # signer
            logger.info("Sign model: {}", self.signer)
            model_signature = self.signer.sign(inputs=inputs, outputs=outputs_test)
            logger.debug("- Model signature: {}", model_signature.to_dict())
            # saver
            logger.info("Save model: {}", self.saver)
            model_info = self.saver.save(
                model=self.model, signature=model_signature, input_example=inputs
            )
            logger.debug("- Model URI: {}", model_info.model_uri)
            # register
            logger.info("Register model: {}", self.registry)
            model_version = self.registry.register(
                name=self.mlflow_service.registry_name, model_uri=model_info.model_uri
            )
            logger.debug("- Model version: {}", model_version)
            # notify
            self.alerts_service.notify(
                title="Training Job Finished",
                message=f"Model version: {model_version.version}",
            )
            return locals()
Train and register a single AI/ML model.
Arguments:
- run_config (services.MlflowService.RunConfig): mlflow run config.
- inputs (datasets.ReaderKind): reader for the inputs data.
- targets (datasets.ReaderKind): reader for the targets data.
- model (models.ModelKind): machine learning model to train.
- metrics (metrics_.MetricsKind): metric list to compute.
- splitter (splitters.SplitterKind): data sets splitter.
- saver (registries.SaverKind): model saver.
- signer (signers.SignerKind): model signer.
- registry (registries.RegisterKind): model register.
run_config: bikes.io.services.MlflowService.RunConfig
inputs: bikes.io.datasets.ParquetReader
targets: bikes.io.datasets.ParquetReader
metrics: list[bikes.core.metrics.SklearnMetric]
signer: bikes.utils.signers.InferSigner
registry: bikes.io.registries.MlflowRegister
@T.override
def run(self) -> Dict[str, Any]:
Run the job in context.
Returns:
Locals: local job variables.
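Only the readers are required; the model, splitter, saver, signer, and registry all have defaults. A hedged sketch of launching a training run, with hypothetical paths and the same context-manager assumption as above:

from bikes import jobs
from bikes.core import metrics
from bikes.io import datasets

job = jobs.TrainingJob(
    inputs=datasets.ParquetReader(path="data/inputs_train.parquet"),    # hypothetical path
    targets=datasets.ParquetReader(path="data/targets_train.parquet"),  # hypothetical path
    metrics=[metrics.SklearnMetric()],  # the default metric list, shown explicitly
)
with job as runner:
    runner.run()  # fits, evaluates, signs, saves, and registers the model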
class PromotionJob(bikes.jobs.base.Job):
class PromotionJob(base.Job):
    """Define a job for promoting a registered model version with an alias.

    https://mlflow.org/docs/latest/model-registry.html#concepts

    Parameters:
        alias (str): the mlflow alias to transition the registered model version.
        version (int | None): the model version to transition (use None for latest).
    """

    KIND: T.Literal["PromotionJob"] = "PromotionJob"

    alias: str = "Champion"
    version: int | None = None

    @T.override
    def run(self) -> base.Locals:
        # services
        # - logger
        logger = self.logger_service.logger()
        logger.info("With logger: {}", logger)
        # - mlflow
        client = self.mlflow_service.client()
        logger.info("With client: {}", client)
        name = self.mlflow_service.registry_name
        # version
        if self.version is None:  # use the latest model version
            version = client.search_model_versions(
                f"name='{name}'", max_results=1, order_by=["version_number DESC"]
            )[0].version
        else:
            version = self.version
        logger.info("From version: {}", version)
        # alias
        logger.info("To alias: {}", self.alias)
        # promote
        logger.info("Promote model: {}", name)
        client.set_registered_model_alias(name=name, alias=self.alias, version=version)
        model_version = client.get_model_version_by_alias(name=name, alias=self.alias)
        logger.debug("- Model version: {}", model_version)
        # notify
        self.alerts_service.notify(
            title="Promotion Job Finished",
            message=f"Version: {model_version.version} @ {self.alias}",
        )
        return locals()
Define a job for promoting a registered model version with an alias.
https://mlflow.org/docs/latest/model-registry.html#concepts
Arguments:
- alias (str): the mlflow alias to transition the registered model version.
- version (int | None): the model version to transition (use None for latest).
@T.override
def run(self) -> Dict[str, Any]:
Run the job in context.
Returns:
Locals: local job variables.
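Since this job only touches the MLflow registry, its configuration is minimal. A sketch, assuming the registered model already has at least one version and the same context-manager behavior as above:

from bikes import jobs

# Promote the latest version when `version` is None; pin an integer to promote a specific one.
job = jobs.PromotionJob(alias="Champion", version=None)
with job as runner:
    runner.run()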
class InferenceJob(bikes.jobs.base.Job):
class InferenceJob(base.Job):
    """Generate batch predictions from a registered model.

    Parameters:
        inputs (datasets.ReaderKind): reader for the inputs data.
        outputs (datasets.WriterKind): writer for the outputs data.
        alias_or_version (str | int): alias or version for the model.
        loader (registries.LoaderKind): registry loader for the model.
    """

    KIND: T.Literal["InferenceJob"] = "InferenceJob"

    # Inputs
    inputs: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
    # Outputs
    outputs: datasets.WriterKind = pdt.Field(..., discriminator="KIND")
    # Model
    alias_or_version: str | int = "Champion"
    # Loader
    loader: registries.LoaderKind = pdt.Field(registries.CustomLoader(), discriminator="KIND")

    @T.override
    def run(self) -> base.Locals:
        # services
        logger = self.logger_service.logger()
        logger.info("With logger: {}", logger)
        # inputs
        logger.info("Read inputs: {}", self.inputs)
        inputs_ = self.inputs.read()  # unchecked!
        inputs = schemas.InputsSchema.check(inputs_)
        logger.debug("- Inputs shape: {}", inputs.shape)
        # model
        logger.info("With model: {}", self.mlflow_service.registry_name)
        model_uri = registries.uri_for_model_alias_or_version(
            name=self.mlflow_service.registry_name,
            alias_or_version=self.alias_or_version,
        )
        logger.debug("- Model URI: {}", model_uri)
        # loader
        logger.info("Load model: {}", self.loader)
        model = self.loader.load(uri=model_uri)
        logger.debug("- Model: {}", model)
        # outputs
        logger.info("Predict outputs: {}", len(inputs))
        outputs = model.predict(inputs=inputs)  # checked
        logger.debug("- Outputs shape: {}", outputs.shape)
        # write
        logger.info("Write outputs: {}", self.outputs)
        self.outputs.write(data=outputs)
        # notify
        self.alerts_service.notify(
            title="Inference Job Finished", message=f"Outputs Shape: {outputs.shape}"
        )
        return locals()
Generate batch predictions from a registered model.
Arguments:
- inputs (datasets.ReaderKind): reader for the inputs data.
- outputs (datasets.WriterKind): writer for the outputs data.
- alias_or_version (str | int): alias or version for the model.
- loader (registries.LoaderKind): registry loader for the model.
inputs: bikes.io.datasets.ParquetReader
outputs: bikes.io.datasets.ParquetWriter
@T.override
def run(self) -> Dict[str, Any]:
Run the job in context.
Returns:
Locals: local job variables.
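A sketch of a batch scoring run; the paths are hypothetical and ParquetWriter is assumed to accept a path field like the reader:

from bikes import jobs
from bikes.io import datasets

job = jobs.InferenceJob(
    inputs=datasets.ParquetReader(path="data/inputs_test.parquet"),      # hypothetical path
    outputs=datasets.ParquetWriter(path="outputs/predictions.parquet"),  # hypothetical path
    alias_or_version="Champion",  # or an integer version, e.g. 1
)
with job as runner:
    runner.run()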
class EvaluationsJob(bikes.jobs.base.Job):
class EvaluationsJob(base.Job):
    """Generate evaluations from a registered model and a dataset.

    Parameters:
        run_config (services.MlflowService.RunConfig): mlflow run config.
        inputs (datasets.ReaderKind): reader for the inputs data.
        targets (datasets.ReaderKind): reader for the targets data.
        model_type (str): model type (e.g. "regressor", "classifier").
        alias_or_version (str | int): alias or version for the model.
        metrics (metrics_.MetricsKind): metric list to compute.
        evaluators (list[str]): list of evaluators to use.
        thresholds (dict[str, metrics_.Threshold] | None): metric thresholds.
    """

    KIND: T.Literal["EvaluationsJob"] = "EvaluationsJob"

    # Run
    run_config: services.MlflowService.RunConfig = services.MlflowService.RunConfig(
        name="Evaluations"
    )
    # Data
    inputs: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
    targets: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
    # Model
    model_type: str = "regressor"
    alias_or_version: str | int = "Champion"
    # Loader
    loader: registries.LoaderKind = pdt.Field(registries.CustomLoader(), discriminator="KIND")
    # Metrics
    metrics: metrics_.MetricsKind = [metrics_.SklearnMetric()]
    # Evaluators
    evaluators: list[str] = ["default"]
    # Thresholds
    thresholds: dict[str, metrics_.Threshold] = {
        "r2_score": metrics_.Threshold(threshold=0.5, greater_is_better=True)
    }

    @T.override
    def run(self) -> base.Locals:
        # services
        # - logger
        logger = self.logger_service.logger()
        logger.info("With logger: {}", logger)
        # - mlflow
        client = self.mlflow_service.client()
        logger.info("With client: {}", client.tracking_uri)
        with self.mlflow_service.run_context(run_config=self.run_config) as run:
            logger.info("With run context: {}", run.info)
            # data
            # - inputs
            logger.info("Read inputs: {}", self.inputs)
            inputs_ = self.inputs.read()  # unchecked!
            inputs = schemas.InputsSchema.check(inputs_)
            logger.debug("- Inputs shape: {}", inputs.shape)
            # - targets
            logger.info("Read targets: {}", self.targets)
            targets_ = self.targets.read()  # unchecked!
            targets = schemas.TargetsSchema.check(targets_)
            logger.debug("- Targets shape: {}", targets.shape)
            # lineage
            # - inputs
            logger.info("Log lineage: inputs")
            inputs_lineage = self.inputs.lineage(data=inputs, name="inputs")
            mlflow.log_input(dataset=inputs_lineage, context=self.run_config.name)
            logger.debug("- Inputs lineage: {}", inputs_lineage.to_dict())
            # - targets
            logger.info("Log lineage: targets")
            targets_lineage = self.targets.lineage(
                data=targets, name="targets", targets=schemas.TargetsSchema.cnt
            )
            mlflow.log_input(dataset=targets_lineage, context=self.run_config.name)
            logger.debug("- Targets lineage: {}", targets_lineage.to_dict())
            # model
            logger.info("With model: {}", self.mlflow_service.registry_name)
            model_uri = registries.uri_for_model_alias_or_version(
                name=self.mlflow_service.registry_name,
                alias_or_version=self.alias_or_version,
            )
            logger.debug("- Model URI: {}", model_uri)
            # loader
            logger.info("Load model: {}", self.loader)
            model = self.loader.load(uri=model_uri)
            logger.debug("- Model: {}", model)
            # outputs
            logger.info("Predict outputs: {}", len(inputs))
            outputs = model.predict(inputs=inputs)  # checked
            logger.debug("- Outputs shape: {}", outputs.shape)
            # dataset
            logger.info("Create dataset: inputs & targets & outputs")
            dataset_ = pd.concat([inputs, targets, outputs], axis="columns")
            dataset = mlflow.data.from_pandas(  # type: ignore[attr-defined]
                df=dataset_,
                name="evaluation",
                targets=schemas.TargetsSchema.cnt,
                predictions=schemas.OutputsSchema.prediction,
            )
            logger.debug("- Dataset: {}", dataset.to_dict())
            # metrics
            logger.debug("Convert metrics: {}", self.metrics)
            extra_metrics = [metric.to_mlflow() for metric in self.metrics]
            logger.debug("- Extra metrics: {}", extra_metrics)
            # thresholds
            logger.info("Convert thresholds: {}", self.thresholds)
            validation_thresholds = {
                name: threshold.to_mlflow() for name, threshold in self.thresholds.items()
            }
            logger.debug("- Validation thresholds: {}", validation_thresholds)
            # evaluations
            logger.info("Compute evaluations: {}", self.model_type)
            evaluations = mlflow.evaluate(
                data=dataset,
                model_type=self.model_type,
                evaluators=self.evaluators,
                extra_metrics=extra_metrics,
                validation_thresholds=validation_thresholds,
            )
            logger.debug("- Evaluations metrics: {}", evaluations.metrics)
            # notify
            self.alerts_service.notify(
                title="Evaluations Job Finished",
                message=f"Evaluation metrics: {evaluations.metrics}",
            )
            return locals()
Generate evaluations from a registered model and a dataset.
Arguments:
- run_config (services.MlflowService.RunConfig): mlflow run config.
- inputs (datasets.ReaderKind): reader for the inputs data.
- targets (datasets.ReaderKind): reader for the targets data.
- model_type (str): model type (e.g. "regressor", "classifier").
- alias_or_version (str | int): alias or version for the model.
- metrics (metrics_.MetricsKind): metric list to compute.
- evaluators (list[str]): list of evaluators to use.
- thresholds (dict[str, metrics_.Threshold] | None): metric thresholds.
run_config: bikes.io.services.MlflowService.RunConfig
inputs: bikes.io.datasets.ParquetReader
targets: bikes.io.datasets.ParquetReader
metrics: list[bikes.core.metrics.SklearnMetric]
thresholds: dict[str, bikes.core.metrics.Threshold]
@T.override
def run(self) -> Dict[str, Any]:
Run the job in context.
Returns:
Locals: local job variables.
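A sketch that evaluates the champion model on a held-out dataset and enforces an R² threshold; the paths and threshold value are hypothetical, with the same reader and context-manager assumptions as above:

from bikes import jobs
from bikes.core import metrics
from bikes.io import datasets

job = jobs.EvaluationsJob(
    inputs=datasets.ParquetReader(path="data/inputs_test.parquet"),    # hypothetical path
    targets=datasets.ParquetReader(path="data/targets_test.parquet"),  # hypothetical path
    alias_or_version="Champion",
    thresholds={"r2_score": metrics.Threshold(threshold=0.6, greater_is_better=True)},
)
with job as runner:
    runner.run()  # mlflow.evaluate() enforces the validation thresholds on the run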
class ExplanationsJob(bikes.jobs.base.Job):
class ExplanationsJob(base.Job):
    """Generate explanations from the model and a data sample.

    Parameters:
        inputs_samples (datasets.ReaderKind): reader for the samples data.
        models_explanations (datasets.WriterKind): writer for models explanation.
        samples_explanations (datasets.WriterKind): writer for samples explanation.
        alias_or_version (str | int): alias or version for the model.
        loader (registries.LoaderKind): registry loader for the model.
    """

    KIND: T.Literal["ExplanationsJob"] = "ExplanationsJob"

    # Samples
    inputs_samples: datasets.ReaderKind = pdt.Field(..., discriminator="KIND")
    # Explanations
    models_explanations: datasets.WriterKind = pdt.Field(..., discriminator="KIND")
    samples_explanations: datasets.WriterKind = pdt.Field(..., discriminator="KIND")
    # Model
    alias_or_version: str | int = "Champion"
    # Loader
    loader: registries.LoaderKind = pdt.Field(registries.CustomLoader(), discriminator="KIND")

    @T.override
    def run(self) -> base.Locals:
        # services
        logger = self.logger_service.logger()
        logger.info("With logger: {}", logger)
        # inputs
        logger.info("Read samples: {}", self.inputs_samples)
        inputs_samples = self.inputs_samples.read()  # unchecked!
        inputs_samples = schemas.InputsSchema.check(inputs_samples)
        logger.debug("- Inputs samples shape: {}", inputs_samples.shape)
        # model
        logger.info("With model: {}", self.mlflow_service.registry_name)
        model_uri = registries.uri_for_model_alias_or_version(
            name=self.mlflow_service.registry_name,
            alias_or_version=self.alias_or_version,
        )
        logger.debug("- Model URI: {}", model_uri)
        # loader
        logger.info("Load model: {}", self.loader)
        model = self.loader.load(uri=model_uri).model.unwrap_python_model().model
        logger.debug("- Model: {}", model)
        # explanations
        # - models
        logger.info("Explain model: {}", model)
        models_explanations = model.explain_model()
        logger.debug("- Models explanations shape: {}", models_explanations.shape)
        # - samples
        logger.info("Explain samples: {}", len(inputs_samples))
        samples_explanations = model.explain_samples(inputs=inputs_samples)
        logger.debug("- Samples explanations shape: {}", samples_explanations.shape)
        # write
        # - model
        logger.info("Write models explanations: {}", self.models_explanations)
        self.models_explanations.write(data=models_explanations)
        # - samples
        logger.info("Write samples explanations: {}", self.samples_explanations)
        self.samples_explanations.write(data=samples_explanations)
        # notify
        self.alerts_service.notify(
            title="Explanations Job Finished",
            message=f"Features Count: {len(models_explanations)}",
        )
        return locals()
Generate explanations from the model and a data sample.
Arguments:
- inputs_samples (datasets.ReaderKind): reader for the samples data.
- models_explanations (datasets.WriterKind): writer for models explanation.
- samples_explanations (datasets.WriterKind): writer for samples explanation.
- alias_or_version (str | int): alias or version for the model.
- loader (registries.LoaderKind): registry loader for the model.
inputs_samples: bikes.io.datasets.ParquetReader
models_explanations: bikes.io.datasets.ParquetWriter
samples_explanations: bikes.io.datasets.ParquetWriter
@T.override
def run(self) -> Dict[str, Any]:
Run the job in context.
Returns:
Locals: local job variables.
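A sketch of producing global and per-sample explanations for the champion model; the paths are hypothetical and the writers are assumed to accept a path field:

from bikes import jobs
from bikes.io import datasets

job = jobs.ExplanationsJob(
    inputs_samples=datasets.ParquetReader(path="data/inputs_samples.parquet"),              # hypothetical
    models_explanations=datasets.ParquetWriter(path="outputs/models_explanations.parquet"),  # hypothetical
    samples_explanations=datasets.ParquetWriter(path="outputs/samples_explanations.parquet"),  # hypothetical
)
with job as runner:
    runner.run()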
JobKind = TuningJob | TrainingJob | PromotionJob | InferenceJob | EvaluationsJob | ExplanationsJob