bikes.io.services
Manage global context during execution.
1"""Manage global context during execution.""" 2 3# %% IMPORTS 4 5from __future__ import annotations 6 7import abc 8import contextlib as ctx 9import sys 10import typing as T 11import warnings 12 13import loguru 14import mlflow 15import mlflow.tracking as mt 16import pydantic as pdt 17from plyer import notification 18 19# %% SERVICES 20 21 22class Service(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"): 23 """Base class for a global service. 24 25 Use services to manage global contexts. 26 e.g., logger object, mlflow client, spark context, ... 27 """ 28 29 @abc.abstractmethod 30 def start(self) -> None: 31 """Start the service.""" 32 33 def stop(self) -> None: 34 """Stop the service.""" 35 # does nothing by default 36 37 38class LoggerService(Service): 39 """Service for logging messages. 40 41 https://loguru.readthedocs.io/en/stable/api/logger.html 42 43 Parameters: 44 sink (str): logging output. 45 level (str): logging level. 46 format (str): logging format. 47 colorize (bool): colorize output. 48 serialize (bool): convert to JSON. 49 backtrace (bool): enable exception trace. 50 diagnose (bool): enable variable display. 51 catch (bool): catch errors during log handling. 52 """ 53 54 sink: str = "stderr" 55 level: str = "DEBUG" 56 format: str = ( 57 "<green>[{time:YYYY-MM-DD HH:mm:ss.SSS}]</green>" 58 "<level>[{level}]</level>" 59 "<cyan>[{name}:{function}:{line}]</cyan>" 60 " <level>{message}</level>" 61 ) 62 colorize: bool = True 63 serialize: bool = False 64 backtrace: bool = True 65 diagnose: bool = False 66 catch: bool = True 67 68 @T.override 69 def start(self) -> None: 70 loguru.logger.remove() 71 config = self.model_dump() 72 # use standard sinks or keep the original 73 sinks = {"stderr": sys.stderr, "stdout": sys.stdout} 74 config["sink"] = sinks.get(config["sink"], config["sink"]) 75 loguru.logger.add(**config) 76 77 def logger(self) -> loguru.Logger: 78 """Return the main logger. 79 80 Returns: 81 loguru.Logger: the main logger. 82 """ 83 return loguru.logger 84 85 86class AlertsService(Service): 87 """Service for sending notifications. 88 89 Require libnotify-bin on Linux systems. 90 91 In production, use with Slack, Discord, or emails. 92 93 https://plyer.readthedocs.io/en/latest/api.html#plyer.facades.Notification 94 95 Parameters: 96 enable (bool): use notifications or print. 97 app_name (str): name of the application. 98 timeout (int | None): timeout in secs. 99 """ 100 101 enable: bool = True 102 app_name: str = "Bikes" 103 timeout: int | None = None 104 105 @T.override 106 def start(self) -> None: 107 pass 108 109 def notify(self, title: str, message: str) -> None: 110 """Send a notification to the system. 111 112 Args: 113 title (str): title of the notification. 114 message (str): message of the notification. 115 """ 116 if self.enable: 117 try: 118 notification.notify( 119 title=title, 120 message=message, 121 app_name=self.app_name, 122 timeout=self.timeout, 123 ) 124 except NotImplementedError: 125 warnings.warn("Notifications are not supported on this system.", RuntimeWarning) 126 self._print(title=title, message=message) 127 else: 128 self._print(title=title, message=message) 129 130 def _print(self, title: str, message: str) -> None: 131 """Print a notification to the system. 132 133 Args: 134 title (str): title of the notification. 135 message (str): message of the notification. 136 """ 137 print(f"[{self.app_name}] {title}: {message}") 138 139 140class MlflowService(Service): 141 """Service for Mlflow tracking and registry. 142 143 Parameters: 144 tracking_uri (str): the URI for the Mlflow tracking server. 145 registry_uri (str): the URI for the Mlflow model registry. 146 experiment_name (str): the name of tracking experiment. 147 registry_name (str): the name of model registry. 148 autolog_disable (bool): disable autologging. 149 autolog_disable_for_unsupported_versions (bool): disable autologging for unsupported versions. 150 autolog_exclusive (bool): If True, enables exclusive autologging. 151 autolog_log_input_examples (bool): If True, logs input examples during autologging. 152 autolog_log_model_signatures (bool): If True, logs model signatures during autologging. 153 autolog_log_models (bool): If True, enables logging of models during autologging. 154 autolog_log_datasets (bool): If True, logs datasets used during autologging. 155 autolog_silent (bool): If True, suppresses all Mlflow warnings during autologging. 156 """ 157 158 class RunConfig(pdt.BaseModel, strict=True, frozen=True, extra="forbid"): 159 """Run configuration for Mlflow tracking. 160 161 Parameters: 162 name (str): name of the run. 163 description (str | None): description of the run. 164 tags (dict[str, T.Any] | None): tags for the run. 165 log_system_metrics (bool | None): enable system metrics logging. 166 """ 167 168 name: str 169 description: str | None = None 170 tags: dict[str, T.Any] | None = None 171 log_system_metrics: bool | None = True 172 173 # server uri 174 tracking_uri: str = "./mlruns" 175 registry_uri: str = "./mlruns" 176 # experiment 177 experiment_name: str = "bikes" 178 # registry 179 registry_name: str = "bikes" 180 # autolog 181 autolog_disable: bool = False 182 autolog_disable_for_unsupported_versions: bool = False 183 autolog_exclusive: bool = False 184 autolog_log_input_examples: bool = True 185 autolog_log_model_signatures: bool = True 186 autolog_log_models: bool = False 187 autolog_log_datasets: bool = False 188 autolog_silent: bool = False 189 190 @T.override 191 def start(self) -> None: 192 # server uri 193 mlflow.set_tracking_uri(uri=self.tracking_uri) 194 mlflow.set_registry_uri(uri=self.registry_uri) 195 # experiment 196 mlflow.set_experiment(experiment_name=self.experiment_name) 197 # autolog 198 mlflow.autolog( 199 disable=self.autolog_disable, 200 disable_for_unsupported_versions=self.autolog_disable_for_unsupported_versions, 201 exclusive=self.autolog_exclusive, 202 log_input_examples=self.autolog_log_input_examples, 203 log_model_signatures=self.autolog_log_model_signatures, 204 log_datasets=self.autolog_log_datasets, 205 silent=self.autolog_silent, 206 ) 207 208 @ctx.contextmanager 209 def run_context(self, run_config: RunConfig) -> T.Generator[mlflow.ActiveRun, None, None]: 210 """Yield an active Mlflow run and exit it afterwards. 211 212 Args: 213 run (str): run parameters. 214 215 Yields: 216 T.Generator[mlflow.ActiveRun, None, None]: active run context. Will be closed at the end of context. 217 """ 218 with mlflow.start_run( 219 run_name=run_config.name, 220 tags=run_config.tags, 221 description=run_config.description, 222 log_system_metrics=run_config.log_system_metrics, 223 ) as run: 224 yield run 225 226 def client(self) -> mt.MlflowClient: 227 """Return a new Mlflow client. 228 229 Returns: 230 MlflowClient: the mlflow client. 231 """ 232 return mt.MlflowClient(tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)
23class Service(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"): 24 """Base class for a global service. 25 26 Use services to manage global contexts. 27 e.g., logger object, mlflow client, spark context, ... 28 """ 29 30 @abc.abstractmethod 31 def start(self) -> None: 32 """Start the service.""" 33 34 def stop(self) -> None: 35 """Stop the service.""" 36 # does nothing by default
Base class for a global service.
Use services to manage global contexts. e.g., logger object, mlflow client, spark context, ...
39class LoggerService(Service): 40 """Service for logging messages. 41 42 https://loguru.readthedocs.io/en/stable/api/logger.html 43 44 Parameters: 45 sink (str): logging output. 46 level (str): logging level. 47 format (str): logging format. 48 colorize (bool): colorize output. 49 serialize (bool): convert to JSON. 50 backtrace (bool): enable exception trace. 51 diagnose (bool): enable variable display. 52 catch (bool): catch errors during log handling. 53 """ 54 55 sink: str = "stderr" 56 level: str = "DEBUG" 57 format: str = ( 58 "<green>[{time:YYYY-MM-DD HH:mm:ss.SSS}]</green>" 59 "<level>[{level}]</level>" 60 "<cyan>[{name}:{function}:{line}]</cyan>" 61 " <level>{message}</level>" 62 ) 63 colorize: bool = True 64 serialize: bool = False 65 backtrace: bool = True 66 diagnose: bool = False 67 catch: bool = True 68 69 @T.override 70 def start(self) -> None: 71 loguru.logger.remove() 72 config = self.model_dump() 73 # use standard sinks or keep the original 74 sinks = {"stderr": sys.stderr, "stdout": sys.stdout} 75 config["sink"] = sinks.get(config["sink"], config["sink"]) 76 loguru.logger.add(**config) 77 78 def logger(self) -> loguru.Logger: 79 """Return the main logger. 80 81 Returns: 82 loguru.Logger: the main logger. 83 """ 84 return loguru.logger
Service for logging messages.
https://loguru.readthedocs.io/en/stable/api/logger.html
Arguments:
- sink (str): logging output.
- level (str): logging level.
- format (str): logging format.
- colorize (bool): colorize output.
- serialize (bool): convert to JSON.
- backtrace (bool): enable exception trace.
- diagnose (bool): enable variable display.
- catch (bool): catch errors during log handling.
69 @T.override 70 def start(self) -> None: 71 loguru.logger.remove() 72 config = self.model_dump() 73 # use standard sinks or keep the original 74 sinks = {"stderr": sys.stderr, "stdout": sys.stdout} 75 config["sink"] = sinks.get(config["sink"], config["sink"]) 76 loguru.logger.add(**config)
Start the service.
78 def logger(self) -> loguru.Logger: 79 """Return the main logger. 80 81 Returns: 82 loguru.Logger: the main logger. 83 """ 84 return loguru.logger
Return the main logger.
Returns:
loguru.Logger: the main logger.
87class AlertsService(Service): 88 """Service for sending notifications. 89 90 Require libnotify-bin on Linux systems. 91 92 In production, use with Slack, Discord, or emails. 93 94 https://plyer.readthedocs.io/en/latest/api.html#plyer.facades.Notification 95 96 Parameters: 97 enable (bool): use notifications or print. 98 app_name (str): name of the application. 99 timeout (int | None): timeout in secs. 100 """ 101 102 enable: bool = True 103 app_name: str = "Bikes" 104 timeout: int | None = None 105 106 @T.override 107 def start(self) -> None: 108 pass 109 110 def notify(self, title: str, message: str) -> None: 111 """Send a notification to the system. 112 113 Args: 114 title (str): title of the notification. 115 message (str): message of the notification. 116 """ 117 if self.enable: 118 try: 119 notification.notify( 120 title=title, 121 message=message, 122 app_name=self.app_name, 123 timeout=self.timeout, 124 ) 125 except NotImplementedError: 126 warnings.warn("Notifications are not supported on this system.", RuntimeWarning) 127 self._print(title=title, message=message) 128 else: 129 self._print(title=title, message=message) 130 131 def _print(self, title: str, message: str) -> None: 132 """Print a notification to the system. 133 134 Args: 135 title (str): title of the notification. 136 message (str): message of the notification. 137 """ 138 print(f"[{self.app_name}] {title}: {message}")
Service for sending notifications.
Require libnotify-bin on Linux systems.
In production, use with Slack, Discord, or emails.
https://plyer.readthedocs.io/en/latest/api.html#plyer.facades.Notification
Arguments:
- enable (bool): use notifications or print.
- app_name (str): name of the application.
- timeout (int | None): timeout in secs.
110 def notify(self, title: str, message: str) -> None: 111 """Send a notification to the system. 112 113 Args: 114 title (str): title of the notification. 115 message (str): message of the notification. 116 """ 117 if self.enable: 118 try: 119 notification.notify( 120 title=title, 121 message=message, 122 app_name=self.app_name, 123 timeout=self.timeout, 124 ) 125 except NotImplementedError: 126 warnings.warn("Notifications are not supported on this system.", RuntimeWarning) 127 self._print(title=title, message=message) 128 else: 129 self._print(title=title, message=message)
Send a notification to the system.
Arguments:
- title (str): title of the notification.
- message (str): message of the notification.
141class MlflowService(Service): 142 """Service for Mlflow tracking and registry. 143 144 Parameters: 145 tracking_uri (str): the URI for the Mlflow tracking server. 146 registry_uri (str): the URI for the Mlflow model registry. 147 experiment_name (str): the name of tracking experiment. 148 registry_name (str): the name of model registry. 149 autolog_disable (bool): disable autologging. 150 autolog_disable_for_unsupported_versions (bool): disable autologging for unsupported versions. 151 autolog_exclusive (bool): If True, enables exclusive autologging. 152 autolog_log_input_examples (bool): If True, logs input examples during autologging. 153 autolog_log_model_signatures (bool): If True, logs model signatures during autologging. 154 autolog_log_models (bool): If True, enables logging of models during autologging. 155 autolog_log_datasets (bool): If True, logs datasets used during autologging. 156 autolog_silent (bool): If True, suppresses all Mlflow warnings during autologging. 157 """ 158 159 class RunConfig(pdt.BaseModel, strict=True, frozen=True, extra="forbid"): 160 """Run configuration for Mlflow tracking. 161 162 Parameters: 163 name (str): name of the run. 164 description (str | None): description of the run. 165 tags (dict[str, T.Any] | None): tags for the run. 166 log_system_metrics (bool | None): enable system metrics logging. 167 """ 168 169 name: str 170 description: str | None = None 171 tags: dict[str, T.Any] | None = None 172 log_system_metrics: bool | None = True 173 174 # server uri 175 tracking_uri: str = "./mlruns" 176 registry_uri: str = "./mlruns" 177 # experiment 178 experiment_name: str = "bikes" 179 # registry 180 registry_name: str = "bikes" 181 # autolog 182 autolog_disable: bool = False 183 autolog_disable_for_unsupported_versions: bool = False 184 autolog_exclusive: bool = False 185 autolog_log_input_examples: bool = True 186 autolog_log_model_signatures: bool = True 187 autolog_log_models: bool = False 188 autolog_log_datasets: bool = False 189 autolog_silent: bool = False 190 191 @T.override 192 def start(self) -> None: 193 # server uri 194 mlflow.set_tracking_uri(uri=self.tracking_uri) 195 mlflow.set_registry_uri(uri=self.registry_uri) 196 # experiment 197 mlflow.set_experiment(experiment_name=self.experiment_name) 198 # autolog 199 mlflow.autolog( 200 disable=self.autolog_disable, 201 disable_for_unsupported_versions=self.autolog_disable_for_unsupported_versions, 202 exclusive=self.autolog_exclusive, 203 log_input_examples=self.autolog_log_input_examples, 204 log_model_signatures=self.autolog_log_model_signatures, 205 log_datasets=self.autolog_log_datasets, 206 silent=self.autolog_silent, 207 ) 208 209 @ctx.contextmanager 210 def run_context(self, run_config: RunConfig) -> T.Generator[mlflow.ActiveRun, None, None]: 211 """Yield an active Mlflow run and exit it afterwards. 212 213 Args: 214 run (str): run parameters. 215 216 Yields: 217 T.Generator[mlflow.ActiveRun, None, None]: active run context. Will be closed at the end of context. 218 """ 219 with mlflow.start_run( 220 run_name=run_config.name, 221 tags=run_config.tags, 222 description=run_config.description, 223 log_system_metrics=run_config.log_system_metrics, 224 ) as run: 225 yield run 226 227 def client(self) -> mt.MlflowClient: 228 """Return a new Mlflow client. 229 230 Returns: 231 MlflowClient: the mlflow client. 232 """ 233 return mt.MlflowClient(tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)
Service for Mlflow tracking and registry.
Arguments:
- tracking_uri (str): the URI for the Mlflow tracking server.
- registry_uri (str): the URI for the Mlflow model registry.
- experiment_name (str): the name of tracking experiment.
- registry_name (str): the name of model registry.
- autolog_disable (bool): disable autologging.
- autolog_disable_for_unsupported_versions (bool): disable autologging for unsupported versions.
- autolog_exclusive (bool): If True, enables exclusive autologging.
- autolog_log_input_examples (bool): If True, logs input examples during autologging.
- autolog_log_model_signatures (bool): If True, logs model signatures during autologging.
- autolog_log_models (bool): If True, enables logging of models during autologging.
- autolog_log_datasets (bool): If True, logs datasets used during autologging.
- autolog_silent (bool): If True, suppresses all Mlflow warnings during autologging.
191 @T.override 192 def start(self) -> None: 193 # server uri 194 mlflow.set_tracking_uri(uri=self.tracking_uri) 195 mlflow.set_registry_uri(uri=self.registry_uri) 196 # experiment 197 mlflow.set_experiment(experiment_name=self.experiment_name) 198 # autolog 199 mlflow.autolog( 200 disable=self.autolog_disable, 201 disable_for_unsupported_versions=self.autolog_disable_for_unsupported_versions, 202 exclusive=self.autolog_exclusive, 203 log_input_examples=self.autolog_log_input_examples, 204 log_model_signatures=self.autolog_log_model_signatures, 205 log_datasets=self.autolog_log_datasets, 206 silent=self.autolog_silent, 207 )
Start the service.
209 @ctx.contextmanager 210 def run_context(self, run_config: RunConfig) -> T.Generator[mlflow.ActiveRun, None, None]: 211 """Yield an active Mlflow run and exit it afterwards. 212 213 Args: 214 run (str): run parameters. 215 216 Yields: 217 T.Generator[mlflow.ActiveRun, None, None]: active run context. Will be closed at the end of context. 218 """ 219 with mlflow.start_run( 220 run_name=run_config.name, 221 tags=run_config.tags, 222 description=run_config.description, 223 log_system_metrics=run_config.log_system_metrics, 224 ) as run: 225 yield run
Yield an active Mlflow run and exit it afterwards.
Arguments:
- run (str): run parameters.
Yields:
T.Generator[mlflow.ActiveRun, None, None]: active run context. Will be closed at the end of context.
227 def client(self) -> mt.MlflowClient: 228 """Return a new Mlflow client. 229 230 Returns: 231 MlflowClient: the mlflow client. 232 """ 233 return mt.MlflowClient(tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)
Return a new Mlflow client.
Returns:
MlflowClient: the mlflow client.
159 class RunConfig(pdt.BaseModel, strict=True, frozen=True, extra="forbid"): 160 """Run configuration for Mlflow tracking. 161 162 Parameters: 163 name (str): name of the run. 164 description (str | None): description of the run. 165 tags (dict[str, T.Any] | None): tags for the run. 166 log_system_metrics (bool | None): enable system metrics logging. 167 """ 168 169 name: str 170 description: str | None = None 171 tags: dict[str, T.Any] | None = None 172 log_system_metrics: bool | None = True
Run configuration for Mlflow tracking.
Arguments:
- name (str): name of the run.
- description (str | None): description of the run.
- tags (dict[str, T.Any] | None): tags for the run.
- log_system_metrics (bool | None): enable system metrics logging.