I have a Kafka data stream and want to do some calculations over the data using PyFlink. How can I do that?

I want to calculate the average power, power[x] = ch[x] * ((60/5) * 100/500) watts, and the energy consumed, energy[x] = ch[x] / 500 kWh.

ch1-ch6 are the energy measurement values. The data from Kafka, as printed on the terminal, looks like the JSON sample quoted in the answer below.



Solution 1:[1]

I assume the data from Kafka looks something like this:

{"gw_id": "sn001", "t": 1650270347, "ch1": 38, "ch2": 0, "ch3": 1, "ch4": 2, "ch5": 0, "ch6": 50}

I also assume that you want to perform stateless computations on each of the ch[x] values, such as:

p[x] = ch[x]*((60/5)*100/500)
e[x] = ch[x]/500

By stateless, I mean that the output of the computation does not require previous values of ch[x].

There are two ways you can use Flink for your use case: the DataStream API or the Table/SQL API. The PyFlink docs describe how to use both APIs from Python.

The SQL approach is simpler. If you really need very custom processing, or need to process the data in a non-relational way, you can consider the DataStream API instead, but I'm only going to cover the SQL approach here.

NOTE: I have not attempted to run the below code, so there may well be syntax errors.

The first step is to define an input table using a connector that is appropriate for your data source, i.e. your Kafka stream. Make sure you use the Kafka table connector, not the DataStream one.

def create_input():
    return """
        CREATE TABLE input (
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `t` BIGINT  -- epoch seconds in the sample; use TO_TIMESTAMP_LTZ(`t`, 0) if you need a time attribute
        ) WITH (
          'connector' = 'kafka',
          'format' = 'json',
          <see docs for other props>
        )
    """

Now we define a function that will perform the processing. Here's an example that just returns its input; the function lives in a separate module to make it easier to test:

# within module mymodule
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(result_type=DataTypes.BIGINT())
def my_func(i):
    return i



def create_some_function():
    return """
    CREATE TEMPORARY FUNCTION my_func AS 'mymodule.my_func' LANGUAGE PYTHON
    """

Now create your output table, for which we'll just use the print connector:

def create_output():
    return """
        CREATE TABLE output (
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `ch1_mod` BIGINT,
          `ch2_mod` BIGINT,
          `ch3_mod` BIGINT,
          `ch4_mod` BIGINT,
          `ch5_mod` BIGINT,
          `ch6_mod` BIGINT,
          `t` BIGINT
        ) WITH (
          'connector' = 'print'
        )
    """

Now we can define our environment and then wire the input table to the output table:

from pyflink.table import EnvironmentSettings, TableEnvironment

env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

table_env.execute_sql(create_input())
table_env.execute_sql(create_some_function())
table_env.execute_sql(create_output())
# The SELECT list must match the output table's column order:
# gw_id, ch1-ch6, ch1_mod-ch6_mod, t
table_env.execute_sql(
    "INSERT INTO output "
    "SELECT gw_id, ch1, ch2, ch3, ch4, ch5, ch6, "
    "my_func(ch1), my_func(ch2), my_func(ch3), "
    "my_func(ch4), my_func(ch5), my_func(ch6), t "
    "FROM input").wait()

Functions can be defined to perform arbitrary processing of data, including accepting multiple parameters. You can also use Flink's windowing capabilities to perform stateful computations, such as averaging a series of values over a time window; see the queries section of the Flink SQL docs for more details: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/overview/
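As a sketch of that stateful case: assuming the input table declares an event-time attribute, e.g. a computed column `ts` AS TO_TIMESTAMP_LTZ(`t`, 0) together with a WATERMARK FOR `ts` definition, a per-gateway one-minute tumbling average of ch1 could look like this:

-- Sketch only: assumes `ts` is an event-time attribute declared on `input`
SELECT gw_id, window_start, window_end, AVG(ch1) AS avg_ch1
FROM TABLE(
  TUMBLE(TABLE input, DESCRIPTOR(ts), INTERVAL '1' MINUTE))
GROUP BY gw_id, window_start, window_end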

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

[1] Solution 1: John, Stack Overflow