[M3devel] More on threading

Mika Nystrom mika at async.caltech.edu
Sun Feb 13 08:04:14 CET 2011


Hi again m3devel,

Well I thought I would relate another story having to do with threading
and CM3.  It's half-baked at the moment, but it only reflects a few hours'
work on my part, sorry... I'm about to give up now because I don't know
if the problem is in my code, or in threading, or what... maybe it is
the atfork problem Jay talked about?

Programmers spend a lot of time waiting for their compilers to run, so
it would be nice if the compilation process were faster.  After perusing
the source code for CM3, I realized that there is no feedback at all
from the back-end and assembler steps to the rest of the compiler, and
I believe that with the GCC back-end the vast majority of the runtime
of the compiler is spent in these stages.

So.... how hard could this be to parallelize?  I decided to try making
the two stages (cm3cg1 and as) into "promises" rather than running them
on the spot.  The QMachine adds a promise to call the back-end to a
RefSeq.T called "promises", to which the Builder can also add promises.
Then, after making all the promises, I collect them and run them ("force
them", a true Schemer would say) in parallel.

Of course something goes wrong somewhere.  With 10 threads I am able to
run 31 seconds of CPU time in 19 seconds of wallclock time (using an
average of 160% of CPU), but then something goes wrong and my process
is stuck in state "umtxn".  This is using user threads on AMD64_FREEBSD
on a machine with four CPUs.

     Mika

P.S. diff attached, it's a bit ugly, but maybe it works on some other
system....  the code works just fine if you run one thread at a time.
I.e., if you change

     IF threads.size() > 10 THEN EVAL Thread.Join(threads.remlo()) END;

to

     IF threads.size() > 0 THEN EVAL Thread.Join(threads.remlo()) END;



Index: cm3/src/Builder.m3
===================================================================
RCS file: /usr/cvs/cm3/m3-sys/cm3/src/Builder.m3,v
retrieving revision 1.36
diff -c -r1.36 Builder.m3
*** cm3/src/Builder.m3	24 Aug 2010 05:24:24 -0000	1.36
--- cm3/src/Builder.m3	13 Feb 2011 06:47:40 -0000
***************
*** 15,20 ****
--- 15,22 ----
  IMPORT QIdent;
  FROM Target IMPORT M3BackendMode_t, BackendAssembly, BackendModeStrings;
  FROM M3Path IMPORT OSKind, OSKindStrings;
+ IMPORT Pathname;
+ IMPORT RefSeq;
  
  TYPE
    UK = M3Unit.Kind;
***************
*** 131,136 ****
--- 133,139 ----
      link_coverage : TEXT;               (* coverage library *)
      m3_front_flags: Arg.List;           (* configuration options for the front *)
      m3_options    : Arg.List;           (* misc. user options for the frontend *)
+     delayBackend := FALSE;
    END;
  
  TYPE
***************
*** 932,937 ****
--- 935,956 ----
  
  (*------------------------------------------------------------ compilation --*)
  
+ TYPE MarkerPromise = QMachine.Promise BRANDED OBJECT OVERRIDES fulfil := FulfilNothing END;
+ 
+ PROCEDURE FulfilNothing(<*UNUSED*>mp : MarkerPromise) = BEGIN END FulfilNothing;
+ 
+ TYPE SeqClosure = Thread.Closure OBJECT seq : RefSeq.T; OVERRIDES apply := SeqApply END;
+ 
+ PROCEDURE SeqApply(cl : SeqClosure) : REFANY =
+   BEGIN
+     FOR i := 0 TO cl.seq.size()-1 DO
+       WITH p = NARROW(cl.seq.get(i),QMachine.Promise) DO
+         p.fulfil()
+       END
+     END;
+     RETURN NIL
+   END SeqApply;
+ 
  PROCEDURE CompileEverything (s: State;  schedule: SourceList) =
    VAR u: M3Unit.T;
    BEGIN
***************
*** 941,948 ****
  
      (* compile all the sources using the initial schedule *)
      FOR i := 0 TO LAST (schedule^) DO
!       CompileOne (s, schedule[i]);
      END;
      FlushPending (s);
  
      (* recompile any interfaces where we goofed on the exports *)
--- 960,1000 ----
  
      (* compile all the sources using the initial schedule *)
      FOR i := 0 TO LAST (schedule^) DO
!       s.delayBackend := TRUE;
!       TRY
!         CompileOne (s, schedule[i]);
!       FINALLY
!         s.delayBackend := FALSE;
!       END;
! 
!       s.machine.promises.addhi(NEW(MarkerPromise));
! 
!     END;
! 
!     VAR
!       curSeq  := NEW(RefSeq.T).init();
!       threads := NEW(RefSeq.T).init();
!     BEGIN
!       FOR i := 0 TO s.machine.promises.size()-1 DO
!         WITH p = s.machine.promises.get(i) DO
!           curSeq.addhi(p);
!           IF i = s.machine.promises.size()-1 OR ISTYPE(p,MarkerPromise) THEN
!             WITH cl = NEW(SeqClosure, seq := curSeq) DO
!               threads.addhi (Thread.Fork(cl));
! 
!               IF threads.size() > 10 THEN EVAL Thread.Join(threads.remlo()) END;
!               
!               curSeq := NEW(RefSeq.T).init()
!             END
!           END
!         END
!       END;
!       WHILE threads.size() > 0 DO EVAL Thread.Join(threads.remlo()) END;
      END;
+             
+     EVAL s.machine.promises.init();
+ 
+ 
      FlushPending (s);
  
      (* recompile any interfaces where we goofed on the exports *)
***************
*** 1151,1156 ****
--- 1203,1227 ----
      END;
    END CompileM3;
  
+ TYPE
+   NotePromise = QMachine.Promise OBJECT
+     nam : Pathname.T;
+   OVERRIDES
+     fulfil := FulfilNP;
+   END;
+ 
+   RemovePromise = QMachine.Promise OBJECT
+     nam : Pathname.T;
+   OVERRIDES
+     fulfil := FulfilRP;
+   END;
+ 
+ PROCEDURE FulfilNP(np : NotePromise) = 
+   BEGIN Utils.NoteTempFile(np.nam) END FulfilNP;
+ 
+ PROCEDURE FulfilRP(rp : RemovePromise) = 
+   BEGIN Utils.Remove(rp.nam) END FulfilRP;
+ 
  PROCEDURE PushOneM3 (s: State;  u: M3Unit.T): BOOLEAN =
    VAR
      tmpC, tmpS: TEXT;
***************
*** 1191,1208 ****
  
  
      | 3 =>  (* -bootstrap, +m3back, +asm *)
          tmpC := TempCName (u);
          tmpS := TempSName (u);
          IF (NOT s.keep_files) THEN Utils.NoteTempFile (tmpC) END;
          IF (NOT s.keep_files) THEN Utils.NoteTempFile (tmpS) END;
          IF RunM3 (s, u, tmpC) THEN
!           IF  RunM3Back (s, tmpC, tmpS, u.debug, u.optimize)
!           AND RunAsm (s, tmpS, u.object) THEN
            END;
            need_merge := TRUE;
          END;
          IF (NOT s.keep_files) THEN Utils.Remove (tmpC) END;
          IF (NOT s.keep_files) THEN Utils.Remove (tmpS) END;
  
      | 6,    (* +bootstrap, +m3back, -asm *)
        7 =>  (* +bootstrap, +m3back, +asm *)
--- 1262,1318 ----
  
  
      | 3 =>  (* -bootstrap, +m3back, +asm *)
+       IF s.delayBackend THEN
          tmpC := TempCName (u);
          tmpS := TempSName (u);
+ (*
          IF (NOT s.keep_files) THEN Utils.NoteTempFile (tmpC) END;
          IF (NOT s.keep_files) THEN Utils.NoteTempFile (tmpS) END;
+ *)
+         IF (NOT s.keep_files) THEN 
+           s.machine.promises.addhi(NEW(NotePromise, nam := tmpC)) 
+         END;
+         IF (NOT s.keep_files) THEN 
+           s.machine.promises.addhi(NEW(NotePromise, nam := tmpS)) 
+         END;
+ 
          IF RunM3 (s, u, tmpC) THEN
!           s.machine.record(TRUE);
!           TRY
!             IF  RunM3Back (s, tmpC, tmpS, u.debug, u.optimize)
!             AND RunAsm (s, tmpS, u.object) THEN
!             END;
!           FINALLY
!             s.machine.record(FALSE)
            END;
+ 
            need_merge := TRUE;
          END;
+ (*
          IF (NOT s.keep_files) THEN Utils.Remove (tmpC) END;
          IF (NOT s.keep_files) THEN Utils.Remove (tmpS) END;
+ *)
+         IF (NOT s.keep_files) THEN 
+           s.machine.promises.addhi(NEW(RemovePromise, nam := tmpC)) 
+         END;
+         IF (NOT s.keep_files) THEN 
+           s.machine.promises.addhi(NEW(RemovePromise, nam := tmpS)) 
+         END;
+ 
+       ELSE
+           tmpC := TempCName (u);
+           tmpS := TempSName (u);
+           IF (NOT s.keep_files) THEN Utils.NoteTempFile (tmpC) END;
+           IF (NOT s.keep_files) THEN Utils.NoteTempFile (tmpS) END;
+           IF RunM3 (s, u, tmpC) THEN
+             IF  RunM3Back (s, tmpC, tmpS, u.debug, u.optimize)
+             AND RunAsm (s, tmpS, u.object) THEN
+             END;
+             need_merge := TRUE;
+           END;
+           IF (NOT s.keep_files) THEN Utils.Remove (tmpC) END;
+           IF (NOT s.keep_files) THEN Utils.Remove (tmpS) END;
+       END
  
      | 6,    (* +bootstrap, +m3back, -asm *)
        7 =>  (* +bootstrap, +m3back, +asm *)
Index: cm3/src/m3makefile
===================================================================
RCS file: /usr/cvs/cm3/m3-sys/cm3/src/m3makefile,v
retrieving revision 1.24
diff -c -r1.24 m3makefile
*** cm3/src/m3makefile	8 Dec 2010 07:30:57 -0000	1.24
--- cm3/src/m3makefile	13 Feb 2011 06:47:42 -0000
***************
*** 29,34 ****
--- 29,35 ----
  end
  module ("Arg")
  module ("Builder")
  module ("Dirs")
  module ("M3Build")
  module ("M3Loc")
Index: m3quake/src/QMachine.i3
===================================================================
RCS file: /usr/cvs/cm3/m3-sys/m3quake/src/QMachine.i3,v
retrieving revision 1.6
diff -c -r1.6 QMachine.i3
*** m3quake/src/QMachine.i3	4 Sep 2009 10:24:07 -0000	1.6
--- m3quake/src/QMachine.i3	13 Feb 2011 06:47:43 -0000
***************
*** 8,13 ****
--- 8,14 ----
  
  IMPORT Thread, Wr, QValue, QCode;
  FROM Quake IMPORT Machine, Error, ID, IDMap;
+ IMPORT RefSeq;
  
  REVEAL
    T <: T_;
***************
*** 15,20 ****
--- 16,22 ----
    T = Machine;
    T_ = OBJECT
      map: IDMap := NIL; (* READONLY *)
+     promises : RefSeq.T;
    METHODS
      init      (map: IDMap): T;
      evaluate  (s: QCode.Stream)                     RAISES {Error, Thread.Alerted};
***************
*** 37,42 ****
--- 39,46 ----
      set_wr    (wr: Wr.T);
      exec_echo (b: BOOLEAN): BOOLEAN;
      trace     (b: BOOLEAN);
+     
+     record(on : BOOLEAN);     (* instead of performing certain acts, promise *)
    END;
  
  PROCEDURE PushBool (t: T;  b: BOOLEAN);
***************
*** 51,54 ****
--- 55,63 ----
  
  PROCEDURE GetEnv (default, v0, v1, v2, v3, v4: TEXT := NIL): TEXT;
  
+ TYPE Promise = OBJECT METHODS fulfil() RAISES { Error }  END;
+ 
  END QMachine.
+ 
+ 
+ 
Index: m3quake/src/QMachine.m3
===================================================================
RCS file: /usr/cvs/cm3/m3-sys/m3quake/src/QMachine.m3,v
retrieving revision 1.35
diff -c -r1.35 QMachine.m3
*** m3quake/src/QMachine.m3	3 Aug 2010 09:40:04 -0000	1.35
--- m3quake/src/QMachine.m3	13 Feb 2011 06:47:44 -0000
***************
*** 16,21 ****
--- 16,22 ----
  IMPORT TextUtils, FSUtils, System, DirStack; (* sysutils *)
  IMPORT Compiler;
  IMPORT M3Path;
+ IMPORT RefSeq;
  
  CONST
    OnUnix = (Compiler.ThisOS = Compiler.OS.POSIX);
***************
*** 44,49 ****
--- 45,52 ----
      shell     : TEXT         := NIL;
      sh_option : TEXT         := NIL;
      tmp_dir   : TEXT         := NIL;
+ 
+     doRecord := FALSE;
    OVERRIDES
      init      := Init;
      evaluate  := Evaluate;
***************
*** 66,73 ****
--- 69,81 ----
      set_wr    := SetWr;
      exec_echo := ExecEcho;
      trace     := Trace;
+ 
+     record := Record;
    END;
  
+ PROCEDURE Record(t : T; on : BOOLEAN) = 
+   BEGIN t.doRecord := on END Record;
+ 
  TYPE
    Registers = RECORD
      cp : QCode.Stream   := NIL; (* code pointer *)
***************
*** 139,144 ****
--- 147,154 ----
      t.globals    := NEW (IntRefTbl.Default).init ();
      t.default_wr := Stdio.stdout;
  
+     t.promises := NEW(RefSeq.T).init();
+ 
      InitOSEnv (t);
      InitBuiltins (t);
  
***************
*** 1555,1564 ****
            END;
          ELSE
            FlushIO ();
!           Process.GetStandardFileHandles (stdin, stdout, stderr);
!           handle := Process.Create (t.shell, SUBARRAY (args, 0, n_shell_args),
!                                     stdin := stdin, stdout := stdout,
!                                     stderr := stderr);
          END;
        EXCEPT
        | Thread.Alerted =>
--- 1565,1594 ----
            END;
          ELSE
            FlushIO ();
!           IF t.doRecord THEN
!             handle := NIL;
!             WITH a = NEW(REF ARRAY OF TEXT, n_shell_args) DO
!               a^ := SUBARRAY(args,0,n_shell_args);
!               VAR wrx : Wr.T; BEGIN
!                 IF echo OR t.do_echo THEN
!                   wrx := wr
!                 ELSE
!                   wrx := NIL
!                 END;
!                 t.promises.addhi(NEW(ExecPromise,
!                                      cmd := t.shell,
!                                      wr := wrx,
!                                      args := a,
!                                      t := t,
!                                      ignore_errors := ignore_errors))
!               END
!             END
!           ELSE
!             Process.GetStandardFileHandles (stdin, stdout, stderr);
!             handle := Process.Create (t.shell, SUBARRAY (args, 0, n_shell_args),
!                                       stdin := stdin, stdout := stdout,
!                                       stderr := stderr);
!           END;
          END;
        EXCEPT
        | Thread.Alerted =>
***************
*** 1573,1579 ****
        END;
  
        (* wait for everything to shutdown... *)
!       exit_code := Process.Wait (handle);
      END;
  
      IF onlyTry THEN
--- 1603,1613 ----
        END;
  
        (* wait for everything to shutdown... *)
!       IF handle = NIL THEN
!         exit_code := 0 
!       ELSE
!         exit_code := Process.Wait (handle);
!       END; (* else we're only promising *)
      END;
  
      IF onlyTry THEN
***************
*** 1589,1594 ****
--- 1623,1664 ----
  
    END ExecCommand;
  
+ TYPE 
+   ExecPromise = Promise OBJECT
+     cmd : TEXT;
+     args : REF ARRAY OF TEXT;
+     t : T;
+     wr : Wr.T;
+     ignore_errors : BOOLEAN;
+   OVERRIDES
+     fulfil := FulfilExecPromise;
+   END;
+ 
+ PROCEDURE FulfilExecPromise(ep : ExecPromise) RAISES { Error } = 
+   VAR
+     stdin, stdout, stderr: File.T;
+   BEGIN
+     Process.GetStandardFileHandles (stdin, stdout, stderr);
+     TRY
+       IF ep.wr # NIL THEN
+         Wr.PutText (ep.wr, ep.args[1]);
+         Wr.PutText (ep.wr, Wr.EOL);
+         FlushIO ();
+       END;
+       WITH handle = Process.Create (ep.cmd, ep.args^,
+                                     stdin := stdin, stdout := stdout,
+                                     stderr := stderr),
+            exit_code = Process.Wait(handle) DO
+         IF exit_code # 0 AND NOT ep.ignore_errors THEN
+           Err (ep.t, Fmt.F("exit %s: %s", Fmt.Int(exit_code), ep.cmd))
+         END
+       END
+     EXCEPT
+       OSError.E (ec) =>
+           Err (ep.t, Fmt.F ("exec failed%s *** %s", OSErr (ec), ep.cmd));
+     END
+   END FulfilExecPromise;
+ 
  PROCEDURE KillProcess (handle: Process.T) =
    BEGIN
      IF (handle # NIL) THEN




More information about the M3devel mailing list