Run pipeline user Joman

From PyPedia
Jump to: navigation, search

Contents

[edit] Documentation

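Runs a pipeline: a set of computation steps executed in dependency order. Steps run either in the current Python process or, when pbsStats is given, as PBS jobs chained on the jobs of their prerequisites.

A pipeline is declared as a list of step dictionaries. Each step has:
- a "name";
- the PyPedia article to run ("command") and its keyword arguments ("params");
- the names of the steps that must finish first ("prereq");
- the memory ("mem") and walltime ("walltime") to request when it is submitted to PBS.

Example: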

import pypedia
from pypedia import Run_pipeline_user_Joman

def Make_test_pipeline():
	"""Build the example pipeline: two steps that each run "sleep 5",
	where "step 2" lists "step 1" as its only prerequisite."""

	def sleep_step(name, prereq):
		# A fresh dict (and params dict) per call, so the steps share no state.
		return {
			"name" : name,
			"command" : "Execute_command",
			"params" : {"command" : "sleep 5"},
			"prereq" : prereq,
			"mem" : "1GB",
			"walltime" : "1:00:00",
		}

	first = sleep_step("step 1", [])
	second = sleep_step("step 2", ["step 1"])
	return [first, second]

# Run the example pipeline. The function starts with "mkdir -p " + temp_dir,
# so a temp_dir must be supplied for this call to work.
Run_pipeline_user_Joman(
	pipeline = Make_test_pipeline(),
	temp_dir = "pipeline_tmp",
)


[edit] Parameters

<inputs>
</inputs>


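[edit] Return

Nothing. When output_filename is given, a tab-separated file is written with one line per step name and its PBS job id.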

[edit] See also

Execute_article_in_PBS_user_Kantale

[edit] Code

import json
import time
import sys 

def Run_pipeline_user_Joman(
	pipeline = None,
	temp_dir = None,
	done = None,
	verbose = True,
	dummy = False,
	pbsStats = None,
	delay = None,
	max_steps = None, # If set, run only the first max_steps of the pipeline
	push_the_button = None,
	output_filename = None,
):
	"""Run the steps of a pipeline in dependency order.

	The pipeline is scanned repeatedly. On each pass, every step that is not
	done yet and whose prerequisites are all done is executed and marked
	done. Running stops once max_steps (by default len(pipeline)) step names
	are marked done, or once every step of the pipeline has run.

	Parameters:
		pipeline: list of step dicts. "name" and "prereq" (list of step names
			that must run first) are always read; "command" and "params" are
			used to run the step; "mem" and "walltime" are used in PBS mode.
			None is treated as an empty pipeline.
		temp_dir: when given, created with "mkdir -p" via
			Execute_command_user_Kantale. Required in PBS mode, where the
			generated job files are placed in it.
		done: names of steps to treat as already done; they are not run.
		verbose: print the pipeline and per-step messages.
		dummy: passed to the step executors, which then skip the real work.
		pbsStats: if truthy, each step is submitted with
			Execute_pipeline_step_in_PBS. It receives a copy of this dict
			whose "afterok" lists the job ids of the step's prerequisites.
			Otherwise each step runs in-process via Execute_pipeline_step.
		delay: seconds to sleep before each PBS submission.
		max_steps: stop once this many step names (including those in
			done) are marked done.
		push_the_button: in PBS mode, a file deleted with
			Delete_filename_user_Kantale after the loop finishes.
		output_filename: if given, write one "<name>\t<job id>" line per
			entry of the PBS job-id table (spaces in names become "_").

	Raises:
		ValueError: if pbsStats is given without temp_dir, or if some steps
			can never run (a prerequisite names no step, or a cycle).
	"""
	if pipeline is None:
		pipeline = []

	# temp_dir is concatenated into file paths in PBS mode; fail early with a
	# clear message instead of a TypeError deep inside a submission.
	if pbsStats and not temp_dir:
		raise ValueError("temp_dir is required when pbsStats is given")

	# The local runner does not use temp_dir, so only create it when given.
	if temp_dir:
		Execute_command_user_Kantale(command = "mkdir -p " + temp_dir)

	length = max_steps if max_steps else len(pipeline)

	# Materialize `done` once, so a generator is not exhausted before the
	# job-id table below is filled from it.
	done_steps = list(done) if done else []

	# Step name -> PBS job id. Steps that were done beforehand have no job.
	pbs_done_ids = dict((name, None) for name in done_steps)

	#Print pipeline
	if verbose:
		print("Running pipeline:")
		try:
			print(json.dumps(pipeline, indent=4))
		except TypeError:
			print("Could not print pipeline. Contains lambda function")

	while len(done_steps) < length:
		progressed = False
		for step in pipeline:
			print("Done steps: %s    Max: %s \n" % (len(done_steps), length))
			if step["name"] in done_steps:
				continue
			if any(p not in done_steps for p in step["prereq"]):
				# A prerequisite has not run yet; retry on a later pass.
				continue

			print("-" * 150 + "\n" + "-" * 150 + "\n" + time.ctime())
			if pbsStats:
				# Per-step copy: the caller's pbsStats is left untouched.
				step_stats = dict(pbsStats)
				# NOTE(review): prerequisites taken from `done` contribute None
				# here; confirm Execute_article_in_PBS_user_Kantale skips them.
				step_stats["afterok"] = [pbs_done_ids[x] for x in step["prereq"]]
				job_id = Execute_pipeline_step_in_PBS(
					step = step,
					pbsStats = step_stats,
					temp_dir = temp_dir,
					dummy = dummy,
					delay = delay,
					verbose = verbose,
				)[1]
				pbs_done_ids[step["name"]] = job_id
			else:
				Execute_pipeline_step(step, verbose, dummy)
			print("-" * 150 + "\n" + "-" * 150)

			done_steps.append(step["name"])
			progressed = True
			if verbose:
				print("Done with step : " + step["name"])

			# ">=" rather than ">": no extra step may run once the limit is hit.
			if len(done_steps) >= length:
				break

		if not progressed:
			pending = [s["name"] for s in pipeline if s["name"] not in done_steps]
			if not pending:
				# Every step has run, but max_steps exceeds the pipeline size.
				break
			# Without this, the while loop would spin forever.
			raise ValueError(
				"Cannot run steps %s: a prerequisite is not a step of the "
				"pipeline or the prerequisites form a cycle" % pending
			)

	#Writing log file
	if output_filename:
		with open(output_filename, "w") as output_file:
			for name, job_id in pbs_done_ids.items():
				output_file.write(str(name).replace(" ", "_") + "\t" + str(job_id) + "\n")

	if pbsStats and push_the_button:
		print("Enacting the pipeline by deleting the file:  " + str(push_the_button))
		Delete_filename_user_Kantale(push_the_button)


def Execute_pipeline_step(
	step = None,
	verbose = True,
	dummy = False,
	delay = None,
):
	"""Run one pipeline step in the current process.

	The step's "command" is loaded with Import_PyPedia_article_user_Kantale.
	The returned callable is invoked with step["params"] as keyword arguments.

	Parameters:
		step: step dict; "name", "command" and "params" are read.
		verbose: print the step before running it.
		dummy: if true, only print; the article is neither imported nor called.
		delay: ignored by this function.
	"""
	if verbose:
		print("Running pipeline step: %s" % step["name"])
		# Steps may carry callables (e.g. lambdas) that json cannot serialize;
		# use the same fallback as Execute_pipeline_step_in_PBS instead of crashing.
		try:
			print(json.dumps(step, indent=4))
		except TypeError:
			print("Could not print execution step.. Contains a lambda function")

	if not dummy:
		method = Import_PyPedia_article_user_Kantale(step["command"])
		method(**step["params"])

def Execute_pipeline_step_in_PBS(
	step = None,
	pbsStats = None,
	temp_dir = None,
	verbose = True,
	dummy = False,
	delay = None,
):
	"""Submit one pipeline step as a PBS job.

	The job's files are placed in temp_dir, named after the step with spaces
	and "/" replaced by "_": ".py" script, ".sh" submission file, and
	".output"/".error" files.

	Parameters:
		step: step dict; "name", "command" and "params" are read. Optional
			"mem" and "walltime" override the same keys of pbsStats.
		pbsStats: dict of PBS settings passed on to
			Execute_article_in_PBS_user_Kantale. It is copied, never
			modified; None is treated as an empty dict.
		temp_dir: directory for the generated files.
		verbose: print the step before submitting.
		dummy: if true, nothing is imported or submitted.
		delay: seconds to sleep before submitting.

	Returns:
		Whatever Execute_article_in_PBS_user_Kantale returns; the pipeline
		runner reads element [1] of it as the job id. In dummy mode, ("-1", -1).
	"""
	if verbose:
		print("Running pipeline step: %s" % step["name"])
		try:
			print(json.dumps(step, indent=4))
		except TypeError:
			print("Could not print execution step.. Contains a lambda function")

	pbs_name = step["name"].replace(" ", "_").replace("/", "_")
	base = temp_dir + "/" + pbs_name

	# Work on a copy so the caller's settings are not overwritten per step.
	stats = dict(pbsStats) if pbsStats else {}
	for key in ("mem", "walltime"):
		if key in step:
			stats[key] = step[key]
	stats["output"] = base + ".output"
	stats["error"] = base + ".error"

	if dummy:
		return ("-1", -1)

	#Bring the method locally
	Import_PyPedia_article_user_Kantale(step["command"])

	#Sometimes PBS requires a time delay before you can submit a job
	if delay:
		time.sleep(delay)

	return Execute_article_in_PBS_user_Kantale(
		article_name = step["command"],
		parameters = step["params"],
		pbsStats = stats,
		python_filename = base + ".py",
		sub_filename = base + ".sh",
		import_article = False,
	)

[edit] Unit Tests

def uni1():
	"""Placeholder unit test for this article: it checks nothing and always passes."""
	outcome = True
	return outcome

[edit] Development Code

def Run_pipeline_user_Joman():
	"""Empty development stub: takes no arguments, does nothing, returns None."""

[edit] Permissions

[edit] Documentation Permissions

Joman

[edit] Code Permissions

Joman

[edit] Unit Tests Permissions

Joman

[edit] Permissions Permissions

Joman

Personal tools
Namespaces

Variants
Actions
Navigation
Toolbox