Added support for multiple workers during content upgrade.

semantics-font
Frode Petterson 2015-02-26 16:16:20 +01:00
parent fabde2f1ce
commit 3463a3c671
4 changed files with 599 additions and 307 deletions

View File

@ -0,0 +1,275 @@
/*jshint -W083 */
var H5PUpgrades = H5PUpgrades || {};
H5P.ContentUpgradeProcess = (function (Version) {
/**
* @class
* @namespace H5P
*/
function ContentUpgradeProcess(name, oldVersion, newVersion, params, id, loadLibrary, done) {
var self = this;
// Make params possible to work with
try {
params = JSON.parse(params);
if (!(params instanceof Object)) {
throw true;
}
}
catch (event) {
return done({
type: 'errorParamsBroken',
id: id
});
}
self.loadLibrary = loadLibrary;
self.upgrade(name, oldVersion, newVersion, params, function (err, result) {
if (err) {
return done(err);
}
done(null, JSON.stringify(params));
});
}
/**
*
*/
ContentUpgradeProcess.prototype.upgrade = function (name, oldVersion, newVersion, params, done) {
var self = this;
// Load library details and upgrade routines
self.loadLibrary(name, newVersion, function (err, library) {
if (err) {
return done(err);
}
// Run upgrade routines on params
self.processParams(library, oldVersion, newVersion, params, function (err, params) {
if (err) {
return done(err);
}
// Check if any of the sub-libraries need upgrading
asyncSerial(library.semantics, function (index, field, next) {
self.processField(field, params[field.name], function (err, upgradedParams) {
if (upgradedParams) {
params[field.name] = upgradedParams;
}
next(err);
});
}, function (err) {
done(err, params);
});
});
});
};
/**
* Run upgrade hooks on params.
*
* @public
* @param {Object} library
* @param {Version} oldVersion
* @param {Version} newVersion
* @param {Object} params
* @param {Function} next
*/
ContentUpgradeProcess.prototype.processParams = function (library, oldVersion, newVersion, params, next) {
if (H5PUpgrades[library.name] === undefined) {
if (library.upgradesScript) {
// Upgrades script should be loaded so the upgrades should be here.
return next({
type: 'scriptMissing',
library: library.name + ' ' + newVersion
});
}
// No upgrades script. Move on
return next(null, params);
}
// Run upgrade hooks. Start by going through major versions
asyncSerial(H5PUpgrades[library.name], function (major, minors, nextMajor) {
if (major < oldVersion.major || major > newVersion.major) {
// Older than the current version or newer than the selected
nextMajor();
}
else {
// Go through the minor versions for this major version
asyncSerial(minors, function (minor, upgrade, nextMinor) {
if (minor <= oldVersion.minor || minor > newVersion.minor) {
// Older than or equal to the current version or newer than the selected
nextMinor();
}
else {
// We found an upgrade hook, run it
var unnecessaryWrapper = (upgrade.contentUpgrade !== undefined ? upgrade.contentUpgrade : upgrade);
try {
unnecessaryWrapper(params, function (err, upgradedParams) {
params = upgradedParams;
nextMinor(err);
});
}
catch (err) {
next(err);
}
}
}, nextMajor);
}
}, function (err) {
next(err, params);
});
};
/**
* Process parameter fields to find and upgrade sub-libraries.
*
* @public
* @param {Object} field
* @param {Object} params
* @param {Function} done
*/
ContentUpgradeProcess.prototype.processField = function (field, params, done) {
var self = this;
if (params === undefined) {
return done();
}
switch (field.type) {
case 'library':
if (params.library === undefined || params.params === undefined) {
return done();
}
// Look for available upgrades
var usedLib = params.library.split(' ', 2);
for (var i = 0; i < field.options.length; i++) {
var availableLib = field.options[i].split(' ', 2);
if (availableLib[0] === usedLib[0]) {
if (availableLib[1] === usedLib[1]) {
return done(); // Same version
}
// We have different versions
var usedVer = new Version(usedLib[1]);
var availableVer = new Version(availableLib[1]);
if (usedVer.major > availableVer.major || (usedVer.major === availableVer.major && usedVer.minor >= availableVer.minor)) {
return done(); // Larger or same version that's available
}
// A newer version is available, upgrade params
return self.upgrade(availableLib[0], usedVer, availableVer, params.params, function (err, upgraded) {
if (!err) {
params.library = availableLib[0] + ' ' + availableVer.major + '.' + availableVer.minor;
params.params = upgraded;
}
done(err, params);
});
}
}
done();
break;
case 'group':
if (field.fields.length === 1) {
// Single field to process, wrapper will be skipped
self.processField(field.fields[0], params, function (err, upgradedParams) {
if (upgradedParams) {
params = upgradedParams;
}
done(err, params);
});
}
else {
// Go through all fields in the group
asyncSerial(field.fields, function (index, subField, next) {
self.processField(subField, params[subField.name], function (err, upgradedParams) {
if (upgradedParams) {
params[subField.name] = upgradedParams;
}
next(err);
});
}, function (err) {
done(err, params);
});
}
break;
case 'list':
// Go trough all params in the list
asyncSerial(params, function (index, subParams, next) {
self.processField(field.field, subParams, function (err, upgradedParams) {
if (upgradedParams) {
params[index] = upgradedParams;
}
next(err);
});
}, function (err) {
done(err, params);
});
break;
default:
done();
}
};
/**
* Helps process each property on the given object asynchronously in serial order.
*
* @private
* @param {Object} obj
* @param {Function} process
* @param {Function} finished
*/
var asyncSerial = function (obj, process, finished) {
var id, isArray = obj instanceof Array;
// Keep track of each property that belongs to this object.
if (!isArray) {
var ids = [];
for (id in obj) {
if (obj.hasOwnProperty(id)) {
ids.push(id);
}
}
}
var i = -1; // Keeps track of the current property
/**
* Private. Process the next property
*/
var next = function () {
id = isArray ? i : ids[i];
process(id, obj[id], check);
};
/**
* Private. Check if we're done or have an error.
*
* @param {String} err
*/
var check = function (err) {
// We need to use a real async function in order for the stack to clear.
setTimeout(function () {
i++;
if (i === (isArray ? obj.length : ids.length) || (err !== undefined && err !== null)) {
finished(err);
}
else {
next();
}
}, 0);
};
check(); // Start
};
return ContentUpgradeProcess;
})(H5P.Version);

View File

@ -0,0 +1,76 @@
var H5P = H5P || {};
importScripts('/wp-content/plugins/h5p/h5p-php-library/js/h5p-version.js');
importScripts('/wp-content/plugins/h5p/h5p-php-library/js/h5p-content-upgrade-process.js');
var libraryLoadedCallback;
/**
* Register message handlers
*/
var messageHandlers = {
newJob: function (job) {
// Start new job
new H5P.ContentUpgradeProcess(job.name, new H5P.Version(job.oldVersion), new H5P.Version(job.newVersion), job.params, job.id, function loadLibrary(name, version, next) {
// TODO: Cache?
postMessage({
action: 'loadLibrary',
name: name,
version: version.toString()
});
libraryLoadedCallback = next;
}, function done(err, result) {
if (err) {
// Return error
postMessage({
action: 'error',
id: job.id,
err: err
});
return;
}
// Return upgraded content
postMessage({
action: 'done',
id: job.id,
params: result
});
});
},
libraryLoaded: function (data) {
var library = data.library;
if (library.upgradesScript) {
try {
importScripts(library.upgradesScript);
}
catch (err) {
libraryLoadedCallback(err);
return;
}
}
libraryLoadedCallback(null, data.library);
}
};
/**
* Handle messages from our master
*/
onmessage = function (event) {
if (event.data.action !== undefined && messageHandlers[event.data.action]) {
messageHandlers[event.data.action].call(this, event.data);
}
};
// if (library.upgradesScript) {
// self.loadScript(library.upgradesScript, function (err) {
// if (err) {
// err = info.errorScript.replace('%lib', name + ' ' + version);
// }
// next(err, library);
// });
// }
// else {
// next(null, library);
// }

View File

@ -1,8 +1,7 @@
/*jshint -W083 */
var H5PUpgrades = H5PUpgrades || {};
(function ($) {
var info, $container, librariesCache = {};
(function ($, Version) {
var info, $container, librariesCache = {}, scriptsCache = {};
// Initialize
$(document).ready(function () {
@ -43,87 +42,6 @@ var H5PUpgrades = H5PUpgrades || {};
}
};
/**
* Private. Helps process each property on the given object asynchronously in serial order.
*
* @param {Object} obj
* @param {Function} process
* @param {Function} finished
*/
var asyncSerial = function (obj, process, finished) {
var id, isArray = obj instanceof Array;
// Keep track of each property that belongs to this object.
if (!isArray) {
var ids = [];
for (id in obj) {
if (obj.hasOwnProperty(id)) {
ids.push(id);
}
}
}
var i = -1; // Keeps track of the current property
/**
* Private. Process the next property
*/
var next = function () {
id = isArray ? i : ids[i];
process(id, obj[id], check);
};
/**
* Private. Check if we're done or have an error.
*
* @param {String} err
*/
var check = function (err) {
// We need to use a real async function in order for the stack to clear.
setTimeout(function () {
i++;
if (i === (isArray ? obj.length : ids.length) || (err !== undefined && err !== null)) {
finished(err);
}
else {
next();
}
}, 0);
};
check(); // Start
};
/**
* Make it easy to keep track of version details.
*
* @param {String} version
* @param {Number} libraryId
* @returns {_L1.Version}
*/
function Version(version, libraryId) {
if (libraryId !== undefined) {
version = info.versions[libraryId];
// Public
this.libraryId = libraryId;
}
var versionSplit = version.split('.', 3);
// Public
this.major = versionSplit[0];
this.minor = versionSplit[1];
/**
* Public. Custom string for this object.
*
* @returns {String}
*/
this.toString = function () {
return version;
};
}
/**
* Displays a throbber in the status field.
*
@ -154,17 +72,83 @@ var H5PUpgrades = H5PUpgrades || {};
var self = this;
// Get selected version
self.version = new Version(null, libraryId);
self.version = new Version(info.versions[libraryId]);
self.version.libraryId = libraryId;
// Create throbber with loading text and progress
self.throbber = new Throbber(info.inProgress.replace('%ver', self.version));
self.started = new Date().getTime();
self.io = 0;
// Track number of working
self.working = 0;
var start = function () {
// Get the next batch
self.nextBatch({
libraryId: libraryId,
token: info.token
});
};
if (window.Worker !== undefined) {
// Prepare our workers
self.initWorkers();
start();
}
else {
// No workers, do the job our self
self.loadScript('/wp-content/plugins/h5p/h5p-php-library/js/h5p-content-upgrade-process.js', start);
}
}
/**
* Initialize workers
*/
ContentUpgrade.prototype.initWorkers = function () {
var self = this;
// Determine number of workers (defaults to 4)
var numWorkers = (window.navigator !== undefined && window.navigator.hardwareConcurrency ? window.navigator.hardwareConcurrency : 4);
self.workers = new Array(numWorkers);
// Register message handlers
var messageHandlers = {
done: function (result) {
self.workDone(result.id, result.params, this);
},
error: function (error) {
self.printError(error.err);
// Stop everything
self.terminate();
},
loadLibrary: function (details) {
var worker = this;
self.loadLibrary(details.name, new Version(details.version), function (err, library) {
if (err) {
// Reset worker?
return;
}
worker.postMessage({
action: 'libraryLoaded',
library: library
});
});
}
};
for (var i = 0; i < numWorkers; i++) {
self.workers[i] = new Worker('/wp-content/plugins/h5p/h5p-php-library/js/h5p-content-upgrade-worker.js');
self.workers[i].onmessage = function (event) {
if (event.data.action !== undefined && messageHandlers[event.data.action]) {
messageHandlers[event.data.action].call(this, event.data);
}
};
}
};
/**
* Get the next batch and start processing it.
@ -174,12 +158,21 @@ var H5PUpgrades = H5PUpgrades || {};
ContentUpgrade.prototype.nextBatch = function (outData) {
var self = this;
var start = new Date().getTime();
$.post(info.infoUrl, outData, function (inData) {
self.io += new Date().getTime() - start;
if (!(inData instanceof Object)) {
// Print errors from backend
return self.setStatus(inData);
}
if (inData.left === 0) {
var total = new Date().getTime() - self.started;
console.log('Upgrade took ' + total + 'ms');
console.log((self.io/(total/100)) + ' % of the time went to IO (' + self.io + 'ms)');
// Terminate workers
self.terminate();
// Nothing left to process
return self.setStatus(info.done);
}
@ -208,116 +201,62 @@ var H5PUpgrades = H5PUpgrades || {};
*/
ContentUpgrade.prototype.processBatch = function (parameters) {
var self = this;
var upgraded = {}; // Track upgraded params
var current = 0; // Track progress
asyncSerial(parameters, function (id, params, next) {
// Track upgraded params
self.upgraded = {};
try {
// Make params possible to work with
params = JSON.parse(params);
if (!(params instanceof Object)) {
throw true;
// Track current batch
self.parameters = parameters;
// Create id mapping
self.ids = [];
for (var id in parameters) {
if (parameters.hasOwnProperty(id)) {
self.ids.push(id);
}
}
catch (event) {
return next(info.errorContent.replace('%id', id) + ' ' + info.errorParamsBroken);
// Keep track of current content
self.current = -1;
if (self.workers !== undefined) {
// Assign each worker content to upgrade
for (var i = 0; i < self.workers.length; i++) {
self.assignWork(self.workers[i]);
}
// Upgrade this content.
self.upgrade(info.library.name, new Version(info.library.version), self.version, params, function (err, params) {
if (err) {
return next(info.errorContent.replace('%id', id) + ' ' + err);
}
else {
upgraded[id] = JSON.stringify(params);
current++;
self.throbber.setProgress(Math.round((info.total - self.left + current) / (info.total / 100)) + ' %');
next();
});
}, function (err) {
// Finished with all parameters that came in
if (err) {
return self.setStatus('<p>' + info.error + '<br/>' + err + '</p>');
self.assignWork();
}
// Save upgraded content and get next round of data to process
self.nextBatch({
libraryId: self.version.libraryId,
token: self.token,
params: JSON.stringify(upgraded)
});
});
};
/**
* Upgade the given content.
*
* @param {String} name
* @param {Version} oldVersion
* @param {Version} newVersion
* @param {Object} params
* @param {Function} next
* @returns {undefined}
*/
ContentUpgrade.prototype.upgrade = function (name, oldVersion, newVersion, params, next) {
ContentUpgrade.prototype.assignWork = function (worker) {
var self = this;
// Load library details and upgrade routines
self.loadLibrary(name, newVersion, function (err, library) {
if (err) {
return next(err);
var id = self.ids[self.current + 1];
if (id === undefined) {
return false; // Out of work
}
self.current++;
self.working++;
// Run upgrade routines on params
self.processParams(library, oldVersion, newVersion, params, function (err, params) {
if (err) {
return next(err);
if (worker) {
worker.postMessage({
action: 'newJob',
id: id,
name: info.library.name,
oldVersion: info.library.version,
newVersion: self.version.toString(),
params: self.parameters[id]
});
}
// Check if any of the sub-libraries need upgrading
asyncSerial(library.semantics, function (index, field, next) {
self.processField(field, params[field.name], function (err, upgradedParams) {
if (upgradedParams) {
params[field.name] = upgradedParams;
}
next(err);
});
}, function (err) {
next(err, params);
});
});
});
};
/**
* Load library data needed for content upgrade.
*
* @param {String} name
* @param {Version} version
* @param {Function} next
*/
ContentUpgrade.prototype.loadLibrary = function (name, version, next) {
var self = this;
var key = name + '/' + version.major + '/' + version.minor;
if (librariesCache[key] !== undefined) {
// Library has been loaded before. Return cache.
next(null, librariesCache[key]);
return;
}
$.ajax({
dataType: 'json',
cache: true,
url: info.libraryBaseUrl + '/' + key
}).fail(function () {
next(info.errorData.replace('%lib', name + ' ' + version));
}).done(function (library) {
librariesCache[key] = library;
else {
new H5P.ContentUpgradeProcess(info.library.name, new Version(info.library.version), self.version, self.parameters[id], id, function loadLibrary(name, version, next) {
self.loadLibrary(name, version, function (err, library) {
if (library.upgradesScript) {
self.loadScript(library.upgradesScript, function (err) {
if (err) {
@ -330,6 +269,105 @@ var H5PUpgrades = H5PUpgrades || {};
next(null, library);
}
});
}, function done(err, result) {
if (err) {
self.printError(err);
return ;
}
self.workDone(id, result);
});
}
};
/**
*
*/
ContentUpgrade.prototype.workDone = function (id, result, worker) {
var self = this;
self.working--;
self.upgraded[id] = result;
// Update progress message
self.throbber.setProgress(Math.round((info.total - self.left + self.current) / (info.total / 100)) + ' %');
// Assign next job
if (self.assignWork(worker) === false && self.working === 0) {
// All workers have finsihed.
self.nextBatch({
libraryId: self.version.libraryId,
token: self.token,
params: JSON.stringify(self.upgraded)
});
}
};
/**
*
*/
ContentUpgrade.prototype.terminate = function () {
var self = this;
if (self.workers) {
// Stop all workers
for (var i = 0; i < self.workers.length; i++) {
self.workers[i].terminate();
}
}
};
var librariesLoadedCallbacks = {};
/**
* Load library data needed for content upgrade.
*
* @param {String} name
* @param {Version} version
* @param {Function} next
*/
ContentUpgrade.prototype.loadLibrary = function (name, version, next) {
var self = this;
var key = name + '/' + version.major + '/' + version.minor;
if (librariesCache[key] === true) {
// Library is being loaded, que callback
if (librariesLoadedCallbacks[key] === undefined) {
librariesLoadedCallbacks[key] = [next];
return;
}
librariesLoadedCallbacks[key].push(next);
return;
}
else if (librariesCache[key] !== undefined) {
// Library has been loaded before. Return cache.
next(null, librariesCache[key]);
return;
}
var start = new Date().getTime();
librariesCache[key] = true;
$.ajax({
dataType: 'json',
cache: true,
url: info.libraryBaseUrl + '/' + key
}).fail(function () {
self.io += new Date().getTime() - start;
next(info.errorData.replace('%lib', name + ' ' + version));
}).done(function (library) {
self.io += new Date().getTime() - start;
librariesCache[key] = library;
next(null, library);
if (librariesLoadedCallbacks[key] !== undefined) {
for (var i = 0; i < librariesLoadedCallbacks[key].length; i++) {
librariesLoadedCallbacks[key][i](null, library);
}
}
delete librariesLoadedCallbacks[key];
});
};
/**
@ -339,162 +377,38 @@ var H5PUpgrades = H5PUpgrades || {};
* @param {Function} next
*/
ContentUpgrade.prototype.loadScript = function (url, next) {
if (scriptsCache[url] !== undefined) {
next();
return;
}
var start = new Date().getTime();
$.ajax({
dataType: 'script',
cache: true,
url: url
}).fail(function () {
self.io += new Date().getTime() - start;
next(true);
}).done(function () {
scriptsCache[url] = true;
self.io += new Date().getTime() - start;
next();
});
};
/**
* Run upgrade hooks on params.
*
* @param {Object} library
* @param {Version} oldVersion
* @param {Version} newVersion
* @param {Object} params
* @param {Function} next
*/
ContentUpgrade.prototype.processParams = function (library, oldVersion, newVersion, params, next) {
if (H5PUpgrades[library.name] === undefined) {
if (library.upgradesScript) {
// Upgrades script should be loaded so the upgrades should be here.
return next(info.errorScript.replace('%lib', library.name + ' ' + newVersion));
ContentUpgrade.prototype.printError = function (error) {
if (error.type === 'errorParamsBroken') {
error = info.errorContent.replace('%id', error.id) + ' ' + info.errorParamsBroken; // TODO: Translate!
}
else if (error.type === 'scriptMissing') {
error.err = info.errorScript.replace('%lib', error.library);
}
// No upgrades script. Move on
return next(null, params);
}
// Run upgrade hooks. Start by going through major versions
asyncSerial(H5PUpgrades[library.name], function (major, minors, nextMajor) {
if (major < oldVersion.major || major > newVersion.major) {
// Older than the current version or newer than the selected
nextMajor();
}
else {
// Go through the minor versions for this major version
asyncSerial(minors, function (minor, upgrade, nextMinor) {
if (minor <= oldVersion.minor || minor > newVersion.minor) {
// Older than or equal to the current version or newer than the selected
nextMinor();
}
else {
// We found an upgrade hook, run it
var unnecessaryWrapper = (upgrade.contentUpgrade !== undefined ? upgrade.contentUpgrade : upgrade);
try {
unnecessaryWrapper(params, function (err, upgradedParams) {
params = upgradedParams;
nextMinor(err);
});
}
catch (err) {
next(err);
}
}
}, nextMajor);
}
}, function (err) {
next(err, params);
});
self.setStatus('<p>' + info.error + '<br/>' + error.err + '</p>');
};
/**
* Process parameter fields to find and upgrade sub-libraries.
*
* @param {Object} field
* @param {Object} params
* @param {Function} next
*/
ContentUpgrade.prototype.processField = function (field, params, next) {
var self = this;
if (params === undefined) {
return next();
}
switch (field.type) {
case 'library':
if (params.library === undefined || params.params === undefined) {
return next();
}
// Look for available upgrades
var usedLib = params.library.split(' ', 2);
for (var i = 0; i < field.options.length; i++) {
var availableLib = field.options[i].split(' ', 2);
if (availableLib[0] === usedLib[0]) {
if (availableLib[1] === usedLib[1]) {
return next(); // Same version
}
// We have different versions
var usedVer = new Version(usedLib[1]);
var availableVer = new Version(availableLib[1]);
if (usedVer.major > availableVer.major || (usedVer.major === availableVer.major && usedVer.minor >= availableVer.minor)) {
return next(); // Larger or same version that's available
}
// A newer version is available, upgrade params
return self.upgrade(availableLib[0], usedVer, availableVer, params.params, function (err, upgraded) {
if (!err) {
params.library = availableLib[0] + ' ' + availableVer.major + '.' + availableVer.minor;
params.params = upgraded;
}
next(err, params);
});
}
}
next();
break;
case 'group':
if (field.fields.length === 1) {
// Single field to process, wrapper will be skipped
self.processField(field.fields[0], params, function (err, upgradedParams) {
if (upgradedParams) {
params = upgradedParams;
}
next(err, params);
});
}
else {
// Go through all fields in the group
asyncSerial(field.fields, function (index, subField, next) {
self.processField(subField, params[subField.name], function (err, upgradedParams) {
if (upgradedParams) {
params[subField.name] = upgradedParams;
}
next(err);
});
}, function (err) {
next(err, params);
});
}
break;
case 'list':
// Go trough all params in the list
asyncSerial(params, function (index, subParams, next) {
self.processField(field.field, subParams, function (err, upgradedParams) {
if (upgradedParams) {
params[index] = upgradedParams;
}
next(err);
});
}, function (err) {
next(err, params);
});
break;
default:
next();
}
};
})(H5P.jQuery);
})(H5P.jQuery, H5P.Version);

27
js/h5p-version.js Normal file
View File

@ -0,0 +1,27 @@
H5P.Version = (function () {
/**
* Make it easy to keep track of version details.
*
* @class
* @namespace H5P
* @param {String} version
*/
function Version(version) {
var versionSplit = version.split('.', 3);
// Public
this.major = versionSplit[0];
this.minor = versionSplit[1];
/**
* Public. Custom string for this object.
*
* @returns {String}
*/
this.toString = function () {
return version;
};
}
return Version;
})();