bikes.io.datasets
Read/Write datasets from/to external sources/destinations.
"""Read/Write datasets from/to external sources/destinations."""

# %% IMPORTS

import abc
import typing as T

import mlflow.data.pandas_dataset as lineage
import pandas as pd
import pydantic as pdt

# %% TYPINGS

# mlflow's pandas dataset type, used to report data lineage to the tracking server.
Lineage: T.TypeAlias = lineage.PandasDataset

# %% READERS


class Reader(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
    """Base class for a dataset reader.

    Use a reader to load a dataset in memory.
    e.g., to read file, database, cloud storage, ...

    Parameters:
        limit (int, optional): maximum number of rows to read. Defaults to None.
    """

    # discriminator tag set by concrete subclasses (e.g., "ParquetReader")
    KIND: str

    limit: int | None = None

    @abc.abstractmethod
    def read(self) -> pd.DataFrame:
        """Read a dataframe from a dataset.

        Returns:
            pd.DataFrame: dataframe representation.
        """

    @abc.abstractmethod
    def lineage(
        self,
        name: str,
        data: pd.DataFrame,
        targets: str | None = None,
        predictions: str | None = None,
    ) -> Lineage:
        """Generate lineage information.

        Args:
            name (str): dataset name.
            data (pd.DataFrame): reader dataframe.
            targets (str | None): name of the target column.
            predictions (str | None): name of the prediction column.

        Returns:
            Lineage: lineage information.
        """


class ParquetReader(Reader):
    """Read a dataframe from a parquet file.

    Parameters:
        path (str): local path to the dataset.
    """

    KIND: T.Literal["ParquetReader"] = "ParquetReader"

    path: str

    @T.override
    def read(self) -> pd.DataFrame:
        # can't limit rows at read time: load fully, then truncate in memory.
        data = pd.read_parquet(self.path)
        if self.limit is not None:
            data = data.head(self.limit)
        return data

    @T.override
    def lineage(
        self,
        name: str,
        data: pd.DataFrame,
        targets: str | None = None,
        predictions: str | None = None,
    ) -> Lineage:
        return lineage.from_pandas(
            df=data, name=name, source=self.path, targets=targets, predictions=predictions
        )


ReaderKind = ParquetReader

# %% WRITERS


class Writer(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"):
    """Base class for a dataset writer.

    Use a writer to save a dataset from memory.
    e.g., to write file, database, cloud storage, ...
    """

    # discriminator tag set by concrete subclasses (e.g., "ParquetWriter")
    KIND: str

    @abc.abstractmethod
    def write(self, data: pd.DataFrame) -> None:
        """Write a dataframe to a dataset.

        Args:
            data (pd.DataFrame): dataframe representation.
        """


class ParquetWriter(Writer):
    """Write a dataframe to a parquet file.

    Parameters:
        path (str): local or S3 path to the dataset.
    """

    KIND: T.Literal["ParquetWriter"] = "ParquetWriter"

    path: str

    @T.override
    def write(self, data: pd.DataFrame) -> None:
        # idiomatic bound-method call instead of pd.DataFrame.to_parquet(data, ...)
        data.to_parquet(self.path)


WriterKind = ParquetWriter
Lineage: TypeAlias =
mlflow.data.pandas_dataset.PandasDataset
class
Reader(abc.ABC, pydantic.main.BaseModel):
20class Reader(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"): 21 """Base class for a dataset reader. 22 23 Use a reader to load a dataset in memory. 24 e.g., to read file, database, cloud storage, ... 25 26 Parameters: 27 limit (int, optional): maximum number of rows to read. Defaults to None. 28 """ 29 30 KIND: str 31 32 limit: int | None = None 33 34 @abc.abstractmethod 35 def read(self) -> pd.DataFrame: 36 """Read a dataframe from a dataset. 37 38 Returns: 39 pd.DataFrame: dataframe representation. 40 """ 41 42 @abc.abstractmethod 43 def lineage( 44 self, 45 name: str, 46 data: pd.DataFrame, 47 targets: str | None = None, 48 predictions: str | None = None, 49 ) -> Lineage: 50 """Generate lineage information. 51 52 Args: 53 name (str): dataset name. 54 data (pd.DataFrame): reader dataframe. 55 targets (str | None): name of the target column. 56 predictions (str | None): name of the prediction column. 57 58 Returns: 59 Lineage: lineage information. 60 """
Base class for a dataset reader.
Use a reader to load a dataset in memory. e.g., to read file, database, cloud storage, ...
Arguments:
- limit (int, optional): maximum number of rows to read. Defaults to None.
@abc.abstractmethod
def
read(self) -> pandas.core.frame.DataFrame:
34 @abc.abstractmethod 35 def read(self) -> pd.DataFrame: 36 """Read a dataframe from a dataset. 37 38 Returns: 39 pd.DataFrame: dataframe representation. 40 """
Read a dataframe from a dataset.
Returns:
pd.DataFrame: dataframe representation.
@abc.abstractmethod
def
lineage( self, name: str, data: pandas.core.frame.DataFrame, targets: str | None = None, predictions: str | None = None) -> mlflow.data.pandas_dataset.PandasDataset:
42 @abc.abstractmethod 43 def lineage( 44 self, 45 name: str, 46 data: pd.DataFrame, 47 targets: str | None = None, 48 predictions: str | None = None, 49 ) -> Lineage: 50 """Generate lineage information. 51 52 Args: 53 name (str): dataset name. 54 data (pd.DataFrame): reader dataframe. 55 targets (str | None): name of the target column. 56 predictions (str | None): name of the prediction column. 57 58 Returns: 59 Lineage: lineage information. 60 """
Generate lineage information.
Arguments:
- name (str): dataset name.
- data (pd.DataFrame): reader dataframe.
- targets (str | None): name of the target column.
- predictions (str | None): name of the prediction column.
Returns:
Lineage: lineage information.
model_fields =
{'KIND': FieldInfo(annotation=str, required=True), 'limit': FieldInfo(annotation=Union[int, NoneType], required=False, default=None)}
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
63class ParquetReader(Reader): 64 """Read a dataframe from a parquet file. 65 66 Parameters: 67 path (str): local path to the dataset. 68 """ 69 70 KIND: T.Literal["ParquetReader"] = "ParquetReader" 71 72 path: str 73 74 @T.override 75 def read(self) -> pd.DataFrame: 76 # can't limit rows at read time 77 data = pd.read_parquet(self.path) 78 if self.limit is not None: 79 data = data.head(self.limit) 80 return data 81 82 @T.override 83 def lineage( 84 self, 85 name: str, 86 data: pd.DataFrame, 87 targets: str | None = None, 88 predictions: str | None = None, 89 ) -> Lineage: 90 return lineage.from_pandas( 91 df=data, name=name, source=self.path, targets=targets, predictions=predictions 92 )
Read a dataframe from a parquet file.
Arguments:
- path (str): local path to the dataset.
@T.override
def
read(self) -> pandas.core.frame.DataFrame:
74 @T.override 75 def read(self) -> pd.DataFrame: 76 # can't limit rows at read time 77 data = pd.read_parquet(self.path) 78 if self.limit is not None: 79 data = data.head(self.limit) 80 return data
Read a dataframe from a dataset.
Returns:
pd.DataFrame: dataframe representation.
@T.override
def
lineage( self, name: str, data: pandas.core.frame.DataFrame, targets: str | None = None, predictions: str | None = None) -> mlflow.data.pandas_dataset.PandasDataset:
82 @T.override 83 def lineage( 84 self, 85 name: str, 86 data: pd.DataFrame, 87 targets: str | None = None, 88 predictions: str | None = None, 89 ) -> Lineage: 90 return lineage.from_pandas( 91 df=data, name=name, source=self.path, targets=targets, predictions=predictions 92 )
Generate lineage information.
Arguments:
- name (str): dataset name.
- data (pd.DataFrame): reader dataframe.
- targets (str | None): name of the target column.
- predictions (str | None): name of the prediction column.
Returns:
Lineage: lineage information.
model_fields =
{'KIND': FieldInfo(annotation=Literal['ParquetReader'], required=False, default='ParquetReader'), 'limit': FieldInfo(annotation=Union[int, NoneType], required=False, default=None), 'path': FieldInfo(annotation=str, required=True)}
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
ReaderKind =
<class 'ParquetReader'>
class
Writer(abc.ABC, pydantic.main.BaseModel):
100class Writer(abc.ABC, pdt.BaseModel, strict=True, frozen=True, extra="forbid"): 101 """Base class for a dataset writer. 102 103 Use a writer to save a dataset from memory. 104 e.g., to write file, database, cloud storage, ... 105 """ 106 107 KIND: str 108 109 @abc.abstractmethod 110 def write(self, data: pd.DataFrame) -> None: 111 """Write a dataframe to a dataset. 112 113 Args: 114 data (pd.DataFrame): dataframe representation. 115 """
Base class for a dataset writer.
Use a writer to save a dataset from memory. e.g., to write file, database, cloud storage, ...
@abc.abstractmethod
def
write(self, data: pandas.core.frame.DataFrame) -> None:
109 @abc.abstractmethod 110 def write(self, data: pd.DataFrame) -> None: 111 """Write a dataframe to a dataset. 112 113 Args: 114 data (pd.DataFrame): dataframe representation. 115 """
Write a dataframe to a dataset.
Arguments:
- data (pd.DataFrame): dataframe representation.
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
118class ParquetWriter(Writer): 119 """Writer a dataframe to a parquet file. 120 121 Parameters: 122 path (str): local or S3 path to the dataset. 123 """ 124 125 KIND: T.Literal["ParquetWriter"] = "ParquetWriter" 126 127 path: str 128 129 @T.override 130 def write(self, data: pd.DataFrame) -> None: 131 pd.DataFrame.to_parquet(data, self.path)
Write a dataframe to a parquet file.
Arguments:
- path (str): local or S3 path to the dataset.
@T.override
def
write(self, data: pandas.core.frame.DataFrame) -> None:
129 @T.override 130 def write(self, data: pd.DataFrame) -> None: 131 pd.DataFrame.to_parquet(data, self.path)
Write a dataframe to a dataset.
Arguments:
- data (pd.DataFrame): dataframe representation.
model_fields =
{'KIND': FieldInfo(annotation=Literal['ParquetWriter'], required=False, default='ParquetWriter'), 'path': FieldInfo(annotation=str, required=True)}
Inherited Members
- pydantic.main.BaseModel
- BaseModel
- model_extra
- model_fields_set
- model_construct
- model_copy
- model_dump
- model_dump_json
- model_json_schema
- model_parametrized_name
- model_post_init
- model_rebuild
- model_validate
- model_validate_json
- model_validate_strings
- dict
- json
- parse_obj
- parse_raw
- parse_file
- from_orm
- construct
- copy
- schema
- schema_json
- validate
- update_forward_refs
WriterKind =
<class 'ParquetWriter'>