当前位置: 首页 > 工具软件 > bed > 使用案例 >

BED文件与bedtools简介

胡永逸
2023-12-01

1、什么是bed格式

1、文本文件

2、表明基因组的一段区域

3、标准的bed文件最少三列,最多十二列

eg:1、chrom 孔  2、start 开始 3、end 结束 4、name 名称 5、score 存一个数 6、 strand + or -

2、bed格式的使用

1、储存基因区

2、储存基因组的某些位点信息

3、储存CHIP-seq、ATAC-seq等的富集的peak信息

3、bedtools 是一种常用的bed操作工具,可以实现非常多的常用功能

① 两个Bed文件的交集(bedtools intersect)

②bed文件按照基因组坐标排序(bedtools sort)

③对bed文件进行扩大、平移(bedtools shift)

④对bed文件进行随机提取(bedtools random)

⑤根据提供的bed文件在基因组进行随机抽取(bedtools shuffle)

4、尝试用python写bedtools里的功能

交集 (bedtools intersect)

一对一

读取文件

# step load reference order

ref_order_dict = {}

ref_fai = open("./xxxx.fa.fai","rb")

index = 0

for line in ref_fai:
    line_list = line.strip().split("\t")
    ref_order_dict[line_list[0]] = index
    index += 1


ref_fai.close()

#step2 test interect

#2.1 open file
from cv2 import split

bed_a = open("./CTCF_rep1.sort.bed","r")
bed_b = open("./CTCF_rep2.sort.bed","r")


#init

line_a = bed_a.readline()
line_b = bed_b.readline()

overlap_count = 0


while bed_a and bed_b:


    line_list_a = line_a.strip().split("\t")
    line_list_b = line_b.strip().split("\t")

    #same chromosome 相同染色体 
    if line_list_a[0] == line_list_b[0]:
        start_a = int(line_list_a[1])
        end_a = int(line_list_a[2])

        start_b = int(line_list_b[1])
        end_b = int(line_list_b[2])

        #no-overlap a upstream  上游没有重叠
        if end_a < start_b:
            line_a = bed_a.readline()

        #no-overlap a downstream  下游没有重叠
        if start_a > end_b:
            line_b = bed_b.readline()

        #overlap 重叠
        else:
            overlap_count += 1
            temp_line_a = line_a
            temp_line_b = line_b
            line_a = bed_a.readline()
            line_b = bed_b.readline()

        #differ chomosome #不同的染色体
    else:
        order_a = ref_order_dict.get(line_list_a[0])
        order_b = ref_order_dict.get(line_list_b[0])

        if order_a < order_b:
            line_a = bed_a.readline()

        else:
            line_b = bed_b.readline()

print(overlap_count)

多对多

def cmp_region(region_a,region_b,ref_order_dict):
    """
    INPUT:
          <region_a> <region_b>
             str, line from BED file
    OUTPUT:
           -1 region_a at upstream of region_b
           0  region_a overlaps with region_b
           1  region_a at downstream of region_b
    """
    if region_a is None:
        return 1
    if region_b is None:
        return -1

    region_list_a = region_a.strip().split("\t")
    region_list_b = region_b.strip().split("\t")

    order_a = ref_order_dict.get(region_list_a[0])
    order_b = ref_order_dict.get(region_list_b[0])

    if order_a < order_b:
        return -1
    
    if order_a > order_b:
        return 1


    #get region in
    start_a = int(region_list_a[1])
    end_a = int(region_list_a[2])


    start_b = int(region_list_b[1])
    end_b = int(region_list_b[2])


    #no-overlap a upstream

    if end_a < start_b:
        return -1

    if start_a > end_b:
        return 1

    return 0
def update_temp_info(temp_merge_line,temp_chr_name,temp_chr_start,temp_chr_end,new_line):
    """
    """

    new_line_list = new_line.strip().split("\t")

    if temp_merge_line is None:
        temp_merge_line = new_line
        temp_chr_name = new_line_list[0]
        temp_chr_start = int(new_line_list[1])
        temp_chr_end = int(new_line_list[2])


    else:
        temp_chr_name = new_line_list[0]
        temp_chr_start = min(temp_chr_start, int(new_line_list[1]))
        temp_chr_end = max(temp_chr_end, int(new_line_list[2]))
        temp_merge_line = "%s\t%s\t%s" % (temp_chr_name, temp_chr_start, temp_chr_end)

    return temp_merge_line, temp_chr_name, temp_chr_start, temp_chr_end
    
# find all a-b intersect

# 2.1 open file

bed_a = open("./test_multi_region_a.sort.bed","r")
bed_b = open("./test_multi_region_b.sort.bed","r")


line_a = bed_a.readline()
line_b = bed_b.readline()


temp_list = []
temp_chr_name = None
temp_chr_start = None
temp_chr_end = None
temp_merge_line = None
new_line_state = None


while line_a and line_a:

    #check temp list
    if len(temp_list) > 0 and new_line_state:

        temp_cmp_res = cmp_region(line_a,temp_merge_line, ref_order_dict)

        if temp_cmp_res == 1:
            temp_list = []
            temp_chr_name = None
            temp_chr_start = None
            temp_chr_end = None
            temp_merge_line = None
        elif temp_cmp_res == 0:
            for temp_line_b in temp_list:
                run_cmp_res = cmp_region(line_a, temp_line_b, ref_order_dict)

                if run_cmp_res == 0:
                    print(line_a.strip(), temp_line_b.strip())
                    print("-" * 80)

    #check current
    cmp_res = cmp_region(line_a, line_b, ref_order_dict)

    if cmp_res == -1:
        line_a = bed_a.readline()
        new_line_state = True


    elif cmp_res == 1:
        new_line_state = False

        line_b_list = line_b.strip().split("\t")

        if line_a.split("\t")[0] == line_b_list[0]:
            temp_list.append(line_b)
            

            temp_merge_line, temp_chr_name, temp_chr_start, temp_chr_end = update_temp_info(
                temp_merge_line, 
                temp_chr_name, 
                temp_chr_start, 
                temp_chr_end,
                line_b
            )


        else:
            temp_list = []
            temp_chr_name = None
            temp_chr_start = None
            temp_chr_end = None
            temp_merge_line = None
            
        line_b = bed_b.readline()
    
    else:
        new_line_state = False
        print(line_a.strip(), line_b.strip())
        print("-" * 80)

        temp_list.append(line_b)
        temp_merge_line, temp_chr_name, temp_chr_start, temp_chr_end = update_temp_info(
                temp_merge_line, 
                temp_chr_name, 
                temp_chr_start, 
                temp_chr_end,
                line_b
        )  
        
        line_b = bed_b.readline()
        
# check file end
file_b_end_state = False
file_a_end_state = False

if line_b == "":
    file_b_end_state = True

if line_a == "":
    file_a_end_state = True
    
while not file_a_end_state:
    
    # read new line
    line_a = bed_a.readline()
    new_line_state = True
    
    if line_a == "":
        break

    # check temp list
    if len(temp_list) > 0 and new_line_state: 

        temp_cmp_res = cmp_region(line_a, temp_merge_line, ref_order_dict)
        
        if temp_cmp_res == 1:
            break

        elif temp_cmp_res == 0:
            for temp_line_b in temp_list:
                run_cmp_res = cmp_region(line_a, temp_line_b, ref_order_dict)
                
                if run_cmp_res == 0:
                    print(line_a.strip(), temp_line_b.strip())
                    print("-" * 80)

写程序的本质还是数学吧 慢慢学习

 类似资料: