Align VCF to reference
From PyPedia
Contents |
[edit] Documentation
Align a VCF filename with a reference fasta file. Important note: the tab-delimited FASTA file refers to UCSC tab-delimited file format coming out of fastaFromBED. It also assumes both input files are sorted by chromosome and position.
Parameters
- input_fasta_tab_filename: The filename of the FASTA tab delimited file.
- input_VCF_filename: The filename of the VCF file
- Flip: if set True, it flips the strand. Note that this will discard all A/T and C/G SNPs as they are ambiguous.
- output_VCF_filename: The filename of the output file.
Credits
- Initially written in Perl by Laurent Francioli.
[edit] Parameters
<inputs> </inputs>
[edit] Return
The filename of the output VCF file.
[edit] See also
- User articles: Align VCF to reference user Kantale
[edit] Code
import re def Align_VCF_to_reference( input_VCF_filename = None, input_fasta_tab_filename = None, output_VCF_filename = None, flip = False, ): if type(input_VCF_filename).__name__ == 'str': try: VCF = open(input_VCF_filename, 'U') except IOError: raise Exception("Could not open input vcf file: " + input_VCF_filename) else: VCF = input_VCF_filename if type(input_fasta_tab_filename).__name__ == 'str': try: FASTA = open(input_fasta_tab_filename, 'U') except IOError: raise Exception("Could not open input fasta-tab file: " + input_fasta_tab_filename) else: FASTA = input_fasta_tab_filename if type(output_VCF_filename).__name__ == 'str': try: VCF_OUT = open(output_VCF_filename, "w") except IOError: raise Exception("Could not create or open target vcf file: " + output_VCF_filename) else: VCF_OUT = output_VCF_filename #Initialize vars fasta_fields = [0,0,0,0] #Initialized to be at virtual Ch0 pos 0. vcf_ln = 0 fasta_ln = 0 total_unchanged = 0 total_changed = 0 total_warnings = 0 total_fasta_warnings = 0 total_errors = 0 total_strand_flipped = 0 total_ambiguous = 0 total_fasta_skipped = 0 total_fasta_N = 0 get_fasta_next_line = True #Flipped Strand conversion strandFlip = {'A' : 'T', 'T' : 'A', 'C' : 'G', 'G' : 'C', '.' : '.'} #Loop over input VCF file #The inner loop over the FASTA file is bound by position while True: vcf = VCF.readline() if not vcf: break vcf_ln += 1 vcf = vcf.replace("\n", "") match_found = 1 get_VCF_next_line = True #Skip header lines if re.search("^[#@]", vcf): VCF_OUT.write(vcf + "\n") continue #Parse nicely formatted lines else: match = re.search("^(\S+)\s(\d+)\s(\S+)\s([ACGT\.])\s([ACGT\.])(.+)", vcf) if match: #Check VCF field number vcf_fields = vcf.split() vcf_fields[1] = int(vcf_fields[1]) if len(vcf_fields) < 9: print "ERROR: Line %s of the VCF file does not contain enough fields (only %i found where at least 9 are expected). Line skipped." % (vcf_ln, len(vcf_fields)) total_errors += 1 continue #Loop over FASTA file while True: if get_fasta_next_line: fasta = FASTA.readline() if not fasta: break fasta_ln += 1 get_fasta_next_line = True #Check and parse FASTA fasta_match = re.search("^(\S+):(\d+)-(\d+)\t([ACTGN])", fasta) if fasta_match: fasta_fields[0] = fasta_match.group(1) fasta_fields[1] = int(fasta_match.group(2)) fasta_fields[2] = int(fasta_match.group(3)) fasta_fields[3] = fasta_match.group(4) else: print "WARNING: Line %i of the FASTA file is not formated correctly. Line skipped." % (fasta_ln) total_fasta_warnings += 1 continue if fasta_fields[3] == "N": print "WARNING: line %i of the FASTA file does not contain reference information (N)" % (fasta_ln) total_fasta_N += 1 break #Check if fasta is "behind" if ((getChrNum(fasta_fields[0]) == getChrNum(vcf_fields[0]) and fasta_fields[2] < vcf_fields[1]) or getChrNum(fasta_fields[0]) < getChrNum(vcf_fields[0])): print "WARNING: misiligned line in VCF (%s:%i) Skipping line %i of fasta (%s:%i)" % (vcf_fields[0], vcf_fields[1], fasta_ln, fasta_fields[0], fasta_fields[2] ) total_fasta_skipped += 1 continue #Check if VCF is behind #Check that the position was found if fasta_fields[2] != vcf_fields[1]: print "WARNING: Position %s %i in VCF file at line %d not found in FASTA file line %i, which contains %s:%i. Line unchanged." % (vcf_fields[0], vcf_fields[1], vcf_ln, fasta_ln, fasta_fields[0], fasta_fields[2]) get_fasta_next_line = False total_warnings += 1 match_found = 0 break #If the input file is not only on + strand, then verify that SNPs are not ambiguous elif flip and (vcf_fields[3] == strandFlip[vcf_fields[4]]): total_ambiguous += 1 match_found = 0 #Flip alleles where necessary elif fasta_fields[3] != vcf_fields[3]: #Check that reverse alleles match if fasta_fields[3] == vcf_fields[4]: #Flip alleles temp = vcf_fields[3] vcf_fields[3] = vcf_fields[4] vcf_fields[4] = temp #Modify homozygous individuals appropriately for i in xrange(9, len(vcf_fields) + 0): vcf_fields_match = re.match("(.*)0\/0(.*)", vcf_fields[i]) if vcf_fields_match: vcf_fields[i] = vcf_fields_match.group(1) + "1/1" + vcf_fields_match.group(2) else: vcf_fields_match = re.match("(.*)1\/1(.*)", vcf_fields[i]) if vcf_fields_match: vcf_fields[i] = vcf_fields_match.group(1) + "0/0" + vcf_fields_match.group(2) vcf = str.join("\t",[str(x) for x in vcf_fields]) total_changed += 1 #If the option to flip strand is activated, try flipping strand elif flip: #Case strand needs to be flipped if strandFlip[vcf_fields[3]] == fasta_fields[3]: vcf_fields[3] = strandFlip[vcf_fields[3]] vcf_fields[4] = strandFlip[vcf_fields[4]] vcf = str.join("\t", [str(x) for x in vcf_fields]) total_strand_flipped += 1 #Case strand and alleles need to be flipped elif strandFlip[vcf_fields[4]] == fasta_fields[3]: #Flip alleles temp = strandFlip[vcf_fields[3]] vcf_fields[3] = strandFlip[vcf_fields[4]] vcf_fields[4] = temp #Modify homozygous individuals appropriately for i in xrange(9, len(vcf_fields) + 0): vcf_fields_match = re.match("(.*)0\/0(.*)", vcf_fields[i]) if vcf_fields_match: vcf_fields[i] = vcf_fields_match.group(1) + "1/1" + vcf_fields_match.group(2) else: vcf_fields_match = re.match("(.*)1\/1(.*)", vcf_fields[i]) if vcf_fields_match: vcf_fields[i] = vcf_fields_match.group(1) + "0/0" + vcf_fields_match.group(2) vcf = str.join("\t", [str(x) for x in vcf_fields]) total_strand_flipped += 1 else: match_found = 0 print "WARNING: [FLIP] No allele match for SNP at VCF position %s %i at line %i. Line skipped." % (vcf_fields[0], vcf_fields[1], vcf_ln) total_warnings += 1 #Report non-matching alleles else: match_found = 0 print "WARNING: No allele match for SNP at VCF position %s %i at line %i. Line skipped." % (vcf_fields[0], vcf_fields[1], vcf_ln) total_warnings += 1 else: total_unchanged += 1 break if match_found: VCF_OUT.write(vcf + "\n") #Report ill formated lines else: print "ERROR: Line %i of the VCF is not formated correctly. Line skipped." % (vcf_ln) total_errors += 1 print "VCF file update completed.\n\n%i SNPs did match the reference previously.\n%i SNPs updated." % (total_unchanged, total_changed) if flip: print "%i SNPs were strand-flipped\n%i SNPs were ambiguous (A/T, C/G), could not be strand-flipped and were skipped." % (total_strand_flipped, total_ambiguous) print "%i SNPs did not match the reference or were lacking in the reference and could not be updated.\n%i SNPs skipped due to incorrect formatting\n%i FASTA lines not parsed." % (total_warnings, total_errors, total_fasta_warnings) print "%i Fasta lines skipped." % (total_fasta_skipped) print "%i Fasta lines contain no refererence (N)." % (total_fasta_N) VCF.close() FASTA.close() VCF_OUT.close() return output_VCF_filename def getChrNum(ch): """ Returns a number for a given chrom incl. X,Y,XY and MT. Arguments: [0]String: Chrom ID Returns: [0]int: Chrom Number """ if re.search("^\d+$", ch): return ch if ch.lower() == "x": return 23 if ch.lower() == "y": return 24 if ch.lower() == "xy": return 25 if ch.lower() == "mt": return 26 return 27
[edit] Unit Tests
def uni1(): return True
[edit] Development Code
def Align_VCF_to_reference(): pass
[edit] Permissions
[edit] Documentation Permissions
Kantale
[edit] Code Permissions
Kantale
[edit] Unit Tests Permissions
Kantale