bikes.io.services
Manage global context during execution.
1"""Manage global context during execution.""" 2 3# %% IMPORTS 4 5from __future__ import annotations 6 7import abc 8import contextlib as ctx 9import sys 10import typing as T 11 12import loguru 13import mlflow 14import mlflow.tracking as mt 15import pydantic as pdt 16from plyer import notification 17 18# %% SERVICES 19 20 21class Service(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"): 22 """Base class for a global service. 23 24 Use services to manage global contexts. 25 e.g., logger object, mlflow client, spark context, ... 26 """ 27 28 @abc.abstractmethod 29 def start(self) -> None: 30 """Start the service.""" 31 32 def stop(self) -> None: 33 """Stop the service.""" 34 # does nothing by default 35 36 37class LoggerService(Service): 38 """Service for logging messages. 39 40 https://loguru.readthedocs.io/en/stable/api/logger.html 41 42 Parameters: 43 sink (str): logging output. 44 level (str): logging level. 45 format (str): logging format. 46 colorize (bool): colorize output. 47 serialize (bool): convert to JSON. 48 backtrace (bool): enable exception trace. 49 diagnose (bool): enable variable display. 50 catch (bool): catch errors during log handling. 51 """ 52 53 sink: str = "stderr" 54 level: str = "DEBUG" 55 format: str = ( 56 "<green>[{time:YYYY-MM-DD HH:mm:ss.SSS}]</green>" 57 "<level>[{level}]</level>" 58 "<cyan>[{name}:{function}:{line}]</cyan>" 59 " <level>{message}</level>" 60 ) 61 colorize: bool = True 62 serialize: bool = False 63 backtrace: bool = True 64 diagnose: bool = False 65 catch: bool = True 66 67 @T.override 68 def start(self) -> None: 69 loguru.logger.remove() 70 config = self.model_dump() 71 # use standard sinks or keep the original 72 sinks = {"stderr": sys.stderr, "stdout": sys.stdout} 73 config["sink"] = sinks.get(config["sink"], config["sink"]) 74 loguru.logger.add(**config) 75 76 def logger(self) -> loguru.Logger: 77 """Return the main logger. 78 79 Returns: 80 loguru.Logger: the main logger. 81 """ 82 return loguru.logger 83 84 85class AlertsService(Service): 86 """Service for sending notifications. 87 88 Require libnotify-bin on Linux systems. 89 90 In production, use with Slack, Discord, or emails. 91 92 https://plyer.readthedocs.io/en/latest/api.html#plyer.facades.Notification 93 94 Parameters: 95 enable (bool): use notifications or print. 96 app_name (str): name of the application. 97 timeout (int | None): timeout in secs. 98 """ 99 100 enable: bool = True 101 app_name: str = "Bikes" 102 timeout: int | None = None 103 104 @T.override 105 def start(self) -> None: 106 pass 107 108 def notify(self, title: str, message: str) -> None: 109 """Send a notification to the system. 110 111 Args: 112 title (str): title of the notification. 113 message (str): message of the notification. 114 """ 115 if self.enable: 116 try: 117 notification.notify( 118 title=title, 119 message=message, 120 app_name=self.app_name, 121 timeout=self.timeout, 122 ) 123 except NotImplementedError: 124 print("Notifications are not supported on this system.") 125 self._print(title=title, message=message) 126 else: 127 self._print(title=title, message=message) 128 129 def _print(self, title: str, message: str) -> None: 130 """Print a notification to the system. 131 132 Args: 133 title (str): title of the notification. 134 message (str): message of the notification. 135 """ 136 print(f"[{self.app_name}] {title}: {message}") 137 138 139class MlflowService(Service): 140 """Service for Mlflow tracking and registry. 141 142 Parameters: 143 tracking_uri (str): the URI for the Mlflow tracking server. 144 registry_uri (str): the URI for the Mlflow model registry. 145 experiment_name (str): the name of tracking experiment. 146 registry_name (str): the name of model registry. 147 autolog_disable (bool): disable autologging. 148 autolog_disable_for_unsupported_versions (bool): disable autologging for unsupported versions. 149 autolog_exclusive (bool): If True, enables exclusive autologging. 150 autolog_log_input_examples (bool): If True, logs input examples during autologging. 151 autolog_log_model_signatures (bool): If True, logs model signatures during autologging. 152 autolog_log_models (bool): If True, enables logging of models during autologging. 153 autolog_log_datasets (bool): If True, logs datasets used during autologging. 154 autolog_silent (bool): If True, suppresses all Mlflow warnings during autologging. 155 """ 156 157 class RunConfig(pdt.BaseModel, strict=True, frozen=True, extra="forbid"): 158 """Run configuration for Mlflow tracking. 159 160 Parameters: 161 name (str): name of the run. 162 description (str | None): description of the run. 163 tags (dict[str, T.Any] | None): tags for the run. 164 log_system_metrics (bool | None): enable system metrics logging. 165 """ 166 167 name: str 168 description: str | None = None 169 tags: dict[str, T.Any] | None = None 170 log_system_metrics: bool | None = True 171 172 # server uri 173 tracking_uri: str = "./mlruns" 174 registry_uri: str = "./mlruns" 175 # experiment 176 experiment_name: str = "bikes" 177 # registry 178 registry_name: str = "bikes" 179 # autolog 180 autolog_disable: bool = False 181 autolog_disable_for_unsupported_versions: bool = False 182 autolog_exclusive: bool = False 183 autolog_log_input_examples: bool = True 184 autolog_log_model_signatures: bool = True 185 autolog_log_models: bool = False 186 autolog_log_datasets: bool = False 187 autolog_silent: bool = False 188 189 @T.override 190 def start(self) -> None: 191 # server uri 192 mlflow.set_tracking_uri(uri=self.tracking_uri) 193 mlflow.set_registry_uri(uri=self.registry_uri) 194 # experiment 195 mlflow.set_experiment(experiment_name=self.experiment_name) 196 # autolog 197 mlflow.autolog( 198 disable=self.autolog_disable, 199 disable_for_unsupported_versions=self.autolog_disable_for_unsupported_versions, 200 exclusive=self.autolog_exclusive, 201 log_input_examples=self.autolog_log_input_examples, 202 log_model_signatures=self.autolog_log_model_signatures, 203 log_datasets=self.autolog_log_datasets, 204 silent=self.autolog_silent, 205 ) 206 207 @ctx.contextmanager 208 def run_context(self, run_config: RunConfig) -> T.Generator[mlflow.ActiveRun, None, None]: 209 """Yield an active Mlflow run and exit it afterwards. 210 211 Args: 212 run (str): run parameters. 213 214 Yields: 215 T.Generator[mlflow.ActiveRun, None, None]: active run context. Will be closed at the end of context. 216 """ 217 with mlflow.start_run( 218 run_name=run_config.name, 219 tags=run_config.tags, 220 description=run_config.description, 221 log_system_metrics=run_config.log_system_metrics, 222 ) as run: 223 yield run 224 225 def client(self) -> mt.MlflowClient: 226 """Return a new Mlflow client. 227 228 Returns: 229 MlflowClient: the mlflow client. 230 """ 231 return mt.MlflowClient(tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)
22class Service(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"): 23 """Base class for a global service. 24 25 Use services to manage global contexts. 26 e.g., logger object, mlflow client, spark context, ... 27 """ 28 29 @abc.abstractmethod 30 def start(self) -> None: 31 """Start the service.""" 32 33 def stop(self) -> None: 34 """Stop the service.""" 35 # does nothing by default
Base class for a global service.
Use services to manage global contexts. e.g., logger object, mlflow client, spark context, ...
38class LoggerService(Service): 39 """Service for logging messages. 40 41 https://loguru.readthedocs.io/en/stable/api/logger.html 42 43 Parameters: 44 sink (str): logging output. 45 level (str): logging level. 46 format (str): logging format. 47 colorize (bool): colorize output. 48 serialize (bool): convert to JSON. 49 backtrace (bool): enable exception trace. 50 diagnose (bool): enable variable display. 51 catch (bool): catch errors during log handling. 52 """ 53 54 sink: str = "stderr" 55 level: str = "DEBUG" 56 format: str = ( 57 "<green>[{time:YYYY-MM-DD HH:mm:ss.SSS}]</green>" 58 "<level>[{level}]</level>" 59 "<cyan>[{name}:{function}:{line}]</cyan>" 60 " <level>{message}</level>" 61 ) 62 colorize: bool = True 63 serialize: bool = False 64 backtrace: bool = True 65 diagnose: bool = False 66 catch: bool = True 67 68 @T.override 69 def start(self) -> None: 70 loguru.logger.remove() 71 config = self.model_dump() 72 # use standard sinks or keep the original 73 sinks = {"stderr": sys.stderr, "stdout": sys.stdout} 74 config["sink"] = sinks.get(config["sink"], config["sink"]) 75 loguru.logger.add(**config) 76 77 def logger(self) -> loguru.Logger: 78 """Return the main logger. 79 80 Returns: 81 loguru.Logger: the main logger. 82 """ 83 return loguru.logger
Service for logging messages.
https://loguru.readthedocs.io/en/stable/api/logger.html
Arguments:
- sink (str): logging output.
- level (str): logging level.
- format (str): logging format.
- colorize (bool): colorize output.
- serialize (bool): convert to JSON.
- backtrace (bool): enable exception trace.
- diagnose (bool): enable variable display.
- catch (bool): catch errors during log handling.
68 @T.override 69 def start(self) -> None: 70 loguru.logger.remove() 71 config = self.model_dump() 72 # use standard sinks or keep the original 73 sinks = {"stderr": sys.stderr, "stdout": sys.stdout} 74 config["sink"] = sinks.get(config["sink"], config["sink"]) 75 loguru.logger.add(**config)
Start the service.
77 def logger(self) -> loguru.Logger: 78 """Return the main logger. 79 80 Returns: 81 loguru.Logger: the main logger. 82 """ 83 return loguru.logger
Return the main logger.
Returns:
loguru.Logger: the main logger.
86class AlertsService(Service): 87 """Service for sending notifications. 88 89 Require libnotify-bin on Linux systems. 90 91 In production, use with Slack, Discord, or emails. 92 93 https://plyer.readthedocs.io/en/latest/api.html#plyer.facades.Notification 94 95 Parameters: 96 enable (bool): use notifications or print. 97 app_name (str): name of the application. 98 timeout (int | None): timeout in secs. 99 """ 100 101 enable: bool = True 102 app_name: str = "Bikes" 103 timeout: int | None = None 104 105 @T.override 106 def start(self) -> None: 107 pass 108 109 def notify(self, title: str, message: str) -> None: 110 """Send a notification to the system. 111 112 Args: 113 title (str): title of the notification. 114 message (str): message of the notification. 115 """ 116 if self.enable: 117 try: 118 notification.notify( 119 title=title, 120 message=message, 121 app_name=self.app_name, 122 timeout=self.timeout, 123 ) 124 except NotImplementedError: 125 print("Notifications are not supported on this system.") 126 self._print(title=title, message=message) 127 else: 128 self._print(title=title, message=message) 129 130 def _print(self, title: str, message: str) -> None: 131 """Print a notification to the system. 132 133 Args: 134 title (str): title of the notification. 135 message (str): message of the notification. 136 """ 137 print(f"[{self.app_name}] {title}: {message}")
Service for sending notifications.
Require libnotify-bin on Linux systems.
In production, use with Slack, Discord, or emails.
https://plyer.readthedocs.io/en/latest/api.html#plyer.facades.Notification
Arguments:
- enable (bool): use notifications or print.
- app_name (str): name of the application.
- timeout (int | None): timeout in secs.
109 def notify(self, title: str, message: str) -> None: 110 """Send a notification to the system. 111 112 Args: 113 title (str): title of the notification. 114 message (str): message of the notification. 115 """ 116 if self.enable: 117 try: 118 notification.notify( 119 title=title, 120 message=message, 121 app_name=self.app_name, 122 timeout=self.timeout, 123 ) 124 except NotImplementedError: 125 print("Notifications are not supported on this system.") 126 self._print(title=title, message=message) 127 else: 128 self._print(title=title, message=message)
Send a notification to the system.
Arguments:
- title (str): title of the notification.
- message (str): message of the notification.
140class MlflowService(Service): 141 """Service for Mlflow tracking and registry. 142 143 Parameters: 144 tracking_uri (str): the URI for the Mlflow tracking server. 145 registry_uri (str): the URI for the Mlflow model registry. 146 experiment_name (str): the name of tracking experiment. 147 registry_name (str): the name of model registry. 148 autolog_disable (bool): disable autologging. 149 autolog_disable_for_unsupported_versions (bool): disable autologging for unsupported versions. 150 autolog_exclusive (bool): If True, enables exclusive autologging. 151 autolog_log_input_examples (bool): If True, logs input examples during autologging. 152 autolog_log_model_signatures (bool): If True, logs model signatures during autologging. 153 autolog_log_models (bool): If True, enables logging of models during autologging. 154 autolog_log_datasets (bool): If True, logs datasets used during autologging. 155 autolog_silent (bool): If True, suppresses all Mlflow warnings during autologging. 156 """ 157 158 class RunConfig(pdt.BaseModel, strict=True, frozen=True, extra="forbid"): 159 """Run configuration for Mlflow tracking. 160 161 Parameters: 162 name (str): name of the run. 163 description (str | None): description of the run. 164 tags (dict[str, T.Any] | None): tags for the run. 165 log_system_metrics (bool | None): enable system metrics logging. 166 """ 167 168 name: str 169 description: str | None = None 170 tags: dict[str, T.Any] | None = None 171 log_system_metrics: bool | None = True 172 173 # server uri 174 tracking_uri: str = "./mlruns" 175 registry_uri: str = "./mlruns" 176 # experiment 177 experiment_name: str = "bikes" 178 # registry 179 registry_name: str = "bikes" 180 # autolog 181 autolog_disable: bool = False 182 autolog_disable_for_unsupported_versions: bool = False 183 autolog_exclusive: bool = False 184 autolog_log_input_examples: bool = True 185 autolog_log_model_signatures: bool = True 186 autolog_log_models: bool = False 187 autolog_log_datasets: bool = False 188 autolog_silent: bool = False 189 190 @T.override 191 def start(self) -> None: 192 # server uri 193 mlflow.set_tracking_uri(uri=self.tracking_uri) 194 mlflow.set_registry_uri(uri=self.registry_uri) 195 # experiment 196 mlflow.set_experiment(experiment_name=self.experiment_name) 197 # autolog 198 mlflow.autolog( 199 disable=self.autolog_disable, 200 disable_for_unsupported_versions=self.autolog_disable_for_unsupported_versions, 201 exclusive=self.autolog_exclusive, 202 log_input_examples=self.autolog_log_input_examples, 203 log_model_signatures=self.autolog_log_model_signatures, 204 log_datasets=self.autolog_log_datasets, 205 silent=self.autolog_silent, 206 ) 207 208 @ctx.contextmanager 209 def run_context(self, run_config: RunConfig) -> T.Generator[mlflow.ActiveRun, None, None]: 210 """Yield an active Mlflow run and exit it afterwards. 211 212 Args: 213 run (str): run parameters. 214 215 Yields: 216 T.Generator[mlflow.ActiveRun, None, None]: active run context. Will be closed at the end of context. 217 """ 218 with mlflow.start_run( 219 run_name=run_config.name, 220 tags=run_config.tags, 221 description=run_config.description, 222 log_system_metrics=run_config.log_system_metrics, 223 ) as run: 224 yield run 225 226 def client(self) -> mt.MlflowClient: 227 """Return a new Mlflow client. 228 229 Returns: 230 MlflowClient: the mlflow client. 231 """ 232 return mt.MlflowClient(tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)
Service for Mlflow tracking and registry.
Arguments:
- tracking_uri (str): the URI for the Mlflow tracking server.
- registry_uri (str): the URI for the Mlflow model registry.
- experiment_name (str): the name of tracking experiment.
- registry_name (str): the name of model registry.
- autolog_disable (bool): disable autologging.
- autolog_disable_for_unsupported_versions (bool): disable autologging for unsupported versions.
- autolog_exclusive (bool): If True, enables exclusive autologging.
- autolog_log_input_examples (bool): If True, logs input examples during autologging.
- autolog_log_model_signatures (bool): If True, logs model signatures during autologging.
- autolog_log_models (bool): If True, enables logging of models during autologging.
- autolog_log_datasets (bool): If True, logs datasets used during autologging.
- autolog_silent (bool): If True, suppresses all Mlflow warnings during autologging.
190 @T.override 191 def start(self) -> None: 192 # server uri 193 mlflow.set_tracking_uri(uri=self.tracking_uri) 194 mlflow.set_registry_uri(uri=self.registry_uri) 195 # experiment 196 mlflow.set_experiment(experiment_name=self.experiment_name) 197 # autolog 198 mlflow.autolog( 199 disable=self.autolog_disable, 200 disable_for_unsupported_versions=self.autolog_disable_for_unsupported_versions, 201 exclusive=self.autolog_exclusive, 202 log_input_examples=self.autolog_log_input_examples, 203 log_model_signatures=self.autolog_log_model_signatures, 204 log_datasets=self.autolog_log_datasets, 205 silent=self.autolog_silent, 206 )
Start the service.
208 @ctx.contextmanager 209 def run_context(self, run_config: RunConfig) -> T.Generator[mlflow.ActiveRun, None, None]: 210 """Yield an active Mlflow run and exit it afterwards. 211 212 Args: 213 run (str): run parameters. 214 215 Yields: 216 T.Generator[mlflow.ActiveRun, None, None]: active run context. Will be closed at the end of context. 217 """ 218 with mlflow.start_run( 219 run_name=run_config.name, 220 tags=run_config.tags, 221 description=run_config.description, 222 log_system_metrics=run_config.log_system_metrics, 223 ) as run: 224 yield run
Yield an active Mlflow run and exit it afterwards.
Arguments:
- run (str): run parameters.
Yields:
T.Generator[mlflow.ActiveRun, None, None]: active run context. Will be closed at the end of context.
226 def client(self) -> mt.MlflowClient: 227 """Return a new Mlflow client. 228 229 Returns: 230 MlflowClient: the mlflow client. 231 """ 232 return mt.MlflowClient(tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)
Return a new Mlflow client.
Returns:
MlflowClient: the mlflow client.
158 class RunConfig(pdt.BaseModel, strict=True, frozen=True, extra="forbid"): 159 """Run configuration for Mlflow tracking. 160 161 Parameters: 162 name (str): name of the run. 163 description (str | None): description of the run. 164 tags (dict[str, T.Any] | None): tags for the run. 165 log_system_metrics (bool | None): enable system metrics logging. 166 """ 167 168 name: str 169 description: str | None = None 170 tags: dict[str, T.Any] | None = None 171 log_system_metrics: bool | None = True
Run configuration for Mlflow tracking.
Arguments:
- name (str): name of the run.
- description (str | None): description of the run.
- tags (dict[str, T.Any] | None): tags for the run.
- log_system_metrics (bool | None): enable system metrics logging.