How to use group_by_value to create batches in benthos
I have been trying to understand how to batch things in Benthos but am a bit confused about how to do it.
Take this example:
```yaml
input:
  generate:
    interval: ""
    count: 40
    mapping: |
      root = count("test")

pipeline:
  processors:
    - log:
        level: INFO
        message: 'Test! ${! (this) } ${! (this % 2 == 0) } ${! batch_size() }'
    - group_by_value:
        value: ${! (this % 2 == 0) }
    - archive:
        format: tar
    - compress:
        algorithm: gzip

output:
  file:
    path: test/${! (this % 2 == 0) }.tar.gz
    codec: all-bytes
```
My expectation was that this would produce two files in test/: one called "true.tar.gz" and another called "false.tar.gz", with 20 elements each (the even and the odd numbers). What I get instead is a single file containing only the last message. From the logs I can see that it is not actually batching the messages based on that condition.
I thought group_by_value would create two "streams/batches" of messages that would then be handled separately by the archive processor and the output, but it doesn't seem to behave like that.
Could you please help me understand how it works?
Additionally, I would also like to limit the size of each of these streams to a certain number, so that each TAR contains at most that many entries.
Thanks!!
EDIT 1
This is something which works more like I expected, but this way I have to "know" how many items I want to batch before actually being able to filter them. I wonder if I can't just "accumulate" messages based on this group_by_value condition and batch them based on a count later?
```yaml
input:
  broker:
    inputs:
      - generate:
          interval: ""
          count: 40
          mapping: |
            root = count("test")
    batching:
      count: 40

pipeline:
  processors:
    - group_by_value:
        value: ${! (this % 2 == 0) }
    - log:
        level: INFO
        message: 'Test! ${! (this) } ${! (this % 2 == 0) } ${! batch_size() }'
    - bloblang: |
        meta name = (this) % 2 == 0
    - archive:
        format: tar
        path: ${! (this) }

output:
  file:
    path: test/${! (meta("name")) }.tar
    codec: all-bytes
```
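One way to cap the size of each grouped batch, rather than fixing the total count up front, is to follow group_by_value with a split processor, which breaks each batch it receives into smaller batches of at most `size` messages. This is a sketch, not a tested config, and the cap of 10 is an arbitrary assumption:

```yaml
pipeline:
  processors:
    # Split the incoming batch into one batch per key...
    - group_by_value:
        value: ${! (this % 2 == 0) }
    # ...then break each of those batches into chunks of at most 10
    # messages, so each archive below contains at most 10 entries.
    - split:
        size: 10
    - archive:
        format: tar
```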
Solution 1:[1]
As you already noticed, group_by_value operates on message batches, which is why your first example produces a single file as output. In fact, it produces a file for each message, but since the file name is identical, each new file ends up overwriting the previous one.
From your edit, I'm not sure I get what you're trying to achieve. The batch policy documentation explains that byte_size, count and period are the available conditions for composing batches. When any of those is met, a batch is flushed, so you don't necessarily have to rely on a specific count. For convenience, the batching policy also has a processors field, which allows you to define an optional list of processors to apply to each batch before it is flushed.
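As an illustration of that processors field, the archiving and compression from the first config could be moved into an output batching policy, so they run once per flushed batch instead of once per message. A rough sketch, assuming a batch is flushed after 20 messages or 10 seconds, whichever comes first (the timestamp-based file name is just an assumption to avoid overwrites):

```yaml
output:
  broker:
    outputs:
      - file:
          path: test/batch-${! timestamp_unix_nano() }.tar.gz
          codec: all-bytes
    batching:
      count: 20
      period: 10s
      # These processors run on each completed batch right before it is
      # flushed, collapsing it into a single compressed tarball.
      processors:
        - archive:
            format: tar
        - compress:
            algorithm: gzip
```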
The windowed processing documentation might also be of interest, since it explains how the system_window buffer can be used to chop a stream of messages into tumbling or sliding windows of fixed temporal size, and it includes a section on grouping.
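As a sketch of that windowed approach (the 10-second window size is an arbitrary assumption): the system_window buffer delivers each window as a single batch, which group_by_value can then split into one batch per key:

```yaml
buffer:
  system_window:
    # Assign each message to a window based on its processing time.
    timestamp_mapping: root = now()
    # Tumbling windows of 10 seconds each.
    size: 10s

pipeline:
  processors:
    # Each window arrives here as one batch; split it by key so each
    # group can be handled (e.g. archived) separately downstream.
    - group_by_value:
        value: ${! json("key") }
```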
Update 22.02.2022: Here's an example of how to perform output batching based on some key, as requested in the comments:
```yaml
input:
  generate:
    interval: "500ms"
    count: 9
    mapping: |
      root.key = if count("key_counter") % 3 == 0 {
        "foo"
      } else {
        "bar"
      }
      root.content = uuid_v4()

pipeline:
  processors:
    - bloblang: |
        root = this
        # 3 is the number of messages you'd like to have in the "foo" batch.
        root.foo_key_end = this.key == "foo" && count("foo_key_counter") % 3 == 0

output:
  broker:
    outputs:
      - stdout: {}
    processors:
      - group_by_value:
          value: ${! json("key") }
      - bloblang: |
          root = this
          root.foo_key_end = deleted()
          root.batch_size = batch_size()
          root.batch_index = batch_index()
    batching:
      # Something big so, unless something bad happens, you should see enough
      # messages with key = "foo" before reaching this number.
      count: 1000
      check: this.foo_key_end
```
Sample output:
```text
> benthos --log.level error -c config_group_by.yaml
{"batch_index":0,"batch_size":3,"content":"84e51d8b-a4e0-42c8-8cbb-13a8b7b37823","key":"foo"}
{"batch_index":1,"batch_size":3,"content":"1b35ff8b-7121-426e-8447-11e834610b90","key":"foo"}
{"batch_index":2,"batch_size":3,"content":"a9d9c661-1068-447f-9324-c418b0d7de9d","key":"foo"}
{"batch_index":0,"batch_size":6,"content":"5c9d26aa-f1dc-46ae-9845-3b035c1e569e","key":"bar"}
{"batch_index":1,"batch_size":6,"content":"17bbc7c1-94ec-4c9e-b0c5-b9c11f18498f","key":"bar"}
{"batch_index":2,"batch_size":6,"content":"7d7b9621-e174-4ca2-8a2e-1679e8177335","key":"bar"}
{"batch_index":3,"batch_size":6,"content":"db24273f-7064-498e-9914-9dd4c671dcd7","key":"bar"}
{"batch_index":4,"batch_size":6,"content":"4cfbea0e-dcc4-4d84-a87f-6930dd797737","key":"bar"}
{"batch_index":5,"batch_size":6,"content":"d6cb4726-4796-444d-91df-a5c278860106","key":"bar"}
```
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
