'How to use Groovy and Spock to test Apach Flink Job?
I have a Flink job that reads data from Kafka into a table which is emitted into a DataStream on which I apply a filter function and then convert the data stream back to a table which writes data back to Kafka.
I want to test the functionality of the filter function. I am writing unit tests in Groovy using Spock (framework). In my unit test I am calling the Flink job with the Table SQL string with details about the Kafka topic, however, I am confused on how to load the right StreamExecution and TableEnvironment because when I create a new object of my Flink class, those values are null and I don't have getters/setters to set everything up because that would make the code really messy.
The following is my logic. My question is can I write Apache Flink APIs as seamlessly in Groovy or there are many layers/pitfalls and how can I better approach these tests:
class DataStreamTests extends Specification {
@Autowired
ApplicationConfiguration configuration;
FlinkStreaming streaming = new FlinkStreaming();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
.build();
final StreamTableEnvironment tableEnvironment = TableEnvironment.create(settings);
def resourcePath = "cars/porche.txt"
StreamSpec streamSpec = ConfigurationParser.createStreamSpec(getClass().getResource(resourcePath).text)
def "create a new input stream from input table sql"() {
given:
DataStream<String> streamRecords = env.readTextFile("/streaming_signals/pedals.txt")
streaming.setConfiguration(configuration)
streaming.setTableEnvironment(tableEnvironment);
when:
String tableSpec = streaming.createTableSpec(streamSpec);
DataStream<Row> rawStream = streaming.getFilteredStream(streamSpec, tableSpec)
DataStream<String> comapareStreams = rawStream.map(new MapFunction<Row, String>() {
@Override
public String map(Row record) {
// Logic to compare stream received with test stream
}
});
then:
// Comparison Logic
}
}
org.apache.flink.table.api.ValidationException: Could not find any factories that implement 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:385)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:295)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:266)
at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:95)
at com.streaming.DataStreamTests.$spock_initializeFields(DataStreamTests.groovy:38
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
