Programming with Streams
Table of Contents
1 Credits
The material, and the examples in this section is adapted from the section on streams in the textbook Structure and Interpretation of Computer Programs by Abelson and Sussman, for the Oz language.
2 Motivation for Programming using Streams
In iterative programming in languages with mutable state, it is usual
to compute by the means of one variable changing over time. For
example, in order to compute the sum of \(N\) input numbers, we will
keep a variable sum which is initially 0. We iteratively read input
values, and store the partial sums of the input values read so far in
sum, until finally it contains the sum of all the input numbers.
Is there some other way to model such time-dependent behaviour? For inspiration, we look to the way mathematicians model such behaviour. We consider time incrementing in discrete steps. A standard way to model such behaviour in mathematics is to consider a sequence \(x(0), x(1), \dots\) where \(x(0)\) is the initial value ("at time 0"), and in general, for a natural number \(t\), the value \(x(t)\) is the value at time instant \(t\).
The time-dependent behaviour is thus a function \(x\) mapping time to values.
An advantage of this kind of modelling is that we model time-dependent behaviour without using mutable variables - \(x\) is a function which doesn't "change". At the same time, we do have access to the varying quantities. This is just what we need for functional programming!
This is accomplished using a style of programming that is typically called stream-based programming. Adaptations of this idea are used in iterators over data structures (as in C++), and in databases.
We will not directly use the mathematical idea of having time-dependent behaviour as a function. Instead, the time sequence will be represented as a potentially infinite list of values, where the value at position \(t\) will model the value at "time \(t\)".
We will now explore what kind of programming can be done in this style. We have already seen this style in earlier chapters, but here we will reexamine this style of programming emphasizing its connections with declarative concurrency.
3 Streams in Oz
We have powerful abstraction tools for manipulating finite lists -
for instance, Map, FoldL/FoldR, Filter etc. We have also seen
lazy versions of these functions which can compute with potentially
infinite lists.
fun lazy {PrimesFrom A} local IntsFrom in fun lazy {LazyIntsFrom A} A|{LazyIntsBetween A+1} end fun lazy {LazyFilter Predicate Xs} if {Predicate Xs.1} then Xs.1|{LazyFilter Predicate Xs.2} else {LazyFilter Predicate Xs.2} end end {LazyFilter IsPrime {IntsFrom A}} end end
We now have an additional way to program this, using threads!
fun {PrimesFrom A} local X Y in thread X = {IntsFrom A} end % eager list thread Y = {Filter IsPrime X} end % eager filter Y end end {Browse {PrimesFrom 2}}
A fair thread scheduler ensures that the first thread does not go into an infinite computation. Thus the whole computation will produce an infinite list of primes. We accomplished this without using laziness - it is the fairness of the thread scheduling which ensures that both the threads can co-operate to produce the desired output.
Another example is an infinite list of Fibonacci numbers, which we program using threads as follows.
fun {FibsFrom A B} % A < B A+B | {FibsFrom B A+B} end thread {FibsFrom 0 1} end
4 Implicit (self-referential) streams
In the declarative sequential semantics, we mentioned that Oz allows self-referential structures similar to the following.
declare Ones = 1|Ones
This will imply that Ones is an infinite list, each of its entry
being 1. We say that Ones is defined implicitly, or in a
self-referential manner.(At this point, it is worth revisiting how
this is exactly done in the declarative sequential semantics without
forcing non-termination.)
This seems to be a curious artefact allowed by the semantics of Oz, without any seeming practical use. Why would anyone need to define such structures? We now show that this facility allows elegant expressions of certain programs, in the declarative concurrent model with laziness.
We now see progressively sophisticated examples of streams defined implicitly.
4.1 Positive Integers
declare PositiveIntgers = 1 | {AddStreams Ones PositiveIntegers}
This produces a stream of positive integers. To see how this works, we seek recurrences to produce such a stream. A recurrence which produces an infinite stream of positive integers is:
PositiveIntegers[1] = 1 PositiveIntegers[n] = 1 + PositiveIntegers[n-1], for n >= 2.
The AddStreams call ensures that the number in the nth position is
the sum of the numbers in the (n-1)st positions in Ones and
PositiveIntegers, which is a correct implementation of the recursion.
It is also clear that the lazy function AddStreams can compute the
nth value, since the first n-1 terms of PositiveIntegers will
be available by then. That is, just enough elements of
PositiveIntegers are available to enable the computation of the next
element of PositiveIntegers.
                     Time 1       Time 2            Time 3              Time 4
                     ------       ------            ------              ------
PositiveIntegers =   1 | _        1|_               1 | 2 | _           1 | 2 | 3 | _
                                    ^                       ^                       ^
                                    .                       .                       .
                                    .                       .                       .
PositiveIntegers =   1 | _          1 | _               1 | 2 | _           1 | 2 | 3 | _
                                    +                       +                       +
Ones             =                  1 | 1 | ...         1 | 1 | ...         1 | 1 | 1 | ...
4.2 Implicit definition of the Fibonacci sequence
declare Fibs = 0 | 1 | {AddStreams Fibs Fibs.2}
This works as follows.
            Time 2             Time 3                     Time 4                         Time 3
            ------             ------                     ------                         ------
Fibs    =   0 | 1 | _          0 | 1 | 1 | _              0 | 1 | 1 | 2 | _              0 | 1 | 1 | 2 | 3 | _
                    ^                      ^                              ^                                  ^
                    .                      .                              .                                  .
                    .                      .                              .                                  .
                    .                      .                              .                                  .
Fibs    =           0 | 1 |_           0 | 1 | 1 | _              0 | 1 | 1 | 2 | _              0 | 1 | 1 | 2 | 3 | _
                    +                      +                              +                                  +
Fibs|2  =           1 | _              1 | 1 | _                  1 | 1 | 2 | _                  1 | 1 | 2 | 3 | _
Thus it implements the famous recurrence of the Fibonacci sequence
Fib[1] = 0 Fib[2] = 1 Fib[n] = Fib[n-1]+Fib[n-2], for n >= 3
Here the (n-2)nd term comes from the Fib argument, and the
(n-1)st from the Fib.2 argument.
4.3 Implicit definition of the powers of two
declare PowersOfTwo = 1 | {ScaleStream 2 PowersOfTwo}
implementing the recurrence
PowersOfTwo[1] = 1 PowersOfTwo[n] = 2 * PowersOfTwo[n-1], for n >= 2.
Alternatively, it can be also written as
declare PowersOfTwo = 1 | {AddStream PowersOfTwo PowersOfTwo}
implementing the recurrence
PowersOfTwo[1] = 1 PowersOfTwo[n] = PowersOfTwo[n-1] + PowersOfTwo[n-1], for n>=2.
4.4 Implicit definition of Primes
declare Primes = 2 | {FilterStream IsPrime {IntsFrom 3}}
This does not seem to be an implicit definition. However, we will
define IsPrime in a way that depends on the Primes list itself. We
know that a number \(N\) is prime if and only if no number less than or
equal to \(\sqrt{N}\) divides it. It is not difficult to see it is
sufficient to check that if no prime below \(\sqrt{N}\) divides \(N\),
then \(N\) is prime. We now write a function to realise this test.
declare fun {IsPrime N} local Aux in fun {Aux Ps} if {Square Ps.1} > N then true else if {Divisible N Ps.1} then false else {Aux Ps.2} end end {Aux Primes} % implicit reference to Primes end end
The test works because the primes less than or equal to \(\sqrt{N}\) will have been generated by the time we consider the primality of \(N\).
5 Exploiting Streams - Iterations as Streams
A loop in an imperative language is often controlled by a loop variable, which starts with an initial value, and at every iteration, undergoes some update operation at every step.
This can easily be represented as a stream. Abstractly, if \(x(0), x(1), \dots\) is the sequence of values produced by the updates, it can be represented as a stream. (picturized as follows)
+------+        +------+        +-------+
|      |        |      |        |       |
| x(0) |        | x(1) |        | x(2)  |  . . .
|      |        |      |        |       |
+---+--+        +----+-+        +-------+
    |            /|\ |             /|\ |
    |             |  |              |  |
    |             |  |              |  |
    +---Update----+  +-----Update---+  +-------
We can define the same stream implicitly as follows.
declare X = Initial | {MapStream Update X}
For example, consider a simple for loop in C.
int i; for (i=0; ; i++) ;
This can be represented as the following stream.
define NonZeroInts = 0 | {AddStream Ones NonZeroInts}
We consider now a slightly more interesting example.
5.1 Bisection method to find square roots.
In the bisection method to find the square root of a non-negative real number \(x\), we start with an initial guess \(g\), and iteratively refine it until we find an acceptable approximation to \(\sqrt{x}\).
Either \(g \le \sqrt{x}\) in which case \(g \le \sqrt{x} \le x/g\), or \(g \ge \sqrt{x}\), in which case \(x/g \le \sqrt{x} \le g\). Thus, we can refine the guess \(g\) by taking the average of \(g\) and \(x/g\) as the next guess. In practical scenarios, this refinement is continued until \(g\) and \(x/g\) are fairly close to each other, at which point \(g\) is a fairly good approximation of \(\sqrt{x}\).
This is a nice example of an iterative procedure, where \(g\) is the variable whose value changes every iteration. We will now code this in the stream mode. The stream will be the sequence of guesses.
declare SqrtImprove SqrtEstimateStream fun {SqrtImprove Guess X} (Guess + (X/Guess))/2 end fun {SqrtEstimateStream X} local Guesses in Guesses = 1.0 | {MapStream fun {$ A} {SqrtImprove A X} end Guesses} end end {Browse {SqrtEstimateStream 2}}
5.2 Accelerating Streams (Advanced Topic)
In stream-based iteration, we have the entire history of the iterated variable with us. Can we put this to good use? Here's a nice example.
We know that \(\frac{\pi}{4} = 1 - \frac{1}{3} + \frac{1}{5} - \frac{1}{7} \dots\).
declare PiSummands PiStream EulerTransform MakeStreamOfStreams fun lazy {PiSummands N} (1/N) | {MapStream fun {$ A} A*~1 end % negative, for the alternating series {PiSummands N+1}} end %-------------------------- % implement the recurrence % {PartialSummands Xs}[1] = Xs[1] % {PartialSummands Xs}[n] = Xs[n] + {PartialSummands Xs}[n-1] %------------------------- fun lazy {PartialSummands Xs} local Summands in Summands = Xs.1 | {AddStream Xs.2 Summands} end end PiStream = {ScaleStream {PartialSums {PiSummands 1}} 4}
In an alternating series, we can "accelerate" the series to produce another series that converges to the same sum, via a procedure called the Euler transform. It transforms the series with \(n^\text{th}\) term \(S_n\) into another series where the \(n^\text{th}\) term is $$S_{n+1} - \frac{(S_{n+1} - S_n)^2}{S_{n-1} - 2S_n + S_{n+1}}.$$ We can define a function to accelerate the series as follows.
fun {EulerTransform S} local S0 S1 S2 in S0 = {Nth S 0} % S[n-1] S1 = {Nth S 1} % S[n] S2 = {Nth S 2} % S[n+1] S2 - {Square (S1-S0)}/(S0 - 2*S1 + S2) | {EulerTransform S.2} end end
Thus, we take the initial stream of the summands of pi, and refine it to produce an accelerated stream of the summands of pi. We can now iterate this procedure to produce a stream of streams, where each stream is the Euler transform of the previous.
fun {MakeStreamOfStreams Xs} local StreamOfStreams in StreamOfStreams = Xs | {MapStream EulerTransform StreamOfStreams} end end