bikes.io.datasets

Read/Write datasets from/to external sources/destinations.

  1"""Read/Write datasets from/to external sources/destinations."""
  2
  3# %% IMPORTS
  4
  5import abc
  6import typing as T
  7
  8import mlflow.data.pandas_dataset as lineage
  9import pandas as pd
 10import pydantic as pdt
 11
 12# %% TYPINGS
 13
 14Lineage: T.TypeAlias = lineage.PandasDataset
 15
 16# %% READERS
 17
 18
 19class Reader(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
 20    """Base class for a dataset reader.
 21
 22    Use a reader to load a dataset in memory.
 23    e.g., to read file, database, cloud storage, ...
 24
 25    Parameters:
 26        limit (int, optional): maximum number of rows to read. Defaults to None.
 27    """
 28
 29    KIND: str
 30
 31    limit: int | None = None
 32
 33    @abc.abstractmethod
 34    def read(self) -> pd.DataFrame:
 35        """Read a dataframe from a dataset.
 36
 37        Returns:
 38            pd.DataFrame: dataframe representation.
 39        """
 40
 41    @abc.abstractmethod
 42    def lineage(
 43        self,
 44        name: str,
 45        data: pd.DataFrame,
 46        targets: str | None = None,
 47        predictions: str | None = None,
 48    ) -> Lineage:
 49        """Generate lineage information.
 50
 51        Args:
 52            name (str): dataset name.
 53            data (pd.DataFrame): reader dataframe.
 54            targets (str | None): name of the target column.
 55            predictions (str | None): name of the prediction column.
 56
 57        Returns:
 58            Lineage: lineage information.
 59        """
 60
 61
 62class ParquetReader(Reader):
 63    """Read a dataframe from a parquet file.
 64
 65    Parameters:
 66        path (str): local path to the dataset.
 67    """
 68
 69    KIND: T.Literal["ParquetReader"] = "ParquetReader"
 70
 71    path: str
 72    backend: T.Literal["pyarrow", "numpy_nullable"] = "pyarrow"
 73
 74    @T.override
 75    def read(self) -> pd.DataFrame:
 76        # can't limit rows at read time
 77        data = pd.read_parquet(self.path, dtype_backend="pyarrow")
 78        if self.limit is not None:
 79            data = data.head(self.limit)
 80        return data
 81
 82    @T.override
 83    def lineage(
 84        self,
 85        name: str,
 86        data: pd.DataFrame,
 87        targets: str | None = None,
 88        predictions: str | None = None,
 89    ) -> Lineage:
 90        return lineage.from_pandas(
 91            df=data,
 92            name=name,
 93            source=self.path,
 94            targets=targets,
 95            predictions=predictions,
 96        )
 97
 98
 99ReaderKind = ParquetReader
100
101# %% WRITERS
102
103
class Writer(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
    """Abstract base for dataset writers.

    A writer persists an in-memory dataset to an external destination,
    e.g., a file, a database, or cloud storage.
    """

    # discriminator string used to select the concrete writer implementation
    KIND: str

    @abc.abstractmethod
    def write(self, data: pd.DataFrame) -> None:
        """Write a dataframe to a dataset.

        Args:
            data (pd.DataFrame): dataframe representation.
        """
121
class ParquetWriter(Writer):
    """Write a dataframe to a parquet file.

    Parameters:
        path (str): local or S3 path to the dataset.
    """

    KIND: T.Literal["ParquetWriter"] = "ParquetWriter"

    path: str

    @T.override
    def write(self, data: pd.DataFrame) -> None:
        # call the method on the instance rather than through the class
        # (`pd.DataFrame.to_parquet(data, ...)` was an unidiomatic unbound call)
        data.to_parquet(self.path)
137
# Union of supported writer kinds (a single implementation for now).
WriterKind = ParquetWriter
Lineage: TypeAlias = mlflow.data.pandas_dataset.PandasDataset
class Reader(abc.ABC, pydantic.main.BaseModel):
20class Reader(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
21    """Base class for a dataset reader.
22
23    Use a reader to load a dataset in memory.
24    e.g., to read file, database, cloud storage, ...
25
26    Parameters:
27        limit (int, optional): maximum number of rows to read. Defaults to None.
28    """
29
30    KIND: str
31
32    limit: int | None = None
33
34    @abc.abstractmethod
35    def read(self) -> pd.DataFrame:
36        """Read a dataframe from a dataset.
37
38        Returns:
39            pd.DataFrame: dataframe representation.
40        """
41
42    @abc.abstractmethod
43    def lineage(
44        self,
45        name: str,
46        data: pd.DataFrame,
47        targets: str | None = None,
48        predictions: str | None = None,
49    ) -> Lineage:
50        """Generate lineage information.
51
52        Args:
53            name (str): dataset name.
54            data (pd.DataFrame): reader dataframe.
55            targets (str | None): name of the target column.
56            predictions (str | None): name of the prediction column.
57
58        Returns:
59            Lineage: lineage information.
60        """

Base class for a dataset reader.

Use a reader to load a dataset in memory. e.g., to read file, database, cloud storage, ...

Arguments:
  • limit (int, optional): maximum number of rows to read. Defaults to None.
KIND: str
limit: int | None
@abc.abstractmethod
def read(self) -> pandas.core.frame.DataFrame:
34    @abc.abstractmethod
35    def read(self) -> pd.DataFrame:
36        """Read a dataframe from a dataset.
37
38        Returns:
39            pd.DataFrame: dataframe representation.
40        """

Read a dataframe from a dataset.

Returns:

pd.DataFrame: dataframe representation.

@abc.abstractmethod
def lineage( self, name: str, data: pandas.core.frame.DataFrame, targets: str | None = None, predictions: str | None = None) -> mlflow.data.pandas_dataset.PandasDataset:
42    @abc.abstractmethod
43    def lineage(
44        self,
45        name: str,
46        data: pd.DataFrame,
47        targets: str | None = None,
48        predictions: str | None = None,
49    ) -> Lineage:
50        """Generate lineage information.
51
52        Args:
53            name (str): dataset name.
54            data (pd.DataFrame): reader dataframe.
55            targets (str | None): name of the target column.
56            predictions (str | None): name of the prediction column.
57
58        Returns:
59            Lineage: lineage information.
60        """

Generate lineage information.

Arguments:
  • name (str): dataset name.
  • data (pd.DataFrame): reader dataframe.
  • targets (str | None): name of the target column.
  • predictions (str | None): name of the prediction column.
Returns:

Lineage: lineage information.

model_config: ClassVar[pydantic.config.ConfigDict] = {'strict': True, 'frozen': True, 'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ParquetReader(Reader):
63class ParquetReader(Reader):
64    """Read a dataframe from a parquet file.
65
66    Parameters:
67        path (str): local path to the dataset.
68    """
69
70    KIND: T.Literal["ParquetReader"] = "ParquetReader"
71
72    path: str
73    backend: T.Literal["pyarrow", "numpy_nullable"] = "pyarrow"
74
75    @T.override
76    def read(self) -> pd.DataFrame:
77        # can't limit rows at read time
78        data = pd.read_parquet(self.path, dtype_backend="pyarrow")
79        if self.limit is not None:
80            data = data.head(self.limit)
81        return data
82
83    @T.override
84    def lineage(
85        self,
86        name: str,
87        data: pd.DataFrame,
88        targets: str | None = None,
89        predictions: str | None = None,
90    ) -> Lineage:
91        return lineage.from_pandas(
92            df=data,
93            name=name,
94            source=self.path,
95            targets=targets,
96            predictions=predictions,
97        )

Read a dataframe from a parquet file.

Arguments:
  • path (str): local path to the dataset.
KIND: Literal['ParquetReader']
path: str
backend: Literal['pyarrow', 'numpy_nullable']
@T.override
def read(self) -> pandas.core.frame.DataFrame:
75    @T.override
76    def read(self) -> pd.DataFrame:
77        # can't limit rows at read time
78        data = pd.read_parquet(self.path, dtype_backend="pyarrow")
79        if self.limit is not None:
80            data = data.head(self.limit)
81        return data

Read a dataframe from a dataset.

Returns:

pd.DataFrame: dataframe representation.

@T.override
def lineage( self, name: str, data: pandas.core.frame.DataFrame, targets: str | None = None, predictions: str | None = None) -> mlflow.data.pandas_dataset.PandasDataset:
83    @T.override
84    def lineage(
85        self,
86        name: str,
87        data: pd.DataFrame,
88        targets: str | None = None,
89        predictions: str | None = None,
90    ) -> Lineage:
91        return lineage.from_pandas(
92            df=data,
93            name=name,
94            source=self.path,
95            targets=targets,
96            predictions=predictions,
97        )

Generate lineage information.

Arguments:
  • name (str): dataset name.
  • data (pd.DataFrame): reader dataframe.
  • targets (str | None): name of the target column.
  • predictions (str | None): name of the prediction column.
Returns:

Lineage: lineage information.

model_config: ClassVar[pydantic.config.ConfigDict] = {'strict': True, 'frozen': True, 'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

Inherited Members
Reader
limit
ReaderKind = <class 'ParquetReader'>
class Writer(abc.ABC, pydantic.main.BaseModel):
105class Writer(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
106    """Base class for a dataset writer.
107
108    Use a writer to save a dataset from memory.
109    e.g., to write file, database, cloud storage, ...
110    """
111
112    KIND: str
113
114    @abc.abstractmethod
115    def write(self, data: pd.DataFrame) -> None:
116        """Write a dataframe to a dataset.
117
118        Args:
119            data (pd.DataFrame): dataframe representation.
120        """

Base class for a dataset writer.

Use a writer to save a dataset from memory. e.g., to write file, database, cloud storage, ...

KIND: str
@abc.abstractmethod
def write(self, data: pandas.core.frame.DataFrame) -> None:
114    @abc.abstractmethod
115    def write(self, data: pd.DataFrame) -> None:
116        """Write a dataframe to a dataset.
117
118        Args:
119            data (pd.DataFrame): dataframe representation.
120        """

Write a dataframe to a dataset.

Arguments:
  • data (pd.DataFrame): dataframe representation.
model_config: ClassVar[pydantic.config.ConfigDict] = {'strict': True, 'frozen': True, 'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ParquetWriter(Writer):
123class ParquetWriter(Writer):
124    """Writer a dataframe to a parquet file.
125
126    Parameters:
127        path (str): local or S3 path to the dataset.
128    """
129
130    KIND: T.Literal["ParquetWriter"] = "ParquetWriter"
131
132    path: str
133
134    @T.override
135    def write(self, data: pd.DataFrame) -> None:
136        pd.DataFrame.to_parquet(data, self.path)

Write a dataframe to a parquet file.

Arguments:
  • path (str): local or S3 path to the dataset.
KIND: Literal['ParquetWriter']
path: str
@T.override
def write(self, data: pandas.core.frame.DataFrame) -> None:
134    @T.override
135    def write(self, data: pd.DataFrame) -> None:
136        pd.DataFrame.to_parquet(data, self.path)

Write a dataframe to a dataset.

Arguments:
  • data (pd.DataFrame): dataframe representation.
model_config: ClassVar[pydantic.config.ConfigDict] = {'strict': True, 'frozen': True, 'extra': 'forbid'}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

WriterKind = <class 'ParquetWriter'>