[M3devel] More on threading
Mika Nystrom
mika at async.caltech.edu
Sun Feb 13 23:27:51 CET 2011
All right, I have some more interesting observations on user threading,
as well as on parallelizing CM3. They are connected because the
latter is the source of the former.
User threads on FreeBSD (FreeBSD 7/amd64)
=======================
My process was getting stuck in the "umtxn" state (I think that is
what top called it), and the hang turned out to be inside fork() in libthr.so.
Now, libthr.so is FreeBSD's name for pthreads, and this was a user
threading program, so it should not be blocking inside the pthreads
library at all. This does not seem right.
I tried the following:
1. remove the call to pthread_atfork
No change in behavior. My parallelized CM3 still got stuck in umtxn.
2. remove the -pthread linking flag
Success! Parallelized CM3 works!
So, clearly -pthread is harmful to the health of Modula-3 programs running
with user threading on FreeBSD. This is on FreeBSD 7.1, which means the
call to pthread_atfork has to be removed as well. I'm not sure how to make
building this as transparent as possible, but what is currently checked
in is wrong in any case.
Parallelizing CM3
=================
I found one bug in my earlier CM3 patch. Calls to Utils.* need to be
protected by a mutex ("monitored"), because there is a shared hash table
in that module.
New diff attached. With this diff the compiler runs correctly even
when it is running 100 instances of the backend in parallel.
The speedup is about 2.0 on a machine with four cores for a largish source
directory. The maximum number of subprocesses is set on line 985
of Builder.m3.
I am not claiming this patch is anywhere near "polished" enough to go
into the CM3 tree at the moment. It's an experiment and starting point
for further work...
Mika
Index: cm3/src/Builder.m3
===================================================================
RCS file: /usr/cvs/cm3/m3-sys/cm3/src/Builder.m3,v
retrieving revision 1.36
diff -c -r1.36 Builder.m3
*** cm3/src/Builder.m3 24 Aug 2010 05:24:24 -0000 1.36
--- cm3/src/Builder.m3 13 Feb 2011 22:17:16 -0000
***************
*** 15,20 ****
--- 15,22 ----
IMPORT QIdent;
FROM Target IMPORT M3BackendMode_t, BackendAssembly, BackendModeStrings;
FROM M3Path IMPORT OSKind, OSKindStrings;
+ IMPORT Pathname;
+ IMPORT RefSeq;
TYPE
UK = M3Unit.Kind;
***************
*** 131,136 ****
--- 133,139 ----
link_coverage : TEXT; (* coverage library *)
m3_front_flags: Arg.List; (* configuration options for the front *)
m3_options : Arg.List; (* misc. user options for the frontend *)
+ delayBackend := FALSE;
END;
TYPE
***************
*** 932,937 ****
--- 935,956 ----
(*------------------------------------------------------------ compilation --*)
+ TYPE MarkerPromise = QMachine.Promise BRANDED OBJECT OVERRIDES fulfil := FulfilNothing END; (* an empty promise, use as a marker *)
+
+ PROCEDURE FulfilNothing(<*UNUSED*>mp : MarkerPromise) = BEGIN END FulfilNothing;
+
+ TYPE SeqClosure = Thread.Closure OBJECT seq : RefSeq.T; OVERRIDES apply := SeqApply END;
+
+ PROCEDURE SeqApply(cl : SeqClosure) : REFANY =
+ BEGIN (* force a Sequence of promises *)
+ FOR i := 0 TO cl.seq.size()-1 DO
+ WITH p = NARROW(cl.seq.get(i),QMachine.Promise) DO
+ p.fulfil()
+ END
+ END;
+ RETURN NIL
+ END SeqApply;
+
PROCEDURE CompileEverything (s: State; schedule: SourceList) =
VAR u: M3Unit.T;
BEGIN
***************
*** 941,948 ****
(* compile all the sources using the initial schedule *)
FOR i := 0 TO LAST (schedule^) DO
! CompileOne (s, schedule[i]);
END;
FlushPending (s);
(* recompile any interfaces where we goofed on the exports *)
--- 960,1000 ----
(* compile all the sources using the initial schedule *)
FOR i := 0 TO LAST (schedule^) DO
! s.delayBackend := TRUE; (* tell backend to promise instead of executing *)
! TRY
! CompileOne (s, schedule[i]);
! FINALLY
! s.delayBackend := FALSE; (* tell backend to go back to executing *)
! END;
!
! s.machine.promises.addhi(NEW(MarkerPromise)); (* promises, promises *)
!
END;
+
+ VAR
+ curSeq := NEW(RefSeq.T).init(); (* sequential chain *)
+ threads := NEW(RefSeq.T).init(); (* parallel chains *)
+ BEGIN
+ FOR i := 0 TO s.machine.promises.size()-1 DO
+ WITH p = s.machine.promises.get(i) DO
+ curSeq.addhi(p);
+ IF i = s.machine.promises.size()-1 OR ISTYPE(p,MarkerPromise) THEN (* mark as end of one sequential chain of promises *)
+ WITH cl = NEW(SeqClosure, seq := curSeq) DO
+ threads.addhi (Thread.Fork(cl)); (* start sequential chain *)
+
+ IF threads.size() > 400 THEN EVAL Thread.Join(threads.remlo()) END; (* limit number of sequential chains executing in parallel *)
+
+ curSeq := NEW(RefSeq.T).init()
+ END
+ END
+ END
+ END;
+ WHILE threads.size() > 0 DO EVAL Thread.Join(threads.remlo()) END; (* wait for all sequential chains to end *)
+ END;
+
+ EVAL s.machine.promises.init(); (* clear out promises *)
+
+
FlushPending (s);
(* recompile any interfaces where we goofed on the exports *)
***************
*** 1151,1156 ****
--- 1203,1229 ----
END;
END CompileM3;
+ TYPE
+ NotePromise = QMachine.Promise OBJECT
+ nam : Pathname.T;
+ OVERRIDES
+ fulfil := FulfilNP;
+ END;
+
+ RemovePromise = QMachine.Promise OBJECT
+ nam : Pathname.T;
+ OVERRIDES
+ fulfil := FulfilRP;
+ END;
+
+ VAR utilsMu := NEW(MUTEX); (* Utils.* fiddles with a table *)
+
+ PROCEDURE FulfilNP(np : NotePromise) =
+ BEGIN LOCK utilsMu DO Utils.NoteTempFile(np.nam) END END FulfilNP;
+
+ PROCEDURE FulfilRP(rp : RemovePromise) =
+ BEGIN LOCK utilsMu DO Utils.Remove(rp.nam) END END FulfilRP;
+
PROCEDURE PushOneM3 (s: State; u: M3Unit.T): BOOLEAN =
VAR
tmpC, tmpS: TEXT;
***************
*** 1191,1208 ****
| 3 => (* -bootstrap, +m3back, +asm *)
tmpC := TempCName (u);
tmpS := TempSName (u);
IF (NOT s.keep_files) THEN Utils.NoteTempFile (tmpC) END;
IF (NOT s.keep_files) THEN Utils.NoteTempFile (tmpS) END;
IF RunM3 (s, u, tmpC) THEN
! IF RunM3Back (s, tmpC, tmpS, u.debug, u.optimize)
! AND RunAsm (s, tmpS, u.object) THEN
END;
need_merge := TRUE;
END;
IF (NOT s.keep_files) THEN Utils.Remove (tmpC) END;
IF (NOT s.keep_files) THEN Utils.Remove (tmpS) END;
| 6, (* +bootstrap, +m3back, -asm *)
7 => (* +bootstrap, +m3back, +asm *)
--- 1264,1320 ----
| 3 => (* -bootstrap, +m3back, +asm *)
+ IF s.delayBackend THEN
tmpC := TempCName (u);
tmpS := TempSName (u);
+ (*
IF (NOT s.keep_files) THEN Utils.NoteTempFile (tmpC) END;
IF (NOT s.keep_files) THEN Utils.NoteTempFile (tmpS) END;
+ *)
+ IF (NOT s.keep_files) THEN
+ s.machine.promises.addhi(NEW(NotePromise, nam := tmpC))
+ END;
+ IF (NOT s.keep_files) THEN
+ s.machine.promises.addhi(NEW(NotePromise, nam := tmpS))
+ END;
+
IF RunM3 (s, u, tmpC) THEN
! s.machine.record(TRUE);
! TRY
! IF RunM3Back (s, tmpC, tmpS, u.debug, u.optimize)
! AND RunAsm (s, tmpS, u.object) THEN
! END;
! FINALLY
! s.machine.record(FALSE)
END;
+
need_merge := TRUE;
END;
+ (*
IF (NOT s.keep_files) THEN Utils.Remove (tmpC) END;
IF (NOT s.keep_files) THEN Utils.Remove (tmpS) END;
+ *)
+ IF (NOT s.keep_files) THEN
+ s.machine.promises.addhi(NEW(RemovePromise, nam := tmpC))
+ END;
+ IF (NOT s.keep_files) THEN
+ s.machine.promises.addhi(NEW(RemovePromise, nam := tmpS))
+ END;
+
+ ELSE
+ tmpC := TempCName (u);
+ tmpS := TempSName (u);
+ IF (NOT s.keep_files) THEN Utils.NoteTempFile (tmpC) END;
+ IF (NOT s.keep_files) THEN Utils.NoteTempFile (tmpS) END;
+ IF RunM3 (s, u, tmpC) THEN
+ IF RunM3Back (s, tmpC, tmpS, u.debug, u.optimize)
+ AND RunAsm (s, tmpS, u.object) THEN
+ END;
+ need_merge := TRUE;
+ END;
+ IF (NOT s.keep_files) THEN Utils.Remove (tmpC) END;
+ IF (NOT s.keep_files) THEN Utils.Remove (tmpS) END;
+ END
| 6, (* +bootstrap, +m3back, -asm *)
7 => (* +bootstrap, +m3back, +asm *)
Index: m3quake/src/QMachine.i3
===================================================================
RCS file: /usr/cvs/cm3/m3-sys/m3quake/src/QMachine.i3,v
retrieving revision 1.6
diff -c -r1.6 QMachine.i3
*** m3quake/src/QMachine.i3 4 Sep 2009 10:24:07 -0000 1.6
--- m3quake/src/QMachine.i3 13 Feb 2011 22:17:18 -0000
***************
*** 8,13 ****
--- 8,14 ----
IMPORT Thread, Wr, QValue, QCode;
FROM Quake IMPORT Machine, Error, ID, IDMap;
+ IMPORT RefSeq;
REVEAL
T <: T_;
***************
*** 15,20 ****
--- 16,22 ----
T = Machine;
T_ = OBJECT
map: IDMap := NIL; (* READONLY *)
+ promises : RefSeq.T;
METHODS
init (map: IDMap): T;
evaluate (s: QCode.Stream) RAISES {Error, Thread.Alerted};
***************
*** 37,42 ****
--- 39,46 ----
set_wr (wr: Wr.T);
exec_echo (b: BOOLEAN): BOOLEAN;
trace (b: BOOLEAN);
+
+ record(on : BOOLEAN); (* instead of performing certain acts, promise *)
END;
PROCEDURE PushBool (t: T; b: BOOLEAN);
***************
*** 51,54 ****
--- 55,63 ----
PROCEDURE GetEnv (default, v0, v1, v2, v3, v4: TEXT := NIL): TEXT;
+ TYPE Promise = OBJECT METHODS fulfil() RAISES { Error } END;
+
END QMachine.
+
+
+
Index: m3quake/src/QMachine.m3
===================================================================
RCS file: /usr/cvs/cm3/m3-sys/m3quake/src/QMachine.m3,v
retrieving revision 1.35
diff -c -r1.35 QMachine.m3
*** m3quake/src/QMachine.m3 3 Aug 2010 09:40:04 -0000 1.35
--- m3quake/src/QMachine.m3 13 Feb 2011 22:17:18 -0000
***************
*** 16,21 ****
--- 16,22 ----
IMPORT TextUtils, FSUtils, System, DirStack; (* sysutils *)
IMPORT Compiler;
IMPORT M3Path;
+ IMPORT RefSeq;
CONST
OnUnix = (Compiler.ThisOS = Compiler.OS.POSIX);
***************
*** 44,49 ****
--- 45,52 ----
shell : TEXT := NIL;
sh_option : TEXT := NIL;
tmp_dir : TEXT := NIL;
+
+ doRecord := FALSE;
OVERRIDES
init := Init;
evaluate := Evaluate;
***************
*** 66,73 ****
--- 69,81 ----
set_wr := SetWr;
exec_echo := ExecEcho;
trace := Trace;
+
+ record := Record;
END;
+ PROCEDURE Record(t : T; on : BOOLEAN) =
+ BEGIN t.doRecord := on END Record;
+
TYPE
Registers = RECORD
cp : QCode.Stream := NIL; (* code pointer *)
***************
*** 139,144 ****
--- 147,154 ----
t.globals := NEW (IntRefTbl.Default).init ();
t.default_wr := Stdio.stdout;
+ t.promises := NEW(RefSeq.T).init();
+
InitOSEnv (t);
InitBuiltins (t);
***************
*** 1555,1564 ****
END;
ELSE
FlushIO ();
! Process.GetStandardFileHandles (stdin, stdout, stderr);
! handle := Process.Create (t.shell, SUBARRAY (args, 0, n_shell_args),
! stdin := stdin, stdout := stdout,
! stderr := stderr);
END;
EXCEPT
| Thread.Alerted =>
--- 1565,1594 ----
END;
ELSE
FlushIO ();
! IF t.doRecord THEN
! handle := NIL;
! WITH a = NEW(REF ARRAY OF TEXT, n_shell_args) DO
! a^ := SUBARRAY(args,0,n_shell_args);
! VAR wrx : Wr.T; BEGIN
! IF echo OR t.do_echo THEN
! wrx := wr
! ELSE
! wrx := NIL
! END;
! t.promises.addhi(NEW(ExecPromise,
! cmd := t.shell,
! wr := wrx,
! args := a,
! t := t,
! ignore_errors := ignore_errors))
! END
! END
! ELSE
! Process.GetStandardFileHandles (stdin, stdout, stderr);
! handle := Process.Create (t.shell, SUBARRAY (args, 0, n_shell_args),
! stdin := stdin, stdout := stdout,
! stderr := stderr);
! END;
END;
EXCEPT
| Thread.Alerted =>
***************
*** 1573,1579 ****
END;
(* wait for everything to shutdown... *)
! exit_code := Process.Wait (handle);
END;
IF onlyTry THEN
--- 1603,1613 ----
END;
(* wait for everything to shutdown... *)
! IF handle = NIL THEN
! exit_code := 0
! ELSE
! exit_code := Process.Wait (handle);
! END; (* else we're only promising *)
END;
IF onlyTry THEN
***************
*** 1589,1594 ****
--- 1623,1664 ----
END ExecCommand;
+ TYPE
+ ExecPromise = Promise OBJECT
+ cmd : TEXT;
+ args : REF ARRAY OF TEXT;
+ t : T;
+ wr : Wr.T;
+ ignore_errors : BOOLEAN;
+ OVERRIDES
+ fulfil := FulfilExecPromise;
+ END;
+
+ PROCEDURE FulfilExecPromise(ep : ExecPromise) RAISES { Error } =
+ VAR
+ stdin, stdout, stderr: File.T;
+ BEGIN
+ Process.GetStandardFileHandles (stdin, stdout, stderr);
+ TRY
+ IF ep.wr # NIL THEN
+ Wr.PutText (ep.wr, ep.args[1]);
+ Wr.PutText (ep.wr, Wr.EOL);
+ FlushIO ();
+ END;
+ WITH handle = Process.Create (ep.cmd, ep.args^,
+ stdin := stdin, stdout := stdout,
+ stderr := stderr),
+ exit_code = Process.Wait(handle) DO
+ IF exit_code # 0 AND NOT ep.ignore_errors THEN
+ Err (ep.t, Fmt.F("exit %s: %s", Fmt.Int(exit_code), ep.cmd))
+ END
+ END
+ EXCEPT
+ OSError.E (ec) =>
+ Err (ep.t, Fmt.F ("exec failed%s *** %s", OSErr (ec), ep.cmd));
+ END
+ END FulfilExecPromise;
+
PROCEDURE KillProcess (handle: Process.T) =
BEGIN
IF (handle # NIL) THEN
More information about the M3devel
mailing list