'Filtering another stream based on item in current stream using rxpy
I want to find a match in another stream and combine it with current item.
numbers = [1, 2, 3, 4, 5]
numbers_in_char = ["2", "1", "3", "5", "4"]
textnumbers_in_stream = rx.defer(rx.from_iterable(numbers_in_char))
def exists_in_words(number, words):
return words.pipe(
op.filter(lambda w: int(w) == number),
op.map(lambda w: (number, w))
)
rx \
.from_iterable(numbers) \
.pipe(op.map(lambda number: exists_in_words(number, textnumbers_in_stream))) \
.subscribe(lambda row: print(row))
I expect to have these printed:
(1,"1")
(2,"2")
(3,"3")
...
But I have:
None
None
None
...
Someone can give me an idea what i have done wrong?
many thanks in advance
Solution 1:[1]
You are missing a return statement here:
def exists_in_words(number, words):
return words.pipe(
op.filter(lambda w: int(w) == number),
op.map(lambda w: (number, w))
)
If you change this to print the tuple:
rx.from_iterable(numbers) \
.pipe(op.map(lambda number: exists_in_words(number, textnumbers_in_stream))) \
.subscribe(lambda row: print(row[0], row[1]))
The output is:
<rx.core.observable.observable.Observable object at 0x7f559b1df898> <rx.core.observable.observable.Observable object at 0x7f559b1dfba8>
<rx.core.observable.observable.Observable object at 0x7f559b1dfba8> <rx.core.observable.observable.Observable object at 0x7f559b1df198>
<rx.core.observable.observable.Observable object at 0x7f559b1df198> <rx.core.observable.observable.Observable object at 0x7f559b1df240>
<rx.core.observable.observable.Observable object at 0x7f559b1df240> <rx.core.observable.observable.Observable object at 0x7f559b1df898>
<rx.core.observable.observable.Observable object at 0x7f559b1df898> <rx.core.observable.observable.Observable object at 0x7f559b1dfba8>
To solve that string representation take a look at this issue .
Solution 2:[2]
from rx import operators as op
import rx
def exists_in_words(number, words):
return words.pipe(
op.filter(lambda w: int(w) == number),
op.map(lambda w: (number, w))
)
def my_on_next(item):
print(f"####### {item}", flush=True)
def my_on_error(throwable):
print(throwable)
def my_on_completed():
print('Done')
numbers = [1, 2, 3, 4, 5]
numbers_in_char = ["1", "2", "3", "4", "5"]
text_as_numbers_in_stream = rx.from_iterable(numbers_in_char)
def print_tuple(x):
print(f"{x[0]}:{x[1]}", flush=True)
return x
rx \
.from_iterable(numbers) \
.pipe(op.flat_map(lambda number: exists_in_words(number, text_as_numbers_in_stream))) \
.subscribe(lambda x: my_on_next(x), my_on_error, my_on_completed)
I find it..
I change defer to .from_iterable() for the second stream..
I filter an item from first stream based on second stream in by simply reverse the process :
def exists_in_words(number, words):
return words.pipe(
op.filter(lambda w: int(w) == number),
op.map(lambda w: (number, w))
)
later flatMap() filter result which is an Observable. I do it by feeling, done it many times in java reactor. Basically to remove its encloser (Observer object) and get its content...
and tadaaaaaa.. it works.
####### (1, '1')
####### (2, '2')
####### (3, '3')
####### (4, '4')
####### (5, '5')
Done
Solution 3:[3]
If you want to combine two observable element wise, you can use the zip operator:
numbers = [1, 2, 3, 4, 5]
numbers_in_char = ["1", "2", "3", "4", "5"]
d = rx.from_iterable(numbers).pipe(
ops.zip(rx.from_iterable(numbers_in_char)),
).subscribe(lambda row: print(row))
This returns the expected result:
(1, '1')
(2, '2')
(3, '3')
(4, '4')
(5, '5')
Note that the current release of RxPY (3.2) has a bug that prevents this example to work. You can test this with 3.1.1, or hopefully with the upcoming release.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | slawalata |
| Solution 3 | MainRo |
