Add read-only job queue API

master
Garrett Mills 3 years ago
parent 5ef4d91a7f
commit d9a099754b
Signed by: garrettmills
GPG Key ID: D2BF5FBA8298F246

@ -29,6 +29,12 @@ const jobs_config = {
// worker processes of the same type.
// (e.g. you can have two "main" workers)
},
connector: {
enabled: env('JOB_QUEUE_CONNECTOR', false),
mount: env('JOB_QUEUE_CONNECTOR_MOUNT'),
secret: env('JOB_QUEUE_CONNECTOR_SECRET'),
},
}
module.exports = exports = jobs_config

@ -4,6 +4,7 @@ const Job = require('./Job')
const WorkerDirective = require('./directive/WorkerDirective')
const ncp = require('ncp')
const path = require('path')
const express = require('express')
/**
* Canonically registers job definitions and sets
@ -23,7 +24,7 @@ class JobsUnit extends CanonicalUnit {
* @returns {string[]}
*/
static get services() {
return [...super.services, 'configs', 'redis', 'output', 'utility']
return [...super.services, 'configs', 'redis', 'output', 'utility', 'app']
}
/**
@ -91,7 +92,7 @@ class JobsUnit extends CanonicalUnit {
const config = this.config()
const redis_config = this.redis.config()
if ( config && Array.isArray(config.queues) ) {
if ( Array.isArray(config?.queues) ) {
this.output.info('Connecting job queues...')
for ( const queue_name of config.queues ) {
this.output.debug(`Connecting to job queue ${queue_name}...`)
@ -103,6 +104,14 @@ class JobsUnit extends CanonicalUnit {
})
}
}
if ( config?.connector?.enabled ) {
const api = this.get_connector_api()
if ( config?.connector?.mount ) {
this.app.express.use(config.connector.mount, api)
}
}
}
/**
@ -163,6 +172,141 @@ class JobsUnit extends CanonicalUnit {
})
})
}
get_connector_api() {
const router = express.Router()
const get_config = () => {
const conf = this.configs.get('jobs.connector')
return {
enabled: false,
...(conf || {})
}
}
const send_api = (res, status, message, data = {}) => {
this.output.info(`Connector API Response (HTTP ${status}) - ${message}`)
res.status(status)
res.setHeader('Content-Type', 'application/json')
return res.send({
status,
message,
data,
})
}
router.use((req, res, next) => {
if ( !req.query || !req.query.accessSecret || !get_config().secret ) {
return send_api(res, 401, 'Missing accessSecret query')
}
if ( req.query.accessSecret !== get_config().secret ) {
return send_api(res, 401, 'Invalid accessSecret')
}
return next()
})
const get_queue = (req, res, next) => {
if ( 'queue' in req.params ) {
const queue = this.queue(req.params.queue)
if ( !queue ) {
return send_api(res, 400, 'Invalid queue name')
}
req.queue = queue
return queue
}
}
router.get('/queues', (req, res, next) => {
return send_api(res, 200, 'OK', (this.config()?.queues || []).map(queue_name => {
return { queue_name }
}))
})
router.get('/queues/:queue/count/backlog', async (req, res, next) => {
if ( !get_queue(req, res, next) ) return;
return send_api(res, 200, 'OK', await req.queue.count())
})
router.get('/queues/:queue/count/completed', async (req, res, next) => {
if ( !get_queue(req, res, next) ) return;
return send_api(res, 200, 'OK', await req.queue.getCompletedCount())
})
router.get('/queues/:queue/count/failed', async (req, res, next) => {
if ( !get_queue(req, res, next) ) return;
return send_api(res, 200, 'OK', await req.queue.getFailedCount())
})
router.get('/queues/:queue/count/delayed', async (req, res, next) => {
if ( !get_queue(req, res, next) ) return;
return send_api(res, 200, 'OK', await req.queue.getDelayedCount())
})
router.get('/queues/:queue/count/active', async (req, res, next) => {
if ( !get_queue(req, res, next) ) return;
return send_api(res, 200, 'OK', await req.queue.getActiveCount())
})
router.get('/queues/:queue/count/waiting', async (req, res, next) => {
if ( !get_queue(req, res, next) ) return;
return send_api(res, 200, 'OK', await req.queue.getWaitingCount())
})
const job_to_json = (job) => {
return job.asJSON()
}
router.get('/queues/:queue/jobs', async (req, res, next) => {
if ( !get_queue(req, res, next) ) return;
const { start = 0, end = -1 } = req.query
const jobs = await req.queue.getJobs(['completed', 'failed', 'delayed', 'repeat', 'active', 'wait', 'paused'], start, end)
jobs.sort((a, b) => Number(b.id) - Number(a.id))
return send_api(res, 200, 'OK', jobs.map(x => job_to_json(x)))
})
router.get('/queues/:queue/jobs/:jobType', async (req, res, next) => {
if ( !get_queue(req, res, next) ) return;
const { start = 0, end = -1 } = req.query
const jobs = await req.queue.getJobs([req.params.jobType], start, end)
jobs.sort((a, b) => Number(b.id) - Number(a.id))
return send_api(res, 200, 'OK', jobs.map(x => job_to_json(x)))
})
router.get('/queues/:queue/workers', async (req, res, next) => {
if ( !get_queue(req, res, next) ) return;
return send_api(res, 200, 'OK', await req.queue.getWorkers())
})
router.get('/queues/:queue/job/:jobId', async (req, res, next) => {
if ( !get_queue(req, res, next) ) return;
const job = await req.queue.getJob(req.params.jobId)
if ( !job ) {
return send_api(res, 404, 'Job not found with that ID.')
}
return send_api(res, 200, 'OK', job_to_json(job))
})
router.get('/queues/:queue/job/:jobId/logs', async (req, res, next) => {
if ( !get_queue(req, res, next) ) return;
const { start = 0, end = -1 } = req.query
return send_api(res, 200, 'OK', await req.queue.getJobLogs(req.params.jobId, start, end))
})
router.use((req, res, next) => {
send_api(res, 404, 'Not Found')
})
return router
}
}
module.exports = exports = JobsUnit

Loading…
Cancel
Save