declarative-concurrency

Table of Contents

1 Declarative Concurrency - Introduction

Extend the declarative sequential model to deal with concurrency. The result of execution is the same as a sequential execution. The program results can be calculated incrementally.

declare Gen A B X Y
fun {Gen L N}
    {Delay 100}
    if L == N then nil else L|{Gen L+1 N} end
end

A = {Gen 1 10}
B = {Map A fun {$ X} X * X end}
{Browse A B}

thread X={Gen 1 10} end
thread Y={Map X fun {$ X} X*X end} end
{Browse Y}

We will extend the declarative sequential model in two steps.

(1) Add threads:

thread <s> end

This gives us one form of declarative concurrency.

that

(2) Extend the model with another execution order

{ByNeed P X}

This adds the possibility of doing ``demand-driven'' or lazy computation. This gives us the other form of declarative concurrency.

We will also deal with soft real-time programming, that is, program with time constraints. The term "soft" refers to the fact that missing a deadline might not be catastrophic, but the quality will decline.

The resulting declarative concurrent model will not have Exceptions…


2 Basic Concepts

The new model allows more than one executing statement to access the store. As if all of those are executing "at the same time".

2.1 Interleaving and Simultaneity

What execution order do we mean when we say threads execute simultaneously? There are two aspects to concurrency: interleaving among steps, and overlapping of two computation steps.

2.1.1 Language viewpoint:

There is one global sequence of computation steps, and threads take turns executing it. Computation steps do not overlap.

2.1.2 Implementation Viewpoint:

Threads may not be executing interleaved - for example, on a multiprocessor system, they could be executing concurrently. This is parallel execution.

Important Observation: Whatever the parallel execution of a program is, there is at least one interleaved execution that is observationally equivalent to it - that is, the evolution of the store during both the parallel and the interleaved execution is the same. It is hence sufficient to study interleaving semantics.

2.2 Causal Order

All computation steps form a global order called a partial order. This is called the causal order.

Example:

Causal Order:

      ------>---->
      ^ a      b
      | 
----->------>----->
  I     J      K           


Valid Execution Orders:
I a b J K
I a J b K
I a J K b
I J a b K
I J a K b
I J K a b

2.3 Nondeterminism

There is an execution state in which there is a choice of what thread to execute next.

In a declarative model, the nondeterminism is not visible to the programmer. This is due to:

(1) The single assignment store (2) Any operation has to wait until all its variables are bound. If we allow operations which the programmer can use to determine whether to wait or proceed, then the nondeterminism will be visible.

Browsing sometimes exhibits nondeterministic output - does this violate the above statement?

2.4 Thread Scheduling

The Mozart system has a thread scheduler, which decides which thread to run next. A thread can be in the following states: (1) Ready/Runnable : (2) Suspended : e.g. blocked on an unbound variable (3) Terminated

Fair Thread Scheduler: Every runnable thread will eventually execute.

An interesting observation: fairness is related to modularity. Each thread's effect can be studied independent of the existence of other threads (is this possible in fair, nondeclarative models?)


3 Semantics of Threads

Abstract machine now has several semantic stacks. Each semantic stack captures the notion of a ``thread".

All threads share the single assignment store. Threads communicate with each other through this store.

We extend the abstract machine in Section 2. An execution state is a pair (MST, s) where MST is a multiset of semantic stacks, and s is a single-assignment store. (Why a multiset?) A computation is a sequence of such execution states. No other concept is changed in the abstract machine.

3.1 Program Execution

The initial execution state is

Semantic StackSAS
{[<s>,0]}0

Note that the semantic stack is a multiset.

At each step, one runnable semantic stack ST is selected from MST, leaving MST'.

MST = {ST} \multisetunion MST'.

One computation step is done in ST according to the semantics.

(ST, s) ——> (ST', s')

The choice of which stack to pick is up to the scheduler. The scheduler ensures fairness.

What happens when the currently executing statement blocks? In the sequential model, the interpreter would just hang. In the concurrent model, the interpreter has to try and run other stacks…

If there are no runnable stacks, and if every stack is terminated, then the computation terminates. If there are no runnable stacks, but at least one stack is suspended, then the computation blocks.

3.2 Semantics of the thread statement

The syntax of a thread statement: thread <s> end

Step: Create a new stack! What should be its contents?

Suppose the multiset is MST. if the selected stack ST is of the form

[thread <s> end, E]     % Stack Top
ST'                     % Rest of stack

then the new multi set is:

[<s>, E]  \multisetunion     ST'          \multisetunion    MST'

3.3 Memory Management

A terminated semantic stack can be deallocated.

A blocked semantic stack: Reclaimed if its activation condition depends on an unreachable variable. In this case, the thread will never be ready again, so execution is unchanged by removing this thread.

3.4 Example Execution

local B in
  thread B=true end
  if B then true end
end

When the local statement has executed, the semantic multistack looks thus:

StackStore
([thread…end, if…end]){b}

After executing the thread statement, we get

StackStore
([b=true], [if b=true then true end]),{b}

Now, there are two stacks. The second is blocked since its activation depends on b, which is unbound. So the scheduler picks the other thread. After executing b=tre, the stack is now

StackStore
([],[if b=true then true end]){b=true}

Now the empty stack is removed.

StackStore
([if b=true then true end]){b=true}

The scheduler then executes the single thread containing the if statement.


4 Declarative Concurrency

There are two issues:

(1) The inputs and the outputs are not necessarily values, since they may contain unbound variables. (2) The computation might not terminate.

4.1 Partial Termination


fun {Double L} case L of nil then nil else H|T then 2*H | {Double T} end end Y = {Double X}


If X keeps growing, then so does Y. But if X does not change, then Y stops growing and the program remains unchanged.

If the inputs do not change, the program stops executing any further. This is called partial termination.

4.2 Logical Equivalence

A set of store bindings: constraint.

For each variable x and constraint c, values (x,c) is the set of all possible values x can have, given that c holds.

Two constraints c1 and c2 are logically equivalent if (1) They contain the same variables and (2) For each variable x, values(x, c1) = values(x,c2)

For example,

x = rec(a b) and a = d is

equivalent to

x = rec(d b) and a = d

4.3 Declarative Concurrency - Definition

A program is said to be declaratively concurrent if for all inputs, the program does the following. All the executions either

(1) fail to terminate (2) terminate and are logically equivalent.

4.4 Failure and Confinement

If a declarative concurrent program results in a failure for a given set of inputs, then all possible executions with that set will fail.

For example, the bindings in

thread X=1 end
thread Y=X+1 end
thread X=Y end

will eventually conflict. The program terminates.

One way to handle this is using exceptions. But, with these, the executions are no longer declarative, since the store after different execution sequences could be different. For example, Y might be bound to 1 or to 2 before the execution fails in the example above.

Hiding the nondeterminism in case of failure: This is up to the programmer.

declare X Y
local XTrial YTrial StatusX StatusY StatusV in
  thread
    try
      XTrial = 1
      StatusX = success
    catch _ then StatusX=error end
  end
  thread
    try
      YTrial = XTrial+1
      StatusX = success
    catch _ then StatusX=error end
  end
  thread
    try
      XTrial = YTrial
      StatusX = success
    catch _ then StatusV=error end
  end
  if StatusX==error orelse 
     StatusY==error orelse 
     StatusV==error then
    X=1
    Y=1
  else 
    X=XTrial
    Y=YTrial
  end 
end
% Rest of the code using X and Y

5 Programming with threads

proc {ForAll Xs P}
     case Xs of nil then skip
     []   X|Xr then {P X} {ForAll Xr P} end
end

declare L in
thread {ForAll L Browse} end
end

Now, bind L

declare L1 L2 in
thread L = 1|L1 end
thread L1 = 2|L2 end
thread L2 = 3|nil end

The effect is the same as sequential execution, but the results can be computed incrementally.

5.1 Time Slices

Allowing each process to run for one step can increase the overload on the scheduler, or in other cases, hog the processor for too long executing an intensive step. The Mozart scheduler uses a preemptive scheduling based on timers.

Allowing one whole computation step leads to a deterministic scheduler. Running the threads multiple times preempts the threads exactly at the same steps.

The timer based-approach is hardware-implemented, hence more efficient. This is however, nondeterministic, and depends on external events in the system as well.

Either the thread itself, or the scheduler can keep track of the time a thread has run. The second is easier to implement.

A short time slice -

  • responsive system.
  • large overhead for thread switches.

Large number of threads with long timeslices (~10 ms)? == if the threads are interdependent, then long timeslices are ok. == if the threads are independent, long timeslices are not ok. You need a hard-real-time OS.

5.2 Priority Levels

Provides more control over how processor time is shared between threads. Three priority levels in Mozart: high, medium, low. Each has a guaranteed lower bound on processor time share. 100:10:1

When a thread creates a child thread, then child thread inherits the parent's priority. This is important for threading high-priority applications.

5.3 Cooperative vs. Competitive Concurrency

Threads are cooperative in completing a global task. On the other hand, you can have a competitive model where entities compete to complete a task. Usually those are OS processes.

Competitive Concurrency in Mozart - distributed computation model and Remote module. The distributed model is network-transparent.


6 Streams

A potentially unbounded list of messages. Its tail is an unbound dataflow variable (not a general partial value).

Sending a message: Send one element to the stream. Bind the tail to a list pair consisting of the element, and a new unbound tail.

Receiving a message: read one stream element.

A thread communicating through streams : Stream Object.

No locking or mutual exclusion is necessary since each variable is bound by only one thread.

Stream-based programming: e.g. Unix Pipes

            Summary--what's most important.
    To put my strongest concerns into a nutshell:
1. We should have some ways of coupling programs like
garden hose--screw in another segment when it becomes when
it becomes necessary to massage data in another way.
This is the way of IO also.
2. Our loader should be able to do link-loading and
controlled establishment.
3. Our library filing scheme should allow for rather
general indexing, responsibility, generations, data path
switching.
4. It should be possible to get private system components
(all routines are system components) for buggering around with.

                                                M. D. McIlroy
                                                October 11, 1964

6.1 Basic Producer/Consumer

Recall the list of random bits in Homework 1. The function which produced the stream of random bits is like a producer, and the function which computed running averages is like the consumer. This shows how to form a basic producer/consumer style program in the declarative model with threads.

You can also have multiple consumers without affecting the execution in any way.

local Xs S1 S2 in
    thread Xs={GenerateRandom} end
    thread S1={Average Count1} end
    thread S2={Average Count2} end
end

6.2 Transducers

We can put many other stream objects between a producer and a consumer. These are called **Transducers**. A sequence of stream objects each of which feeds the next is called a **pipeline**. The simplest stream is a filter.

6.2.1 Sum of odd numbers in a stream

%-----------
% Producer
%-----------
fun {Generate N Limit}
   if N<Limit
   then N|{Generate N+1 Limit}
   end
end
%------------
% Consumer
%------------
fun {Sum Xs Accumulator}
   case Xs
   of X|Xr then {Sum Xr X+Accumulator}
   [] nil then Accumulator
   end
end
%-----------
% A filter in between producer and consumer
%-----------
local Xs Ys S in
   thread Xs={Generate 0 1000000} end
   thread Ys={Filter Xs 
                     fun {$ X} X mod 2\=0 end} end % Take odd numbers
   thread S={Sum Ys 0} end
end

This can be represented graphically using the following diagram (sometimes called a Henderson Diagram.) The dashed arrows represent stream inputs, and the dotted arrow represents non-stream inputs.

               fun {$ X} X mod 2 \= 0 end
                          .
                          .
                          V
+----------+         +----------+       +----------+
|          |         |          |       |          |
|          | Xs      |          | Ys    |          |
| Producer |-------->| Filter   |------>| Consumer |
|          |         |          |       |          |
|          |         |          |       |          |
+----------+         +----------+       +----------+

6.2.2 Eratosthenes Sieve

We implement the Eratosthenes' Sieve as a stream-based Oz program. The Sieve generates prime numbers, starting from 2. The algorithm works as follows. First, we consider all consecutive numbers from 2, and remove multiples of 2 from this. At any stage, we pick the first number ~n~from the list whose multiples have not yet been removed, and remove all its multiples from the list, except for n itself. This process is iterated. The ultimate stream will consist of prime numbers only.

We can program this in the stream model as follows.

%---------------
% Returns an infinite stream of integers starting from N
%---------------
fun {IntsFrom N}
    N|{IntsFrom N+1}
end
%--------------------
% Returns a function which checks whether its argument is a multiple
% of N
%--------------------
fun {NonMultiple N}
   fun {$ M}
       M mod N \= 0
   end
end
%--------------------
% Sieve of Eratosthenes
%--------------------
fun {Sieve Xs}
    case Xs
    of X|Xr then
        thread
           Ys = {Filter Xr {NonMultiple X}} % Remove multiples of X
        end
        X|{Sieve Ys}
    [] nil then nil
    end
end
%---------------------
% Example Execution
%---------------------
local Xs Ys in
   thread Xs={IntsFrom 2} end
   thread Ys={Sieve Xs} end
   {Browse Ys}
end

% Think about why the usage code was not simply
% local Ys in
%    Ys = {Sieve {IntsFrom 2}} % Eager version
% end

6.3 Stream Objects

As an abstraction, we can introduce a new concept called a Stream Object. A stream object is a recursive procedure, which executes in its own thread, communicates with other stream objects via input and output streams, and finally, maintains a state.

proc {StreamObject InStream CurrentState ?OutStream}
   case InStream
   of InMsg|InStreamTail then
       NextState OutMsg OutStreamTail
   in
       % NexStateProcedure is the transition procedure
       {NextStateProcedure InMsg CurrentState OutMsg NextState}
       OutStream = OutMsg|OutStreamTail % Why is this done before recursing?
       {StreamObject InStreamTail NextState OutStreamTail}
   end
end

7 Using the declarative Concurrent model directly

We can use the declarative concurrent model directly without relying on to Streams or Stream Objects. The ingredients are partial values, and threads. We consider a few examples.

7.1 Concurrent Composition

We can create new threads using a thread statement. In a system with threading, the two important operations that are usually supported are thread creation, and thread join. Joining is the process whereby the creator of the new thread waits until the newly created thread is destroyed, before it continues.

The basic idea is for the creator thread to Wait for the binding of an unbound variable, which is bound by the newly created thread just before it terminates. The procedure {Wait X} blocks until X is bound, and gets unblocked when X is bound.

local X in
   thread {P1} X=unit end
   {Wait X} % Creator thread blocks until X is bound
end

This is easily extended to multiple processes.

local X1 X2 X3 in
   thread {P1} X1=unit end
   thread {P2} X2=X1   end
   thread {P3} X3=X2   end
   {Wait X3} % blocks until X3 is bound,
             % which happens only when all three new threads terminate.
end

We can abstract this into a procedure {Barrier Procs}, which takes in a list Procs of procedures. This executes each procedure in the list in its own thread, and blocks until all the threads have terminated.

The procedure merely generalizes the methodology described above.

proc {Barrier Procs}
   %------------------------
   % Takes a finite list of procedures, and a variable L used for 
   % synchronizing.
   %
   % Runs each process in its own thread,
   % creates a new variable M, binds it to L, and passes M recursively
   % to run other procedures. 
   %
   % Return value is L if there is no procedure to execute,
   % and the result of the recursive call otherwise.
   %------------------------
   fun {BarrierLoop Procs L}
      case Procs
      of  Proc|Procr then
          M
      in
          thread {Proc} M=L end
          {BarrierLoop Procr M}
      [] nil then L
   end


   S = {BarrierLoop Procs unit}
in
   {Wait S}
end

%------------- Example Usage-------------------
{Barrier [ proc {$} X = 1 end
           proc {$} {Browse X} end]}

Date: 2012-10-15 15:52:19 India Standard Time

Author:

Org version 7.8.11 with Emacs version 24

Validate XHTML 1.0