Programming with Streams

1 Credits

The material and the examples in this section are adapted, for the Oz language, from the section on streams in the textbook Structure and Interpretation of Computer Programs by Abelson and Sussman.

2 Motivation for Programming using Streams

In iterative programming in languages with mutable state, it is usual to compute by means of a variable whose value changes over time. For example, in order to compute the sum of \(N\) input numbers, we keep a variable sum which is initially 0. We iteratively read input values, and store the partial sum of the values read so far in sum, until finally it contains the sum of all the input numbers.

Is there some other way to model such time-dependent behaviour? For inspiration, we look to the way mathematicians model such behaviour. We consider time incrementing in discrete steps. A standard way to model such behaviour in mathematics is to consider a sequence \(x(0), x(1), \dots\) where \(x(0)\) is the initial value ("at time 0"), and in general, for a natural number \(t\), the value \(x(t)\) is the value at time instant \(t\).

The time-dependent behaviour is thus a function \(x\) mapping time to values.

An advantage of this kind of modelling is that we model time-dependent behaviour without using mutable variables - \(x\) is a function which doesn't "change". At the same time, we do have access to the varying quantities. This is just what we need for functional programming!

This is accomplished using a style of programming that is typically called stream-based programming. Adaptations of this idea are used in iterators over data structures (as in C++), and in databases.

We will not directly use the mathematical idea of having time-dependent behaviour as a function. Instead, the time sequence will be represented as a potentially infinite list of values, where the value at position \(t\) will model the value at "time \(t\)".

We will now explore what kind of programming can be done in this style. We have already seen this style in earlier chapters, but here we will reexamine this style of programming emphasizing its connections with declarative concurrency.

3 Streams in Oz

We have powerful abstraction tools for manipulating finite lists - for instance, Map, FoldL/FoldR, Filter etc. We have also seen lazy versions of these functions which can compute with potentially infinite lists.

fun lazy {PrimesFrom A}
    local LazyIntsFrom LazyFilter in
        fun lazy {LazyIntsFrom N}
           N|{LazyIntsFrom N+1}
        end
        fun lazy {LazyFilter Predicate Xs}
           if {Predicate Xs.1}
           then Xs.1|{LazyFilter Predicate Xs.2}
           else {LazyFilter Predicate Xs.2}
           end
        end
        {LazyFilter IsPrime {LazyIntsFrom A}}
    end
end

We now have an additional way to program this, using threads!

fun {PrimesFrom A}
    local X Y in
       thread  X = {IntsFrom A} end         % eager list
       thread  Y = {Filter IsPrime X} end   % eager filter
       Y
    end
end
{Browse {PrimesFrom 2}}

The first thread never terminates, since it keeps extending the infinite list X. A fair thread scheduler ensures that it does not monopolise the processor, so the second thread keeps running and filters the elements of X as they become available. Thus the whole computation produces an ever-growing list of primes. We accomplished this without using laziness: it is the fairness of the thread scheduling which ensures that both threads co-operate to produce the desired output.
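The producer/consumer co-operation described above can be observed on a finite range, where both threads terminate. The following is only a sketch: IntsBetween, EagerFilter, PrimesBetween and this trial-division IsPrime are hypothetical helpers introduced for the illustration, not definitions from these notes.

```oz
declare
% Hypothetical eager producer: the finite list A, A+1, ..., B.
fun {IntsBetween A B}
   if A > B then nil else A|{IntsBetween A+1 B} end
end
% Hypothetical primality test by trial division.
fun {IsPrime N}
   fun {NoDivisorFrom D}
      D*D > N orelse (N mod D \= 0 andthen {NoDivisorFrom D+1})
   end
in
   N >= 2 andthen {NoDivisorFrom 2}
end
% Hypothetical eager filter; the case statement suspends while Xs is unbound,
% so the consumer simply waits for the producer.
fun {EagerFilter P Xs}
   case Xs
   of nil then nil
   [] X|Xr then
      if {P X} then X|{EagerFilter P Xr} else {EagerFilter P Xr} end
   end
end
fun {PrimesBetween A B}
   X Y
in
   thread X = {IntsBetween A B} end        % producer thread
   thread Y = {EagerFilter IsPrime X} end  % consumer thread
   Y
end
{Browse {PrimesBetween 2 30}}   % eventually shows [2 3 5 7 11 13 17 19 23 29]
```

The function returns Y immediately, while it is still unbound; the browser display fills in as the two threads make progress.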

Another example is an infinite list of Fibonacci numbers, which we program using threads as follows.

fun {FibsFrom A B}           % A =< B
     A+B | {FibsFrom B A+B}
end
local Fibs in
   thread Fibs = {FibsFrom 0 1} end   % eager and infinite, so it runs in its own thread
   {Browse Fibs}
end

4 Implicit (self-referential) streams

In the declarative sequential semantics, we mentioned that Oz allows self-referential structures similar to the following.

declare Ones = 1|Ones

This implies that Ones is an infinite list, each of whose entries is 1. We say that Ones is defined implicitly, or in a self-referential manner. (At this point, it is worth revisiting exactly how the declarative sequential semantics handles this definition without forcing non-termination.)

This seems to be a curious artefact allowed by the semantics of Oz, without any seeming practical use. Why would anyone need to define such structures? We now show that this facility allows elegant expressions of certain programs, in the declarative concurrent model with laziness.

We now see progressively sophisticated examples of streams defined implicitly.

4.1 Positive Integers

declare
PositiveIntegers = 1 | {AddStreams Ones PositiveIntegers}

This produces a stream of positive integers. To see how this works, we seek recurrences to produce such a stream. A recurrence which produces an infinite stream of positive integers is:

PositiveIntegers[1] = 1
PositiveIntegers[n] = 1 + PositiveIntegers[n-1], for n >= 2.

The AddStreams call ensures that, for n >= 2, the number in the nth position of PositiveIntegers is the sum of the numbers in the (n-1)st positions of Ones and PositiveIntegers, which is a correct implementation of the recurrence.

It is also clear that the lazy function AddStreams can compute the nth value, since the first n-1 terms of PositiveIntegers will be available by then. That is, just enough elements of PositiveIntegers are available to enable the computation of the next element of PositiveIntegers.

                     Time 1       Time 2            Time 3              Time 4
                     ------       ------            ------              ------
PositiveIntegers =   1 | _        1|_               1 | 2 | _           1 | 2 | 3 | _
                                    ^                       ^                       ^
                                    .                       .                       .
                                    .                       .                       .
PositiveIntegers =   1 | _          1 | _               1 | 2 | _           1 | 2 | 3 | _
                                    +                       +                       +
Ones             =                  1 | 1 | ...         1 | 1 | ...         1 | 1 | 1 | ...
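The definition above depends on a lazy AddStreams, which is used but not defined in this section. A minimal sketch, assuming AddStreams is the element-wise sum of two infinite streams, is the following.

```oz
declare
% Assumed definition: lazy element-wise sum of two infinite streams.
fun lazy {AddStreams Xs Ys}
   case Xs#Ys
   of (X|Xr)#(Y|Yr) then (X+Y)|{AddStreams Xr Yr}
   end
end
Ones = 1|Ones
PositiveIntegers = 1|{AddStreams Ones PositiveIntegers}
% List.take demands exactly the first five elements and no more.
{Browse {List.take PositiveIntegers 5}}   % shows [1 2 3 4 5]
```

Browsing PositiveIntegers directly would show only 1|_, because displaying a stream does not by itself demand its lazy tail.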

4.2 Implicit definition of the Fibonacci sequence

declare
Fibs = 0 | 1 | {AddStreams Fibs Fibs.2}

This works as follows.

            Time 2             Time 3                     Time 4                         Time 5
            ------             ------                     ------                         ------
Fibs    =   0 | 1 | _          0 | 1 | 1 | _              0 | 1 | 1 | 2 | _              0 | 1 | 1 | 2 | 3 | _
                    ^                      ^                              ^                                  ^
                    .                      .                              .                                  .
                    .                      .                              .                                  .
                    .                      .                              .                                  .

Fibs    =           0 | 1 | _          0 | 1 | 1 | _              0 | 1 | 1 | 2 | _              0 | 1 | 1 | 2 | 3 | _
                    +                      +                              +                                  +
Fibs.2  =           1 | _              1 | 1 | _                  1 | 1 | 2 | _                  1 | 1 | 2 | 3 | _

Thus it implements the famous recurrence of the Fibonacci sequence

Fib[1] = 0
Fib[2] = 1
Fib[n] = Fib[n-1]+Fib[n-2], for n >= 3

Here the (n-2)nd term comes from the Fibs argument, and the (n-1)st from the Fibs.2 argument.
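To check the recurrence against the implicit definition, one can demand a finite prefix of the stream. This sketch assumes the same lazy AddStreams as before, written here in a shorter form.

```oz
declare
% Assumed helper: lazy element-wise sum of two infinite streams.
fun lazy {AddStreams Xs Ys}
   (Xs.1 + Ys.1)|{AddStreams Xs.2 Ys.2}
end
Fibs = 0|1|{AddStreams Fibs Fibs.2}
{Browse {List.take Fibs 8}}   % shows [0 1 1 2 3 5 8 13]
```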

4.3 Implicit definition of the powers of two

declare
PowersOfTwo = 1 | {ScaleStream 2 PowersOfTwo}

implementing the recurrence

PowersOfTwo[1] = 1
PowersOfTwo[n] = 2 * PowersOfTwo[n-1], for n >= 2.

Alternatively, it can also be written as

declare
PowersOfTwo = 1 | {AddStreams PowersOfTwo PowersOfTwo}

implementing the recurrence

PowersOfTwo[1] = 1
PowersOfTwo[n] = PowersOfTwo[n-1] + PowersOfTwo[n-1], for n>=2.
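ScaleStream is likewise used without a definition. A plausible sketch, assuming it takes the scaling factor first and the stream second (the order used in the first definition of PowersOfTwo), is shown below.

```oz
declare
% Assumed helper: multiply every element of a stream by K, lazily.
fun lazy {ScaleStream K Xs}
   case Xs of X|Xr then (K*X)|{ScaleStream K Xr} end
end
PowersOfTwo = 1|{ScaleStream 2 PowersOfTwo}
{Browse {List.take PowersOfTwo 6}}   % shows [1 2 4 8 16 32]
```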

4.4 Implicit definition of Primes

declare Primes = 2 | {FilterStream IsPrime {IntsFrom 3}}

This does not seem to be an implicit definition. However, we will define IsPrime in a way that depends on the Primes list itself. We know that a number \(N \ge 2\) is prime if and only if no number \(d\) with \(2 \le d \le \sqrt{N}\) divides it. It is not difficult to see that it suffices to test only prime divisors: if no prime less than or equal to \(\sqrt{N}\) divides \(N\), then \(N\) is prime. We now write a function to realise this test.

declare
fun {IsPrime N}
    local Aux in
        fun {Aux Ps}
            if {Square Ps.1} > N
            then true                 % no prime up to sqrt(N) divides N
            elseif {Divisible N Ps.1}
            then false
            else {Aux Ps.2}
            end
        end
        {Aux Primes} % implicit reference to Primes
    end
end

The test works because the primes less than or equal to \(\sqrt{N}\) will have been generated by the time we consider the primality of \(N\).
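Putting the pieces together gives a complete program. The helpers IntsFrom, FilterStream, Square and Divisible are not defined in this section, so the versions below are assumptions made for the sake of a runnable sketch.

```oz
declare
% Assumed helpers (not given in the text).
fun lazy {IntsFrom N} N|{IntsFrom N+1} end
fun lazy {FilterStream P Xs}
   case Xs of X|Xr then
      if {P X} then X|{FilterStream P Xr} else {FilterStream P Xr} end
   end
end
fun {Square X} X*X end
fun {Divisible N D} N mod D == 0 end

fun {IsPrime N}
   fun {Aux Ps}
      if {Square Ps.1} > N then true
      elseif {Divisible N Ps.1} then false
      else {Aux Ps.2}
      end
   end
in
   {Aux Primes}   % only inspects primes already generated
end
Primes = 2|{FilterStream IsPrime {IntsFrom 3}}
{Browse {List.take Primes 10}}   % shows [2 3 5 7 11 13 17 19 23 29]
```

The explicit first element 2 matters: without it, testing 3 would ask for Primes.1 before any prime exists, and the computation would suspend forever.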

5 Exploiting Streams - Iterations as Streams

A loop in an imperative language is often controlled by a loop variable, which starts with an initial value and undergoes some update operation at every iteration.

This can easily be represented as a stream: if \(x(0), x(1), \dots\) is the sequence of values produced by the updates, the stream holds \(x(t)\) at position \(t\), as pictured below.

+------+        +------+        +-------+
|      |        |      |        |       |
| x(0) |        | x(1) |        | x(2)  |  . . .
|      |        |      |        |       |
+---+--+        +----+-+        +-------+
    |            /|\ |             /|\ |
    |             |  |              |  |
    |             |  |              |  |
    +---Update----+  +-----Update---+  +-------

We can define the same stream implicitly as follows.

declare
X = Initial | {MapStream Update X}

For example, consider a simple for loop in C.

int i;
for (i=0; ; i++)
        ;

This can be represented as the following stream.

declare NonNegativeInts = 0 | {AddStreams Ones NonNegativeInts}
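The general pattern X = Initial | {MapStream Update X} can be tried on any loop. The sketch below assumes a lazy MapStream that takes the function first and the stream second, and uses a made-up loop, for (i = 1; ; i *= 3), purely for illustration.

```oz
declare
% Assumed helper: apply F to every element of a stream, lazily.
fun lazy {MapStream F Xs}
   case Xs of X|Xr then {F X}|{MapStream F Xr} end
end
% Hypothetical loop variable: starts at 1 and is tripled at every step.
Initial = 1
fun {Update I} I*3 end
X = Initial|{MapStream Update X}
{Browse {List.take X 5}}   % shows [1 3 9 27 81]
```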

We consider now a slightly more interesting example.

5.1 Bisection method to find square roots.

In the bisection method to find the square root of a non-negative real number \(x\) (this averaging scheme is more commonly known as Heron's or the Babylonian method), we start with a positive initial guess \(g\), and iteratively refine it until we find an acceptable approximation to \(\sqrt{x}\).

Either \(g \le \sqrt{x}\) in which case \(g \le \sqrt{x} \le x/g\), or \(g \ge \sqrt{x}\), in which case \(x/g \le \sqrt{x} \le g\). Thus, we can refine the guess \(g\) by taking the average of \(g\) and \(x/g\) as the next guess. In practical scenarios, this refinement is continued until \(g\) and \(x/g\) are fairly close to each other, at which point \(g\) is a fairly good approximation of \(\sqrt{x}\).

This is a nice example of an iterative procedure, where \(g\) is the variable whose value changes every iteration. We will now code this in the stream mode. The stream will be the sequence of guesses.

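declare SqrtImprove SqrtEstimateStream
fun {SqrtImprove Guess X}
    (Guess + (X/Guess))/2.0     % "/" is float division, so the literal must be a float
end

fun {SqrtEstimateStream X}
    local Guesses in 
        Guesses = 1.0 | {MapStream   fun {$ A} {SqrtImprove A X} end
                                     Guesses}
        Guesses                 % return the stream of guesses
    end
end

{Browse {SqrtEstimateStream 2.0}}   % the argument must be a float as well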
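The stopping rule mentioned earlier (continue until successive guesses are close) can be expressed as a consumer of the stream. In this sketch, StreamLimit and the tolerance value are hypothetical additions, and MapStream is an assumed lazy helper.

```oz
declare
% Assumed helper: apply F to every element of a stream, lazily.
fun lazy {MapStream F Xs}
   case Xs of X|Xr then {F X}|{MapStream F Xr} end
end
fun {SqrtImprove Guess X} (Guess + X/Guess)/2.0 end
fun {SqrtEstimateStream X}
   Guesses
in
   Guesses = 1.0|{MapStream fun {$ G} {SqrtImprove G X} end Guesses}
   Guesses
end
% Hypothetical helper: the first element that differs from its
% predecessor by less than Tol.
fun {StreamLimit Xs Tol}
   case Xs of A|B|_ then
      if {Abs B-A} < Tol then B else {StreamLimit Xs.2 Tol} end
   end
end
{Browse {StreamLimit {SqrtEstimateStream 2.0} 1.0e~6}}   % a value close to 1.41421
```

The stream itself knows nothing about the tolerance; the decision of when to stop lives entirely in the consumer, which demands only as many guesses as it needs.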

5.2 Accelerating Streams (Advanced Topic)

In stream-based iteration, we have the entire history of the iterated variable with us. Can we put this to good use? Here's a nice example.

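We know that \(\frac{\pi}{4} = 1 - \frac{1}{3} + \frac{1}{5} - \frac{1}{7} + \dots\). The following code builds the stream of partial sums of this series, scaled by 4 so that it converges to \(\pi\).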

declare PiSummands PartialSums PiStream EulerTransform MakeStreamOfStreams

% Summands 1/N - 1/(N+2) + 1/(N+4) - ... ; N is a float (1.0 for the series above)
fun lazy {PiSummands N}
  (1.0/N) | {MapStream  fun {$ A} ~A end    % negate, for the alternating series
                        {PiSummands N+2.0}}
end

%--------------------------
% implement the recurrence
%    {PartialSums Xs}[1] = Xs[1]
%    {PartialSums Xs}[n] = Xs[n] + {PartialSums Xs}[n-1]
%-------------------------
fun lazy {PartialSums Xs}
    local Sums in 
        Sums = Xs.1 | {AddStreams Xs.2 Sums}
        Sums
    end
end

PiStream = {ScaleStream 4.0 {PartialSums {PiSummands 1.0}}}

For an alternating series, we can "accelerate" the sequence of partial sums to produce another sequence that converges to the same limit, via a procedure called the Euler transform. It transforms the sequence whose \(n^\text{th}\) term is the partial sum \(S_n\) into another sequence whose \(n^\text{th}\) term is $$S_{n+1} - \frac{(S_{n+1} - S_n)^2}{S_{n-1} - 2S_n + S_{n+1}}.$$ We can define a function to accelerate the stream as follows.

fun lazy {EulerTransform S}
    local S0 S1 S2 in
        S0 = {Nth S 1}         % S[n-1]  (Nth counts from 1)
        S1 = {Nth S 2}         % S[n]
        S2 = {Nth S 3}         % S[n+1]

        (S2 - {Square (S2-S1)}/(S0 - 2.0*S1 + S2)) | {EulerTransform S.2}
    end
end

Thus, we take the initial stream of approximations to pi, and refine it to produce an accelerated stream of approximations to pi. We can now iterate this procedure to produce a stream of streams, where each stream is the Euler transform of the previous.

fun {MakeStreamOfStreams Xs}
    local StreamOfStreams in
        StreamOfStreams = Xs | {MapStream EulerTransform StreamOfStreams}
        StreamOfStreams
    end
end
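One way to use the stream of streams, following the corresponding construction in Structure and Interpretation of Computer Programs, is to take the first element of each stream. The sketch below is self-contained under stated assumptions: MapStream and AddStreams are assumed lazy helpers, scaling by 4 is done with MapStream, floats are used throughout, and the name Accelerated is hypothetical.

```oz
declare
% Assumed lazy helpers.
fun lazy {MapStream F Xs}
   case Xs of X|Xr then {F X}|{MapStream F Xr} end
end
fun lazy {AddStreams Xs Ys}
   (Xs.1 + Ys.1)|{AddStreams Xs.2 Ys.2}
end
fun lazy {PiSummands N}          % N is a float: 1.0, 3.0, 5.0, ...
   (1.0/N)|{MapStream fun {$ A} ~A end {PiSummands N+2.0}}
end
fun {PartialSums Xs}
   Sums
in
   Sums = Xs.1|{AddStreams Xs.2 Sums}
   Sums
end
fun lazy {EulerTransform S}
   case S of S0|S1|S2|_ then
      D = S0 - 2.0*S1 + S2
   in
      (S2 - (S2-S1)*(S2-S1)/D)|{EulerTransform S.2}
   end
end
fun {MakeStreamOfStreams Xs}
   Ss
in
   Ss = Xs|{MapStream EulerTransform Ss}
   Ss
end
PiStream = {MapStream fun {$ A} 4.0*A end {PartialSums {PiSummands 1.0}}}
% First element of every stream in the stream of streams.
Accelerated = {MapStream fun {$ S} S.1 end {MakeStreamOfStreams PiStream}}
{Browse {List.take PiStream 4}}      % roughly [4.0 2.667 3.467 2.895]
{Browse {List.take Accelerated 4}}   % roughly [4.0 3.167 3.1421 3.1416]
```

Only a short prefix is demanded here. Far along the accelerated stream the denominator of the transform can underflow to zero in floating point, so a practical version would stop once successive values agree to the desired precision.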

Date: 2014-10-10 18:01:30 India Standard Time

Author: Satyadev Nandakumar
