[M3devel] More on threading
Mika Nystrom
mika at async.caltech.edu
Sun Feb 13 08:04:14 CET 2011
Hi again m3devel,
Well I thought I would relate another story having to do with threading
and CM3. It's half-baked at the moment, but it only reflects a few hours'
work on my part, sorry... I'm about to give up now because I don't know
if the problem is in my code, or in threading, or what... maybe it is
the atfork problem Jay talked about?
Programmers spend a lot of time waiting for their compilers to run, so
it would be nice if the compilation process were faster. After perusing
the source code for CM3 I realized that there is no feedback at all
from the back-end and assembler steps to the rest of the compiler, and
I believe that with the GCC back-end the vast majority of the runtime
of the compiler is spent in these stages.
So.... how hard could this be to parallelize? I decided to try making
the two stages (cm3cg1 and as) into "promises" rather than running them
on the spot. The QMachine adds a promise to call the back-end to a
RefSeq.T called "promises", to which the Builder can also add promises.
Then, after making all the promises, the Builder collects them and runs
them ("forces them", a true Schemer would say) in parallel.
Of course something goes wrong somewhere. With 10 threads I am able to
run 31 seconds of CPU time in 19 seconds of wallclock time (using an
average of 160% of CPU) but then something goes wrong and my process
is stuck in state "umtxn". This is using user threads on AMD64_FREEBSD
on a machine with four CPUs.
Mika
P.S. diff attached, it's a bit ugly, but maybe it works on some other
system.... the code works just fine if you run one thread at a time.
I.e., if you change
IF threads.size() > 10 THEN EVAL Thread.Join(threads.remlo()) END;
to
IF threads.size() > 0 THEN EVAL Thread.Join(threads.remlo()) END;
Index: cm3/src/Builder.m3
===================================================================
RCS file: /usr/cvs/cm3/m3-sys/cm3/src/Builder.m3,v
retrieving revision 1.36
diff -c -r1.36 Builder.m3
*** cm3/src/Builder.m3 24 Aug 2010 05:24:24 -0000 1.36
--- cm3/src/Builder.m3 13 Feb 2011 06:47:40 -0000
***************
*** 15,20 ****
--- 15,22 ----
IMPORT QIdent;
FROM Target IMPORT M3BackendMode_t, BackendAssembly, BackendModeStrings;
FROM M3Path IMPORT OSKind, OSKindStrings;
+ IMPORT Pathname;
+ IMPORT RefSeq;
TYPE
UK = M3Unit.Kind;
***************
*** 131,136 ****
--- 133,139 ----
link_coverage : TEXT; (* coverage library *)
m3_front_flags: Arg.List; (* configuration options for the front *)
m3_options : Arg.List; (* misc. user options for the frontend *)
+ delayBackend := FALSE;
END;
TYPE
***************
*** 932,937 ****
--- 935,956 ----
(*------------------------------------------------------------ compilation --*)
+ TYPE MarkerPromise = QMachine.Promise BRANDED OBJECT OVERRIDES fulfil := FulfilNothing END;
+
+ PROCEDURE FulfilNothing(<*UNUSED*>mp : MarkerPromise) = BEGIN END FulfilNothing;
+
+ TYPE SeqClosure = Thread.Closure OBJECT seq : RefSeq.T; OVERRIDES apply := SeqApply END;
+
+ PROCEDURE SeqApply(cl : SeqClosure) : REFANY =
+ BEGIN
+ FOR i := 0 TO cl.seq.size()-1 DO
+ WITH p = NARROW(cl.seq.get(i),QMachine.Promise) DO
+ p.fulfil()
+ END
+ END;
+ RETURN NIL
+ END SeqApply;
+
PROCEDURE CompileEverything (s: State; schedule: SourceList) =
VAR u: M3Unit.T;
BEGIN
***************
*** 941,948 ****
(* compile all the sources using the initial schedule *)
FOR i := 0 TO LAST (schedule^) DO
! CompileOne (s, schedule[i]);
END;
FlushPending (s);
(* recompile any interfaces where we goofed on the exports *)
--- 960,1000 ----
(* compile all the sources using the initial schedule *)
FOR i := 0 TO LAST (schedule^) DO
! s.delayBackend := TRUE;
! TRY
! CompileOne (s, schedule[i]);
! FINALLY
! s.delayBackend := FALSE;
! END;
!
! s.machine.promises.addhi(NEW(MarkerPromise));
!
! END;
!
! VAR
! curSeq := NEW(RefSeq.T).init();
! threads := NEW(RefSeq.T).init();
! BEGIN
! FOR i := 0 TO s.machine.promises.size()-1 DO
! WITH p = s.machine.promises.get(i) DO
! curSeq.addhi(p);
! IF i = s.machine.promises.size()-1 OR ISTYPE(p,MarkerPromise) THEN
! WITH cl = NEW(SeqClosure, seq := curSeq) DO
! threads.addhi (Thread.Fork(cl));
!
! IF threads.size() > 10 THEN EVAL Thread.Join(threads.remlo()) END;
!
! curSeq := NEW(RefSeq.T).init()
! END
! END
! END
! END;
! WHILE threads.size() > 0 DO EVAL Thread.Join(threads.remlo()) END;
END;
+
+ EVAL s.machine.promises.init();
+
+
FlushPending (s);
(* recompile any interfaces where we goofed on the exports *)
***************
*** 1151,1156 ****
--- 1203,1227 ----
END;
END CompileM3;
+ TYPE
+ NotePromise = QMachine.Promise OBJECT
+ nam : Pathname.T;
+ OVERRIDES
+ fulfil := FulfilNP;
+ END;
+
+ RemovePromise = QMachine.Promise OBJECT
+ nam : Pathname.T;
+ OVERRIDES
+ fulfil := FulfilRP;
+ END;
+
+ PROCEDURE FulfilNP(np : NotePromise) =
+ BEGIN Utils.NoteTempFile(np.nam) END FulfilNP;
+
+ PROCEDURE FulfilRP(rp : RemovePromise) =
+ BEGIN Utils.Remove(rp.nam) END FulfilRP;
+
PROCEDURE PushOneM3 (s: State; u: M3Unit.T): BOOLEAN =
VAR
tmpC, tmpS: TEXT;
***************
*** 1191,1208 ****
| 3 => (* -bootstrap, +m3back, +asm *)
tmpC := TempCName (u);
tmpS := TempSName (u);
IF (NOT s.keep_files) THEN Utils.NoteTempFile (tmpC) END;
IF (NOT s.keep_files) THEN Utils.NoteTempFile (tmpS) END;
IF RunM3 (s, u, tmpC) THEN
! IF RunM3Back (s, tmpC, tmpS, u.debug, u.optimize)
! AND RunAsm (s, tmpS, u.object) THEN
END;
need_merge := TRUE;
END;
IF (NOT s.keep_files) THEN Utils.Remove (tmpC) END;
IF (NOT s.keep_files) THEN Utils.Remove (tmpS) END;
| 6, (* +bootstrap, +m3back, -asm *)
7 => (* +bootstrap, +m3back, +asm *)
--- 1262,1318 ----
| 3 => (* -bootstrap, +m3back, +asm *)
+ IF s.delayBackend THEN
tmpC := TempCName (u);
tmpS := TempSName (u);
+ (*
IF (NOT s.keep_files) THEN Utils.NoteTempFile (tmpC) END;
IF (NOT s.keep_files) THEN Utils.NoteTempFile (tmpS) END;
+ *)
+ IF (NOT s.keep_files) THEN
+ s.machine.promises.addhi(NEW(NotePromise, nam := tmpC))
+ END;
+ IF (NOT s.keep_files) THEN
+ s.machine.promises.addhi(NEW(NotePromise, nam := tmpS))
+ END;
+
IF RunM3 (s, u, tmpC) THEN
! s.machine.record(TRUE);
! TRY
! IF RunM3Back (s, tmpC, tmpS, u.debug, u.optimize)
! AND RunAsm (s, tmpS, u.object) THEN
! END;
! FINALLY
! s.machine.record(FALSE)
END;
+
need_merge := TRUE;
END;
+ (*
IF (NOT s.keep_files) THEN Utils.Remove (tmpC) END;
IF (NOT s.keep_files) THEN Utils.Remove (tmpS) END;
+ *)
+ IF (NOT s.keep_files) THEN
+ s.machine.promises.addhi(NEW(RemovePromise, nam := tmpC))
+ END;
+ IF (NOT s.keep_files) THEN
+ s.machine.promises.addhi(NEW(RemovePromise, nam := tmpS))
+ END;
+
+ ELSE
+ tmpC := TempCName (u);
+ tmpS := TempSName (u);
+ IF (NOT s.keep_files) THEN Utils.NoteTempFile (tmpC) END;
+ IF (NOT s.keep_files) THEN Utils.NoteTempFile (tmpS) END;
+ IF RunM3 (s, u, tmpC) THEN
+ IF RunM3Back (s, tmpC, tmpS, u.debug, u.optimize)
+ AND RunAsm (s, tmpS, u.object) THEN
+ END;
+ need_merge := TRUE;
+ END;
+ IF (NOT s.keep_files) THEN Utils.Remove (tmpC) END;
+ IF (NOT s.keep_files) THEN Utils.Remove (tmpS) END;
+ END
| 6, (* +bootstrap, +m3back, -asm *)
7 => (* +bootstrap, +m3back, +asm *)
Index: cm3/src/m3makefile
===================================================================
RCS file: /usr/cvs/cm3/m3-sys/cm3/src/m3makefile,v
retrieving revision 1.24
diff -c -r1.24 m3makefile
*** cm3/src/m3makefile 8 Dec 2010 07:30:57 -0000 1.24
--- cm3/src/m3makefile 13 Feb 2011 06:47:42 -0000
***************
*** 29,34 ****
--- 29,35 ----
end
module ("Arg")
module ("Builder")
module ("Dirs")
module ("M3Build")
module ("M3Loc")
Index: m3quake/src/QMachine.i3
===================================================================
RCS file: /usr/cvs/cm3/m3-sys/m3quake/src/QMachine.i3,v
retrieving revision 1.6
diff -c -r1.6 QMachine.i3
*** m3quake/src/QMachine.i3 4 Sep 2009 10:24:07 -0000 1.6
--- m3quake/src/QMachine.i3 13 Feb 2011 06:47:43 -0000
***************
*** 8,13 ****
--- 8,14 ----
IMPORT Thread, Wr, QValue, QCode;
FROM Quake IMPORT Machine, Error, ID, IDMap;
+ IMPORT RefSeq;
REVEAL
T <: T_;
***************
*** 15,20 ****
--- 16,22 ----
T = Machine;
T_ = OBJECT
map: IDMap := NIL; (* READONLY *)
+ promises : RefSeq.T;
METHODS
init (map: IDMap): T;
evaluate (s: QCode.Stream) RAISES {Error, Thread.Alerted};
***************
*** 37,42 ****
--- 39,46 ----
set_wr (wr: Wr.T);
exec_echo (b: BOOLEAN): BOOLEAN;
trace (b: BOOLEAN);
+
+ record(on : BOOLEAN); (* instead of performing certain acts, promise *)
END;
PROCEDURE PushBool (t: T; b: BOOLEAN);
***************
*** 51,54 ****
--- 55,63 ----
PROCEDURE GetEnv (default, v0, v1, v2, v3, v4: TEXT := NIL): TEXT;
+ TYPE Promise = OBJECT METHODS fulfil() RAISES { Error } END;
+
END QMachine.
+
+
+
Index: m3quake/src/QMachine.m3
===================================================================
RCS file: /usr/cvs/cm3/m3-sys/m3quake/src/QMachine.m3,v
retrieving revision 1.35
diff -c -r1.35 QMachine.m3
*** m3quake/src/QMachine.m3 3 Aug 2010 09:40:04 -0000 1.35
--- m3quake/src/QMachine.m3 13 Feb 2011 06:47:44 -0000
***************
*** 16,21 ****
--- 16,22 ----
IMPORT TextUtils, FSUtils, System, DirStack; (* sysutils *)
IMPORT Compiler;
IMPORT M3Path;
+ IMPORT RefSeq;
CONST
OnUnix = (Compiler.ThisOS = Compiler.OS.POSIX);
***************
*** 44,49 ****
--- 45,52 ----
shell : TEXT := NIL;
sh_option : TEXT := NIL;
tmp_dir : TEXT := NIL;
+
+ doRecord := FALSE;
OVERRIDES
init := Init;
evaluate := Evaluate;
***************
*** 66,73 ****
--- 69,81 ----
set_wr := SetWr;
exec_echo := ExecEcho;
trace := Trace;
+
+ record := Record;
END;
+ PROCEDURE Record(t : T; on : BOOLEAN) =
+ BEGIN t.doRecord := on END Record;
+
TYPE
Registers = RECORD
cp : QCode.Stream := NIL; (* code pointer *)
***************
*** 139,144 ****
--- 147,154 ----
t.globals := NEW (IntRefTbl.Default).init ();
t.default_wr := Stdio.stdout;
+ t.promises := NEW(RefSeq.T).init();
+
InitOSEnv (t);
InitBuiltins (t);
***************
*** 1555,1564 ****
END;
ELSE
FlushIO ();
! Process.GetStandardFileHandles (stdin, stdout, stderr);
! handle := Process.Create (t.shell, SUBARRAY (args, 0, n_shell_args),
! stdin := stdin, stdout := stdout,
! stderr := stderr);
END;
EXCEPT
| Thread.Alerted =>
--- 1565,1594 ----
END;
ELSE
FlushIO ();
! IF t.doRecord THEN
! handle := NIL;
! WITH a = NEW(REF ARRAY OF TEXT, n_shell_args) DO
! a^ := SUBARRAY(args,0,n_shell_args);
! VAR wrx : Wr.T; BEGIN
! IF echo OR t.do_echo THEN
! wrx := wr
! ELSE
! wrx := NIL
! END;
! t.promises.addhi(NEW(ExecPromise,
! cmd := t.shell,
! wr := wrx,
! args := a,
! t := t,
! ignore_errors := ignore_errors))
! END
! END
! ELSE
! Process.GetStandardFileHandles (stdin, stdout, stderr);
! handle := Process.Create (t.shell, SUBARRAY (args, 0, n_shell_args),
! stdin := stdin, stdout := stdout,
! stderr := stderr);
! END;
END;
EXCEPT
| Thread.Alerted =>
***************
*** 1573,1579 ****
END;
(* wait for everything to shutdown... *)
! exit_code := Process.Wait (handle);
END;
IF onlyTry THEN
--- 1603,1613 ----
END;
(* wait for everything to shutdown... *)
! IF handle = NIL THEN
! exit_code := 0
! ELSE
! exit_code := Process.Wait (handle);
! END; (* else we're only promising *)
END;
IF onlyTry THEN
***************
*** 1589,1594 ****
--- 1623,1664 ----
END ExecCommand;
+ TYPE
+ ExecPromise = Promise OBJECT
+ cmd : TEXT;
+ args : REF ARRAY OF TEXT;
+ t : T;
+ wr : Wr.T;
+ ignore_errors : BOOLEAN;
+ OVERRIDES
+ fulfil := FulfilExecPromise;
+ END;
+
+ PROCEDURE FulfilExecPromise(ep : ExecPromise) RAISES { Error } =
+ VAR
+ stdin, stdout, stderr: File.T;
+ BEGIN
+ Process.GetStandardFileHandles (stdin, stdout, stderr);
+ TRY
+ IF ep.wr # NIL THEN
+ Wr.PutText (ep.wr, ep.args[1]);
+ Wr.PutText (ep.wr, Wr.EOL);
+ FlushIO ();
+ END;
+ WITH handle = Process.Create (ep.cmd, ep.args^,
+ stdin := stdin, stdout := stdout,
+ stderr := stderr),
+ exit_code = Process.Wait(handle) DO
+ IF exit_code # 0 AND NOT ep.ignore_errors THEN
+ Err (ep.t, Fmt.F("exit %s: %s", Fmt.Int(exit_code), ep.cmd))
+ END
+ END
+ EXCEPT
+ OSError.E (ec) =>
+ Err (ep.t, Fmt.F ("exec failed%s *** %s", OSErr (ec), ep.cmd));
+ END
+ END FulfilExecPromise;
+
PROCEDURE KillProcess (handle: Process.T) =
BEGIN
IF (handle # NIL) THEN
More information about the M3devel
mailing list