Logo Search packages:      
Sourcecode: octave-general version File versions  Download package

parcellfun.m

## Copyright (C) 2009 VZLU Prague, a.s., Czech Republic
##
## Author: Jaroslav Hajek
## 
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 3 of the License, or
## (at your option) any later version.
## 
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
## GNU General Public License for more details.
## 
## You should have received a copy of the GNU General Public License
## along with this program; see the file COPYING.  If not, see
## <http://www.gnu.org/licenses/>.

## -*- texinfo -*-
## @deftypefn{Function File} [@var{o1}, @var{o2}, @dots{}] = parcellfun (@var{nproc}, @var{fun}, @var{a1}, @var{a2}, @dots{})
## @deftypefnx{Function File} parcellfun (nproc, fun, @dots{}, "UniformOutput", @var{val})
## @deftypefnx{Function File} parcellfun (nproc, fun, @dots{}, "ErrorHandler", @var{errfunc})
## Evaluates a function for multiple argument sets using multiple processes.
## @var{nproc} should specify the number of processes. A maximum recommended value is
## equal to number of CPUs on your machine or one less. 
## @var{fun} is a function handle pointing to the requested evaluating function.
## @var{a1}, @var{a2} etc. should be cell arrays of equal size.
## @var{o1}, @var{o2} etc. will be set to corresponding output arguments.
##
## The UniformOutput and ErrorHandler options are supported with meaning identical
## to @dfn{cellfun}.
##
## NOTE: this function is implemented using "fork" and a number of pipes for IPC.
## Suitable for systems with an efficient "fork" implementation (such as GNU/Linux), 
## on other systems (Windows) it should be used with caution.
## Also, if you use a multithreaded BLAS, it may be wise to turn off multi-threading
## when using this function.
##
## CAUTION: This function should be regarded as experimental. Although all subprocesses
## should be cleared in theory, there is always a danger of a subprocess hanging up,
## especially if unhandled errors occur. Under GNU and compatible systems, the following
## shell command may be used to display orphaned Octave processes:
## ps --ppid 1 | grep octave
## 
## @end deftypefn

function varargout = parcellfun (nproc, fun, varargin)
  
  if (nargin < 3 || ! isscalar (nproc) || nproc <= 0)
    print_usage ();
  endif

  if (ischar (fun))
    fun = str2func (fun);
  elseif (! isa (fun, "function_handle"))
    error ("parcellfun: fun must be either a function handle or name")
  endif

  uniform_output = true;
  error_handler = [];

  args = varargin;
  nargs = length (varargin);

  ## parse options
  if (nargs > 1)
    do
      if (strcmp (args{nargs-1}, "UniformOutput"))
        uniform_output = args{nargs};
        nargs -= 2;
        continue;
      endif
      if (strcmp (args{nargs-1}, "ErrorHandler"))
        error_handler = args{nargs};
        nargs -= 2;
        continue;
      endif
      break;
    until (nargs < 2);
  endif

  args = args(1:nargs);
  
  if (length (args) == 0)
    print_usage ();
  elseif (length (args) > 1 && ! size_equal (args{:}))
    error ("arguments size must match");
  endif

  ## create communication pipes.
  cmdr = cmdw = resr = resw = zeros (nproc, 1);
  err = 0;
  for i = 1:nproc
    ## command pipes
    [cmdr(i), cmdw(i), err, msg] = pipe ();
    if (err)
      break;
    endif
    ## result pipes
    [resr(i), resw(i), err, msg] = pipe ();
    if (err)
      break;
    endif
  endfor
  if (! err)
    ## status pipe
    [statr, statw, err, msg] = pipe ();
  endif
  if (err)
    error ("failed to open pipe: %s", msg);
  endif

  iproc = 0; # the parent process
  nsuc = 0; # number of processes succesfully forked.

  fflush (stdout); # prevent subprocesses from inheriting buffered output

  pids = zeros (nproc, 1);

  ## fork subprocesses
  for i = 1:nproc
    [pid, msg] = fork ();
    if (pid > 0)
      ## parent process. fork succeded.
      nsuc ++;
      pids(i) = pid;
    elseif (pid == 0)
      ## child process.
      iproc = i;
      break;
    elseif (pid < 0)
      ## parent process. fork failed.
      err = 1;
      break;
    endif
  endfor

  if (iproc)
    ## child process. close unnecessary pipe ends.
    fclose (statr);
    for i = 1:nproc
      ## we won't write commands and read results
      fclose (cmdw (i));
      fclose (resr (i));
      if (i != iproc)
        ## close also those pipes that don't belong to us.
        fclose (cmdr (i));
        fclose (resw (i));
      endif
    endfor
  else
    ## parent process. close unnecessary pipe ends.
    fclose (statw);
    for i = 1:nproc
      ## we won't read commands and write results
      fclose (cmdr (i));
      fclose (resw (i));
    endfor

    if (nsuc)
      ## we forked some processes. if this is less than we opted for, gripe
      ## but continue.
      if (nsuc < nproc)
        warning ("parcellfun: only %d out of %d processes forked", nsuc, nproc);
        nproc = nsuc;
      endif
    else
      ## this is bad.
      error ("parcellfun: failed to fork processes");
    endif
  endif

  ## At this point, everything should be OK (?)

  if (iproc)
    ## the border patrol. we really don't want errors escape after the forks.
    unwind_protect
      try

        ## child process. indicate ready state.
        fwrite (statw, -iproc, "double");
        fflush (statw);

        do
          ## get command
          cmd = fread (cmdr(iproc), 1, "double");
          if (cmd)
            ## we've got a job to do. prepare argument and return lists.
            res = cell (1, nargout);
            argsc = cell (1, nargs);
            for i = 1:nargs
              argsc{i} = args{i}{cmd};
            endfor

            if (isempty (error_handler))
              ## unguarded evaluation.
               = fun (argsc{:});
            else
              ## guarded evaluation
              try
                 = fun (argsc{:});
              catch
                errs.index = cmd;
                [errs.message, errs.identifier] = lasterr ();
                 = error_handler (errs, argsc{:});
              end_try_catch
            endif

            ## indicate ready state.
            fwrite (statw, iproc, "double");
            fflush (statw);

            ## write the result.
            ## FIXME: this can fail.
            fsave (resw(iproc), res);
            fflush (resw(iproc));

          endif
        until (cmd == 0)

      catch

        ## just indicate the error. don't quit this function !!!!
        fputs (stderr, "\n");
        warning ("parcellfun: unhandled error in subprocess %d", iproc);

        ## send a termination notice.
        fwrite (statw, -iproc, "double");
        fflush (statw);

      end_try_catch

    unwind_protect_cleanup

      ## This is enclosed in another handler to prevent errors from escaping.
      ## If something goes wrong, we'll get a broken pipe signal, but anything
      ## is better than skipping the following __exit__.
      try
        fclose (statw);
        fclose (resw(iproc));
        fclose (cmdr(iproc));
      end_try_catch

      ## no more work for us. We call __exit__, which bypasses termination sequences.
      __exit__ ();

      ## we should never get here.
      exit ();

    end_unwind_protect

  else
    ## parent process. 
    njobs = numel (varargin{1});
    res = cell (nargout, njobs);

    pjobs = 0;
    pending = zeros (1, nproc);

    unwind_protect

      while (pjobs < njobs || any (pending))
        ## if pipe contains no more data, that's bad
        if (feof (statr))
          warning ("parcellfun: premature exit due to closed pipe");
          break;
        endif
        ## wait for a process state.
        isubp = fread (statr, 1, "double");
        if (isubp > 0)
          ijob = pending(isubp);
          ## we have a result ready.
          res(:, ijob) = fload (resr(isubp));
        else
          isubp = -isubp;
          if (pending(isubp))
            ## premature exit means an unhandled error occured in a subprocess.
            ## the process should have griped, we just try to exit gracefully.
            pending(isubp) = 0;
            ## no more jobs to start.
            njobs = pjobs;
            continue;
          endif
        endif
        if (pjobs < njobs)
          ijob = ++pjobs;
          ## send the next job to the process.
          fwrite (cmdw(isubp), ijob, "double");
          fflush (cmdw(isubp));
          ## set pending state
          pending(isubp) = ijob;
        elseif (pending(isubp))
          ## send terminating signal
          fwrite (cmdw(isubp), 0, "double");
          fclose (cmdw(isubp));
          ## clear pending state
          pending(isubp) = 0;
        endif
        fprintf (stderr, "\rparcellfun: %d/%d jobs done", pjobs - sum (pending != 0), njobs);
        fflush (stderr);
      endwhile
      fputs (stderr, "\n");
      fflush (stderr);

    unwind_protect_cleanup

      ## send termination signals to active processes.
      for isubp = find (pending)
        ## send terminating signal
        fwrite (cmdw(isubp), 0, "double");
        fclose (cmdw(isubp));
      endfor

      ## close all pipe ends
      fclose (statr);
      for i = 1:nproc
        fclose (resr(i));
      endfor

      ## explicitly recognize all terminated processes.
      for i = 1:nproc
        [pid, status] = waitpid (pids(i));
      endfor

    end_unwind_protect

    ## we're finished. transform the result.
    varargout = cell (1, nargout);
    shape = size (varargin{1});
    for i = 1:nargout
      varargout{i} = reshape (res(i,:), shape);
      if (uniform_output)
        varargout{i} = cell2mat (varargout{i});
      endif
    endfor
    
  endif

endfunction

Generated by  Doxygen 1.6.0   Back to index