bikes.io.datasets
Read/Write datasets from/to external sources/destinations.
1"""Read/Write datasets from/to external sources/destinations.""" 2 3# %% IMPORTS 4 5import abc 6import typing as T 7 8import mlflow.data.pandas_dataset as lineage 9import pandas as pd 10import pydantic as pdt 11 12# %% TYPINGS 13 14Lineage: T.TypeAlias = lineage.PandasDataset 15 16# %% READERS 17 18 19class Reader(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"): 20 """Base class for a dataset reader. 21 22 Use a reader to load a dataset in memory. 23 e.g., to read file, database, cloud storage, ... 24 25 Parameters: 26 limit (int, optional): maximum number of rows to read. Defaults to None. 27 """ 28 29 KIND: str 30 31 limit: int | None = None 32 33 @abc.abstractmethod 34 def read(self) -> pd.DataFrame: 35 """Read a dataframe from a dataset. 36 37 Returns: 38 pd.DataFrame: dataframe representation. 39 """ 40 41 @abc.abstractmethod 42 def lineage( 43 self, 44 name: str, 45 data: pd.DataFrame, 46 targets: str | None = None, 47 predictions: str | None = None, 48 ) -> Lineage: 49 """Generate lineage information. 50 51 Args: 52 name (str): dataset name. 53 data (pd.DataFrame): reader dataframe. 54 targets (str | None): name of the target column. 55 predictions (str | None): name of the prediction column. 56 57 Returns: 58 Lineage: lineage information. 59 """ 60 61 62class ParquetReader(Reader): 63 """Read a dataframe from a parquet file. 64 65 Parameters: 66 path (str): local path to the dataset. 
67 """ 68 69 KIND: T.Literal["ParquetReader"] = "ParquetReader" 70 71 path: str 72 backend: T.Literal["pyarrow", "numpy_nullable"] = "pyarrow" 73 74 @T.override 75 def read(self) -> pd.DataFrame: 76 # can't limit rows at read time 77 data = pd.read_parquet(self.path, dtype_backend="pyarrow") 78 if self.limit is not None: 79 data = data.head(self.limit) 80 return data 81 82 @T.override 83 def lineage( 84 self, 85 name: str, 86 data: pd.DataFrame, 87 targets: str | None = None, 88 predictions: str | None = None, 89 ) -> Lineage: 90 return lineage.from_pandas( 91 df=data, 92 name=name, 93 source=self.path, 94 targets=targets, 95 predictions=predictions, 96 ) 97 98 99ReaderKind = ParquetReader 100 101# %% WRITERS 102 103 104class Writer(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"): 105 """Base class for a dataset writer. 106 107 Use a writer to save a dataset from memory. 108 e.g., to write file, database, cloud storage, ... 109 """ 110 111 KIND: str 112 113 @abc.abstractmethod 114 def write(self, data: pd.DataFrame) -> None: 115 """Write a dataframe to a dataset. 116 117 Args: 118 data (pd.DataFrame): dataframe representation. 119 """ 120 121 122class ParquetWriter(Writer): 123 """Writer a dataframe to a parquet file. 124 125 Parameters: 126 path (str): local or S3 path to the dataset. 127 """ 128 129 KIND: T.Literal["ParquetWriter"] = "ParquetWriter" 130 131 path: str 132 133 @T.override 134 def write(self, data: pd.DataFrame) -> None: 135 pd.DataFrame.to_parquet(data, self.path) 136 137 138WriterKind = ParquetWriter
Lineage: TypeAlias =
mlflow.data.pandas_dataset.PandasDataset
class
Reader(abc.ABC, pydantic.main.BaseModel):
20class Reader(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"): 21 """Base class for a dataset reader. 22 23 Use a reader to load a dataset in memory. 24 e.g., to read file, database, cloud storage, ... 25 26 Parameters: 27 limit (int, optional): maximum number of rows to read. Defaults to None. 28 """ 29 30 KIND: str 31 32 limit: int | None = None 33 34 @abc.abstractmethod 35 def read(self) -> pd.DataFrame: 36 """Read a dataframe from a dataset. 37 38 Returns: 39 pd.DataFrame: dataframe representation. 40 """ 41 42 @abc.abstractmethod 43 def lineage( 44 self, 45 name: str, 46 data: pd.DataFrame, 47 targets: str | None = None, 48 predictions: str | None = None, 49 ) -> Lineage: 50 """Generate lineage information. 51 52 Args: 53 name (str): dataset name. 54 data (pd.DataFrame): reader dataframe. 55 targets (str | None): name of the target column. 56 predictions (str | None): name of the prediction column. 57 58 Returns: 59 Lineage: lineage information. 60 """
Base class for a dataset reader.
Use a reader to load a dataset in memory. e.g., to read file, database, cloud storage, ...
Arguments:
- limit (int, optional): maximum number of rows to read. Defaults to None.
@abc.abstractmethod
def
read(self) -> pandas.core.frame.DataFrame:
34 @abc.abstractmethod 35 def read(self) -> pd.DataFrame: 36 """Read a dataframe from a dataset. 37 38 Returns: 39 pd.DataFrame: dataframe representation. 40 """
Read a dataframe from a dataset.
Returns:
pd.DataFrame: dataframe representation.
@abc.abstractmethod
def
lineage( self, name: str, data: pandas.core.frame.DataFrame, targets: str | None = None, predictions: str | None = None) -> mlflow.data.pandas_dataset.PandasDataset:
42 @abc.abstractmethod 43 def lineage( 44 self, 45 name: str, 46 data: pd.DataFrame, 47 targets: str | None = None, 48 predictions: str | None = None, 49 ) -> Lineage: 50 """Generate lineage information. 51 52 Args: 53 name (str): dataset name. 54 data (pd.DataFrame): reader dataframe. 55 targets (str | None): name of the target column. 56 predictions (str | None): name of the prediction column. 57 58 Returns: 59 Lineage: lineage information. 60 """
Generate lineage information.
Arguments:
- name (str): dataset name.
- data (pd.DataFrame): reader dataframe.
- targets (str | None): name of the target column.
- predictions (str | None): name of the prediction column.
Returns:
Lineage: lineage information.
63class ParquetReader(Reader): 64 """Read a dataframe from a parquet file. 65 66 Parameters: 67 path (str): local path to the dataset. 68 """ 69 70 KIND: T.Literal["ParquetReader"] = "ParquetReader" 71 72 path: str 73 backend: T.Literal["pyarrow", "numpy_nullable"] = "pyarrow" 74 75 @T.override 76 def read(self) -> pd.DataFrame: 77 # can't limit rows at read time 78 data = pd.read_parquet(self.path, dtype_backend="pyarrow") 79 if self.limit is not None: 80 data = data.head(self.limit) 81 return data 82 83 @T.override 84 def lineage( 85 self, 86 name: str, 87 data: pd.DataFrame, 88 targets: str | None = None, 89 predictions: str | None = None, 90 ) -> Lineage: 91 return lineage.from_pandas( 92 df=data, 93 name=name, 94 source=self.path, 95 targets=targets, 96 predictions=predictions, 97 )
Read a dataframe from a parquet file.
Arguments:
- path (str): local path to the dataset.
@T.override
def
read(self) -> pandas.core.frame.DataFrame:
75 @T.override 76 def read(self) -> pd.DataFrame: 77 # can't limit rows at read time 78 data = pd.read_parquet(self.path, dtype_backend="pyarrow") 79 if self.limit is not None: 80 data = data.head(self.limit) 81 return data
Read a dataframe from a dataset.
Returns:
pd.DataFrame: dataframe representation.
@T.override
def
lineage( self, name: str, data: pandas.core.frame.DataFrame, targets: str | None = None, predictions: str | None = None) -> mlflow.data.pandas_dataset.PandasDataset:
83 @T.override 84 def lineage( 85 self, 86 name: str, 87 data: pd.DataFrame, 88 targets: str | None = None, 89 predictions: str | None = None, 90 ) -> Lineage: 91 return lineage.from_pandas( 92 df=data, 93 name=name, 94 source=self.path, 95 targets=targets, 96 predictions=predictions, 97 )
Generate lineage information.
Arguments:
- name (str): dataset name.
- data (pd.DataFrame): reader dataframe.
- targets (str | None): name of the target column.
- predictions (str | None): name of the prediction column.
Returns:
Lineage: lineage information.
ReaderKind =
<class 'ParquetReader'>
class
Writer(abc.ABC, pydantic.main.BaseModel):
105class Writer(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"): 106 """Base class for a dataset writer. 107 108 Use a writer to save a dataset from memory. 109 e.g., to write file, database, cloud storage, ... 110 """ 111 112 KIND: str 113 114 @abc.abstractmethod 115 def write(self, data: pd.DataFrame) -> None: 116 """Write a dataframe to a dataset. 117 118 Args: 119 data (pd.DataFrame): dataframe representation. 120 """
Base class for a dataset writer.
Use a writer to save a dataset from memory. e.g., to write file, database, cloud storage, ...
@abc.abstractmethod
def
write(self, data: pandas.core.frame.DataFrame) -> None:
114 @abc.abstractmethod 115 def write(self, data: pd.DataFrame) -> None: 116 """Write a dataframe to a dataset. 117 118 Args: 119 data (pd.DataFrame): dataframe representation. 120 """
Write a dataframe to a dataset.
Arguments:
- data (pd.DataFrame): dataframe representation.
123class ParquetWriter(Writer): 124 """Write a dataframe to a parquet file. 125 126 Parameters: 127 path (str): local or S3 path to the dataset. 128 """ 129 130 KIND: T.Literal["ParquetWriter"] = "ParquetWriter" 131 132 path: str 133 134 @T.override 135 def write(self, data: pd.DataFrame) -> None: 136 pd.DataFrame.to_parquet(data, self.path)
Write a dataframe to a parquet file.
Arguments:
- path (str): local or S3 path to the dataset.
WriterKind =
<class 'ParquetWriter'>