[M3devel] pthreads status

mika at async.caltech.edu mika at async.caltech.edu
Mon Aug 25 07:49:21 CEST 2014


Hi m3devel,

I hope this email finds you well and you had a good weekend.  I spent
mine with a debugger, as will become clear.

I have a programming problem I chose to solve as I choose to solve most
programming problems, by writing a program in Modula-3.  The particular
programming problem is one in (electronic) design automation and requires
lots of memory (200 GB of RAM maybe) and has lots of FINE-grained
parallelism.  In other words, it's a prime candidate for multiprocessing
via pthreads in Modula-3.  One of the reasons I even chose to approach the
problem this way is that I saw some noises on this mailing list suggesting
that the pthreads threading library "now works".  I've tried the pthreads
many times over the years and things have always been so broken that I
found the code to be unusable so I was really happy to hear that the
pthreads were now working.

Well, of course the pthreads library wasn't working.  That's not a
criticism, really.  I understand from reading the code and communicating
with Tony that many of the problems stem from issues in understanding
the underlying POSIX facilities, which are not always documented properly
(in particular issues with interactions between threading and signals).
In any case there was a blatant race condition in the code.

But now, after lots of invaluable assistance from Tony and countless gigabytes
of log files and beating on code... I am reasonably confident that we have
a version of the threading library, over pthreads, that is actually correct!
For now it is on one architecture only, namely AMD64_LINUX.  It crashes and
burns on AMD64_FREEBSD.  Don't ask me why---the code on FreeBSD uses less
"tricky stuff" and ought to be monotonically easier to get working than the
Linux code.  I've run the thread tester in its grumpiest modes for several CPU
weeks against the new library, as well as my CAD application, all without
evident problems.

I don't know the precise procedure for submitting things anymore, with the 
git migration.  Tony has a copy of the code I'm also attaching to this email,
which is the correctly functioning ThreadPThread.m3 for AMD64_LINUX.

Not everything is roses with this, but I'll leave the bad news for another
email (I need some advice, too).

   Best regards,
     Mika
-------------- next part --------------
(* Copyright (C) 2005, Purdue Research Foundation                  *)
(* All rights reserved.                                            *)
(* See the file COPYRIGHT-PURDUE for a full description.           *)

UNSAFE MODULE ThreadPThread EXPORTS Thread, ThreadF, RTThread, Scheduler,
SchedulerPosix, RTOS, RTHooks, ThreadPThread;

IMPORT Cerrno, FloatMode, MutexRep, RTCollectorSRC, RTError, RTHeapRep, RTIO,
       RTParams, RTPerfTool, RTProcess, ThreadEvent, Time,
       Word, Usched, Uerror, Uexec;
FROM Compiler IMPORT ThisFile, ThisLine;
FROM Ctypes IMPORT int;
IMPORT RuntimeError AS RTE;
FROM ThreadInternal IMPORT Poll;

(*----------------------------------------------------- types and globals ---*)

CONST
  MILLION = 1000 * 1000;
  WAIT_UNIT = MILLION; (* one million nanoseconds, one thousandth of a second *)
  RETRY_INTERVAL = 10 * MILLION; (* 10 million nanoseconds, one hundredth of a second *)

REVEAL
  Mutex = MutexRep.Public BRANDED "Mutex Pthread-1.0" OBJECT
    mutex: pthread_mutex_t := NIL;
    holder: Activation := NIL;
    waiters: Activation := NIL;
  OVERRIDES
    acquire := LockMutex;
    release := UnlockMutex;
  END;

  Condition = BRANDED "Thread.Condition Pthread-1.0" OBJECT
    mutex: pthread_mutex_t := NIL;
    waiters: Activation := NIL;     (* LL = mutex *)
  END;

  T = BRANDED "Thread.T Pthread-1.6" OBJECT
    act: Activation := NIL;         (* live untraced thread data *)
    closure: Closure := NIL;        (* our work and its result *)
    result: REFANY := NIL;          (* our work and its result *)
    join: Condition;                (* wait here to join; NIL when done *)
    joined: BOOLEAN := FALSE;       (* Is anyone waiting yet? *)
  END;

TYPE
  ActState = { Starting, Started, Stopping, Stopped };
  REVEAL Activation = UNTRACED BRANDED REF RECORD
    frame: ADDRESS := NIL;              (* exception handling support *)
    mutex: pthread_mutex_t := NIL;      (* write-once in CreateT *)
    cond: pthread_cond_t := NIL;        (* write-once in CreateT; a place to park while waiting *)
    alerted : BOOLEAN := FALSE;         (* LL = mutex; the alert flag *)
    waitingOn: pthread_mutex_t := NIL;  (* LL = mutex; The CV's mutex *)
    nextWaiter: Activation := NIL;      (* LL = mutex; waiting thread queue *)
    next, prev: Activation := NIL;      (* LL = activeMu; global doubly-linked, circular list of all active threads *)
    handle: pthread_t := NIL;           (* LL = activeMu; thread handle *)
    stackbase: ADDRESS := NIL;          (* LL = activeMu; stack base for GC *)
    context: ADDRESS := NIL;            (* LL = activeMu *)
    state := ActState.Started;          (* LL = activeMu *)
    slot: CARDINAL := 0;                (* LL = slotMu; index in slots *)
    floatState : FloatMode.ThreadState; (* per-thread floating point state *)
    heapState : RTHeapRep.ThreadState;  (* per-thread heap state *)
  END;

PROCEDURE SetState (act: Activation;  state: ActState) =
  CONST text = ARRAY ActState OF TEXT
    { "Starting", "Started", "Stopping", "Stopped" };
  BEGIN
    act.state := state;
    IF DEBUG THEN
      RTIO.PutText(text[state]);
      RTIO.PutText(" act=");
      RTIO.PutAddr(act);
      RTIO.PutText("\n");
      RTIO.Flush();
    END;
  END SetState;

(*----------------------------------------------------------------- Mutex ---*)

PROCEDURE Acquire (m: Mutex) =
  BEGIN
    m.acquire ();
  END Acquire;

PROCEDURE Release (m: Mutex) =
  BEGIN
    m.release ();
  END Release;

PROCEDURE CleanMutex (r: REFANY) =
  VAR m := NARROW(r, Mutex);
  BEGIN
    pthread_mutex_delete(m.mutex);
    m.mutex := NIL;
  END CleanMutex;

PROCEDURE InitMutex (VAR m: pthread_mutex_t; root: REFANY;
                     Clean: PROCEDURE(root: REFANY)) =
  VAR mutex := pthread_mutex_new();
  BEGIN
    TRY
      WITH r = pthread_mutex_lock(initMu) DO <*ASSERT r=0*> END;
      (* Did someone else win the race? *)
      IF m # NIL THEN RETURN END;
      (* We won the race, but we might have failed to allocate. *)
      IF mutex = NIL THEN RTE.Raise (RTE.T.OutOfMemory) END;
      RTHeapRep.RegisterFinalCleanup (root, Clean);
      m := mutex;
      mutex := NIL;
    FINALLY
      WITH r = pthread_mutex_unlock(initMu) DO <*ASSERT r=0*> END;
      pthread_mutex_delete(mutex);
    END;
  END InitMutex;

PROCEDURE LockMutex (m: Mutex) =
  VAR self := GetActivation();
  BEGIN
    IF perfOn THEN PerfChanged(State.locking) END;
    IF m.mutex = NIL THEN InitMutex(m.mutex, m, CleanMutex) END;
    WITH r = pthread_mutex_lock(self.mutex) DO <*ASSERT r=0*> END;
    WITH r = pthread_mutex_lock(m.mutex) DO <*ASSERT r=0*> END;
    IF m.holder = NIL THEN
      m.holder := self;
      WITH r = pthread_mutex_unlock(m.mutex) DO <*ASSERT r=0*> END;
      WITH r = pthread_mutex_unlock(self.mutex) DO <*ASSERT r=0*> END;
      IF perfOn THEN PerfRunning() END;
      RETURN;
    END;
    <*ASSERT self.waitingOn = NIL*>
    <*ASSERT self.nextWaiter = NIL*>
    self.waitingOn := m.mutex;
    self.nextWaiter := m.waiters;
    m.waiters := self;
    IF m.holder = self THEN Die(ThisLine(), "impossible acquire") END;
    WITH r = pthread_mutex_unlock(m.mutex) DO <*ASSERT r=0*> END;
    REPEAT
      WITH r = pthread_cond_wait(self.cond, self.mutex) DO <*ASSERT r=0*> END;
    UNTIL self.waitingOn = NIL; (* m.holder = self *)
    <*ASSERT m.holder = self*>
    <*ASSERT self.nextWaiter = NIL*>
    WITH r = pthread_mutex_unlock(self.mutex) DO <*ASSERT r=0*> END;
    IF perfOn THEN PerfRunning() END;
  END LockMutex;

PROCEDURE UnlockMutex (m: Mutex) =
  VAR
    self := GetActivation();
    t, prev: Activation;
  BEGIN
    IF m.mutex = NIL THEN InitMutex(m.mutex, m, CleanMutex) END;
    WITH r = pthread_mutex_lock(self.mutex) DO <*ASSERT r=0*> END;
    WITH r = pthread_mutex_lock(m.mutex) DO <*ASSERT r=0*> END;
    IF m.holder # self THEN Die(ThisLine(), "illegal release") END;
    t := m.waiters;
    IF t = NIL THEN
      m.holder := NIL;
      WITH r = pthread_mutex_unlock(m.mutex) DO <*ASSERT r=0*> END;
      WITH r = pthread_mutex_unlock(self.mutex) DO <*ASSERT r=0*> END;
      RETURN;
    END;
    prev := NIL;
    WHILE t.nextWaiter # NIL DO
      prev := t;
      t := t.nextWaiter;
    END;
    IF prev # NIL
      THEN prev.nextWaiter := NIL;
      ELSE m.waiters := NIL;
    END;
    m.holder := t;
    WITH r = pthread_mutex_unlock(m.mutex) DO <*ASSERT r=0*> END;
    WITH r = pthread_mutex_unlock(self.mutex) DO <*ASSERT r=0*> END;
    WITH r = pthread_mutex_lock(t.mutex) DO <*ASSERT r=0*> END;
    <*ASSERT t.waitingOn = m.mutex*>
    t.nextWaiter := NIL;
    t.waitingOn := NIL;
    WITH r = pthread_cond_signal(t.cond) DO <*ASSERT r=0*> END;
    WITH r = pthread_mutex_unlock(t.mutex) DO <*ASSERT r=0*> END;
  END UnlockMutex;

(*---------------------------------------- Condition variables and Alerts ---*)

PROCEDURE CleanCondition (r: REFANY) =
  VAR c := NARROW(r, Condition);
  BEGIN
    pthread_mutex_delete(c.mutex);
    c.mutex := NIL;
  END CleanCondition;

PROCEDURE XWait (self: Activation; m: Mutex; c: Condition; alertable: BOOLEAN)
  RAISES {Alerted} =
  (* LL = m *)
  VAR next, prev: Activation;
  BEGIN
    IF perfOn THEN PerfChanged(State.waiting) END;
    IF c.mutex = NIL THEN InitMutex(c.mutex, c, CleanCondition) END;
    WITH r = pthread_mutex_lock(self.mutex) DO <*ASSERT r=0*> END;
    <*ASSERT self.waitingOn = NIL*>
    <*ASSERT self.nextWaiter = NIL*>

    WITH r = pthread_mutex_lock(c.mutex) DO <*ASSERT r=0*> END;
    self.waitingOn := c.mutex;
    self.nextWaiter := c.waiters;
    c.waiters := self;
    WITH r = pthread_mutex_unlock(c.mutex) DO <*ASSERT r=0*> END;
    WITH r = pthread_mutex_unlock(self.mutex) DO <*ASSERT r=0*> END;

    m.release();

    WITH r = pthread_mutex_lock(self.mutex) DO <*ASSERT r=0*> END;
    LOOP
      IF alertable AND self.alerted THEN
        self.alerted := FALSE;
        <*ASSERT self.waitingOn = c.mutex*>
        WITH r = pthread_mutex_lock(c.mutex) DO <*ASSERT r=0*> END;
        next := c.waiters; prev := NIL;
        WHILE next # self DO
          <*ASSERT next # NIL*>
          prev := next; next := next.nextWaiter;
        END;
        IF prev = NIL
          THEN c.waiters := self.nextWaiter;
          ELSE prev.nextWaiter := self.nextWaiter;
        END;
        WITH r = pthread_mutex_unlock(c.mutex) DO <*ASSERT r=0*> END;
        self.nextWaiter := NIL;
        self.waitingOn := NIL;
        WITH r = pthread_mutex_unlock(self.mutex) DO <*ASSERT r=0*> END;
        m.acquire();
        RAISE Alerted;
      ELSIF self.waitingOn = NIL THEN
        <*ASSERT self.nextWaiter = NIL*>
        WITH r = pthread_mutex_unlock(self.mutex) DO <*ASSERT r=0*> END;
        m.acquire();
        RETURN;
      END;
      WITH r = pthread_cond_wait(self.cond, self.mutex) DO <*ASSERT r=0*> END;
    END;
  END XWait;

PROCEDURE AlertWait (m: Mutex; c: Condition) RAISES {Alerted} =
  (* LL = m *)
  VAR self := GetActivation();
  BEGIN
    XWait(self, m, c, alertable := TRUE);
  END AlertWait;

PROCEDURE Wait (m: Mutex; c: Condition) =
  <*FATAL Alerted*>
  (* LL = m *)
  VAR self := GetActivation();
  BEGIN
    XWait(self, m, c, alertable := FALSE);
  END Wait;

PROCEDURE Signal (c: Condition) =
  VAR
    self := GetActivation();
    t: Activation;
  BEGIN
    IF c.mutex = NIL THEN InitMutex(c.mutex, c, CleanCondition) END;
    WITH r = pthread_mutex_lock(self.mutex) DO <*ASSERT r=0*> END;
    WITH r = pthread_mutex_lock(c.mutex) DO <*ASSERT r=0*> END;
    t := c.waiters;
    IF t # NIL THEN
      c.waiters := t.nextWaiter;
    END;
    WITH r = pthread_mutex_unlock(c.mutex) DO <*ASSERT r=0*> END;
    WITH r = pthread_mutex_unlock(self.mutex) DO <*ASSERT r=0*> END;
    IF t # NIL THEN
      WITH r = pthread_mutex_lock(t.mutex) DO <*ASSERT r=0*> END;
      t.nextWaiter := NIL;
      t.waitingOn := NIL;
      WITH r = pthread_cond_signal(t.cond) DO <*ASSERT r=0*> END;
      WITH r = pthread_mutex_unlock(t.mutex) DO <*ASSERT r=0*> END;
    END;
  END Signal;

PROCEDURE Broadcast (c: Condition) =
  VAR
    self := GetActivation();
    t, next: Activation;
  BEGIN
    IF c.mutex = NIL THEN InitMutex(c.mutex, c, CleanCondition) END;
    WITH r = pthread_mutex_lock(self.mutex) DO <*ASSERT r=0*> END;
    WITH r = pthread_mutex_lock(c.mutex) DO <*ASSERT r=0*> END;
    t := c.waiters;
    c.waiters := NIL;
    WITH r = pthread_mutex_unlock(c.mutex) DO <*ASSERT r=0*> END;
    WITH r = pthread_mutex_unlock(self.mutex) DO <*ASSERT r=0*> END;
    WHILE t # NIL DO
      WITH r = pthread_mutex_lock(t.mutex) DO <*ASSERT r=0*> END;
      next := t.nextWaiter;
      t.nextWaiter := NIL;
      t.waitingOn := NIL;
      WITH r = pthread_cond_signal(t.cond) DO <*ASSERT r=0*> END;
      WITH r = pthread_mutex_unlock(t.mutex) DO <*ASSERT r=0*> END;
      t := next;
    END;
  END Broadcast;

PROCEDURE Alert (thread: T) =
  VAR t := thread.act;
  BEGIN
    WITH r = pthread_mutex_lock(t.mutex) DO <*ASSERT r=0*> END;
    t.alerted := TRUE;
    WITH r = pthread_cond_signal(t.cond) DO <*ASSERT r=0*> END;
    WITH r = pthread_mutex_unlock(t.mutex) DO <*ASSERT r=0*> END;
  END Alert;

PROCEDURE XTestAlert (self: Activation): BOOLEAN =
  VAR result: BOOLEAN;
  BEGIN
    WITH r = pthread_mutex_lock(self.mutex) DO <*ASSERT r=0*> END;
    result := self.alerted;
    self.alerted := FALSE;
    WITH r = pthread_mutex_unlock(self.mutex) DO <*ASSERT r=0*> END;
    RETURN result;
  END XTestAlert;

PROCEDURE TestAlert (): BOOLEAN =
  VAR self := GetActivation();
  BEGIN
    RETURN XTestAlert(self);
  END TestAlert;

(*------------------------------------------------------------------ Self ---*)

VAR (* LL = slotMu *)
  n_slotted: CARDINAL;
  next_slot: CARDINAL;      (* NOTE: we don't use slots[0] *)
  slots: REF ARRAY OF T;    (* NOTE: we don't use slots[0] *)

PROCEDURE InitActivations (me: Activation) =
  BEGIN
    me.handle := pthread_self();
    me.next := me;
    me.prev := me;
    SetActivation(me);
    (* Explicitly (re)initialize to handle fork(). *)
    next_slot := 1;     (* no threads created yet *)
    slots := NIL;       (* no threads created yet *)
    n_slotted := 0;     (* no threads created yet *)
    allThreads := me;
    FloatMode.InitThread(me.floatState);
  END InitActivations;

PROCEDURE Self (): T =
  (* If not the initial thread and not created by Fork, returns NIL *)
  VAR
    me := GetActivation();
    t: T;
  BEGIN
    IF me = NIL THEN Die(ThisLine(), "Thread primitive called from non-Modula-3 thread") END;
    WITH r = pthread_mutex_lock(slotsMu) DO <*ASSERT r=0*> END;
      t := slots[me.slot];
    WITH r = pthread_mutex_unlock(slotsMu) DO <*ASSERT r=0*> END;
    IF (t.act # me) THEN Die(ThisLine(), "thread with bad slot!") END;
    RETURN t;
  END Self;

PROCEDURE AssignSlot (t: T): INTEGER =
  (* LL = 0, cause we allocate stuff with NEW! *)
  VAR n: CARDINAL;  new_slots: REF ARRAY OF T;  slot: CARDINAL;
  BEGIN
    WITH r = pthread_mutex_lock(slotsMu) DO <*ASSERT r=0*> END;

      (* make sure we have room to register this guy *)
      IF (slots = NIL) THEN
        WITH r = pthread_mutex_unlock(slotsMu) DO <*ASSERT r=0*> END;
          slots := NEW (REF ARRAY OF T, 20);
        WITH r = pthread_mutex_lock(slotsMu) DO <*ASSERT r=0*> END;
      END;
      IF (n_slotted >= LAST (slots^)) THEN
        n := NUMBER (slots^);
        WITH r = pthread_mutex_unlock(slotsMu) DO <*ASSERT r=0*> END;
          new_slots := NEW (REF ARRAY OF T, n+n);
        WITH r = pthread_mutex_lock(slotsMu) DO <*ASSERT r=0*> END;
        IF (n = NUMBER (slots^)) THEN
          (* we won any races that may have occurred. *)
          SUBARRAY (new_slots^, 0, n) := slots^;
          slots := new_slots;
        ELSIF (n_slotted < LAST (slots^)) THEN
          (* we lost a race while allocating a new slot table,
             and the new table has room for us. *)
        ELSE
          (* ouch, the new table is full too!   Bail out and retry *)
          WITH r = pthread_mutex_unlock(slotsMu) DO <*ASSERT r=0*> END;
          RETURN AssignSlot (t);
        END;
      END;

      (* look for an empty slot *)
      WHILE (slots [next_slot] # NIL) DO
        INC (next_slot);
        IF (next_slot >= NUMBER (slots^)) THEN next_slot := 1; END;
      END;

      INC (n_slotted);
      slot := next_slot;
      slots [slot] := t;

    WITH r = pthread_mutex_unlock(slotsMu) DO <*ASSERT r=0*> END;
    RETURN slot;
  END AssignSlot;

PROCEDURE FreeSlot (self: T) =
  (* LL = 0 *)
  BEGIN
    WITH r = pthread_mutex_lock(slotsMu) DO <*ASSERT r=0*> END;

      DEC (n_slotted);
      WITH z = slots [self.act.slot] DO
        IF z # self THEN Die (ThisLine(), "unslotted thread!"); END;
        z := NIL;
      END;
      self.act.slot := 0;
    WITH r = pthread_mutex_unlock(slotsMu) DO <*ASSERT r=0*> END;
  END FreeSlot;

PROCEDURE DumpThread (t: Activation) =
  BEGIN
    RTIO.PutText("Activation:   "); RTIO.PutAddr(t);             RTIO.PutChar('\n');
    RTIO.PutText("  slot:       "); RTIO.PutInt(t.slot);         RTIO.PutChar('\n');
    RTIO.PutText("  mutex:      "); RTIO.PutAddr(t.mutex);       RTIO.PutChar('\n');
    RTIO.PutText("  cond:       "); RTIO.PutAddr(t.cond);        RTIO.PutChar('\n');
    RTIO.PutText("  alerted:    "); RTIO.PutInt(ORD(t.alerted)); RTIO.PutChar('\n');
    RTIO.PutText("  waitingOn:  "); RTIO.PutAddr(t.waitingOn);   RTIO.PutChar('\n');
    RTIO.PutText("  nextWaiter: "); RTIO.PutAddr(t.nextWaiter);  RTIO.PutChar('\n');
    RTIO.PutText("  frame:      "); RTIO.PutAddr(t.frame);       RTIO.PutChar('\n');
    RTIO.PutText("  next:       "); RTIO.PutAddr(t.next);        RTIO.PutChar('\n');
    RTIO.PutText("  prev:       "); RTIO.PutAddr(t.prev);        RTIO.PutChar('\n');
    RTIO.PutText("  handle:     "); RTIO.PutAddr(t.handle);      RTIO.PutChar('\n');
    RTIO.PutText("  stackbase:  "); RTIO.PutAddr(t.stackbase);   RTIO.PutChar('\n');
    RTIO.PutText("  context:    "); RTIO.PutAddr(t.context);     RTIO.PutChar('\n');
    RTIO.PutText("  state:      ");
    CASE t.state OF
    | ActState.Started => RTIO.PutText("Started\n");
    | ActState.Stopped => RTIO.PutText("Stopped\n");
    | ActState.Starting => RTIO.PutText("Starting\n");
    | ActState.Stopping => RTIO.PutText("Stopping\n");
    END;
    RTIO.Flush();
  END DumpThread;

PROCEDURE DumpThreads () =
  VAR t := allThreads;
  BEGIN
    REPEAT
      DumpThread(t);
      t := t.next
    UNTIL t = allThreads;
  END DumpThreads;

(*------------------------------------------------------------ Fork, Join ---*)

VAR (* LL=activeMu *)
  allThreads: Activation := NIL;            (* global list of active threads *)

PROCEDURE CleanThread (r: REFANY) =
  VAR t := NARROW(r, T);
  BEGIN
    pthread_mutex_delete(t.act.mutex);
    pthread_cond_delete(t.act.cond);
    DISPOSE(t.act);
  END CleanThread;

(* ThreadBase calls RunThread after finding (approximately) where
   its stack begins.  This dance ensures that all of ThreadMain's
   traced references are within the stack scanned by the collector. *)

PROCEDURE ThreadBase (param: ADDRESS): ADDRESS =
  VAR
    me: Activation := param;
  BEGIN
    SetActivation(me);
    me.stackbase := ADR(me); (* enable GC scanning of this stack *)
    me.handle := pthread_self();

    (* add to the list of active threads *)
    WITH r = pthread_mutex_lock(activeMu) DO <*ASSERT r=0*> END;
      me.next := allThreads;
      me.prev := allThreads.prev;
      allThreads.prev.next := me;
      allThreads.prev := me;
    WITH r = pthread_mutex_unlock(activeMu) DO <*ASSERT r=0*> END;
    FloatMode.InitThread (me.floatState);

    RunThread(me);

    (* remove from the list of active threads *)
    WITH r = pthread_mutex_lock(activeMu) DO <*ASSERT r=0*> END;
      <*ASSERT allThreads # me*>
      me.stackbase := NIL; (* disable GC scanning of my stack *)
      me.next.prev := me.prev;
      me.prev.next := me.next;
      WITH r = pthread_detach_self() DO <*ASSERT r=0*> END;
    WITH r = pthread_mutex_unlock(activeMu) DO <*ASSERT r=0*> END;
    me.next := NIL;
    me.prev := NIL;
    RETURN NIL;
  END ThreadBase;

PROCEDURE RunThread (me: Activation) =
  VAR self: T;
  BEGIN
    IF perfOn THEN PerfChanged(State.alive) END;

    WITH r = pthread_mutex_lock(slotsMu) DO <*ASSERT r=0*> END;
      self := slots [me.slot];
    WITH r = pthread_mutex_unlock(slotsMu) DO <*ASSERT r=0*> END;

    IF perfOn THEN PerfRunning() END;

    (*** Run the user-level code. ***)
    self.result := self.closure.apply();

    IF perfOn THEN PerfChanged(State.dying) END;

    (* Join *)
    LOCK joinMu DO
      Broadcast(self.join);
      self.join := NIL;     (* mark me done *)
    END;

    IF perfOn THEN PerfChanged(State.dead) END;

    (* we're dying *)
    RTHeapRep.FlushThreadState(me.heapState);

    IF perfOn THEN PerfDeleted() END;
    FreeSlot(self);  (* note: needs self.act ! *)
    (* Since we're no longer slotted, we cannot touch traced refs. *)
  END RunThread;

VAR joinMu: MUTEX;

PROCEDURE Fork (closure: Closure): T =
  VAR
    act := NEW(Activation,
               mutex := pthread_mutex_new(),
               cond := pthread_cond_new());
    size := defaultStackSize;
    t: T := NIL;
  BEGIN
    TRY
      IF act.mutex = NIL OR act.cond = NIL THEN
        RTE.Raise(RTE.T.OutOfMemory);
      END;
      t := NEW(T, act := act, closure := closure, join := NEW(Condition));
      RTHeapRep.RegisterFinalCleanup(t, CleanThread);
      act.slot := AssignSlot(t);
    FINALLY
      IF act.slot = 0 THEN
        (* we failed, cleanup *)
        pthread_mutex_delete(act.mutex);
        pthread_cond_delete(act.cond);
        DISPOSE(act);
      END;
    END;
    (* determine the initial size of the stack for this thread *)
    TYPECASE closure OF
    | SizedClosure (scl) => size := scl.stackSize;
    ELSE (*skip*)
    END;
    WITH r = thread_create(size * ADRSIZE(Word.T), ThreadBase, act) DO
      IF r # 0 THEN DieI(ThisLine(), r) END;
    END;
    RETURN t;
  END Fork;

PROCEDURE XJoin (self: Activation; t: T; alertable: BOOLEAN):
  REFANY RAISES {Alerted} =
  BEGIN
    LOCK joinMu DO
      IF t.joined THEN Die(ThisLine(), "attempt to join with thread twice") END;
      TRY
        t.joined := TRUE;
        WHILE t.join # NIL DO XWait(self, joinMu, t.join, alertable) END;
      FINALLY
        IF t.join # NIL THEN t.joined := FALSE END;
      END;
    END;
    RETURN t.result;
  END XJoin;

PROCEDURE Join (t: T): REFANY =
  <*FATAL Alerted*>
  VAR self := GetActivation();
  BEGIN
    RETURN XJoin(self, t, alertable := FALSE);
  END Join;

PROCEDURE AlertJoin (t: T): REFANY RAISES {Alerted} =
  VAR self := GetActivation();
  BEGIN
    RETURN XJoin(self, t, alertable := TRUE);
  END AlertJoin;

(*---------------------------------------------------- Scheduling support ---*)

PROCEDURE XPause (self: Activation; n: LONGREAL; alertable: BOOLEAN)
  RAISES {Alerted} =
  VAR until := Time.Now() + n;
  BEGIN
    IF perfOn THEN PerfChanged(State.pausing) END;
    WITH r = pthread_mutex_lock(self.mutex) DO <*ASSERT r=0*> END;
    <*ASSERT self.waitingOn = NIL*>
    <*ASSERT self.nextWaiter = NIL*>

    LOOP
      IF alertable AND self.alerted THEN
        self.alerted := FALSE;
        WITH r = pthread_mutex_unlock(self.mutex) DO <*ASSERT r=0*> END;
        IF perfOn THEN PerfRunning() END;
        RAISE Alerted;
      END;
      WITH r = pthread_cond_timedwait(self.cond, self.mutex, until) DO
        IF r = Uerror.ETIMEDOUT THEN
          WITH r = pthread_mutex_unlock(self.mutex) DO <*ASSERT r=0*> END;
          IF perfOn THEN PerfRunning() END;
          RETURN;
        END;
        <*ASSERT r=0*>
      END;
    END;
  END XPause;

PROCEDURE Pause (n: LONGREAL) =
  <*FATAL Alerted*>
  VAR self := GetActivation();
  BEGIN
    XPause(self, n, alertable := FALSE);
  END Pause;

PROCEDURE AlertPause (n: LONGREAL) RAISES {Alerted} =
  VAR self := GetActivation();
  BEGIN
    XPause(self, n, alertable := TRUE);
  END AlertPause;

PROCEDURE Yield () =
  BEGIN
    WITH r = Usched.yield() DO
      IF r # 0 THEN DieI(ThisLine(), Cerrno.GetErrno()) END;
    END;
  END Yield;

PROCEDURE IOWait (fd: CARDINAL; read: BOOLEAN;
                  timeoutInterval: LONGREAL := -1.0D0): WaitResult =
  <*FATAL Alerted*>
  VAR self := GetActivation();
  BEGIN
    TRY
      IF perfOn THEN PerfChanged(State.blocking) END;
      RETURN XIOWait(self, fd, read, timeoutInterval, alertable := FALSE);
    FINALLY
      IF perfOn THEN PerfRunning() END;
    END;
  END IOWait;

PROCEDURE IOAlertWait (fd: CARDINAL; read: BOOLEAN;
                       timeoutInterval: LONGREAL := -1.0D0): WaitResult
  RAISES {Alerted} =
  VAR self := GetActivation();
  BEGIN
    TRY
      IF perfOn THEN PerfChanged(State.blocking) END;
      RETURN XIOWait(self, fd, read, timeoutInterval, alertable := TRUE);
    FINALLY
      IF perfOn THEN PerfRunning() END;
    END;
  END IOAlertWait;

PROCEDURE XIOWait (self: Activation; fd: CARDINAL; read: BOOLEAN;
                   interval: LONGREAL; alertable: BOOLEAN): WaitResult
  RAISES {Alerted} =
  VAR res: WaitResult;
      subInterval: LONGREAL := 1.0d0;
      err: int := 0;
      again := FALSE;
  BEGIN
    IF NOT alertable THEN
      subInterval := interval;
    ELSIF interval < 0.0d0 THEN
      interval := LAST(LONGREAL);
    ELSIF interval < subInterval THEN
      subInterval := interval;
    END;

    IF alertable AND XTestAlert(self) THEN RAISE Alerted END;
    LOOP
      res := VAL(Poll(fd, ORD(read), subInterval), WaitResult);

      IF alertable AND XTestAlert(self) THEN RAISE Alerted END;

      CASE res OF
        | WaitResult.FDError, WaitResult.Ready =>
          RETURN res;
        | WaitResult.Error =>
          err := Cerrno.GetErrno();
          IF err = Uerror.EINTR THEN
            (* spurious wakeups are OK *)
          ELSIF err = Uerror.EAGAIN AND NOT again THEN
            again := TRUE; (* try just once more *)
          ELSE
            RETURN WaitResult.Error;
          END;
        | WaitResult.Timeout =>
          interval := interval - subInterval;
          IF interval <= 0.0d0 THEN RETURN WaitResult.Timeout END;
          IF interval < subInterval THEN
            subInterval := interval;
          END;
      END;
    END;
  END XIOWait;

PROCEDURE WaitProcess (pid: int; VAR status: int): int =
  (* ThreadPThread.m3 and ThreadPosix.m3 are very similar. *)
  BEGIN
    LOOP
      WITH r = Uexec.waitpid(pid, ADR(status), 0) DO
        <*ASSERT r # 0*>
        IF r > 0 THEN RETURN r END;
        IF Cerrno.GetErrno() # Uerror.EINTR THEN RETURN r END;
      END;
    END;
  END WaitProcess;

(*--------------------------------------------------- Stack size controls ---*)

VAR defaultStackSize := 4096;

PROCEDURE GetDefaultStackSize (): CARDINAL =
  BEGIN
    RETURN defaultStackSize;
  END GetDefaultStackSize;

PROCEDURE MinDefaultStackSize (size: CARDINAL) =
  BEGIN
    defaultStackSize := MAX(defaultStackSize, size);
  END MinDefaultStackSize;

PROCEDURE IncDefaultStackSize (inc: CARDINAL) =
  BEGIN
    INC(defaultStackSize, inc);
  END IncDefaultStackSize;

(*--------------------------------------------- Garbage collector support ---*)
(* NOTE: These routines are called indirectly by the low-level page fault
   handler of the garbage collector.  So, if they touched traced references,
   they could trigger indefinite invocations of the fault handler. *)

(* In versions of SuspendOthers prior to the addition of the incremental
   collector, it acquired 'cm' to guarantee that no suspended thread held it.
   That way when the collector tried to acquire a mutex or signal a
   condition, it wouldn't deadlock with the suspended thread that held cm.

   With the VM-synchronized, incremental collector this design is inadequate.
   Here's a deadlock that occurred:
      Thread.Broadcast held cm,
      then it touched its condition argument,
      the page containing the condition was protected by the collector,
      another thread started running the page fault handler,
      the handler called SuspendOthers,
      SuspendOthers tried to acquire cm.

   So, SuspendOthers does not grab "cm" before shutting down the other
   threads.  If the collector tries to use any of the thread functions
   that acquire "cm", it'll be deadlocked.
*)

VAR suspended: BOOLEAN := FALSE;        (* LL=activeMu *)

PROCEDURE SuspendOthers () =
  (* LL=0. Always bracketed with ResumeOthers which releases "activeMu" *)
  BEGIN
    WITH r = pthread_mutex_lock(activeMu) DO <*ASSERT r=0*> END;
    StopWorld();
    <*ASSERT NOT suspended*>
    suspended := TRUE;
  END SuspendOthers;

PROCEDURE ResumeOthers () =
  (* LL=activeMu.  Always preceded by SuspendOthers. *)
  BEGIN
    <*ASSERT suspended*>
    suspended := FALSE;
    StartWorld();
    WITH r = pthread_mutex_unlock(activeMu) DO <*ASSERT r=0*> END;
  END ResumeOthers;

PROCEDURE ProcessStacks (p: PROCEDURE (start, limit: ADDRESS)) =
  (* LL=activeMu.  Only called within {SuspendOthers, ResumeOthers} *)
  VAR
    me := GetActivation();
    act: Activation;
  BEGIN
    ProcessMe(me, p);
    act := me.next;
    WHILE act # me DO
      ProcessOther(act, p);
      act := act.next;
    END;
  END ProcessStacks;

PROCEDURE ProcessEachStack (p: PROCEDURE (start, limit: ADDRESS)) =
  (* LL=0 *)
  VAR
    me := GetActivation();
    act: Activation;
    acks: int;
    nLive, nDead, newlySent: INTEGER := 0;
    wait_nsecs := RETRY_INTERVAL;
  BEGIN
    WITH r = pthread_mutex_lock(activeMu) DO <*ASSERT r=0*> END;

    ProcessMe(me, p);

    act := me.next;
    WHILE act # me DO
      (* stop *)
      LOOP
        <*ASSERT act.state = ActState.Started*>
        SetState(act, ActState.Stopping);
        IF SIG_SUSPEND = 0 THEN
          IF StopThread(act) THEN
            SetState(act, ActState.Stopped);
            EXIT;
          ELSE
            SetState(act, ActState.Started);
          END;
        ELSE
          SignalThread(act);
          INC(nLive);
          EXIT;
        END;
        Nanosleep(WAIT_UNIT);
      END;
      WHILE nLive > 0 DO
        <*ASSERT SIG_SUSPEND # 0*>
        WITH r = sem_getvalue(acks) DO <*ASSERT r=0*> END;
        IF acks = nLive THEN EXIT END;
        <*ASSERT acks < nLive*>
        IF wait_nsecs <= 0 THEN
          newlySent := 0;
          <*ASSERT act.state # ActState.Starting*>
          IF act.state # ActState.Stopped THEN
            SetState(act, ActState.Stopping);
            SignalThread(act);
            INC(newlySent);
          END;
          wait_nsecs := RETRY_INTERVAL;
        ELSE
          Nanosleep(WAIT_UNIT);
          DEC(wait_nsecs, WAIT_UNIT);
        END;
      END;
      FOR i := 0 TO nLive - 1 DO
        WHILE sem_wait() # 0 DO
          WITH r = Cerrno.GetErrno() DO
            IF r # Uerror.EINTR THEN DieI(ThisLine(), r) END;
          END;
          (*retry*)
        END;
      END;

      (* process *)
      ProcessOther(act, p);

      (* start *)
      nDead := 0;
      LOOP
        <*ASSERT act.state = ActState.Stopped*>
        SetState(act, ActState.Starting);
        IF SIG_SUSPEND = 0 THEN
          IF StartThread(act) THEN
            SetState(act, ActState.Started);
            EXIT;
          ELSE
            SetState(act, ActState.Stopped);
          END;
        ELSE
          SignalThread(act);
          INC(nDead);
          EXIT;
        END;
        Nanosleep(WAIT_UNIT);
      END;
      WHILE nDead > 0 DO
        <*ASSERT SIG_SUSPEND # 0*>
        WITH r = sem_getvalue(acks) DO <*ASSERT r=0*> END;
        IF acks = nDead THEN EXIT END;
        <*ASSERT acks < nDead*>
        IF wait_nsecs <= 0 THEN
          newlySent := 0;
          <*ASSERT act.state # ActState.Stopping*>
          IF act.state # ActState.Started THEN
            SignalThread(act);
            INC(newlySent);
          END;
          wait_nsecs := RETRY_INTERVAL;
        ELSE
          Nanosleep(WAIT_UNIT);
          DEC(wait_nsecs, WAIT_UNIT);
        END;
      END;
      FOR i := 0 TO nDead - 1 DO
        WHILE sem_wait() # 0 DO
          WITH r = Cerrno.GetErrno() DO
            IF r # Uerror.EINTR THEN DieI(ThisLine(), r) END;
          END;
          (*retry*)
        END;
      END;
    END;

    WITH r = pthread_mutex_unlock(activeMu) DO <*ASSERT r=0*> END;
  END ProcessEachStack;

PROCEDURE ProcessMe (me: Activation;  p: PROCEDURE (start, limit: ADDRESS)) =
  (* LL=activeMu *)
  BEGIN
    <*ASSERT me.state # ActState.Stopped*>
    IF DEBUG THEN
      RTIO.PutText("Processing act="); RTIO.PutAddr(me); RTIO.PutText("\n"); RTIO.Flush();
    END;
    RTHeapRep.FlushThreadState(me.heapState);
    ProcessLive(me.stackbase, p);
  END ProcessMe;

PROCEDURE ProcessOther (act: Activation;  p: PROCEDURE (start, stop: ADDRESS)) =
  (* LL=activeMu *)
  BEGIN
    <*ASSERT act.state = ActState.Stopped*>
    IF DEBUG THEN
      RTIO.PutText("Processing act="); RTIO.PutAddr(act); RTIO.PutText("\n"); RTIO.Flush();
    END;
    RTHeapRep.FlushThreadState(act.heapState);
    IF act.stackbase # NIL THEN
      ProcessStopped(act.handle, act.stackbase, act.context, p);
    END;
  END ProcessOther;

(* Signal based suspend/resume *)

PROCEDURE SignalThread(act: Activation) =
  (* LL=activeMu *)
  BEGIN
    <*ASSERT SIG_SUSPEND # 0*>
    LOOP
      WITH z = pthread_kill(act.handle, SIG_SUSPEND) DO
        IF z = 0 THEN EXIT END;
        IF z # Uerror.EAGAIN THEN DieI(ThisLine(), z) END;
        (* try it again... *)
      END;
    END;
  END SignalThread;

PROCEDURE StopThread (act: Activation): BOOLEAN =
  (* LL=activeMu *)
  BEGIN
    <*ASSERT act.state = ActState.Stopping*>
    <*ASSERT SIG_SUSPEND = 0*>
    IF NOT SuspendThread(act.handle) THEN RETURN FALSE END;
    IF act.heapState.inCritical # 0 THEN
      IF NOT RestartThread(act.handle) THEN <*ASSERT FALSE*> END;
      RETURN FALSE;
    END;
    RETURN TRUE;
  END StopThread;

PROCEDURE StartThread (act: Activation): BOOLEAN =
  (* LL=activeMu *)
  BEGIN
    <*ASSERT act.state = ActState.Starting*>
    <*ASSERT SIG_SUSPEND = 0*>
    RETURN RestartThread(act.handle);
  END StartThread;

PROCEDURE StopWorld () =
  (* LL=activeMu *)
  VAR
    me := GetActivation();
    act: Activation;
    acks: int;
    nLive, newlySent: INTEGER;
    retry: BOOLEAN;
    wait_nsecs := RETRY_INTERVAL;
  BEGIN
    IF DEBUG THEN
      RTIO.PutText("Stopping from act="); RTIO.PutAddr(me); RTIO.PutText("\n"); RTIO.Flush();
    END;

    nLive := 0;
    LOOP
      retry := FALSE;
      act := me.next;
      WHILE act # me DO
        <*ASSERT act.state # ActState.Starting*>
        IF act.state = ActState.Started THEN
          SetState(act, ActState.Stopping);
          IF SIG_SUSPEND = 0 THEN
            IF StopThread(act) THEN
              SetState(act, ActState.Stopped);
            ELSE
              SetState(act, ActState.Started);
              retry := TRUE;
            END;
          ELSE
            SignalThread(act);
            INC(nLive);
          END;
        END;
        act := act.next;
      END;
      IF NOT retry THEN EXIT END;
      Nanosleep(WAIT_UNIT);
    END;
    WHILE nLive > 0 DO
      <*ASSERT SIG_SUSPEND # 0*>
      WITH r = sem_getvalue(acks) DO <*ASSERT r=0*> END;
      IF acks = nLive THEN EXIT END;
      <*ASSERT acks < nLive*>
      IF wait_nsecs <= 0 THEN
        newlySent := 0;
        act := me.next;
        WHILE act # me DO
          <*ASSERT act.state # ActState.Starting*>
          IF act.state # ActState.Stopped THEN
            SetState(act, ActState.Stopping);
            SignalThread(act);
            INC(newlySent);
          END;
          act := act.next;
        END;
        wait_nsecs := RETRY_INTERVAL;
      ELSE
        Nanosleep(WAIT_UNIT);
        DEC(wait_nsecs, WAIT_UNIT);
      END;
    END;
    (* drain semaphore *)
    FOR i := 0 TO nLive-1 DO
      WHILE sem_wait() # 0 DO
        WITH r = Cerrno.GetErrno() DO
          IF r # Uerror.EINTR THEN DieI(ThisLine(), r) END;
        END;
        (*retry*)
      END;
    END;

    IF DEBUG THEN
      RTIO.PutText("Stopped from act="); RTIO.PutAddr(me); RTIO.PutText("\n"); RTIO.Flush();
      DumpThreads();
    END;
  END StopWorld;

PROCEDURE StartWorld () =
  (* LL=activeMu *)
  VAR
    me := GetActivation();
    act: Activation;
    acks: int;
    nDead, newlySent: INTEGER;
    retry: BOOLEAN;
    wait_nsecs := RETRY_INTERVAL;
  BEGIN
    IF DEBUG THEN
      RTIO.PutText("Starting from act="); RTIO.PutAddr(me); RTIO.PutText("\n"); RTIO.Flush();
    END;

    nDead := 0;
    LOOP
      retry := FALSE;
      act := me.next;
      WHILE act # me DO
        <*ASSERT act.state # ActState.Stopping*>
        IF act.state # ActState.Started THEN
          SetState(act, ActState.Starting);
          IF SIG_SUSPEND = 0 THEN
            IF StartThread(act) THEN
              SetState(act, ActState.Started);
            ELSE
              SetState(act, ActState.Stopped);
              retry := TRUE;
            END;
          ELSE
            SignalThread(act);
            INC(nDead);
          END;
        END;
        act := act.next;
      END;
      IF NOT retry THEN EXIT END;
      Nanosleep(WAIT_UNIT);
    END;
    WHILE nDead > 0 DO
      <*ASSERT SIG_SUSPEND # 0*>
      WITH r = sem_getvalue(acks) DO <*ASSERT r=0*> END;
      IF acks = nDead THEN EXIT END;
      <*ASSERT acks < nDead*>
      IF wait_nsecs <= 0 THEN
        newlySent := 0;
        act := me.next;
        WHILE act # me DO
          <*ASSERT act.state # ActState.Stopping*>
          IF act.state # ActState.Started THEN
            SignalThread(act);
            INC(newlySent);
          END;
          act := act.next;
        END;
        wait_nsecs := RETRY_INTERVAL;
      ELSE
        Nanosleep(WAIT_UNIT);
        DEC(wait_nsecs, WAIT_UNIT);
      END;
    END;
    (* drain semaphore *)
    FOR i := 0 TO nDead-1 DO
      WHILE sem_wait() # 0 DO
        WITH r = Cerrno.GetErrno() DO
          IF r # Uerror.EINTR THEN DieI(ThisLine(), r) END;
        END;
        (*retry*)
      END;
    END;

    IF DEBUG THEN
      RTIO.PutText("Started from act="); RTIO.PutAddr(me); RTIO.PutText("\n"); RTIO.Flush();
      DumpThreads();
    END;
  END StartWorld;

PROCEDURE SignalHandler (sig: int; <*UNUSED*>info: ADDRESS; context: ADDRESS) =
  VAR
    errno := Cerrno.GetErrno();
    me := GetActivation();
  BEGIN
    <*ASSERT sig = SIG_SUSPEND*>
    IF me.state = ActState.Stopping THEN
      IF me.heapState.inCritical # 0 THEN
        me.state := ActState.Started;
        RETURN;
      END;
      me.state := ActState.Stopped;
      <*ASSERT me.context = NIL*>
      me.context := context;
      WITH r = sem_post() DO <*ASSERT r=0*> END;
      REPEAT sigsuspend() UNTIL me.state = ActState.Starting;
      me.context := NIL;
      me.state := ActState.Started;
      WITH r = sem_post() DO <*ASSERT r=0*> END;
    END;
    Cerrno.SetErrno(errno);
  END SignalHandler;

(*----------------------------------------------------------- misc. stuff ---*)

PROCEDURE MyId (): Id RAISES {} =
  VAR me := GetActivation();
  BEGIN
    IF me = NIL
      THEN RETURN 0
      ELSE RETURN me.slot;
    END;
  END MyId;

PROCEDURE MyFPState (): UNTRACED REF FloatMode.ThreadState =
  VAR me := GetActivation();
  BEGIN
    RETURN ADR(me.floatState);
  END MyFPState;

PROCEDURE MyHeapState (): UNTRACED REF RTHeapRep.ThreadState =
  VAR me := GetActivation();
  BEGIN
    RETURN ADR(me.heapState);
  END MyHeapState;

PROCEDURE DisableSwitching () =
  BEGIN
    (* no user-level thread switching *)
  END DisableSwitching;

PROCEDURE EnableSwitching () =
  BEGIN
    (* no user-level thread switching *)
  END EnableSwitching;

(*---------------------------------------------------------------- errors ---*)

PROCEDURE Die (lineno: INTEGER; msg: TEXT) =
  BEGIN
    RTError.Msg (ThisFile(), lineno, "Thread client error: ", msg);
  END Die;

PROCEDURE DieI (lineno: INTEGER; i: INTEGER) =
  BEGIN
    RTError.MsgI (ThisFile(), lineno, "Thread client error: ", i);
  END DieI;

(*------------------------------------------------------ ShowThread hooks ---*)

VAR
  perfW : RTPerfTool.Handle;
  perfOn: BOOLEAN := FALSE;     (* LL = perfMu *)

PROCEDURE PerfStart () =
  BEGIN
    IF RTPerfTool.Start ("showthread", perfW) THEN
      perfOn := TRUE;
      RTProcess.RegisterExitor (PerfStop);
    END;
  END PerfStart;

PROCEDURE PerfStop () =
  BEGIN
    (* UNSAFE, but needed to prevent deadlock if we're crashing! *)
    RTPerfTool.Close (perfW);
  END PerfStop;

CONST
  EventSize = (BITSIZE(ThreadEvent.T) + BITSIZE(CHAR) - 1) DIV BITSIZE(CHAR);

TYPE
  TE = ThreadEvent.Kind;

PROCEDURE PerfChanged (s: State) =
  VAR
    e := ThreadEvent.T {kind := TE.Changed, id := MyId(), state := s};
  BEGIN
    WITH r = pthread_mutex_lock(perfMu) DO <*ASSERT r=0*> END;
      perfOn := RTPerfTool.Send (perfW, ADR (e), EventSize);
    WITH r = pthread_mutex_unlock(perfMu) DO <*ASSERT r=0*> END;
  END PerfChanged;

PROCEDURE PerfDeleted () =
  VAR
    e := ThreadEvent.T {kind := TE.Deleted, id := MyId()};
  BEGIN
    WITH r = pthread_mutex_lock(perfMu) DO <*ASSERT r=0*> END;
      perfOn := RTPerfTool.Send (perfW, ADR (e), EventSize);
    WITH r = pthread_mutex_unlock(perfMu) DO <*ASSERT r=0*> END;
  END PerfDeleted;

PROCEDURE PerfRunning () =
  VAR
    e := ThreadEvent.T {kind := TE.Running, id := MyId()};
  BEGIN
    WITH r = pthread_mutex_lock(perfMu) DO <*ASSERT r=0*> END;
      perfOn := RTPerfTool.Send (perfW, ADR (e), EventSize);
    WITH r = pthread_mutex_unlock(perfMu) DO <*ASSERT r=0*> END;
  END PerfRunning;

(*-------------------------------------------------------- Initialization ---*)

PROCEDURE InitWithStackBase (stackbase: ADDRESS) =
  VAR
    self: T;
    me: Activation;
  BEGIN
    InitC(stackbase);

    me := NEW(Activation,
              mutex := pthread_mutex_new(),
              cond := pthread_cond_new());
    InitActivations(me);
    me.stackbase := stackbase;
    IF me.mutex = NIL OR me.cond = NIL THEN
      Die(ThisLine(), "Thread initialization failed.");
    END;
    self := NEW(T, act := me, closure := NIL, join := NIL);
    me.slot := AssignSlot(self);

    joinMu := NEW(MUTEX);

    PerfStart();
    IF perfOn THEN PerfRunning() END;
    IF RTParams.IsPresent("backgroundgc") THEN
      RTCollectorSRC.StartBackgroundCollection();
    END;
    IF RTParams.IsPresent("foregroundgc") THEN
      RTCollectorSRC.StartForegroundCollection();
    END;
  END InitWithStackBase;

PROCEDURE Init ()=
  VAR r: INTEGER;
  BEGIN
    r := RTProcess.RegisterForkHandlers(AtForkPrepare,
                                        AtForkParent,
                                        AtForkChild);
    IF r # 0 THEN DieI(ThisLine(), r) END;
    InitWithStackBase(ADR(r)); (* not quite accurate but hopefully ok *)
  END Init;

PROCEDURE PThreadLockMutex(mutex: pthread_mutex_t; line: INTEGER) =
  BEGIN
    IF mutex # NIL THEN
      WITH r = pthread_mutex_lock(mutex) DO
        IF r # 0 THEN DieI(line, r) END;
      END;
    END;
  END PThreadLockMutex;

PROCEDURE PThreadUnlockMutex(mutex: pthread_mutex_t;  line: INTEGER) =
  BEGIN
    IF mutex # NIL THEN
      WITH r = pthread_mutex_unlock(mutex) DO
        IF r # 0 THEN DieI(line, r) END;
      END;
    END;
  END PThreadUnlockMutex;

PROCEDURE AtForkPrepare() =
  VAR me := GetActivation();
      act: Activation;
  BEGIN
    PThreadLockMutex(slotsMu, ThisLine());
    PThreadLockMutex(perfMu, ThisLine());
    PThreadLockMutex(initMu, ThisLine()); (* InitMutex => RegisterFinalCleanup => LockHeap *)
    LockHeap();
    PThreadLockMutex(activeMu, ThisLine()); (* LockHeap => SuspendOthers => activeMu *)
    (* Walk activations and lock all threads.
     * NOTE: We have initMu, activeMu, so slots won't change, conditions and
     * mutexes won't be initialized on-demand.
     *)
    act := me;
    REPEAT
      PThreadLockMutex(act.mutex, ThisLine());
      act := act.next;
    UNTIL act = me;
  END AtForkPrepare;

PROCEDURE AtForkParent() =
  VAR me := GetActivation();
      act: Activation;
  BEGIN
    (* Walk activations and unlock all threads, conditions. *)
    act := me;
    REPEAT
      PThreadUnlockMutex(act.mutex, ThisLine());
      act := act.next;
    UNTIL act = me;
    PThreadUnlockMutex(activeMu, ThisLine());
    UnlockHeap();
    PThreadUnlockMutex(initMu, ThisLine());
    PThreadUnlockMutex(perfMu, ThisLine());
    PThreadUnlockMutex(slotsMu, ThisLine());
  END AtForkParent;

PROCEDURE AtForkChild() =
  BEGIN
    AtForkParent();
    InitWithStackBase(GetActivation().stackbase);
  END AtForkChild;

(*------------------------------------------------------------- collector ---*)
(* These procedures provide synchronization primitives for the allocator
   and collector. *)

VAR
  holder: pthread_t;
  inCritical := 0;

PROCEDURE LockHeap () =
  VAR self := pthread_self();
  BEGIN
    IF pthread_equal(holder, self) = 0 THEN
      WITH r = pthread_mutex_lock(heapMu) DO <*ASSERT r=0*> END;
      holder := self;
    END;
    INC(inCritical);
  END LockHeap;

PROCEDURE UnlockHeap () =
  BEGIN
    <*ASSERT pthread_equal(holder, pthread_self()) # 0*>
    DEC(inCritical);
    IF inCritical = 0 THEN
      holder := NIL;
      WITH r = pthread_mutex_unlock(heapMu) DO <*ASSERT r=0*> END;
    END;
  END UnlockHeap;

PROCEDURE WaitHeap () =
  VAR self := pthread_self();
  BEGIN
    <*ASSERT pthread_equal(holder, self) # 0*>
    DEC(inCritical);
    <*ASSERT inCritical = 0*>
    WITH r = pthread_cond_wait(heapCond, heapMu) DO <*ASSERT r=0*> END;
    holder := self;
    <*ASSERT inCritical = 0*>
    INC(inCritical);
  END WaitHeap;

PROCEDURE BroadcastHeap () =
  BEGIN
    WITH r = pthread_cond_broadcast(heapCond) DO <*ASSERT r=0*> END;
  END BroadcastHeap;

(*--------------------------------------------- exception handling support --*)

PROCEDURE GetCurrentHandlers (): ADDRESS =
  VAR me := GetActivation();
  BEGIN
    RETURN me.frame;
  END GetCurrentHandlers;

PROCEDURE SetCurrentHandlers (h: ADDRESS) =
  VAR me := GetActivation();
  BEGIN
    me.frame := h;
  END SetCurrentHandlers;

(*RTHooks.PushEFrame*)
PROCEDURE PushEFrame (frame: ADDRESS) =
  TYPE Frame = UNTRACED REF RECORD next: ADDRESS END;
  VAR
    me := GetActivation();
    f: Frame := frame;
  BEGIN
    f.next := me.frame;
    me.frame := f;
  END PushEFrame;

(*RTHooks.PopEFrame*)
PROCEDURE PopEFrame (frame: ADDRESS) =
  VAR me := GetActivation();
  BEGIN
    me.frame := frame;
  END PopEFrame;

VAR DEBUG := RTParams.IsPresent("debugthreads");

BEGIN
END ThreadPThread.


More information about the M3devel mailing list