Initial commit

master
Garrett Mills 3 years ago
parent 8ef1bdb3d4
commit 07d51607c1
Signed by: garrettmills
GPG Key ID: D2BF5FBA8298F246

@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Log jobs to be processed by the job processor.
"""
import os, json, pathlib, sys, time
from settings import MINIQ_ROOT
# Sub-dir for queued jobs with params
QUEUED_JOBS = os.path.join(MINIQ_ROOT, 'queue')
# Sub-dir for job definition files
JOB_DEFS = os.path.join(MINIQ_ROOT, 'jobs')
# Make sure we specified the job type
if len(sys.argv) < 2:
print('Missing name of job to enqueue.')
sys.exit(1)
# Get the job type and any arguments from the CLI
_, job_name, *args = sys.argv
data = {
'job': job_name,
'arguments': args,
}
# Make sure the job type is valid
if not os.path.exists(os.path.join(JOB_DEFS, data['job'] + '.job')):
print('Invalid job type.')
sys.exit(1)
# This is the name of the file that will be written to the queued jobs
file_name = str(time.time_ns()) + '.' + str(job_name) + '.json'
# Write the job data to the queue file
with open(os.path.join(QUEUED_JOBS, file_name), 'w') as out_file:
out_file.write(json.dumps(data))
print('Queued ' + job_name + ' as ' + file_name)

@ -0,0 +1,50 @@
#!/usr/bin/env python3
"""
Processes jobs queued by the miniq.py tool by searching the
queued jobs directory for job instances that still need processing.
"""
import os, json, pathlib
from settings import MINIQ_ROOT
# Sub-dir for queued jobs with params
QUEUED_JOBS = os.path.join(MINIQ_ROOT, 'queue')
# Sub-dir for job definition files
JOB_DEFS = os.path.join(MINIQ_ROOT, 'jobs')
# Guarantee that the sub-dirs all exist
pathlib.Path(QUEUED_JOBS).mkdir(parents=True, exist_ok=True)
pathlib.Path(JOB_DEFS).mkdir(parents=True, exist_ok=True)
def is_pending_job(file):
"""
Returns true if the given file name is a pending job.
"""
return os.path.isfile(os.path.join(QUEUED_JOBS, file)) and not file.startswith('processing')
# List of pending job file names
job_files = [f for f in os.listdir(QUEUED_JOBS) if is_pending_job(f)]
# Rename the pending job files with "processing." so they only get run once
for pending in job_files:
os.rename(os.path.join(QUEUED_JOBS, pending), os.path.join(QUEUED_JOBS, 'processing.' + pending))
job_files = ['processing.' + f for f in job_files]
for queued in job_files:
print('\nProcessing ' + queued + '...')
# Read the arguments and job name from the queue file JSON
with open(os.path.join(QUEUED_JOBS, queued)) as f:
data = json.load(f)
args = ['"' + str(arg) + '"' for arg in data['arguments']]
# Execute the job definition file with the given arguments
command = os.path.join(JOB_DEFS, data['job'] + '.job') + ' ' + ' '.join(args)
os.system(command)
# Remove the job queue file
os.remove(os.path.join(QUEUED_JOBS, queued))

@ -0,0 +1,7 @@
import os
# The root of the miniq queue files and job definitions
MINIQ_ROOT = '/nfs/storage/miniq'
if 'MINIQ_ROOT' in os.environ:
MINIQ_ROOT = os.environ['MINIQ_ROOT']
Loading…
Cancel
Save