Compare commits

...

No commits in common. "master" and "resources" have entirely different histories.

16 changed files with 0 additions and 2013 deletions

25
.gitignore vendored
View file

@ -1,25 +0,0 @@
# Created by https://www.toptal.com/developers/gitignore/api/clojure
# Edit at https://www.toptal.com/developers/gitignore?templates=clojure
### Clojure ###
pom.xml
pom.xml.asc
*.jar
*.class
/lib/
/classes/
/bin/
/node_modules/
/.lumo_cache/
/target/
/checkouts/
.lein-deps-sum
.lein-repl-history
.lein-plugins/
.lein-failures
.nrepl-port
.cpcache/
.clj-kondo/*/
/cljs-test-runner-out/
# End of https://www.toptal.com/developers/gitignore/api/clojure

View file

@ -1,8 +0,0 @@
language: clojure
lein: lein
script: lein test
jdk:
- openjdk6
- openjdk7
- oraclejdk7
- oraclejdk8

BIN
Lost in Transduction.pdf Normal file

Binary file not shown.

333
README.md
View file

@ -1,333 +0,0 @@
# xforms
More transducers and reducing functions for Clojure(script)!
*Transducers* can be classified in three groups: regular ones, higher-order ones
(which accept other transducers as arguments) and aggregators (transducers which emit only 1 item out no matter how many went in).
Aggregators generally only make sense in the context of a higher-order transducer.
In `net.cgrand.xforms`:
* regular ones: `partition` (1 arg), `reductions`, `for`, `take-last`, `drop-last`, `sort`, `sort-by`, `wrap`, `window` and `window-by-time`
* higher-order ones: `by-key`, `into-by-key`, `multiplex`, `transjuxt`, `partition` (2+ args), `time`
* aggregators: `reduce`, `into`, `without`, `transjuxt`, `last`, `count`, `avg`, `sd`, `min`, `minimum`, `max`, `maximum`, `str`
In `net.cgrand.xforms.io`:
* `sh` to use any process as a reducible collection (of stdout lines) or as a transducers (input as stdin lines, stdout lines as output).
*Reducing functions*
* in `net.cgrand.xforms.rfs`: `min`, `minimum`, `max`, `maximum`, `str`, `str!`, `avg`, `sd`, `last` and `some`.
* in `net.cgrand.xforms.io`: `line-out` and `edn-out`.
(in `net.cgrand.xforms`)
*Transducing contexts*:
* in `net.cgrand.xforms`: `transjuxt` (for performing several transductions in a single pass), `iterator` (clojure only), `into`, `without`, `count`, `str` (2 args) and `some`.
* in `net.cgrand.xforms.io`: `line-out` (3+ args) and `edn-out` (3+ args).
* in `net.cgrand.xforms.nodejs.stream`: `transformer`.
*Reducible views* (in `net.cgrand.xforms.io`): `lines-in` and `edn-in`.
**Note:** it should always be safe to update to the latest xforms version; short of bugfixes, breaking changes are avoided.
## Add as a dependency
For specific coordinates see the [Releases](https://github.com/cgrand/xforms/releases) page.
## Usage
```clj
=> (require '[net.cgrand.xforms :as x])
```
`str` and `str!` are two reducing functions to build Strings and StringBuilders in linear time.
```clj
=> (quick-bench (reduce str (range 256)))
Execution time mean : 58,714946 µs
=> (quick-bench (reduce rf/str (range 256)))
Execution time mean : 11,609631 µs
```
`for` is the transducing cousin of `clojure.core/for`:
```clj
=> (quick-bench (reduce + (for [i (range 128) j (range i)] (* i j))))
Execution time mean : 514,932029 µs
=> (quick-bench (transduce (x/for [i % j (range i)] (* i j)) + 0 (range 128)))
Execution time mean : 373,814060 µs
```
You can also use `for` like `clojure.core/for`: `(x/for [i (range 128) j (range i)] (* i j))` expands to `(eduction (x/for [i % j (range i)] (* i j)) (range 128))`.
`by-key` and `reduce` are two new transducers. Here is an example usage:
```clj
;; reimplementing group-by
(defn my-group-by [kfn coll]
(into {} (x/by-key kfn (x/reduce conj)) coll))
;; let's go transient!
(defn my-group-by [kfn coll]
(into {} (x/by-key kfn (x/into [])) coll))
=> (quick-bench (group-by odd? (range 256)))
Execution time mean : 29,356531 µs
=> (quick-bench (my-group-by odd? (range 256)))
Execution time mean : 20,604297 µs
```
Like `by-key`, `partition` also takes a transducer as last argument to allow further computation on the partition.
```clj
=> (sequence (x/partition 4 (x/reduce +)) (range 16))
(6 22 38 54)
```
Padding is achieved as usual:
```clj
=> (sequence (x/partition 4 4 (repeat :pad) (x/into [])) (range 9))
([0 1 2 3] [4 5 6 7] [8 :pad :pad :pad])
```
`avg` is a transducer to compute the arithmetic mean. `transjuxt` is used to perform several transductions at once.
```clj
=> (into {} (x/by-key odd? (x/transjuxt [(x/reduce +) x/avg])) (range 256))
{false [16256 127], true [16384 128]}
=> (into {} (x/by-key odd? (x/transjuxt {:sum (x/reduce +) :mean x/avg :count x/count})) (range 256))
{false {:sum 16256, :mean 127, :count 128}, true {:sum 16384, :mean 128, :count 128}}
```
`window` is a new transducer to efficiently compute a windowed accumulator:
```clj
;; sum of last 3 items
=> (sequence (x/window 3 + -) (range 16))
(0 1 3 6 9 12 15 18 21 24 27 30 33 36 39 42)
=> (def nums (repeatedly 8 #(rand-int 42)))
#'user/nums
=> nums
(11 8 32 26 6 10 37 24)
;; avg of last 4 items
=> (sequence
(x/window 4 rf/avg #(rf/avg %1 %2 -1))
nums)
(11 19/2 17 77/4 18 37/2 79/4 77/4)
;; min of last 3 items
=> (sequence
(x/window 3
(fn
([] (sorted-map))
([m] (key (first m)))
([m x] (update m x (fnil inc 0))))
(fn [m x]
(let [n (dec (m x))]
(if (zero? n)
(dissoc m x)
(assoc m x (dec n))))))
nums)
(11 8 8 8 6 6 6 10)
```
## On Partitioning
Both `by-key` and `partition` takes a transducer as parameter. This transducer is used to further process each partition.
It's worth noting that all transformed outputs are subsequently interleaved. See:
```clj
=> (sequence (x/partition 2 1 identity) (range 8))
(0 1 1 2 2 3 3 4 4 5 5 6 6 7)
=> (sequence (x/by-key odd? identity) (range 8))
([false 0] [true 1] [false 2] [true 3] [false 4] [true 5] [false 6] [true 7])
```
That's why most of the time the last stage of the sub-transducer will be an aggregator like `x/reduce` or `x/into`:
```clj
=> (sequence (x/partition 2 1 (x/into [])) (range 8))
([0 1] [1 2] [2 3] [3 4] [4 5] [5 6] [6 7])
=> (sequence (x/by-key odd? (x/into [])) (range 8))
([false [0 2 4 6]] [true [1 3 5 7]])
```
## Simple examples
`(group-by kf coll)` is `(into {} (x/by-key kf (x/into []) coll))`.
`(plumbing/map-vals f m)` is `(into {} (x/by-key (map f)) m)`.
My faithful `(reduce-by kf f init coll)` is now `(into {} (x/by-key kf (x/reduce f init)))`.
`(frequencies coll)` is `(into {} (x/by-key identity x/count) coll)`.
## On key-value pairs
Clojure `reduce-kv` is able to reduce key value pairs without allocating vectors or map entries: the key and value
are passed as second and third arguments of the reducing function.
Xforms allows a reducing function to advertise its support for key value pairs (3-arg arity) by implementing the `KvRfable` protocol (in practice using the `kvrf` macro).
Several xforms transducers and transducing contexts leverage `reduce-kv` and `kvrf`. When these functions are used together, pairs can be transformed without being allocated.
<table>
<thead>
<tr><th>fn<th>kvs in?<th>kvs out?
</thead>
<tbody>
<tr><td>`for`<td>when first binding is a pair<td>when `body-expr` is a pair
<tr><td>`reduce`<td>when is `f` is a kvrf<td>no
<tr><td>1-arg `into`<br>(transducer)<td>when `to` is a map<td>no
<tr><td>3-arg `into`<br>(transducing context)<td>when `from` is a map<td>when `to` is a map
<tr><td>`by-key`<br>(as a transducer)<td>when is `kfn` and `vfn` are unspecified or `nil`<td>when `pair` is `vector` or unspecified
<tr><td>`by-key`<br>(as a transducing context on values)<td>no<td>no
</tbody>
<table>
```clj
;; plain old sequences
=> (let [m (zipmap (range 1e5) (range 1e5))]
(crit/quick-bench
(into {}
(for [[k v] m]
[k (inc v)]))))
Evaluation count : 12 in 6 samples of 2 calls.
Execution time mean : 55,150081 ms
Execution time std-deviation : 1,397185 ms
;; x/for but pairs are allocated (because of into)
=> (let [m (zipmap (range 1e5) (range 1e5))]
(crit/quick-bench
(into {}
(x/for [[k v] _]
[k (inc v)])
m)))
Evaluation count : 18 in 6 samples of 3 calls.
Execution time mean : 39,119387 ms
Execution time std-deviation : 1,456902 ms
;; x/for but no pairs are allocated (thanks to x/into)
=> (let [m (zipmap (range 1e5) (range 1e5))]
(crit/quick-bench (x/into {}
(x/for [[k v] %]
[k (inc v)])
m)))
Evaluation count : 24 in 6 samples of 4 calls.
Execution time mean : 24,276790 ms
Execution time std-deviation : 364,932996 µs
```
## Changelog
### 0.19.6
* Fix regression in 0.19.5 #54
### 0.19.5
* Support ClojureDart
### 0.19.4
* Fix ClojureScript compilation broken in `0.19.3` #49
* Fix `x/sort` and `x/sort-by` for ClojureScript #40
### 0.19.3
* Add `deps.edn` to enable usage as a [git library](https://clojure.org/guides/deps_and_cli#_using_git_libraries)
* Bump `macrovich` to make Clojure and ClojureScript provided dependencies #34
* Fix reflection warnings in `xforms.io` #35 #36
* Add compatibility with [babashka](https://github.com/babashka/babashka) #42
* Fix `x/destructuring-pair?` #44 #45
* Fix `x/into` performance hit with small maps #46 #47
* Fix reflection and shadowing warnings in tests
### 0.19.2
* Fix infinity symbol causing issues with ClojureScript #31
### 0.19.0
`time` allows to measure time spent in one transducer (excluding time spent downstream).
```clj
=> (time ; good old Clojure time
(count (into [] (comp
(x/time "mapinc" (map inc))
(x/time "filterodd" (filter odd?))) (range 1e6))))
filterodd: 61.771738 msecs
mapinc: 143.895317 msecs
"Elapsed time: 438.34291 msecs"
500000
```
First argument can be a function that gets passed the time (in ms),
this allows for example to log time instead of printing it.
### 0.9.5
* Short (up to 4) literal collections (or literal collections with `:unroll` metadata) in collection positions in `x/for` are unrolled.
This means that the collection is not allocated.
If it's a collection of pairs (e.g. maps), pairs themselves won't be allocated.
### 0.9.4
* Add `x/into-by-key` short hand
### 0.7.2
* Fix transients perf issue in Clojurescript
### 0.7.1
* Works with Clojurescript (even self-hosted).
### 0.7.0
* Added 2-arg arity to `x/count` where it acts as a transducing context e.g. `(x/count (filter odd?) (range 10))`
* Preserve type hints in `x/for` (and generally with `kvrf`).
### 0.6.0
* Added `x/reductions`
* Now if the first collection expression in `x/for` is not a placeholder then `x/for` works like `x/for` but returns an eduction and performs all iterations using reduce.
## Troubleshooting xforms in a Clojurescript dev environment
If you use xforms with Clojurescript and the Emacs editor to start your figwheel REPL be sure to include the `cider.nrepl/cider-middleware` to your figwheel's nrepl-middleware.
```
:figwheel {...
:nrepl-middleware [cider.nrepl/cider-middleware;;<= that middleware
refactor-nrepl.middleware/wrap-refactor
cemerick.piggieback/wrap-cljs-repl]
...}
```
Otherwise a strange interaction occurs and every results from your REPL evaluation would be returned as a String. Eg.:
```
cljs.user> 1
"1"
cljs.user>
```
instead of:
```
cljs.user> 1
1
cljs.user>
```
## License
Copyright © 2015-2016 Christophe Grand
Distributed under the Eclipse Public License either version 1.0 or (at
your option) any later version.

56
bb.edn
View file

@ -1,56 +0,0 @@
{:deps {local/deps {:local/root "."}}
:paths ["src" "test"]
:tasks
{:requires ([clojure.string :as str])
:init
(do
(defn kaocha [alias args]
(apply shell "bin/kaocha" alias args))
(defn test-cljs [alias args]
(apply clojure (str/join ["-M:test:cljs-test-runner" alias]) args)))
test-clj-9
{:task (kaocha :clj-1-9 *command-line-args*)}
test-clj-10
{:task (kaocha :clj-1-10 *command-line-args*)}
test-clj-11
{:task (kaocha :clj-1-11 *command-line-args*)}
test-clj
{:depends [test-clj-9 test-clj-10 test-clj-11]}
test-cljs-9
{:task (test-cljs :clj-1-9 *command-line-args*)}
test-cljs-10
{:task (test-cljs :clj-1-10 *command-line-args*)}
test-cljs-11
{:task (test-cljs :clj-1-11 *command-line-args*)}
test-cljs
{:depends [#_test-cljs-9 test-cljs-10 test-cljs-11]}
test-bb
{:requires ([clojure.test :as t]
[net.cgrand.xforms-test])
:task (t/run-tests 'net.cgrand.xforms-test)}
test-all
{:depends [test-bb test-clj test-cljs]}
perf-bb
{:requires ([net.cgrand.xforms :as x])
:task
(let [n 10000
m (zipmap (range 100) (range))
mapping (map (fn [[k v]] [k (inc v)]))
xforing (x/for [[k v] _] [k (inc v)])]
(time (dotimes [_ n] (into {} mapping m)))
(time (dotimes [_ n] (into {} xforing m)))
(time (dotimes [_ n] (x/into {} xforing m))))}}}

View file

@ -1,3 +0,0 @@
#!/usr/bin/env bash
set -x
clojure -Srepro -M:kaocha:test"$1" "${@:2}"

View file

@ -1,46 +0,0 @@
(ns build
(:require [clojure.tools.build.api :as b]
[clojure.java.shell :as sh]))
(def lib 'net.cgrand/xforms)
(def version "0.19.6" #_(format "0.0.%s" (b/git-count-revs nil)))
(def class-dir "target/classes")
(def basis (b/create-basis {:project "deps.edn"}))
(def jar-file (format "target/%s-%s.jar" (name lib) version))
(def scm {:connection "scm:git:git://github.com/cgrand/xforms.git"
:developerConnection "scm:git:git://github.com/cgrand/xforms.git"
:url "https://github.com/cgrand/xforms"})
(def extra-pom-data
[[:licenses
[:license
[:name "Eclipse Public License 1.0"]
[:url "https://opensource.org/license/epl-1-0/"]
[:distribution "repo"]]
[:license
[:name "Eclipse Public License 2.0"]
[:url "https://opensource.org/license/epl-2-0/"]
[:distribution "repo"]]]])
(defn clean [_]
(b/delete {:path "target"}))
(defn jar [_]
(b/write-pom {:class-dir class-dir
:lib lib
:version version
:basis basis
:src-dirs ["src"]
:scm (assoc scm :tag (str "v" version))
:pom-data extra-pom-data})
(b/copy-dir {:src-dirs ["src" "resources"]
:target-dir class-dir})
(b/jar {:class-dir class-dir
:jar-file jar-file}))
(defn clojars [_]
(sh/sh
"mvn" "deploy:deploy-file" (str "-Dfile=" jar-file)
;target/classes/META-INF/maven/net.cgrand/xforms/pom.xml
(format "-DpomFile=%s/META-INF/maven/%s/%s/pom.xml"
class-dir (namespace lib) (name lib))
"-DrepositoryId=clojars" "-Durl=https://clojars.org/repo/"))

View file

@ -1,45 +0,0 @@
{:deps {net.cgrand/macrovich {:mvn/version "0.2.2"}}
:paths ["src"]
:aliases
{:dev
{:extra-paths ["dev"]}
:cljd
{:extra-deps
{tensegritics/clojuredart
{:git/url "https://github.com/tensegritics/ClojureDart.git"
:sha "ae1b485e84ccc35b122f776dfc7cc62198274701"}}}
:clj-1-9
{:extra-deps
{org.clojure/clojure {:mvn/version "1.9.0"}
org.clojure/clojurescript {:mvn/version "1.9.293"}}}
:clj-1-10
{:extra-deps
{org.clojure/clojure {:mvn/version "1.10.3"}
org.clojure/clojurescript {:mvn/version "1.10.914"}}}
:clj-1-11
{:extra-deps
{org.clojure/clojure {:mvn/version "1.11.1"}
org.clojure/clojurescript {:mvn/version "1.11.60"}}}
:test
{:extra-paths ["test"]}
:kaocha
{:extra-paths ["test"]
:extra-deps {lambdaisland/kaocha {:mvn/version "1.69.1069"}}
:main-opts ["-m" "kaocha.runner"]}
:cljs-test-runner
{:extra-paths ["test"]
:extra-deps {olical/cljs-test-runner {:mvn/version "3.8.0"}}
:main-opts ["-m" "cljs-test-runner.main"]}
:build
{:paths ["."]
:deps {io.github.clojure/tools.build {:git/tag "v0.9.6" :git/sha "8e78bcc"}}
:ns-default build}}}

View file

@ -1,3 +0,0 @@
(ns user)
(set! *warn-on-reflection* true)

View file

@ -1,21 +0,0 @@
{
"name": "xforms",
"version": "0.16.0",
"description": "Extra transducers for Clojurescript",
"repository": "https://github.com/cgrand/xforms.git",
"author": "Christophe Grand <christophe@cgrand.net>",
"license": "EPL-1.0",
"directories": {
"lib": "src",
"cache": "./lumo-cache"
},
"keywords": [
"cljs",
"cljc",
"self-host",
"transducer"
],
"dependencies": {
"macrovich": "^0.2.1-SNAPSHOT"
}
}

View file

@ -1,891 +0,0 @@
(ns net.cgrand.xforms
"Extra transducers for Clojure"
{:author "Christophe Grand"}
#?(:cljs (:require-macros
[net.cgrand.macrovich :as macros]
[net.cgrand.xforms :refer [for kvrf let-complete]])
:default (:require [net.cgrand.macrovich :as macros]))
(:refer-clojure :exclude [some reduce reductions into count for partition
str last keys vals min max drop-last take-last
sort sort-by time #?@(:bb [] :cljd/clj-host [] :clj [satisfies?])])
(:require [#?(:cljd cljd.core :clj clojure.core :cljs cljs.core) :as core]
[net.cgrand.xforms.rfs :as rf]
#?@(:cljd [["dart:collection" :as dart:coll]] :clj [[clojure.core.protocols]] :cljs []))
#?(:cljd/clj-host
; customize the clj/jvm ns used for macroexpansion
(:host-ns (:require [clojure.core :as core]
[net.cgrand.macrovich :as macros])))
#?(:cljs (:import [goog.structs Queue])))
(defn- ^:macro-support pair? [x] (and (vector? x) (= 2 (core/count x))))
(def ^:macro-support destructuring-pair?
(let [kw-or-& #(or (keyword? %) (= '& %))]
(fn [x]
(and (pair? x)
(not (kw-or-& (first x)))))))
(macros/deftime
(defn- ^:macro-support no-user-meta? [x]
(= {} (dissoc (or (meta x) {}) :file :line :column :end-line :end-column)))
(defmacro unreduced->
"Thread first while threaded value is not reduced.
Doesn't unreduce the final value."
([x] x)
([x expr & exprs]
`(let [x# ~x]
(if (reduced? x#)
x#
(unreduced-> (-> x# ~expr) ~@exprs)))))
(defmacro for
"Like clojure.core/for with the first expression being replaced by % (or _). Returns a transducer.
When the first expression is not % (or _) returns an eduction."
[[binding %or_ & seq-exprs] body-expr]
(if-not (and (symbol? %or_) (#{"%" "_"} (name %or_)))
`(eduction (for [~binding ~'% ~@seq-exprs] ~body-expr) ~%or_)
(let [rf (gensym 'rf)
acc (gensym 'acc)
rpairs (core/partition 2 (rseq (vec seq-exprs)))
build (fn [init]
(core/reduce (fn [body [expr binding]]
(case binding
:let `(let ~expr ~body)
:when `(if ~expr ~body ~acc)
:while `(if ~expr ~body (reduced ~acc))
(if (and (coll? expr) (not (seq? expr))
(or (<= (core/count expr) 4) (:unroll (meta expr))))
(let [body-rf (gensym 'body-rf)]
(if (and (destructuring-pair? binding) (every? vector? expr))
`(let [~body-rf (fn [~acc ~@binding] ~body)]
(unreduced (unreduced-> ~acc
~@(map (fn [[k v]] `(~body-rf ~k ~v)) expr))))
`(let [~body-rf (fn [~acc ~binding] ~body)]
(unreduced (unreduced-> ~acc
~@(map (fn [v] `(~body-rf ~v)) expr))))))
(if (destructuring-pair? binding)
`(let [expr# ~expr]
(if (and (map? expr#) (kvreducible? expr#))
(core/reduce-kv (fn [~acc ~@binding] ~body) ~acc expr#)
(core/reduce (fn [~acc ~binding] ~body) ~acc expr#)))
`(core/reduce (fn [~acc ~binding] ~body) ~acc ~expr)))))
init rpairs))
nested-reduceds (core/for [[expr binding] rpairs
:when (not (keyword? binding))]
`reduced)
body (build `(let [acc# (~rf ~acc ~@(if (and (pair? body-expr) (no-user-meta? body-expr))
body-expr
[body-expr]))]
(if (reduced? acc#)
(-> acc# ~@nested-reduceds)
acc#)))]
`(fn [~rf]
(let [~rf (ensure-kvrf ~rf)]
(kvrf
([] (~rf))
([~acc] (~rf ~acc))
([~acc ~binding] ~body)))))))
(defn- ^:macro-support arity [[arglist & body :as fn-body]]
(let [[fixargs varargs] (split-with (complement #{'&}) arglist)]
(if (seq varargs) (zipmap (range (core/count fixargs) 4) (repeat fn-body)))
{(core/count fixargs) fn-body}))
(defmacro kvrf [name? & fn-bodies]
(let [name (if (symbol? name?) name? (gensym '_))
fn-bodies (if (symbol? name?) fn-bodies (cons name? fn-bodies))
fn-bodies (if (vector? (first fn-bodies)) (list fn-bodies) fn-bodies)
arities (core/into {} (mapcat arity) fn-bodies)
_ (when-not (core/some arities [2 3]) (throw (ex-info "Either arity 2 or 3 should be defined in kvrf." {:form &form})))
fn-bodies (cond-> fn-bodies
(not (arities 3)) (conj (let [[[acc arg] & body] (arities 2)]
(if (destructuring-pair? arg)
(let [[karg varg] arg]
`([~acc ~karg ~varg] ~@body))
(let [k (gensym "k__")
v (gensym "v__")
arg-value (macros/case
:clj `(clojure.lang.MapEntry. ~k ~v)
:cljs [k v]
:cljd `(MapEntry ~k ~v))]
`([~acc ~k ~v] (let [~arg ~arg-value] ~@body))))))
(not (arities 2)) (conj (let [[[acc karg varg] & body] (arities 3)]
`([~acc [~karg ~varg]] ~@body))))]
`(reify
#?@(:bb [] ;; babashka currently only supports reify with one Java interface at a time
:default [~@(macros/case :cljd '[cljd.core/Fn] :clj '[clojure.lang.Fn])])
KvRfable
(~'some-kvrf [this#] this#)
~(macros/case :cljs `core/IFn :clj 'clojure.lang.IFn :cljd 'cljd.core/IFn)
~@(core/for [[args & body] fn-bodies]
(let [nohint-args (map (fn [arg] (if (:tag (meta arg)) (gensym 'arg) arg)) args)
rebind (mapcat (fn [arg nohint]
(when-not (= arg nohint) [arg nohint])) args nohint-args)]
`(~(macros/case :cljd '-invoke :cljs `core/-invoke :clj 'invoke)
[~name ~@nohint-args] ~@(if (seq rebind) [`(let [~@rebind] ~@body)] body)))))))
(defmacro ^:private let-complete [[binding volatile] & body]
`(let [v# @~volatile]
(when-not (identical? v# ~volatile) ; self reference as sentinel
(vreset! ~volatile ~volatile)
(let [~binding v#]
~@body))))
)
(declare into reduce multiplex by-key)
(defprotocol KvRfable "Protocol for reducing fns that accept key and val as separate arguments."
(some-kvrf [f] "Returns a kvrf or nil"))
(macros/usetime
;; Workaround clojure.core/satisfies? being slow in Clojure
;; see https://ask.clojure.org/index.php/3304/make-satisfies-as-fast-as-a-protocol-method-call
#?(:bb nil
:cljd nil
:clj
(defn fast-satisfies?-fn
"Ported from https://github.com/clj-commons/manifold/blob/37658e91f836047a630586a909a2e22debfbbfc6/src/manifold/utils.clj#L77-L89"
[protocol-var]
(let [^java.util.concurrent.ConcurrentHashMap classes
(java.util.concurrent.ConcurrentHashMap.)]
(add-watch protocol-var ::memoization (fn [& _] (.clear classes)))
(fn [x]
(let [cls (class x)
val (.get classes cls)]
(if (nil? val)
(let [val (core/satisfies? @protocol-var x)]
(.put classes cls val)
val)
val))))))
#?(:cljs
(defn kvreducible? [coll]
(satisfies? IKVReduce coll))
:cljd
(defn kvreducible? [coll]
(satisfies? cljd.core/IKVReduce coll))
:clj
(let [satisfies-ikvreduce? #?(:bb #(satisfies? clojure.core.protocols/IKVReduce %)
:default (fast-satisfies?-fn #'clojure.core.protocols/IKVReduce))]
(if (satisfies-ikvreduce? (Object.))
(defn kvreducible?
"Clojure 1.11 makes everything satisfy IKVReduce, so we can short-circuit"
[_] true)
(defn kvreducible? [coll] (satisfies-ikvreduce? coll)))))
(extend-protocol KvRfable
#?(:cljd fallback :clj Object :cljs default) (some-kvrf [_] nil)
#?@(:clj [nil (some-kvrf [_] nil)]))
(defn ensure-kvrf [rf]
(or (some-kvrf rf)
(kvrf
([] (rf))
([acc] (rf acc))
([acc x] (rf acc x)))))
(defn reduce
"A transducer that reduces a collection to a 1-item collection consisting of only the reduced result.
Unlike reduce but like transduce it does call the completing arity (1) of the reducing fn."
([f]
(fn [rf]
(let [vacc (volatile! (f))]
(let [f (ensure-kvrf f)]
(kvrf
([] (rf))
([acc] (let-complete [f-acc vacc]
(rf (unreduced (rf acc (f (unreduced f-acc)))))))
([acc x]
(if (reduced? (vswap! vacc f x))
(reduced acc)
acc))
([acc k v]
(if (reduced? (vswap! vacc f k v))
(reduced acc)
acc)))))))
([f init]
(reduce (fn ([] init) ([acc] (f acc)) ([acc x] (f acc x))))))
(defn- into-rf [to]
(cond
#?(:cljd (satisfies? cljd.core/IEditableCollection to)
:clj (instance? clojure.lang.IEditableCollection to)
:cljs (satisfies? IEditableCollection to))
(if (map? to)
(kvrf
([] (transient to))
([acc] (persistent! acc))
([acc x] (conj! acc x))
([acc k v] (assoc! acc k v)))
(fn
([] (transient to))
([acc] (persistent! acc))
([acc x] (conj! acc x))))
(map? to)
(kvrf
([] to)
([acc] acc)
([acc x] (conj acc x))
([acc k v] (assoc acc k v)))
:else
(fn
([] to)
([acc] acc)
([acc x] (conj acc x)))))
(defn into
"Like clojure.core/into but with a 1-arg arity returning a transducer which accumulate every input in a collection and outputs only the accumulated collection."
([to]
(reduce (into-rf to)))
([to from]
(into to identity from))
([to xform from]
(let [rf (xform (into-rf to))]
(if-let [rf (and (map? from) (kvreducible? from) (some-kvrf rf))]
(rf (core/reduce-kv rf (rf) from))
(rf (core/reduce rf (rf) from))))))
(defn- without-rf [from]
(cond
#?(:cljd (satisfies? cljd.core/IEditableCollection from)
:clj (instance? clojure.lang.IEditableCollection from)
:cljs (satisfies? IEditableCollection from))
(if (map? from)
(fn
([] (transient from))
([acc] (persistent! acc))
([acc x] (dissoc! acc x)))
(fn
([] (transient from))
([acc] (persistent! acc))
([acc x] (disj! acc x))))
(map? from)
(fn
([] from)
([acc] acc)
([acc x] (dissoc acc x)))
:else
(fn
([] from)
([acc] acc)
([acc x] (disj acc x)))))
(defn without
"The opposite of x/into: dissociate or disjoin from the target."
([target]
(reduce (without-rf target)))
([target keys]
(without target identity keys))
([target xform keys]
(let [rf (xform (without-rf target))]
(if-let [rf (and (map? keys) (kvreducible? keys) (some-kvrf rf))]
(rf (core/reduce-kv rf (rf) keys))
(rf (core/reduce rf (rf) keys))))))
(defn minimum
([comparator]
(minimum comparator nil))
([comparator absolute-maximum]
(reduce (rf/minimum comparator absolute-maximum))))
(defn maximum
([comparator]
(maximum comparator nil))
([comparator absolute-minimum]
(reduce (rf/maximum comparator absolute-minimum))))
(def min (reduce rf/min))
(def max (reduce rf/max))
(defn str
"When used as a value, it's an aggregating transducer that concatenates input values
into a single output value.
When used as a function of two args (xform and coll) it's a transducing context that
concatenates all values in a string."
{:arglists '([xform coll])}
([rf] ((reduce rf/str) rf))
([xform coll]
(transduce xform rf/str coll)))
(defn wrap
"Transducer. Adds open as the first item, and close as the last. Optionally inserts delim between each input item."
([open close]
(fn [rf]
(let [vrf (volatile! nil)]
(vreset! vrf
(fn [acc x]
(let [acc (rf acc open)]
(vreset! vrf rf)
(if (reduced? acc)
acc
(rf acc x)))))
(fn
([] (rf))
([acc] (rf (unreduced (rf acc close))))
([acc x] (@vrf acc x))))))
([open close delim]
(comp (interpose delim) (wrap open close))))
(defn vals [rf]
(kvrf
([] (rf))
([acc] (rf acc))
([acc k v] (rf acc v))))
(defn keys [rf]
(kvrf
([] (rf))
([acc] (rf acc))
([acc k v] (rf acc k))))
;; for both map entries and vectors
(defn- key' [kv] (nth kv 0))
(defn- val' [kv] (nth kv 1))
(defn- nop-rf "The noop reducing function" ([acc] acc) ([acc _] acc) ([acc _ _] acc))
(defn- multiplexable
"Returns a multiplexable reducing function (doesn't init or complete the uderlying rf, wraps reduced -- like preserving-reduced)"
[rf]
(let [rf (ensure-kvrf rf)]
(kvrf
([])
([acc] acc) ; no init no complete rf
([acc x]
(let [acc (rf acc x)]
(if (reduced? acc)
(reduced acc)
acc)))
([acc k v]
(let [acc (rf acc k v)]
(if (reduced? acc)
(reduced acc)
acc))))))
(defn by-key
"Returns a transducer which partitions items according to kfn.
It applies the transform specified by xform to each partition.
Partitions contain the \"value part\" (as returned by vfn) of each item.
The resulting transformed items are wrapped back into a \"pair\" using the pair function.
Default values for kfn, vfn and pair are first, second (or identity if kfn is specified) and vector."
([xform] (by-key nil nil vector xform))
([kfn xform] (by-key kfn identity vector xform))
([kfn vfn xform] (by-key kfn vfn vector xform))
([kfn vfn pair xform]
(let [pair (if (identical? vector pair) ::default pair)]
(fn [rf]
(let [mrf (multiplexable rf)
make-rf (cond
(nil? pair) (constantly mrf)
(= ::default pair)
(fn [k] (fn ([acc] acc) ([acc v] (mrf acc k v))))
:else (fn [k] (fn ([acc] acc) ([acc v] (mrf acc (pair k v))))))
m (volatile! (transient {}))]
(if (and (nil? kfn) (nil? vfn))
(kvrf self
([] (rf))
([acc] (let-complete [m m] (rf (core/reduce (fn [acc krf] (krf acc)) acc (core/vals (persistent! m))))))
([acc k v]
(let [krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k))))
acc (krf acc v)]
(if (reduced? acc)
(if (reduced? @acc)
(do
(vreset! m (transient {})) ; no need to run completions
@acc) ; downstream is done, propagate
(do
(vswap! m assoc! k nop-rf)
(krf @acc))) ; TODO think again
acc))))
(let [kfn (or kfn key')
vfn (or vfn val')]
(kvrf self
([] (rf))
([acc] (let-complete [m m] (rf (core/reduce (fn [acc krf] (krf acc)) acc (core/vals (persistent! m))))))
([acc x]
(let [k (kfn x)
krf (or (get @m k) (doto (xform (make-rf k)) (->> (vswap! m assoc! k))))
acc (krf acc (vfn x))]
(if (reduced? acc)
(if (reduced? @acc)
(do
(vreset! m (transient {})) ; no need to run completions
@acc) ; downstream is done, propagate
(do
(vswap! m assoc! k nop-rf)
(krf @acc)))
acc)))))))))))
(defn into-by-key
"A shorthand for the common case (comp (x/by-key ...) (x/into coll))."
[coll & by-key-args]
(comp (apply by-key by-key-args) (into coll)))
(macros/replace
[#?(:cljd {(java.util.ArrayDeque. n) (dart:coll/Queue)
.add .add
.poll .removeFirst
.size .-length})
#?(:cljs {(java.util.ArrayDeque. n) (Queue.)
.add .enqueue
.poll .dequeue
.size .getCount})
#?(:clj {(.getValues dq) dq})]
(defn partition
"Returns a partitioning transducer. Each partition is independently transformed using the xform transducer."
([n]
(partition n n (into [])))
([n step-or-xform]
(if (fn? step-or-xform)
(partition n n step-or-xform)
(partition n step-or-xform (into []))))
([#?(:cljd ^int n :default ^long n) step pad-or-xform]
(if (fn? pad-or-xform)
(let [xform pad-or-xform]
(fn [rf]
(let [mxrf (multiplexable rf)
dq (java.util.ArrayDeque. n)
barrier (volatile! n)
xform (comp (map #(if (identical? dq %) nil %)) xform)]
(fn
([] (rf))
([acc] (.clear dq) (rf acc))
([acc x]
(let [b (vswap! barrier dec)]
(when (< b n) (.add dq (if (nil? x) dq x)))
(if (zero? b)
; this transduce may return a reduced because of mxrf wrapping reduceds coming from rf
(let [acc (transduce xform mxrf acc (.getValues dq))]
(dotimes [_ (core/min n step)] (.poll dq))
(vswap! barrier + step)
acc)
acc)))))))
(partition n step pad-or-xform (into []))))
([#?(:cljd ^int n :default ^long n) step pad xform]
(fn [rf]
(let [mxrf (multiplexable rf)
dq (java.util.ArrayDeque. n)
barrier (volatile! n)
xform (comp (map #(if (identical? dq %) nil %)) xform)]
(fn
([] (rf))
([acc] (if (< @barrier n)
(let [xform (comp cat (take n) xform)
; don't use mxrf for completion: we want completion and don't want reduced-wrapping
acc (transduce xform rf acc [(.getValues dq) pad])]
(vreset! barrier n)
(.clear dq)
acc)
(rf acc)))
([acc x]
(let [b (vswap! barrier dec)]
(when (< b n) (.add dq (if (nil? x) dq x)))
(if (zero? b)
; this transduce may return a reduced because of mxrf wrapping reduceds coming from rf
(let [acc (core/transduce xform mxrf acc (.getValues dq))]
(dotimes [_ (core/min n step)] (.poll dq))
(vswap! barrier + step)
acc)
acc))))))))
#_(defn zip [xform1 xform2]
(fn [rf]
(let )))
(defn take-last [#?(:cljd ^int n :default ^long n)]
(fn [rf]
(let [dq (java.util.ArrayDeque. n)]
(fn
([] (rf))
([acc] (transduce (map #(if (identical? dq %) nil %)) rf acc (.getValues dq)))
([acc x]
(.add dq (if (nil? x) dq x))
(when (< n (.size dq)) (.poll dq))
acc)))))
(defn drop-last
([] (drop-last 1))
([#?(:cljd ^int n :default ^long n)]
(fn [rf]
(let [dq (java.util.ArrayDeque. n)
xform (map #(if (identical? dq %) nil %))
rf (xform rf)]
(fn
([] (rf))
([acc] (rf acc))
([acc x]
(.add dq (if (nil? x) dq x))
(if (< n (.size dq))
(rf acc (.poll dq))
acc)))))))
)
#?(:cljs
(defn ^:private fn->comparator
"Given a fn that might be boolean valued or a comparator,
return a fn that is a comparator.
Copied from cljs.core: https://github.com/clojure/clojurescript/blob/95c5cf384a128503b072b7b1916af1a1d5c8871c/src/main/cljs/cljs/core.cljs#L2459-L2471"
[f]
(if (= f compare)
compare
(fn [x y]
(let [r (f x y)]
(if (number? r)
r
(if r
-1
(if (f y x) 1 0))))))))
(defn sort
([] (sort compare))
([cmp]
(fn [rf]
(let [buf #?(:cljd #dart [] :clj (java.util.ArrayList.) :cljs #js [])]
(fn
([] (rf))
([acc] (rf (core/reduce rf acc (doto buf #?(:cljd (.sort (dart-comparator cmp))
:clj (java.util.Collections/sort cmp)
:cljs (.sort (fn->comparator cmp)))))))
([acc x] (#?(:cljd .add :clj .add :cljs .push) buf x) acc))))))
(defn sort-by
([kfn] (sort-by kfn compare))
([kfn cmp]
(sort (fn [a b]
#?(:cljd (cmp (kfn a) (kfn b))
:clj (.compare ^java.util.Comparator cmp (kfn a) (kfn b))
:cljs (cmp (kfn a) (kfn b)))))))
(defn reductions
"Transducer version of reductions. There's a difference in behavior when init is not provided: (f) is used.
So x/reductions works like x/reduce or transduce, not like reduce and reductions when no init and 1-item input."
([f] (reductions f (f)))
([f init]
(fn [rf]
(let [prev (volatile! nil)]
(vreset! prev prev) ; cheap sentinel to detect the first call, this is done to avoid having a 1-item delay
(fn
([] (rf)) ; no you can't emit init there since there's no guarantee that this arity is going to be called
([acc] (if (identical? @prev prev)
(rf (unreduced (rf acc init)))
(rf acc)))
([acc x]
(if (identical? @prev prev)
(let [acc (rf acc (vreset! prev init))]
(if (reduced? acc)
acc
(recur acc x)))
(let [curr (vswap! prev f x)]
(if (reduced? curr)
(ensure-reduced (rf acc @curr))
(rf acc curr))))))))))
(def avg (reduce rf/avg))
(def sd (reduce rf/sd))
(defn window
"Returns a transducer which computes an accumulator over the last n items
using two functions: f and its inverse invf.
The accumulator is initialized with (f).
It is updated to (f (invf acc out) in) where \"acc\" is the current value,
\"in\" the new item entering the window, \"out\" the item exiting the window.
The value passed to the dowstream reducing function is (f acc) enabling acc to be
mutable and 1-arity f to project its state to a value.
If you don't want to see the accumulator until the window is full then you need to
use (drop (dec n)) to remove them.
If you don't have an inverse function, consider using partition and reduce:
(x/partition 4 (x/reduce rf))"
[n f invf]
(fn [rf]
(let [ring (object-array n)
vi (volatile! (- n))
vwacc (volatile! (f))]
(fn
([] (rf))
([acc] (rf acc))
([acc x]
(let [i @vi
wacc @vwacc] ; window accumulator
(if (neg? i) ; not full yet
(do
(aset ring (+ n i) x)
(vreset! vi (inc i))
(rf acc (f (vreset! vwacc (f wacc x)))))
(let [x' (aget ring i)]
(aset ring i x)
(vreset! vi (let [i (inc i)] (if (= n i) 0 i)))
(rf acc (f (vreset! vwacc (f (invf wacc x') x))))))))))))
#?(:cljd nil
:clj
(defn iterator
"Iterator transducing context, returns an iterator on the transformed data.
Equivalent to (.iterator (eduction xform (iterator-seq src-iterator))) except there's is no buffering on values (as in iterator-seq),
This buffering may cause problems when mutable objects are returned by the src-iterator."
^java.util.Iterator [xform ^java.util.Iterator src-iterator]
(let [NULL (Object.)
dq (java.util.ArrayDeque. 32)
rf (xform (fn ([acc] acc) ([acc x] (.push dq (if (some? x) x NULL)) acc)))
vopen (volatile! true)
ensure-next #(or (some? (.peek dq))
(and @vopen
(if (.hasNext src-iterator)
(let [acc (rf nil (.next src-iterator))]
(when (reduced? acc)
(rf nil)
(vreset! vopen false))
(recur))
(do
(rf nil)
(vreset! vopen false)
(recur)))))]
(reify java.util.Iterator
(hasNext [_]
(ensure-next))
(next [_]
(if (ensure-next)
(let [x (.poll dq)]
(if (identical? NULL x) nil x))
(throw (java.util.NoSuchElementException.))))))))
#?(:cljd nil
:clj
(defn window-by-time
"ALPHA
Returns a transducer which computes a windowed accumulator over chronologically sorted items.
timef is a function from one item to its scaled timestamp (as a double). The window length is always 1.0
so timef must normalize timestamps. For example if timestamps are in seconds (and under the :ts key),
to get a 1-hour window you have to use (fn [x] (/ (:ts x) 3600.0)) as timef.
n is the integral number of steps by which the window slides. With a 1-hour window, 4 means that the window slides every 15 minutes.
f and invf work like in #'window."
([timef n f]
(window-by-time timef n
(fn
([] clojure.lang.PersistentQueue/EMPTY)
([q] (f (core/reduce f (f) q)))
([q x] (conj q x)))
(fn [q _] (pop q))))
([timef n f invf]
(let [timef (fn [x] (long (Math/floor (* n (timef x)))))]
(fn [rf]
(let [dq (java.util.ArrayDeque.)
vwacc (volatile! (f))
flush!
(fn [acc ^long from-ts ^long to-ts]
(loop [ts from-ts acc acc wacc @vwacc]
(let [x (.peekFirst dq)]
(cond
(= ts (timef x))
(do
(.pollFirst dq)
(recur ts acc (invf wacc x)))
(= ts to-ts)
(do
(vreset! vwacc wacc)
acc)
:else
(let [acc (rf acc (f wacc))]
(if (reduced? acc)
(do
(vreset! vwacc wacc)
acc)
(recur (inc ts) acc wacc)))))))]
(fn
([] (rf))
([acc]
(let [acc (if-not (.isEmpty dq)
(unreduced (rf acc (f @vwacc)))
acc)]
(rf acc)))
([acc x]
(let [limit (- (timef x) n)
prev-limit (if-some [prev-x (.peekLast dq)]
(- (timef prev-x) n)
limit)
_ (.addLast dq x) ; so dq is never empty for flush!
acc (flush! acc prev-limit limit)]
(when-not (reduced? acc)
(vswap! vwacc f x))
acc)))))))))
(defn count
"Count the number of items. Either used directly as a transducer or invoked with two args
as a transducing context."
([rf]
(let [n #?(:cljd (volatile! 0) :clj (java.util.concurrent.atomic.AtomicLong.) :cljs (volatile! 0))]
(fn
([] (rf))
([acc] (rf (unreduced (rf acc #?(:cljd @n :clj (.get n) :cljs @n)))))
([acc _] #?(:cljd (vswap! n inc) :clj (.incrementAndGet n) :cljs (vswap! n inc)) acc))))
([xform coll]
(transduce (comp xform count) rf/last coll)))
(defn multiplex
"Returns a transducer that runs several transducers (sepcified by xforms) in parallel.
If xforms is a map, values of the map are transducers and keys are used to tag each
transducer output:
=> (into [] (x/multiplex [(map inc) (map dec)]) (range 3))
[1 -1 2 0 3 1] ; no map, no tag
=> (into [] (x/multiplex {:up (map inc) :down (map dec)}) (range 3))
[[:up 1] [:down -1] [:up 2] [:down 0] [:up 3] [:down 1]]"
[xforms]
(fn [rf]
(let [mrf (multiplexable (ensure-kvrf rf))
rfs (volatile! (if (map? xforms)
(into {} (for [[k xform] %
:let [xform (comp xform (for [x %] [k x]))]]
[k (xform mrf)])
xforms)
(into #{} (map #(% mrf)) xforms)))
invoke-rfs (if (map? xforms)
(fn [acc invoke]
(reduce-kv
(fn [acc tag rf]
(let [acc (invoke rf acc)]
(if (reduced? acc)
(if (reduced? @acc)
(do
(vreset! rfs nil)
acc) ; downstream is done, propagate
(do (vswap! rfs dissoc tag) (rf @acc)))
acc)))
acc @rfs))
(fn [acc invoke]
(core/reduce
(fn [acc rf]
(let [acc (invoke rf acc)]
(if (reduced? acc)
(if (reduced? @acc)
(do
(vreset! rfs nil)
acc) ; downstream is done, propagate
(do (vswap! rfs disj rf) (rf @acc)))
acc)))
acc @rfs)))]
(kvrf
([] (rf))
([acc] (rf (invoke-rfs acc #(%1 %2))))
([acc x]
(let [acc (invoke-rfs acc #(%1 %2 x))]
(if (zero? (core/count @rfs))
(ensure-reduced acc)
acc)))
([acc k v]
(let [acc (invoke-rfs acc #(%1 %2 k v))]
(if (zero? (core/count @rfs))
(ensure-reduced acc)
acc)))))))
(def last (reduce rf/last))
(defn some
"Process coll through the specified xform and returns the first local true value."
[xform coll]
(transduce xform rf/some nil coll))
(defn transjuxt
"Performs several transductions over coll at once. xforms-map can be a map or a sequential collection.
When xforms-map is a map, returns a map with the same keyset as xforms-map.
When xforms-map is a sequential collection returns a vector of same length as xforms-map.
Returns a transducer when coll is omitted."
([xforms-map]
(let [collect-xform (if (map? xforms-map)
(into {})
(reduce (kvrf
([] (core/reduce (fn [v _] (conj! v nil))
(transient []) (range (core/count xforms-map))))
([v] (persistent! v))
([v i x] (assoc! v i x)))))
xforms-map (if (map? xforms-map) xforms-map (zipmap (range) xforms-map))]
(comp
(multiplex (into {} (by-key (map #(comp % (take 1)))) xforms-map))
collect-xform)))
([xforms-map coll]
(transduce (transjuxt xforms-map) rf/last coll)))
(macros/replace
[#?(:cljs {(java.util.concurrent.atomic.AtomicLong.) (atom 0)
(System/nanoTime) (system-time)
(.addAndGet at (- t (System/nanoTime))) (swap! at + (- t (system-time)))
(.addAndGet at (- (System/nanoTime) t)) (swap! at + (- (system-time) t))
.size .getCount})]
#?(:cljd nil
:default
(defn time
"Measures the time spent in this transformation and prints the measured time.
tag-or-f may be either a function of 1 argument (measured time in ms) in which case
this function will be called instead of printing, or tag-or-f will be print before the measured time."
([xform] (time "Elapsed time" xform))
([tag-or-f xform]
(let [pt (if (fn? tag-or-f)
tag-or-f
#(println (core/str tag-or-f ": " % " msecs")))]
(fn [rf]
(let [at (java.util.concurrent.atomic.AtomicLong.)
rf
(fn
([] (rf))
([acc] (let [t (System/nanoTime)
r (rf acc)]
(.addAndGet at (- t (System/nanoTime)))
r))
([acc x]
(let [t (System/nanoTime)
r (rf acc x)]
(.addAndGet at (- t (System/nanoTime)))
r)))
rf (xform rf)]
(fn
([] (rf))
([acc]
(let [t (System/nanoTime)
r (rf acc)
total (.addAndGet at (- (System/nanoTime) t))]
(pt #?(:clj (* total 1e-6) :cljs total))
r))
([acc x]
(let [t (System/nanoTime)
r (rf acc x)]
(.addAndGet at (- (System/nanoTime) t))
r))))))))))
#_(defn rollup
"Roll-up input data along the provided dimensions (which are functions of one input item),
Values of interest are extracted from items using the valfn function and are then summarized
by summary-fn (a reducing function over values returned by valfn or summaries).
Each level of rollup is a map with two keys: :summary and :details."
([dimensions valfn summary-fn]
(let [[dim & dims] (reverse dimensions)]
(core/reduce
(fn [xform dim]
(comp
(by-key dim xform)
(transjuxt
{:detail (into {})
:summary (comp vals (map :summary) (reduce summary-fn))})))
(comp (by-key dim (map valfn))
(transjuxt
{:detail (into {})
:summary (comp vals (reduce summary-fn))}))
dims)))
([dimensions valfn summary-fn coll]
(into {} (rollup dimensions valfn summary-fn) coll)))
)

View file

@ -1,206 +0,0 @@
(ns net.cgrand.xforms.io
(:require [clojure.java.io :as io]
[clojure.java.shell :as sh]
[clojure.edn :as edn])
(:import (java.io Reader BufferedReader IOException InputStream OutputStream BufferedWriter Writer PushbackReader InputStreamReader OutputStreamWriter Closeable)
(java.util Arrays List)
(java.util.concurrent ArrayBlockingQueue)
(java.lang ProcessBuilder$Redirect)
(clojure.lang IFn Fn IReduce)))
(defn keep-opts [m like]
(let [ns (namespace like)]
(into {}
(keep (fn [[k v]]
(when (= ns (or (namespace k) ns))
[(keyword (name k)) v])))
m)))
(defn lines-in
"Returns a reducible view over the provided input.
Input is read line by line. Coercion of the input is done by io/reader (and opts are passed to it).
Input is automatically closed upon completion or error."
[in & opts]
(let [no-init (Object.)]
(reify IReduce
(reduce [self f] (.reduce self f no-init))
(reduce [self f init]
(with-open [^Reader rdr (apply io/reader in opts)]
(let [^BufferedReader rdr (cond-> rdr (not (instance? BufferedReader rdr)) (BufferedReader.))
init (if (identical? init no-init)
(or (.readLine rdr) (f))
init)]
(loop [state init]
(if-some [line (.readLine rdr)]
(let [state (f state line)]
(if (reduced? state)
(unreduced state)
(recur state)))
state))))))))
(defn lines-out
"1-2 args: reducing function that writes values serialized to its accumulator (a java.io.BufferedWriter).
3+ args: transducing context that writes transformed values to the specified output. The output is
coerced to a BufferedWriter by passing out and opts to clojure.java.io/writer. The output is automatically closed.
Returns the writer."
([w] w)
([^BufferedWriter w line]
(doto w
(.write (str line))
(.newLine)))
([out xform coll & opts]
(with-open [^Writer w (apply io/writer out opts)]
(transduce xform lines-out w coll))))
(defn edn-in
"Returns a reducible view over the provided input.
Input is read form by form. Coercion of the input is done by io/reader.
Input is automatically closed upon completion or error.
Unqualified options are passed to both edn/read and io/writer, options qualified by clojure.java.io
are only passed (once dequalified) to io/writer, options qualified by clojure.edn are only passed to
edn/read"
[in & {:as opts}]
(let [no-init (Object.)]
(reify IReduce
(reduce [self f] (.reduce self f no-init))
(reduce [self f init]
(with-open [^Reader rdr (apply io/reader in (mapcat seq (keep-opts opts ::io/opts)))]
(let [^BufferedReader rdr (cond-> rdr (not (instance? PushbackReader rdr)) PushbackReader.)
opts (assoc (keep-opts opts ::edn/opts) :eof no-init)
init (if (identical? init no-init)
(let [form (edn/read opts rdr)]
(if (identical? no-init form)
(f)
form))
init)]
(loop [state init]
(let [form (edn/read opts rdr)]
(if (identical? no-init form)
state
(let [state (f state form)]
(if (reduced? state)
(unreduced state)
(recur state))))))))))))
(defn edn-out
"1-2 args: reducing function that writes values serialized as EDN to its accumulator (a java.io.Writer).
3+ args: transducing context that writes transformed values to the specified output. The output is
coerced to a Writer by passing out and opts to clojure.java.io/writer. The output is automatically closed.
Returns the writer."
([w] w)
([^Writer w x]
(binding [*out* w
*print-length* nil
*print-level* nil
*print-dup* false
*print-meta* false
*print-readably* true]
(prn x)
w))
([out xform coll & opts]
(with-open [^Writer w (apply io/writer out opts)]
(transduce xform edn-out w coll))))
(defn- stream-spec [x]
(into {:mode :lines :enc "UTF-8"}
(cond (map? x) x (string? x) {:enc x} (keyword? x) {:mode x})))
(defn sh
"Transducer or reducible view (in this case assumes empty stdin).
Spawns a process (program cmd with optional arguments arg1 ... argN) and pipes data through it.
Options may be:
* :env, an environment variables map, it will be merged with clojure.java.shell/*sh-env* and JVM environment (in decreasing precedence order),
* :dir, the current dir (defaults to clojure.java.shell/*sh-dir* or JVM current dir),
* :in and :out which are maps with keys :mode (:lines (default), :text or :bytes) and :enc (defaults to \"UTF-8\");
encoding applies only for modes :lines or :text; shorthands exist: a single keyword is equivalent to {:mode k :enc \"UTF-8\"},
a single string is equivalent to {:mode :lines, :enc s}.
In :bytes mode, values are bytes array.
In :lines mode, values are strings representing lines without line delimiters.
In :text mode, values are strings."
{:arglists '([cmd arg1 ... argN & opts])}
[& args]
(reify
IReduce
(reduce [self rf]
(reduce rf (eduction self nil))) ; quick way to handle no init
(reduce [self rf init]
(let [xf (self rf)]
(xf init)))
Fn
IFn
(invoke [_ rf]
(let [[cmd [& {:as opts :keys [env in out dir] :or {dir sh/*sh-dir*}}]] (split-with string? args)
env (into (or sh/*sh-env* {}) env)
env (into {} (for [[k v] env] [(name k) (str v)]))
proc (-> ^List (map str cmd) ProcessBuilder.
(.redirectError ProcessBuilder$Redirect/INHERIT)
(doto (-> .environment (.putAll env)))
(.directory (io/as-file dir))
.start)
EOS (Object.)
q (ArrayBlockingQueue. 16)
drain (fn [acc]
(loop [acc acc]
(if-some [x (.poll q)]
(let [acc (if (identical? EOS x) (reduced acc) (rf acc x))]
(if (reduced? acc)
(do
(.destroy proc)
acc)
(recur acc)))
acc)))
in (stream-spec in)
out (stream-spec out)
^Closeable stdin (cond-> (.getOutputStream proc) (#{:lines :text} (:mode in)) (-> (OutputStreamWriter. ^String (:enc in)) BufferedWriter.))
stdout (cond-> (.getInputStream proc) (#{:lines :text} (:mode out)) (-> (InputStreamReader. ^String (:enc out)) BufferedReader.))
write!
(case (:mode in)
:lines
(fn [x]
(doto ^BufferedWriter stdin
(.write (str x))
.newLine))
:text
(fn [x]
(.write ^BufferedWriter stdin (str x)))
:bytes
(fn [^bytes x]
(.write ^OutputStream stdin x)))]
(-> (case (:mode out)
:lines
#(loop []
(if-some [s (.readLine ^BufferedReader stdout)]
(do (.put q s) (recur))
(.put q EOS)))
:text
#(let [buf (char-array 1024)]
(loop []
(let [n (.read ^BufferedReader stdout buf)]
(if (neg? n)
(.put q EOS)
(do (.put q (String. buf 0 n)) (recur))))))
:bytes
#(let [buf (byte-array 1024)]
(loop []
(let [n (.read ^InputStream stdout buf)]
(if (neg? n)
(.put q EOS)
(do (.put q (Arrays/copyOf buf n)) (recur)))))))
Thread. .start)
(fn
([] (rf))
([acc]
(.close stdin)
(loop [acc acc]
(let [acc (drain acc)]
(if (reduced? acc)
(rf (unreduced acc))
(recur acc)))))
([acc x]
(let [acc (drain acc)]
(try
(when-not (reduced? acc)
(write! x))
acc
(catch IOException e
(ensure-reduced acc))))))))))

View file

@ -1,31 +0,0 @@
(ns net.cgrand.xforms.nodejs.stream)
(def ^:private Transform (.-Transform (js/require "stream")))
(defn transformer
"Returns a stream.Transform object that performs the specified transduction.
options is a js object as per stream.Transform -- however :readableObjectMode and :writableObjectMode are set to true by default."
([xform] (transformer #js {} xform))
([options xform]
(let [xrf (xform (fn
([transform] (doto transform .end))
([transform x]
(when-not (.push transform x)
(throw (js/Error. "Transformer's internal buffer is full, try passing a larger :highWaterMark option.")))
transform)))]
(specify! (Transform. (.assign js/Object #js {:readableObjectMode true
:writableObjectMode true} options))
Object
(_transform [this x _ cb]
(try
(when (reduced? (xrf this x))
(.push this nil))
(cb)
(catch :default err
(cb err))))
(_flush [this cb]
(try
(xrf this)
(cb)
(catch :default err
(cb err))))))))

View file

@ -1,178 +0,0 @@
(ns net.cgrand.xforms.rfs
{:author "Christophe Grand"}
(:refer-clojure :exclude [str last min max some])
#?(:cljs (:require-macros
[net.cgrand.macrovich :as macros]
[net.cgrand.xforms.rfs :refer [or-instance?]])
:clj (:require [net.cgrand.macrovich :as macros]))
(:require [#?(:clj clojure.core :cljs cljs.core) :as core])
#?(:cljd (:require ["dart:math" :as Math]))
#?(:cljs (:import [goog.string StringBuffer])))
(macros/deftime
(defmacro ^:private or-instance? [class x y]
(let [xsym (gensym 'x_)]
`(let [~xsym ~x]
(if #?(:cljd (dart/is? ~xsym ~class)
:default (instance? ~class ~xsym))
~(with-meta xsym {:tag class}) ~y)))))
(declare str!)
(macros/usetime
#? (:cljs
(defn ^:private cmp [f a b]
(let [r (f a b)]
(cond
(number? r) r
r -1
(f b a) 1
:else 0))))
(defn minimum
([#?(:cljd comparator :clj ^java.util.Comparator comparator :cljs comparator)]
(let [#?@(:cljd [comparator (dart-comparator comparator)] :default [])]
(fn
([] nil)
([x] x)
([a b] (cond
(nil? a) b
(nil? b) a
(pos? #?(:cljd (comparator a b)
:clj (.compare comparator a b)
:cljs (cmp comparator a b))) b
:else a)))))
([#?(:cljd comparator :clj ^java.util.Comparator comparator :cljs comparator) absolute-maximum]
(let [#?@(:cljd [comparator (dart-comparator comparator)] :default [])]
(fn
([] ::+infinity)
([x] (if (#?(:clj identical? :cljs keyword-identical?) ::+infinity x)
absolute-maximum
x))
([a b]
(if (or
(#?(:clj identical? :cljs keyword-identical?) ::+infinity a)
(pos? #?(:cljd (comparator a b)
:clj (.compare comparator a b)
:cljs (cmp comparator a b))))
b a))))))
(defn maximum
([#?(:cljd comparator :clj ^java.util.Comparator comparator :cljs comparator)]
(let [#?@(:cljd [comparator (dart-comparator comparator)] :default [])]
(fn
([] nil)
([x] x)
([a b] (cond
(nil? a) b
(nil? b) a
(neg? #?(:cljd (comparator a b)
:clj (.compare comparator a b)
:cljs (cmp comparator a b))) b
:else a)))))
([#?(:cljd comparator :clj ^java.util.Comparator comparator :cljs comparator) absolute-minimum]
(let [#?@(:cljd [comparator (dart-comparator comparator)] :default [])]
(fn
([] ::-infinity)
([x] (if (#?(:clj identical? :cljs keyword-identical?) ::-infinity x)
absolute-minimum
x))
([a b]
(if (or (#?(:clj identical? :cljs keyword-identical?) ::-infinity a)
(neg? #?(:cljd (comparator a b)
:clj (.compare comparator a b)
:cljs (cmp comparator a b))))
b a))))))
(def min (minimum compare))
(def max (maximum compare))
(defn avg
"Reducing fn to compute the arithmetic mean."
([] nil)
([#?(:cljd ^{:tag #/(List? double)} acc :clj ^doubles acc :cljs ^doubles acc)]
(when acc (/ (aget acc 1) (aget acc 0))))
([acc x] (avg acc x 1))
([#?(:cljd ^{:tag #/(List? double)} acc :clj ^doubles acc :cljs ^doubles acc) x w] ; weighted mean
(let [acc (or acc #?(:cljd (double-array 2) :clj (double-array 2) :cljs #js [0.0 0.0]))]
(doto acc
(aset 0 (+ (aget acc 0) w))
(aset 1 (+ (aget acc 1) (* w x)))))))
(defn sd
"Reducing fn to compute the standard deviation. Returns 0 if no or only one item."
([] #?(:cljd (double-array 3) :clj (double-array 3) :cljs #js [0.0 0.0 0.0]))
([#?(:cljd ^{:tag #/(List double)} a :default ^doubles a)]
(let [s (aget a 0) n (aget a 2)]
(if (< 1 n)
(Math/sqrt (/ s (dec n)))
0.0)))
([#?(:cljd ^{:tag #/(List double)} a :default ^doubles a) x]
(let [s (aget a 0) m (aget a 1) n (aget a 2)
d (- x m)
n (inc n)
m' (+ m (/ d n))]
(doto a
(aset 0 (+ s (* d (- x m'))))
(aset 1 m')
(aset 2 n)))))
(defn last
"Reducing function that returns the last value."
([] nil)
([x] x)
([_ x] x))
(defn some
"Reducing function that returns the first logical true value."
([] nil)
([x] x)
([_ x] (when x (reduced x))))
(defn str!
"Like xforms/str but returns a StringBuilder."
([] (#?(:cljd StringBuffer :clj StringBuilder. :cljs StringBuffer.)))
([sb] (or-instance? #?(:cljd StringBuffer :clj StringBuilder :cljs StringBuffer) sb
(#?(:cljd StringBuffer :clj StringBuilder. :cljs StringBuffer.) (core/str sb))))
; the instance? checks are for compatibility with str in case of seeded reduce/transduce.
([sb x] (doto (or-instance?
#?(:cljd StringBuffer :clj StringBuilder :cljs StringBuffer) sb
(#?(:cljd StringBuffer :clj StringBuilder. :cljs StringBuffer.) (core/str sb)))
(#?(:cljd .write :default .append) x))))
(def str
"Reducing function to build strings in linear time. Acts as replacement for clojure.core/str in a reduce/transduce call."
(completing str! core/str))
#_(defn juxt
"Returns a reducing fn which compute all rfns at once and whose final return
value is a vector of the final return values of each rfns."
[& rfns]
(let [rfns (mapv ensure-kvrf rfns)]
(kvrf
([] (mapv #(vector % (volatile! (%))) rfns))
([acc] (mapv (fn [[rf vacc]] (rf (unreduced @vacc))) acc))
([acc x]
(let [some-unreduced (core/reduce (fn [some-unreduced [rf vacc]]
(when-not (reduced? @vacc) (vswap! vacc rf x) true))
false acc)]
(if some-unreduced acc (reduced acc))))
([acc k v]
(let [some-unreduced (core/reduce (fn [some-unreduced [rf vacc]]
(when-not (reduced? @vacc) (vswap! vacc rf k v) true))
false acc)]
(if some-unreduced acc (reduced acc)))))))
#_(defn juxt-map
[& key-rfns]
(let [f (apply juxt (take-nth 2 (next key-rfns)))
keys (vec (take-nth 2 key-rfns))]
(let [f (ensure-kvrf f)]
(kvrf
([] (f))
([acc] (zipmap keys (f acc)))
([acc x] (f acc x))
([acc k v] (f acc k v))))))
)

View file

@ -1,166 +0,0 @@
(ns net.cgrand.xforms-test
(:refer-clojure :exclude [partition reductions])
(:require [clojure.test :refer [are is deftest testing]]
[net.cgrand.xforms :as x]))
(defn trial
"A transducing context for testing that transducers are well-behaved towards
linear use of the accumulator, init, completion and handling of reduced values.
A \"poisonous\" reducing function rf is passed to the transducer.
n is the number of calls to rf before it returns a reduced.
accs is a collection of successive return values for rf."
([xform n coll]
(trial xform n (repeatedly #(#?(:clj Object. :cljs js/Object.))) coll))
([xform n accs coll]
(let [vaccs (volatile! accs)
vstate (volatile! {:n n :acc (first @vaccs) :state :init})
check-acc (fn [acc]
(when (reduced? acc)
(throw (ex-info "Called with reduced accumulator" (assoc @vstate :actual-acc acc))))
(when-not (identical? acc (:acc @vstate))
(throw (ex-info "Called with an unexpected accumulator (either non-linear or out of thin air)" (assoc @vstate :actual-acc acc)))))
rf (fn
([]
(when-not (= :init (:state @vstate))
(throw (ex-info "Init arity called on a started or completed rf." @vstate)))
(:acc (vswap! vstate assoc :state :started)))
([acc]
(when (= :completed (:state @vstate))
(throw (ex-info "Completion arity called on an already completed rf." @vstate)))
(check-acc acc)
(:acc (vswap! vstate assoc :state :completed :acc (first (vswap! vaccs next)))))
([acc x]
(when (= :completed (:state @vstate))
(throw (ex-info "Step arity called on an already completed rf." @vstate)))
(when (= :reduced (:state @vstate))
(throw (ex-info "Step arity called on a reduced rf." @vstate)))
(check-acc acc)
(let [n (:n @vstate)]
(let [acc (first (vswap! vaccs next))]
(if (pos? n)
(:acc (vswap! vstate assoc :acc acc :n (dec n)))
(reduced (:acc (vswap! vstate assoc :acc acc :state :reduced))))))))
res (transduce xform rf coll)]
(check-acc res)
(when-not (= :completed (:state @vstate))
(throw (ex-info "Completion arity never called" @vstate)))
true)))
(deftest proper-rf-usage
(testing "Ensuring that reducing functions returned by transducers are well-behaved."
(is (trial (x/by-key odd? identity)
4 (range 16)))
(is (trial (x/by-key odd? identity nil identity)
4 (range 16)))
(is (trial (x/by-key odd? (take 2))
8 (range 16)))
(is (trial (x/by-key odd? identity)
8 (range 2)))
(is (trial (x/partition 3 identity)
4 (range 16)))
(is (trial (x/partition 3 (take 2))
8 (range 16)))
(is (trial (x/partition 3 (take 2))
8 (range 2)))
(is (trial (x/reductions conj [])
8 (range 2)))
(is (trial (x/reductions conj)
8 (range 2)))
(is (trial (x/into [])
4 (range 16)))
(is (trial (x/for [x % y (range x)] [x y])
4 (range 16)))
(is (trial (x/reduce +)
4 (range 16)))))
(deftest reductions
(is (= (into [] (x/reductions +) (range 10)) [0 0 1 3 6 10 15 21 28 36 45]))
(is (= (into [] (x/reductions +) (range 0)) [0]))
(is (= (into [] (x/reductions +) (range 1)) [0 0]))
(is (= (into [] (x/reductions +) (range 2)) [0 0 1]))
(is (= (into [] (comp (x/reductions +) (take 2)) (range)) [0 0]))
(is (= (into [] (comp (x/reductions +) (take 3)) (range)) [0 0 1]))
(is (= (into [] (comp (take 3) (x/reductions +)) (range)) [0 0 1 3]))
(is (= (into [] (x/reductions (constantly (reduced 42)) 0) (range)) [0 42])))
(deftest partition
(is (= (into [] (x/partition 2 1 nil (x/into [])) (range 8))
[[0 1] [1 2] [2 3] [3 4] [4 5] [5 6] [6 7] [7]]))
(is (= (into [] (x/partition 2 1 (x/into [])) (range 8))
[[0 1] [1 2] [2 3] [3 4] [4 5] [5 6] [6 7]]))
(is (= (into [] (comp (x/partition 2 2 nil) (x/into [])) (range 8))
[[[0 1] [2 3] [4 5] [6 7]]])))
(deftest without
(is (= {0 :ok 2 :ok 4 :ok 6 :ok 8 :ok} (x/without (zipmap (range 10) (repeat :ok)) (filter odd?) (range 20))))
(is (= #{0 2 4 6 8 } (x/without (set (range 10)) (filter odd?) (range 20)))))
#?(:bb nil ;; babashka doesn't currently support calling iterator on range type
:clj
(do
(deftest iterator
(is (true? (.hasNext (x/iterator x/count (.iterator ^java.lang.Iterable (range 5))))))
(is (is (= [5] (iterator-seq (x/iterator x/count (.iterator ^java.lang.Iterable (range 5)))))))
(is (= [[0 1] [1 2] [2 3] [3 4] [4]] (iterator-seq (x/iterator (x/partition 2 1 nil) (.iterator ^java.lang.Iterable (range 5)))))))
(deftest window-by-time
(is (= (into
[]
(x/window-by-time :ts 4
(fn
([] clojure.lang.PersistentQueue/EMPTY)
([q] (vec q))
([q x] (conj q x)))
(fn [q _] (pop q)))
(map (fn [x] {:ts x}) (concat (range 0 2 0.5) (range 3 5 0.25))))
[[{:ts 0}] ; t = 0
[{:ts 0}] ; t = 0.25
[{:ts 0} {:ts 0.5}] ; t = 0.5
[{:ts 0} {:ts 0.5}] ; t = 0.75
[{:ts 0.5} {:ts 1.0}] ; t = 1.0
[{:ts 0.5} {:ts 1.0}] ; t = 1.25
[{:ts 1.0} {:ts 1.5}] ; t = 1.5
[{:ts 1.0} {:ts 1.5}] ; t = 1.75
[{:ts 1.5}] ; t = 2.0
[{:ts 1.5}] ; t = 2.25
[] ; t = 2.5
[] ; t = 2.75
[{:ts 3}] ; t = 3.0
[{:ts 3} {:ts 3.25}] ; t = 3.25
[{:ts 3} {:ts 3.25} {:ts 3.5}] ; t = 3.5
[{:ts 3} {:ts 3.25} {:ts 3.5} {:ts 3.75}] ; t = 3.75
[{:ts 3.25} {:ts 3.5} {:ts 3.75} {:ts 4.0}] ; t = 4.0
[{:ts 3.5} {:ts 3.75} {:ts 4.0} {:ts 4.25}] ; t = 4.25
[{:ts 3.75} {:ts 4.0} {:ts 4.25} {:ts 4.5}] ; t = 4.5
[{:ts 4.0} {:ts 4.25} {:ts 4.5} {:ts 4.75}]]))))) ; t = 4.75
(deftest do-not-kvreduce-vectors
(is (= {0 nil 1 nil} (x/into {} (x/for [[k v] %] [k v]) [[0] [1]])))
(is (= {0 nil 1 nil} (x/into {} (x/for [_ % [k v] [[0] [1]]] [k v]) ["a"]))))
(deftest sorting
(is (= (range 100) (x/into [] (x/sort) (shuffle (range 100)))))
(is (= (range 100) (x/into [] (x/sort <) (shuffle (range 100)))))
(is (= (reverse (range 100)) (x/into [] (x/sort >) (shuffle (range 100)))))
(is (= (sort-by str (range 100)) (x/into [] (x/sort-by str) (shuffle (range 100)))))
(is (= (sort-by str (comp - compare) (range 100)) (x/into [] (x/sort-by str (comp - compare)) (shuffle (range 100)))))
(is (= (sort-by identity > (shuffle (range 100))) (x/into [] (x/sort-by identity >) (shuffle (range 100))))))
(deftest destructuring-pair?
(let [destructuring-pair? #'x/destructuring-pair?]
(are [candidate expected]
(= expected (destructuring-pair? candidate))
'[a b] true
'[a b c] false
'[& foo] false
'[:as foo] false
1 false
'(a b) false
'{foo bar} false
'{foo :bar} false)))
(defmacro wraps-for-with-no-destructuring []
(x/into [] (x/for [x (range 5)] x)))
(deftest for-in-macro
(is (= [0 1 2 3 4] (wraps-for-with-no-destructuring))))

View file

@ -1 +0,0 @@
#kaocha/v1 {}