'Creating stream from api in Apache Flink
Firstly I describe what I want to do. I have an API that gets a function as a argument (looks like this:dataFromApi => {//do sth}) and I would like to process this data by Flink. I wrote this code to simulate this API:
val myIterator = new TestIterator
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val th1 = new Thread {
override def run(): Unit = {
for (i <- 0 to 10) {
Thread sleep 1000
myIterator.addToQueue("test" + i)
}
}
}
th1.start()
val texts: DataStream[String] = env
.fromCollection(new TestIterator)
texts.print()
This is my iterator:
class TestIterator extends Iterator[String] with Serializable {
private val q: BlockingQueue[String] = new LinkedBlockingQueue[String]
def addToQueue(s: String): Unit = {
println("Put")
q.put(s)
}
override def hasNext: Boolean = true
override def next(): String = {
println("Wait for queue")
q.take()
}
}
My idea was execute myIterator.addToQueue(dataFromApi) when I receive data, but this code doesn't work. Despiting adding to the queue, execution blocks on q.take(). I tried to write own SourceFunction based on idea with Queue and also I tried with this: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/ but I can't manage I want.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
