How to batch process values on a channel

I'm trying to figure out how to batch incoming requests, perform an action on the values in those requests, and then return the result of that action to each request. A slightly simplified version of my problem looks like this:

Incoming requests call:

(defn process 
  [values] 
  ;; put values on the queue and wait for result, then return the result
  ...)

Periodically, another function is called:

(defn batch-process
  []
  ;; take up to 10 of the values from the queue, sum those values, 
  ;; then return the result to their process requests
  ...)

I think I am lacking the vocabulary to figure out how I should be doing this. Any advice or pointers would be appreciated!



Solution 1:[1]

I think I figured it out. The key was to send each request's out-channel to batch-process along with its value, so that batch-process knows where to deliver each result:

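(require '[clojure.core.async :as a :refer [chan go <! >! >!! <!!]])

(defn batch-process
  []
  (let [trigger (chan)
        in-chan (chan 100)]
    (go (loop []
          (when (<! trigger)
            ;; Take at most 10 of the [value out-chan] pairs currently buffered.
            ;; Note: (.buf in-chan) reaches into core.async internals.
            (let [n         (min 10 (count (.buf in-chan)))
                  chan-vals (<! (a/into [] (a/take n in-chan)))
                  sum-vals  (reduce (fn [cur-sum [num _]] (+ cur-sum num))
                                    0
                                    chan-vals)]
              ;; Reply to each request in the batch with [its value, batch sum]
              (doseq [[num out-chan] chan-vals]
                (>! out-chan [num sum-vals]))
              (recur)))))
    [trigger in-chan]))

(defn process
  [value in-chan]
  (let [out-chan (chan)]
    ;; Send the value together with a private reply channel, then wait for the result
    (>!! in-chan [value out-chan])
    (<!! out-chan)))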

Call batch-process once, keep the trigger and in-chan it returns, and pass in-chan to every process call. Putting a truthy value such as true on trigger runs one batch; closing trigger ends the loop.
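A variant of the same idea that avoids reading the channel's internal buffer (`.buf`) can drain pending requests with `clojure.core.async/poll!`, which takes a value only if one is immediately available and otherwise returns nil without blocking. This is a sketch, not the answer author's code: the names `drain-up-to`, `batch-process*` and `process*` are hypothetical, and the batch size of 10 comes from the question.

```clojure
(require '[clojure.core.async :as a])

(defn drain-up-to
  "Hypothetical helper: takes at most n values that are already waiting on ch.
  Never blocks; stops early as soon as nothing is immediately available."
  [n ch]
  (loop [acc []]
    (if (< (count acc) n)
      (if-some [v (a/poll! ch)]
        (recur (conj acc v))
        acc)
      acc)))

(defn batch-process*
  "Sketch of the approach above without internals: returns [trigger in-chan].
  Each truthy value put on trigger sums up to 10 pending [value reply-chan]
  pairs and sends [value sum] back on each pair's reply-chan."
  []
  (let [trigger (a/chan)
        in-chan (a/chan 100)]
    (a/go-loop []
      (when (a/<! trigger)
        (let [batch (drain-up-to 10 in-chan)
              total (reduce + 0 (map first batch))]
          ;; reply to every request that was part of this batch
          (doseq [[v reply] batch]
            (a/>! reply [v total]))
          (recur))))
    [trigger in-chan]))

(defn process*
  "Caller side: submits value with a private reply channel and blocks for [value sum]."
  [value in-chan]
  (let [reply (a/chan)]
    (a/>!! in-chan [value reply])
    (a/<!! reply)))
```

Because `poll!` only uses the public channel API, the batch size no longer depends on the buffer's count, and a trigger with nothing pending simply produces an empty batch.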

Solution 2:[2]

I would propose a different approach: simply accumulate data and flush once the desired count is reached, providing one more channel to force a flush:

(require '[clojure.core.async :as a])

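(defn batch-consume [n in]
  (let [flush-chan (a/chan)
        out-chan (a/chan)]
    (a/go-loop [data []]
      (a/alt!
        in ([v] (if (nil? v)
                  ;; `in` was closed: emit any leftovers, then close the output
                  (do (when (seq data)
                        (a/>! out-chan data))
                      (a/close! out-chan))
                  (let [data (conj data v)]
                    ;; emit a batch as soon as it holds n items
                    (if (= n (count data))
                      (do (a/>! out-chan data)
                          (recur []))
                      (recur data)))))
        ;; any value put on flush-chan forces out whatever has accumulated
        flush-chan (do (a/>! out-chan data)
                       (recur []))))
    {:out out-chan
     :flush flush-chan}))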

It could be used like this:

(let [ch (a/chan)
      {:keys [out flush]} (batch-consume 3 ch)]
  (a/go-loop []
    (let [data (a/<! out)]
      ;; processing batch
      (println data (apply + data)))
    (recur))
  (a/go (dotimes [i 10] ;; automatic flush demo
          (a/>! ch i))
        (a/>! flush :flush) ;; flushes the pending 10th item
        (dotimes [i 3]      ;; forces a flush after every 2 items
          (dotimes [j 2]
            (a/>! ch (+ (* 10 i) j)))
          (a/>! flush :flush))))

Output:

;; [0 1 2] 3
;; [3 4 5] 12
;; [6 7 8] 21
;; [9] 9
;; [0 1] 1
;; [10 11] 21
;; [20 21] 41

Note that if you pass a non-positive n to batch-consume, a batch never reaches that size, so only the forced flush emits batches (which can also be useful in some cases):

(let [ch (a/chan)
      {:keys [out flush]} (batch-consume -1 ch)]
  (a/go-loop []
    (let [data (a/<! out)]
      (println data (apply + data)))
    (recur))
  (a/go (dotimes [i 10]
          (a/>! ch i))
        (a/>! flush :flush)
        (dotimes [i 3]
          (dotimes [j 2]
            (a/>! ch (+ (* 10 i) j)))
          (a/>! flush :flush))))

;; [0 1 2 3 4 5 6 7 8 9] 45
;; [0 1] 1
;; [10 11] 21
;; [20 21] 41
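batch-consume hands whole batches to a single consumer; on its own it does not send a result back to each producer, which is what the question asks for. The following is a sketch of one way to combine the two answers, under stated assumptions: each caller sends a [value reply-chan] pair as in Solution 1, a batch is flushed once it holds n pairs as in batch-consume, and a timer created with `a/timeout` (an assumption, replacing the explicit flush channel) flushes a partial batch a fixed time after its first value arrives. The names `batching-summer`, `submit!` and `reply-all!` are hypothetical.

```clojure
(require '[clojure.core.async :as a])

(defn- reply-all!
  "Hypothetical helper: sends the batch total to every reply channel (non-blocking put!)."
  [batch]
  (let [total (reduce + 0 (map first batch))]
    (doseq [[_ reply] batch]
      (a/put! reply total))))

(defn batching-summer
  "Sketch: reads [value reply-chan] pairs from in. A batch is flushed when it
  holds n pairs, or ms milliseconds after its first pair arrived."
  [n ms in]
  (a/go-loop [batch [] timer nil]
    (let [[v port] (a/alts! (if timer [in timer] [in]))]
      (cond
        ;; the timer fired: flush whatever has accumulated
        (= port timer)
        (do (reply-all! batch)
            (recur [] nil))

        ;; in was closed: flush the remainder and stop
        (nil? v)
        (reply-all! batch)

        :else
        (let [batch (conj batch v)]
          (if (= n (count batch))
            (do (reply-all! batch)
                (recur [] nil))
            ;; the first pair of a new batch starts its timer
            (recur batch (or timer (a/timeout ms)))))))))

(defn submit!
  "Hypothetical caller side: sends value with its own reply channel and blocks for the batch total."
  [in value]
  (let [reply (a/chan 1)]
    (a/>!! in [value reply])
    (a/<!! reply)))
```

Replies use `put!` on reply channels with a buffer of 1, so the batching loop never parks waiting for a slow caller. As with batch-consume, a non-positive n never matches the batch size, which leaves only the timeout to flush.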

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 DazedAndConfused
Solution 2 leetwinski