bikes.io.services
Manage global context during execution.
1"""Manage global context during execution.""" 2 3# %% IMPORTS 4 5from __future__ import annotations 6 7import abc 8import contextlib as ctx 9import sys 10import typing as T 11 12import loguru 13import mlflow 14import mlflow.tracking as mt 15import pydantic as pdt 16from plyer import notification 17 18# %% SERVICES 19 20 21class Service(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"): 22 """Base class for a global service. 23 24 Use services to manage global contexts. 25 e.g., logger object, mlflow client, spark context, ... 26 """ 27 28 @abc.abstractmethod 29 def start(self) -> None: 30 """Start the service.""" 31 32 def stop(self) -> None: 33 """Stop the service.""" 34 # does nothing by default 35 36 37class LoggerService(Service): 38 """Service for logging messages. 39 40 https://loguru.readthedocs.io/en/stable/api/logger.html 41 42 Parameters: 43 sink (str): logging output. 44 level (str): logging level. 45 format (str): logging format. 46 colorize (bool): colorize output. 47 serialize (bool): convert to JSON. 48 backtrace (bool): enable exception trace. 49 diagnose (bool): enable variable display. 50 catch (bool): catch errors during log handling. 51 """ 52 53 sink: str = "stderr" 54 level: str = "DEBUG" 55 format: str = ( 56 "<green>[{time:YYYY-MM-DD HH:mm:ss.SSS}]</green>" 57 "<level>[{level}]</level>" 58 "<cyan>[{name}:{function}:{line}]</cyan>" 59 " <level>{message}</level>" 60 ) 61 colorize: bool = True 62 serialize: bool = False 63 backtrace: bool = True 64 diagnose: bool = False 65 catch: bool = True 66 67 @T.override 68 def start(self) -> None: 69 loguru.logger.remove() 70 config = self.model_dump() 71 # use standard sinks or keep the original 72 sinks = {"stderr": sys.stderr, "stdout": sys.stdout} 73 config["sink"] = sinks.get(config["sink"], config["sink"]) 74 loguru.logger.add(**config) 75 76 def logger(self) -> loguru.Logger: 77 """Return the main logger. 78 79 Returns: 80 loguru.Logger: the main logger. 
81 """ 82 return loguru.logger 83 84 85class AlertsService(Service): 86 """Service for sending notifications. 87 88 Require libnotify-bin on Linux systems. 89 90 In production, use with Slack, Discord, or emails. 91 92 https://plyer.readthedocs.io/en/latest/api.html#plyer.facades.Notification 93 94 Parameters: 95 enable (bool): use notifications or print. 96 app_name (str): name of the application. 97 timeout (int | None): timeout in secs. 98 """ 99 100 enable: bool = True 101 app_name: str = "Bikes" 102 timeout: int | None = None 103 104 @T.override 105 def start(self) -> None: 106 pass 107 108 def notify(self, title: str, message: str) -> None: 109 """Send a notification to the system. 110 111 Args: 112 title (str): title of the notification. 113 message (str): message of the notification. 114 """ 115 if self.enable: 116 notification.notify( 117 title=title, message=message, app_name=self.app_name, timeout=self.timeout 118 ) 119 else: 120 print(f"[{self.app_name}] {title}: {message}") 121 122 123class MlflowService(Service): 124 """Service for Mlflow tracking and registry. 125 126 Parameters: 127 tracking_uri (str): the URI for the Mlflow tracking server. 128 registry_uri (str): the URI for the Mlflow model registry. 129 experiment_name (str): the name of tracking experiment. 130 registry_name (str): the name of model registry. 131 autolog_disable (bool): disable autologging. 132 autolog_disable_for_unsupported_versions (bool): disable autologging for unsupported versions. 133 autolog_exclusive (bool): If True, enables exclusive autologging. 134 autolog_log_input_examples (bool): If True, logs input examples during autologging. 135 autolog_log_model_signatures (bool): If True, logs model signatures during autologging. 136 autolog_log_models (bool): If True, enables logging of models during autologging. 137 autolog_log_datasets (bool): If True, logs datasets used during autologging. 
138 autolog_silent (bool): If True, suppresses all Mlflow warnings during autologging. 139 """ 140 141 class RunConfig(pdt.BaseModel, strict=True, frozen=True, extra="forbid"): 142 """Run configuration for Mlflow tracking. 143 144 Parameters: 145 name (str): name of the run. 146 description (str | None): description of the run. 147 tags (dict[str, T.Any] | None): tags for the run. 148 log_system_metrics (bool | None): enable system metrics logging. 149 """ 150 151 name: str 152 description: str | None = None 153 tags: dict[str, T.Any] | None = None 154 log_system_metrics: bool | None = True 155 156 # server uri 157 tracking_uri: str = "./mlruns" 158 registry_uri: str = "./mlruns" 159 # experiment 160 experiment_name: str = "bikes" 161 # registry 162 registry_name: str = "bikes" 163 # autolog 164 autolog_disable: bool = False 165 autolog_disable_for_unsupported_versions: bool = False 166 autolog_exclusive: bool = False 167 autolog_log_input_examples: bool = True 168 autolog_log_model_signatures: bool = True 169 autolog_log_models: bool = False 170 autolog_log_datasets: bool = False 171 autolog_silent: bool = False 172 173 @T.override 174 def start(self) -> None: 175 # server uri 176 mlflow.set_tracking_uri(uri=self.tracking_uri) 177 mlflow.set_registry_uri(uri=self.registry_uri) 178 # experiment 179 mlflow.set_experiment(experiment_name=self.experiment_name) 180 # autolog 181 mlflow.autolog( 182 disable=self.autolog_disable, 183 disable_for_unsupported_versions=self.autolog_disable_for_unsupported_versions, 184 exclusive=self.autolog_exclusive, 185 log_input_examples=self.autolog_log_input_examples, 186 log_model_signatures=self.autolog_log_model_signatures, 187 log_datasets=self.autolog_log_datasets, 188 silent=self.autolog_silent, 189 ) 190 191 @ctx.contextmanager 192 def run_context(self, run_config: RunConfig) -> T.Generator[mlflow.ActiveRun, None, None]: 193 """Yield an active Mlflow run and exit it afterwards. 194 195 Args: 196 run (str): run parameters. 
197 198 Yields: 199 T.Generator[mlflow.ActiveRun, None, None]: active run context. Will be closed as the end of context. 200 """ 201 with mlflow.start_run( 202 run_name=run_config.name, 203 tags=run_config.tags, 204 description=run_config.description, 205 log_system_metrics=run_config.log_system_metrics, 206 ) as run: 207 yield run 208 209 def client(self) -> mt.MlflowClient: 210 """Return a new Mlflow client. 211 212 Returns: 213 MlflowClient: the mlflow client. 214 """ 215 return mt.MlflowClient(tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)
class Service(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
    """Abstract foundation shared by every global service.

    A service manages one global context during execution,
    e.g., logger object, mlflow client, spark context, ...
    """

    @abc.abstractmethod
    def start(self) -> None:
        """Start the service."""

    def stop(self) -> None:
        """Stop the service.

        Does nothing by default.
        """
        return None
Base class for a global service.
Use services to manage global contexts. e.g., logger object, mlflow client, spark context, ...
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
class LoggerService(Service):
    """Service configuring the loguru logger.

    https://loguru.readthedocs.io/en/stable/api/logger.html

    Parameters:
        sink (str): logging output.
        level (str): logging level.
        format (str): logging format.
        colorize (bool): colorize output.
        serialize (bool): convert to JSON.
        backtrace (bool): enable exception trace.
        diagnose (bool): enable variable display.
        catch (bool): catch errors during log handling.
    """

    sink: str = "stderr"
    level: str = "DEBUG"
    format: str = (
        "<green>[{time:YYYY-MM-DD HH:mm:ss.SSS}]</green>"
        "<level>[{level}]</level>"
        "<cyan>[{name}:{function}:{line}]</cyan>"
        " <level>{message}</level>"
    )
    colorize: bool = True
    serialize: bool = False
    backtrace: bool = True
    diagnose: bool = False
    catch: bool = True

    @T.override
    def start(self) -> None:
        """Remove the current loguru handlers and add one built from the fields."""
        loguru.logger.remove()
        options = self.model_dump()
        target = options.pop("sink")
        # "stderr" and "stdout" name the standard streams; other values are passed on unchanged
        streams = {"stderr": sys.stderr, "stdout": sys.stdout}
        loguru.logger.add(streams.get(target, target), **options)

    def logger(self) -> loguru.Logger:
        """Give access to the main logger.

        Returns:
            loguru.Logger: the main logger.
        """
        main_logger = loguru.logger
        return main_logger
Service for logging messages.
https://loguru.readthedocs.io/en/stable/api/logger.html
Arguments:
- sink (str): logging output.
- level (str): logging level.
- format (str): logging format.
- colorize (bool): colorize output.
- serialize (bool): convert to JSON.
- backtrace (bool): enable exception trace.
- diagnose (bool): enable variable display.
- catch (bool): catch errors during log handling.
@T.override
def start(self) -> None:
    """Remove the current loguru handlers and add one built from the fields."""
    loguru.logger.remove()
    options = self.model_dump()
    target = options.pop("sink")
    # "stderr" and "stdout" name the standard streams; other values are passed on unchanged
    streams = {"stderr": sys.stderr, "stdout": sys.stdout}
    loguru.logger.add(streams.get(target, target), **options)
Start the service.
def logger(self) -> loguru.Logger:
    """Give access to the main logger.

    Returns:
        loguru.Logger: the main logger.
    """
    main_logger = loguru.logger
    return main_logger
Return the main logger.
Returns:
loguru.Logger: the main logger.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
class AlertsService(Service):
    """Service emitting notifications, or printing them when disabled.

    Require libnotify-bin on Linux systems.

    In production, use with Slack, Discord, or emails.

    https://plyer.readthedocs.io/en/latest/api.html#plyer.facades.Notification

    Parameters:
        enable (bool): use notifications or print.
        app_name (str): name of the application.
        timeout (int | None): timeout in secs.
    """

    enable: bool = True
    app_name: str = "Bikes"
    timeout: int | None = None

    @T.override
    def start(self) -> None:
        """Start the service (nothing to set up)."""
        return None

    def notify(self, title: str, message: str) -> None:
        """Deliver a message as a system notification, or print it when disabled.

        Args:
            title (str): title of the notification.
            message (str): message of the notification.
        """
        if not self.enable:
            # notifications are switched off: write the message to stdout instead
            print(f"[{self.app_name}] {title}: {message}")
            return
        notification.notify(
            title=title,
            message=message,
            app_name=self.app_name,
            timeout=self.timeout,
        )
Service for sending notifications.
Requires libnotify-bin on Linux systems.
In production, use with Slack, Discord, or emails.
https://plyer.readthedocs.io/en/latest/api.html#plyer.facades.Notification
Arguments:
- enable (bool): use notifications or print.
- app_name (str): name of the application.
- timeout (int | None): timeout in secs.
def notify(self, title: str, message: str) -> None:
    """Deliver a message as a system notification, or print it when disabled.

    Args:
        title (str): title of the notification.
        message (str): message of the notification.
    """
    if not self.enable:
        # notifications are switched off: write the message to stdout instead
        print(f"[{self.app_name}] {title}: {message}")
        return
    notification.notify(
        title=title,
        message=message,
        app_name=self.app_name,
        timeout=self.timeout,
    )
Send a notification to the system.
Arguments:
- title (str): title of the notification.
- message (str): message of the notification.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
class MlflowService(Service):
    """Service for Mlflow tracking and registry.

    Parameters:
        tracking_uri (str): the URI for the Mlflow tracking server.
        registry_uri (str): the URI for the Mlflow model registry.
        experiment_name (str): the name of tracking experiment.
        registry_name (str): the name of model registry.
        autolog_disable (bool): disable autologging.
        autolog_disable_for_unsupported_versions (bool): disable autologging for unsupported versions.
        autolog_exclusive (bool): If True, enables exclusive autologging.
        autolog_log_input_examples (bool): If True, logs input examples during autologging.
        autolog_log_model_signatures (bool): If True, logs model signatures during autologging.
        autolog_log_models (bool): If True, enables logging of models during autologging.
        autolog_log_datasets (bool): If True, logs datasets used during autologging.
        autolog_silent (bool): If True, suppresses all Mlflow warnings during autologging.
    """

    class RunConfig(pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
        """Run configuration for Mlflow tracking.

        Parameters:
            name (str): name of the run.
            description (str | None): description of the run.
            tags (dict[str, T.Any] | None): tags for the run.
            log_system_metrics (bool | None): enable system metrics logging.
        """

        name: str
        description: str | None = None
        tags: dict[str, T.Any] | None = None
        log_system_metrics: bool | None = True

    # server uri
    tracking_uri: str = "./mlruns"
    registry_uri: str = "./mlruns"
    # experiment
    experiment_name: str = "bikes"
    # registry
    registry_name: str = "bikes"
    # autolog
    autolog_disable: bool = False
    autolog_disable_for_unsupported_versions: bool = False
    autolog_exclusive: bool = False
    autolog_log_input_examples: bool = True
    autolog_log_model_signatures: bool = True
    autolog_log_models: bool = False
    autolog_log_datasets: bool = False
    autolog_silent: bool = False

    @T.override
    def start(self) -> None:
        """Point Mlflow at the configured URIs and experiment, then set up autologging."""
        # server uri
        mlflow.set_tracking_uri(uri=self.tracking_uri)
        mlflow.set_registry_uri(uri=self.registry_uri)
        # experiment
        mlflow.set_experiment(experiment_name=self.experiment_name)
        # autolog: every autolog_* field is forwarded, otherwise Mlflow's own default applies
        mlflow.autolog(
            disable=self.autolog_disable,
            disable_for_unsupported_versions=self.autolog_disable_for_unsupported_versions,
            exclusive=self.autolog_exclusive,
            log_input_examples=self.autolog_log_input_examples,
            log_model_signatures=self.autolog_log_model_signatures,
            log_models=self.autolog_log_models,
            log_datasets=self.autolog_log_datasets,
            silent=self.autolog_silent,
        )

    @ctx.contextmanager
    def run_context(self, run_config: RunConfig) -> T.Generator[mlflow.ActiveRun, None, None]:
        """Yield an active Mlflow run and exit it afterwards.

        Args:
            run_config (RunConfig): run parameters.

        Yields:
            T.Generator[mlflow.ActiveRun, None, None]: active run context. Will be closed at the end of context.
        """
        with mlflow.start_run(
            run_name=run_config.name,
            tags=run_config.tags,
            description=run_config.description,
            log_system_metrics=run_config.log_system_metrics,
        ) as run:
            yield run

    def client(self) -> mt.MlflowClient:
        """Return a new Mlflow client.

        Returns:
            MlflowClient: the mlflow client.
        """
        return mt.MlflowClient(tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)
Service for Mlflow tracking and registry.
Arguments:
- tracking_uri (str): the URI for the Mlflow tracking server.
- registry_uri (str): the URI for the Mlflow model registry.
- experiment_name (str): the name of tracking experiment.
- registry_name (str): the name of model registry.
- autolog_disable (bool): disable autologging.
- autolog_disable_for_unsupported_versions (bool): disable autologging for unsupported versions.
- autolog_exclusive (bool): If True, enables exclusive autologging.
- autolog_log_input_examples (bool): If True, logs input examples during autologging.
- autolog_log_model_signatures (bool): If True, logs model signatures during autologging.
- autolog_log_models (bool): If True, enables logging of models during autologging.
- autolog_log_datasets (bool): If True, logs datasets used during autologging.
- autolog_silent (bool): If True, suppresses all Mlflow warnings during autologging.
@T.override
def start(self) -> None:
    """Point Mlflow at the configured URIs and experiment, then set up autologging."""
    # server uri
    mlflow.set_tracking_uri(uri=self.tracking_uri)
    mlflow.set_registry_uri(uri=self.registry_uri)
    # experiment
    mlflow.set_experiment(experiment_name=self.experiment_name)
    # autolog: every autolog_* field is forwarded, otherwise Mlflow's own default applies
    mlflow.autolog(
        disable=self.autolog_disable,
        disable_for_unsupported_versions=self.autolog_disable_for_unsupported_versions,
        exclusive=self.autolog_exclusive,
        log_input_examples=self.autolog_log_input_examples,
        log_model_signatures=self.autolog_log_model_signatures,
        log_models=self.autolog_log_models,
        log_datasets=self.autolog_log_datasets,
        silent=self.autolog_silent,
    )
Start the service.
@ctx.contextmanager
def run_context(self, run_config: RunConfig) -> T.Generator[mlflow.ActiveRun, None, None]:
    """Yield an active Mlflow run and exit it afterwards.

    Args:
        run_config (RunConfig): run parameters.

    Yields:
        T.Generator[mlflow.ActiveRun, None, None]: active run context. Will be closed at the end of context.
    """
    # the `with` block exits the run once the caller's own `with` body is left
    with mlflow.start_run(
        run_name=run_config.name,
        tags=run_config.tags,
        description=run_config.description,
        log_system_metrics=run_config.log_system_metrics,
    ) as run:
        yield run
Yield an active Mlflow run and exit it afterwards.
Arguments:
- run_config (RunConfig): run parameters.
Yields:
T.Generator[mlflow.ActiveRun, None, None]: active run context. Will be closed at the end of the context.
def client(self) -> mt.MlflowClient:
    """Build a fresh Mlflow client bound to this service's URIs.

    Returns:
        MlflowClient: the mlflow client.
    """
    uris = {"tracking_uri": self.tracking_uri, "registry_uri": self.registry_uri}
    return mt.MlflowClient(**uris)
Return a new Mlflow client.
Returns:
MlflowClient: the mlflow client.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
class RunConfig(pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
    """Settings describing a single Mlflow run.

    Parameters:
        name (str): name of the run.
        description (str | None): description of the run.
        tags (dict[str, T.Any] | None): tags for the run.
        log_system_metrics (bool | None): enable system metrics logging.
    """

    # required: the run has no default name
    name: str
    # optional metadata, omitted unless provided
    description: str | None = None
    tags: dict[str, T.Any] | None = None
    # system metrics logging is on unless overridden
    log_system_metrics: bool | None = True
Run configuration for Mlflow tracking.
Arguments:
- name (str): name of the run.
- description (str | None): description of the run.
- tags (dict[str, T.Any] | None): tags for the run.
- log_system_metrics (bool | None): enable system metrics logging.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs