bikes.io.datasets

Read/Write datasets from/to external sources/destinations.

  1"""Read/Write datasets from/to external sources/destinations."""
  2
  3# %% IMPORTS
  4
  5import abc
  6import typing as T
  7
  8import mlflow.data.pandas_dataset as lineage
  9import pandas as pd
 10import pydantic as pdt
 11
 12# %% TYPINGS
 13
 14Lineage: T.TypeAlias = lineage.PandasDataset
 15
 16# %% READERS
 17
 18
 19class Reader(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
 20    """Base class for a dataset reader.
 21
 22    Use a reader to load a dataset in memory.
 23    e.g., to read file, database, cloud storage, ...
 24
 25    Parameters:
 26        limit (int, optional): maximum number of rows to read. Defaults to None.
 27    """
 28
 29    KIND: str
 30
 31    limit: int | None = None
 32
 33    @abc.abstractmethod
 34    def read(self) -> pd.DataFrame:
 35        """Read a dataframe from a dataset.
 36
 37        Returns:
 38            pd.DataFrame: dataframe representation.
 39        """
 40
 41    @abc.abstractmethod
 42    def lineage(
 43        self,
 44        name: str,
 45        data: pd.DataFrame,
 46        targets: str | None = None,
 47        predictions: str | None = None,
 48    ) -> Lineage:
 49        """Generate lineage information.
 50
 51        Args:
 52            name (str): dataset name.
 53            data (pd.DataFrame): reader dataframe.
 54            targets (str | None): name of the target column.
 55            predictions (str | None): name of the prediction column.
 56
 57        Returns:
 58            Lineage: lineage information.
 59        """
 60
 61
 62class ParquetReader(Reader):
 63    """Read a dataframe from a parquet file.
 64
 65    Parameters:
 66        path (str): local path to the dataset.
 67    """
 68
 69    KIND: T.Literal["ParquetReader"] = "ParquetReader"
 70
 71    path: str
 72
 73    @T.override
 74    def read(self) -> pd.DataFrame:
 75        # can't limit rows at read time
 76        data = pd.read_parquet(self.path)
 77        if self.limit is not None:
 78            data = data.head(self.limit)
 79        return data
 80
 81    @T.override
 82    def lineage(
 83        self,
 84        name: str,
 85        data: pd.DataFrame,
 86        targets: str | None = None,
 87        predictions: str | None = None,
 88    ) -> Lineage:
 89        return lineage.from_pandas(
 90            df=data, name=name, source=self.path, targets=targets, predictions=predictions
 91        )
 92
 93
# Union of supported reader kinds (single member for now); used as the
# discriminated type for configuration parsing via the KIND field.
ReaderKind = ParquetReader
 95
 96# %% WRITERS
 97
 98
 99class Writer(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
100    """Base class for a dataset writer.
101
102    Use a writer to save a dataset from memory.
103    e.g., to write file, database, cloud storage, ...
104    """
105
106    KIND: str
107
108    @abc.abstractmethod
109    def write(self, data: pd.DataFrame) -> None:
110        """Write a dataframe to a dataset.
111
112        Args:
113            data (pd.DataFrame): dataframe representation.
114        """
115
116
class ParquetWriter(Writer):
    """Write a dataframe to a parquet file.

    Parameters:
        path (str): local or S3 path to the dataset.
    """

    KIND: T.Literal["ParquetWriter"] = "ParquetWriter"

    path: str

    @T.override
    def write(self, data: pd.DataFrame) -> None:
        # call the bound instance method instead of the unbound class
        # attribute `pd.DataFrame.to_parquet(data, ...)` — same behavior,
        # idiomatic and robust to DataFrame subclasses overriding to_parquet.
        data.to_parquet(self.path)
131
132
# Union of supported writer kinds (single member for now); used as the
# discriminated type for configuration parsing via the KIND field.
WriterKind = ParquetWriter
Lineage: TypeAlias = mlflow.data.pandas_dataset.PandasDataset
class Reader(abc.ABC, pydantic.main.BaseModel):
20class Reader(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
21    """Base class for a dataset reader.
22
23    Use a reader to load a dataset in memory.
24    e.g., to read file, database, cloud storage, ...
25
26    Parameters:
27        limit (int, optional): maximum number of rows to read. Defaults to None.
28    """
29
30    KIND: str
31
32    limit: int | None = None
33
34    @abc.abstractmethod
35    def read(self) -> pd.DataFrame:
36        """Read a dataframe from a dataset.
37
38        Returns:
39            pd.DataFrame: dataframe representation.
40        """
41
42    @abc.abstractmethod
43    def lineage(
44        self,
45        name: str,
46        data: pd.DataFrame,
47        targets: str | None = None,
48        predictions: str | None = None,
49    ) -> Lineage:
50        """Generate lineage information.
51
52        Args:
53            name (str): dataset name.
54            data (pd.DataFrame): reader dataframe.
55            targets (str | None): name of the target column.
56            predictions (str | None): name of the prediction column.
57
58        Returns:
59            Lineage: lineage information.
60        """

Base class for a dataset reader.

Use a reader to load a dataset in memory. e.g., to read file, database, cloud storage, ...

Arguments:
  • limit (int, optional): maximum number of rows to read. Defaults to None.
KIND: str
limit: int | None
@abc.abstractmethod
def read(self) -> pandas.core.frame.DataFrame:
34    @abc.abstractmethod
35    def read(self) -> pd.DataFrame:
36        """Read a dataframe from a dataset.
37
38        Returns:
39            pd.DataFrame: dataframe representation.
40        """

Read a dataframe from a dataset.

Returns:

pd.DataFrame: dataframe representation.

@abc.abstractmethod
def lineage( self, name: str, data: pandas.core.frame.DataFrame, targets: str | None = None, predictions: str | None = None) -> mlflow.data.pandas_dataset.PandasDataset:
42    @abc.abstractmethod
43    def lineage(
44        self,
45        name: str,
46        data: pd.DataFrame,
47        targets: str | None = None,
48        predictions: str | None = None,
49    ) -> Lineage:
50        """Generate lineage information.
51
52        Args:
53            name (str): dataset name.
54            data (pd.DataFrame): reader dataframe.
55            targets (str | None): name of the target column.
56            predictions (str | None): name of the prediction column.
57
58        Returns:
59            Lineage: lineage information.
60        """

Generate lineage information.

Arguments:
  • name (str): dataset name.
  • data (pd.DataFrame): reader dataframe.
  • targets (str | None): name of the target column.
  • predictions (str | None): name of the prediction column.
Returns:

Lineage: lineage information.

model_config = {'strict': True, 'frozen': True, 'extra': 'forbid'}
model_fields = {'KIND': FieldInfo(annotation=str, required=True), 'limit': FieldInfo(annotation=Union[int, NoneType], required=False, default=None)}
model_computed_fields = {}
Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
class ParquetReader(Reader):
63class ParquetReader(Reader):
64    """Read a dataframe from a parquet file.
65
66    Parameters:
67        path (str): local path to the dataset.
68    """
69
70    KIND: T.Literal["ParquetReader"] = "ParquetReader"
71
72    path: str
73
74    @T.override
75    def read(self) -> pd.DataFrame:
76        # can't limit rows at read time
77        data = pd.read_parquet(self.path)
78        if self.limit is not None:
79            data = data.head(self.limit)
80        return data
81
82    @T.override
83    def lineage(
84        self,
85        name: str,
86        data: pd.DataFrame,
87        targets: str | None = None,
88        predictions: str | None = None,
89    ) -> Lineage:
90        return lineage.from_pandas(
91            df=data, name=name, source=self.path, targets=targets, predictions=predictions
92        )

Read a dataframe from a parquet file.

Arguments:
  • path (str): local path to the dataset.
KIND: Literal['ParquetReader']
path: str
@T.override
def read(self) -> pandas.core.frame.DataFrame:
74    @T.override
75    def read(self) -> pd.DataFrame:
76        # can't limit rows at read time
77        data = pd.read_parquet(self.path)
78        if self.limit is not None:
79            data = data.head(self.limit)
80        return data

Read a dataframe from a dataset.

Returns:

pd.DataFrame: dataframe representation.

@T.override
def lineage( self, name: str, data: pandas.core.frame.DataFrame, targets: str | None = None, predictions: str | None = None) -> mlflow.data.pandas_dataset.PandasDataset:
82    @T.override
83    def lineage(
84        self,
85        name: str,
86        data: pd.DataFrame,
87        targets: str | None = None,
88        predictions: str | None = None,
89    ) -> Lineage:
90        return lineage.from_pandas(
91            df=data, name=name, source=self.path, targets=targets, predictions=predictions
92        )

Generate lineage information.

Arguments:
  • name (str): dataset name.
  • data (pd.DataFrame): reader dataframe.
  • targets (str | None): name of the target column.
  • predictions (str | None): name of the prediction column.
Returns:

Lineage: lineage information.

model_config = {'strict': True, 'frozen': True, 'extra': 'forbid'}
model_fields = {'KIND': FieldInfo(annotation=Literal['ParquetReader'], required=False, default='ParquetReader'), 'limit': FieldInfo(annotation=Union[int, NoneType], required=False, default=None), 'path': FieldInfo(annotation=str, required=True)}
model_computed_fields = {}
Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
Reader
limit
ReaderKind = <class 'ParquetReader'>
class Writer(abc.ABC, pydantic.main.BaseModel):
100class Writer(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
101    """Base class for a dataset writer.
102
103    Use a writer to save a dataset from memory.
104    e.g., to write file, database, cloud storage, ...
105    """
106
107    KIND: str
108
109    @abc.abstractmethod
110    def write(self, data: pd.DataFrame) -> None:
111        """Write a dataframe to a dataset.
112
113        Args:
114            data (pd.DataFrame): dataframe representation.
115        """

Base class for a dataset writer.

Use a writer to save a dataset from memory. e.g., to write file, database, cloud storage, ...

KIND: str
@abc.abstractmethod
def write(self, data: pandas.core.frame.DataFrame) -> None:
109    @abc.abstractmethod
110    def write(self, data: pd.DataFrame) -> None:
111        """Write a dataframe to a dataset.
112
113        Args:
114            data (pd.DataFrame): dataframe representation.
115        """

Write a dataframe to a dataset.

Arguments:
  • data (pd.DataFrame): dataframe representation.
model_config = {'strict': True, 'frozen': True, 'extra': 'forbid'}
model_fields = {'KIND': FieldInfo(annotation=str, required=True)}
model_computed_fields = {}
Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
class ParquetWriter(Writer):
118class ParquetWriter(Writer):
119    """Write a dataframe to a parquet file.
120
121    Parameters:
122        path (str): local or S3 path to the dataset.
123    """
124
125    KIND: T.Literal["ParquetWriter"] = "ParquetWriter"
126
127    path: str
128
129    @T.override
130    def write(self, data: pd.DataFrame) -> None:
131        pd.DataFrame.to_parquet(data, self.path)

Write a dataframe to a parquet file.

Arguments:
  • path (str): local or S3 path to the dataset.
KIND: Literal['ParquetWriter']
path: str
@T.override
def write(self, data: pandas.core.frame.DataFrame) -> None:
129    @T.override
130    def write(self, data: pd.DataFrame) -> None:
131        pd.DataFrame.to_parquet(data, self.path)

Write a dataframe to a dataset.

Arguments:
  • data (pd.DataFrame): dataframe representation.
model_config = {'strict': True, 'frozen': True, 'extra': 'forbid'}
model_fields = {'KIND': FieldInfo(annotation=Literal['ParquetWriter'], required=False, default='ParquetWriter'), 'path': FieldInfo(annotation=str, required=True)}
model_computed_fields = {}
Inherited Members
pydantic.main.BaseModel
BaseModel
model_extra
model_fields_set
model_construct
model_copy
model_dump
model_dump_json
model_json_schema
model_parametrized_name
model_post_init
model_rebuild
model_validate
model_validate_json
model_validate_strings
dict
json
parse_obj
parse_raw
parse_file
from_orm
construct
copy
schema
schema_json
validate
update_forward_refs
WriterKind = <class 'ParquetWriter'>