Source code for bioat.hictools
import gc
import time
import pandas as pd
from bioat.devtools import profile
from bioat.logger import LoggerManager
lm = LoggerManager(mod_name="bioat.hictools")
[docs]
class HiCTools:
"""Hi-C toolbox."""
lm.set_names(cls_name="HiCTools")
def __init__(self):
pass
@profile
def get_effective_resolutions(
self,
genome_index,
valid_pairs,
output,
log_level: str = 'INFO'
):
"""Get deepest resolution of cis-interation.
输入为result文件,输出为matrix的大致resolution.
:param genome_index: genome.fa.fai
:param valid_pairs: rm_dup_pairs.allValidPairs by HiCTools-Pro
:param output: table_output, TSV file
:param log_level: 'CRITICAL', 'ERROR', 'WARNING', 'INFO', 'DEBUG', 'NOTSET'
"""
lm.set_names(func_name="get_effective_resolutions")
lm.set_level(log_level)
lm.logger.debug("Develop mode is on")
# load ref genome lengths
df_chromosome = pd.read_csv(
genome_index,
sep='\t',
usecols=[0, 1],
names=['chromosome', 'length'],
index_col='chromosome',
)
# load sample valid pairs by HiCTools-Pro after remove duplication and filter.
reader = pd.read_csv(
valid_pairs,
sep='\t',
header=None,
names=[
'read_name', 'chr_A', 'start_A', 'strand_A', 'chr_B', 'start_B', 'strand_B', 'insert_size',
'fragment_name_A', 'fragment_name_B', 'mapQ_A', 'mapQ_B', 'allele_specific_info'
],
usecols=['chr_A', 'start_A', 'chr_B', 'start_B'],
dtype={
'chr_A': str,
'start_A': int,
'chr_B': str,
'start_B': int,
},
iterator=True
)
chunks = []
while True:
try:
chunk = reader.get_chunk(10_000_000)
chunks.append(chunk)
except StopIteration:
lm.logger.debug("pandas reader iteration is done.")
break
df_valid_pairs = pd.concat(chunks, ignore_index=True)
lm.logger.debug(df_valid_pairs.info(memory_usage="deep"))
# 思路:二分法确认new_bin下线
output = open(output, 'wt')
write_lines = 0
output.write('test_bin(kb)\ttotal_bins\tthreshold(%)\tis_effective\n')
write_lines += 1
# 思路:二分法确认new_bin下线
circle = 1
new_bin = 512_000 # 512kb
def get_bin_index(x):
cumsum = df_chromosome['bins_cumsum'][x['chr_A']]
bin_count_in_this_chrom_a = int(x['start_A'] / new_bin) + 1
bin_count_in_this_chrom_b = int(x['start_B'] / new_bin) + 1
bin_index_a = cumsum + bin_count_in_this_chrom_a - 1
bin_index_b = cumsum + bin_count_in_this_chrom_b - 1
return bin_index_a, bin_index_b
while True:
# 每次迭代都把new_bin / 2
new_bin = int(new_bin / circle)
circle = 2
# calculate total bins
df_chromosome["bins"] = df_chromosome["length"].map(
lambda x, new_bin=new_bin: int(x / new_bin) + 1
)
df_chromosome['bins_cumsum'] = df_chromosome['bins'].cumsum()
total_bins = df_chromosome.iloc[-1, 2]
# calculate mapped bin index
df_valid_pairs[["bin_index_A", "bin_index_B"]] = df_valid_pairs.apply(
# df_valid_pairs[['bin_index_A', 'bin_index_B']] = df_valid_pairs.apply(
get_bin_index,
axis=1,
result_type="expand",
)
# print(df_valid_pairs)
# threshold calculation
s_bin_idx_count1 = df_valid_pairs['bin_index_A'].value_counts()
s_bin_idx_count2 = df_valid_pairs['bin_index_B'].value_counts()
df_bin_interaction_count = pd.concat([s_bin_idx_count1, s_bin_idx_count2], axis=1)
df_bin_interaction_count.fillna(0, inplace=True)
df_bin_interaction_count['total'] = df_bin_interaction_count['bin_index_A'] + df_bin_interaction_count[
'bin_index_B'
]
threshold = round((df_bin_interaction_count['total'] >= 1000).sum() / total_bins * 100, 1)
if threshold > 80:
# 输出结果为有效,进入下一个循环
output.write(f'{int(new_bin / 1000)}\t{total_bins}\t{threshold}\tYES\n')
write_lines += 1
else:
# if write_lines != 1:
# # 第1+n次计算无效,直接终止程序
# output.write(f'{int(new_bin / 1000)}\t{total_bins}\t{threshold}\tNO\n')
# else:
# # 第一次计算就无效,直接终止程序
output.write(f'{int(new_bin / 1000)}\t{total_bins}\t{threshold}\tNO\n')
break
del s_bin_idx_count1, s_bin_idx_count2, df_bin_interaction_count
gc.collect()
output.close()
if __name__ == '__main__':
a = time.time()
hic = HiCTools()
hic.get_effective_resolutions(
genome_index="/Volumes/zhaohn_HD/Bio/1.database/db_genomes/genome_fa/genome_ucsc_hg38/genome_ucsc_hg38.fa.fai",
valid_pairs="../../data/hic/test.rm_dup_pairs.allValidPairs",
output="/Users/zhaohuanan/Downloads/testout"
)
a = time.time() - a
# print(f'\n{a:.3f}s')