Align VCF to reference

From PyPedia
Jump to: navigation, search

Contents

[edit] Documentation

Align a VCF filename with a reference fasta file. Important note: the tab-delimited FASTA file refers to UCSC tab-delimited file format coming out of fastaFromBED. It also assumes both input files are sorted by chromosome and position.

Parameters

  • input_fasta_tab_filename: The filename of the FASTA tab delimited file.
  • input_VCF_filename: The filename of the VCF file
  • Flip: if set True, it flips the strand. Note that this will discard all A/T and C/G SNPs as they are ambiguous.
  • output_VCF_filename: The filename of the output file.

Credits

  • Initially written in Perl by Laurent Francioli.


[edit] Parameters

<inputs>
</inputs>


[edit] Return

The filename of the output VCF file.

[edit] See also

[edit] Code

import re

def Align_VCF_to_reference(
	input_VCF_filename = None,
	input_fasta_tab_filename = None,
	output_VCF_filename = None,
	flip = False,
	):

	if type(input_VCF_filename).__name__ == 'str':
		try:
			VCF = open(input_VCF_filename, 'U')
		except IOError:
			raise Exception("Could not open input vcf file: " + input_VCF_filename)
	else:
		VCF = input_VCF_filename

	if type(input_fasta_tab_filename).__name__ == 'str':
		try:
			FASTA = open(input_fasta_tab_filename, 'U')
		except IOError:
			raise Exception("Could not open input fasta-tab file: " + input_fasta_tab_filename)
	else:
		FASTA = input_fasta_tab_filename

	if type(output_VCF_filename).__name__ == 'str':
		try:
			VCF_OUT = open(output_VCF_filename, "w")
		except IOError:
			raise Exception("Could not create or open target vcf file: " + output_VCF_filename)
	else:
		VCF_OUT = output_VCF_filename

	#Initialize vars
	fasta_fields = [0,0,0,0] #Initialized to be at virtual Ch0 pos 0.
	vcf_ln = 0
	fasta_ln = 0

	total_unchanged = 0
	total_changed = 0
	total_warnings = 0
	total_fasta_warnings = 0
	total_errors = 0
	total_strand_flipped = 0
	total_ambiguous = 0
	total_fasta_skipped = 0
	total_fasta_N = 0
	get_fasta_next_line = True

	#Flipped Strand conversion
	strandFlip = {'A' : 'T', 'T' : 'A', 'C' : 'G', 'G' : 'C', '.' : '.'}

	#Loop over input VCF file
	#The inner loop over the FASTA file is bound by position
	while True:

		vcf = VCF.readline()
		if not vcf:
			break

		vcf_ln += 1
		vcf = vcf.replace("\n", "")


		match_found = 1
		get_VCF_next_line = True


		#Skip header lines
		if re.search("^[#@]", vcf):
			VCF_OUT.write(vcf + "\n")
			continue
	
		#Parse nicely formatted lines
		else:
			match = re.search("^(\S+)\s(\d+)\s(\S+)\s([ACGT\.])\s([ACGT\.])(.+)", vcf)

			if match:
		
				#Check VCF field number
				vcf_fields = vcf.split()
				vcf_fields[1] = int(vcf_fields[1])
				if len(vcf_fields) < 9:
					print "ERROR: Line %s of the VCF file does not contain enough fields (only %i found where at least 9 are expected). Line skipped." % (vcf_ln, len(vcf_fields))
					total_errors += 1
					continue
		
		
				#Loop over FASTA file
				while True:

					if get_fasta_next_line:
						fasta = FASTA.readline()
						if not fasta:
							break

						fasta_ln += 1

					get_fasta_next_line = True

					#Check and parse FASTA
					fasta_match = re.search("^(\S+):(\d+)-(\d+)\t([ACTGN])", fasta)
					if fasta_match:
						fasta_fields[0] = fasta_match.group(1)
						fasta_fields[1] = int(fasta_match.group(2))
						fasta_fields[2] = int(fasta_match.group(3))
						fasta_fields[3] = fasta_match.group(4)
					else:
						print "WARNING: Line %i of the FASTA file is not formated correctly. Line skipped." % (fasta_ln)
						total_fasta_warnings += 1
						continue
					
					if fasta_fields[3] == "N":
						print "WARNING: line %i of the FASTA file does not contain reference information (N)" % (fasta_ln)
						total_fasta_N += 1
						break

					#Check if fasta is "behind"
					if ((getChrNum(fasta_fields[0]) == getChrNum(vcf_fields[0]) and fasta_fields[2] < vcf_fields[1]) or getChrNum(fasta_fields[0]) < getChrNum(vcf_fields[0])):
						print "WARNING: misiligned line in VCF (%s:%i) Skipping line %i of fasta (%s:%i)" % (vcf_fields[0], vcf_fields[1], fasta_ln, fasta_fields[0], fasta_fields[2]  )
						total_fasta_skipped += 1
						continue
			
					#Check if VCF is behind
					#Check that the position was found
					if fasta_fields[2] != vcf_fields[1]:		
						print "WARNING: Position %s %i in VCF file at line %d not found in FASTA file line %i, which contains %s:%i. Line unchanged." % (vcf_fields[0], vcf_fields[1], vcf_ln, fasta_ln, fasta_fields[0], fasta_fields[2])
						get_fasta_next_line = False
						total_warnings += 1
						match_found = 0
						break
		
					#If the input file is not only on + strand, then verify that SNPs are not ambiguous
					elif flip and (vcf_fields[3] == strandFlip[vcf_fields[4]]):
						total_ambiguous += 1
						match_found = 0
		
					#Flip alleles where necessary
					elif fasta_fields[3] != vcf_fields[3]:

						#Check that reverse alleles match
						if fasta_fields[3] == vcf_fields[4]:
				
							#Flip alleles
							temp = vcf_fields[3]
							vcf_fields[3] = vcf_fields[4]
							vcf_fields[4] = temp
							#Modify homozygous individuals appropriately

							for i in xrange(9, len(vcf_fields) + 0):
								vcf_fields_match = re.match("(.*)0\/0(.*)", vcf_fields[i])
								if vcf_fields_match:
									vcf_fields[i] = vcf_fields_match.group(1) + "1/1" + vcf_fields_match.group(2)

								else:
									vcf_fields_match = re.match("(.*)1\/1(.*)", vcf_fields[i])
									if vcf_fields_match:
										vcf_fields[i] = vcf_fields_match.group(1) + "0/0" + vcf_fields_match.group(2)
					
				
							vcf = str.join("\t",[str(x) for x in vcf_fields])

							total_changed += 1
			
						#If the option to flip strand is activated, try flipping strand
						elif flip:

							#Case strand needs to be flipped
							if strandFlip[vcf_fields[3]] == fasta_fields[3]:
								vcf_fields[3] = strandFlip[vcf_fields[3]]
								vcf_fields[4] = strandFlip[vcf_fields[4]]
								vcf = str.join("\t", [str(x) for x in vcf_fields])
								total_strand_flipped += 1
				
							#Case strand and alleles need to be flipped
							elif strandFlip[vcf_fields[4]] == fasta_fields[3]:
								#Flip alleles
								temp = strandFlip[vcf_fields[3]]
								vcf_fields[3] = strandFlip[vcf_fields[4]]
								vcf_fields[4] = temp
								#Modify homozygous individuals appropriately
								for i in xrange(9, len(vcf_fields) + 0):
									vcf_fields_match = re.match("(.*)0\/0(.*)", vcf_fields[i])
									if vcf_fields_match:
										vcf_fields[i] = vcf_fields_match.group(1) + "1/1" + vcf_fields_match.group(2) 

									else:
										vcf_fields_match = re.match("(.*)1\/1(.*)", vcf_fields[i])
										if vcf_fields_match:
											vcf_fields[i] = vcf_fields_match.group(1) + "0/0" + vcf_fields_match.group(2)


								vcf = str.join("\t", [str(x) for x in vcf_fields])
								total_strand_flipped += 1

							else:
								match_found = 0			
								print "WARNING: [FLIP] No allele match for SNP at VCF position %s %i at line %i. Line skipped." % (vcf_fields[0], vcf_fields[1], vcf_ln)
								total_warnings += 1

						#Report non-matching alleles
						else:
							match_found = 0
							print "WARNING: No allele match for SNP at VCF position %s %i at line %i. Line skipped." % (vcf_fields[0], vcf_fields[1], vcf_ln)
							total_warnings += 1


					else:
						total_unchanged += 1

					break
		
				if match_found:
					VCF_OUT.write(vcf + "\n")
	

			#Report ill formated lines
			else:
				print "ERROR: Line %i of the VCF is not formated correctly. Line skipped." % (vcf_ln)
				total_errors += 1



	print "VCF file update completed.\n\n%i SNPs did match the reference previously.\n%i SNPs updated." % (total_unchanged, total_changed)
	if flip:
		print "%i SNPs were strand-flipped\n%i SNPs were ambiguous (A/T, C/G), could not be strand-flipped and were skipped." % (total_strand_flipped, total_ambiguous)

	print "%i SNPs did not match the reference or were lacking in the reference and could not be updated.\n%i SNPs skipped due to incorrect formatting\n%i FASTA lines not parsed." % (total_warnings, total_errors, total_fasta_warnings)
	print "%i Fasta lines skipped." % (total_fasta_skipped)
	print "%i Fasta lines contain no refererence (N)." % (total_fasta_N)

	VCF.close()
	FASTA.close()
	VCF_OUT.close()
	
	return output_VCF_filename

def getChrNum(ch):
	"""
	Returns a number for a given chrom incl. X,Y,XY and MT.
	Arguments:
	[0]String: Chrom ID
	Returns:
	[0]int: Chrom Number
	"""

	if re.search("^\d+$", ch): return ch

	if ch.lower() == "x": return 23
	if ch.lower() == "y": return 24
	if ch.lower() == "xy": return 25
	if ch.lower() == "mt": return 26
	return 27

[edit] Unit Tests

def uni1():
	return True

[edit] Development Code

def Align_VCF_to_reference():
	pass

[edit] Permissions

[edit] Documentation Permissions

Kantale

[edit] Code Permissions

Kantale

[edit] Unit Tests Permissions

Kantale

[edit] Permissions Permissions

Kantale

Personal tools
Namespaces

Variants
Actions
Navigation
Toolbox