Wednesday, March 16, 2022

Kotlin: Stream type

The class Stream implements lazy lists whose elements are evaluated only when they are needed. A Stream is like a List, except that its elements are computed lazily, much as a view provides a lazy version of a collection. Because the elements are computed lazily, a Stream can be very long, even infinitely long. Apart from this, a Stream behaves much like a List.



Motivation

A stream is an ordered collection containing a possibly infinite number of elements. Strange as it may seem, an infinite sequence of values has many uses in real applications. Several phone applications, such as the mail app, use lists of indefinite length. A social media application loads the first several pages of updates, then lets the user load more.

A stream is sometimes referred to as a lazy list. The name tells us that the elements are evaluated lazily: the computation of an element is delayed until that element is requested from the stream. That way we can defer expensive operations until they are actually required.
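
As a small illustration of deferred evaluation, the sketch below uses the Kotlin standard library's Sequence rather than the Stream class described in this post. The class CountingSquares and its members are hypothetical names introduced only for this example; the counter records how many elements have actually been computed.

```kotlin
// Illustration only: uses kotlin.sequences, not the Stream class from this post.
// CountingSquares is a hypothetical helper that counts element evaluations.
class CountingSquares {
    // Number of times the squaring function has run so far.
    var evaluations = 0
        private set

    // An infinite, lazy sequence 1, 4, 9, ...; nothing is computed here.
    val squares: Sequence<Int> =
        generateSequence(1) { it + 1 }.map { n ->
            evaluations++
            n * n
        }
}
```

Creating a CountingSquares performs no squaring at all. Requesting three elements, for example with squares.take(3).toList(), should run the squaring function exactly three times, even though the underlying sequence is unbounded.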

The custom class Stream defines a lazy stream. It has a similar interface to that of List, including the member functions take, map and foldLeft. The StreamF object declaration has definitions for functions such as from (create a Stream from a source such as an immutable List) and range (create a Stream over a range of numeric values) that would not normally be member functions. This simple example should need little explanation:

val numbers10: Stream<Int> = StreamF.closedRange(1, 10)
assertEquals(10, numbers10.size())
assertEquals(1, numbers10.head())
assertEquals(55, numbers10.foldLeft(0){acc -> {n -> acc + n}})

The member functions take and drop operate exactly like their List counterparts. Two simple examples are:

val numbers1000: Stream<Int> = StreamF.range(0, 1000)
assertEquals(StreamF.of(0, 1, 2, 3, 4), numbers1000.take(5))
assertEquals(StreamF.of(100, 101, 102, 103, 104), numbers1000.drop(100).take(5))

We create a Stream of objects using the overloaded function from, defined in the associated StreamF object declaration:

data class Person(val name: String, val age: Int)
val people: List<Person> = ListF.of(
    Person("Ken", 25),
    Person("John", 31),
    Person("Jessie", 22)
)
val totalAges = StreamF.from(people).map{per -> per.age}.foldLeft(0){acc -> {age -> acc + age}}
assertEquals(78, totalAges)

We also have various fold operations on a Stream. Folding a Stream is somewhat different from folding a List, the primary difference being that a Stream's elements are unevaluated until they are requested. The function foldRight from the List class cannot be made stack safe, and for large Lists it could overflow the stack. Similarly, the Stream class function foldRight will overflow the stack if, for example, it is adding the elements of a large Stream<Int>:

val numbers5: Stream<Int> = StreamF.of(1, 2, 3, 4, 5)
assertEquals(15, numbers5.foldRight(0){n -> {acc -> acc + n}})

val numbers10000: Stream<Int> = StreamF.closedRange(1, 10000)
numbers10000.foldRight(0){n -> {acc -> acc + n}}     // stack overflow
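
To see why a right fold consumes stack, consider this minimal sketch over Kotlin's standard List. It is not the library's implementation: foldRightRec and foldLeftLoop are hypothetical names, and they take ordinary two-argument functions rather than the curried functions used above. The right fold cannot apply f to an element until the fold of everything after it has returned, so every element holds a stack frame; the left fold simply carries an accumulator through a loop.

```kotlin
// Illustration only: hypothetical helpers over kotlin.collections.List.

// Recursive right fold: f(x0, f(x1, ... f(xn, z))).
// Each element adds a stack frame, so depth grows with the list size.
fun <A, B> foldRightRec(xs: List<A>, z: B, f: (A, B) -> B): B {
    fun go(i: Int): B =
        if (i == xs.size) z else f(xs[i], go(i + 1))
    return go(0)
}

// Iterative left fold: f(... f(f(z, x0), x1) ..., xn).
// The accumulator lives in a local variable, so stack use is constant.
fun <A, B> foldLeftLoop(xs: List<A>, z: B, f: (B, A) -> B): B {
    var acc = z
    for (x in xs) acc = f(acc, x)
    return acc
}
```

For addition both folds give the same answer, but only the left fold is safe for arbitrarily long inputs.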

We cannot make foldRight stack safe. Nevertheless, it has many interesting use cases. In the following we implement the map function using foldRight:

fun <A, B> map(stream: Stream<A>, f: (A) -> B): Stream<B> =
    stream.foldRight(StreamF.empty()){a: A ->
        {sb: Stream<B> ->
            StreamF.cons(f(a), sb)
        }
    }
assertEquals(
    StreamF.of(false, true, false, true, false),
    map(StreamF.closedRange(1, 5)){n: Int -> isEven(n)}
)

It is possible to generate an infinite stream, i.e. a Stream of infinite length. Given a single argument, say 1, the function range delivers a stream of integers starting from that value. In effect it produces the stream 1, 2, 3, ..., lazily. In the following example the function firstOddNumbers delivers the initial odd numbers as a stream of the requested length. It is implemented with the functions range, filter and take.

fun firstOddNumbers(count: Int): Stream<Int> =
    StreamF.range(1).filter{n -> isOdd(n)}.take(count)
val numbers: Stream<Int> = StreamF.range(1)
assertEquals(StreamF.of(1, 2, 3, 4, 5), numbers.take(5))
assertEquals(StreamF.of(1, 3, 5, 7, 9), firstOddNumbers(5))

A positive integer greater than one is prime if it is divisible only by itself and by one. The Sieve of Eratosthenes is an algorithm that works by cancelling out all the multiples of each number once it is established as prime. The primes are the values that remain.

We begin with a sequence of numbers starting with 2. The head element is 2 and is prime, and we remove all the multiples of 2 from the sequence. The head of the remainder of the sequence is 3 and is prime. We sieve the remainder of the sequence removing all multiples of 3. The process is repeated indefinitely.

In the next example the function sieve implements this algorithm using a lazy stream. The function accepts the stream of integers [2, 3, 4, 5, ...] and converts it into the stream of primes [2, 3, 5, ...], lazily. The head element of the input stream is the head of the stream of primes. The tail of the primes is defined by the object expression: it makes a recursive call on sieve after removing all the multiples of the head from the tail of the input.

fun sieve(numbers: Stream<Int>): Stream<Int> {
    val head: Int = numbers.head()
    return StreamF.cons(head, object: Product1<Stream<Int>>() {
        override fun first(): Stream<Int> =
            sieve(numbers.tailP().first().removeAll{n: Int -> (n % head == 0)})
    })
}

val primes: Stream<Int> = sieve(StreamF.range(2))
val primes10: Stream<Int> = primes.take(10)
assertEquals(StreamF.of(2, 3, 5, 7, 11, 13, 17, 19, 23, 29), primes10)

Observe how the stream with all the multiples removed is incorporated into the lazy result. Unlike the tail of a List, the tail of the result stream is not itself a stream; rather, it is an instance of Product1 that wraps a single value representing the stream tail. The abstract class Product1 requires a definition for the accessor member function first, which is used to obtain the wrapped value. Supplying a Product1 object where the tail of the result stream is required is how the laziness is achieved.
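
For readers without the library to hand, the same idea can be sketched with the Kotlin standard library's Sequence. This is an illustration under that substitution, not the library's code: sieveSeq is a hypothetical name, and the sequence { } builder plays the role that the Product1 object plays above by postponing the recursive call until more elements are requested.

```kotlin
// Illustration only: kotlin.sequences stands in for the Stream class.
// sieveSeq is a hypothetical name; the input must be re-iterable
// (as sequences built by generateSequence with a seed are).
fun sieveSeq(numbers: Sequence<Int>): Sequence<Int> = sequence {
    // The head of the input is the next prime.
    val head = numbers.first()
    yield(head)
    // Lazily sieve the rest with all multiples of head removed.
    yieldAll(sieveSeq(numbers.drop(1).filter { n -> n % head != 0 }))
}
```

Applied to generateSequence(2) { it + 1 }, taking the first ten elements gives the same primes as the Stream version. The sketch favours clarity over speed, since each level of recursion re-reads its input from the start.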



Stream pipelines

To perform a computation, stream operations are composed into a stream pipeline. A stream pipeline consists of a source (which might be an array, a collection, a generator function, an I/O channel, etc), zero or more intermediate operations (which transform a stream into another stream, such as filter(predicate)), and a terminal operation (which produces a result or side-effect, such as toList). Streams are lazy; computation on the source data is only performed when the terminal operation is initiated, and source elements are consumed only as needed. The earlier example with the Person class has map as an intermediate operation and foldLeft as the terminal operation.
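
The three parts of a pipeline can be labelled on a small example. The sketch below repeats the Person computation with the Kotlin standard library's Sequence in place of our Stream, so asSequence and fold are standard library functions rather than members of the Stream class; totalAges is a hypothetical helper name.

```kotlin
// Illustration only: kotlin.sequences stands in for the Stream class.
data class Person(val name: String, val age: Int)

fun totalAges(people: List<Person>): Int =
    people.asSequence()                      // source: an existing collection
        .map { person -> person.age }        // intermediate: Sequence<Person> to Sequence<Int>
        .fold(0) { acc, age -> acc + age }   // terminal: forces evaluation, yields an Int
```

Nothing is mapped until fold starts pulling elements, which mirrors the role of foldLeft in the earlier Stream example.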



Achieving Laziness

The Stream type is similar to the singly linked list, List, but with some subtle and important differences. The starting point for our Stream data type is:

sealed class Stream<A> {

    class Nil<A>() : Stream<A>()

    class Cons<A>(val hd: A, val tl: Product1<Stream<A>>) : Stream<A>()

}

The Cons class has head (hd) and tail (tl) properties. Importantly, the tail property is an instance of the Product1 class, and it is this that provides the laziness. The class is abstract and concrete implementations must define the abstract member function first. We saw this class in use with the sieve function.

abstract class Product1<A> {

    abstract fun first(): A

}

The function cons (in the StreamF object declaration) is defined by:

fun <A> cons(a: A, stream: Stream<A>): Stream<A> {
    return Cons(a, object : Product1<Stream<A>>() {
        override fun first(): Stream<A> = stream
    })
}   // cons

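where the tail of the new Stream is held in a Product1 and is obtained only when first is called. Because this overload receives a tail that already exists, a tail whose construction must be deferred is instead supplied as a Product1, as in the sieve function.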
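
Putting these pieces together, the following self-contained sketch shows how such a representation supports an infinite stream. It is a simplified illustration, not the library source: lazyCons, from and takeToList are hypothetical names, and unlike a production implementation it does not cache a tail once it has been computed.

```kotlin
// Illustration only: a cut-down version of the types shown above.
abstract class Product1<A> {
    abstract fun first(): A
}

sealed class Stream<A> {
    class Nil<A> : Stream<A>()
    class Cons<A>(val hd: A, val tl: Product1<Stream<A>>) : Stream<A>()
}

// Hypothetical helper: builds a cell whose tail is computed only
// when first() is called on the wrapped Product1.
fun <A> lazyCons(hd: A, tl: () -> Stream<A>): Stream<A> =
    Stream.Cons(hd, object : Product1<Stream<A>>() {
        override fun first(): Stream<A> = tl()
    })

// Hypothetical helper: the infinite stream start, start + 1, ...
// The recursive call sits inside the lambda, so it runs on demand.
fun from(start: Int): Stream<Int> = lazyCons(start) { from(start + 1) }

// Hypothetical helper: copies at most n leading elements into a
// Kotlin List, forcing only the tails it actually walks over.
fun <A> takeToList(stream: Stream<A>, n: Int): List<A> {
    val result = mutableListOf<A>()
    var current: Stream<A> = stream
    var remaining = n
    while (remaining > 0) {
        when (val cell = current) {
            is Stream.Nil -> return result
            is Stream.Cons -> {
                result.add(cell.hd)
                current = cell.tl.first()
                remaining--
            }
        }
    }
    return result
}
```

Calling from(1) returns immediately with a single Cons cell; only takeToList drives the construction of further cells, one per element requested.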




Performance

Because a Stream evaluates only the elements that are needed, a pipeline of operations can be much cheaper than the same pipeline over a List. In the following, a Stream of the 10000 integers 1, 2, 3, ..., 10000 is first filtered to keep only the even values, the resulting values are squared, the squares are filtered to keep those below 1000, and finally only the first 10 are taken.

stream.filter{n -> isEven(n)}.map{n -> n * n}.filter{n -> (n < 1000)}.take(10)

The following timings are for the filter/map/filter/take pipeline applied to various integer collections containing 1, 2, 3, ..., 10000. Our Stream is significantly faster than all the others because it does not create intermediate structures.

filter/map/filter/take pipeline over 10000 integers

Collection                               Time
Our immutable/persistent List            0.965
Kotlin's MutableList type                0.464
Kotlin's List type                       0.461
kotlinx PersistentList type              0.478
Our Stream                               0.001
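
The source of the difference can be made visible by counting work rather than timing it. The sketch below does not reproduce the measurements above and does not use the Stream class: it compares Kotlin's eager List operations with the lazy standard library Sequence, and eagerPipeline and lazyPipeline are hypothetical names. Each function returns the pipeline result together with the number of times the first predicate was called.

```kotlin
// Illustration only: eager List operations versus lazy kotlin.sequences.

// Eager: every stage processes its whole input and builds a new List.
fun eagerPipeline(source: List<Int>): Pair<List<Int>, Int> {
    var calls = 0
    val result = source
        .filter { n -> calls++; n % 2 == 0 }
        .map { n -> n * n }
        .filter { n -> n < 1000 }
        .take(10)
    return Pair(result, calls)
}

// Lazy: elements flow through all stages one at a time, and the
// source is consumed only until take(10) has what it needs.
fun lazyPipeline(source: List<Int>): Pair<List<Int>, Int> {
    var calls = 0
    val result = source.asSequence()
        .filter { n -> calls++; n % 2 == 0 }
        .map { n -> n * n }
        .filter { n -> n < 1000 }
        .take(10)
        .toList()
    return Pair(result, calls)
}
```

For the integers 1 to 10000, the eager version tests all 10000 elements in its first stage, while the lazy version stops after the 20th element, since the tenth qualifying square is 20 * 20 = 400.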



The code for the Dogs library can be found at:

https://github.com/KenBarclay/TBA
