bikes.io.services

Manage global context during execution.

  1"""Manage global context during execution."""
  2
  3# %% IMPORTS
  4
  5from __future__ import annotations
  6
  7import abc
  8import contextlib as ctx
  9import sys
 10import typing as T
 11
 12import loguru
 13import mlflow
 14import mlflow.tracking as mt
 15import pydantic as pdt
 16from plyer import notification
 17
 18# %% SERVICES
 19
 20
 21class Service(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
 22    """Base class for a global service.
 23
 24    Use services to manage global contexts.
 25    e.g., logger object, mlflow client, spark context, ...
 26    """
 27
 28    @abc.abstractmethod
 29    def start(self) -> None:
 30        """Start the service."""
 31
 32    def stop(self) -> None:
 33        """Stop the service."""
 34        # does nothing by default
 35
 36
 37class LoggerService(Service):
 38    """Service for logging messages.
 39
 40    https://loguru.readthedocs.io/en/stable/api/logger.html
 41
 42    Parameters:
 43        sink (str): logging output.
 44        level (str): logging level.
 45        format (str): logging format.
 46        colorize (bool): colorize output.
 47        serialize (bool): convert to JSON.
 48        backtrace (bool): enable exception trace.
 49        diagnose (bool): enable variable display.
 50        catch (bool): catch errors during log handling.
 51    """
 52
 53    sink: str = "stderr"
 54    level: str = "DEBUG"
 55    format: str = (
 56        "<green>[{time:YYYY-MM-DD HH:mm:ss.SSS}]</green>"
 57        "<level>[{level}]</level>"
 58        "<cyan>[{name}:{function}:{line}]</cyan>"
 59        " <level>{message}</level>"
 60    )
 61    colorize: bool = True
 62    serialize: bool = False
 63    backtrace: bool = True
 64    diagnose: bool = False
 65    catch: bool = True
 66
 67    @T.override
 68    def start(self) -> None:
 69        loguru.logger.remove()
 70        config = self.model_dump()
 71        # use standard sinks or keep the original
 72        sinks = {"stderr": sys.stderr, "stdout": sys.stdout}
 73        config["sink"] = sinks.get(config["sink"], config["sink"])
 74        loguru.logger.add(**config)
 75
 76    def logger(self) -> loguru.Logger:
 77        """Return the main logger.
 78
 79        Returns:
 80            loguru.Logger: the main logger.
 81        """
 82        return loguru.logger
 83
 84
 85class AlertsService(Service):
 86    """Service for sending notifications.
 87
 88    Require libnotify-bin on Linux systems.
 89
 90    In production, use with Slack, Discord, or emails.
 91
 92    https://plyer.readthedocs.io/en/latest/api.html#plyer.facades.Notification
 93
 94    Parameters:
 95        enable (bool): use notifications or print.
 96        app_name (str): name of the application.
 97        timeout (int | None): timeout in secs.
 98    """
 99
100    enable: bool = True
101    app_name: str = "Bikes"
102    timeout: int | None = None
103
104    @T.override
105    def start(self) -> None:
106        pass
107
108    def notify(self, title: str, message: str) -> None:
109        """Send a notification to the system.
110
111        Args:
112            title (str): title of the notification.
113            message (str): message of the notification.
114        """
115        if self.enable:
116            try:
117                notification.notify(
118                    title=title,
119                    message=message,
120                    app_name=self.app_name,
121                    timeout=self.timeout,
122                )
123            except NotImplementedError:
124                print("Notifications are not supported on this system.")
125                self._print(title=title, message=message)
126        else:
127            self._print(title=title, message=message)
128
129    def _print(self, title: str, message: str) -> None:
130        """Print a notification to the system.
131
132        Args:
133            title (str): title of the notification.
134            message (str): message of the notification.
135        """
136        print(f"[{self.app_name}] {title}: {message}")
137
138
139class MlflowService(Service):
140    """Service for Mlflow tracking and registry.
141
142    Parameters:
143        tracking_uri (str): the URI for the Mlflow tracking server.
144        registry_uri (str): the URI for the Mlflow model registry.
145        experiment_name (str): the name of tracking experiment.
146        registry_name (str): the name of model registry.
147        autolog_disable (bool): disable autologging.
148        autolog_disable_for_unsupported_versions (bool): disable autologging for unsupported versions.
149        autolog_exclusive (bool): If True, enables exclusive autologging.
150        autolog_log_input_examples (bool): If True, logs input examples during autologging.
151        autolog_log_model_signatures (bool): If True, logs model signatures during autologging.
152        autolog_log_models (bool): If True, enables logging of models during autologging.
153        autolog_log_datasets (bool): If True, logs datasets used during autologging.
154        autolog_silent (bool): If True, suppresses all Mlflow warnings during autologging.
155    """
156
157    class RunConfig(pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
158        """Run configuration for Mlflow tracking.
159
160        Parameters:
161            name (str): name of the run.
162            description (str | None): description of the run.
163            tags (dict[str, T.Any] | None): tags for the run.
164            log_system_metrics (bool | None): enable system metrics logging.
165        """
166
167        name: str
168        description: str | None = None
169        tags: dict[str, T.Any] | None = None
170        log_system_metrics: bool | None = True
171
172    # server uri
173    tracking_uri: str = "./mlruns"
174    registry_uri: str = "./mlruns"
175    # experiment
176    experiment_name: str = "bikes"
177    # registry
178    registry_name: str = "bikes"
179    # autolog
180    autolog_disable: bool = False
181    autolog_disable_for_unsupported_versions: bool = False
182    autolog_exclusive: bool = False
183    autolog_log_input_examples: bool = True
184    autolog_log_model_signatures: bool = True
185    autolog_log_models: bool = False
186    autolog_log_datasets: bool = False
187    autolog_silent: bool = False
188
189    @T.override
190    def start(self) -> None:
191        # server uri
192        mlflow.set_tracking_uri(uri=self.tracking_uri)
193        mlflow.set_registry_uri(uri=self.registry_uri)
194        # experiment
195        mlflow.set_experiment(experiment_name=self.experiment_name)
196        # autolog
197        mlflow.autolog(
198            disable=self.autolog_disable,
199            disable_for_unsupported_versions=self.autolog_disable_for_unsupported_versions,
200            exclusive=self.autolog_exclusive,
201            log_input_examples=self.autolog_log_input_examples,
202            log_model_signatures=self.autolog_log_model_signatures,
203            log_datasets=self.autolog_log_datasets,
204            silent=self.autolog_silent,
205        )
206
207    @ctx.contextmanager
208    def run_context(self, run_config: RunConfig) -> T.Generator[mlflow.ActiveRun, None, None]:
209        """Yield an active Mlflow run and exit it afterwards.
210
211        Args:
212            run (str): run parameters.
213
214        Yields:
215            T.Generator[mlflow.ActiveRun, None, None]: active run context. Will be closed at the end of context.
216        """
217        with mlflow.start_run(
218            run_name=run_config.name,
219            tags=run_config.tags,
220            description=run_config.description,
221            log_system_metrics=run_config.log_system_metrics,
222        ) as run:
223            yield run
224
225    def client(self) -> mt.MlflowClient:
226        """Return a new Mlflow client.
227
228        Returns:
229            MlflowClient: the mlflow client.
230        """
231        return mt.MlflowClient(tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)
class Service(abc.ABC, pydantic.main.BaseModel):
22class Service(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
23    """Base class for a global service.
24
25    Use services to manage global contexts.
26    e.g., logger object, mlflow client, spark context, ...
27    """
28
29    @abc.abstractmethod
30    def start(self) -> None:
31        """Start the service."""
32
33    def stop(self) -> None:
34        """Stop the service."""
35        # does nothing by default

Base class for a global service.

Use services to manage global contexts. e.g., logger object, mlflow client, spark context, ...

@abc.abstractmethod
def start(self) -> None:
29    @abc.abstractmethod
30    def start(self) -> None:
31        """Start the service."""

Start the service.

def stop(self) -> None:
33    def stop(self) -> None:
34        """Stop the service."""
35        # does nothing by default

Stop the service.

model_config: ClassVar[pydantic.config.ConfigDict] = {'strict': True, 'frozen': True, 'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class LoggerService(Service):
38class LoggerService(Service):
39    """Service for logging messages.
40
41    https://loguru.readthedocs.io/en/stable/api/logger.html
42
43    Parameters:
44        sink (str): logging output.
45        level (str): logging level.
46        format (str): logging format.
47        colorize (bool): colorize output.
48        serialize (bool): convert to JSON.
49        backtrace (bool): enable exception trace.
50        diagnose (bool): enable variable display.
51        catch (bool): catch errors during log handling.
52    """
53
54    sink: str = "stderr"
55    level: str = "DEBUG"
56    format: str = (
57        "<green>[{time:YYYY-MM-DD HH:mm:ss.SSS}]</green>"
58        "<level>[{level}]</level>"
59        "<cyan>[{name}:{function}:{line}]</cyan>"
60        " <level>{message}</level>"
61    )
62    colorize: bool = True
63    serialize: bool = False
64    backtrace: bool = True
65    diagnose: bool = False
66    catch: bool = True
67
68    @T.override
69    def start(self) -> None:
70        loguru.logger.remove()
71        config = self.model_dump()
72        # use standard sinks or keep the original
73        sinks = {"stderr": sys.stderr, "stdout": sys.stdout}
74        config["sink"] = sinks.get(config["sink"], config["sink"])
75        loguru.logger.add(**config)
76
77    def logger(self) -> loguru.Logger:
78        """Return the main logger.
79
80        Returns:
81            loguru.Logger: the main logger.
82        """
83        return loguru.logger

Service for logging messages.

https://loguru.readthedocs.io/en/stable/api/logger.html

Arguments:
  • sink (str): logging output.
  • level (str): logging level.
  • format (str): logging format.
  • colorize (bool): colorize output.
  • serialize (bool): convert to JSON.
  • backtrace (bool): enable exception trace.
  • diagnose (bool): enable variable display.
  • catch (bool): catch errors during log handling.
sink: str
level: str
format: str
colorize: bool
serialize: bool
backtrace: bool
diagnose: bool
catch: bool
@T.override
def start(self) -> None:
68    @T.override
69    def start(self) -> None:
70        loguru.logger.remove()
71        config = self.model_dump()
72        # use standard sinks or keep the original
73        sinks = {"stderr": sys.stderr, "stdout": sys.stdout}
74        config["sink"] = sinks.get(config["sink"], config["sink"])
75        loguru.logger.add(**config)

Start the service.

def logger(self) -> 'loguru.Logger':
77    def logger(self) -> loguru.Logger:
78        """Return the main logger.
79
80        Returns:
81            loguru.Logger: the main logger.
82        """
83        return loguru.logger

Return the main logger.

Returns:

loguru.Logger: the main logger.

model_config: ClassVar[pydantic.config.ConfigDict] = {'strict': True, 'frozen': True, 'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
Service
stop
class AlertsService(Service):
 86class AlertsService(Service):
 87    """Service for sending notifications.
 88
 89    Require libnotify-bin on Linux systems.
 90
 91    In production, use with Slack, Discord, or emails.
 92
 93    https://plyer.readthedocs.io/en/latest/api.html#plyer.facades.Notification
 94
 95    Parameters:
 96        enable (bool): use notifications or print.
 97        app_name (str): name of the application.
 98        timeout (int | None): timeout in secs.
 99    """
100
101    enable: bool = True
102    app_name: str = "Bikes"
103    timeout: int | None = None
104
105    @T.override
106    def start(self) -> None:
107        pass
108
109    def notify(self, title: str, message: str) -> None:
110        """Send a notification to the system.
111
112        Args:
113            title (str): title of the notification.
114            message (str): message of the notification.
115        """
116        if self.enable:
117            try:
118                notification.notify(
119                    title=title,
120                    message=message,
121                    app_name=self.app_name,
122                    timeout=self.timeout,
123                )
124            except NotImplementedError:
125                print("Notifications are not supported on this system.")
126                self._print(title=title, message=message)
127        else:
128            self._print(title=title, message=message)
129
130    def _print(self, title: str, message: str) -> None:
131        """Print a notification to the system.
132
133        Args:
134            title (str): title of the notification.
135            message (str): message of the notification.
136        """
137        print(f"[{self.app_name}] {title}: {message}")

Service for sending notifications.

Require libnotify-bin on Linux systems.

In production, use with Slack, Discord, or emails.

https://plyer.readthedocs.io/en/latest/api.html#plyer.facades.Notification

Arguments:
  • enable (bool): use notifications or print.
  • app_name (str): name of the application.
  • timeout (int | None): timeout in secs.
enable: bool
app_name: str
timeout: int | None
@T.override
def start(self) -> None:
105    @T.override
106    def start(self) -> None:
107        pass

Start the service.

def notify(self, title: str, message: str) -> None:
109    def notify(self, title: str, message: str) -> None:
110        """Send a notification to the system.
111
112        Args:
113            title (str): title of the notification.
114            message (str): message of the notification.
115        """
116        if self.enable:
117            try:
118                notification.notify(
119                    title=title,
120                    message=message,
121                    app_name=self.app_name,
122                    timeout=self.timeout,
123                )
124            except NotImplementedError:
125                print("Notifications are not supported on this system.")
126                self._print(title=title, message=message)
127        else:
128            self._print(title=title, message=message)

Send a notification to the system.

Arguments:
  • title (str): title of the notification.
  • message (str): message of the notification.
model_config: ClassVar[pydantic.config.ConfigDict] = {'strict': True, 'frozen': True, 'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
Service
stop
class MlflowService(Service):
140class MlflowService(Service):
141    """Service for Mlflow tracking and registry.
142
143    Parameters:
144        tracking_uri (str): the URI for the Mlflow tracking server.
145        registry_uri (str): the URI for the Mlflow model registry.
146        experiment_name (str): the name of tracking experiment.
147        registry_name (str): the name of model registry.
148        autolog_disable (bool): disable autologging.
149        autolog_disable_for_unsupported_versions (bool): disable autologging for unsupported versions.
150        autolog_exclusive (bool): If True, enables exclusive autologging.
151        autolog_log_input_examples (bool): If True, logs input examples during autologging.
152        autolog_log_model_signatures (bool): If True, logs model signatures during autologging.
153        autolog_log_models (bool): If True, enables logging of models during autologging.
154        autolog_log_datasets (bool): If True, logs datasets used during autologging.
155        autolog_silent (bool): If True, suppresses all Mlflow warnings during autologging.
156    """
157
158    class RunConfig(pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
159        """Run configuration for Mlflow tracking.
160
161        Parameters:
162            name (str): name of the run.
163            description (str | None): description of the run.
164            tags (dict[str, T.Any] | None): tags for the run.
165            log_system_metrics (bool | None): enable system metrics logging.
166        """
167
168        name: str
169        description: str | None = None
170        tags: dict[str, T.Any] | None = None
171        log_system_metrics: bool | None = True
172
173    # server uri
174    tracking_uri: str = "./mlruns"
175    registry_uri: str = "./mlruns"
176    # experiment
177    experiment_name: str = "bikes"
178    # registry
179    registry_name: str = "bikes"
180    # autolog
181    autolog_disable: bool = False
182    autolog_disable_for_unsupported_versions: bool = False
183    autolog_exclusive: bool = False
184    autolog_log_input_examples: bool = True
185    autolog_log_model_signatures: bool = True
186    autolog_log_models: bool = False
187    autolog_log_datasets: bool = False
188    autolog_silent: bool = False
189
190    @T.override
191    def start(self) -> None:
192        # server uri
193        mlflow.set_tracking_uri(uri=self.tracking_uri)
194        mlflow.set_registry_uri(uri=self.registry_uri)
195        # experiment
196        mlflow.set_experiment(experiment_name=self.experiment_name)
197        # autolog
198        mlflow.autolog(
199            disable=self.autolog_disable,
200            disable_for_unsupported_versions=self.autolog_disable_for_unsupported_versions,
201            exclusive=self.autolog_exclusive,
202            log_input_examples=self.autolog_log_input_examples,
203            log_model_signatures=self.autolog_log_model_signatures,
204            log_datasets=self.autolog_log_datasets,
205            silent=self.autolog_silent,
206        )
207
208    @ctx.contextmanager
209    def run_context(self, run_config: RunConfig) -> T.Generator[mlflow.ActiveRun, None, None]:
210        """Yield an active Mlflow run and exit it afterwards.
211
212        Args:
213            run (str): run parameters.
214
215        Yields:
216            T.Generator[mlflow.ActiveRun, None, None]: active run context. Will be closed at the end of context.
217        """
218        with mlflow.start_run(
219            run_name=run_config.name,
220            tags=run_config.tags,
221            description=run_config.description,
222            log_system_metrics=run_config.log_system_metrics,
223        ) as run:
224            yield run
225
226    def client(self) -> mt.MlflowClient:
227        """Return a new Mlflow client.
228
229        Returns:
230            MlflowClient: the mlflow client.
231        """
232        return mt.MlflowClient(tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)

Service for Mlflow tracking and registry.

Arguments:
  • tracking_uri (str): the URI for the Mlflow tracking server.
  • registry_uri (str): the URI for the Mlflow model registry.
  • experiment_name (str): the name of tracking experiment.
  • registry_name (str): the name of model registry.
  • autolog_disable (bool): disable autologging.
  • autolog_disable_for_unsupported_versions (bool): disable autologging for unsupported versions.
  • autolog_exclusive (bool): If True, enables exclusive autologging.
  • autolog_log_input_examples (bool): If True, logs input examples during autologging.
  • autolog_log_model_signatures (bool): If True, logs model signatures during autologging.
  • autolog_log_models (bool): If True, enables logging of models during autologging.
  • autolog_log_datasets (bool): If True, logs datasets used during autologging.
  • autolog_silent (bool): If True, suppresses all Mlflow warnings during autologging.
tracking_uri: str
registry_uri: str
experiment_name: str
registry_name: str
autolog_disable: bool
autolog_disable_for_unsupported_versions: bool
autolog_exclusive: bool
autolog_log_input_examples: bool
autolog_log_model_signatures: bool
autolog_log_models: bool
autolog_log_datasets: bool
autolog_silent: bool
@T.override
def start(self) -> None:
190    @T.override
191    def start(self) -> None:
192        # server uri
193        mlflow.set_tracking_uri(uri=self.tracking_uri)
194        mlflow.set_registry_uri(uri=self.registry_uri)
195        # experiment
196        mlflow.set_experiment(experiment_name=self.experiment_name)
197        # autolog
198        mlflow.autolog(
199            disable=self.autolog_disable,
200            disable_for_unsupported_versions=self.autolog_disable_for_unsupported_versions,
201            exclusive=self.autolog_exclusive,
202            log_input_examples=self.autolog_log_input_examples,
203            log_model_signatures=self.autolog_log_model_signatures,
204            log_datasets=self.autolog_log_datasets,
205            silent=self.autolog_silent,
206        )

Start the service.

@ctx.contextmanager
def run_context( self, run_config: MlflowService.RunConfig) -> Generator[mlflow.tracking.fluent.ActiveRun, NoneType, NoneType]:
208    @ctx.contextmanager
209    def run_context(self, run_config: RunConfig) -> T.Generator[mlflow.ActiveRun, None, None]:
210        """Yield an active Mlflow run and exit it afterwards.
211
212        Args:
213            run (str): run parameters.
214
215        Yields:
216            T.Generator[mlflow.ActiveRun, None, None]: active run context. Will be closed at the end of context.
217        """
218        with mlflow.start_run(
219            run_name=run_config.name,
220            tags=run_config.tags,
221            description=run_config.description,
222            log_system_metrics=run_config.log_system_metrics,
223        ) as run:
224            yield run

Yield an active Mlflow run and exit it afterwards.

Arguments:
  • run (str): run parameters.
Yields:

T.Generator[mlflow.ActiveRun, None, None]: active run context. Will be closed at the end of context.

def client(self) -> mlflow.tracking.client.MlflowClient:
226    def client(self) -> mt.MlflowClient:
227        """Return a new Mlflow client.
228
229        Returns:
230            MlflowClient: the mlflow client.
231        """
232        return mt.MlflowClient(tracking_uri=self.tracking_uri, registry_uri=self.registry_uri)

Return a new Mlflow client.

Returns:

MlflowClient: the mlflow client.

model_config: ClassVar[pydantic.config.ConfigDict] = {'strict': True, 'frozen': True, 'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
Service
stop
class MlflowService.RunConfig(pydantic.main.BaseModel):
158    class RunConfig(pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
159        """Run configuration for Mlflow tracking.
160
161        Parameters:
162            name (str): name of the run.
163            description (str | None): description of the run.
164            tags (dict[str, T.Any] | None): tags for the run.
165            log_system_metrics (bool | None): enable system metrics logging.
166        """
167
168        name: str
169        description: str | None = None
170        tags: dict[str, T.Any] | None = None
171        log_system_metrics: bool | None = True

Run configuration for Mlflow tracking.

Arguments:
  • name (str): name of the run.
  • description (str | None): description of the run.
  • tags (dict[str, T.Any] | None): tags for the run.
  • log_system_metrics (bool | None): enable system metrics logging.
name: str
description: str | None
tags: dict[str, typing.Any] | None
log_system_metrics: bool | None
model_config: ClassVar[pydantic.config.ConfigDict] = {'strict': True, 'frozen': True, 'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].