diff --git a/src/miniq.py b/src/miniq.py new file mode 100755 index 0000000..8edfd2c --- /dev/null +++ b/src/miniq.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 +""" +Log jobs to be processed by the job processor. +""" + +import os, json, pathlib, sys, time +from settings import MINIQ_ROOT + +# Sub-dir for queued jobs with params +QUEUED_JOBS = os.path.join(MINIQ_ROOT, 'queue') + +# Sub-dir for job definition files +JOB_DEFS = os.path.join(MINIQ_ROOT, 'jobs') + +# Make sure we specified the job type +if len(sys.argv) < 2: + print('Missing name of job to enqueue.') + sys.exit(1) + +# Get the job type and any arguments from the CLI +_, job_name, *args = sys.argv + +data = { + 'job': job_name, + 'arguments': args, +} + +# Make sure the job type is valid +if not os.path.exists(os.path.join(JOB_DEFS, data['job'] + '.job')): + print('Invalid job type.') + sys.exit(1) + +# This is the name of the file that will be written to the queued jobs +file_name = str(time.time_ns()) + '.' + str(job_name) + '.json' + +# Write the job data to the queue file +with open(os.path.join(QUEUED_JOBS, file_name), 'w') as out_file: + out_file.write(json.dumps(data)) + +print('Queued ' + job_name + ' as ' + file_name) diff --git a/src/process_jobs.py b/src/process_jobs.py new file mode 100755 index 0000000..bdb5aef --- /dev/null +++ b/src/process_jobs.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 +""" +Processes jobs queued by the miniq.py tool by searching the +queued jobs directory for job instances that still need processing. +""" + +import os, json, pathlib +from settings import MINIQ_ROOT + +# Sub-dir for queued jobs with params +QUEUED_JOBS = os.path.join(MINIQ_ROOT, 'queue') + +# Sub-dir for job definition files +JOB_DEFS = os.path.join(MINIQ_ROOT, 'jobs') + +# Guarantee that the sub-dirs all exist +pathlib.Path(QUEUED_JOBS).mkdir(parents=True, exist_ok=True) +pathlib.Path(JOB_DEFS).mkdir(parents=True, exist_ok=True) + + +def is_pending_job(file): + """ + Returns true if the given file name is a pending job. + """ + return os.path.isfile(os.path.join(QUEUED_JOBS, file)) and not file.startswith('processing') + + +# List of pending job file names +job_files = [f for f in os.listdir(QUEUED_JOBS) if is_pending_job(f)] + +# Rename the pending job files with "processing." so they only get run once +for pending in job_files: + os.rename(os.path.join(QUEUED_JOBS, pending), os.path.join(QUEUED_JOBS, 'processing.' + pending)) + +job_files = ['processing.' + f for f in job_files] + +for queued in job_files: + print('\nProcessing ' + queued + '...') + + # Read the arguments and job name from the queue file JSON + with open(os.path.join(QUEUED_JOBS, queued)) as f: + data = json.load(f) + args = ['"' + str(arg) + '"' for arg in data['arguments']] + + # Execute the job definition file with the given arguments + command = os.path.join(JOB_DEFS, data['job'] + '.job') + ' ' + ' '.join(args) + os.system(command) + + # Remove the job queue file + os.remove(os.path.join(QUEUED_JOBS, queued)) diff --git a/src/settings.py b/src/settings.py new file mode 100644 index 0000000..c0e1b64 --- /dev/null +++ b/src/settings.py @@ -0,0 +1,7 @@ +import os + +# The root of the miniq queue files and job definitions +MINIQ_ROOT = '/nfs/storage/miniq' + +if 'MINIQ_ROOT' in os.environ: + MINIQ_ROOT = os.environ['MINIQ_ROOT']