Kotlin: Stream type
The class Stream implements lazy lists, in which elements are evaluated only when they are needed. A Stream is like a List, except that its elements are computed lazily, much as a view creates a lazy version of a collection. Because of this, a Stream can be very long, even infinitely long. Other than this behavior, a Stream behaves much like a List.
Motivation
A stream is an ordered collection containing a possibly infinite number of elements. Strange as it may seem, an infinite sequence of values finds many uses in real applications. Several phone applications, such as the mail app, use lists of indefinite length. A social media application page loads the first several pages of updates and then lets the user load more.
A stream is sometimes referred to as a lazy list, a name that tells us its elements are evaluated lazily. In effect, the computation is delayed until an element of the stream is requested. That way we can defer expensive operations until they are actually required.
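Kotlin's standard lazy function shows the same idea of deferral on a single value. In this minimal sketch (demoDeferral is a hypothetical name), the block passed to lazy runs only when value is first read, and the result is cached for later reads:

```kotlin
// Hypothetical illustration of deferred evaluation using Kotlin's lazy.
fun demoDeferral(): List<String> {
    val log = mutableListOf<String>()
    // The block is stored, not run, when the Lazy is created.
    val deferred: Lazy<Int> = lazy {
        log.add("computed")
        42
    }
    log.add("created")
    log.add("value=${deferred.value}")   // first read runs the block
    log.add("value=${deferred.value}")   // second read reuses the cached result
    return log
}
```

Calling demoDeferral() yields the log "created", "computed", "value=42", "value=42": the expensive work happens once, and only after it is requested.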
The custom class Stream defines a lazy stream. Its interface is similar to that of List, including the member functions take, map and foldLeft. The StreamF object declaration has definitions for functions that would not normally be member functions, such as from (create a Stream from a source such as an immutable List) and range (create a Stream over a range of numeric values; closedRange, used below, includes both end points). This simple example should need little explanation:
val numbers10: Stream<Int> = StreamF.closedRange(1, 10)
assertEquals(10, numbers10.size())
assertEquals(1, numbers10.head())
assertEquals(55, numbers10.foldLeft(0){acc -> {n -> acc + n}})
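The Dogs library source is not reproduced here, so the following is only a minimal, self-contained sketch of the operations the example uses. It assumes a hypothetical LazyStream type whose tail is deferred with Kotlin's lazy delegate (the library itself uses Product1, described later); closedRange, head, size and foldLeft are re-created for illustration and are not the library's code.

```kotlin
// Hypothetical miniature lazy stream; not the Dogs library implementation.
sealed class LazyStream<out A> {
    object Empty : LazyStream<Nothing>()
    class Cons<out A>(val head: A, tailThunk: () -> LazyStream<A>) : LazyStream<A>() {
        // Computed on first access, then cached.
        val tail: LazyStream<A> by lazy(tailThunk)
    }
}

// The integers from..to inclusive; each tail is built only when needed.
fun closedRange(from: Int, to: Int): LazyStream<Int> =
    if (from > to) LazyStream.Empty
    else LazyStream.Cons(from) { closedRange(from + 1, to) }

// Curried left fold, written as a loop so the call stack does not grow.
fun <A, B> LazyStream<A>.foldLeft(initial: B, f: (B) -> (A) -> B): B {
    var acc = initial
    var rest: LazyStream<A> = this
    while (rest is LazyStream.Cons) {
        acc = f(acc)(rest.head)
        rest = rest.tail
    }
    return acc
}

fun <A> LazyStream<A>.head(): A = when (this) {
    is LazyStream.Cons -> this.head
    is LazyStream.Empty -> throw NoSuchElementException("head of an empty stream")
}

// Counting forces every tail, so size() only terminates on finite streams.
fun <A> LazyStream<A>.size(): Int = foldLeft(0) { count -> { _ -> count + 1 } }
```

foldLeft is written as a loop rather than by recursion so that long finite streams do not grow the call stack.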
The member functions take and drop behave exactly like their List counterparts. Two simple examples are:
val numbers1000: Stream<Int> = StreamF.range(0, 1000)
assertEquals(StreamF.of(0, 1, 2, 3, 4), numbers1000.take(5))
assertEquals(StreamF.of(100, 101, 102, 103, 104), numbers1000.drop(100).take(5))
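As a hedged illustration of how take and drop can work without evaluating the whole stream, here is a sketch on a hypothetical LazyStream type. The two-argument range is assumed to exclude its upper bound, and toList is an added helper used only to inspect results.

```kotlin
// Hypothetical miniature lazy stream; not the Dogs library implementation.
sealed class LazyStream<out A> {
    object Empty : LazyStream<Nothing>()
    class Cons<out A>(val head: A, tailThunk: () -> LazyStream<A>) : LazyStream<A>() {
        val tail: LazyStream<A> by lazy(tailThunk)   // built on first access
    }
}

// Assumed half-open: from, from + 1, ..., until - 1.
fun range(from: Int, until: Int): LazyStream<Int> =
    if (from >= until) LazyStream.Empty
    else LazyStream.Cons(from) { range(from + 1, until) }

// Keeps at most n elements; never forces the source beyond the nth element.
fun <A> LazyStream<A>.take(n: Int): LazyStream<A> = when (this) {
    is LazyStream.Empty -> LazyStream.Empty
    is LazyStream.Cons ->
        if (n <= 0) LazyStream.Empty
        else LazyStream.Cons(this.head) { if (n == 1) LazyStream.Empty else this.tail.take(n - 1) }
}

// Walks past the first n elements with a loop, forcing only those tails.
fun <A> LazyStream<A>.drop(n: Int): LazyStream<A> {
    var rest: LazyStream<A> = this
    var remaining = n
    while (remaining > 0 && rest is LazyStream.Cons) {
        rest = rest.tail
        remaining--
    }
    return rest
}

// Helper for inspecting results: forces the whole (finite) stream.
fun <A> LazyStream<A>.toList(): List<A> {
    val out = mutableListOf<A>()
    var rest: LazyStream<A> = this
    while (rest is LazyStream.Cons) {
        out.add(rest.head)
        rest = rest.tail
    }
    return out
}
```

With this design, range(0, 1000).drop(100).take(5) builds only the first 105 cells of the range; the remaining 895 are never created.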
We create a Stream of objects with the overloaded from function in the associated StreamF object declaration:
data class Person(val name: String, val age: Int)
val people: List<Person> = ListF.of(
    Person("Ken", 25),
    Person("John", 31),
    Person("Jessie", 22)
)
val totalAges = StreamF.from(people).map{per -> per.age}.foldLeft(0){acc -> {age -> acc + age}}
assertEquals(78, totalAges)
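A sketch of how a from function might turn an ordinary List into a lazy stream, on a hypothetical LazyStream type rather than the library's Stream. Each cell exposes the next list element only when its tail is forced; map transforms the head immediately and defers the rest.

```kotlin
// Hypothetical miniature lazy stream; not the Dogs library implementation.
sealed class LazyStream<out A> {
    object Empty : LazyStream<Nothing>()
    class Cons<out A>(val head: A, tailThunk: () -> LazyStream<A>) : LazyStream<A>() {
        val tail: LazyStream<A> by lazy(tailThunk)   // built on first access
    }
}

// Exposes list[index], list[index + 1], ... one cell at a time.
fun <A> from(list: List<A>, index: Int = 0): LazyStream<A> =
    if (index >= list.size) LazyStream.Empty
    else LazyStream.Cons(list[index]) { from(list, index + 1) }

// Maps the head now; the rest is mapped only when its tail is forced.
fun <A, B> LazyStream<A>.map(f: (A) -> B): LazyStream<B> = when (this) {
    is LazyStream.Empty -> LazyStream.Empty
    is LazyStream.Cons -> LazyStream.Cons(f(this.head)) { this.tail.map(f) }
}

// Curried left fold as a loop.
fun <A, B> LazyStream<A>.foldLeft(initial: B, f: (B) -> (A) -> B): B {
    var acc = initial
    var rest: LazyStream<A> = this
    while (rest is LazyStream.Cons) {
        acc = f(acc)(rest.head)
        rest = rest.tail
    }
    return acc
}

data class Person(val name: String, val age: Int)
```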
We also have various fold operations on a Stream. Folding Streams is somewhat different from folding Lists, the primary difference being that the elements of a Stream are unevaluated. The List class function foldRight is not stack safe and can overflow the stack for large Lists. Similarly, the Stream class function foldRight will overflow the stack if, for example, it is used to add the elements of a large Stream<Int>:
val numbers5: Stream<Int> = StreamF.of(1, 2, 3, 4, 5)
assertEquals(15, numbers5.foldRight(0){n -> {acc -> acc + n}})
val numbers10000: Stream<Int> = StreamF.closedRange(1, 10000)
numbers10000.foldRight(0){n -> {acc -> acc + n}} // stack overflow
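The reason for the overflow can be seen in a naive recursive foldRight, sketched here on a hypothetical LazyStream type: the combining function needs the fold of the entire tail before it can return, so the recursion depth grows with the length of the stream. A left fold written as a loop, by contrast, handles the 10000-element stream in constant stack space.

```kotlin
// Hypothetical miniature lazy stream; not the Dogs library implementation.
sealed class LazyStream<out A> {
    object Empty : LazyStream<Nothing>()
    class Cons<out A>(val head: A, tailThunk: () -> LazyStream<A>) : LazyStream<A>() {
        val tail: LazyStream<A> by lazy(tailThunk)   // built on first access
    }
}

fun closedRange(from: Int, to: Int): LazyStream<Int> =
    if (from > to) LazyStream.Empty
    else LazyStream.Cons(from) { closedRange(from + 1, to) }

// Naive right fold: f(head) cannot finish until the whole tail has been folded,
// so there is one nested call per element.
fun <A, B> LazyStream<A>.foldRight(initial: B, f: (A) -> (B) -> B): B = when (this) {
    is LazyStream.Empty -> initial
    is LazyStream.Cons -> f(this.head)(this.tail.foldRight(initial, f))
}

// Left fold as a loop: the accumulator is updated in place, one element at a time.
fun <A, B> LazyStream<A>.foldLeft(initial: B, f: (B) -> (A) -> B): B {
    var acc = initial
    var rest: LazyStream<A> = this
    while (rest is LazyStream.Cons) {
        acc = f(acc)(rest.head)
        rest = rest.tail
    }
    return acc
}
```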
Although foldRight is not stack safe, it has many interesting use cases. In the following we implement the map function using foldRight:
fun <A, B> map(stream: Stream<A>, f: (A) -> B): Stream<B> =
    stream.foldRight(StreamF.empty()){a: A ->
        {sb: Stream<B> ->
            StreamF.cons(f(a), sb)
        }
    }
assertEquals(
    StreamF.of(false, true, false, true, false),
    map(StreamF.closedRange(1, 5)){n: Int -> isEven(n)}
)
It is possible to generate an infinite stream, i.e. a Stream of infinite length. The single-argument function range delivers a stream of integers starting from its argument value, say 1; in effect it produces the stream 1, 2, 3, ... lazily. In the following example the function firstOddNumbers returns the first n odd numbers. It is implemented with the functions range, filter and take.
fun firstOddNumbers(n: Int): Stream<Int> =
    StreamF.range(1).filter{m -> isOdd(m)}.take(n)
val numbers: Stream<Int> = StreamF.range(1)
assertEquals(StreamF.of(1, 2, 3, 4, 5), numbers.take(5))
assertEquals(StreamF.of(1, 3, 5, 7, 9), firstOddNumbers(5))
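A minimal sketch of the infinite range, filter and take, on a hypothetical LazyStream type (isOdd is a helper the original uses but does not show; it is defined here). The infinite stream is safe because each tail is built only when demanded, and take stops demanding after n elements.

```kotlin
// Hypothetical miniature lazy stream; not the Dogs library implementation.
sealed class LazyStream<out A> {
    object Empty : LazyStream<Nothing>()
    class Cons<out A>(val head: A, tailThunk: () -> LazyStream<A>) : LazyStream<A>() {
        val tail: LazyStream<A> by lazy(tailThunk)   // built on first access
    }
}

// Infinite stream from, from + 1, ...; a cell exists only once it is demanded.
fun range(from: Int): LazyStream<Int> = LazyStream.Cons(from) { range(from + 1) }

// Skips rejected elements with a loop, then defers filtering of the rest.
// On an infinite stream with no further matches this loop never returns.
fun <A> LazyStream<A>.filter(p: (A) -> Boolean): LazyStream<A> {
    var rest: LazyStream<A> = this
    while (rest is LazyStream.Cons && !p(rest.head)) rest = rest.tail
    return when (val r = rest) {
        is LazyStream.Empty -> LazyStream.Empty
        is LazyStream.Cons -> LazyStream.Cons(r.head) { r.tail.filter(p) }
    }
}

// Keeps at most n elements; never forces the source beyond the nth element.
fun <A> LazyStream<A>.take(n: Int): LazyStream<A> = when (this) {
    is LazyStream.Empty -> LazyStream.Empty
    is LazyStream.Cons ->
        if (n <= 0) LazyStream.Empty
        else LazyStream.Cons(this.head) { if (n == 1) LazyStream.Empty else this.tail.take(n - 1) }
}

fun <A> LazyStream<A>.toList(): List<A> {
    val out = mutableListOf<A>()
    var rest: LazyStream<A> = this
    while (rest is LazyStream.Cons) {
        out.add(rest.head)
        rest = rest.tail
    }
    return out
}

fun isOdd(n: Int): Boolean = n % 2 != 0   // hypothetical helper

fun firstOddNumbers(n: Int): LazyStream<Int> = range(1).filter { m -> isOdd(m) }.take(n)
```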
A positive integer greater than one is prime if it is divisible only by itself and by one. The Sieve of Eratosthenes is an algorithm that works by cancelling out all the multiples of numbers once they are established as prime. The primes are the values that remain.
We begin with a sequence of numbers starting with 2. The head element, 2, is prime, and we remove all the multiples of 2 from the sequence. The head of the remainder of the sequence is 3, which is prime. We sieve the remainder of the sequence, removing all multiples of 3. The process is repeated indefinitely.
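For comparison with the stream version, the classic sieve works over a fixed upper bound with an array of flags. In this sketch (primesUpTo is a hypothetical name), each prime's multiples are cancelled starting at its square, a standard refinement since smaller multiples have already been cancelled by smaller primes:

```kotlin
// Classic bounded Sieve of Eratosthenes: returns all primes <= limit.
fun primesUpTo(limit: Int): List<Int> {
    if (limit < 2) return emptyList()
    val composite = BooleanArray(limit + 1)   // composite[i] becomes true once i is cancelled
    var p = 2
    while (p * p <= limit) {
        if (!composite[p]) {
            // p is prime: cancel its multiples, starting at p * p
            var multiple = p * p
            while (multiple <= limit) {
                composite[multiple] = true
                multiple += p
            }
        }
        p++
    }
    return (2..limit).filter { !composite[it] }
}
```

The array version needs the bound up front; the lazy stream version avoids that by sieving an infinite stream and taking as many primes as are wanted.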
In the next example the function sieve is an implementation of this algorithm using a lazy stream. The function accepts the stream of integers [2, 3, 4, 5, ...] and lazily converts it into a stream of primes [2, 3, 5, ...]. The head element of the input stream is the head of the stream of primes. The tail of the primes is defined by an object expression, an anonymous Product1 object whose first function makes a recursive call on sieve after removing all the multiples of the head from the tail of the input.
fun sieve(numbers: Stream<Int>): Stream<Int> {
    val head: Int = numbers.head()
    return StreamF.cons(head, object: Product1<Stream<Int>>() {
        override fun first(): Stream<Int> =
            sieve(numbers.tailP().first().removeAll{n: Int -> (n % head == 0)})
    })
}
val primes: Stream<Int> = sieve(StreamF.range(2))
val primes10: Stream<Int> = primes.take(10)
assertEquals(StreamF.of(2, 3, 5, 7, 11, 13, 17, 19, 23, 29), primes10)
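The same recursion can be reproduced without the Dogs library. This sketch uses a hypothetical LazyStream whose tails are deferred with Kotlin's lazy, and keeps the non-multiples with filter where the original removes the multiples with removeAll:

```kotlin
// Hypothetical miniature lazy stream; not the Dogs library implementation.
sealed class LazyStream<out A> {
    object Empty : LazyStream<Nothing>()
    class Cons<out A>(val head: A, tailThunk: () -> LazyStream<A>) : LazyStream<A>() {
        val tail: LazyStream<A> by lazy(tailThunk)   // built on first access
    }
}

fun range(from: Int): LazyStream<Int> = LazyStream.Cons(from) { range(from + 1) }

fun <A> LazyStream<A>.filter(p: (A) -> Boolean): LazyStream<A> {
    var rest: LazyStream<A> = this
    while (rest is LazyStream.Cons && !p(rest.head)) rest = rest.tail
    return when (val r = rest) {
        is LazyStream.Empty -> LazyStream.Empty
        is LazyStream.Cons -> LazyStream.Cons(r.head) { r.tail.filter(p) }
    }
}

fun <A> LazyStream<A>.take(n: Int): LazyStream<A> = when (this) {
    is LazyStream.Empty -> LazyStream.Empty
    is LazyStream.Cons ->
        if (n <= 0) LazyStream.Empty
        else LazyStream.Cons(this.head) { if (n == 1) LazyStream.Empty else this.tail.take(n - 1) }
}

fun <A> LazyStream<A>.toList(): List<A> {
    val out = mutableListOf<A>()
    var rest: LazyStream<A> = this
    while (rest is LazyStream.Cons) {
        out.add(rest.head)
        rest = rest.tail
    }
    return out
}

// The head is prime; the deferred tail sieves the rest with the head's multiples removed.
fun sieve(numbers: LazyStream<Int>): LazyStream<Int> = when (numbers) {
    is LazyStream.Empty -> LazyStream.Empty
    is LazyStream.Cons -> {
        val head = numbers.head
        LazyStream.Cons(head) { sieve(numbers.tail.filter { n -> n % head != 0 }) }
    }
}
```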
Observe how the stream with all the multiples removed is incorporated into the lazy result. The tail of the result stream is not itself a stream, as it would be with a List; rather, it is an instance of Product1 that wraps a single value representing the stream tail. The abstract class Product1 requires a definition for the accessor member function first, which is used to obtain the wrapped value. Supplying a Product1 object where the tail stream would otherwise be required is how the laziness is achieved: the recursive call to sieve is made only when first is invoked.
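To see where the deferral comes from, the sketch below re-creates the article's Product1 class and records when the body of first actually runs (demoProduct1 is a hypothetical name). Creating the anonymous object computes nothing; the body executes each time first is called.

```kotlin
// Same shape as the article's Product1: a single abstract accessor.
abstract class Product1<A> {
    abstract fun first(): A
}

// Hypothetical demo: record when the wrapped computation actually runs.
fun demoProduct1(): List<String> {
    val log = mutableListOf<String>()
    val tail = object : Product1<Int>() {
        override fun first(): Int {
            log.add("first() evaluated")
            return 99
        }
    }
    log.add("object created")              // nothing has been computed yet
    log.add("value = ${tail.first()}")
    log.add("value = ${tail.first()}")     // runs the body again: no caching
    return log
}
```

Unlike Kotlin's lazy, a Product1 object written this way does not cache its result, so calling first twice repeats the work. Whether the Dogs library memoizes tails elsewhere is not visible from the sieve code above.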
Stream pipelines
To perform a computation, stream operations are composed into a stream pipeline. A stream pipeline consists of a source (which might be an array, a collection, a generator function, an I/O channel, etc.), zero or more intermediate operations (which transform a stream into another stream, such as filter(predicate)), and a terminal operation (which produces a result or side effect, such as toList). Streams are lazy; computation on the source data is only performed when the terminal operation is initiated, and source elements are consumed only as needed. The earlier example with the Person class has StreamF.from(people) as its source, map as an intermediate operation and foldLeft as the terminal operation.
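The claim that source elements are consumed only as needed can be checked with a sketch. The code below uses a hypothetical LazyStream type (tails deferred with Kotlin's lazy) as a stand-in for the library's Stream, plus an instrumented source (Counter and countingRange are hypothetical names) that counts each element as it is produced. In this sketch the first cell, and its mapped head, are computed when the pipeline is assembled; every later element is pulled only when the terminal operation toList asks for it.

```kotlin
// Hypothetical miniature lazy stream; not the Dogs library implementation.
sealed class LazyStream<out A> {
    object Empty : LazyStream<Nothing>()
    class Cons<out A>(val head: A, tailThunk: () -> LazyStream<A>) : LazyStream<A>() {
        val tail: LazyStream<A> by lazy(tailThunk)   // built on first access
    }
}

// Source: the integers from..to, counting each element as its cell is created.
class Counter { var pulled = 0 }

fun countingRange(from: Int, to: Int, counter: Counter): LazyStream<Int> {
    if (from > to) return LazyStream.Empty
    counter.pulled++
    return LazyStream.Cons(from) { countingRange(from + 1, to, counter) }
}

// Intermediate operation: maps the head now, the rest only when forced.
fun <A, B> LazyStream<A>.map(f: (A) -> B): LazyStream<B> = when (this) {
    is LazyStream.Empty -> LazyStream.Empty
    is LazyStream.Cons -> LazyStream.Cons(f(this.head)) { this.tail.map(f) }
}

// Intermediate operation: keeps at most n elements.
fun <A> LazyStream<A>.take(n: Int): LazyStream<A> = when (this) {
    is LazyStream.Empty -> LazyStream.Empty
    is LazyStream.Cons ->
        if (n <= 0) LazyStream.Empty
        else LazyStream.Cons(this.head) { if (n == 1) LazyStream.Empty else this.tail.take(n - 1) }
}

// Terminal operation: drives the pipeline and collects the results.
fun <A> LazyStream<A>.toList(): List<A> {
    val out = mutableListOf<A>()
    var rest: LazyStream<A> = this
    while (rest is LazyStream.Cons) {
        out.add(rest.head)
        rest = rest.tail
    }
    return out
}
```

Assembling countingRange(1, 1_000_000, counter).map { n -> n * 10 }.take(3) produces just one source element; running toList on it produces [10, 20, 30] after only three of the million source elements have been created.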
Achieving Laziness
The Stream type is similar to the singly linked list type List, but with some subtle and important differences. The starting point for our Stream data type is:
sealed class Stream<A> {
    class Nil<A>() : Stream<A>()
    class Cons<A>(val hd: A, val tl: Product1<Stream<A>>) : Stream<A>()
}
The Cons class has head (hd) and tail (tl) properties. Importantly, the tail property is an instance of the Product1 class, which implements the laziness. The class is abstract, and concrete implementations must define the abstract member function first. We saw this class in use with the sieve function.
abstract class Product1<A> {
    abstract fun first(): A
}
The function cons (in the StreamF object declaration) is defined by:
fun <A> cons(a: A, stream: Stream<A>): Stream<A> {
    return Stream.Cons(a, object : Product1<Stream<A>>() {
        override fun first(): Stream<A> = stream
    })
} // cons
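where the tail part of the new Stream is wrapped in a Product1 object. Note that, because Kotlin evaluates arguments before a call, the stream argument has already been evaluated by the time cons runs; the deferral exploited by the sieve function comes from the overload of cons that accepts a Product1 directly.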
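The distinction between the two cons overloads can be made visible with a small sketch. It re-creates the Nil/Cons and Product1 shapes shown above, defines both a strict cons and one taking a Product1 (as the sieve function uses), and logs when each tail is built; demoCons and makeTail are hypothetical names.

```kotlin
// Product1 and the Nil/Cons shape as shown in the article.
abstract class Product1<A> {
    abstract fun first(): A
}

sealed class Stream<A> {
    class Nil<A> : Stream<A>()
    class Cons<A>(val hd: A, val tl: Product1<Stream<A>>) : Stream<A>()
}

// Strict overload: the caller evaluates `stream` before cons is entered.
fun <A> cons(a: A, stream: Stream<A>): Stream<A> =
    Stream.Cons(a, object : Product1<Stream<A>>() {
        override fun first(): Stream<A> = stream
    })

// Deferred overload: the caller supplies a Product1 whose first() has not run yet.
fun <A> cons(a: A, tail: Product1<Stream<A>>): Stream<A> = Stream.Cons(a, tail)

// Hypothetical demo recording when each tail is actually built.
fun demoCons(): List<String> {
    val log = mutableListOf<String>()
    fun makeTail(): Stream<Int> {
        log.add("tail built")
        return Stream.Nil<Int>()
    }

    cons(1, makeTail())                     // makeTail() runs before cons is called
    log.add("after strict cons")

    val deferred = cons(2, object : Product1<Stream<Int>>() {
        override fun first(): Stream<Int> = makeTail()
    })
    log.add("after deferred cons")

    val cell = deferred as Stream.Cons<Int>
    cell.tl.first()                         // only now is the second tail built
    return log
}
```

The resulting log is "tail built", "after strict cons", "after deferred cons", "tail built": the strict call builds its tail up front, while the Product1 call postpones it until first is invoked.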
Performance
For pipelines such as the following, Streams can offer significant performance improvements over Lists. Here a Stream of the 10000 integers 1, 2, 3, ..., 10000 is first filtered to keep only the even values; these are squared, the squares are filtered to keep only those below 1000, and finally only the first 10 are taken.
stream.filter{n -> isEven(n)}.map{n -> n * n}.filter{n -> (n < 1000)}.take(10)
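The next table shows the timings for this filter/map/filter/take pipeline applied to various collections containing the integers 1, 2, 3, ..., 10000. Our Stream is significantly faster than all the others: it does not create intermediate structures, and, because source elements are consumed only as needed, take(10) ends the work once ten results have been produced.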
Timings for the filter/map/filter/take pipeline over 10000 integers:
our immutable/persistent List: 0.965
Kotlin's MutableList type: 0.464
Kotlin's List type: 0.461
kotlinx PersistentList type: 0.478
our Stream: 0.001
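One way to see why a lazy stream can win so decisively on this pipeline is to count how many source elements it actually produces. This sketch uses a hypothetical LazyStream and an instrumented source (Counter and countingRange are hypothetical names); it is not the benchmark behind the table. Only the first 20 source elements are ever produced, because take(10) stops asking for more once the tenth even square below 1000 (400 = 20 * 20) has been found, whereas the eager List version filters all 10000 elements and squares all 5000 even ones.

```kotlin
// Hypothetical miniature lazy stream; not the Dogs library implementation.
sealed class LazyStream<out A> {
    object Empty : LazyStream<Nothing>()
    class Cons<out A>(val head: A, tailThunk: () -> LazyStream<A>) : LazyStream<A>() {
        val tail: LazyStream<A> by lazy(tailThunk)   // built on first access
    }
}

class Counter { var pulled = 0 }

// Source 1..to that counts each element as its cell is created.
fun countingRange(from: Int, to: Int, counter: Counter): LazyStream<Int> {
    if (from > to) return LazyStream.Empty
    counter.pulled++
    return LazyStream.Cons(from) { countingRange(from + 1, to, counter) }
}

fun <A> LazyStream<A>.filter(p: (A) -> Boolean): LazyStream<A> {
    var rest: LazyStream<A> = this
    while (rest is LazyStream.Cons && !p(rest.head)) rest = rest.tail
    return when (val r = rest) {
        is LazyStream.Empty -> LazyStream.Empty
        is LazyStream.Cons -> LazyStream.Cons(r.head) { r.tail.filter(p) }
    }
}

fun <A, B> LazyStream<A>.map(f: (A) -> B): LazyStream<B> = when (this) {
    is LazyStream.Empty -> LazyStream.Empty
    is LazyStream.Cons -> LazyStream.Cons(f(this.head)) { this.tail.map(f) }
}

fun <A> LazyStream<A>.take(n: Int): LazyStream<A> = when (this) {
    is LazyStream.Empty -> LazyStream.Empty
    is LazyStream.Cons ->
        if (n <= 0) LazyStream.Empty
        else LazyStream.Cons(this.head) { if (n == 1) LazyStream.Empty else this.tail.take(n - 1) }
}

fun <A> LazyStream<A>.toList(): List<A> {
    val out = mutableListOf<A>()
    var rest: LazyStream<A> = this
    while (rest is LazyStream.Cons) {
        out.add(rest.head)
        rest = rest.tail
    }
    return out
}

fun isEven(n: Int): Boolean = n % 2 == 0   // hypothetical helper
```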
The code for the Dogs library can be found at:
https://github.com/KenBarclay/TBA