// mirror of https://github.com/bluxmit/alnoda-workspaces.git
// synced 2024-09-19 19:27:10 +12:00
// 1963 lines, 64 KiB, JavaScript, executable file
// Cronicle Server Job Manager
|
|
// Copyright (c) 2015 Joseph Huckaby
|
|
// Released under the MIT License
|
|
|
|
var async = require('async');
|
|
var cp = require('child_process');
|
|
var fs = require('fs');
|
|
var os = require('os');
|
|
var path = require('path');
|
|
var sqparse = require('shell-quote').parse;
|
|
var zlib = require('zlib');
|
|
|
|
var Class = require("pixl-class");
|
|
var Tools = require("pixl-tools");
|
|
var JSONStream = require("pixl-json-stream");
|
|
var PixlMail = require('pixl-mail');
|
|
|
|
module.exports = Class.create({
|
|
|
|
launchOrQueueJob: function(event, callback) {
|
|
// launch job, or queue upon failure (if event desires)
|
|
var self = this;
|
|
|
|
// must be master to do this
|
|
if (!this.multi.master) return callback( new Error("Only a master server can launch jobs.") );
|
|
|
|
this.launchJob(event, function(err, jobs) {
|
|
if (err && event.queue) {
|
|
// event supports queuing
|
|
var queue_max = event.queue_max || 0;
|
|
if (!self.eventQueue[event.id]) self.eventQueue[event.id] = 0;
|
|
|
|
if (!queue_max || (self.eventQueue[event.id] < queue_max)) {
|
|
// queue has room for one more
|
|
self.eventQueue[event.id]++;
|
|
self.authSocketEmit( 'update', { eventQueue: self.eventQueue } );
|
|
|
|
// special 0-job response denotes an enqueue occurred
|
|
err = null;
|
|
jobs = [];
|
|
|
|
// add now time if not already set
|
|
if (!event.now) event.now = Tools.timeNow(true);
|
|
|
|
// add job to actual queue in storage, async
|
|
self.storage.listPush( 'global/event_queue/' + event.id, event, function(err) {
|
|
if (err) {
|
|
self.logError('queue', "Failed to push job onto event queue: " + err);
|
|
}
|
|
} );
|
|
}
|
|
else {
|
|
// queue is full, change error message
|
|
err = new Error( "Job could not be queued: Event queue reached max of " + queue_max + " items" );
|
|
}
|
|
}
|
|
callback(err, jobs);
|
|
});
|
|
},
|
|
|
|
launchJob: function(event, callback) {
	// Locate suitable server(s) for an event and launch one job per server.
	// Loads the server group, plugin, category, and original scheduled event
	// from storage, enforces concurrency limits, then builds and dispatches
	// a job object for each target server (local or via remote socket).
	// Calls back with (err, jobs) where jobs is an array of launched job objects.
	var self = this;
	var orig_event = null;
	var server_group = null;
	var plugin = null;
	var category = null;
	var servers = [];
	
	// must be master to do this
	if (!this.multi.master) return callback( new Error("Only a master server can launch jobs.") );
	
	async.series([
		function(callback) {
			// event target may refer to server group OR hostname
			// try direct hostname match first (cheap, in-memory)
			var slave = self.slaves[ event.target ] || null;
			if (slave && !slave.disabled) {
				servers.push( slave );
				return callback();
			}
			
			// not a live hostname -- look up as a server group in storage
			self.storage.listFind( 'global/server_groups', { id: event.target }, function(err, item) {
				server_group = item;
				callback(err);
			} );
		},
		function(callback) {
			// load the plugin record referenced by the event
			self.storage.listFind( 'global/plugins', { id: event.plugin }, function(err, item) {
				plugin = item;
				callback(err);
			} );
		},
		function(callback) {
			// load the category record referenced by the event
			self.storage.listFind( 'global/categories', { id: event.category }, function(err, item) {
				category = item;
				callback(err);
			} );
		},
		function(callback) {
			// load the original event from the schedule (for current max_children etc.)
			self.storage.listFind( 'global/schedule', { id: event.id }, function(err, item) {
				orig_event = item;
				callback(err);
			} );
		}
	],
	function(err) {
		// all resources loaded -- validate each one before launching
		if (err) return callback(err);
		if (!server_group && !servers.length) return callback( new Error("Server or Server Group not found: " + event.target) );
		if (!plugin) return callback( new Error("Plugin not found: " + event.plugin) );
		if (!category) return callback( new Error("Category not found: " + event.category) );
		if (!orig_event) return callback( new Error("Event not found: " + event.id) );
		
		var all_jobs = self.getAllActiveJobs(true); // include pending jobs
		var job_list = Tools.hashValuesToArray( all_jobs );
		
		// check running jobs vs. max children (per-event concurrency limit)
		if (orig_event.max_children) {
			var event_jobs = Tools.findObjectsIdx( job_list, { 'event': event.id } );
			if (event_jobs.length >= orig_event.max_children) {
				// too many event children running
				return callback( new Error("Maximum of "+orig_event.max_children+" "+Tools.pluralize("job", orig_event.max_children)+" already running for event: " + event.title) );
			}
		}
		
		// per-category concurrency limit
		if (category.max_children) {
			var cat_jobs = Tools.findObjectsIdx( job_list, { 'category': event.category } );
			if (cat_jobs.length >= category.max_children) {
				// too many category children running
				return callback( new Error("Maximum of "+category.max_children+" "+Tools.pluralize("job", category.max_children)+" already running for category: " + category.title) );
			}
		}
		
		// global concurrency limit (0 or unset means unlimited)
		var max_jobs = self.server.config.get('max_jobs') || 0;
		if (max_jobs && (job_list.length >= max_jobs)) {
			return callback( new Error("Global maximum of " + max_jobs + " " + Tools.pluralize("job", max_jobs) + " already running.") );
		}
		
		// refuse to launch under a disabled category or plugin
		if (!category.enabled) {
			return callback( new Error("Category '" + category.title + "' is disabled.") );
		}
		if (!plugin.enabled) {
			return callback( new Error("Plugin '" + plugin.title + "' is disabled.") );
		}
		
		// automatically pick server if needed
		if (!servers.length && server_group) {
			var candidates = [];
			var regex = new RegExp( server_group.regexp );
			
			for (var hostname in self.slaves) {
				var slave = self.slaves[hostname];
				
				// only consider slaves that match the group hostname pattern, and are not disabled
				if (hostname.match(regex) && !slave.disabled) {
					candidates.push( self.slaves[hostname] );
				}
			}
			
			if (!candidates.length) {
				return callback( new Error("Could not find any servers for group: " + server_group.title) );
			}
			
			// sort the candidates by hostname ascending (stable order for algos)
			candidates = candidates.sort( function(a, b) {
				return a.hostname.localeCompare( b.hostname );
			} );
			
			if (event.multiplex) {
				// run on ALL servers in group simultaneously (multiplex)
				servers = candidates;
			}
			else {
				// run on one server in group, chosen by custom algo
				servers.push( self.chooseServer(candidates, event) );
			}
		} // find slave
		
		if (!servers.length) {
			// event was targetting server that is no longer with us
			return callback( new Error("Target server is not available: " + event.target) );
		}
		
		var jobs = [];
		
		// loop through each matched server, launching job on each
		for (var idx = 0, len = servers.length; idx < len; idx++) {
			var slave = servers[idx];
			
			// construct job object based on event (deep copy, then strip
			// schedule-only / credential properties that jobs must not carry)
			var job = Tools.copyHash( event, true );
			
			delete job.id;
			delete job.title;
			delete job.timing;
			delete job.enabled;
			delete job.max_children;
			// delete job.target;
			delete job.username;
			delete job.api_key;
			delete job.session_id;
			delete job.modified;
			delete job.created;
			
			// assign fresh job identity and denormalized display fields
			job.id = self.getUniqueID('j');
			job.time_start = Tools.timeNow();
			job.hostname = slave.hostname;
			job.event = event.id;
			job.params = event.params || {};
			job.now = event.now || Tools.timeNow(true);
			job.event_title = event.title;
			job.plugin_title = plugin.title;
			job.category_title = category.title;
			job.nice_target = server_group ? server_group.title : event.target;
			
			// pull in properties from plugin
			job.command = plugin.command;
			if (plugin.cwd) job.cwd = plugin.cwd;
			if (plugin.uid) job.uid = plugin.uid;
			if (plugin.gid) job.gid = plugin.gid;
			if (plugin.env) job.env = plugin.env;
			
			// plugin params may have changed outside of event,
			// so recopy missing / hidden ones
			if (plugin.params) plugin.params.forEach( function(param) {
				if (!(param.id in job.params) || (param.type == 'hidden')) {
					job.params[ param.id ] = param.value;
				}
			} );
			
			// pull in defaults from category (event settings win when present)
			if (!job.notify_success && category.notify_success) job.notify_success = category.notify_success;
			if (!job.notify_fail && category.notify_fail) job.notify_fail = category.notify_fail;
			if (!job.web_hook && category.web_hook) job.web_hook = category.web_hook;
			if (!job.memory_limit && category.memory_limit) {
				job.memory_limit = category.memory_limit;
				job.memory_sustain = category.memory_sustain || 0;
			}
			if (!job.cpu_limit && category.cpu_limit) {
				job.cpu_limit = category.cpu_limit;
				job.cpu_sustain = category.cpu_sustain || 0;
			}
			
			// multiplex stagger if desired
			if (event.multiplex && event.stagger && (idx > 0)) {
				// delay job by N seconds, based on stagger and host position in group
				job.when = Tools.timeNow() + (event.stagger * idx);
				job.time_start = job.when;
			}
			
			// send remote or run local
			if (slave.master) {
				// run the event here
				self.launchLocalJob( job );
			}
			else if (slave.socket) {
				// send the job to remote slave server
				self.logDebug(6, "Sending remote job to: " + slave.hostname, job);
				slave.socket.emit( 'launch_job', job );
				
				// Pre-insert job into slave's active_jobs, so something will show in getAllActiveJobs() right away.
				// Important for when the scheduler is catching up, and may try to launch a bunch of jobs in a row.
				if (!slave.active_jobs) slave.active_jobs = {};
				slave.active_jobs[ job.id ] = job;
			}
			
			// fire web hook
			var hook_data = Tools.mergeHashes( job, { action: 'job_start' } );
			
			// prepare nice text summary (compatible with Slack Incoming WebHooks)
			hook_data.base_app_url = self.server.config.get('base_app_url');
			hook_data.job_details_url = self.server.config.get('base_app_url') + '/#JobDetails?id=' + job.id;
			hook_data.edit_event_url = self.server.config.get('base_app_url') + '/#Schedule?sub=edit_event&id=' + job.event;
			
			var hook_text_templates = self.server.config.get('web_hook_text_templates') || self.defaultWebHookTextTemplates;
			
			if (hook_text_templates[hook_data.action]) {
				hook_data.text = Tools.sub( hook_text_templates[hook_data.action], hook_data );
				
				// include web_hook_config_keys if configured
				if (self.server.config.get('web_hook_config_keys')) {
					var web_hook_config_keys = self.server.config.get('web_hook_config_keys');
					for (var idy = 0, ley = web_hook_config_keys.length; idy < ley; idy++) {
						var key = web_hook_config_keys[idy];
						hook_data[key] = self.server.config.get(key);
					}
				}
				
				// include web_hook_custom_data if configured
				if (self.server.config.get('web_hook_custom_data')) {
					var web_hook_custom_data = self.server.config.get('web_hook_custom_data');
					for (var key in web_hook_custom_data) hook_data[key] = web_hook_custom_data[key];
				}
				
				// custom http options for web hook
				var hook_opts = self.server.config.get('web_hook_custom_opts') || {};
				
				if (job.web_hook) {
					self.logDebug(9, "Firing web hook for job start: " + job.id + ": " + job.web_hook);
					self.request.json( job.web_hook, hook_data, hook_opts, function(err, resp, data) {
						// log response (fire-and-forget; hook failures do not affect the job)
						if (err) self.logDebug(9, "Web Hook Error: " + job.web_hook + ": " + err);
						else self.logDebug(9, "Web Hook Response: " + job.web_hook + ": HTTP " + resp.statusCode + " " + resp.statusMessage);
					} );
				}
				if (self.server.config.get('universal_web_hook')) {
					self.logDebug(9, "Firing universal web hook for job start: " + self.server.config.get('universal_web_hook'));
					self.request.json( self.server.config.get('universal_web_hook'), hook_data, hook_opts, function(err, resp, data) {
						// log response
						if (err) self.logDebug(9, "Universal Web Hook Error: " + err);
						else self.logDebug(9, "Universal Web Hook Response: HTTP " + resp.statusCode + " " + resp.statusMessage);
					} );
				}
			} // yes fire hook
			
			jobs.push( job );
		} // foreach slave
		
		// no error
		callback( null, jobs );
	});
},
|
|
|
|
chooseServer: function(candidates, event) {
|
|
// choose server for event, based on algo
|
|
var server = null;
|
|
|
|
var hostnames = [];
|
|
for (var idx = 0, len = candidates.length; idx < len; idx++) {
|
|
hostnames.push( candidates[idx].hostname );
|
|
}
|
|
this.logDebug(9, "Choosing server for event using algo: " + event.algo || 'random', hostnames);
|
|
|
|
switch (event.algo || 'random') {
|
|
case "random":
|
|
// random server from group
|
|
server = Tools.randArray( candidates );
|
|
break;
|
|
|
|
case "round_robin":
|
|
// pick each server in sequence, repeat
|
|
if (!this.state.robins) this.state.robins = {};
|
|
var robin = this.state.robins[ event.id ] || 0;
|
|
if (robin >= candidates.length) robin = 0;
|
|
server = candidates[robin];
|
|
this.state.robins[ event.id ] = robin + 1;
|
|
break;
|
|
|
|
case "least_cpu":
|
|
// pick server with least CPU in use
|
|
var cpus = {};
|
|
var servers = this.getAllServers();
|
|
for (var hostname in servers) {
|
|
cpus[hostname] = 0;
|
|
if (servers[hostname] && servers[hostname].data && servers[hostname].data.cpu) {
|
|
cpus[hostname] = servers[hostname].data.cpu;
|
|
}
|
|
}
|
|
var jobs = this.getAllActiveJobs();
|
|
for (var job_id in jobs) {
|
|
var job = jobs[job_id];
|
|
if (job.cpu && job.cpu.current) {
|
|
if (!cpus[job.hostname]) cpus[job.hostname] = 0;
|
|
cpus[job.hostname] += job.cpu.current;
|
|
}
|
|
}
|
|
var least_value = -1;
|
|
var least_hostname = '';
|
|
for (var idx = 0, len = candidates.length; idx < len; idx++) {
|
|
var hostname = candidates[idx].hostname;
|
|
if ((least_value == -1) || (cpus[hostname] < least_value)) {
|
|
least_value = cpus[hostname];
|
|
least_hostname = hostname;
|
|
}
|
|
}
|
|
this.logDebug(9, "CPU Snapshot:", cpus);
|
|
server = Tools.findObject( candidates, { hostname: least_hostname } );
|
|
break;
|
|
|
|
case "least_mem":
|
|
// pick server with least memory in use
|
|
var mems = {};
|
|
var servers = this.getAllServers();
|
|
for (var hostname in servers) {
|
|
mems[hostname] = 0;
|
|
if (servers[hostname] && servers[hostname].data && servers[hostname].data.mem) {
|
|
mems[hostname] = servers[hostname].data.mem;
|
|
}
|
|
}
|
|
var jobs = this.getAllActiveJobs();
|
|
for (var job_id in jobs) {
|
|
var job = jobs[job_id];
|
|
if (job.mem && job.mem.current) {
|
|
if (!mems[job.hostname]) mems[job.hostname] = 0;
|
|
mems[job.hostname] += job.mem.current;
|
|
}
|
|
}
|
|
var least_value = -1;
|
|
var least_hostname = '';
|
|
for (var idx = 0, len = candidates.length; idx < len; idx++) {
|
|
var hostname = candidates[idx].hostname;
|
|
if ((least_value == -1) || (mems[hostname] < least_value)) {
|
|
least_value = mems[hostname];
|
|
least_hostname = hostname;
|
|
}
|
|
}
|
|
this.logDebug(9, "Mem Snapshot:", mems);
|
|
server = Tools.findObject( candidates, { hostname: least_hostname } );
|
|
break;
|
|
|
|
case "prefer_first":
|
|
// pick server towards top of sorted list
|
|
server = candidates[0];
|
|
break;
|
|
|
|
case "prefer_last":
|
|
// pick server towards bottom of sorted list
|
|
server = candidates[ candidates.length - 1 ];
|
|
break;
|
|
} // switch event.algo
|
|
|
|
this.logDebug(9, "Chose server: " + server.hostname + " via algo: " + (event.algo || "random") );
|
|
return server;
|
|
},
|
|
|
|
launchLocalJob: function(job) {
	// Launch a job as a local child process on this server.
	// Handles delayed launch (multiplex stagger), builds the child
	// environment, writes the job log header, then spawns either a
	// detached process (via bin/run-detached.js) or a normal child
	// wired to a JSON stream for progress/completion reporting.
	// Registers the job in this.activeJobs and the worker in this.kids.
	var self = this;
	var child = null;
	var worker = null;
	
	// check for job delay request (multiplex stagger)
	if (job.when && (job.when > Tools.timeNow())) {
		this.logDebug(6, "Job " + job.id + " will be delayed for " +
			Tools.getTextFromSeconds( job.when - Tools.timeNow() ));
		
		// re-enqueue internally; the queue will call launchLocalJob again later
		job.action = 'launchLocalJob';
		this.enqueueInternal(job);
		return;
	}
	
	// construct fully qualified path to job log file
	job.log_file = path.resolve( path.join(
		this.server.config.get('log_dir'), 'jobs',
		job.id + (job.detached ? '-detached' : '') + '.log'
	) );
	
	this.logDebug(6, "Launching local job", job);
	
	// if we are the master server or job is detached,
	// save copy of job file to disk next to log (for crash recovery)
	if (this.multi.master || job.detached) {
		fs.writeFile( job.log_file.replace(/\.log$/, '.json'), JSON.stringify(job), function(err) {
			if (err) self.logError('job', "Failed to write JSON job file: " + job.log_file.replace(/\.log$/, '.json') + ": " + err);
		} );
	}
	
	// setup environment for child
	// NOTE(review): job.gid is not consulted here; the effective gid comes
	// from the process, or from user_info below when the uid resolves -- confirm intended
	var child_opts = {
		cwd: job.cwd || process.cwd(),
		uid: job.uid || process.getuid(),
		gid: process.getgid(),
		env: Tools.mergeHashes(
			this.server.config.get('job_env') || {},
			Tools.mergeHashes( process.env, job.env || {} )
		)
	};
	
	child_opts.env['CRONICLE'] = this.server.__version;
	child_opts.env['JOB_ID'] = job.id;
	child_opts.env['JOB_LOG'] = job.log_file;
	child_opts.env['JOB_NOW'] = job.now;
	child_opts.env['PWD'] = child_opts.cwd;
	
	// copy all top-level job keys into child env, if number/string/boolean
	for (var key in job) {
		switch (typeof(job[key])) {
			case 'string':
			case 'number':
				child_opts.env['JOB_' + key.toUpperCase()] = '' + job[key];
			break;
			
			case 'boolean':
				child_opts.env['JOB_' + key.toUpperCase()] = job[key] ? 1 : 0;
			break;
		}
	}
	
	// get uid / gid info for child env vars
	var user_info = Tools.getpwnam( child_opts.uid, true );
	if (user_info) {
		child_opts.uid = user_info.uid;
		child_opts.gid = user_info.gid;
		child_opts.env.USER = child_opts.env.USERNAME = user_info.username;
		child_opts.env.HOME = user_info.dir;
		child_opts.env.SHELL = user_info.shell;
	}
	else if (child_opts.uid != process.getuid()) {
		// user not found -- fail the job immediately (no process spawned)
		job.pid = 0;
		job.code = 1;
		job.description = "Plugin Error: User does not exist: " + child_opts.uid;
		this.logError("child", job.description);
		this.activeJobs[ job.id ] = job;
		this.finishLocalJob( job );
		return;
	}
	
	// uid/gid may still be strings at this point; spawn wants integers
	child_opts.uid = parseInt( child_opts.uid );
	child_opts.gid = parseInt( child_opts.gid );
	
	// add plugin params as env vars, expand $INLINE vars
	if (job.params) {
		for (var key in job.params) {
			child_opts.env[key.toUpperCase()] =
				(''+job.params[key]).replace(/\$(\w+)/g, function(m_all, m_g1) {
					// substitute $NAME with the matching env value, or empty string
					return (m_g1 in child_opts.env) ? child_opts.env[m_g1] : '';
				});
		}
	}
	
	this.logDebug(9, "Child spawn options:", child_opts);
	
	// create log file, write header to it
	var dargs = Tools.getDateArgs( new Date() );
	
	fs.appendFileSync( job.log_file, [
		"# Job ID: " + job.id,
		"# Event Title: " + job.event_title,
		"# Hostname: " + this.server.hostname,
		"# Date/Time: " + dargs.yyyy_mm_dd + ' ' + dargs.hh_mi_ss + ' (' + dargs.tz + ')'
	].join("\n") + "\n\n");
	
	// make sure child can write to log file
	fs.chmodSync( job.log_file, "777" );
	
	if (job.detached) {
		// spawn detached child (survives a Cronicle restart; reports back
		// through the queue system rather than stdio)
		var temp_file = path.join( os.tmpdir(), 'cronicle-job-temp-' + job.id + '.json' );
		
		// tell child where the queue dir is
		job.queue_dir = path.resolve( this.server.config.get('queue_dir') );
		
		// write job file
		fs.writeFileSync( temp_file, JSON.stringify(job) );
		fs.chmodSync( temp_file, "777" );
		this.logDebug(9, "Job temp file: " + temp_file );
		
		// spawn child with all stdio ignored, fully detached
		child_opts.detached = true;
		child_opts.stdio = ['ignore', 'ignore', 'ignore'];
		
		try {
			child = cp.spawn( path.resolve("bin/run-detached.js"), ["detached", temp_file], child_opts );
		}
		catch (err) {
			job.pid = 0;
			job.code = 1;
			job.description = "Child process error: " + Tools.getErrorDescription(err);
			this.logError("child", job.description);
			this.activeJobs[ job.id ] = job;
			this.finishLocalJob( job );
			return;
		}
		
		job.pid = child.pid || 0;
		
		this.logDebug(3, "Spawned detached process: " + job.pid + " for job: " + job.id, job.command);
		
		// minimal worker record -- no stream/fd to track for detached jobs
		worker = {
			pid: job.pid
		};
		
		// do not keep the event loop alive for this child
		child.unref();
	}
	else {
		// spawn child normally
		var child_cmd = job.command;
		var child_args = [];
		
		// if command has cli args, parse using shell-quote
		if (child_cmd.match(/\s+(.+)$/)) {
			var cargs_raw = RegExp.$1;
			child_cmd = child_cmd.replace(/\s+(.+)$/, '');
			child_args = sqparse( cargs_raw, child_opts.env );
		}
		
		worker = {};
		
		// attach streams: child's stderr goes straight to the job log file
		worker.log_fd = fs.openSync(job.log_file, 'a');
		child_opts.stdio = ['pipe', 'pipe', worker.log_fd];
		
		// spawn child
		try {
			child = cp.spawn( child_cmd, child_args, child_opts );
			if (!child || !child.pid || !child.stdin || !child.stdout) {
				throw new Error("Child process failed to spawn (Check executable location and permissions?)");
			}
		}
		catch (err) {
			if (child) child.on('error', function() {}); // prevent crash
			if (worker.log_fd) { fs.closeSync(worker.log_fd); worker.log_fd = null; }
			job.pid = 0;
			job.code = 1;
			job.description = "Child spawn error: " + child_cmd + ": " + Tools.getErrorDescription(err);
			this.logError("child", job.description);
			this.activeJobs[ job.id ] = job;
			this.finishLocalJob( job );
			return;
		}
		job.pid = child.pid || 0;
		
		this.logDebug(3, "Spawned child process: " + job.pid + " for job: " + job.id, child_cmd);
		
		// connect json stream to child's stdio
		// order reversed deliberately (out, in)
		var stream = new JSONStream( child.stdout, child.stdin );
		stream.recordRegExp = /^\s*\{.+\}\s*$/;
		
		worker.pid = job.pid;
		worker.child = child;
		worker.stream = stream;
		
		stream.on('json', function(data) {
			// received data from child (progress or completion report)
			self.handleChildResponse(job, worker, data);
		} );
		
		stream.on('text', function(line) {
			// received non-json text from child, log it
			fs.appendFileSync(job.log_file, line);
		} );
		
		stream.on('error', function(err, text) {
			// Probably a JSON parse error (child emitting garbage)
			self.logError('job', "Child stream error: Job ID " + job.id + ": PID " + job.pid + ": " + err);
			if (text) fs.appendFileSync(job.log_file, text + "\n");
		} );
		
		child.on('error', function (err) {
			// child error (e.g. spawn failed asynchronously)
			if (worker.log_fd) { fs.closeSync(worker.log_fd); worker.log_fd = null; }
			job.code = 1;
			job.description = "Child process error: " + Tools.getErrorDescription(err);
			worker.child_exited = true;
			self.logError("child", job.description);
			self.finishLocalJob( job );
		} );
		
		child.on('exit', function (code, signal) {
			// child exited
			self.logDebug(3, "Child " + job.pid + " exited with code: " + (code || signal || 0));
			worker.child_exited = true;
			
			if (job.complete) {
				// child already reported completion, so finish job now
				if (worker.log_fd) { fs.closeSync(worker.log_fd); worker.log_fd = null; }
				self.finishLocalJob( job );
			}
			else {
				// job is not complete but process exited (could be coming in next tick)
				// set timeout just in case something went wrong
				// (timer is cleared in finishLocalJob if the report does arrive)
				worker.complete_timer = setTimeout( function() {
					if (worker.log_fd) { fs.closeSync(worker.log_fd); worker.log_fd = null; }
					job.code = code || 1;
					job.description = code ?
						("Child " + job.pid + " crashed with code: " + (code || signal)) :
						("Process exited without reporting job completion.");
					if (!code) job.unknown = 1;
					self.finishLocalJob( job );
				}, 1000 );
			}
		} ); // on exit
		
		// send initial job + params
		stream.write( job );
		
		// we're done writing to the child -- don't hold open its stdin
		worker.child.stdin.end();
	} // spawn normally
	
	// track job in our own hash
	this.activeJobs[ job.id ] = job;
	this.kids[ job.pid ] = worker;
},
|
|
|
|
handleChildResponse: function(job, worker, data) {
|
|
// child sent us some datas (progress or completion)
|
|
this.logDebug(10, "Got job update from child: " + job.pid, data);
|
|
|
|
// assume success if complete but no code specified
|
|
if (data.complete && !data.code) data.code = 0;
|
|
|
|
// merge in data
|
|
Tools.mergeHashInto( job, data );
|
|
|
|
if (job.complete && worker.child_exited) {
|
|
// in case this update came in after child exited
|
|
this.finishLocalJob( job );
|
|
}
|
|
},
|
|
|
|
detachedJobUpdate: function(data) {
|
|
// receive update from detached child via queue system
|
|
var id = data.id;
|
|
delete data.id;
|
|
|
|
var in_progress = data.in_progress || false;
|
|
delete data.in_progress;
|
|
|
|
this.logDebug(9, "Received update from detached job: " + id, data);
|
|
|
|
var job = this.activeJobs[id];
|
|
if (!job) {
|
|
// if this is an in-progress update, we can just silently skip (queue files arrived out of order)
|
|
if (in_progress) return;
|
|
|
|
// service may have restarted - try to recover job from temp file
|
|
var job_file = this.server.config.get('log_dir') + '/jobs/' + id + '-detached' + '.json';
|
|
this.logDebug(6, "Detached job is not in memory: " + id + ": Attempting to recover from disk", job_file);
|
|
|
|
// okay to use sync here, as this should be a very rare event
|
|
if (fs.existsSync(job_file)) {
|
|
var json_raw = fs.readFileSync(job_file, { encoding: 'utf8' });
|
|
try { job = JSON.parse(json_raw); }
|
|
catch (err) {
|
|
this.logError('job', "Failed to read detached job file: " + job_file + ": " + err);
|
|
}
|
|
}
|
|
else {
|
|
this.logError('job', "Could not locate detached job file: " + job_file);
|
|
}
|
|
|
|
if (job) {
|
|
this.logDebug(6, "Recovered job data from disk: " + job_file, job);
|
|
this.activeJobs[id] = job;
|
|
this.kids[ job.pid ] = { pid: job.pid };
|
|
}
|
|
else {
|
|
this.logError('job', "Failed to locate active job for update: " + id, data);
|
|
return;
|
|
}
|
|
} // no job in memory
|
|
|
|
// assume success if complete but no code specified
|
|
if (data.complete && !data.code) data.code = 0;
|
|
|
|
// merge in data
|
|
Tools.mergeHashInto( job, data );
|
|
|
|
if (job.complete) {
|
|
// detached job is complete
|
|
this.finishLocalJob( job );
|
|
}
|
|
},
|
|
|
|
rewindJob: function(job) {
|
|
// reset cursor state to minute before job started (use 'now' property in case start was delayed)
|
|
// only do this if job has catch_up, was launched via the scheduler, and is not multiplexed
|
|
if (!this.multi.master) return;
|
|
|
|
if (job.catch_up && !job.source && !job.multiplex) {
|
|
var new_start = Tools.normalizeTime( job.now - 60, { sec: 0 } );
|
|
this.state.cursors[ job.event ] = new_start;
|
|
|
|
var dargs = Tools.getDateArgs( new_start );
|
|
this.logDebug(5, "Reset event " + job.event + " cursor to: " + dargs.yyyy_mm_dd + " " + dargs.hh + ":" + dargs.mi + ":00" );
|
|
}
|
|
},
|
|
|
|
findJob: function(stub) {
|
|
// find active or pending job
|
|
// stub should have: id
|
|
if (!this.multi.master) return false;
|
|
if (typeof(stub) == 'string') stub = { id: stub };
|
|
|
|
// check all jobs, local, remote and pending
|
|
var all_jobs = this.getAllActiveJobs( true );
|
|
var job = all_jobs[stub.id];
|
|
if (!job) {
|
|
// check pending jobs (they have separate IDs)
|
|
for (var key in all_jobs) {
|
|
if (all_jobs[key].id == stub.id) {
|
|
job = all_jobs[key];
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
return job || false;
|
|
},
|
|
|
|
updateJob: function(stub) {
|
|
// update active job
|
|
// stub should have: id
|
|
if (!this.multi.master) return false;
|
|
var job = this.findJob(stub);
|
|
|
|
if (!job) {
|
|
// should never happen
|
|
this.logDebug(1, "Could not locate job: " + stub.id);
|
|
return false;
|
|
}
|
|
|
|
if (job.hostname == this.server.hostname) {
|
|
// local job
|
|
this.updateLocalJob(stub);
|
|
}
|
|
else {
|
|
// remote job
|
|
var slave = this.slaves[ job.hostname ];
|
|
if (!slave) {
|
|
// should never happen
|
|
this.logDebug(1, "Could not locate slave: " + job.hostname);
|
|
return false;
|
|
}
|
|
|
|
this.logDebug(6, "Sending job update command to: " + slave.hostname, stub);
|
|
slave.socket.emit( 'update_job', stub );
|
|
}
|
|
|
|
return true;
|
|
},
|
|
|
|
updateLocalJob: function(stub) {
|
|
// update local job properties
|
|
var job = this.activeJobs[ stub.id ];
|
|
if (!job) {
|
|
// must be a pending job
|
|
if (this.internalQueue) {
|
|
for (var key in this.internalQueue) {
|
|
var task = this.internalQueue[key];
|
|
if ((task.action = 'launchLocalJob') && (task.id == stub.id)) {
|
|
job = task;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
if (!job) {
|
|
// should never happen
|
|
this.logDebug(1, "Could not locate job: " + stub.id);
|
|
return false;
|
|
}
|
|
}
|
|
|
|
this.logDebug(4, "Updating local job: " + stub.id, stub);
|
|
|
|
// update properties
|
|
for (var key in stub) {
|
|
if (key != 'id') job[key] = stub[key];
|
|
}
|
|
|
|
return true;
|
|
},
|
|
|
|
abortJob: function(stub) {
|
|
// abort active job
|
|
// stub should have: id, reason
|
|
if (!this.multi.master) return false;
|
|
|
|
// check all jobs, local, remote and pending
|
|
var all_jobs = this.getAllActiveJobs( true );
|
|
var job = all_jobs[stub.id];
|
|
if (!job) {
|
|
// check pending jobs (they have separate IDs)
|
|
for (var key in all_jobs) {
|
|
if (all_jobs[key].id == stub.id) {
|
|
job = all_jobs[key];
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
if (!job) {
|
|
// should never happen
|
|
this.logDebug(1, "Could not locate job: " + stub.id);
|
|
return false;
|
|
}
|
|
|
|
if (job.hostname == this.server.hostname) {
|
|
// local job
|
|
this.abortLocalJob(stub);
|
|
}
|
|
else {
|
|
// remote job
|
|
var slave = this.slaves[ job.hostname ];
|
|
if (!slave) {
|
|
// should never happen
|
|
this.logDebug(1, "Could not locate slave: " + job.hostname);
|
|
return false;
|
|
}
|
|
|
|
this.logDebug(6, "Sending job abort command to: " + slave.hostname, stub);
|
|
slave.socket.emit( 'abort_job', stub );
|
|
}
|
|
|
|
// rewind cursor if needed
|
|
if (!stub.no_rewind) this.rewindJob(job);
|
|
|
|
if (job.pending && !job.log_file) {
|
|
// job is pre-launch, so log activity
|
|
this.logActivity( 'error', { description: "Pending job #"+stub.id+" ("+(job.event_title || 'Unknown')+") was aborted pre-launch: " + stub.reason } );
|
|
}
|
|
|
|
return true;
|
|
},
|
|
|
|
abortLocalPendingJob: function(stub) {
|
|
// abort job currently in pending queue
|
|
var job = null;
|
|
|
|
if (this.internalQueue) {
|
|
for (var key in this.internalQueue) {
|
|
var task = this.internalQueue[key];
|
|
if ((task.action = 'launchLocalJob') && (task.id == stub.id)) {
|
|
job = task;
|
|
delete this.internalQueue[key];
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!job) {
|
|
// should never happen
|
|
this.logDebug(1, "Could not locate pending job to abort: " + stub.id);
|
|
return;
|
|
}
|
|
|
|
this.logDebug(4, "Aborting local pending job: " + stub.id + ": " + stub.reason, job);
|
|
job.abort_reason = stub.reason;
|
|
|
|
// determine if job needs to be 'finished' (i.e. aborted in retry delay)
|
|
// or hasn't actually launched yet (i.e. multiplex stagger)
|
|
if (job.log_file) {
|
|
this.activeJobs[ job.id ] = job; // trick it into acceptance
|
|
this.finishLocalJob( job );
|
|
}
|
|
},
|
|
|
|
abortLocalJob: function(stub) {
	// Abort a job running locally on this server: SIGTERM first, then
	// escalate to SIGKILL after child_kill_timeout seconds if it does
	// not exit.  Pending (not-yet-spawned) jobs are delegated to
	// abortLocalPendingJob.
	// stub should have: id, reason
	var self = this;
	var job = this.activeJobs[ stub.id ];
	if (!job) {
		// must be a pending job
		this.abortLocalPendingJob(stub);
		return;
	}
	
	var worker = this.kids[ job.pid ] || {};
	
	this.logDebug(4, "Aborting local job: " + stub.id + ": " + stub.reason, job);
	job.abort_reason = stub.reason;
	
	if (worker.child) {
		// owned process -- we hold a ChildProcess handle and possibly a log fd
		if (worker.log_fd) { fs.closeSync(worker.log_fd); worker.log_fd = null; }
		
		// arm escalation timer (cleared in finishLocalJob if child exits in time)
		worker.kill_timer = setTimeout( function() {
			// child didn't die, kill with prejudice
			self.logDebug(3, "Child did not exit, killing harder: " + job.pid);
			worker.child.kill('SIGKILL');
		}, this.server.config.get('child_kill_timeout') * 1000 );
		
		// try killing nicely first
		worker.child.kill('SIGTERM');
	}
	else {
		// detached process -- we only know the PID, so signal it directly
		if (job.pid) {
			try { process.kill( job.pid, 'SIGTERM' ); }
			catch (e) {
				// SIGTERM delivery failed (e.g. permission or race) -- escalate
				this.logDebug(5, "Could not term process: " + job.pid + ", killing it.");
				try { process.kill( job.pid, 'SIGKILL' ); } catch (e) {;}
			}
			
			// make sure process actually exits
			// (signal 0 is a liveness probe, not an actual kill)
			setTimeout( function() {
				var ping = false;
				try { ping = process.kill( job.pid, 0 ); }
				catch (e) {;}
				if (ping) {
					self.logDebug(3, "Child did not exit, killing: " + job.pid);
					try { process.kill( job.pid, 'SIGKILL' ); } catch (e) {;}
				}
			}, this.server.config.get('child_kill_timeout') * 1000 );
		} // job.pid
		
		// assume job is finished at this point
		// (detached children cannot report back their exit)
		this.finishLocalJob(job);
	}
},
|
|
|
|
finishLocalJob: function(job) {
|
|
// complete job, remove from tracking, update history
|
|
var self = this;
|
|
|
|
// job may already be removed
|
|
if (!this.activeJobs[ job.id ]) return;
|
|
|
|
// if aborted, copy in those params
|
|
if (job.abort_reason) {
|
|
job.code = 1;
|
|
job.description = "Job Aborted: " + job.abort_reason;
|
|
job.retries = 0;
|
|
}
|
|
|
|
job.complete = 1;
|
|
|
|
this.logDebug(5, "Job completed " + (job.code ? "with error" : "successfully"), job);
|
|
|
|
// kill completion timer, if set
|
|
var worker = this.kids[ job.pid ] || {};
|
|
if (worker.complete_timer) {
|
|
clearTimeout( worker.complete_timer );
|
|
delete worker.complete_timer;
|
|
}
|
|
if (worker.kill_timer) {
|
|
clearTimeout( worker.kill_timer );
|
|
delete worker.kill_timer;
|
|
}
|
|
if (worker.log_fd) {
|
|
fs.closeSync(worker.log_fd);
|
|
delete worker.log_fd;
|
|
}
|
|
|
|
// retry on failure
|
|
if ((job.code != 0) && job.retries && !this.server.shut) {
|
|
this.logError('job', "Job failed: " + job.id + " (" + job.retries + " retries remain)");
|
|
|
|
// add blurb to job log
|
|
var blurb = "\n# Job failed with error";
|
|
if (job.code != 1) blurb += ' ' + job.code;
|
|
blurb += ": " + (job.description || 'Unknown Error') + "\n";
|
|
blurb += "# " + job.retries + " retries remain";
|
|
if (job.retry_delay) blurb += " (" + Tools.getTextFromSeconds(job.retry_delay, true, false) + " delay)";
|
|
blurb += "\n\n";
|
|
|
|
fs.appendFileSync( job.log_file, blurb);
|
|
|
|
job.retries--;
|
|
|
|
delete job.complete;
|
|
delete job.pid;
|
|
delete job.code;
|
|
delete job.description;
|
|
delete job.perf;
|
|
delete job.progress;
|
|
delete job.cpu;
|
|
delete job.mem;
|
|
|
|
delete this.activeJobs[ job.id ];
|
|
delete this.kids[ job.pid ];
|
|
|
|
// optional retry delay
|
|
if (job.retry_delay) {
|
|
job.when = Tools.timeNow() + job.retry_delay;
|
|
}
|
|
|
|
this.launchLocalJob(job);
|
|
return;
|
|
} // retry
|
|
|
|
// if non-zero code, we expect a string description
|
|
if (job.code != 0) {
|
|
if (!job.description) job.description = "Unknown Error (no description provided)";
|
|
}
|
|
if (job.description) {
|
|
job.description = '' + job.description;
|
|
}
|
|
|
|
// upload job debug log and finish job
|
|
var dargs = Tools.getDateArgs( new Date() );
|
|
var nice_date_time = dargs.yyyy_mm_dd + ' ' + dargs.hh_mi_ss + ' (' + dargs.tz + ')';
|
|
|
|
var footer = "\n";
|
|
if (job.code) {
|
|
footer += "# Job failed at " + nice_date_time + ".\n";
|
|
footer += "# Error";
|
|
if (job.code != 1) footer += " " + job.code;
|
|
footer += ": " + job.description.trim() + "\n";
|
|
}
|
|
else {
|
|
footer += "# Job completed successfully at " + nice_date_time + ".\n";
|
|
if (job.description) footer += "# Description: " + job.description.trim() + "\n";
|
|
}
|
|
footer += "# End of log.\n";
|
|
|
|
// append footer to log
|
|
try { fs.appendFileSync(job.log_file, footer); }
|
|
catch (err) {
|
|
self.logError('job', "Failed to append to job log file: " + job.log_file + ": " + err);
|
|
}
|
|
|
|
// next, get job log file size
|
|
var stats = null;
|
|
try { stats = fs.statSync( job.log_file ); }
|
|
catch (err) {
|
|
self.logError('job', "Failed to stat job log file: " + job.log_file + ": " + err);
|
|
}
|
|
|
|
// grab job log size, for e-mail
|
|
job.log_file_size = stats.size;
|
|
|
|
// only proceed if server isn't shutting down
|
|
if (!self.server.shut) {
|
|
// upload job log file async
|
|
self.uploadJobLog( job );
|
|
|
|
if (self.multi.master) {
|
|
// we're master, finish the job locally
|
|
self.finishJob(job);
|
|
} // master
|
|
else {
|
|
// we're a slave, signal master to finish job via websockets
|
|
// (this can happen parallel to job log upload)
|
|
// self.io.emit('finish_job', job);
|
|
self.masterSocketEmit('finish_job', job);
|
|
} // slave
|
|
|
|
// delete job json file (only created on master or for detached jobs)
|
|
fs.unlink( job.log_file.replace(/\.log$/, '.json'), function(err) {;} );
|
|
}
|
|
else if (self.multi.master) {
|
|
// server is shutting down and is master
|
|
// rewrite job json for recovery (so it gets pid and log_file_size)
|
|
fs.writeFileSync( job.log_file.replace(/\.log$/, '.json'), JSON.stringify(job) );
|
|
}
|
|
|
|
delete self.activeJobs[ job.id ];
|
|
if (job.pid) delete self.kids[ job.pid ];
|
|
},
|
|
|
|
uploadJobLog: function(job, callback) {
	// upload local job log file
	// or send to storage directly if we're master
	// On the master: gzip the local log and stream it into storage, then
	// move (if 'copy_job_logs_to' is configured) or delete the local file.
	// On a slave: ask the master to come fetch the log via websockets.
	var self = this;
	// NOTE: this local shadows the 'path' module within this function
	var path = 'jobs/' + job.id + '/log.txt.gz';
	
	// if we're master, upload directly to storage
	if (this.multi.master) {
		// call storage directly
		
		this.logDebug(6, "Storing job log: " + job.log_file + ": " + path);
		
		fs.stat( job.log_file, function(err, stats) {
			// data will be a stream
			if (err) {
				// log file is missing -- create a placeholder so the
				// stream/gzip pipeline below always has something to read
				var data = Buffer.from("(Empty log file)\n");
				fs.writeFileSync( job.log_file, data );
			}
			
			// get read stream and prepare to compress it
			var stream = fs.createReadStream( job.log_file );
			var gzip = zlib.createGzip( self.server.config.get('gzip_opts') || {} );
			stream.pipe( gzip );
			
			self.storage.putStream( path, gzip, function(err) {
				if (err) {
					self.logError('storage', "Failed to store job log: " + path + ": " + err);
					if (callback) callback(err);
					return;
				}
				
				self.logDebug(9, "Job log stored successfully: " + path);
				
				// delete or move local log file
				if (self.server.config.get('copy_job_logs_to')) {
					// archive the raw log under a unique, timestamped name
					var dargs = Tools.getDateArgs( Tools.timeNow() );
					var dest_path = self.server.config.get('copy_job_logs_to').replace(/\/$/, '') + '/';
					if (job.event_title) dest_path += job.event_title.replace(/\W+/g, '') + '.';
					dest_path += job.id + '.' + (dargs.yyyy_mm_dd + '-' + dargs.hh_mi_ss).replace(/\W+/g, '-');
					dest_path += '.log';
					
					self.logDebug(9, "Moving local file: " + job.log_file + " to: " + dest_path);
					
					self.logger.rotate( job.log_file, dest_path, function(err) {
						if (err) {
							// archive failed -- fall back to deleting the local file
							self.logError('file', "Failed to move local job log file: " + job.log_file + ": " + err);
							fs.unlink( job.log_file, function(err) {;} );
						}
						else {
							self.logDebug(9, "Successfully moved local job log file: " + job.log_file + ": " + dest_path);
						}
						if (callback) callback();
					} );
				}
				else {
					self.logDebug(9, "Deleting local file: " + job.log_file);
					fs.unlink( job.log_file, function(err) {
						// all done
						if (err) {
							self.logError('file', "Failed to delete local job log file: " + job.log_file + ": " + err);
						}
						else {
							self.logDebug(9, "Successfully deleted local job log file: " + job.log_file);
						}
						if (callback) callback();
						
					} ); // fs.unlink
				} // delete
			} ); // storage put
		} ); // read file
	} // master
	else {
		// we're a slave, so tell master via websockets to come get log
		// (skip the emit during shutdown; the master will recover the job)
		if (!this.server.shut) this.masterSocketEmit('fetch_job_log', job);
		if (callback) callback();
	} // slave
},
|
|
|
|
fetchStoreJobLog: function(job) {
|
|
// fetch remote job log from slave, and then store in storage
|
|
var self = this;
|
|
if (!this.multi.master) return;
|
|
|
|
var slave = this.slaves[ job.hostname ];
|
|
if (!slave) {
|
|
this.logError('job', "Failed to locate slave: " + job.hostname + " for job: " + job.id);
|
|
slave = { hostname: job.hostname }; // hail mary
|
|
}
|
|
|
|
// construct url to API on remote server w/auth key
|
|
var api_url = this.getServerBaseAPIURL( slave.hostname, slave.ip ) + '/app/fetch_delete_job_log';
|
|
|
|
api_url += Tools.composeQueryString({
|
|
path: job.log_file,
|
|
auth: Tools.digestHex(job.log_file + this.server.config.get('secret_key'))
|
|
});
|
|
|
|
this.logDebug(6, "Fetching remote job log via HTTP GET: " + api_url);
|
|
|
|
// just in case remote server has different base dir than master
|
|
job.log_file = this.server.config.get('log_dir') + '/jobs/' + path.basename(job.log_file);
|
|
|
|
this.request.get( api_url, { download: job.log_file }, function(err, resp) {
|
|
// check for error
|
|
if (err) {
|
|
var err_msg = "Failed to fetch job log file: " + api_url + ": " + err;
|
|
self.logError('job', err_msg);
|
|
}
|
|
else if (resp.statusCode != 200) {
|
|
var err_msg = "Failed to fetch job log file: " + api_url + ": HTTP " + resp.statusCode + " " + resp.statusMessage;
|
|
self.logError('job', err_msg );
|
|
}
|
|
else {
|
|
// success
|
|
self.logDebug(5, "Job log was fetched successfully", api_url);
|
|
|
|
// then call uploadJobLog again to store it (this also deletes the file)
|
|
self.uploadJobLog(job);
|
|
} // http success
|
|
} ); // request.get
|
|
},
|
|
|
|
finishJob: function(job) {
	// finish cleaning up job
	// Master-side finalization: persist the job record and history stubs,
	// send success/failure e-mail, fire web hooks, update daemon stats,
	// trigger chain reactions, and optionally update the source event.
	var self = this;
	
	if (!job.time_end) job.time_end = Tools.timeNow();
	job.elapsed = Math.max(0, job.time_end - job.time_start);
	
	var dargs = Tools.getDateArgs( job.time_end );
	
	// log success or failure
	if (job.code == 0) {
		this.logTransaction('job', "Job completed successfully: " + job.id, job);
	}
	else {
		this.logError('job', "Job failed: " + job.id, job);
	}
	
	// add to global activity, event log, and completed events
	var data = Tools.copyHash(job);
	
	// add special 'type' property for storage custom maint delete
	data.type = 'cronicle_job';
	
	// store job in its own record (serialized through the storage queue)
	this.storage.enqueue( function(task, callback) {
		self.storage.put( 'jobs/' + job.id, data, callback );
	});
	// expire the record after N days (per-job override, else global config)
	this.storage.expire( 'jobs/' + job.id, Tools.timeNow(true) + (86400 * (job.log_expire_days || this.server.config.get('job_data_expire_days'))) );
	
	// create stub containing a small subset of the job data, for lists
	var stub = {
		id: job.id,
		code: job.code,
		event: job.event,
		category: job.category,
		plugin: job.plugin,
		hostname: job.hostname,
		time_start: job.time_start,
		elapsed: job.elapsed,
		perf: job.perf || '',
		cpu: job.cpu || {},
		mem: job.mem || {},
		log_file_size: job.log_file_size || 0,
		action: 'job_complete',
		epoch: Tools.timeNow(true),
		
		event_title: job.event_title,
		category_title: job.category_title,
		plugin_title: job.plugin_title
	};
	if (job.code) stub.description = job.description || 'Unknown Error';
	
	// only store in activity log if job failed
	if (job.code != 0) {
		this.storage.enqueue( function(task, callback) {
			self.storage.listUnshift( 'logs/activity', stub, callback );
		});
	}
	
	// store stub in log storage (per-event history and global completed list)
	this.storage.enqueue( function(task, callback) {
		self.storage.listUnshift( 'logs/events/' + job.event, stub, callback );
	});
	this.storage.enqueue( function(task, callback) {
		self.storage.listUnshift( 'logs/completed', stub, callback );
	});
	
	// notify people
	var email_template = '';
	var to = '';
	if (job.notify_success && (job.code == 0)) {
		email_template = "conf/emails/job_success.txt";
		to = job.notify_success;
	}
	else if (job.notify_fail && (job.code != 0)) {
		email_template = "conf/emails/job_fail.txt";
		to = job.notify_fail;
	}
	
	if (email_template) {
		// Populate e-mail data with strings for template placeholders
		var email_data = Tools.copyHash(data);
		
		email_data.env = process.env;
		email_data.config = this.server.config.get();
		email_data.job_log_url = this.server.config.get('base_app_url') + this.api.config.get('base_uri') + '/app/get_job_log?id=' + job.id;
		email_data.edit_event_url = this.server.config.get('base_app_url') + '/#Schedule?sub=edit_event&id=' + job.event;
		email_data.job_details_url = this.server.config.get('base_app_url') + '/#JobDetails?id=' + job.id;
		email_data.nice_date_time = dargs.yyyy_mm_dd + ' ' + dargs.hh_mi_ss + ' (' + dargs.tz + ')';
		email_data.nice_elapsed = Tools.getTextFromSeconds( data.elapsed, false, false );
		email_data.perf = data.perf || '(No metrics provided)';
		email_data.description = (data.description || '(No description provided)').trim();
		email_data.notes = (data.notes || '(None)').trim();
		email_data.nice_log_size = Tools.getTextFromBytes( data.log_file_size || 0 );
		email_data.pid = data.pid || '(Unknown)';
		
		// compose nice mem/cpu usage info (average over samples, plus peak)
		email_data.nice_mem = '(Unknown)';
		if (data.mem && data.mem.count) {
			var mem_avg = Math.floor( data.mem.total / data.mem.count );
			email_data.nice_mem = Tools.getTextFromBytes( mem_avg );
			email_data.nice_mem += ' (Peak: ' + Tools.getTextFromBytes( data.mem.max ) + ')';
		}
		email_data.nice_cpu = '(Unknown)';
		if (data.cpu && data.cpu.count) {
			var cpu_avg = Tools.shortFloat( data.cpu.total / data.cpu.count );
			email_data.nice_cpu = '' + cpu_avg + '%';
			email_data.nice_cpu += ' (Peak: ' + Tools.shortFloat( data.cpu.max ) + '%)';
		}
		
		// perf may be an object
		if (Tools.isaHash(email_data.perf)) email_data.perf = JSON.stringify(email_data.perf);
		
		// have link download log if too big (> 10 MB)
		if (data.log_file_size > 1024 * 1024 * 10) email_data.job_log_url += '&download=1';
		
		// construct mailer
		var mail = new PixlMail( this.server.config.get('smtp_hostname'), this.server.config.get('smtp_port') || 25 );
		mail.setOptions( this.server.config.get('mail_options') || {} );
		
		// send it (fire-and-forget; failures are logged as activity errors)
		mail.send( email_template, email_data, function(err, raw_email) {
			if (err) {
				var err_msg = "Failed to send e-mail for job: " + job.id + ": " + to + ": " + err;
				self.logError( 'mail', err_msg, { text: raw_email } );
				self.logActivity( 'error', { description: err_msg } );
			}
			else {
				self.logDebug(5, "Email sent successfully for job: " + job.id, { text: raw_email } );
			}
		} );
	} // mail
	
	// fire web hook
	var hook_data = Tools.mergeHashes(data, { action: 'job_complete' });
	
	// prepare nice text summary (compatible with Slack Incoming WebHooks)
	hook_data.base_app_url = this.server.config.get('base_app_url');
	hook_data.job_details_url = this.server.config.get('base_app_url') + '/#JobDetails?id=' + job.id;
	hook_data.edit_event_url = this.server.config.get('base_app_url') + '/#Schedule?sub=edit_event&id=' + job.event;
	
	var hook_text_templates = this.server.config.get('web_hook_text_templates') || this.defaultWebHookTextTemplates;
	var hook_action = hook_data.action;
	if (job.code != 0) hook_action = 'job_failure';
	
	if (hook_text_templates[hook_action]) {
		hook_data.text = Tools.sub( hook_text_templates[hook_action], hook_data );
		
		// include web_hook_config_keys if configured
		if (this.server.config.get('web_hook_config_keys')) {
			var web_hook_config_keys = this.server.config.get('web_hook_config_keys');
			for (var idy = 0, ley = web_hook_config_keys.length; idy < ley; idy++) {
				var key = web_hook_config_keys[idy];
				hook_data[key] = this.server.config.get(key);
			}
		}
		
		// include web_hook_custom_data if configured
		if (this.server.config.get('web_hook_custom_data')) {
			var web_hook_custom_data = this.server.config.get('web_hook_custom_data');
			for (var key in web_hook_custom_data) hook_data[key] = web_hook_custom_data[key];
		}
		
		// custom http options for web hook
		var hook_opts = this.server.config.get('web_hook_custom_opts') || {};
		
		// per-job hook (if the event configured one)
		if (job.web_hook) {
			this.logDebug(9, "Firing web hook for job complete: " + job.id + ": " + job.web_hook);
			this.request.json( job.web_hook, hook_data, hook_opts, function(err, resp, data) {
				// log response
				if (err) self.logDebug(9, "Web Hook Error: " + job.web_hook + ": " + err);
				else self.logDebug(9, "Web Hook Response: " + job.web_hook + ": HTTP " + resp.statusCode + " " + resp.statusMessage);
			} );
		}
		// global hook fired for every job completion
		if (this.server.config.get('universal_web_hook')) {
			this.logDebug(9, "Firing universal web hook for job complete: " + job.id + ": " + this.server.config.get('universal_web_hook'));
			this.request.json( this.server.config.get('universal_web_hook'), hook_data, hook_opts, function(err, resp, data) {
				// log response
				if (err) self.logDebug(9, "Universal Web Hook Error: " + err);
				else self.logDebug(9, "Universal Web Hook Response: HTTP " + resp.statusCode + " " + resp.statusMessage);
			} );
		}
	} // yes fire hook
	
	// delete from slave job hash, if applicable
	var slave = this.slaves[ job.hostname ];
	if (slave && slave.active_jobs && slave.active_jobs[job.id]) {
		delete slave.active_jobs[job.id];
	}
	
	// just in case job was in limbo, we can remove it now
	delete this.deadJobs[ job.id ];
	
	// we can clear high mem/cpu flags too, if applicable
	if (this.state.flagged_jobs) {
		delete this.state.flagged_jobs[ job.id ];
	}
	
	// update daemon stats (reset every day)
	var stats = this.state.stats;
	
	if (!stats.jobs_completed) stats.jobs_completed = 1;
	else stats.jobs_completed++;
	
	if (job.code != 0) {
		if (!stats.jobs_failed) stats.jobs_failed = 1;
		else stats.jobs_failed++;
	}
	
	if (!stats.jobs_elapsed) stats.jobs_elapsed = job.elapsed;
	else stats.jobs_elapsed += job.elapsed;
	
	if (!stats.jobs_log_size) stats.jobs_log_size = job.log_file_size || 0;
	else stats.jobs_log_size += (job.log_file_size || 0);
	
	// send updated stats to clients
	this.authSocketEmit( 'update', { state: this.state } );
	
	// if event is catch_up, tickle scheduler (after some safety checks)
	// (in case it needs to launch another job right away)
	if (job.catch_up && !this.schedulerGraceTimer && !this.schedulerTicking && (dargs.sec != 59) && !job.update_event) {
		this.schedulerMinuteTick( null, true );
	}
	
	// chain reaction (success or error)
	if (job.chain && job.chain.length && (job.code == 0)) {
		this.chainReaction( job, job.chain );
	}
	else if (job.chain_error && job.chain_error.length && (job.code != 0) && !job.abort_reason) {
		// only fire error chain reaction if job was not manually aborted (job.abort_reason)
		this.chainReaction( job, job.chain_error );
	}
	
	// job can optionally update event
	if (job.update_event) {
		this.storage.listFindUpdate( 'global/schedule', { id: job.event }, job.update_event, function(err) {
			if (err) {
				self.logError('event', "Failed to update event: " + job.event + ": " + err);
				return;
			}
			
			var event_stub = Tools.mergeHashes( job.update_event, { id: job.event, title: job.event_title } );
			
			self.logDebug(6, "Successfully updated event: " + job.event + " (" + job.event_title + ")", job.update_event);
			self.logTransaction('event_update', job.event_title, event_stub);
			self.logActivity('event_update', { event: event_stub });
			
			// broadcast update to all websocket clients
			self.updateClientData( 'schedule' );
		} ); // listFindUpdate
	} // job.update_event
	
	// check event queue if applicable
	if (job.queue) this.checkEventQueues( job.event );
},
|
|
|
|
getAllActiveJobs: function(inc_pending) {
|
|
// gather all active jobs, local and remote
|
|
var jobs = Tools.copyHash( this.activeJobs );
|
|
|
|
// include pending jobs (i.e. stagger or retry delay) from internal queues
|
|
if (inc_pending && this.internalQueue) {
|
|
for (var key in this.internalQueue) {
|
|
var task = this.internalQueue[key];
|
|
if ((task.action == 'launchLocalJob') && task.id && !jobs[task.id]) {
|
|
jobs[key] = Tools.mergeHashes( task, { pending: 1 } );
|
|
} // is pending job
|
|
} // foreach queue item
|
|
} // internalQueue
|
|
|
|
for (var hostname in this.slaves) {
|
|
var slave = this.slaves[hostname];
|
|
if (slave.active_jobs) {
|
|
Tools.mergeHashInto( jobs, slave.active_jobs );
|
|
}
|
|
|
|
if (inc_pending && slave.queue) {
|
|
for (var key in slave.queue) {
|
|
var task = slave.queue[key];
|
|
if ((task.action == 'launchLocalJob') && task.id && !jobs[task.id]) {
|
|
jobs[key] = Tools.mergeHashes( task, { pending: 1 } );
|
|
} // is pending job
|
|
} // foreach queue item
|
|
} // has queue
|
|
} // foreach slave
|
|
|
|
return jobs;
|
|
},
|
|
|
|
abortAllLocalJobs: function() {
|
|
// abort all locally running jobs for server shutdown
|
|
// omit detached jobs
|
|
for (var id in this.activeJobs) {
|
|
var job = this.activeJobs[id];
|
|
if (!job.detached) {
|
|
this.abortLocalJob({ id: id, reason: "Shutting down server" });
|
|
|
|
// Rewind event cursor here
|
|
this.rewindJob(job);
|
|
}
|
|
else {
|
|
// detached job, update JSON job file on disk for recovery (now with PID)
|
|
this.logDebug(5, "Detached job is still running in the background: " + job.id + ": PID " + job.pid);
|
|
try {
|
|
fs.writeFileSync( job.log_file.replace(/\.log$/, '.json'), JSON.stringify(job) );
|
|
}
|
|
catch (err) {
|
|
this.logError('job', "Failed to write JSON job file: " + job.log_file.replace(/\.log$/, '.json') + ": " + err);
|
|
}
|
|
}
|
|
}
|
|
},
|
|
|
|
monitorAllActiveJobs: function() {
	// monitor all active jobs, local and remote (called once per minute)
	// Enforces: max run time, memory/CPU threshold limits (with optional
	// sustain periods), log file size limits, and dead-server job cleanup.
	// only a master server should do this
	if (!this.multi.master) return;
	
	var all_jobs = this.getAllActiveJobs();
	var now = Tools.timeNow();
	
	// keep flagged jobs in state, so will be saved periodically
	// (a "flag" records when a job first went over a mem/cpu limit)
	if (!this.state.flagged_jobs) this.state.flagged_jobs = {};
	var flagged_jobs = this.state.flagged_jobs;
	
	// iterate over all jobs
	for (var id in all_jobs) {
		var job = all_jobs[id];
		
		// per-job limits override the global config defaults
		var job_memory_max = job.memory_limit || this.server.config.get('job_memory_max');
		var job_memory_sustain = job.memory_sustain || this.server.config.get('job_memory_sustain');
		var job_cpu_max = job.cpu_limit || this.server.config.get('job_cpu_max');
		var job_cpu_sustain = job.cpu_sustain || this.server.config.get('job_cpu_sustain');
		var job_log_max_size = job.log_max_size || this.server.config.get('job_log_max_size');
		
		// check for max run time
		if (job.timeout && (now - job.time_start >= job.timeout)) {
			this.logDebug(4, "Job has exceeded max run time and will be aborted: " + id + " (" + job.timeout + " sec)");
			
			// JH 2021-01-23 Setting no_rewind to 1 for timeout aborts, as per GH #369
			var nice_timeout = Tools.getTextFromSeconds( job.timeout, false, true );
			this.abortJob({ id: id, reason: "Exceeded maximum run time ("+nice_timeout+")", no_rewind: 1 });
			continue;
		} // timed out
		
		// monitor mem for threshold limits
		if (job_memory_max && job.mem) {
			var current = job.mem.current || 0;
			if (current > job_memory_max) {
				// job is currently exceeding memory limits
				// flag it (recording when the excess started); abort only
				// after the excess has persisted for the sustain period
				if (!flagged_jobs[id]) flagged_jobs[id] = {};
				if (!flagged_jobs[id].mem) {
					this.logDebug(6, "Job has exceeded memory usage limit: " + id, job.mem);
					flagged_jobs[id].mem = now;
				}
				if ((now - flagged_jobs[id].mem) >= job_memory_sustain) {
					// job has exceeded memory for too long -- abort it
					var msg = "Exceeded memory limit of " + Tools.getTextFromBytes(job_memory_max);
					if (job_memory_sustain) msg += " for over " + Tools.getTextFromSeconds(job_memory_sustain, false, true);
					
					this.logDebug(4, "Job " + id + " is being aborted: " + msg);
					this.abortJob({ id: id, reason: msg });
					continue;
				}
			}
			else {
				// job mem is within limits - remove flag, if applicable
				if (flagged_jobs[id] && flagged_jobs[id].mem) {
					this.logDebug(6, "Job is now under the memory usage limit: " + id, job.mem);
					delete flagged_jobs[id].mem;
				}
				if (!Tools.numKeys(flagged_jobs[id])) delete flagged_jobs[id];
			}
		} // mem check
		
		// monitor cpu for threshold limits (same flag/sustain pattern as memory)
		if (job_cpu_max && job.cpu) {
			var current = job.cpu.current || 0;
			if (current > job_cpu_max) {
				// job is currently exceeding cpu limits
				if (!flagged_jobs[id]) flagged_jobs[id] = {};
				if (!flagged_jobs[id].cpu) {
					this.logDebug(6, "Job has exceeded CPU usage limit: " + id, job.cpu);
					flagged_jobs[id].cpu = now;
				}
				if ((now - flagged_jobs[id].cpu) >= job_cpu_sustain) {
					// job has exceeded cpu for too long -- abort it
					var msg = "Exceeded CPU limit of " + job_cpu_max + "%";
					if (job_cpu_sustain) msg += " for over " + Tools.getTextFromSeconds(job_cpu_sustain, false, true);
					
					this.logDebug(4, "Job " + id + " is being aborted: " + msg);
					this.abortJob({ id: id, reason: msg });
					continue;
				}
			}
			else {
				// job cpu is within limits - remove flag, if applicable
				if (flagged_jobs[id] && flagged_jobs[id].cpu) {
					this.logDebug(6, "Job is now under the CPU usage limit: " + id, job.cpu);
					delete flagged_jobs[id].cpu;
				}
				if (!Tools.numKeys(flagged_jobs[id])) delete flagged_jobs[id];
			}
		} // cpu check
		
		// monitor job log file sizes
		if (job_log_max_size && job.log_file_size && (job.log_file_size > job_log_max_size)) {
			// job has exceeded log file size limit -- abort it (no sustain period)
			var msg = "Exceeded log file size limit of " + Tools.getTextFromBytes(job_log_max_size);
			this.logDebug(4, "Job " + id + " is being aborted: " + msg);
			this.abortJob({ id: id, reason: msg });
			continue;
		}
	} // foreach job
	
	// monitor jobs in limbo (i.e. caused by dead servers)
	// jobs stuck in limbo for N seconds are auto-aborted
	var dead_job_timeout = this.server.config.get('dead_job_timeout');
	for (var id in this.deadJobs) {
		var job = this.deadJobs[id];
		if (now - job.time_dead >= dead_job_timeout) {
			// mark as failed and finalize directly (the owning server is gone)
			job.complete = 1;
			job.code = 1;
			job.description = "Aborted Job: Server '" + job.hostname + "' shut down unexpectedly.";
			this.finishJob(job);
			
			// Rewind cursor here too
			this.rewindJob(job);
		}
	} // foreach dead job
},
|
|
|
|
monitorServerResources: function(callback) {
|
|
// monitor local CPU and memory for all active jobs (once per minute)
|
|
// shell exec to get running process cpu and memory usage
|
|
// this works on at least: OS X, Fedora, Ubuntu and CentOS
|
|
var self = this;
|
|
var now = Tools.timeNow();
|
|
|
|
var cmd = this.server.config.get('ps_monitor_cmd') || '/bin/ps -eo "ppid pid %cpu rss"';
|
|
var job_startup_grace = this.server.config.get('job_startup_grace') || 5;
|
|
|
|
this.logDebug(10, "Checking server resources: " + cmd);
|
|
|
|
var finish = function(err, stdout, stderr) {
|
|
if (err) {
|
|
self.logError('job', "Failed to exec ps: " + err);
|
|
if (callback) { callback(); callback = null; }
|
|
return;
|
|
}
|
|
var lines = stdout.split(/\n/);
|
|
var pids = {};
|
|
|
|
// process each line from ps response
|
|
for (var idx = 0, len = lines.length; idx < len; idx++) {
|
|
var line = lines[idx];
|
|
if (line.match(/(\d+)\s+(\d+)\s+([\d\.]+)\s+(\d+)/)) {
|
|
var ppid = parseInt( RegExp.$1 );
|
|
var pid = parseInt( RegExp.$2 );
|
|
var cpu = parseFloat( RegExp.$3 );
|
|
var mem = parseInt( RegExp.$4 ) * 1024; // k to bytes
|
|
pids[ pid ] = { ppid: ppid, cpu: cpu, mem: mem };
|
|
} // good line
|
|
} // foreach line
|
|
|
|
self.logDebug(10, "Raw process data:", pids);
|
|
|
|
// match up pids with jobs
|
|
for (var id in self.activeJobs) {
|
|
var job = self.activeJobs[id];
|
|
|
|
// only match jobs that have been running for more than N seconds
|
|
// this way we don't record cpu/mem for a process that is just starting up
|
|
if (pids[ job.pid ] && (now - job.time_start >= job_startup_grace)) {
|
|
var info = pids[ job.pid ];
|
|
var cpu = info.cpu;
|
|
var mem = info.mem;
|
|
|
|
// also consider children of the child (up to 100 generations deep)
|
|
var levels = 0;
|
|
var family = {};
|
|
family[ job.pid ] = 1;
|
|
|
|
while (Tools.numKeys(family) && (++levels <= 100)) {
|
|
for (var fpid in family) {
|
|
for (var cpid in pids) {
|
|
if (pids[cpid].ppid == fpid) {
|
|
family[cpid] = 1;
|
|
cpu += pids[cpid].cpu;
|
|
mem += pids[cpid].mem;
|
|
} // matched
|
|
} // cpid loop
|
|
delete family[fpid];
|
|
} // fpid loop
|
|
} // while
|
|
|
|
if (job.cpu) {
|
|
if (cpu < job.cpu.min) job.cpu.min = cpu;
|
|
if (cpu > job.cpu.max) job.cpu.max = cpu;
|
|
job.cpu.total += cpu;
|
|
job.cpu.count++;
|
|
job.cpu.current = cpu;
|
|
}
|
|
else {
|
|
job.cpu = { min: cpu, max: cpu, total: cpu, count: 1, current: cpu };
|
|
}
|
|
|
|
if (job.mem) {
|
|
if (mem < job.mem.min) job.mem.min = mem;
|
|
if (mem > job.mem.max) job.mem.max = mem;
|
|
job.mem.total += mem;
|
|
job.mem.count++;
|
|
job.mem.current = mem;
|
|
}
|
|
else {
|
|
job.mem = { min: mem, max: mem, total: mem, count: 1, current: mem };
|
|
}
|
|
|
|
if (self.debugLevel(10)) {
|
|
self.logDebug(10, "Active Job: " + job.pid + ": CPU: " + cpu + "%, Mem: " + Tools.getTextFromBytes(mem));
|
|
}
|
|
} // matched job with pid
|
|
} // foreach job
|
|
|
|
// grab stats for daemon pid as well
|
|
// store in multi.data to be shared with cluster
|
|
if (pids[ process.pid ]) {
|
|
var info = pids[ process.pid ];
|
|
self.multi.data.cpu = info.cpu;
|
|
self.multi.data.mem = info.mem;
|
|
}
|
|
|
|
// monitor all active job log sizes
|
|
async.eachOfSeries( self.activeJobs,
|
|
function(job, id, callback) {
|
|
if (job && job.log_file) {
|
|
fs.stat( job.log_file, function(err, stats) {
|
|
if (stats && stats.size) job.log_file_size = stats.size;
|
|
callback();
|
|
} );
|
|
}
|
|
else callback();
|
|
},
|
|
function() {
|
|
if (callback) { callback(); callback = null; }
|
|
}
|
|
); // eachOfSeries
|
|
}; // finish
|
|
|
|
var child = null;
|
|
try {
|
|
child = cp.exec( cmd, { timeout: 5 * 1000 }, finish );
|
|
}
|
|
catch(err) {
|
|
self.logError('job', "Failed to exec ps: " + err);
|
|
if (callback) { callback(); callback = null; }
|
|
}
|
|
if (child && child.pid && child.on) child.on('error', function (err) {
|
|
self.logError('job', "Failed to exec ps: " + err);
|
|
if (callback) { callback(); callback = null; }
|
|
});
|
|
},
|
|
|
|
watchJobLog: function(args, socket) {
|
|
// websocket request to watch live job log
|
|
var self = this;
|
|
var ip = socket.request.connection.remoteAddress || 'Unknown';
|
|
|
|
// allow active or pending jobs (retry delay)
|
|
var job = this.activeJobs[args.id];
|
|
if (!job && this.internalQueue) {
|
|
for (var key in this.internalQueue) {
|
|
var task = this.internalQueue[key];
|
|
if ((task.action = 'launchLocalJob') && (task.id == args.id)) {
|
|
job = task;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!job) {
|
|
// logging this as a debug (non-error) because it can happen naturally
|
|
// if #JobDetails page is loaded just as job is completing
|
|
self.logDebug(2, "watchJobLog: Could not locate active job: " + args.id + ", canceling watch");
|
|
return;
|
|
}
|
|
if (!args.token) {
|
|
self.logError('watchJobLog', "Missing authentication token");
|
|
return;
|
|
}
|
|
|
|
// prepare to log watch
|
|
var log_file = job.log_file;
|
|
var log_fd = null;
|
|
var log_stats = null;
|
|
var log_chunk_size = 32678;
|
|
var log_buffer = Buffer.alloc(log_chunk_size);
|
|
var log_pos = 0;
|
|
|
|
self.logDebug(5, "Socket client " + socket.id + " (IP: " + ip + ") now watching job log file: " + log_file);
|
|
|
|
// open log file and locate ideal position to start from
|
|
// (~32K from end, aligned to line boundary)
|
|
async.series([
|
|
function(callback) {
|
|
// validate auth token
|
|
var correct_token = Tools.digestHex(args.id + self.server.config.get('secret_key'));
|
|
if (args.token != correct_token) {
|
|
var err = new Error("Invalid authentication token (mismatched secret keys between servers?)");
|
|
self.logError('watchJobLog', "Socket client " + socket.id + " failed to authenticate (IP: "+ip+")");
|
|
return callback(err);
|
|
}
|
|
|
|
self.logDebug(4, "watchJobLog: Socket client " + socket.id + " has authenticated via user session (IP: "+ip+")");
|
|
socket._pixl_auth = true;
|
|
callback();
|
|
},
|
|
function(callback) {
|
|
fs.open(log_file, 'r', function(err, fd) {
|
|
log_fd = fd;
|
|
callback(err);
|
|
} );
|
|
},
|
|
function(callback) {
|
|
fs.fstat(log_fd, function(err, stats) {
|
|
log_stats = stats;
|
|
callback(err);
|
|
} );
|
|
},
|
|
function(callback) {
|
|
log_pos = Math.max(0, log_stats.size - log_chunk_size);
|
|
fs.read(log_fd, log_buffer, 0, log_chunk_size, log_pos, function(err, bytesRead, buffer) {
|
|
if (err) return callback(err);
|
|
|
|
if (bytesRead > 0) {
|
|
var slice = buffer.slice( 0, bytesRead );
|
|
var text = slice.toString();
|
|
var lines = text.split(/\n/);
|
|
if (bytesRead == log_chunk_size) {
|
|
// remove first line, as it is likely partial
|
|
var line = lines.shift();
|
|
log_pos += line.length + 1;
|
|
}
|
|
}
|
|
|
|
callback();
|
|
} );
|
|
}
|
|
],
|
|
function(err) {
|
|
if (err) {
|
|
self.logError('socket', "Could not watch job log file: " + log_file + ": " + err);
|
|
return;
|
|
}
|
|
|
|
socket._pixl_log_watcher = setInterval( function() {
|
|
// monitor log size
|
|
if (socket._pixl_disconnected) {
|
|
fs.close(log_fd, function() {});
|
|
clearTimeout( socket._pixl_log_watcher );
|
|
return;
|
|
}
|
|
|
|
fs.fstat(log_fd, function(err, stats) {
|
|
if (stats && (stats.size > log_pos)) {
|
|
// log grew, read new chunk
|
|
fs.read(log_fd, log_buffer, 0, log_chunk_size, log_pos, function(err, bytesRead, buffer) {
|
|
if (err) {
|
|
self.logError('socket', "Could not read job log file: " + log_file + ": " + err);
|
|
fs.close(log_fd, function() {});
|
|
clearTimeout( socket._pixl_log_watcher );
|
|
return;
|
|
}
|
|
|
|
if (bytesRead > 0) {
|
|
var slice = buffer.slice( 0, bytesRead );
|
|
var text = slice.toString();
|
|
var lines = text.split(/\n/);
|
|
log_pos += text.length;
|
|
|
|
if (!text.match(/\n$/)) {
|
|
// last line is partial, must compensate
|
|
var line = lines.pop();
|
|
log_pos -= line.length;
|
|
|
|
// tricky situation: single log line longer than 32K
|
|
// in this case we gotta split it up
|
|
if (!lines.length && (bytesRead == log_chunk_size)) {
|
|
lines.push( line );
|
|
log_pos += line.length;
|
|
}
|
|
}
|
|
|
|
// emit lines to client
|
|
if (lines.length && !socket._pixl_disconnected) {
|
|
socket.emit('log_data', lines);
|
|
}
|
|
} // bytesRead
|
|
} ); // fs.read
|
|
} // log grew
|
|
} ); // fs.fstat
|
|
}, 250 ); // setInterval
|
|
} ); // async.series
|
|
}
|
|
|
|
});
|