Correctly implementing wait and notify in Kotlin
According to this document, using wait and notify is discouraged in Kotlin: https://kotlinlang.org/docs/reference/java-interop.html
wait()/notify()
Effective Java Item 69 kindly suggests to prefer concurrency utilities to wait() and notify(). Thus, these methods are not available on references of type Any.
However, the document does not propose a correct way of doing this.
Basically, I would like to implement a service that reads input data and processes them. If there were no input data, it would suspend itself until someone notifies it that new input data are available. Something like:
while (true) {
    val data = fetchData()
    processData(data)
    if (data.isEmpty()) {
        wait()
    }
}
EDIT:
I don't want to use these discouraged methods (antipatterns); I really want to find out how to do this properly.
In my case fetchData reads data from the database, so queues cannot be used.
Solution 1:[1]
In general you should use higher-level concurrency utilities when possible. However, if none of the higher-level constructs work in your case, the direct replacement is to use a ReentrantLock and a single Condition on that lock.
For example, if your Java code was something like:
private Object lock = new Object();
...
synchronized(lock) {
    ...
    lock.wait();
    ...
    lock.notify();
    ...
    lock.notifyAll();
    ...
}
You can change it to the following Kotlin:
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

private val lock = ReentrantLock()
private val condition = lock.newCondition()

lock.withLock {             // like synchronized(lock)
    ...
    condition.await()       // like wait()
    ...
    condition.signal()      // like notify()
    ...
    condition.signalAll()   // like notifyAll()
    ...
}
While this is slightly more verbose, conditions do provide some extra flexibility, as you can have multiple conditions on a single lock, and there are also other kinds of locks (notably ReentrantReadWriteLock.ReadLock and ReentrantReadWriteLock.WriteLock).
Note that withLock is a Kotlin-provided extension function that takes care of calling Lock.lock()/Lock.unlock() before/after invoking the supplied lambda.
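As a minimal sketch of how this could map onto the service from the question: the DataSignal class, its pending flag, and the method names below are hypothetical and not part of the original answer. The flag is re-checked in a loop because await() may return spuriously, and it also keeps a notification that arrives before the worker starts waiting.

```kotlin
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.thread
import kotlin.concurrent.withLock

// Hypothetical helper (not from the answer): a flag guarded by a lock and a condition.
class DataSignal {
    private val lock = ReentrantLock()
    private val dataAvailable = lock.newCondition()
    private var pending = false

    // Blocks the calling thread until notifyDataAvailable() has been called.
    fun awaitData() {
        lock.withLock {
            // await() can wake up spuriously, so re-check the flag in a loop.
            while (!pending) {
                dataAvailable.await()
            }
            pending = false
        }
    }

    // Records that new data exists and wakes up one waiting thread, if any.
    fun notifyDataAvailable() {
        lock.withLock {
            pending = true
            dataAvailable.signal()
        }
    }
}

fun main() {
    val signal = DataSignal()
    val worker = thread {
        signal.awaitData()
        println("worker woke up")
    }
    signal.notifyDataAvailable()
    worker.join()
}
```

In the loop from the question, the worker would call awaitData() where wait() appears, and whoever writes new rows to the database would call notifyDataAvailable().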
Solution 2:[2]
A BlockingQueue can be a suitable high-level concurrency utility for your use case, but applying it requires knowing and modifying your code structure.
The idea is that fetchData() should .take() an item from the queue, and if the queue is empty, that will block the execution until an item appears, which eliminates the .wait() in your code. The producer of the data should .put(t) the data into the queue.
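A rough sketch of that idea, assuming the items are plain strings and using a hypothetical consume function in place of the fetch/process loop:

```kotlin
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import kotlin.concurrent.thread

// Hypothetical consumer: takes `count` items, blocking while the queue is empty.
fun consume(queue: BlockingQueue<String>, count: Int): List<String> {
    val processed = mutableListOf<String>()
    repeat(count) {
        val item = queue.take()        // blocks until the producer has put an item
        processed += item.uppercase()  // stand-in for processData(item)
    }
    return processed
}

fun main() {
    val queue = LinkedBlockingQueue<String>()
    val producer = thread {
        queue.put("first")
        queue.put("second")
    }
    println(consume(queue, 2))
    producer.join()
}
```

The consumer never calls wait() itself; the blocking happens inside take().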
If you really need to use wait and notify, e.g. for implementing a concurrency utility at a low level, you can cast a Kotlin object to java.lang.Object and call these functions afterwards, as said in the language reference. Or, written as extension functions:
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
private fun Any.wait() = (this as java.lang.Object).wait()
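Keep in mind that, as in Java, these calls must happen while holding the object's monitor, otherwise they throw IllegalMonitorStateException. A small sketch of how that might look; the Gate class and the extension names monitorWait and monitorNotifyAll are made up for illustration:

```kotlin
import kotlin.concurrent.thread

// Hypothetical extension names; they only forward to java.lang.Object.
@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
private fun Any.monitorWait() = (this as java.lang.Object).wait()

@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
private fun Any.monitorNotifyAll() = (this as java.lang.Object).notifyAll()

// Hypothetical one-shot gate: threads block in awaitOpen() until open() is called.
class Gate {
    private val monitor = Any()
    private var isOpen = false

    fun awaitOpen() {
        synchronized(monitor) {
            // wait() must be called while holding the monitor, and in a loop
            // because it may return without a matching notify.
            while (!isOpen) monitor.monitorWait()
        }
    }

    fun open() {
        synchronized(monitor) {
            isOpen = true
            monitor.monitorNotifyAll()
        }
    }
}

fun main() {
    val gate = Gate()
    val waiter = thread {
        gate.awaitOpen()
        println("gate opened")
    }
    gate.open()
    waiter.join()
}
```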
Solution 3:[3]
Java-style wait, wait(timeout), and notify implemented with coroutines. I'll post the unit tests to GitHub when I can.
package com.spillikinaerospace.android.core

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

/**
 * A Kotlin coroutines friendly method of implementing Java wait/notify.
 *
 * Exits safely on mismatched calls. More than one wait is ignored, as
 * is more than one notify if no wait was called. Only works with 2 threads. No notifyAll.
 *
 * See Locks
 * https://proandroiddev.com/synchronization-and-thread-safety-techniques-in-java-and-kotlin-f63506370e6d
 */
class SAWaitNotify {
    private val channel: Channel<Unit> = Channel<Unit>(0)

    // Re-entry guard.
    private var waitingForNotify = false

    /**
     * Wait until saNotify() is called.
     */
    public fun saWait() {
        synchronized(this) {
            if (waitingForNotify) {
                return
            }
            waitingForNotify = true
        }
        // Block until notify.
        runBlocking {
            channel.receive()
        }
        synchronized(this) {
            waitingForNotify = false
        }
    }

    /**
     * Wait until saNotify() is called or the given number of seconds has passed.
     *
     * @param seconds - Max delay before unlock.
     */
    public fun saWait(seconds: Long) {
        synchronized(this) {
            if (waitingForNotify) {
                return
            }
            waitingForNotify = true
        }
        // Block until timeout or notify.
        runBlocking {
            // Set up the timeout before waiting on the channel.
            // trySend replaces the deprecated offer().
            val job = launch(Dispatchers.IO) {
                delay(seconds * 1000)
                channel.trySend(Unit)
            }
            // This will continue when someone sends to the channel.
            channel.receive()
            // Somebody sent a value. Maybe the timer, maybe saNotify().
            // The channel has released but the timer job may still be running.
            // It's OK to call this even if the job has finished.
            job.cancel()
        }
        synchronized(this) {
            waitingForNotify = false
        }
    }

    /**
     * Release saWait().
     */
    public fun saNotify() {
        synchronized(this) {
            if (!waitingForNotify) {
                return
            }
            waitingForNotify = false
        }
        channel.trySend(Unit)
    }
}
Solution 4:[4]
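You could use the Kotlin coroutines Semaphore (kotlinx.coroutines.sync.Semaphore) to suspend the coroutine instead of blocking a thread:
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.isActive
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.withContext

val semaphore = Semaphore(1, 0)

suspend fun runFetchLoop() = withContext(Dispatchers.Default) {
    while (isActive) {
        val data = fetchData()
        processData(data)
        semaphore.acquire()
        if (data.isEmpty()) {
            // No permit is left here, so this suspends until notifyDataAvailable() releases one.
            semaphore.acquire()
        }
        semaphore.release()
    }
}

// Caution: release() throws IllegalStateException if no permit is currently acquired.
fun notifyDataAvailable() = semaphore.release()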
Here is a running example: https://pl.kotl.in/bnKeOspfs .
However, I would prefer a cold Flow or a hot Channel to solve this problem. Roman Elizarov has written a good article about cold flows and hot channels.
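One way the Channel variant might look, as a sketch only: the WakeUp class and its method names are hypothetical, and a conflated channel is just one possible choice. It keeps at most one pending signal, so notifying several times before the worker suspends is harmless and an early notification is not lost.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical wake-up signal; CONFLATED keeps at most one pending signal.
class WakeUp {
    private val signals = Channel<Unit>(Channel.CONFLATED)

    // Never blocks or suspends; repeated calls collapse into one pending signal.
    fun notifyDataAvailable() {
        signals.trySend(Unit)
    }

    // Suspends the coroutine (not the thread) until a signal is pending.
    suspend fun awaitData() {
        signals.receive()
    }
}

fun main() = runBlocking<Unit> {
    val wakeUp = WakeUp()
    val worker = launch {
        wakeUp.awaitData()
        println("worker resumed")
    }
    wakeUp.notifyDataAvailable()
    worker.join()
}
```

In the fetch loop, awaitData() would take the place of the second semaphore.acquire() when the fetched data is empty.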
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | |
Solution 2 | |
Solution 3 | |
Solution 4 | |