Source code for bioat.tabletools

"""_summary_.

author: Herman Huanan Zhao
email: hermanzhaozzzz@gmail.com
homepage: https://github.com/hermanzhaozzzz

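Provides the TableTools class: `merge` stacks same-formatted tsv/csv tables
from several samples into one table with a leading sample-tag column, and
`split` divides one table into n consecutive parts.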

example 1:
    bioat list
        <in shell>:
            $ bioat list
        <in python console>:
            >>> from bioat.cli import Cli
            >>> bioat = Cli()
            >>> bioat.list()
            >>> print(bioat.list())

example 2:
    _example_
"""

import sys

import pandas as pd

from bioat.logger import LoggerManager

# Module-level logger manager; TableTools and its methods call set_names /
# set_level on it and log through lm.logger.
lm = LoggerManager(mod_name="bioat.tabletools")


class TableTools:
    """Merge and split delimited (tsv/csv) tables."""

    lm.set_names(cls_name="TableTools")

    def __init__(self):
        pass

    def merge(
        self,
        inputs,
        tags,
        output,
        input_fmt="tsv",
        output_fmt="tsv",
        input_header=False,
        output_header=False,
        log_level="WARNING",
    ):
        """Merge same-formatted tables from different samples into one table.

        Each input table gets a leading ``<sample>`` column holding its tag,
        then all tables are stacked row-wise and written to ``output``.

        :param inputs: table files; a comma-separated string, or a tuple/list
            of paths
        :param tags: tags for each table, given in the same form and order as
            ``inputs``; if the two differ in length the extra items are
            ignored
        :param output: merged file; when it equals ``sys.stdout.name`` the
            table is written to standard output
        :param input_fmt: tsv | csv
        :param output_fmt: tsv | csv
        :param input_header: True | False, input table has header or not
        :param output_header: True | False, output table has header or not
        :param log_level: log status
        """
        lm.set_names(func_name="merge")
        lm.set_level(log_level)

        # fix params: sequences are taken as-is, strings are comma-separated
        inputs = (
            list(inputs) if isinstance(inputs, (list, tuple)) else inputs.split(",")
        )
        tags = list(tags) if isinstance(tags, (list, tuple)) else tags.split(",")

        lm.logger.debug(f"input_fmt={input_fmt}")
        lm.logger.debug(f"output_fmt={output_fmt}")
        lm.logger.debug(f"input_header={input_header}")
        lm.logger.debug(f"output_header={output_header}")

        # pandas wants the header row index (0) or None for "no header"
        header_row = 0 if input_header else None
        output = sys.stdout if output == sys.stdout.name else output
        dt_sep = {"csv": ",", "tsv": "\t"}
        in_sep = dt_sep[input_fmt]
        out_sep = dt_sep[output_fmt]

        # Collect every tagged table first and concatenate once: growing a
        # frame inside the loop re-copies all previous rows on each iteration.
        frames = []
        for file, tag in zip(inputs, tags, strict=False):
            table = pd.read_csv(file, sep=in_sep, header=header_row, index_col=None)
            table.insert(0, column="<sample>", value=tag)
            frames.append(table)

        # pd.concat refuses an empty list, so fall back to an empty table.
        if frames:
            merged = pd.concat(frames, axis=0, ignore_index=True)
        else:
            merged = pd.DataFrame()
        merged.to_csv(output, sep=out_sep, header=output_header, index=False)

    def split(
        self,
        input: str,
        n: int,
        output_prefix=None,
        input_fmt="tsv",
        output_fmt="tsv",
        input_header=False,
        output_header=False,
        compress=False,
        log_level="WARNING",
    ):
        """Split a table into ``n`` consecutive parts.

        Parts are written to ``{output_prefix}{idx}.{output_fmt}`` (plus
        ``.gz`` when ``compress`` is set) for ``idx`` in ``0 .. n-1``. Every
        part holds at most ``ceil(rows / n)`` rows, so trailing parts may be
        shorter or empty.

        :param input: table to split
        :param n: split table into n parts, must be >= 1
        :param output_prefix: name prefix for split parts, ``{input}_`` if
            not defined
        :param input_fmt: tsv | csv
        :param output_fmt: tsv | csv
        :param input_header: True | False, input table has header or not
        :param output_header: True | False, output table has header or not
        :param compress: True | False, gzip the output table or not
        :param log_level: log status
        :raises ValueError: if ``n`` is smaller than 1
        """
        lm.set_names(func_name="split")
        lm.set_level(log_level)
        lm.logger.debug(f"input_fmt={input_fmt}")
        lm.logger.debug(f"output_fmt={output_fmt}")
        lm.logger.debug(f"input_header={input_header}")
        lm.logger.debug(f"output_header={output_header}")

        if n < 1:
            raise ValueError(f"n must be a positive integer, got {n!r}")

        # fix params
        header_row = 0 if input_header else None
        output_prefix = output_prefix if output_prefix else f"{input}_"
        # The ".gz" suffix lets pandas infer gzip compression from the name.
        gz_suffix = ".gz" if compress else ""
        dt_sep = {"csv": ",", "tsv": "\t"}

        # load table
        df = pd.read_csv(input, header=header_row, sep=dt_sep[input_fmt])

        # max lines for each part table: ceil(rows / n)
        full, remainder = divmod(df.shape[0], n)
        chunk_size = full + 1 if remainder else full

        # write out
        for idx in range(n):
            part = df.iloc[idx * chunk_size : (idx + 1) * chunk_size]
            # Build the name with a single f-string; %-formatting the result
            # would misread any "%" in the prefix as a format directive.
            part.to_csv(
                f"{output_prefix}{idx}.{output_fmt}{gz_suffix}",
                index=False,
                header=output_header,
                sep=dt_sep[output_fmt],
            )
# Placeholder entry point: running this module directly does nothing.
if __name__ == "__main__": pass