Parallel execution
Parallel execution extends LME to execute multiple tasks concurrently. It takes advantage of computers with multiple cores per microprocessor and/or multiple microprocessors.
Parallel execution relies on the following object classes:
- task
- Tasks are the basic unit of work. They consist of an LME function call with input and output arguments.
- job
- Jobs are groups of tasks seen as a single unit. A job is submitted as a whole, its tasks are executed independently and the outputs are collected and made available once all of them have finished running.
- cluster
- Clusters are groups of workers. Each job is associated with a single cluster.
- worker
- Workers are the software and hardware resource units required to execute tasks. A worker contains a complete LME environment. If a job contains more tasks than workers, workers execute multiple tasks one after the other.
- Currently, workers are OS-level threads which the OS schedules on the available microprocessor cores. For tests, serial execution can be selected with parcluster('serial').
- Workers are internal objects not associated with an LME class.
Examples
Task defined as an anonymous function
The first example can be run from the command window of Sysquake. Function fun estimates pi by Monte Carlo integration, counting the fraction of m uniformly-distributed random points in the unit square whose distance to the origin is smaller than 1. A job with n independent tasks is run, and we wait until it is completed; at the end, its state is either 'finished' if all tasks are successful or 'failed' if there has been an error. The results of all tasks are collected in rl, converted from a list of lists to a double array r, and the mean of r and the expected standard deviation of that mean are computed. Note that the pseudo-random number generator of each worker is automatically initialized with a different seed; therefore the pseudo-random numbers used by each task are independent.
cluster = parcluster('local')
job = createJob(cluster)
m = 1e5;
n = 20;
fun = @(m) nnz(rand(m,1).^2+rand(m,1).^2<1)*4/m;
for i = 1:n
  createTask(job, fun, 1, {m});
end
submit(job);
wait(job, 'finished');
jobState = job.State
rl = fetchOutputs(job);
delete(job);
r = list2num([rl{:}]);
piApproxMean = mean(r)
piApproxStd = std(r) / sqrt(length(r))
User functions
Workers run in separate LME instances. To give tasks access to user-defined functions, two mechanisms are provided:
- Automatic library import
- By default, worker LME instances are initialized with the same libraries which have been explicitly imported with use in the context which calls submit, as well as the library containing the function calling submit itself. This means that you can use your functions transparently.
- In the next example, the task function is passed as a function reference, not as an anonymous function as in the first example above. It produces four output arguments, the number of points in unit disks centered around the four unit square corners.
- This function can be stored in a library or in the function block of an SQ file (it cannot be copied and pasted as a whole directly into the command window of Sysquake). The following statements are very similar to the first example. They define the task as a function reference instead of an anonymous function, and use the arithmetic mean of the four outputs (rl is a list whose elements contain the output arguments of each task, as a list of four values).
- To prevent workers from being initialized with the libraries of the calling LME context, the job property AutoUseLib should be set to false.
- Worker startup commands
- Arbitrary commands can be run by each worker when a job is submitted. Libraries can be specified there, as well as other settings such as format, global variables etc.
- The following example shows how the format of floating-point numbers is changed.
function (r00, r01, r10, r11) = task(m)
  x = rand(m, 1);
  y = rand(m, 1);
  r00 = nnz(x.^2 + y.^2 < 1) * 4 / m;
  r01 = nnz((1 - x).^2 + y.^2 < 1) * 4 / m;
  r10 = nnz(x.^2 + (1 - y).^2 < 1) * 4 / m;
  r11 = nnz((1 - x).^2 + (1 - y).^2 < 1) * 4 / m;
cluster = parcluster('local');
job = createJob(cluster);
m = 1e5;
n = 20;
for i = 1:n
  createTask(job, @task, 4, {m});
end
submit(job);
wait(job, 'finished');
jobState = job.State
rl = fetchOutputs(job);
delete(job);
r = list2num(map(@mean, [rl{:}]));
piApprox = mean(r)
piApproxStd = std(r) / sqrt(length(r))
cluster = parcluster('local');
job = createJob(cluster);
job.Startup = 'format long';
task = createTask(job, @() disp(pi), 0, {});
task.CaptureDiary = true;
submit(job);
wait(job, 'finished');
taskDiary = task.Diary
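Conversely, automatic library import can be disabled before the job is submitted; any required library must then be loaded through the Startup property instead. A minimal sketch (the library name mylib is hypothetical):

job = createJob(parcluster('local'));
job.AutoUseLib = false;          // do not import the caller's libraries
job.Startup = 'use mylib';       // hypothetical library loaded by each worker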
Diary
By default, text output to stdout and stderr produced by tasks is discarded. To get it, the CaptureDiary property of task objects should be set to true; once the task is completed, either in 'finished' or 'failed' state, the task Diary property contains all the output to stdout and stderr (file descriptor 1 or 2) as a string.
cluster = parcluster('local');
job = createJob(cluster);
for i = 1:3
  task = createTask(job, @() disp(rand), 0, {});
  task.CaptureDiary = true;
end
submit(job);
wait(job, 'finished');
map(@(task) disp(task.Diary), job.Tasks)
Sysquake background processing
In Sysquake, parallel execution can be used to carry out heavy computations without adverse effect on the responsiveness of the user interface. The following SQ file shows how the idle handler is used to supervise parallel jobs, using and updating SQ variables shared with a figure. The parallel job computes an approximation of pi with a task function passed by reference. The idle handler resubmits it continuously, with the number of tasks n specified with a slider in the figure.
variable n = 20
variable piEst = 0
variable piStd = 0
variable job = null

idle (piEst, piStd, job) = idle(piEst, piStd, job, n)

figure "Parallel Test"
  draw drawFig(n, piEst, piStd)
  mousedrag 1 n = round(_x1)

functions
{@
function drawFig(n, piEst, piStd)
  settabs('Nb tasks 9999\t');
  slider(sprintf('Nb tasks %d', n), n, [1, 500], 'l', id=1);
  settabs('Estimation std dev: \t');
  text(sprintf('Estimation of pi:\t%.6f', piEst));
  text(sprintf('Estimation std dev:\t%.6f', piStd));

function p = taskFun(m)
  x = rand(m, 1);
  y = rand(m, 1);
  p = nnz(x.^2 + y.^2 < 1) * 4 / m;

function (piEst, piStd, job) = idle(piEst, piStd, job, n)
  if job ~== null
    switch job.State
      case 'finished'
        rl = fetchOutputs(job);
        r = list2num([rl{:}]);
        piEst = mean(r);
        piStd = std(r) / sqrt(length(r));
      case 'failed'
        piEst = nan;
        piStd = nan;
      otherwise
        // still running, don't update figure
        cancel(false);
    end
    delete(job);
  end
  m = 1e5;
  cluster = parcluster('local');
  job = createJob(cluster);
  for i = 1:n
    createTask(job, @taskFun, 1, {m});
  end
  submit(job);
@}
batch
Create a job with a single task.
Syntax
job = batch(fun, nargout, arginList)
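No description of batch is given here; assuming it is a shorthand for createJob on the default cluster followed by a single createTask with the same fun, nargout and arginList arguments, and assuming it does not submit the job itself, usage might look like this sketch:

job = batch(@(n) mean(rand(1, n)), 1, {1e5});   // single task; exact semantics assumed
submit(job);
wait(job, 'finished');
outputList = fetchOutputs(job);
delete(job);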
See also
cancel
Cancel a job.
Syntax
cancel(job)
Description
cancel(job) cancels a job: pending and running tasks are brought to the 'failed' state (running tasks are interrupted). Tasks which have already finished are left unchanged.
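For instance, a submitted job whose results are no longer needed can be abandoned (a minimal sketch, assuming job has been created and submitted as in the examples above):

submit(job);
// later, the results turn out to be unnecessary
cancel(job);    // pending and running tasks are brought to the 'failed' state
delete(job);    // release the job's resources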
See also
createJob
Create a new job.
Syntax
job = createJob(cluster)
Description
createJob(cluster) creates a new job to be run on the specified cluster. It returns the job object. The next steps are typically to add tasks with createTask, to execute the job with submit, to fetch results with fetchOutputs, and to release resources allocated for the job with delete.
Objects of class job have the following properties:
- AutoUseLib
- true to let workers use the same libraries as those imported in the current context of the function which calls submit, including the library of the function itself (default)
- ElapsedTime
- wall-clock time spent from call to submit to the completion of the last task
- ID
- job id (integer number)
- Parent
- cluster object the job is associated with
- Startup
- code executed at startup by each worker
- State
- current job state as a string
- Tasks
- list of tasks belonging to the job
Properties AutoUseLib and Startup can be set; all other properties are read-only.
Example
Create a job with the cluster 'local':
cluster = parcluster('local');
job = createJob(cluster);
See also
createTask
Create a new task.
Syntax
task = createTask(job, fun, nargout, arginList)
Description
createTask(job,fun,nargout,arginList) creates a new task for the specified job. A single job can contain as many tasks as required. The task is specified by the last three arguments:
- fun
- function, as a function reference or an anonymous or inline function
- nargout
- number of output arguments produced by fun (non-negative integer)
- arginList
- list of input arguments
createTask returns a task object, which can often be ignored because a list of all the tasks created for a job can be retrieved with function findTask or job property Tasks. Task objects have the following properties:
- CaptureDiary
- true to capture task output in Diary property
- Diary
- task output as a string (empty if CaptureDiary is false)
- ErrorIdentifier
- error identifier if state is 'failed'
- ErrorMessage
- error message if state is 'failed'
- Function
- function
- ID
- task id (integer number)
- InputArguments
- list of input arguments
- NumOutputArguments
- number of output arguments
- OutputArguments
- list of output arguments if state is 'finished'
- Parent
- job object the task belongs to
- State
- current task state as a string
Property CaptureDiary can be set; all other properties are read-only.
Examples
Add 10 tasks which compute the mean of n=1e5 pseudo-random values:
for i = 1:10
  createTask(job, @(n) mean(rand(1,n)), 1, {1e5});
end
Run a job with 1000 tasks which compute the (local) minima of function y=fun(X), where X is a vector of length 10, y is a scalar, and the starting points are chosen randomly in the unit hypercube. Function fminsearch is asked for two output arguments, X and y. All results are collected in the 10-by-1000 array Xa and the 1-by-1000 array ya. The index of the minimum value of ya gives an approximation of the global minimum Xopt and yopt.
cluster = parcluster('local');
job = createJob(cluster);
for i = 1:1000
  createTask(job, @fminsearch, 2, {@fun, rand(10,1)});
end
submit(job);
wait(job, 'finished');
rl = fetchOutputs(job);
delete(job);
Xa = [map(@(taskOutputs) taskOutputs{1}, rl){:}];
ya = [map(@(taskOutputs) taskOutputs{2}, rl){:}];
(yopt, ixopt) = min(ya);
Xopt = Xa(:, ixopt);
See also
delete
Delete a job or a task.
Syntax
delete(job)
delete(task)
Description
delete(job) deletes the specified job and all its tasks. It should be called once the job is completed to release its resources.
delete(task) deletes the specified task. It is superfluous if delete(job) is called.
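A typical job lifecycle ends with delete once the outputs have been fetched (sketch, following the pattern of the examples above):

job = createJob(parcluster('local'));
createTask(job, @() rand, 1, {});
submit(job);
wait(job, 'finished');
outputList = fetchOutputs(job);
delete(job);    // releases the job and all its tasks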
See also
fetchOutputs
Fetch output of all tasks in a job.
Syntax
outputList = fetchOutputs(job)
Description
fetchOutputs(job) gets all the task outputs. Its value is a list where each element corresponds to a task. For tasks whose state is 'finished', the element is itself a list of nargout values, where nargout is the number of task output arguments specified by createTask. For tasks whose state is not 'finished', the element is an empty list.
Usually, fetchOutputs should be called when the job is completed, i.e. when its state is 'finished' or 'failed'. This can be achieved by calling wait(job,'finished') or by checking the job State property.
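For instance, for a task created with nargout=2, the corresponding element of the result is a list of two values (a sketch, assuming job has already been created with createJob):

createTask(job, @(x) size(x), 2, {ones(3, 5)});   // two outputs: rows and columns
submit(job);
wait(job, 'finished');
outputList = fetchOutputs(job);
taskOutputs = outputList{1};   // list of the two outputs of the first task
nRows = taskOutputs{1};
nCols = taskOutputs{2};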
See also
findTask
Find tasks in a job.
Syntax
tasks = findTask(job)
(pendingTasks, runningTasks, completedTasks) = findTask(job)
Description
With one output argument, findTask(job) gets a list of all the tasks defined for the specified job. It gives the same result as the job property Tasks.
With three output arguments, findTask(job) gets separate lists for the tasks whose state is 'pending' or 'queued', 'running', and 'finished' or 'failed'.
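For example, the three-output form can be used to partition the tasks of a submitted job by state (sketch):

(pendingTasks, runningTasks, completedTasks) = findTask(job);
nTotal = length(pendingTasks) + length(runningTasks) + length(completedTasks);
disp(sprintf('%d task(s) completed out of %d', length(completedTasks), nTotal));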
See also
parcluster
Get a cluster.
Syntax
cluster = parcluster
cluster = parcluster(name)
Description
parcluster(name) gives the cluster whose name is specified as a string argument. The following names are valid:
- 'local' (default)
- Workers running in threads dispatched by the operating system on the cores of the local computer. Each worker runs in a separate LME context, which is reset at the beginning of each job submission. To make Monte Carlo simulations easier, the pseudo-random generator of each worker is initialized with a different seed.
- 'serial'
- Serial execution of each task in the base LME environment. Function submit returns only once all the tasks have been completed. Output is displayed immediately and not stored in the task's diary, regardless of task property CaptureDiary.
Without input argument, parcluster gives the default cluster, which can be changed with pardefaultcluster.
Objects of class cluster have the following properties:
- Jobs
- list of jobs
- MaxNumWorkers
- maximum number of workers
- Name
- cluster name
- NumWorkers
- number of workers; should typically be set to the number of CPUs times the number of cores per CPU times the number of hardware threads per core (hyperthreading or multithreading)
- LMEMemory ('local' only)
- amount of memory in bytes allocated to LME by each worker, between 256KB and 2047MB (default: 32MB)
Properties NumWorkers and LMEMemory can be set; all other properties are read-only.
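As a sketch, the local cluster can be tuned before creating jobs (the values below are illustrative, not recommendations):

cluster = parcluster('local');
cluster.NumWorkers = 8;          // e.g. 4 cores with 2 hardware threads each
cluster.LMEMemory = 64 * 2^20;   // 64 MB of LME memory per worker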
See also
pardefaultcluster
Get or set default cluster.
Syntax
cluster = pardefaultcluster
pardefaultcluster(cluster)
pardefaultcluster(name)
Description
Without input argument, pardefaultcluster gives the current default cluster, i.e. the cluster returned by parcluster when no name is given.
With an input argument, pardefaultcluster(cluster) sets cluster as the default cluster. With a string argument, pardefaultcluster(name) is equivalent to pardefaultcluster(parcluster(name)).
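For instance, serial execution can be selected as the default for debugging:

pardefaultcluster('serial');
cluster = parcluster;    // now gives the 'serial' cluster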
See also
submit
Submit a job for execution.
Syntax
submit(job)
Description
submit(job) submits the specified job so that its tasks are executed on the cluster job belongs to. It returns immediately.
A job can be submitted only once.
Example
Submit a job, wait until all tasks have completed, and retrieve its results.
submit(job);
wait(job, 'finished');
results = fetchOutputs(job);
See also
createJob, wait, cancel, fetchOutputs
wait
Wait until a job state has changed.
Syntax
wait(job)
wait(job, state)
Description
With one input argument, wait(job) waits until the job state has changed. If the job is already in state 'finished' or 'failed', it returns immediately.
With two input arguments, wait(job,state) waits until the job state has reached the state specified in the second argument. States 'finished' and 'failed' are considered to be equivalent.
Examples
Submit a job and wait until it is completed.
submit(job);
wait(job, 'finished');
After a job has been submitted, instead of waiting until it is completed with wait, the percentage of completed tasks can be obtained with findTask. The following lines can be repeated.
(p, r, c) = findTask(job);
p100Completed = 100 * length(c) / (length(p) + length(r) + length(c))
After a job is completed, if it has failed, i.e. if at least one task has thrown an error, get the error identifier and message of the first failed task and throw the error.
submit(job);
wait(job, 'finished');
errId = '';
if job.State === 'failed'
  for task = job.Tasks
    if task.State === 'failed'
      errId = task.ErrorIdentifier;
      errMsg = task.ErrorMessage;
      break;
    end
  end
else
  outputs = fetchOutputs(job);
end
delete(job);
if errId
  error(errId, errMsg);
end