Skip to content

generator

generator

DataProductCollection pydantic-model

DataProductCollection(**kwargs: Any)

Bases: BaseModel

A collection of data products.

Use this class to serialize data products to JSON, deserialize them from JSON, filter the products, etc.

Attributes:

Name Type Description
elements ProductList

A list of data products.

Show JSON schema:
{
  "$defs": {
    "DataProduct": {
      "additionalProperties": true,
      "description": "Base class for data products to be generated and handled.\n\nAttributes:\n    product_type (str): Product type should be the same as the class name.\n        The product type is used to search for products from a [DataProductCollection][trendify.API.DataProductCollection].\n    tags (Tags): Tags to be used for sorting data.\n    metadata (dict[str, str]): A dictionary of metadata to be used as a tool tip for mousover in grafana",
      "properties": {
        "tags": {
          "items": {
            "anyOf": []
          },
          "title": "Tags",
          "type": "array"
        },
        "metadata": {
          "additionalProperties": {
            "type": "string"
          },
          "default": {},
          "title": "Metadata",
          "type": "object"
        }
      },
      "required": [
        "tags"
      ],
      "title": "DataProduct",
      "type": "object"
    }
  },
  "description": "A collection of data products.\n\nUse this class to serialize data products to JSON, de-serialized them from JSON, filter the products, etc.\n\nAttributes:\n    elements (ProductList): A list of data products.",
  "properties": {
    "derived_from": {
      "anyOf": [
        {
          "format": "path",
          "type": "string"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "title": "Derived From"
    },
    "elements": {
      "anyOf": [
        {
          "items": {
            "$ref": "#/$defs/DataProduct"
          },
          "type": "array"
        },
        {
          "type": "null"
        }
      ],
      "default": null,
      "title": "Elements"
    }
  },
  "title": "DataProductCollection",
  "type": "object"
}

Fields:

Source code in src/trendify/api/generator/data_product_collection.py
def __init__(self, **kwargs: Any):
    """
    Initializes the collection from keyword arguments.

    Args:
        **kwargs (Any): Field values for the model (e.g. `elements`).
    """
    # Hand the keyword arguments to DataProduct first, pointing it at the "elements" key
    # (presumably so serialized entries are rebuilt as their concrete DataProduct
    # subclasses -- TODO confirm against DataProduct.deserialize_child_classes).
    DataProduct.deserialize_child_classes(key="elements", **kwargs)
    # Then run the regular base-class initialization with the same keyword arguments.
    super().__init__(**kwargs)

add_products

add_products(*products: DataProduct)

Parameters:

Name Type Description Default
products Tuple[DataProduct | ProductList, ...]

Products or lists of products to be appended to collection elements.

()
Source code in src/trendify/api/generator/data_product_collection.py
def add_products(self, *products: DataProduct):
    """
    Appends the given products to this collection's elements in place.

    Args:
        products (Tuple[DataProduct|ProductList, ...]): Products or lists of products to be
            appended to collection elements.  The arguments are passed through `flatten`
            before being appended.
    """
    # `elements` defaults to `None`, so start from an empty list to allow adding
    # products to a collection that was created without any elements.
    if self.elements is None:
        self.elements = []
    self.elements.extend(flatten(products))

collect_from_all_jsons classmethod

collect_from_all_jsons(
    *dirs: Path, recursive: bool = False, data_products_filename: str | None = "*.json"
)

Loads all products from JSONs in the given list of directories. If recursive is set to True, the directories will be searched recursively (this could lead to double counting if you pass in subdirectories of a parent).

Parameters:

Name Type Description Default
dirs Tuple[Path, ...]

Directories from which to load data product JSON files.

()
recursive bool

whether or not to search each of the provided directories recursively for data product json files.

False

Returns:

Type Description
Type[Self] | None

Data product collection if JSON files are found. Otherwise, returns None if no product JSON files were found.

Source code in src/trendify/api/generator/data_product_collection.py
@classmethod
def collect_from_all_jsons(
    cls,
    *dirs: Path,
    recursive: bool = False,
    data_products_filename: str | None = "*.json",
):
    """
    Loads all products from JSONs in the given list of directories.
    If recursive is set to `True`, the directories will be searched recursively
    (this could lead to double counting if you pass in subdirectories of a parent).

    Args:
        dirs (Tuple[Path, ...]): Directories from which to load data product JSON files.
        recursive (bool): whether or not to search each of the provided directories recursively for
            data product json files.

    Returns:
        (Type[Self] | None): Data product collection if JSON files are found.
            Otherwise, returns None if no product JSON files were found.
    """
    if not recursive:
        jsons: List[Path] = list(
            flatten(chain(list(d.glob(data_products_filename)) for d in dirs))
        )
    else:
        jsons: List[Path] = list(
            flatten(
                chain(list(d.glob(f"**/{data_products_filename}")) for d in dirs)
            )
        )
    if jsons:
        return cls.union(
            *tuple([cls.model_validate_json(p.read_text()) for p in jsons])
        )
    else:
        return None

drop_products

drop_products(tag: Tag | None = None, object_type: Type[R] | None = None) -> Self[R]

Removes products matching tag and/or object_type from collection elements.

Parameters:

Name Type Description Default
tag Tag | None

Tag for which data products should be dropped

None
object_type Type | None

Type of data product to drop

None

Returns:

Type Description
DataProductCollection

A new collection from which matching elements have been dropped.

Source code in src/trendify/api/generator/data_product_collection.py
def drop_products(
    self,
    tag: Tag | None = None,
    object_type: Type[R] | None = None,
) -> Self[R]:
    """
    Removes products matching `tag` and/or `object_type` from collection elements.

    A product is dropped only when it satisfies every criterion that was given.
    When both `tag` and `object_type` are `None`, nothing is dropped.

    Args:
        tag (Tag | None): Tag for which data products should be dropped
        object_type (Type | None): Type of data product to drop

    Returns:
        (DataProductCollection): A new collection from which matching elements have been dropped.
    """
    if tag is None and object_type is None:
        # No criteria were given: keep every element.
        return type(self)(elements=self.elements)

    def is_dropped(e) -> bool:
        # A criterion left as `None` does not restrict the match.
        return (tag is None or tag in e.tags) and (
            object_type is None or isinstance(e, object_type)
        )

    # `elements` may be `None`; treat that as "no products" rather than failing
    # while iterating over `None`.
    return type(self)(
        elements=[e for e in (self.elements or []) if not is_dropped(e)]
    )

from_iterable classmethod

from_iterable(*products: Tuple[ProductList, ...])

Returns a new instance containing all of the products provided in the *products argument.

Parameters:

Name Type Description Default
products Tuple[ProductList, ...]

Lists of data products to combine into a collection

()

Returns:

Type Description
cls

A data product collection containing all of the provided products in the *products argument.

Source code in src/trendify/api/generator/data_product_collection.py
@classmethod
def from_iterable(cls, *products: Tuple[ProductList, ...]):
    """
    Builds a new collection out of every product handed in through `*products`.

    Args:
        products (Tuple[ProductList, ...]): Lists of data products to combine into a collection

    Returns:
        (cls): A data product collection holding all products supplied via `*products`.
    """
    # Collapse the positional arguments into one flat list of products.
    flattened = [*flatten(products)]
    return cls(elements=flattened)

get_products

get_products(tag: Tag | None = None, object_type: Type[R] | None = None) -> Self[R]

Returns a new collection containing products matching tag and/or object_type. Both tag and object_type default to None which matches all products.

Parameters:

Name Type Description Default
tag Tag | None

Tag of data products to be kept. None matches all products.

None
object_type Type | None

Type of data product to keep. None matches all products.

None

Returns:

Type Description
DataProductCollection

A new collection containing matching elements.

Source code in src/trendify/api/generator/data_product_collection.py
def get_products(
    self, tag: Tag | None = None, object_type: Type[R] | None = None
) -> Self[R]:
    """
    Returns a new collection containing products matching `tag` and/or `object_type`.
    Both `tag` and `object_type` default to `None` which matches all products.

    Args:
        tag (Tag | None): Tag of data products to be kept.  `None` matches all products.
        object_type (Type | None): Type of data product to keep.  `None` matches all products.

    Returns:
        (DataProductCollection): A new collection containing matching elements.
    """
    if tag is None and object_type is None:
        # No criteria were given: every element matches.
        return type(self)(elements=self.elements)

    def is_match(e) -> bool:
        # A criterion left as `None` does not restrict the match.
        return (tag is None or tag in e.tags) and (
            object_type is None or isinstance(e, object_type)
        )

    # `elements` may be `None`; treat that as "no products" rather than failing
    # while iterating over `None`.
    return type(self)(elements=[e for e in (self.elements or []) if is_match(e)])

get_tags

get_tags(data_product_type: Type[DataProduct] | None = None) -> set

Gets the tags related to a given type of DataProduct. Parent classes will match all child class types.

Parameters:

Name Type Description Default
data_product_type Type[DataProduct] | None

type for which you want to get the list of tags

None

Returns:

Type Description
set

set of tags applying to the given data_product_type.

Source code in src/trendify/api/generator/data_product_collection.py
def get_tags(self, data_product_type: Type[DataProduct] | None = None) -> set:
    """
    Collects the distinct tags carried by products of the requested type.
    Passing a parent class also matches instances of its child classes.

    Args:
        data_product_type (Type[DataProduct] | None): type for which you want to get the list of tags.
            `None` includes every product.

    Returns:
        (set): set of tags applying to the given `data_product_type`.
    """
    return {
        tag
        for product in flatten(self.elements)
        if data_product_type is None or isinstance(product, data_product_type)
        for tag in product.tags
    }

process_collection classmethod

process_collection(
    dir_in: Path, dir_out: Path, no_tables: bool, no_xy_plots: bool, no_histograms: bool, dpi: int
)

Processes collection of elements corresponding to a single tag. This method should be called on a directory containing jsons for which the products have been sorted.

Parameters:

Name Type Description Default
dir_in Path

Input directory for loading assets

required
dir_out Path

Output directory for assets

required
no_tables bool

Suppresses table asset creation

required
no_xy_plots bool

Suppresses xy plot asset creation

required
no_histograms bool

Suppresses histogram asset creation

required
dpi int

Sets resolution of asset output

required
Source code in src/trendify/api/generator/data_product_collection.py
@classmethod
def process_collection(
    cls,
    dir_in: Path,
    dir_out: Path,
    no_tables: bool,
    no_xy_plots: bool,
    no_histograms: bool,
    dpi: int,
):
    """
    Loads the data products found in `dir_in` and generates assets for every tag they carry.
    This method should be called on a directory containing jsons for which the products have been
    sorted.

    For each tag, table CSVs, an xy plot, and a histogram are produced unless the
    corresponding `no_*` flag is set.  The xy plot and the histogram are drawn on the
    same figure, which is formatted and saved as a `.jpg` under `dir_out`.
    Nothing is done when no product JSON files are found.

    Args:
        dir_in (Path):  Input directory for loading assets
        dir_out (Path):  Output directory for assets
        no_tables (bool):  Suppresses table asset creation
        no_xy_plots (bool):  Suppresses xy plot asset creation
        no_histograms (bool):  Suppresses histogram asset creation
        dpi (int):  Sets resolution of asset output
    """

    # Called with the defaults, so only the JSON files directly inside `dir_in` are loaded.
    collection = cls.collect_from_all_jsons(dir_in)

    if collection is not None:

        for tag in collection.get_tags():
            # One figure per tag, shared by the xy plot and the histogram.
            saf: SingleAxisFigure | None = None
            # Formats gathered from every plotted product of this tag.
            format_2ds: list[Format2D] = []

            if not no_tables:
                table_entries: List[TableEntry] = collection.get_products(
                    tag=tag,
                    object_type=TableEntry,
                ).elements

                if table_entries:
                    # Imported here rather than at module level
                    # (presumably to avoid a circular import -- TODO confirm).
                    from trendify.api.generator.table_builder import TableBuilder

                    logger.info(f"Making tables for {tag = }")
                    TableBuilder.process_table_entries(
                        tag=tag,
                        table_entries=table_entries,
                        out_dir=dir_out,
                    )
                    logger.info(f"Finished tables for {tag = }")

            if not no_xy_plots:
                traces: List[Trace2D] = collection.get_products(
                    tag=tag,
                    object_type=Trace2D,
                ).elements
                points: List[Point2D] = collection.get_products(
                    tag=tag,
                    object_type=Point2D,
                ).elements
                axlines: List[AxLine] = collection.get_products(
                    tag=tag,
                    object_type=AxLine,
                ).elements

                # Any one of the three product kinds is enough to warrant a plot.
                if points or traces or axlines:
                    # Imported here rather than at module level
                    # (presumably to avoid a circular import -- TODO confirm).
                    from trendify.api.generator.xy_data_plotter import XYDataPlotter

                    logger.info(f"Making xy plot for {tag = }")
                    # `saf` is still `None` at this point; the returned figure is kept
                    # so the histogram step below can draw on it as well.
                    saf = XYDataPlotter.handle_points_and_traces(
                        tag=tag,
                        points=points,
                        traces=traces,
                        axlines=axlines,
                        dir_out=dir_out,
                        dpi=dpi,
                        saf=saf,
                    )

                    # Only real `Format2D` instances are collected (e.g. `None` is skipped).
                    format_2ds += [
                        p.format2d
                        for p in points
                        if isinstance(p.format2d, Format2D)
                    ]
                    format_2ds += [
                        t.format2d
                        for t in traces
                        if isinstance(t.format2d, Format2D)
                    ]
                    format_2ds += [
                        a.format2d
                        for a in axlines
                        if isinstance(a.format2d, Format2D)
                    ]
                    logger.info(f"Finished xy plot for {tag = }")

            if not no_histograms:
                histogram_entries: List[HistogramEntry] = collection.get_products(
                    tag=tag,
                    object_type=HistogramEntry,
                ).elements

                if histogram_entries:
                    logger.info(f"Making histogram for {tag = }")
                    # Passes along the figure from the xy plot step, if there is one.
                    saf = Histogrammer.handle_histogram_entries(
                        tag=tag,
                        histogram_entries=histogram_entries,
                        dir_out=dir_out,
                        dpi=dpi,
                        saf=saf,
                    )

                    format_2ds += [
                        h.format2d
                        for h in histogram_entries
                        if isinstance(h.format2d, Format2D)
                    ]
                    logger.info(f"Finished histogram for {tag = }")

            # A figure only exists when at least one plotting step ran for this tag.
            if isinstance(saf, SingleAxisFigure):
                # De-duplicate the formats and merge them into the one applied to the figure.
                formats = list(set(format_2ds))
                format2d = Format2D.union_from_iterable(formats)
                saf.apply_format(format2d)

                # The tag components become nested path parts under `dir_out`.
                # NOTE(review): `with_suffix` replaces anything after the last "." in the
                # final tag component, so a tag containing a dot would be truncated -- confirm.
                save_path = dir_out.joinpath(*tuple(atleast_1d(tag))).with_suffix(
                    ".jpg"
                )
                save_path.parent.mkdir(exist_ok=True, parents=True)
                # NOTE(review): logged at CRITICAL although the message is informational -- confirm intent.
                logger.critical(f"Saving to {save_path}")
                saf.savefig(save_path, dpi=dpi)
                del saf

sort_by_tags classmethod

sort_by_tags(
    dirs_in: List[Path], dir_out: Path, data_products_fname: str = DATA_PRODUCTS_FNAME_DEFAULT
)

Loads the data product JSON files from dirs_in and sorts the products. Sorted products are written to smaller files in a nested directory structure under dir_out. A nested directory structure is generated according to the data tags. Resulting product files are named according to the directory from which they were originally loaded.

Parameters:

Name Type Description Default
dirs_in List[Path]

Directories from which the data product JSON files are to be loaded.

required
dir_out Path

Directory to which the sorted data products will be written into a nested folder structure generated according to the data tags.

required
data_products_fname str

Name of data products file

DATA_PRODUCTS_FNAME_DEFAULT
Source code in src/trendify/api/generator/data_product_collection.py
@classmethod
def sort_by_tags(
    cls,
    dirs_in: List[Path],
    dir_out: Path,
    data_products_fname: str = DATA_PRODUCTS_FNAME_DEFAULT,
):
    """
    Loads the data product JSON files from `dirs_in` and sorts the products.
    Sorted products are written to smaller files in a nested directory structure under `dir_out`.
    A nested directory structure is generated according to the data tags.
    Resulting product files are named according to the directory from which they were originally loaded.

    The input directories are processed one at a time in sorted order.

    Args:
        dirs_in (List[Path]): Directories from which the data product JSON files are to be loaded.
        dir_out (Path): Directory to which the sorted data products will be written into a
            nested folder structure generated according to the data tags.
        data_products_fname (str): Name of data products file
    """
    # `sorted` copies the input, so the caller's list is left untouched.
    ordered_dirs = sorted(dirs_in)
    len_dirs = len(ordered_dirs)
    # Count from 1 so the progress message runs from 1/N to N/N instead of 0/N to (N-1)/N.
    for n, dir_in in enumerate(ordered_dirs, start=1):
        logger.info(f"Sorting tagged data from dir {n}/{len_dirs}")
        cls.sort_by_tags_single_directory(
            dir_in=dir_in, dir_out=dir_out, data_products_fname=data_products_fname
        )

sort_by_tags_single_directory classmethod

sort_by_tags_single_directory(
    dir_in: Path, dir_out: Path, data_products_fname: str = DATA_PRODUCTS_FNAME_DEFAULT
)

Loads the data product JSON files from dir_in and sorts the products. Sorted products are written to smaller files in a nested directory structure under dir_out. A nested directory structure is generated according to the data tags. Resulting product files are named according to the directory from which they were originally loaded.

Parameters:

Name Type Description Default
dir_in Path

Directory from which the data product JSON file is to be loaded.

required
dir_out Path

Directory to which the sorted data products will be written into a nested folder structure generated according to the data tags.

required
data_products_fname str

Name of data products file

DATA_PRODUCTS_FNAME_DEFAULT
Source code in src/trendify/api/generator/data_product_collection.py
@classmethod
def sort_by_tags_single_directory(
    cls,
    dir_in: Path,
    dir_out: Path,
    data_products_fname: str = DATA_PRODUCTS_FNAME_DEFAULT,
):
    """
    Splits the data products file of a single directory into one file per tag.
    The products file `dir_in / data_products_fname` is loaded, and for every tag found
    the matching products are written to a JSON file below `dir_out`, in a nested
    directory structure generated according to the data tags.
    Each output file is named after an index obtained from `ProductIndexMap` for the
    source products file.  Only a log message is emitted when the products file is missing.

    Args:
        dir_in (Path): Directory from which the data product JSON file is to be loaded.
        dir_out (Path): Directory to which the sorted data products will be written into a
            nested folder structure generated according to the data tags.
        data_products_fname (str): Name of data products file
    """
    products_file = dir_in.joinpath(data_products_fname)
    if not products_file.exists():
        logger.info(f"No results found in {dir_in = }")
        return

    logger.info(f"Sorting results from {dir_in = }")
    loaded = DataProductCollection.model_validate_json(products_file.read_text())
    loaded.derived_from = dir_in

    for tag in loaded.get_tags():
        tagged_products = loaded.get_products(tag=tag)

        # The tag components become nested directories below `dir_out`.
        target_dir = dir_out.joinpath(*atleast_1d(tag))
        target_dir.mkdir(parents=True, exist_ok=True)

        entry_metadata = ProductEntryMetadata(
            source=products_file.resolve(),
        )
        file_index = ProductIndexMap.get_index(
            save_dir=target_dir,
            metadata=entry_metadata,
        )

        target_file = target_dir.joinpath(str(file_index)).with_suffix(".json")
        target_file.write_text(tagged_products.model_dump_json())

union classmethod

union(*collections: DataProductCollection)

Aggregates all of the products from multiple collections into a new larger collection.

Parameters:

Name Type Description Default
collections Tuple[DataProductCollection, ...]

Data product collections for which the products should be combined into a new collection.

()

Returns:

Type Description
Type[Self]

A new data product collection containing all products from the provided *collections.

Source code in src/trendify/api/generator/data_product_collection.py
@classmethod
def union(cls, *collections: DataProductCollection):
    """
    Merges the products of several collections into one new collection.

    Args:
        collections (Tuple[DataProductCollection, ...]): Data product collections
            for which the products should be combined into a new collection.

    Returns:
        (Type[Self]): A new data product collection containing all products from
            the provided `*collections`.
    """
    # One element list per collection, flattened into a single list of products.
    element_lists = chain(collection.elements for collection in collections)
    merged = [*flatten(element_lists)]
    return cls(elements=merged)

DataProductGenerator

DataProductGenerator(processor: ProductGenerator)

A wrapper for saving the data products generated by a user defined function

Parameters:

Name Type Description Default
processor ProductGenerator

A callable that receives a working directory and returns a list of data products.

required
Source code in src/trendify/api/generator/data_product_generator.py
def __init__(self, processor: ProductGenerator):
    """
    Stores the user-defined product generator.

    Args:
        processor (ProductGenerator): Callable kept on the instance as `_processor`.
    """
    self._processor = processor

process_and_save

process_and_save(workdir: Path, data_products_fname: str = DATA_PRODUCTS_FNAME_DEFAULT)

Runs the user-defined processor method stored at instantiation.

Saves the returned products to a JSON file in the same directory.

Parameters:

Name Type Description Default
workdir Path

working directory on which to run the processor method.

required
data_products_fname str

Name of data products file

DATA_PRODUCTS_FNAME_DEFAULT
Source code in src/trendify/api/generator/data_product_generator.py
def process_and_save(
    self, workdir: Path, data_products_fname: str = DATA_PRODUCTS_FNAME_DEFAULT
):
    """
    Calls the processor stored at instantiation on `workdir` and saves its products.

    The returned products are written as JSON to `workdir / data_products_fname`.
    No file is written when the processor yields no products.

    Args:
        workdir (Path): working directory on which to run the processor method.
        data_products_fname (str): Name of data products file
    """

    logger.info(f"Processing {workdir = } with {self._processor = }")
    products = self._processor(workdir)
    collection = DataProductCollection.from_iterable(products)
    if not collection.elements:
        return

    # Make sure the directory exists before writing the products file into it.
    workdir.mkdir(exist_ok=True, parents=True)
    products_file = workdir.joinpath(data_products_fname)
    products_file.write_text(collection.model_dump_json())

Histogrammer

Histogrammer(in_dirs: List[Path], out_dir: Path, dpi: int)

Class for loading data products and histogramming the [HistogramEntry][trendify.API.HistogramEntry]s

Parameters:

Name Type Description Default
in_dirs List[Path]

Directories from which the data products are to be loaded.

required
out_dir Path

Directory to which the generated histogram will be stored

required
dpi int

resolution of plot

required
Source code in src/trendify/api/generator/histogrammer.py
def __init__(
    self,
    in_dirs: List[Path],
    out_dir: Path,
    dpi: int,
):
    """
    Stores the input directories, output directory, and plot resolution on the instance.

    Args:
        in_dirs (List[Path]): Directories from which the data products are to be loaded.
        out_dir (Path): Directory to which the generated histogram will be stored
        dpi (int): resolution of plot
    """
    self.in_dirs = in_dirs
    self.out_dir = out_dir
    self.dpi = dpi

handle_histogram_entries classmethod

handle_histogram_entries(
    tag: Tag,
    histogram_entries: List[HistogramEntry],
    dir_out: Path,
    dpi: int,
    saf: SingleAxisFigure | None = None,
) -> SingleAxisFigure

Histograms the provided entries on a single-axis figure and returns that figure. Formatting, saving, and closing the figure are left to the caller.

Parameters:

Name Type Description Default
tag Tag

Tag used to filter the loaded data products

required
histogram_entries List[HistogramEntry]

A list of [HistogramEntry][trendify.API.HistogramEntry]s

required
dir_out Path

Directory to which the generated histogram will be stored

required
dpi int

resolution of plot

required
Source code in src/trendify/api/generator/histogrammer.py
@classmethod
def handle_histogram_entries(
    cls,
    tag: Tag,
    histogram_entries: List[HistogramEntry],
    dir_out: Path,
    dpi: int,
    saf: SingleAxisFigure | None = None,
) -> SingleAxisFigure:
    """
    Histograms the provided entries on a single-axis figure and returns the figure.

    Entries are grouped by their `style` and one histogram is drawn per distinct style.
    The figure is not formatted, saved, or closed here; that is left to the caller.

    Args:
        tag (Tag): Tag used to create a new figure when `saf` is not provided
        histogram_entries (List[HistogramEntry]): A list of [`HistogramEntry`][trendify.API.HistogramEntry]s
        dir_out (Path): Not used by this method; kept so existing callers keep working
        dpi (int): Not used by this method; kept so existing callers keep working
        saf (SingleAxisFigure | None): Existing figure to draw on.  A new figure is created
            when `None`.

    Returns:
        (SingleAxisFigure): The figure on which the histograms were drawn.
    """
    if saf is None:
        saf = SingleAxisFigure.new(tag=tag)

    # Group the values by style in a single pass.  A dict keeps first-seen order, so
    # the histograms are drawn in a reproducible order (iterating a set is unordered).
    values_by_style: dict = {}
    for entry in histogram_entries:
        values_by_style.setdefault(entry.style, []).append(entry.value)

    for style, values in values_by_style.items():
        if style is not None:
            saf.ax.hist(values, **style.as_plot_kwargs())
        else:
            # Entries without a style are drawn without extra plot keyword arguments.
            saf.ax.hist(values)

    return saf

TableBuilder

TableBuilder(in_dirs: List[Path], out_dir: Path)

Builds tables (melted, pivot, and stats) for histogramming and including in a report or Grafana dashboard.

Parameters:

Name Type Description Default
in_dirs List[Path]

directories from which to load data products

required
out_dir Path

directory in which tables should be saved

required
Source code in src/trendify/api/generator/table_builder.py
def __init__(
    self,
    in_dirs: List[Path],
    out_dir: Path,
):
    """
    Stores the input directories and the output directory on the instance.

    Args:
        in_dirs (List[Path]): directories from which to load data products
        out_dir (Path): directory in which tables should be saved
    """
    self.in_dirs = in_dirs
    self.out_dir = out_dir

get_stats_table classmethod

get_stats_table(df: DataFrame)

Computes multiple statistics for each column

Parameters:

Name Type Description Default
df DataFrame

DataFrame for which the column statistics are to be calculated.

required

Returns:

Type Description
DataFrame

Dataframe having statistics (column headers) for each of the columns of the input df. The columns of df will be the row indices of the stats table.

Source code in src/trendify/api/generator/table_builder.py
@classmethod
def get_stats_table(
    cls,
    df: pd.DataFrame,
):
    """
    Builds a table of summary statistics with one row per column of `df`.

    Values that cannot be converted to numbers are treated as NaN before the
    statistics are computed.

    Args:
        df (pd.DataFrame): DataFrame for which the column statistics are to be calculated.

    Returns:
        (pd.DataFrame): Dataframe with the statistics `min`, `mean`, `max`, and `sigma3`
            (three times the standard deviation) as column headers.  The columns of `df`
            are the row indices of the stats table, and the index is named `"Name"`.
    """
    # Coerce every column to numeric; entries that cannot be parsed become NaN.
    coerced = df.apply(pd.to_numeric, errors="coerce")

    summary = pd.DataFrame(
        {
            "min": coerced.min(axis=0),
            "mean": coerced.mean(axis=0),
            "max": coerced.max(axis=0),
            "sigma3": coerced.std(axis=0) * 3,
        },
        index=df.columns,
    )
    summary.index.name = "Name"
    return summary

load_table

load_table(tag: Tag, data_products_fname: str = DATA_PRODUCTS_FNAME_DEFAULT)

Collects table entries from JSON files corresponding to given tag and processes them.

Saves CSV files for the melted data frame, pivot dataframe, and pivot dataframe stats.

File names will all use the tag with different suffixes 'tag_melted.csv', 'tag_pivot.csv', 'tag_stats.csv'.

Parameters:

Name Type Description Default
tag Tag

product tag for which to collect and process.

required
Source code in src/trendify/api/generator/table_builder.py
def load_table(
    self,
    tag: Tag,
    data_products_fname: str = DATA_PRODUCTS_FNAME_DEFAULT,
):
    """
    Gathers the table entries for `tag` from every input directory and processes them.

    The data products file of each directory in `self.in_dirs` is read, the `TableEntry`
    products carrying `tag` are collected, and the result is handed to
    `process_table_entries`, which saves the CSV files to `self.out_dir`.

    File names will all use the tag with different suffixes
    `'tag_melted.csv'`, `'tag_pivot.csv'`, `'tag_stats.csv'`.

    Args:
        tag (Tag): product tag for which to collect and process.
        data_products_fname (str): Name of data products file
    """
    logger.info(f"Making table for {tag = }")

    # Directories are visited in the given order; entries keep their per-file order.
    table_entries: List[TableEntry] = [
        entry
        for subdir in self.in_dirs
        for entry in DataProductCollection.model_validate_json(
            subdir.joinpath(data_products_fname).read_text()
        )
        .get_products(tag=tag, object_type=TableEntry)
        .elements
    ]

    self.process_table_entries(
        tag=tag, table_entries=table_entries, out_dir=self.out_dir
    )

process_table_entries classmethod

process_table_entries(tag: Tag, table_entries: List[TableEntry], out_dir: Path)

Saves CSV files for the melted data frame, pivot dataframe, and pivot dataframe stats.

File names will all use the tag with different suffixes 'tag_melted.csv', 'tag_pivot.csv', 'tag_stats.csv'.

Parameters:

Name Type Description Default
tag Tag

product tag for which to collect and process.

required
table_entries List[TableEntry]

List of table entries

required
out_dir Path

Directory to which table CSV files should be saved

required
Source code in src/trendify/api/generator/table_builder.py
@classmethod
def process_table_entries(
    cls,
    tag: Tag,
    table_entries: List[TableEntry],
    out_dir: Path,
):
    """
    Saves CSV files for the melted data frame, pivot dataframe, and pivot dataframe stats.

    File names will all use the tag with different suffixes
    `'tag_melted.csv'`, `'tag_pivot.csv'`, `'tag_stats.csv'`.

    The melted file is always written.  The pivot and stats files are only written when
    `TableEntry.pivot_table` returns a table, and the stats file is skipped when the
    stats table is empty or entirely NaN.  A failure while building or saving the stats
    table is logged and does not propagate.

    Args:
        tag (Tag): product tag for which to collect and process.
        table_entries (List[TableEntry]): List of table entries
        out_dir (Path): Directory to which table CSV files should be saved
    """
    melted = pd.DataFrame([t.get_entry_dict() for t in table_entries])
    pivot = TableEntry.pivot_table(melted=melted)

    # The tag components become nested path parts under `out_dir`.
    save_path_partial = out_dir.joinpath(*tuple(atleast_1d(tag)))
    save_path_partial.parent.mkdir(exist_ok=True, parents=True)
    logger.critical(f"Saving to {str(save_path_partial)}_*.csv")

    def csv_path(label):
        # Build `<tag><label>.csv` next to `save_path_partial`.
        return save_path_partial.with_stem(save_path_partial.stem + label).with_suffix(
            ".csv"
        )

    melted.to_csv(csv_path("_melted"), index=False)

    if pivot is None:
        return

    pivot.to_csv(csv_path("_pivot"), index=True)

    try:
        stats = cls.get_stats_table(df=pivot)
        if not stats.empty and not stats.isna().all().all():
            stats.to_csv(csv_path("_stats"), index=True)
    except Exception as e:
        # Best effort: the stats table is optional, so report the failure and carry on.
        # The pivot table itself was already written above.
        logger.error(f"Could not generate stats table for {tag = }. Error: {e}")

XYDataPlotter

XYDataPlotter(in_dirs: List[Path], out_dir: Path, dpi: int = 500)

Plots xy data from user-specified directories to a single axis figure

Parameters:

Name Type Description Default
in_dirs List[Path]

Directories in which to search for data products from JSON files

required
out_dir Path

directory to which figure will be output

required
dpi int

Saved image resolution

500
Source code in src/trendify/api/generator/xy_data_plotter.py
def __init__(
    self,
    in_dirs: List[Path],
    out_dir: Path,
    dpi: int = 500,
):
    """
    Stores the plotter configuration on the instance.

    Args:
        in_dirs (List[Path]): Directories in which to search for data products from JSON files
        out_dir (Path): directory to which figure will be output
        dpi (int): Saved image resolution
    """
    self.in_dirs, self.out_dir, self.dpi = in_dirs, out_dir, dpi

handle_points_and_traces classmethod

handle_points_and_traces(
    tag: Tag,
    points: List[Point2D],
    traces: List[Trace2D],
    axlines: List[AxLine],
    dir_out: Path,
    dpi: int,
    saf: SingleAxisFigure | None = None,
)

Plots points, traces, and axlines onto a single-axis figure and returns it (the formatting, saving, and closing steps are currently commented out in the source).

Parameters:

Name Type Description Default
tag Tag

Tag corresponding to the provided points and traces

required
points List[Point2D]

Points to be scattered

required
traces List[Trace2D]

List of traces to be plotted

required
axlines List[AxLine]

List of axis lines to be plotted

required
dir_out Path

directory to output the plot

required
dpi int

resolution of plot

required
Source code in src/trendify/api/generator/xy_data_plotter.py
@classmethod
def handle_points_and_traces(
    cls,
    tag: Tag,
    points: List[Point2D],
    traces: List[Trace2D],
    axlines: List[AxLine],
    dir_out: Path,
    dpi: int,
    saf: SingleAxisFigure | None = None,
):
    """
    Plots points, traces, and axlines onto a single-axis figure and returns it.

    Points are scattered in groups that share the same marker; points whose
    marker is `None` are scattered without extra keyword arguments. Traces and
    axlines are then drawn through their own `plot_to_ax` methods.

    This method does not apply formatting, save the figure, or close it; the
    returned figure is left for the caller to finish.

    Args:
        tag (Tag): Tag corresponding to the provided points and traces
        points (List[Point2D]): Points to be scattered
        traces (List[Trace2D]): List of traces to be plotted
        axlines (List[AxLine]): List of axis lines to be plotted
        dir_out (Path): directory to output the plot (currently unused by this method)
        dpi (int): resolution of plot (currently unused by this method)
        saf (SingleAxisFigure | None): Existing figure to draw on. When `None`,
            a new one is created with `SingleAxisFigure.new(tag=tag)`.

    Returns:
        (SingleAxisFigure): The figure that was drawn on.
    """
    if saf is None:
        saf = SingleAxisFigure.new(tag=tag)

    # Bucket the points by marker in a single pass instead of re-scanning the
    # whole list once per distinct marker.
    points_by_marker = {}
    for point in points or ():
        points_by_marker.setdefault(point.marker, []).append(point)

    for marker, group in points_by_marker.items():
        x = [p.x for p in group]
        y = [p.y for p in group]
        # A point may carry no marker; fall back to matplotlib's defaults
        # rather than failing on `None.as_scatter_plot_kwargs()`.
        scatter_kwargs = {} if marker is None else marker.as_scatter_plot_kwargs()
        saf.ax.scatter(x, y, **scatter_kwargs)

    for trace in traces:
        trace.plot_to_ax(saf.ax)

    for axline in axlines:
        axline.plot_to_ax(saf.ax)

    return saf

plot

plot(tag: Tag, data_products_fname: str = DATA_PRODUCTS_FNAME_DEFAULT)
  • Collects data from json files in stored self.in_dirs,
  • plots the relevant products,
  • applies labels and formatting,
  • saves the figure
  • closes matplotlib figure

Parameters:

Name Type Description Default
tag Tag

data tag for which products are to be collected and plotted.

required
data_products_fname str

Data products file name

DATA_PRODUCTS_FNAME_DEFAULT
Source code in src/trendify/api/generator/xy_data_plotter.py
def plot(
    self,
    tag: Tag,
    data_products_fname: str = DATA_PRODUCTS_FNAME_DEFAULT,
):
    """
    Builds one xy figure for `tag` from every input directory and saves it as a JPEG.

    - Creates a new `SingleAxisFigure` for the tag,
    - reads the data products JSON file from each directory in `self.in_dirs`,
    - scatters the matching points (grouped by marker) and plots the matching traces,
    - applies the union of the formats found in that directory's products,
    - saves the figure under `self.out_dir` at `self.dpi`,
    - drops the local reference to the figure.

    Args:
        tag (Tag): data tag for which products are to be collected and plotted.
        data_products_fname (str): Data products file name
    """
    logger.info(f"Making xy plot for {tag = }")
    saf = SingleAxisFigure.new(tag=tag)

    for subdir in self.in_dirs:
        products_json = subdir.joinpath(data_products_fname).read_text()
        collection = DataProductCollection.model_validate_json(products_json)

        matching_traces = collection.get_products(tag=tag, object_type=Trace2D)
        traces: List[Trace2D] = matching_traces.elements
        matching_points = collection.get_products(tag=tag, object_type=Point2D)
        points: List[Point2D] = matching_points.elements

        # Nothing to draw from this directory.
        if not (points or traces):
            continue

        if points:
            for marker in {pt.marker for pt in points}:
                same_marker = [pt for pt in points if pt.marker == marker]
                x = [pt.x for pt in same_marker]
                y = [pt.y for pt in same_marker]
                if not (x and y):
                    continue
                # Points without a marker are scattered with default styling.
                scatter_kwargs = (
                    marker.as_scatter_plot_kwargs() if marker is not None else {}
                )
                saf.ax.scatter(x, y, **scatter_kwargs)

        for trace in traces:
            trace.plot_to_ax(saf.ax)

        # Collect the distinct formats; missing (None) formats are discarded.
        found_formats = [pt.format2d for pt in points if pt.format2d]
        found_formats = found_formats + [tr.format2d for tr in traces]
        formats = list(set(found_formats) - {None})
        saf.apply_format(Format2D.union_from_iterable(formats))

    tag_parts = tuple(atleast_1d(tag))
    save_path = self.out_dir.joinpath(*tag_parts).with_suffix(".jpg")
    save_path.parent.mkdir(exist_ok=True, parents=True)
    logger.critical(f"Saving to {save_path = }")
    saf.savefig(path=save_path, dpi=self.dpi)
    del saf

flatten

flatten(obj: Iterable)

Recursively flattens iterable up to a point (leaves str, bytes, and DataProduct unflattened)

Parameters:

Name Type Description Default
obj Iterable

Object to be flattened

required

Returns:

Type Description
Iterable

Flattened iterable

Source code in src/trendify/api/generator/data_product_collection.py
def flatten(obj: Iterable):
    """
    Lazily walks a nested iterable and yields its leaves in order.

    An object is descended into only when `_should_be_flattened` says so;
    anything else (per the original documentation: `str`, `bytes`, and
    `DataProduct`) is yielded unchanged as a single item.

    Args:
        obj (Iterable): Object to be flattened

    Returns:
        (Iterable): Flattened iterable
    """
    if _should_be_flattened(obj):
        for item in obj:
            yield from flatten(item)
    else:
        yield obj