Run pipeline user Joman
From PyPedia
Contents |
[edit] Documentation
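Runs a pipeline, executing each step only after all of its prerequisite steps are done.
A pipeline is declared as a list of computation steps. Each step is a dictionary with the keys "name", "command", "params", "prereq", "mem" and "walltime". Example: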
import pypedia
from pypedia import Run_pipeline_user_Joman


def Make_test_pipeline():
    """Build a two-step demo pipeline in which "step 2" waits for "step 1"."""
    first_step = {
        "name" : "step 1",
        "command" : "Execute_command",
        "params" : {
            "command" : "sleep 5",
        },
        "prereq" : [],
        "mem" : "1GB",
        "walltime" : "1:00:00",
    }
    second_step = {
        "name" : "step 2",
        "command" : "Execute_command",
        "params" : {
            "command" : "sleep 5",
        },
        # Only runs once "step 1" has been marked as done.
        "prereq" : ["step 1"],
        "mem" : "1GB",
        "walltime" : "1:00:00",
    }
    return [first_step, second_step]


Run_pipeline_user_Joman(
    pipeline = Make_test_pipeline()
)
[edit] Parameters
<inputs> </inputs>
[edit] Return
[edit] See also
Execute_article_in_PBS_user_Kantale
[edit] Code
import json
import time
import sys


def Run_pipeline_user_Joman(
    pipeline = None,
    temp_dir = None,
    done = None,
    verbose = True,
    dummy = False,
    pbsStats = None,
    delay = None,
    max_steps = None, # If set, run only the first max_steps of the pipeline
    push_the_button = None,
    output_filename = None,
    ):
    """Run the steps of a pipeline, each one after its prerequisites.

    Parameters:
        pipeline: list of step dictionaries. Every step needs the keys
            "name", "command", "params" and "prereq"; PBS mode also reads
            "mem" and "walltime". Required.
        temp_dir: if given, created with "mkdir -p" before anything runs.
            PBS mode builds its output/error/script paths under it, so it
            must be set whenever pbsStats is used.
        done: iterable of step names to treat as already completed.
        verbose: print the pipeline and per-step progress messages.
        dummy: forwarded to the step runners, which then skip the actual
            execution / submission.
        pbsStats: when a non-empty dictionary, every step is submitted
            through Execute_pipeline_step_in_PBS instead of being run
            locally. The dictionary is modified in place ("afterok" here,
            more keys in Execute_pipeline_step_in_PBS).
        delay: seconds to wait before each PBS submission.
        max_steps: stop as soon as this many steps are marked done. Names
            passed in `done` count towards this number.
        push_the_button: in PBS mode, a filename handed to
            Delete_filename_user_Kantale once all steps are processed.
        output_filename: if given, a tab-separated log is written with one
            line per recorded step: name (spaces replaced by "_") and job id.

    Raises:
        ValueError: a full pass over the pipeline could not run any step
            although some are still pending (missing or circular
            prerequisites). The original code looped forever in that case.

    Every print call takes exactly one parenthesised string, so the text
    written is the same whether print is a statement or a function.
    """
    if temp_dir:
        # Only create the directory when one was requested; the previous
        # unconditional concatenation failed with temp_dir = None.
        # NOTE(review): temp_dir is placed in the command unquoted - confirm
        # callers never pass paths with spaces or shell metacharacters.
        command = "mkdir -p " + temp_dir
        Execute_command_user_Kantale(command = command)

    length = max_steps if max_steps else len(pipeline)

    #Print pipeline
    if verbose:
        print("Running pipeline:")
        try:
            print(json.dumps(pipeline, indent=4))
        except TypeError:
            print("Could not print pipeline. Contains lambda function")

    #Marking done jobs
    done_steps = list(done) if done else []
    # Steps that were done beforehand have no PBS job id.
    pbs_done_ids = dict((name, None) for name in done_steps)

    while len(done_steps) < length:
        progressed = False

        for step in pipeline:
            # Checked before running a step so that max_steps is an exact
            # limit (the old post-check with ">" allowed one step too many).
            if len(done_steps) >= length:
                break

            print("Done steps: %s  Max: %s \n" % (len(done_steps), length))

            if step["name"] in done_steps:
                continue

            #Can we execute it?
            if any(prerequisite not in done_steps for prerequisite in step["prereq"]):
                continue

            #We can execute this step
            print("-" * 150 + "\n" + "-" * 150 + "\n" + time.ctime())

            if pbsStats:
                # Job ids of the prerequisites, passed on via the "afterok" key.
                pbsStats["afterok"] = [pbs_done_ids[x] for x in step["prereq"]]
                job_id = Execute_pipeline_step_in_PBS(
                    step = step,
                    pbsStats = pbsStats,
                    temp_dir = temp_dir,
                    dummy = dummy,
                    delay = delay,
                    verbose = verbose,
                    )[1]
                pbs_done_ids[step["name"]] = job_id
            else:
                Execute_pipeline_step(step, verbose, dummy)

            print("-" * 150 + "\n" + "-" * 150)

            done_steps.append(step["name"])
            progressed = True
            if verbose:
                print("Done with step : " + step["name"])

        if not progressed:
            blocked = [step["name"] for step in pipeline if step["name"] not in done_steps]
            if not blocked:
                # Every step is done; max_steps was larger than the pipeline.
                break
            raise ValueError(
                "Pipeline cannot make progress; steps with unmet prerequisites: "
                + ", ".join(str(name) for name in blocked)
            )

    #Writing log file
    if output_filename:
        # "with" closes the file even if a write fails.
        with open(output_filename, "w") as output_file:
            for k, v in pbs_done_ids.items():
                output_file.write(str(k).replace(" ", "_") + "\t" + str(v) + "\n")

    if pbsStats and push_the_button:
        print("Enacting the pipeline by deleting the file:  %s" % (push_the_button,))
        Delete_filename_user_Kantale(push_the_button)


def Execute_pipeline_step(
    step = None,
    verbose = True,
    dummy = False,
    delay = None,
    ):
    """Run one pipeline step in the current process.

    The article named by step["command"] is imported with
    Import_PyPedia_article_user_Kantale and called with step["params"] as
    keyword arguments. With dummy set nothing is imported or called.
    `delay` is accepted but not used here.
    """
    if verbose:
        print("Running pipeline step: %s" % (step["name"],))
        try:
            print(json.dumps(step, indent=4))
        except TypeError:
            # Same fallback as Execute_pipeline_step_in_PBS: a step holding
            # a lambda must not abort the run just because it cannot be shown.
            print("Could not print execution step.. Contains a lambda function")

    if not dummy:
        method = Import_PyPedia_article_user_Kantale(step["command"])
        method(**step["params"])


def Execute_pipeline_step_in_PBS(
    step = None,
    pbsStats = None,
    temp_dir = None,
    verbose = True,
    dummy = False,
    delay = None,
    ):
    """Submit one pipeline step through Execute_article_in_PBS_user_Kantale.

    pbsStats is updated in place with the step's "mem" and "walltime" and
    with "output" / "error" paths under temp_dir. File names are derived
    from the step name with spaces and slashes replaced by "_".

    Returns whatever Execute_article_in_PBS_user_Kantale returns (the caller
    uses element [1] as the job id), or ("-1", -1) when dummy is set.
    """
    if verbose:
        print("Running pipeline step: %s" % (step["name"],))
        try:
            print(json.dumps(step, indent=4))
        except TypeError:
            print("Could not print execution step.. Contains a lambda function")

    pbs_name = step["name"].replace(" ", "_").replace("/", "_")
    base_path = temp_dir + "/" + pbs_name

    pbsStats["mem"] = step["mem"]
    pbsStats["walltime"] = step["walltime"]
    pbsStats["output"] = base_path + ".output"
    pbsStats["error"] = base_path + ".error"

    if dummy:
        return ("-1",-1)

    #Bring the method locally
    Import_PyPedia_article_user_Kantale(step["command"])

    #Sometimes PBS requires a time delay before you can submit a job
    if delay:
        time.sleep(delay)

    return Execute_article_in_PBS_user_Kantale(
        article_name = step["command"],
        parameters = step["params"],
        pbsStats = pbsStats,
        python_filename = base_path + ".py",
        sub_filename = base_path + ".sh",
        # The article was already imported above.
        import_article = False,
        )
[edit] Unit Tests
def uni1():
    """Placeholder unit test that always reports success."""
    outcome = True
    return outcome
[edit] Development Code
def Run_pipeline_user_Joman():
    """Development stub: takes no arguments and does nothing."""
    return None
[edit] Permissions
[edit] Documentation Permissions
Joman
[edit] Code Permissions
Joman
[edit] Unit Tests Permissions
Joman