KOTO

KOTO is an international collaboration running an experiment at the J-PARC high-energy physics facility in Japan.

Access Point

KOTO submits jobs from the Connect node login.collab.ci-connect.net. To request an account, visit the portal at https://ci-connect.net and ask to join the PATh Collaborations group. KOTO is a subgroup under PATh Collaborations, and a separate request to join it must be made by clicking the Request Membership button.

Storage access

Besides their home directories on login.collab.ci-connect.net, users have access to local storage in /scratch/ and distributed storage in /collab/user/. In addition, the collaboration has access to /collab/project/KOTO for sharing data products among users.

The distributed storage is accessible over HTTP at http://stash.osgstorage.org/collab/KOTO, and data can be downloaded with a browser or command-line utilities such as wget or curl.
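For example, a file can be fetched directly from the HTTP endpoint with either utility (the file path below is hypothetical):

$ wget http://stash.osgstorage.org/collab/KOTO/example/output.root
$ curl -O http://stash.osgstorage.org/collab/KOTO/example/output.root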

Jobs to the OSPool

Below is an example HTCondor submit file for running a KOTO job on the OSPool:

Universe       = Vanilla
Executable     = run_koto.sh
Requirements   = HAS_SINGULARITY == TRUE
+SingularityImage = "/cvmfs/singularity.opensciencegrid.org/ppaschos/koto-dev:latest"
transfer_input_files = e14_201605.mac
Error   = output.err.$(Cluster)-$(Process)
Output  = output.out.$(Cluster)-$(Process)
Log     = output.log.$(Cluster)
should_transfer_files = YES
when_to_transfer_output = ON_EXIT
request_cpus = 1
request_memory = 2GB
request_disk = 2GB
+ProjectName="collab.KOTO"
Queue 1
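Assuming the submit file above is saved as koto.sub (the file name is illustrative), the job is submitted and monitored with:

$ condor_submit koto.sub
$ condor_q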

KOTO uses a software stack packaged in the Singularity image referenced in the submit file above. The image is deployed on the remote execution nodes, and jobs run inside the Singularity container. The input file, e14_201605.mac, is transferred to the execution node as part of the job. The execution script, run_koto.sh, sets up the environment and invokes the executables. The following script, run_koto.sh, runs a benchmark example from the E14 library.

#!/bin/bash
echo $HOSTNAME

# Set up ROOT and Geant4 from the software stack inside the container
source /opt/koto/root/v6.22.02/bin/thisroot.sh
source /opt/koto/geant4/10.05.p01/bin/geant4.sh
echo "PATH --> " $PATH
echo "LD_LIBRARY_PATH --> " $LD_LIBRARY_PATH
echo "ROOTSYS --> " $ROOTSYS
echo "G4 data environment"
env | grep G4

# Set up the E14 library
export E14_TOP_DIR="/opt/koto/e14/201605v6.2/e14"
echo "E14_TOP_DIR=" $E14_TOP_DIR
source /opt/koto/e14/201605v6.2/setup.sh

# Run gsim4test: arguments are the macro file, output file,
# number of events, and two random seeds
output="output.root"
nevent=1000
seed=0
echo "Running Code"
/opt/koto/e14/201605v6.2/e14/examples/gsim4test/bin/gsim4test e14_201605.mac ${output} ${nevent} ${seed} ${seed}
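To test run_koto.sh interactively before submitting, the same container can be entered on the login node with Apptainer, assuming CVMFS and Apptainer are available there:

$ apptainer exec /cvmfs/singularity.opensciencegrid.org/ppaschos/koto-dev:latest ./run_koto.sh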

Pegasus Workflows

The Pegasus project encompasses a set of technologies that help workflow-based applications execute in a number of different environments, including desktops, campus clusters, grids, and clouds. Pegasus bridges the scientific domain and the execution environment by automatically mapping high-level workflow descriptions onto distributed resources. It automatically locates the input data and computational resources necessary for workflow execution. Pegasus enables scientists to construct workflows in abstract terms without worrying about the details of the underlying execution environment or the particulars of the low-level specifications required by the middleware. Some of the advantages of using Pegasus include:

  • Portability / Reuse - User-created workflows can easily be run in different environments without alteration. Pegasus currently runs workflows on compute systems scheduled via HTCondor, including the OSPool, as well as on other systems and via other schedulers (e.g. ACCESS resources, Amazon EC2, Google Cloud, and many campus clusters). The same workflow can run on a single system or across a heterogeneous set of resources.

  • Performance - The Pegasus mapper can reorder, group, and prioritize tasks in order to increase the overall workflow performance.

  • Scalability - Pegasus can easily scale both the size of the workflow, and the resources that the workflow is distributed over. Pegasus runs workflows ranging from just a few computational tasks up to 1 million tasks. The number of resources involved in executing a workflow can scale as needed without any impediments to performance.

  • Provenance - By default, all jobs in Pegasus are launched via the kickstart process that captures runtime provenance of the job and helps in debugging. The provenance data is collected in a database, and the data can be summarized with tools such as pegasus-statistics or directly with SQL queries.

  • Data Management - Pegasus handles replica selection, data transfers and output registrations in data catalogs. These tasks are added to a workflow as auxiliary jobs by the Pegasus planner.

  • Reliability - Jobs and data transfers are automatically retried in case of failures. Debugging tools such as pegasus-analyzer help the user to debug the workflow in case of non-recoverable failures.

  • Error Recovery - When errors occur, Pegasus tries to recover when possible by retrying tasks, retrying the entire workflow, providing workflow-level checkpointing, re-mapping portions of the workflow, trying alternative data sources for staging data, and, when all else fails, providing a rescue workflow containing a description of only the work that remains to be done. Pegasus keeps track of what has been done (provenance) including the locations of data used and produced, and which software was used with which parameters.

OSG has no read/write shared file system across its resources. Jobs must either bring their inputs along, or stage them from a remote location as part of the job. The following examples highlight how Pegasus can be used to manage KOTO workloads in such an environment by providing an abstraction layer around data movement and job retries, enabling users to run larger workloads while spending less time developing job management tools and babysitting their computations.

Pegasus workflows have 4 components:

  1. Site Catalog - Describes the execution environment in which the workflow will be executed.

  2. Transformation Catalog - Specifies locations of the executables used by the workflow.

  3. Replica Catalog - Specifies locations of the input data to the workflow.

  4. Workflow Description - An abstract workflow description containing compute steps and dependencies between the steps. We refer to this workflow as abstract because it does not contain data locations and available software.

When developing a Pegasus Workflow using the Python API, all four components may be defined in the same file.
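A minimal sketch of such a script, assuming Pegasus 5.x and illustrative paths and file names, might look like the following:

#!/usr/bin/env python3
# Minimal sketch: all four Pegasus components defined in one script.
# Paths and file names below are illustrative.
from Pegasus.api import *

# 1. Properties: configuration settings
props = Properties()
props["pegasus.data.configuration"] = "nonsharedfs"
props.write()

# 2. Site Catalog: the execution environment
sc = SiteCatalog()
# ... add local, staging, and condorpool sites here ...
sc.write()

# 3. Transformation Catalog: executables used by the workflow
tc = TransformationCatalog()
tc.add_transformations(
    Transformation(name="run_koto", site="local",
                   pfn="/path/to/run_koto.sh", is_stageable=True)
)
tc.write()

# 4. Replica Catalog: locations of the input data
rc = ReplicaCatalog()
rc.add_replica(site="local", lfn="e14_201605.mac", pfn="/path/to/e14_201605.mac")
rc.write()

# Abstract workflow: compute steps and their dependencies
wf = Workflow(name="example")
job = Job("run_koto").add_inputs(File("e14_201605.mac"))
wf.add_jobs(job)
wf.plan(submit=True)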

For details, please refer to the Pegasus documentation.

KOTO workflow example

The GitHub repo https://github.com/pegasus-isi/koto-pegasus contains a sample Pegasus workflow for KOTO processing. It is a very simple Monte Carlo example, containing just N independent jobs:

[Figure 1: workflow diagram showing N independent Monte Carlo jobs]

This example uses OSG StashCache for data transfers. Credentials are transparent to the end users.

Additionally, this example uses a custom container to run jobs. This ensures a consistent and complete environment for the jobs. Which container to use is defined in workflow.py with the +SingularityImage attribute.

When invoked, the workflow script (workflow.py) does the following:

  1. Writes the file pegasus.properties. This file contains configuration settings used by Pegasus and HTCondor.

     # --- Properties ---------------------------------------------------------------
     props = Properties()
     props["pegasus.data.configuration"] = "nonsharedfs"
    
     # Provide a full kickstart record, including the environment, even for successful jobs
     props["pegasus.gridstart.arguments"] = "-f"
    
     # Limit the number of idle jobs for large workflows
     props["dagman.maxidle"] = "1000"
    
     # use timestamps for all our dirs - this makes sure we have unique names in stash
     props["pegasus.dir.useTimestamp"] = "true"

     # separate output dir for each workflow run
     props["pegasus.dir.storage.deep"] = "true"
    
     # Help Pegasus developers by sharing performance data (optional)
     props["pegasus.monitord.encoding"] = "json"
     props["pegasus.catalog.workflow.amqp.url"] = "amqp://friend:[email protected]:5672/prod/workflows"
     props["pegasus.metrics.app"] = "KOTO"
    
  2. Writes the file sites.yml. This file describes the execution environment in which the workflow will be run.

     # --- Sites --------------------------------------------------------------------
     sc = SiteCatalog()
    
     # local site (submit machine)
     local_site = Site(name="local")
    
     local_shared_scratch = Directory(directory_type=Directory.SHARED_SCRATCH, path=WORK_DIR / "scratch")
     local_shared_scratch.add_file_servers(FileServer(url="file://" + str(WORK_DIR / "scratch"), operation_type=Operation.ALL))
     local_site.add_directories(local_shared_scratch)
    
     local_storage = Directory(directory_type=Directory.LOCAL_STORAGE, path=WORK_DIR / "outputs")
     local_storage.add_file_servers(FileServer(url="file://" + str(WORK_DIR / "outputs"), operation_type=Operation.ALL))
     local_site.add_directories(local_storage)
    
     local_site.add_env(PATH=os.environ["PATH"])
     sc.add_sites(local_site)
    
     # stash site (staging site, where intermediate data will be stored)
     stash_site = Site(name="stash", arch=Arch.X86_64, os_type=OS.LINUX)
     stash_staging_path = "/collab/user/{USER}/staging".format(USER=getpass.getuser())
     stash_shared_scratch = Directory(directory_type=Directory.SHARED_SCRATCH, path=stash_staging_path)
     stash_shared_scratch.add_file_servers(
         FileServer(
             url="stash:///osgconnect{STASH_STAGING_PATH}".format(STASH_STAGING_PATH=stash_staging_path), 
             operation_type=Operation.ALL)
     )
     stash_site.add_directories(stash_shared_scratch)
     sc.add_sites(stash_site)
    
     # condorpool (execution site)
     condorpool_site = Site(name="condorpool", arch=Arch.X86_64, os_type=OS.LINUX)
     condorpool_site.add_pegasus_profile(style="condor")
     condorpool_site.add_condor_profile(
         universe="vanilla",
         requirements="HAS_SINGULARITY == True && HAS_AVX2 == True",
         request_cpus=1,
         request_memory="4 GB",
         request_disk="10 GB",
     )
     condorpool_site.add_profiles(
         Namespace.CONDOR, 
         key="+SingularityImage", 
         value='"/cvmfs/singularity.opensciencegrid.org/chiehlin0212/koto-dev:latest"'
     )
     condorpool_site.add_profiles(
         Namespace.CONDOR, 
         key="+ProjectName", 
         value='"collab.KOTO"'
     )
    
     sc.add_sites(condorpool_site)
    
     # write SiteCatalog to ./sites.yml
     sc.write()
    
  3. Writes the file transformations.yml. This file specifies the executables used by the workflow and where they are physically located. In this example, we only have run_koto.sh.

     # --- Transformations ----------------------------------------------------------
     run_koto = Transformation(
                    name="run_koto",
                    site="local",
                    pfn=TOP_DIR / "bin/run_koto.sh",
                    is_stageable=True
                ).add_pegasus_profile(clusters_size=1)
    
     tc = TransformationCatalog()
     tc.add_transformations(run_koto)
    
     # write TransformationCatalog to ./transformations.yml
     tc.write()
    
  4. Writes the file replicas.yml. This file specifies the physical locations of any input files used by the workflow. In this example, there is an entry for each file in the inputs/ directory, and one for each file matching /collab/project/KOTO/external/run87/accidental/64kW/**/*.root.

     # --- Replicas -----------------------------------------------------------------
    
     rc = ReplicaCatalog()
    
     # all files in the inputs/ dir
     input_files = glob.glob(str(TOP_DIR / "inputs/*"))
    
     for f in input_files:
         lfn = os.path.basename(f)    
         rc.add_replica(site="local", lfn=lfn, pfn=f)
    
     # also add all wave files
     wave_files = glob.glob("/collab/project/KOTO/external/run87/accidental/64kW/**/*.root", recursive=True)
    
     for f in wave_files:
         lfn = os.path.basename(f)    
         rc.add_replica(site="local", lfn=lfn, pfn=f)
    
     # write ReplicaCatalog to replicas.yml
     rc.write()
    
  5. Builds the KOTO workflow and submits it for execution. When wf.plan is invoked, pegasus.properties, sites.yml, transformations.yml, and replicas.yml are consumed as part of the workflow planning process. Note that in this step there is no mention of data movement or job details, as these are added by Pegasus when the abstract workflow is planned into an executable workflow. As part of the planning process, additional jobs that handle scratch directory creation, data staging, and data cleanup are added to the workflow.

     # --- Workflow -----------------------------------------------------------------
     wf = Workflow(name="koto")
    
     random.seed()
     for n in range(10):
    
         # arguments
         decay_mode = "KL3pi0"
         seed = str(random.randint(0, 10000))
         wave = os.path.basename(random.choice(wave_files))
    
         # TODO: check if we already have a job with the seed/wave combination
         print(" ... added mc job with wave={} / seed={}".format(wave, seed))
    
         out_root = File("{}_BasicVetoAndCuts_Accidental_{}.root".format(decay_mode, seed))
         job = Job(run_koto)\
                  .add_args(decay_mode, seed, wave)\
                  .add_outputs(out_root)
    
         # inputs; jobs reference them by lfn
         for inp in input_files + [wave]:
             job.add_inputs(File(os.path.basename(inp)))
    
         wf.add_jobs(job)
    
     # plan and run the workflow
     wf.plan(
         dir=WORK_DIR / "runs",
         sites=["condorpool"],
         staging_sites={"condorpool": "stash"},
         output_sites=["local"],
         cluster=["horizontal"],
         submit=True
     )
    

Submit the workflow by executing workflow.py.

$ ./workflow.py

Note that when Pegasus plans/submits a workflow, a workflow directory is created and shown in the output. In the following example output, the workflow directory is /home/user/workflows/runs/koto-workflow/run0014.

2020.12.18 14:33:07.059 CST:   -----------------------------------------------------------------------
2020.12.18 14:33:07.064 CST:   File for submitting this DAG to HTCondor           : koto-workflow-0.dag.condor.sub
2020.12.18 14:33:07.070 CST:   Log of DAGMan debugging messages                 : koto-workflow-0.dag.dagman.out
2020.12.18 14:33:07.075 CST:   Log of HTCondor library output                     : koto-workflow-0.dag.lib.out
2020.12.18 14:33:07.080 CST:   Log of HTCondor library error messages             : koto-workflow-0.dag.lib.err
2020.12.18 14:33:07.086 CST:   Log of the life of condor_dagman itself          : koto-workflow-0.dag.dagman.log
2020.12.18 14:33:07.091 CST:
2020.12.18 14:33:07.096 CST:   -no_submit given, not submitting DAG to HTCondor.  You can do this with:
2020.12.18 14:33:07.107 CST:   -----------------------------------------------------------------------
2020.12.18 14:33:11.352 CST:   Your database is compatible with Pegasus version: 5.1.0dev
[WARNING]  Submitting to condor koto-workflow-0.dag.condor.sub
2020.12.18 14:33:12.060 CST:   Time taken to execute is 5.818 seconds

Your workflow has been started and is running in the base directory:

/home/user/workflows/runs/koto-workflow/run0014

*** To monitor the workflow you can run ***

pegasus-status -l /home/user/workflows/runs/koto-workflow/run0014


*** To remove your workflow run ***

pegasus-remove /home/user/workflows/runs/koto-workflow/run0014

This directory is the handle to the workflow instance and is used by Pegasus command line tools. Some useful tools to know about:

  • pegasus-status -v [wfdir] Provides status on a currently running workflow.
  • pegasus-analyzer [wfdir] Provides debugging clues why a workflow failed. Run this after a workflow has failed.
  • pegasus-statistics [wfdir] Provides statistics, such as walltimes, on a workflow after it has completed.
  • pegasus-remove [wfdir] Removes a workflow from the system.
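For example, to check on a running workflow and then summarize it after completion (using the run directory from the example output above):

$ pegasus-status -v /home/user/workflows/runs/koto-workflow/run0014
$ pegasus-statistics -s all /home/user/workflows/runs/koto-workflow/run0014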