Creating a stream from an API in Apache Flink

First, let me describe what I want to do. I have an API that takes a callback function as an argument (it looks like this: dataFromApi => { /* do sth */ }), and I would like to process the data it delivers with Flink. I wrote this code to simulate the API:

val myIterator = new TestIterator

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val th1 = new Thread {
  override def run(): Unit = {
    for (i <- 0 to 10) {
      Thread sleep 1000
      myIterator.addToQueue("test" + i)
    }
  }
}
th1.start()

val texts: DataStream[String] = env
  .fromCollection(myIterator) // the same instance the thread above feeds

texts.print()

This is my iterator:

class TestIterator extends Iterator[String] with Serializable {
  private val q: BlockingQueue[String] = new LinkedBlockingQueue[String]

  def addToQueue(s: String): Unit = {
    println("Put")
    q.put(s)
  }

  override def hasNext: Boolean = true

  override def next(): String = {
    println("Wait for queue")
    q.take()
  }
}
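
One detail that may matter here: fromCollection requires the iterator to be Serializable because Flink serializes it and reads the elements inside the task, not in the client. A deserialized copy carries its own queue, so later calls to addToQueue on the original instance never reach the copy that the job is actually draining. A JDK-only sketch of that effect (QueueIterator and SerializationDemo are illustrative names, not Flink code):

```scala
import java.io._
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

// Minimal queue-backed iterator, mirroring TestIterator above.
class QueueIterator extends Iterator[String] with Serializable {
  val q: BlockingQueue[String] = new LinkedBlockingQueue[String]
  override def hasNext: Boolean = true
  override def next(): String = q.take()
}

object SerializationDemo extends App {
  val original = new QueueIterator

  // Serialize and deserialize, roughly what happens when Flink ships
  // the iterator from the client to a task manager.
  val bytes = new ByteArrayOutputStream()
  new ObjectOutputStream(bytes).writeObject(original)
  val copy = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    .readObject().asInstanceOf[QueueIterator]

  // Elements added to the original after shipping never reach the copy.
  original.q.put("test0")
  println(original.q.size) // the original holds the element
  println(copy.q.size)     // the copy has its own, still-empty queue
}
```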

My idea was to execute myIterator.addToQueue(dataFromApi) whenever I receive data, but this code doesn't work: despite adding to the queue, execution blocks on q.take(). I tried writing my own SourceFunction based on the same queue idea, and I also tried Flink's Async I/O (https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/), but I can't achieve what I want.
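
For reference, a minimal queue-backed SourceFunction along the lines of what is described above might look like the sketch below. ApiSource and the commented-out myApi.subscribe call are hypothetical placeholders; the key point is that the callback registration happens inside run(), so the queue lives in the same JVM as the running source task:

```scala
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}

class ApiSource extends RichSourceFunction[String] {
  @volatile private var running = true
  private val queue: BlockingQueue[String] = new LinkedBlockingQueue[String]

  override def run(ctx: SourceFunction.SourceContext[String]): Unit = {
    // Hypothetical API registration; the real call would go here so the
    // callback feeds the queue inside the task JVM:
    // myApi.subscribe(dataFromApi => queue.put(dataFromApi))
    while (running) {
      // Poll with a timeout instead of take() so cancel() can take effect.
      val next = queue.poll(100, TimeUnit.MILLISECONDS)
      if (next != null) ctx.collect(next)
    }
  }

  override def cancel(): Unit = running = false
}
```

Such a source could then be attached with env.addSource(new ApiSource). (Newer Flink versions deprecate SourceFunction in favor of the unified Source API, but the pattern is the same.)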



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow