bikes.io.services

Manage global context during execution.

  1"""Manage global context during execution."""
  2
  3# %% IMPORTS
  4
  5from __future__ import annotations
  6
  7import abc
  8import contextlib as ctx
  9import sys
 10import typing as T
 11import warnings
 12
 13import loguru
 14import mlflow
 15import mlflow.tracking as mt
 16import pydantic as pdt
 17from plyer import notification
 18
 19# %% SERVICES
 20
 21
 22class Service(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
 23    """Base class for a global service.
 24
 25    Use services to manage global contexts.
 26    e.g., logger object, mlflow client, spark context, ...
 27    """
 28
 29    @abc.abstractmethod
 30    def start(self) -> None:
 31        """Start the service."""
 32
 33    def stop(self) -> None:
 34        """Stop the service."""
 35        # does nothing by default
 36
 37
 38class LoggerService(Service):
 39    """Service for logging messages.
 40
 41    https://loguru.readthedocs.io/en/stable/api/logger.html
 42
 43    Parameters:
 44        sink (str): logging output.
 45        level (str): logging level.
 46        format (str): logging format.
 47        colorize (bool): colorize output.
 48        serialize (bool): convert to JSON.
 49        backtrace (bool): enable exception trace.
 50        diagnose (bool): enable variable display.
 51        catch (bool): catch errors during log handling.
 52    """
 53
 54    sink: str = "stderr"
 55    level: str = "DEBUG"
 56    format: str = (
 57        "<green>[{time:YYYY-MM-DD HH:mm:ss.SSS}]</green>"
 58        "<level>[{level}]</level>"
 59        "<cyan>[{name}:{function}:{line}]</cyan>"
 60        " <level>{message}</level>"
 61    )
 62    colorize: bool = True
 63    serialize: bool = False
 64    backtrace: bool = True
 65    diagnose: bool = False
 66    catch: bool = True
 67
 68    @T.override
 69    def start(self) -> None:
 70        loguru.logger.remove()
 71        config = self.model_dump()
 72        # use standard sinks or keep the original
 73        sinks = {"stderr": sys.stderr, "stdout": sys.stdout}
 74        config["sink"] = sinks.get(config["sink"], config["sink"])
 75        loguru.logger.add(**config)
 76
 77    def logger(self) -> loguru.Logger:
 78        """Return the main logger.
 79
 80        Returns:
 81            loguru.Logger: the main logger.
 82        """
 83        return loguru.logger
 84
 85
 86class AlertsService(Service):
 87    """Service for sending notifications.
 88
 89    Require libnotify-bin on Linux systems.
 90
 91    In production, use with Slack, Discord, or emails.
 92
 93    https://plyer.readthedocs.io/en/latest/api.html#plyer.facades.Notification
 94
 95    Parameters:
 96        enable (bool): use notifications or print.
 97        app_name (str): name of the application.
 98        timeout (int | None): timeout in secs.
 99    """
100
101    enable: bool = True
102    app_name: str = "Bikes"
103    timeout: int | None = None
104
105    @T.override
106    def start(self) -> None:
107        pass
108
109    def notify(self, title: str, message: str) -> None:
110        """Send a notification to the system.
111
112        Args:
113            title (str): title of the notification.
114            message (str): message of the notification.
115        """
116        if self.enable:
117            try:
118                notification.notify(
119                    title=title,
120                    message=message,
121                    app_name=self.app_name,
122                    timeout=self.timeout,
123                )
124            except NotImplementedError:
125                warnings.warn("Notifications are not supported on this system.", RuntimeWarning)
126                self._print(title=title, message=message)
127        else:
128            self._print(title=title, message=message)
129
130    def _print(self, title: str, message: str) -> None:
131        """Print a notification to the system.
132
133        Args:
134            title (str): title of the notification.
135            message (str): message of the notification.
136        """
137        print(f"[{self.app_name}] {title}: {message}")
138
139
class MlflowService(Service):
    """Service for Mlflow tracking and registry.

    Parameters:
        tracking_uri (str): the URI for the Mlflow tracking server.
        registry_uri (str): the URI for the Mlflow model registry.
        experiment_name (str): the name of tracking experiment.
        registry_name (str): the name of model registry.
        autolog_disable (bool): disable autologging.
        autolog_disable_for_unsupported_versions (bool): disable autologging for unsupported versions.
        autolog_exclusive (bool): If True, enables exclusive autologging.
        autolog_log_input_examples (bool): If True, logs input examples during autologging.
        autolog_log_model_signatures (bool): If True, logs model signatures during autologging.
        autolog_log_models (bool): If True, enables logging of models during autologging.
        autolog_log_datasets (bool): If True, logs datasets used during autologging.
        autolog_silent (bool): If True, suppresses all Mlflow warnings during autologging.
    """

    class RunConfig(pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
        """Run configuration for Mlflow tracking.

        Parameters:
            name (str): name of the run.
            description (str | None): description of the run.
            tags (dict[str, T.Any] | None): tags for the run.
            log_system_metrics (bool | None): enable system metrics logging.
        """

        name: str
        description: str | None = None
        tags: dict[str, T.Any] | None = None
        log_system_metrics: bool | None = True

    # server uri
    tracking_uri: str = "./mlruns"
    registry_uri: str = "./mlruns"
    # experiment
    experiment_name: str = "bikes"
    # registry
    registry_name: str = "bikes"
    # autolog
    autolog_disable: bool = False
    autolog_disable_for_unsupported_versions: bool = False
    autolog_exclusive: bool = False
    autolog_log_input_examples: bool = True
    autolog_log_model_signatures: bool = True
    autolog_log_models: bool = False
    autolog_log_datasets: bool = False
    autolog_silent: bool = False

    @T.override
    def start(self) -> None:
        """Apply the server, experiment and autolog settings to the global Mlflow state."""
        # server uri
        mlflow.set_tracking_uri(uri=self.tracking_uri)
        mlflow.set_registry_uri(uri=self.registry_uri)
        # experiment
        mlflow.set_experiment(experiment_name=self.experiment_name)
        # autolog: forward every autolog_* field; leaving log_models out would let
        # Mlflow use its own default instead of the configured value
        mlflow.autolog(
            disable=self.autolog_disable,
            disable_for_unsupported_versions=self.autolog_disable_for_unsupported_versions,
            exclusive=self.autolog_exclusive,
            log_input_examples=self.autolog_log_input_examples,
            log_model_signatures=self.autolog_log_model_signatures,
            log_models=self.autolog_log_models,
            log_datasets=self.autolog_log_datasets,
            silent=self.autolog_silent,
        )

    @ctx.contextmanager
    def run_context(self, run_config: RunConfig) -> T.Generator[mlflow.ActiveRun, None, None]:
        """Yield an active Mlflow run and exit it afterwards.

        Args:
            run_config (RunConfig): run parameters.

        Yields:
            mlflow.ActiveRun: the active run. It is exited at the end of the context.
        """
        with mlflow.start_run(
            run_name=run_config.name,
            tags=run_config.tags,
            description=run_config.description,
            log_system_metrics=run_config.log_system_metrics,
        ) as run:
            yield run

    def client(self) -> mt.MlflowClient:
        """Return a new Mlflow client.

        Returns:
            MlflowClient: a client built from the service's tracking and registry URIs.
        """
        return mt.MlflowClient(tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)
class Service(abc.ABC, pydantic.main.BaseModel):
class Service(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
    """Abstract base class shared by every global service.

    A service wraps a piece of global context behind a start/stop lifecycle.
    e.g., logger object, mlflow client, spark context, ...

    The model is declared strict and frozen, and it forbids extra fields.
    """

    @abc.abstractmethod
    def start(self) -> None:
        """Start the service."""
        ...

    def stop(self) -> None:
        """Stop the service."""
        # no-op unless a subclass overrides it
        return None

Base class for a global service.

Use services to manage global contexts. e.g., logger object, mlflow client, spark context, ...

@abc.abstractmethod
def start(self) -> None:
30    @abc.abstractmethod
31    def start(self) -> None:
32        """Start the service."""

Start the service.

def stop(self) -> None:
34    def stop(self) -> None:
35        """Stop the service."""
36        # does nothing by default

Stop the service.

model_config: ClassVar[pydantic.config.ConfigDict] = {'strict': True, 'frozen': True, 'extra': 'forbid'}

Configuration for the model; it should be a dictionary conforming to `pydantic.config.ConfigDict`.

class LoggerService(Service):
class LoggerService(Service):
    """Service that configures the global loguru logger.

    https://loguru.readthedocs.io/en/stable/api/logger.html

    Every field is forwarded as a keyword argument to ``loguru.logger.add``.

    Parameters:
        sink (str): logging output ("stderr" and "stdout" map to the process streams).
        level (str): logging level.
        format (str): logging format.
        colorize (bool): colorize output.
        serialize (bool): convert to JSON.
        backtrace (bool): enable exception trace.
        diagnose (bool): enable variable display.
        catch (bool): catch errors during log handling.
    """

    sink: str = "stderr"
    level: str = "DEBUG"
    format: str = (
        "<green>[{time:YYYY-MM-DD HH:mm:ss.SSS}]</green>"
        "<level>[{level}]</level>"
        "<cyan>[{name}:{function}:{line}]</cyan>"
        " <level>{message}</level>"
    )
    colorize: bool = True
    serialize: bool = False
    backtrace: bool = True
    diagnose: bool = False
    catch: bool = True

    @T.override
    def start(self) -> None:
        """Replace the handlers of the global logger with one built from this model."""
        # start from a clean slate: drop the handlers registered so far
        loguru.logger.remove()
        standard_streams = {"stderr": sys.stderr, "stdout": sys.stdout}
        options = self.model_dump()
        target = options["sink"]
        # well-known names become stream objects; any other value is passed through unchanged
        options["sink"] = standard_streams.get(target, target)
        loguru.logger.add(**options)

    def logger(self) -> loguru.Logger:
        """Give access to the main logger.

        Returns:
            loguru.Logger: the main logger.
        """
        main_logger = loguru.logger
        return main_logger

Service for logging messages.

https://loguru.readthedocs.io/en/stable/api/logger.html

Arguments:
  • sink (str): logging output.
  • level (str): logging level.
  • format (str): logging format.
  • colorize (bool): colorize output.
  • serialize (bool): convert to JSON.
  • backtrace (bool): enable exception trace.
  • diagnose (bool): enable variable display.
  • catch (bool): catch errors during log handling.
sink: str
level: str
format: str
colorize: bool
serialize: bool
backtrace: bool
diagnose: bool
catch: bool
@T.override
def start(self) -> None:
69    @T.override
70    def start(self) -> None:
71        loguru.logger.remove()
72        config = self.model_dump()
73        # use standard sinks or keep the original
74        sinks = {"stderr": sys.stderr, "stdout": sys.stdout}
75        config["sink"] = sinks.get(config["sink"], config["sink"])
76        loguru.logger.add(**config)

Start the service.

def logger(self) -> 'loguru.Logger':
78    def logger(self) -> loguru.Logger:
79        """Return the main logger.
80
81        Returns:
82            loguru.Logger: the main logger.
83        """
84        return loguru.logger

Return the main logger.

Returns:

loguru.Logger: the main logger.

model_config: ClassVar[pydantic.config.ConfigDict] = {'strict': True, 'frozen': True, 'extra': 'forbid'}

Configuration for the model; it should be a dictionary conforming to `pydantic.config.ConfigDict`.

Inherited Members
Service
stop
class AlertsService(Service):
 87class AlertsService(Service):
 88    """Service for sending notifications.
 89
 90    Require libnotify-bin on Linux systems.
 91
 92    In production, use with Slack, Discord, or emails.
 93
 94    https://plyer.readthedocs.io/en/latest/api.html#plyer.facades.Notification
 95
 96    Parameters:
 97        enable (bool): use notifications or print.
 98        app_name (str): name of the application.
 99        timeout (int | None): timeout in secs.
100    """
101
102    enable: bool = True
103    app_name: str = "Bikes"
104    timeout: int | None = None
105
106    @T.override
107    def start(self) -> None:
108        pass
109
110    def notify(self, title: str, message: str) -> None:
111        """Send a notification to the system.
112
113        Args:
114            title (str): title of the notification.
115            message (str): message of the notification.
116        """
117        if self.enable:
118            try:
119                notification.notify(
120                    title=title,
121                    message=message,
122                    app_name=self.app_name,
123                    timeout=self.timeout,
124                )
125            except NotImplementedError:
126                warnings.warn("Notifications are not supported on this system.", RuntimeWarning)
127                self._print(title=title, message=message)
128        else:
129            self._print(title=title, message=message)
130
131    def _print(self, title: str, message: str) -> None:
132        """Print a notification to the system.
133
134        Args:
135            title (str): title of the notification.
136            message (str): message of the notification.
137        """
138        print(f"[{self.app_name}] {title}: {message}")

Service for sending notifications.

Requires libnotify-bin on Linux systems.

In production, use with Slack, Discord, or emails.

https://plyer.readthedocs.io/en/latest/api.html#plyer.facades.Notification

Arguments:
  • enable (bool): use notifications or print.
  • app_name (str): name of the application.
  • timeout (int | None): timeout in secs.
enable: bool
app_name: str
timeout: int | None
@T.override
def start(self) -> None:
106    @T.override
107    def start(self) -> None:
108        pass

Start the service.

def notify(self, title: str, message: str) -> None:
110    def notify(self, title: str, message: str) -> None:
111        """Send a notification to the system.
112
113        Args:
114            title (str): title of the notification.
115            message (str): message of the notification.
116        """
117        if self.enable:
118            try:
119                notification.notify(
120                    title=title,
121                    message=message,
122                    app_name=self.app_name,
123                    timeout=self.timeout,
124                )
125            except NotImplementedError:
126                warnings.warn("Notifications are not supported on this system.", RuntimeWarning)
127                self._print(title=title, message=message)
128        else:
129            self._print(title=title, message=message)

Send a notification to the system.

Arguments:
  • title (str): title of the notification.
  • message (str): message of the notification.
model_config: ClassVar[pydantic.config.ConfigDict] = {'strict': True, 'frozen': True, 'extra': 'forbid'}

Configuration for the model; it should be a dictionary conforming to `pydantic.config.ConfigDict`.

Inherited Members
Service
stop
class MlflowService(Service):
class MlflowService(Service):
    """Service for Mlflow tracking and registry.

    Parameters:
        tracking_uri (str): the URI for the Mlflow tracking server.
        registry_uri (str): the URI for the Mlflow model registry.
        experiment_name (str): the name of tracking experiment.
        registry_name (str): the name of model registry.
        autolog_disable (bool): disable autologging.
        autolog_disable_for_unsupported_versions (bool): disable autologging for unsupported versions.
        autolog_exclusive (bool): If True, enables exclusive autologging.
        autolog_log_input_examples (bool): If True, logs input examples during autologging.
        autolog_log_model_signatures (bool): If True, logs model signatures during autologging.
        autolog_log_models (bool): If True, enables logging of models during autologging.
        autolog_log_datasets (bool): If True, logs datasets used during autologging.
        autolog_silent (bool): If True, suppresses all Mlflow warnings during autologging.
    """

    class RunConfig(pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
        """Run configuration for Mlflow tracking.

        Parameters:
            name (str): name of the run.
            description (str | None): description of the run.
            tags (dict[str, T.Any] | None): tags for the run.
            log_system_metrics (bool | None): enable system metrics logging.
        """

        name: str
        description: str | None = None
        tags: dict[str, T.Any] | None = None
        log_system_metrics: bool | None = True

    # server uri
    tracking_uri: str = "./mlruns"
    registry_uri: str = "./mlruns"
    # experiment
    experiment_name: str = "bikes"
    # registry
    registry_name: str = "bikes"
    # autolog
    autolog_disable: bool = False
    autolog_disable_for_unsupported_versions: bool = False
    autolog_exclusive: bool = False
    autolog_log_input_examples: bool = True
    autolog_log_model_signatures: bool = True
    autolog_log_models: bool = False
    autolog_log_datasets: bool = False
    autolog_silent: bool = False

    @T.override
    def start(self) -> None:
        """Apply the server, experiment and autolog settings to the global Mlflow state."""
        # server uri
        mlflow.set_tracking_uri(uri=self.tracking_uri)
        mlflow.set_registry_uri(uri=self.registry_uri)
        # experiment
        mlflow.set_experiment(experiment_name=self.experiment_name)
        # autolog: forward every autolog_* field; leaving log_models out would let
        # Mlflow use its own default instead of the configured value
        mlflow.autolog(
            disable=self.autolog_disable,
            disable_for_unsupported_versions=self.autolog_disable_for_unsupported_versions,
            exclusive=self.autolog_exclusive,
            log_input_examples=self.autolog_log_input_examples,
            log_model_signatures=self.autolog_log_model_signatures,
            log_models=self.autolog_log_models,
            log_datasets=self.autolog_log_datasets,
            silent=self.autolog_silent,
        )

    @ctx.contextmanager
    def run_context(self, run_config: RunConfig) -> T.Generator[mlflow.ActiveRun, None, None]:
        """Yield an active Mlflow run and exit it afterwards.

        Args:
            run_config (RunConfig): run parameters.

        Yields:
            mlflow.ActiveRun: the active run. It is exited at the end of the context.
        """
        with mlflow.start_run(
            run_name=run_config.name,
            tags=run_config.tags,
            description=run_config.description,
            log_system_metrics=run_config.log_system_metrics,
        ) as run:
            yield run

    def client(self) -> mt.MlflowClient:
        """Return a new Mlflow client.

        Returns:
            MlflowClient: a client built from the service's tracking and registry URIs.
        """
        return mt.MlflowClient(tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)

Service for Mlflow tracking and registry.

Arguments:
  • tracking_uri (str): the URI for the Mlflow tracking server.
  • registry_uri (str): the URI for the Mlflow model registry.
  • experiment_name (str): the name of tracking experiment.
  • registry_name (str): the name of model registry.
  • autolog_disable (bool): disable autologging.
  • autolog_disable_for_unsupported_versions (bool): disable autologging for unsupported versions.
  • autolog_exclusive (bool): If True, enables exclusive autologging.
  • autolog_log_input_examples (bool): If True, logs input examples during autologging.
  • autolog_log_model_signatures (bool): If True, logs model signatures during autologging.
  • autolog_log_models (bool): If True, enables logging of models during autologging.
  • autolog_log_datasets (bool): If True, logs datasets used during autologging.
  • autolog_silent (bool): If True, suppresses all Mlflow warnings during autologging.
tracking_uri: str
registry_uri: str
experiment_name: str
registry_name: str
autolog_disable: bool
autolog_disable_for_unsupported_versions: bool
autolog_exclusive: bool
autolog_log_input_examples: bool
autolog_log_model_signatures: bool
autolog_log_models: bool
autolog_log_datasets: bool
autolog_silent: bool
@T.override
def start(self) -> None:
191    @T.override
192    def start(self) -> None:
193        # server uri
194        mlflow.set_tracking_uri(uri=self.tracking_uri)
195        mlflow.set_registry_uri(uri=self.registry_uri)
196        # experiment
197        mlflow.set_experiment(experiment_name=self.experiment_name)
198        # autolog
199        mlflow.autolog(
200            disable=self.autolog_disable,
201            disable_for_unsupported_versions=self.autolog_disable_for_unsupported_versions,
202            exclusive=self.autolog_exclusive,
203            log_input_examples=self.autolog_log_input_examples,
204            log_model_signatures=self.autolog_log_model_signatures,
205            log_datasets=self.autolog_log_datasets,
206            silent=self.autolog_silent,
207        )

Start the service.

@ctx.contextmanager
def run_context( self, run_config: MlflowService.RunConfig) -> Generator[mlflow.tracking.fluent.ActiveRun, NoneType, NoneType]:
209    @ctx.contextmanager
210    def run_context(self, run_config: RunConfig) -> T.Generator[mlflow.ActiveRun, None, None]:
211        """Yield an active Mlflow run and exit it afterwards.
212
213        Args:
214            run (str): run parameters.
215
216        Yields:
217            T.Generator[mlflow.ActiveRun, None, None]: active run context. Will be closed at the end of context.
218        """
219        with mlflow.start_run(
220            run_name=run_config.name,
221            tags=run_config.tags,
222            description=run_config.description,
223            log_system_metrics=run_config.log_system_metrics,
224        ) as run:
225            yield run

Yield an active Mlflow run and exit it afterwards.

Arguments:
  • run_config (MlflowService.RunConfig): run parameters.
Yields:

mlflow.ActiveRun: the active run. It is exited at the end of the context.

def client(self) -> mlflow.tracking.client.MlflowClient:
227    def client(self) -> mt.MlflowClient:
228        """Return a new Mlflow client.
229
230        Returns:
231            MlflowClient: the mlflow client.
232        """
233        return mt.MlflowClient(tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)

Return a new Mlflow client.

Returns:

MlflowClient: the mlflow client.

model_config: ClassVar[pydantic.config.ConfigDict] = {'strict': True, 'frozen': True, 'extra': 'forbid'}

Configuration for the model; it should be a dictionary conforming to `pydantic.config.ConfigDict`.

Inherited Members
Service
stop
class MlflowService.RunConfig(pydantic.main.BaseModel):
159    class RunConfig(pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
160        """Run configuration for Mlflow tracking.
161
162        Parameters:
163            name (str): name of the run.
164            description (str | None): description of the run.
165            tags (dict[str, T.Any] | None): tags for the run.
166            log_system_metrics (bool | None): enable system metrics logging.
167        """
168
169        name: str
170        description: str | None = None
171        tags: dict[str, T.Any] | None = None
172        log_system_metrics: bool | None = True

Run configuration for Mlflow tracking.

Arguments:
  • name (str): name of the run.
  • description (str | None): description of the run.
  • tags (dict[str, T.Any] | None): tags for the run.
  • log_system_metrics (bool | None): enable system metrics logging.
name: str
description: str | None
tags: dict[str, typing.Any] | None
log_system_metrics: bool | None
model_config: ClassVar[pydantic.config.ConfigDict] = {'strict': True, 'frozen': True, 'extra': 'forbid'}

Configuration for the model; it should be a dictionary conforming to `pydantic.config.ConfigDict`.