Source code for bioat.hictools

import gc
import time

import pandas as pd

from bioat.devtools import profile
from bioat.logger import LoggerManager

lm = LoggerManager(mod_name="bioat.hictools")


[docs] class HiCTools: """Hi-C toolbox.""" lm.set_names(cls_name="HiCTools") def __init__(self): pass @profile def get_effective_resolutions( self, genome_index, valid_pairs, output, log_level: str = "INFO", ): """Get deepest resolution of cis-interation. 输入为result文件,输出为matrix的大致resolution. :param genome_index: genome.fa.fai :param valid_pairs: rm_dup_pairs.allValidPairs by HiCTools-Pro :param output: table_output, TSV file :param log_level: 'CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG', 'NOTSET' """ lm.set_names(func_name="get_effective_resolutions") lm.set_level(log_level) lm.logger.debug("Develop mode is on") # load ref genome lengths df_chromosome = pd.read_csv( genome_index, sep="\t", usecols=[0, 1], names=["chromosome", "length"], index_col="chromosome", ) # load sample valid pairs by HiCTools-Pro after remove duplication and filter. reader = pd.read_csv( valid_pairs, sep="\t", header=None, names=[ "read_name", "chr_A", "start_A", "strand_A", "chr_B", "start_B", "strand_B", "insert_size", "fragment_name_A", "fragment_name_B", "mapQ_A", "mapQ_B", "allele_specific_info", ], usecols=["chr_A", "start_A", "chr_B", "start_B"], dtype={ "chr_A": str, "start_A": int, "chr_B": str, "start_B": int, }, iterator=True, ) chunks = [] while True: try: chunk = reader.get_chunk(10_000_000) chunks.append(chunk) except StopIteration: lm.logger.debug("pandas reader iteration is done.") break df_valid_pairs = pd.concat(chunks, ignore_index=True) lm.logger.debug(df_valid_pairs.info(memory_usage="deep")) # 思路:二分法确认new_bin下线 output = open(output, "w") write_lines = 0 output.write("test_bin(kb)\ttotal_bins\tthreshold(%)\tis_effective\n") write_lines += 1 # 思路:二分法确认new_bin下线 circle = 1 new_bin = 512_000 # 512kb def get_bin_index(x): cumsum = df_chromosome["bins_cumsum"][x["chr_A"]] bin_count_in_this_chrom_a = int(x["start_A"] / new_bin) + 1 bin_count_in_this_chrom_b = int(x["start_B"] / new_bin) + 1 bin_index_a = cumsum + bin_count_in_this_chrom_a - 1 bin_index_b = cumsum + bin_count_in_this_chrom_b - 1 return bin_index_a, bin_index_b while True: # 每次迭代都把new_bin / 2 new_bin = int(new_bin / circle) circle = 2 # calculate total bins df_chromosome["bins"] = df_chromosome["length"].map( lambda x, new_bin=new_bin: int(x / new_bin) + 1, ) df_chromosome["bins_cumsum"] = df_chromosome["bins"].cumsum() total_bins = df_chromosome.iloc[-1, 2] # calculate mapped bin index df_valid_pairs[["bin_index_A", "bin_index_B"]] = df_valid_pairs.apply( # df_valid_pairs[['bin_index_A', 'bin_index_B']] = df_valid_pairs.apply( get_bin_index, axis=1, result_type="expand", ) # print(df_valid_pairs) # threshold calculation s_bin_idx_count1 = df_valid_pairs["bin_index_A"].value_counts() s_bin_idx_count2 = df_valid_pairs["bin_index_B"].value_counts() df_bin_interaction_count = pd.concat( [s_bin_idx_count1, s_bin_idx_count2], axis=1 ) df_bin_interaction_count = df_bin_interaction_count.fillna(0) df_bin_interaction_count["total"] = ( df_bin_interaction_count["bin_index_A"] + df_bin_interaction_count["bin_index_B"] ) threshold = round( (df_bin_interaction_count["total"] >= 1000).sum() / total_bins * 100, 1 ) if threshold > 80: # 输出结果为有效,进入下一个循环 output.write(f"{int(new_bin / 1000)}\t{total_bins}\t{threshold}\tYES\n") write_lines += 1 else: # if write_lines != 1: # # 第1+n次计算无效,直接终止程序 # output.write(f'{int(new_bin / 1000)}\t{total_bins}\t{threshold}\tNO\n') # else: # # 第一次计算就无效,直接终止程序 output.write(f"{int(new_bin / 1000)}\t{total_bins}\t{threshold}\tNO\n") break del s_bin_idx_count1, s_bin_idx_count2, df_bin_interaction_count gc.collect() output.close()
if __name__ == "__main__": a = time.time() hic = HiCTools() hic.get_effective_resolutions( genome_index="/Volumes/zhaohn_HD/Bio/1.database/db_genomes/genome_fa/genome_ucsc_hg38/genome_ucsc_hg38.fa.fai", valid_pairs="../../data/hic/test.rm_dup_pairs.allValidPairs", output="/Users/zhaohuanan/Downloads/testout", ) a = time.time() - a # print(f'\n{a:.3f}s')