Implement 'print' connector with sliding window sink
I'm trying to combine the SlidingWindows/sliding-windows.py example with GettingStarted/getting-started.py. Specifically, I've created my own variant, sliding_window.py, with an associated datagen/stock.py. However, when I run my variation (first starting datagen/stock.py, then running sliding_window.py), nothing appears in the PyCharm output; it just hangs:
/Users/jeff1evesque/opt/miniconda3/envs/kinesis_analytics/bin/python3.8 /Users/jeff1evesque/application/kinesis-analytics/flink/sliding_window.py
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/Users/jeff1evesque/opt/miniconda3/envs/kinesis_analytics/lib/python3.8/site-packages/pyflink/lib/flink-dist_2.11-1.13.2.jar) to field java.util.Collections$SingletonList.serialVersionUID
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Although nothing is printed to stdout, the Kinesis stream metrics indicate that, during the same time span, data is being both ingested into and consumed from the stream. However, if I comment out the first sliding_window_table assignment and uncomment the second:
#sliding_window_table = (
#    input_table.window(
#        Slide.over(sliding_window_over)
#        .every(sliding_window_every)
#        .on(sliding_window_on)
#        .alias(sliding_window_alias)
#    )
#    .group_by('ticker, {}'.format(sliding_window_alias))
#    .select('ticker, price.min as p, {}.end as t'.format(
#        sliding_window_alias,
#        sliding_window_on
#    ))
#)
sliding_window_table = input_table.select('ticker, price, utc')
When sliding_window.py is restarted, the PyCharm console shows the expected output:
4> +I[AMZN, 14.93, 2022-05-15T01:24:17.779138]
4> +I[TSLA, 99.03, 2022-05-15T01:24:17.794715]
4> +I[TSLA, 71.55, 2022-05-15T01:24:17.812022]
4> +I[AMZN, 49.15, 2022-05-15T01:24:17.826044]
4> +I[AMZN, 4.66, 2022-05-15T01:24:17.843096]
4> +I[AAPL, 13.64, 2022-05-15T01:24:17.858505]
[...REMAINING-TRACE-OMITTED...]
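For comparison, the commented-out windowed query would instead be expected to emit one row per ticker per window, holding the minimum price and the window end. The following is a minimal plain-Python sketch of that aggregation, not PyFlink code. The function name `sliding_min`, the 10-second window size and the 5-second slide are assumptions for illustration, since the values of sliding_window_over and sliding_window_every are not shown above; the sample rows are taken from the output above.

```python
from datetime import datetime, timedelta


def sliding_min(events, size, slide):
    """Return {(ticker, window_end): min price} for sliding event-time windows.

    events: iterable of (ticker, price, timestamp) tuples.
    Each window covers [start, start + size); window starts are aligned
    to multiples of `slide` counted from the Unix epoch (an assumption).
    """
    epoch = datetime(1970, 1, 1)
    result = {}
    for ticker, price, ts in events:
        # Latest slide-aligned window start at or before the event time.
        start = ts - ((ts - epoch) % slide)
        # Walk back over every window that still contains the event.
        while start > ts - size:
            key = (ticker, start + size)
            result[key] = min(price, result.get(key, price))
            start -= slide
    return result


# Sample rows copied from the console output above.
sample = [
    ("AMZN", 14.93, datetime(2022, 5, 15, 1, 24, 17, 779138)),
    ("TSLA", 99.03, datetime(2022, 5, 15, 1, 24, 17, 794715)),
    ("TSLA", 71.55, datetime(2022, 5, 15, 1, 24, 17, 812022)),
    ("AMZN", 49.15, datetime(2022, 5, 15, 1, 24, 17, 826044)),
    ("AMZN", 4.66, datetime(2022, 5, 15, 1, 24, 17, 843096)),
    ("AAPL", 13.64, datetime(2022, 5, 15, 1, 24, 17, 858505)),
]

# Assumed window parameters: 10-second windows sliding every 5 seconds.
rows = sliding_min(sample, size=timedelta(seconds=10), slide=timedelta(seconds=5))
for (ticker, window_end), min_price in sorted(rows.items()):
    print(ticker, min_price, window_end.isoformat())
```

Because each event falls into size / slide overlapping windows, every ticker appears once per window it touches rather than once per input record.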
What I actually want is to combine the AWS-provided sliding window example with the print connector example. Once that baseline works, I plan to deploy PyFlink to AWS Kinesis Data Analytics. From there, I intend to join PyFlink streaming data with, potentially, S3 data using Apache Iceberg. I may first try Apache Iceberg locally with the PyCharm PyFlink setup above, before promoting the deployment package to Kinesis Data Analytics.
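A minimal sketch of a print sink whose columns would line up with the windowed select ('ticker, price.min as p, {}.end as t'). The helper name `create_print_table`, the table name `output_table` and the column types are assumptions for illustration and are not taken from the scripts above; only the 'connector' = 'print' option is the standard Flink print connector setting.

```python
def create_print_table(table_name):
    """Build DDL for a print sink with one column per windowed select field.

    Column types are assumed: ticker as a short string, p as the minimum
    price, t as the window end timestamp.
    """
    return """CREATE TABLE {0} (
        ticker VARCHAR(6),
        p DOUBLE,
        t TIMESTAMP(3)
    ) WITH (
        'connector' = 'print'
    )""".format(table_name)


ddl = create_print_table("output_table")
print(ddl)

# With an existing TableEnvironment `table_env` (not created here), the sink
# would be registered with: table_env.execute_sql(ddl)
```

The sink schema has to be redefined this way because the windowed query produces (ticker, p, t) rather than the (ticker, price, utc) columns printed by the plain select.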
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
