1、文本文件
2、表明基因组的一段区域
3、标准的bed文件最少三列,最多十二列
eg:1、chrom 染色体 2、start 起始位置 3、end 终止位置 4、name 名称 5、score 分值(存一个数) 6、strand 链方向(+ 或 -)
1、储存基因区
2、储存基因组的某些位点信息
3、储存ChIP-seq、ATAC-seq等的富集的peak信息
① 两个Bed文件的交集(bedtools intersect)
②bed文件按照基因组坐标排序(bedtools sort)
③对bed文件进行扩大(bedtools slop)、平移(bedtools shift)
④在基因组上随机生成区间(bedtools random);若要从已有bed文件中随机提取记录,使用 bedtools sample
⑤根据提供的bed文件在基因组进行随机抽取(bedtools shuffle)
读取文件
# step1 load reference order
# Map each sequence name in the FASTA index (first tab-separated column of
# the .fai file) to its 0-based line number, i.e. its rank in the reference.
# The file is opened in text mode: in binary mode every line is a bytes
# object, so splitting on the str "\t" fails on Python 3 and the keys would
# never match the str chromosome names read from the BED files below.
ref_order_dict = {}
with open("./xxxx.fa.fai", "r") as ref_fai:
    for index, line in enumerate(ref_fai):
        line_list = line.strip().split("\t")
        ref_order_dict[line_list[0]] = index
#step2 test intersect
#2.1 open file
from cv2 import split
# Count overlapping records between two BED files by walking both files in
# lockstep. Each overlap consumes one record from BOTH files, so this is a
# one-to-one count. The walk only advances forwards, so both inputs are
# assumed to be sorted in the same chromosome order as ref_order_dict
# (the file names suggest sorted input -- confirm).
with open("./CTCF_rep1.sort.bed", "r") as bed_a, \
        open("./CTCF_rep2.sort.bed", "r") as bed_b:
    #init
    line_a = bed_a.readline()
    line_b = bed_b.readline()
    overlap_count = 0

    # readline() returns "" at end of file; stop as soon as either file is
    # exhausted. (Testing the file objects themselves is always true.)
    while line_a and line_b:
        line_list_a = line_a.strip().split("\t")
        line_list_b = line_b.strip().split("\t")

        #same chromosome
        if line_list_a[0] == line_list_b[0]:
            start_a = int(line_list_a[1])
            end_a = int(line_list_a[2])
            start_b = int(line_list_b[1])
            end_b = int(line_list_b[2])

            #no-overlap, a is upstream of b: advance a
            if end_a < start_b:
                line_a = bed_a.readline()
            #no-overlap, a is downstream of b: advance b
            # (must be elif, otherwise the upstream case above also falls
            # into the overlap branch and is counted)
            elif start_a > end_b:
                line_b = bed_b.readline()
            #overlap: count it and advance both
            else:
                overlap_count += 1
                line_a = bed_a.readline()
                line_b = bed_b.readline()

        #different chromosome: advance whichever file is behind in the
        #reference order
        else:
            # Direct indexing raises KeyError naming the chromosome when it
            # is missing from the reference index.
            order_a = ref_order_dict[line_list_a[0]]
            order_b = ref_order_dict[line_list_b[0]]
            if order_a < order_b:
                line_a = bed_a.readline()
            else:
                line_b = bed_b.readline()

print(overlap_count)
多对多
def cmp_region(region_a, region_b, ref_order_dict):
    """
    Compare the genomic position of two BED records.

    INPUT:
        <region_a> <region_b>
            str, line from BED file (tab-separated; chromosome, start and
            end are read from the first three columns).
            None or "" (what readline() returns at end of file) means the
            corresponding file is exhausted.
        <ref_order_dict>
            dict, chromosome name -> rank in the reference order.

    OUTPUT:
        -1 region_a at upstream of region_b
        0 region_a overlaps with region_b
        1 region_a at downstream of region_b

        An exhausted region_a gives 1; otherwise an exhausted region_b
        gives -1.

    RAISES:
        KeyError when a chromosome name is not a key of ref_order_dict.
    """
    # End-of-file sentinels: treat "" the same as None so that the value
    # returned by readline() at EOF can be passed in directly.
    if not region_a:
        return 1
    if not region_b:
        return -1

    region_list_a = region_a.strip().split("\t")
    region_list_b = region_b.strip().split("\t")

    # Fail with the offending name instead of comparing None ranks.
    for chrom in (region_list_a[0], region_list_b[0]):
        if chrom not in ref_order_dict:
            raise KeyError(
                "chromosome %r is not in the reference order" % chrom
            )

    # Different chromosomes: the reference order decides.
    order_a = ref_order_dict[region_list_a[0]]
    order_b = ref_order_dict[region_list_b[0]]
    if order_a < order_b:
        return -1
    if order_a > order_b:
        return 1

    # Same chromosome: compare coordinates.
    start_a = int(region_list_a[1])
    end_a = int(region_list_a[2])
    start_b = int(region_list_b[1])
    end_b = int(region_list_b[2])

    # NOTE(review): the comparisons are strict, so records that only touch
    # (end == start) count as overlapping. BED coordinates are
    # conventionally half-open -- confirm this is the intended behavior.
    if end_a < start_b:
        return -1
    if start_a > end_b:
        return 1
    return 0
def update_temp_info(temp_merge_line,temp_chr_name,temp_chr_start,temp_chr_end,new_line):
    """
    Fold one more BED record into the running merged region.

    INPUT:
        <temp_merge_line>
            str or None, the current merged region line; None when nothing
            has been merged yet.
        <temp_chr_name> <temp_chr_start> <temp_chr_end>
            current merged chromosome name / start / end.
        <new_line>
            str, line from BED file to merge in.

    OUTPUT:
        (merge_line, chr_name, chr_start, chr_end)
        With no previous merge, merge_line is new_line itself, unchanged.
        Otherwise it is "chrom<TAB>start<TAB>end" built from the chromosome
        of new_line, the smaller start and the larger end.
    """
    fields = new_line.strip().split("\t")
    chrom = fields[0]
    begin = int(fields[1])
    finish = int(fields[2])

    # First record: it becomes the merged region as-is.
    if temp_merge_line is None:
        return new_line, chrom, begin, finish

    # Otherwise widen the existing span to cover the new record.
    span_start = min(temp_chr_start, begin)
    span_end = max(temp_chr_end, finish)
    merged = "%s\t%s\t%s" % (chrom, span_start, span_end)
    return merged, chrom, span_start, span_end
# find all a-b intersect (many-to-many)
# Walk both BED files in lockstep and print every overlapping (a, b) pair.
# B records that have already been read are kept in temp_list, together with
# one merged region spanning all of them, so that each newly read A record
# can still be matched against earlier B records.
# The walk only advances forwards, so both inputs are assumed to be sorted
# in the same chromosome order as ref_order_dict (the file names suggest
# sorted input -- confirm).
# 2.1 open file
with open("./test_multi_region_a.sort.bed", "r") as bed_a, \
        open("./test_multi_region_b.sort.bed", "r") as bed_b:
    line_a = bed_a.readline()
    line_b = bed_b.readline()

    temp_list = []
    temp_chr_name = None
    temp_chr_start = None
    temp_chr_end = None
    temp_merge_line = None
    # True only when line_a was read in the previous iteration, so each A
    # record is checked against temp_list exactly once.
    new_line_state = None

    # readline() returns "" at end of file; stop once EITHER file is
    # exhausted so an empty line_b is never handed to cmp_region.
    while line_a and line_b:
        #check temp list
        if len(temp_list) > 0 and new_line_state:
            temp_cmp_res = cmp_region(line_a, temp_merge_line, ref_order_dict)

            if temp_cmp_res == 1:
                # line_a is past everything buffered: drop the buffer
                temp_list = []
                temp_chr_name = None
                temp_chr_start = None
                temp_chr_end = None
                temp_merge_line = None

            elif temp_cmp_res == 0:
                # line_a touches the merged span: test each buffered record
                for temp_line_b in temp_list:
                    run_cmp_res = cmp_region(line_a, temp_line_b, ref_order_dict)
                    if run_cmp_res == 0:
                        print(line_a.strip(), temp_line_b.strip())
                        print("-" * 80)

        #check current
        cmp_res = cmp_region(line_a, line_b, ref_order_dict)

        if cmp_res == -1:
            # a is upstream of b: advance a
            line_a = bed_a.readline()
            new_line_state = True

        elif cmp_res == 1:
            # a is downstream of b: buffer b when it is on the same
            # chromosome, otherwise the buffer is stale; then advance b
            new_line_state = False
            line_b_list = line_b.strip().split("\t")

            if line_a.split("\t")[0] == line_b_list[0]:
                temp_list.append(line_b)
                temp_merge_line, temp_chr_name, temp_chr_start, temp_chr_end = update_temp_info(
                    temp_merge_line,
                    temp_chr_name,
                    temp_chr_start,
                    temp_chr_end,
                    line_b
                )
            else:
                temp_list = []
                temp_chr_name = None
                temp_chr_start = None
                temp_chr_end = None
                temp_merge_line = None

            line_b = bed_b.readline()

        else:
            # overlap: report it, buffer b for later A records, advance b
            new_line_state = False
            print(line_a.strip(), line_b.strip())
            print("-" * 80)

            temp_list.append(line_b)
            temp_merge_line, temp_chr_name, temp_chr_start, temp_chr_end = update_temp_info(
                temp_merge_line,
                temp_chr_name,
                temp_chr_start,
                temp_chr_end,
                line_b
            )

            line_b = bed_b.readline()

    # check file end
    # If A still has records, B ran out first. The current line_a has
    # already been compared with every B record, so keep reading A and match
    # only against the buffered B records. With an empty buffer nothing more
    # can be reported.
    while line_a and temp_list:
        # read new line
        line_a = bed_a.readline()
        if line_a == "":
            break

        # check temp list
        temp_cmp_res = cmp_region(line_a, temp_merge_line, ref_order_dict)
        if temp_cmp_res == 1:
            # past the merged span: later A records cannot overlap either
            break
        elif temp_cmp_res == 0:
            for temp_line_b in temp_list:
                run_cmp_res = cmp_region(line_a, temp_line_b, ref_order_dict)
                if run_cmp_res == 0:
                    print(line_a.strip(), temp_line_b.strip())
                    print("-" * 80)
写程序的本质还是数学吧 慢慢学习