Clojure - transducers, reducers and other dregs
Recently, transducers have gained some fame - a new feature from Clojure 1.7 that has not yet been released. At the time of writing, Clojure 1.7-alpha5 is relevant, but a fair number of transducer ports have already appeared in various languages: Python , Ruby , JavaScript , PHP , Java , C ++ , Lua , Erlang . And ... it's a little discouraging. After all, quite a while ago (back in Clojure 1.5), the reducers library was added . So nobody really said about reducers, didn’t port anything, although they seem to do similar things ... Or not?Let's see why we needed all these reducers & transducers in Clojure (do we really need them?), How they work, how to use them ... And finally, we’ll find out if it’s time to throw reducers into a dump.
It would be wrong to describe concepts that originated in Clojure outside the context of this language. Therefore, there will be many listings on Clojure. But there will be no matan. In general, initial knowledge of Clojure is appropriate (especially the idea of sequences ), but knowing Haskell is not necessary. I also warn you in advance that all the listings of standard functions listed are actually very changed, sometimes even “slightly” broken. All for the sake of simplification. Oh yes, the picture is the same burrito .
We turn off ...
So, Clojure language is functional, which means that the ordinary imperative cycle is not good.
Well, okay, we didn’t really want to - there is a functionally pleasing reduce !
(defn my-reduce
([rf coll] ;; этот вариант для удобства
(if-let [s (seq coll)]
(my-reduce rf (first s) (next s))
(rf)))
([rf acc coll]
(if-let [[x & xs] (seq coll)]
(recur rf (rf acc x) xs)
acc)))
In reality
reduce, of course, it is implemented in a slightly different way , but it doesn’t matter to us now, let’s forget. A function rf(let's call it a reduct function) here takes two arguments: the first is a kind of “moving state”; the second is an element from the sequence coll. If the initial state is not specified, then (first coll)or is used (rf). We run through the entire collection coll, call for each element rf, while “dragging” the state acc. When the items are over, then just return acc. A small example. Suppose we have a list of strings, we want to calculate their total length.
Here is the imperative code with a loop:
(defn length-of-strings [strings]
(with-local-vars [acc 0] ;; да-да, в Clojure есть локальные переменные!
(doseq [c strings]
(var-set
acc
(+ @acc (count c)))) ;; собственно вся работа
@acc))
The state of the loop is a simple counter
acc(number). At each iteration, we assume it is equal (+ @acc (count c)). And now one more time, only through
reduce:(defn length-of-strings [coll]
(my-reduce
(fn ([acc c] (+ acc (count c)))) ;; наша редукт-функция
0 ;; начальное значение
coll))
If you temporarily “forget” about laziness, you can implement many primitive operations, such as
mapor filter.(defn my-map [f coll]
(my-reduce
(fn [acc c] (conj acc (f c)))
[]
coll))
(defn my-filter [p coll]
(my-reduce
(fn [acc c] (if (p c) (conj acc c) acc))
[]
coll))
For implementation, the
takegiven option will reduceno longer fit - the cycle always runs through the entire sequence (this is not some kind of Haskell, where everything is lazy). In order to overcome this drawback, a special
reduced?. At the same time, they rewrote reduce, having received something like this:(defn my-reduce
([rf coll]
(if-let [s (seq coll)]
(my-reduce rf (first s) (next s))
(rf)))
([rf acc coll]
(if-let [[x & xs] (seq coll)]
(let [ret (rf acc x)]
(if (reduced? ret)
@ret
(recur rf ret xs)))
acc)))
As soon as the reduct function returns to us
(reduced ...), the cycle breaks and the value returns @ret.(defn take-r [n coll]
(my-reduce
(fn [[n1 acc] c]
(if (pos? n1)
[(dec n1) (conj acc c)]
(reduced acc)))
[n []]
coll))
;; функция поддерживает бесконечные последовательности!
(take-r 5 (range))
;; => [0 1 2 3 4]
We cannot help but recall the wonderful reductions function . In essence, this is an analogue
reduce, it only returns a lazy list of all intermediate values acc, and not just the last. It is very convenient to use when debugging. We write the algorithm step as a function, run it reduceon collections with input data. If suddenly something is wrong, replace it reducewith reductions, run in REPL and get all the intermediate steps. With cycles, it just won’t be so easy - you will have to fix the debugging crutches, which is not very convenient. It
reductionsis also useful in itself, there are some factorials to calculate:;; => *ленивая* последовательность факториалов
(def factorials (reductions *' (cons 1 (map inc (range)))))
(nth factorials 20)
;; => 2432902008176640000
Clojure uses sequences to cycle through collections. If we decide to go over a vector, a hash table, or a simple iterator, a fair amount of temporary objects will be created in the heap.
The obvious optimization that is requested in such a situation is to implement a specialized option
reducefor those collections for which it makes sense. Well, if the collection does not lend itself to such optimization, then use a standard implementation similar to the one given at the beginning of the article. There is a special protocol clojure.core.protocol / CollReduce for this . When the collection object supports it, this implementation will be used internally clojure.core/reduce. Therefore reduce, Clojure is usually faster than a similar cycle doseq.Transformers
A transformer is a function that takes one reduct function and returns a new one.
For example, here is the “increase by 1” transformer:
(defn inc-t [rf]
(fn [acc c] (rf acc (inc c))))
;; и сразу пример использования
(reduce + 0 (map inc [1 2 3 4]))
;; => 14
(reduce (inc-t +) 0 [1 2 3 4])
;; => 14
You can generalize this matter somewhat, allowing instead
incto specify any function:(defn map-t [f]
(fn [rf]
(fn [acc c] (rf acc (f c)))))
(def inc-t (map-t inc))
(def dec-t (map-t dec))
;; ...
(reduce (inc-t +) 0 [1 2 3 4])
;; => 14
And here, for example, the transformer "filter":
(defn filter-t [pred]
(fn [rf]
(fn [acc c]
(if (pred c)
(rf acc c)
acc))))
(def odd?-t (filter-t odd?))
(def even?-t (filter-t even?))
;; пример
(reduce (even?-t *) 1 [1 2 3 4])
;; => 8
Is it possible to combine several transformers? Of course!
(defn odd?-inc-t [rf]
(odd?-t (inc-t rf)))
;; ..или чуть более канонично
(def odd?-inc-t (comp (filter-t odd?) (map-t inc)))
;; что логически эквивалентно..
(def odd?-inc-t
(comp
(fn [rf]
(fn [acc c]
(if (odd? c) (rf acc c) acc)))
(fn [rf]
(fn [acc c]
(rf acc (inc c))))))
;; что даст эквивалент такой функции
(defn odd?-inc-t [rf]
(fn [acc c]
(if (odd? c)
(rf acc (inc c))
acc)))
;; пример использования
(reduce * 1 (->> [1 2 3 4 5] (filter odd?) (map inc)))
;; => 48
(reduce (odd?-inc-t *) 1 [1 2 3 4 5])
;; ==> 48
It is worth paying attention that transformers go in the "reverse" order. If we want the elements of the collection to be processed by the transformer
Abefore they get into B, then we need to glue them together (comp A B). And now the trick:(def cc (vec (range 1000000)))
(time (reduce + 0 (->> cc (filter odd?) (map inc))))
;; "Elapsed time: 171.390143 msecs"
;; => 250000500000
(time (reduce ((comp (filter-t odd?) (map-t inc)) +) 0 cc))
;; "Elapsed time: 93.246015 msecs"
;; => 250000500000
Here's how, a tangible increase in speed out of the blue! Everything, of course, depends on many details and different nuances, so in reality the gain may be different. In general, I want to say that you should not take this piece of code as a benchmark.
But overall, the results are far from surprising. When using
mapand filter2 intermediate sequences are created. We run through the original vector, create a temporary list of filtered values. Then we go over this list and build another one, but with enlarged elements. And finally, we go over it already, summing up the values. On the other hand, the option with transformers does not create any temporary collections. Instead, and
odd?, and immediately apply over the original elements inc.Where are my reducers?
And all was well until version 1.5 introduced a new standard library
clojure.core.reducers. That's right, a separate library will have to be imported explicitly. And it announced its version of map, filter, take-while, and others. And, of course, they are not compatible with regular versions of clojure.core. Therefore, it is better to write (require '[clojure.core.reducers :as r])instead of simple (use 'clojure.core.reducers). So what is a reducer? Briefly and stupidly: a reducer is any object that can be reduced. Any collection in terms
clojure.core.reducersis a reducer. The hash table is the reducer. And a java.lang.Stringreducer. Well nil, of course, too . Let's see the definition:(defn reducer [coll xf]
;; `xf` - это трансформер
(reify
clojure.core.protocols/CollReduce
(coll-reduce [this f1]
(let [f2 (xf f1)]
(clojure.core.protocols/coll-reduce coll f2 (f2))))
(coll-reduce [this f1 init]
(let [f2 (xf f1)]
(clojure.core.protocols/coll-reduce coll f2 init)))))
Here the collection is taken
coll, and a new one is returned, according to which you can start reduce, and only that. Neither add an element, nor delete, nor even go through the elements. But before each launch, the reducereduct function will be passed through the transformer xf.(def nums [1 2 3 4 5])
(def nums+1 (reducer nums inc-t))
(reduce + 0 nums)
;; => 15
(reduce + 0 nums+1)
;; => 20
As already mentioned, in the library reducers declared their options
map, filter, take-whileand the like. All of them accept the reducer and return a new one to which the corresponding transformer is “attached”. It could have looked like this
clojure.core.reducers/map(of course, it looks completely different ):(def map-r [f coll]
(reducer coll (map-t f)))
And now a few examples of how all this stuff can be used:
(require '[clojure.core.reducers :as r])
(def nums [1 2 3 4 5 6 7 8])
(type (map inc nums))
;; => clojure.lang.LazySeq
(reduce conj [] (map inc nums))
;; => [2 3 4 5 6 7 8 9]
(type (r/map inc nums))
;; => clojure.core.reducers$folder$reify__1234
;; совсем-совсем не sequence
(reduce conj [] (r/map inc nums))
;; => [2 3 4 5 6 7 8 9]
;; но все еще умеет редьюситься
(reduce conj [] (r/filter odd? nums))
;; => [1 3 5 7]
(reduce + 0 (->> nums (r/map inc) (r/map inc)))
;; => 52
;; ~~ (+ 0 (inc (inc 1)) (inc (inc 2)) ...)
(reduce + 0 (->> nums (r/filter odd?) (r/map inc)))
;; => 20
;; ~~ (+ 0 (inc 1) (inc 3) ...)
Parallel
To be honest, the "reducers" in vain so called. “Folders” would be more correct. Indeed, in addition to the protocol
CollReduce(which appeared long before reducers), another more important protocol is declared in the library CollFold:(defprotocol CollFold
(coll-fold [coll n combinef reducef]))
In principle, it is very similar, only reduct functions are now two, and an incomprehensible argument has been added
n. Idea: some collections can be run in several threads. Briefly: we divide it into blocks of about the size of nelements, each piece is collapsed with #(reduce reducef (combinef) %), then the list of results (one per block) is collapsed again, but with the help #(reduce combinef %). A reducer that can fold itself in parallel is called a folder .
Only 2 standard collections support the protocol
CollFold- vectors and hash tables.(def v (vec (range 10000000)))
;; линейно, в 1 поток
(time (reduce + v))
;; "Elapsed time: 648.897616 msecs"
;; => 49999995000000
;; в несколько потоков
(time (r/coll-fold v 512 + +))
;; "Elapsed time: 187.414147 msecs"
;; => 49999995000000
All standard reducers for which this makes sense implement
CollFold. It is, for example r/map, r/filter, r/mapcat, r/flatten. On the other hand r/take, r/take-while, r/dropdo not support parallelization. The implementation was given above r/map. Here is her updated version:(def map-r [f coll]
;; просто заменили `reducer` на `folder`
(folder coll (map-t f)))
coll-foldYou don’t need to
use it directly - for everyday needs there is a fold wrapper . It sets the default value for n(block size) - 512. In general, the hint is clear - reducers are clearly intended for large collections (> 1K items). And again: do not use coll-folddirectly, call fold. Ah, there is still foldcat . A kind of accelerated (due to multithreading) option
#(reduce conj [] %). This function returns clojure.core.reducers.Cat objects that implement both Counted, and Sequable, and CollFold.(r/map inc [1 2 3])
;; => # [2 3 4]
;; что там со скоростью...
(def v (vec (range 1000000)))
(time (count (reduce conj [] (r/map inc v))))
;; "Elapsed time: 90.124397 msecs"
;; => 1000000
;; что-то не очень, а если через `foldcat`
(time (count (r/foldcat (r/map inc v))))
;; "Elapsed time: 25.054988 msecs"
;; => 1000000
(time (count (r/foldcat (r/map inc (r/foldcat (r/map inc v))))))
;; "Elapsed time: 32.054988 msecs"
;; => 1000000
;; результат `foldcat`, кстати, тоже foldable (привет, многопоточность)
(satisfies? r/CollFold (r/foldcat []))
;; => true
They rush into the scene ...
Unlike reducers, transducers are no longer a separate library. It is rather a concept (read the idea) that will be integrated directly into the module
clojure.core. We are waiting for this welcome in version 1.7 (just a little bit left). Briefly: transducers are the same transformers, only
(def typical-transducer
(fn [rf]
(fn ([] ...) ;; возвращаем начальный элемент
([acc] ...) ;; непонятно...
([acc c] ...))) ;; собственно, тут все самое важное, как и раньше
;; новый вариант `map-t`, на 33% лучше старого
(defn map-t-improved [f]
(fn [rf]
(fn ([] (rf)) ;; пробрасываем дальше
([acc] (rf acc)) ;; пробрасываем дальше
([acc c] (rf acc (f c)))))) ;; заменяем `c` на `(f c)`
The 0-ary reduct function, as before, can be called if an initial element is needed. The 2-ary variant is used for, in fact, the reduction itself. A 1-ary version is called at the very end of the whole work (upon completion
reduce). It is needed in those cases when you need to "add" new elements after the last. Example: a dedupe transducer skipping repeats from a collection:
(defn my-dedupe []
(fn [rf]
;; острожно, состояние!
(let [prev (atom ::none)]
(fn ;; это наша редукт-функция
([] (rf))
([acc] (rf acc))
([acc c]
(let [p @prev]
(reset! prev c)
(if (= p c)
acc
(rf acc c))))))))
(def rf ((my-dedupe) +))
(reduce rf 0 [1 1, 2 2, 3, 1 1])
;; => 7
(reduce rf 0 [1 1, 2 2, 3, 1 1])
;; => 6
;; упс... `rf` не чистая, нельзя ее использовать 2 раза
The subtle point is that our transducer returns a new reduct function. Moreover, this reduct function has a mutable state and can actually do 3 different things (1 by arity).
As an example of using the 1-ary variant of the reduct function, partition-all is given . Simplified implementation:
(defn partition-all-t [n]
(fn [rf]
(let [buffer (java.util.ArrayList. n)] ;; состояние!
(fn
([] (rf))
([acc]
(if (.isEmpty buffer)
;; если буффер пустой - пробрасываем дальше
(rf acc)
;; иначе...
(let [v (vec (.toArray buffer)) ;; превращаем буфер в вектор
acc' (rf acc v)] ;; сбрасываем при помощи 2-арной `rf`
;; а теперь можно и пробросить дальше
(rf acc'))))
([acc c]
(.add buffer c)
(if (= n (.size buffer))
;; если буффер переполнился - "сбрасываем" его
(let [v (vec (.toArray buffer))]
(.clear buffer)
(rf acc v))
;; иначе - ничего не делаем
acc))))))
;; пользуем то, что наваяли (не указали начальный элемент, ведь (conj) => [])
(reduce ((partition-all-t 3) conj) (range 10))
; >> ClassCastException java.lang.Long cannot be cast to clojure.lang.IPersistentCollection
;; не работает...
;; ну ладно, а если указать []...
(reduce ((partition-all-t 3) conj) [] (range 10))
;; => [[0 1 2] [3 4 5] [6 7 8]]
;; работает, но неверно...
Hmm ... Neither the 0-ary or the 1-ary variants
((partition-all-t 3) conj)were ever called up - the ordinary reduceone knows nothing about all these innovations. It calls the 0-ary option only if the collection is empty, the 1-ary one never calls at all. Therefore, they created a new function
transduce. Here it is, unlike the "obsolete" one reduce, it (rf)always causes , unless the initial state is clearly specified. And this function is guaranteed to call (rf acc), and exactly once . And transduceshe herself calls our transducer and hides the mutable reduct function from our eyes. In other words, all the dirty work (in terms of side effects) is done “under the hood”.;; передаем сам иммутабельный трансдьюсер, а не результат его работы
(transduce (partition-all-t 3) conj (range 10))
;; => [[0 1 2] [3 4 5] [6 7 8] [9]]
;; ии... работает!
;; композиция трансдьюсеров (опять работает)
(transduce (comp (filter odd?) (partition-all-t 3)) conj (range 10))
;; => [[1 3 5] [7 9]]
But what if you try
transduceto reduceuse instead ?(reduce (identity -) 0 [1 2 3 4])
;; => -10
;; ~~ (- (- (- (- 0 1) 2) 3) 4)
(transduce identity - 0 [1 2 3 4])
;; => 10
;; не верно!
It turns out that it is not possible to directly replace
reducewith transduce- the new requirement of the 1-ary reduct function interferes. In our example, after the end of the calculations , it transducecauses (- acc)that it changes the sign of the result to the opposite. Remedy the situation will Completing :((completing -) 3 2)
;; => 1
((identity -) 1)
;; => -1
((completing -) 1)
;; => 1
(transduce completing - 0 [1 2 3 4])
;; => -10
;; вот теперь правильно!
Special functions have appeared in the core of the language for working with
map, filter, take, interpose, mapcatand the company:(map inc [1 2 3])
;; => (2 3 4)
(map inc)
;; => #
;; это трансдьюсер!
;; можно делать так
(transduce (map inc) + [1 2 3])
;; => 9
(transduce (comp (map inc) (filter even?)) + [1 2 3])
;; => 6
;; ~~ (+ (inc 1) (inc 3)) => 6
In addition,
transducethere are several more functions for working with transducers:;; применяем трансдьюсер к коллекции
(sequence (map inc) [1 2 3])
;; => (2 3 4)
;; это равносильно такому коду
(transduce (map inc) conj [1 2 3])
;; => [2 3 4]
;; Но...
;; функция `sequence` выполняет трансдьюсер *лениво* !
;; с `transduce` такой фокус уже не сработает
(take 5 (sequence (map inc) (range)))
;; => (1 2 3 4 5)
;; в функцию `into` также добавили поддержку трансдьюсеров
(into [9] (map inc) [1 2 3])
;; => [9 2 3 4]
But the funniest feature is eduction . It returns a proxy object on which you can call
seq, reduceor get a java-iterator. This object is expected to simply call transduceor sequnce. A trifle, but convenient.(def odds (eduction (filter odd?) (range)))
(def evens (eduction (remove odd?) (range)))
;; можно работать как с sequential
(take 5 odds)
;; => (1 3 5 7 9)
;; в памяти будет строится sequence из первых 100500 чисел
;; но ссылки на начало не останется - sequence соберется GC
(nth odds 100500)
;; => 2010001
;; а вот тут сразу будет запущен reduce (никаких временных LazyCol)
;; ~= (reduce ((filter even?) ((take 100500) +)) 0 (range))
(transduce (take 100500) + evens)
;; => 10100149500
Stop, stop, stop. It after all suspiciously reminds
clojure.core.reducers/reducer... That, however, it was only possible to turn off, and here still seqallowed to start. So we r/reducerthrow it in the trash! But just not r/folder, he knows how to multithread!(require '[clojure.core.reducers :as r])
(def v (vec (range 1000000)))
(time (transduce (map inc) + v))
;; "Elapsed time: 120.193971 msecs"
;; => 500000500000
(time (r/fold + (r/folder v (map inc))))
;; "Elapsed time: 37.597224 msecs"
;; => 500000500000
;; но соблюдайте осторожность!
(transduce (take 100500) + v)
;; => 5050074750
(r/fold + (r/reducer v (take 100500)))
;; => 5050074750
;; верно
;; reducer устарел - лучше использовать eduction
(r/fold + (eduction (take 100500) v))
;; => 5050074750
(reduce + (r/folder v (take 100500)))
;; => 5050074750
;; даже так верно
(r/fold + (r/folder v (take 100500)))
;; => 109071345018
;; упс...
;; не всякий трансдьюсер параллелизируется (можно превратить в фолдер)
When using transducers achieved as used about lshaya performance compared to conventional
map/ filter/ etc(based lazy sequences) and b of lshaya flexibility / abstraction. I note that here we are talking about clojur sequences: in terms of abstraction and speed, transducers are comparable to ordinary iterators / enumerators / generators (they are called differently in different languages). But back to Clojure. Earlier in core.async it had a whole lot of functions of the form
map>, map<, filter<, filter>,, etc. Today they were removed (well, as they were removed, so far they have only been suspended). But they allowed to specify a ;; не забыли подключить библиотеку в project.clj
(require '[clojure.core.async :as a])
;; самый такой обычный трансдьюсер
(def xf (filter odd?))
;; и канал с буфером
(def ch (a/chan 10 xf))
;; положили в канал числа от 0 до 9 да и закрыли его
(a/onto-chan ch (range 10))
;; достаем числа из канала
(a/ [1 3 5 7 9]
The transducer can only be hung on buffered channels. And before the element appears in the buffer, our transducer processes it. There are all sorts of pipeline 's, they also work with transducers.
To summarize
A variety of reducers / transducers are all a generalization of the convolution operation. And therefore, they require a reduct function with 2 arguments to work their useful.
In addition to the 2-ary variant, it is better to also determine the 0-ary one - it can be used if the initial state of convolution is not specified. Or it may not be used: if the original collection is not empty, then it
reducewill take its first element. But this transducedoes not meanly - either the initial state is passed into it explicitly, or the 0-ary call of the reduct function is used. On the other hand, it
transducerequires more from the reduct function - a 1-ary option is definitely needed. Which, in the general case, most often does nothing at all. Seriously, usually([x] x)- the most meaningful implementation in this case. But we are lazy, we are too lazy to rewrite the old functions (0/2-ary), so we use a wrapper completingthat adds an empty 1-ary option. Further, reducers are based on transformers. Transformer = function with type
rf -> rf. In fact, the reducer is a collection to which the transformer was tightly screwed. And, every time we reducelaunch this collection, first the transformer “spoils” our reduct function. Transducer ~ = transformer, only requires support for a 1-ary reduct function. So we always define this same ill-fated 1-arnik, and proudly declare it to everyone: “Well, of course I don’t use outdated transformers, only transducers”.
With all this, transducers are not limited to working with collections. You can fasten them to channels, input-output streams, queues, observers, etc. In general, to all that fantasy is enough.
Total:
- if we create a new sequence processing algorithm, then it is worth trying to write it as a transducer;
- when a bunch of data in memory, no I / O, no laziness needed - use
reducers; - we want to process the collection and run on it
reduce- it would be nice to trytransduce; - but this is not necessary at all - premature optimizations are not good;
- filtered channels, tricky subscribers to events ... transducers here ask for it;
- We need lazy sequence or do not know what we use - the good old days
map,filterand their friends.