Convert VCF to beagle unphased trio user Kantale
From PyPedia
Contents |
[edit] Documentation
Convert files from Variant Call Format that contain trio information to beagle unphased trio format. This is a useful preliminary step before phasing.
[edit] Parameters
<inputs> </inputs>
[edit] Return
[edit] See also
[edit] Code
def Convert_VCF_to_beagle_unphased_trio_user_Kantale( input_filename = None, output_filename = None, markers_filename = None, remove_children = False, print_mend_errors = False, print_line_number = True, phased = False, #Is the vcf files phased or not missing="0" ): file = open(input_filename) if output_filename: if type(output_filename).__name__ == "str": output_file = open(output_filename, "w") elif type(output_filename).__name__ == "file": #Allow sys.stdout assignments to output_filename output_file = output_filename if markers_filename: markers_file = open(markers_filename, "w") header = False lines = 0 mend_errors = 0 mend_errors_unknown = 0 cases_0_0 = [ [[1,1], [0,0]], [[1,1], [0,1]], [[1,1], [1,1]] ] cases_1_1 = [ [[0,0], [0,0]], [[0,0], [0,1]], [[0,0], [1,1]] ] cases_0_1 = [ [[0,0], [0,0]], [[1,1], [1,1]] ] if phased: separator = "|" else: separator = "/" while True: line = file.readline() if not line: break lines += 1 line_splitted = line.split() if line_splitted[0] == "#CHROM": header = list(line_splitted) samples = header[9:] # print header # print samples if remove_children: # Remove every third value to_print = ["I", "id"] + reduce(lambda x,y: x+y, [[x,x] for index, x in enumerate(samples) if index % 3 != 2]) else: to_print = ["I", "id"] + reduce(lambda x,y: x+y, [[x,x] for x in samples]) if output_filename: output_file.write(str.join(" ", to_print) + "\n") if header: if line_splitted[6] == "PASS": chromosome = line_splitted[0] position = line_splitted[1] id = line_splitted[2] if id == ".": id = chromosome + "_" + position ref = line_splitted[3] alt = line_splitted[4] if markers_filename: markers_file.write(str.join(" ", [id, position, ref, alt]) + "\n") to_print = ["M", id] for gen_index, genotype_field in enumerate(line_splitted[9:]): if remove_children and gen_index % 3 == 2: # Do noy print every third value continue genotype_splitted = genotype_field.split(":")[0].split(separator) possible_genotypes = [ref, alt, missing] if genotype_splitted[0] == ".": al1_index = 2 else: al1_index = int(genotype_splitted[0]) if genotype_splitted[0] == ".": al2_index = 2 else: al2_index = int(genotype_splitted[1]) al1_genotype = possible_genotypes[al1_index] al2_genotype = possible_genotypes[al2_index] if gen_index % 3 == 0: #This is parent 1 parent1_gen = [al1_index, al2_index] parent1_gen2 = [al1_genotype, al2_genotype, missing] elif gen_index % 3 == 1: #This is parent 2 parent2_gen = [al1_index, al2_index] parent2_gen2 = [al1_genotype, al2_genotype, missing] elif gen_index % 3 == 2: #This is the child error = False error_unknown = False if al1_index == 0 and al2_index == 0: to_check = cases_0_0 elif al1_index == 1 and al2_index == 1: to_check = cases_1_1 elif al1_index == 0 and al2_index == 1: to_check = cases_0_1 elif al1_index == 2 or al2_index == 2: to_check = [] for case in to_check: if parent1_gen == case[0] and parent2_gen == case[1]: error = True elif parent2_gen == case[0] and parent1_gen == case[1]: error = True if parent1_gen == [2, 2] and [al1_index, al2_index] != parent2_gen: error_unknown = True elif parent2_gen == [2, 2] and [al1_index, al2_index] != parent1_gen: error_unknown = True if error or error_unknown: if print_mend_errors: print "lineno:",lines print "position:", line_splitted[1] print to_print print "parent1_gen:", parent1_gen print "parent2_gen:", parent2_gen print "parent1_gen2:", parent1_gen2 print "parent2_gen2:", parent2_gen2 print "al1_genotype:", al1_genotype print "al2_genotype:", al2_genotype if error: mend_errors += 1 if error_unknown: mend_errors_unknown += 1 al1_genotype = missing al2_genotype = missing #a = 1/0 to_print += [al1_genotype, al2_genotype] if output_file: output_file.write(str.join(" ", to_print) + "\n") if print_line_number: if lines % 10000 == 0: print "Lines:", lines if output_filename: output_file.close() print "Mendelian errors:", mend_errors print "Mendelian errors caused by unknown values:", mend_errors_unknown
[edit] Unit Tests
def uni1(): return True
[edit] Development Code
def Convert_VCF_to_beagle_unphased_trio_user_Kantale(): pass
[edit] Permissions
[edit] Documentation Permissions
Kantale
[edit] Code Permissions
Kantale
[edit] Unit Tests Permissions
Kantale