'Persistent Redis Pub/Sub Actors
I have a backend app writed in Scala Play. Because I have a realtime implementation using Akka Actors with data stored in a Redis server, I want as my each backend instance (deployed on centos servers) to be a Publisher and in same time a Subscriber to Redis service. Why this? Because a 3rd party app will send requests to my backend to update the data from Redis, and I want that all actors from all instances to push data to clients (frontend) indifferent on which backend instance is redirected this request (a load balancer is used there).
So, when instance1 will publish on Redis, I want that all subscribers(instance2, instance3, even instance1 because I said each instance must be pub/sub) to push data to clients.
I created an object with a Publisher and a Subscriber client and I was expecting that these will have a singleton behavior. But, for an unknown reason, over the night I see that my instances are unsubscribed from the Redis server without a message. I think this, because in the next day, my Redis service have 0 subscribers. I don't know if I have a bad implementation there or just Redis kill the connections after some time.
RedisPubSubServer.scala (In facts, here are just 2 Akka Actors which take RedisClient as params)
class Subscriber(client: RedisClient) extends Actor {
var callback: PubSubMessage => Any = { m => }
implicit val timeout = Timeout(2 seconds)
override def receive: Receive = {
case Subscribe(channel) => client.subscribe(channel)(callback)
case Register(cb) => callback = cb; self ? true
case Unsubscribe(channel) => client.unsubscribe(channel); self ? true
}
}
class Publisher(client: RedisClient) extends Actor {
implicit val timeout = Timeout(2 seconds)
override def receive: Receive = {
case Publish(channel, msg) => client.publish(channel, msg); self ? true
}
}
RedisPubSubClient.scala (here I create the Publisher and Subscriber as singleton)
object Pub {
println("starting publishing service...")
val config = ConfigFactory.load.getObject("redis").toConfig
val client = new RedisClient(config.getString("master"), config.getInt("port"))
val system = ActorSystem("RedisPublisher")
val publisher = system.actorOf(Props(new Publisher(client)))
def publish(channel: String, message: String) =
publisher ! Publish(channel, message)
}
object Sub {
val client = new RedisClient(config.getString("master"), config.getInt("port"))
val system = ActorSystem("RedisSubscriber")
val subscriber = system.actorOf(Props(new Subscriber(client)))
println("SUB Registering...")
subscriber ! Register(callback)
def sub(channel: String) = subscriber ! Subscribe(channel)
def unsub(channel: String) = subscriber ! Unsubscribe(channel)
def callback(msg: PubSubMessage) = {
msg match {
case S(channel, no) => println(s"subscribed to $channel and count $no")
case U(channel, no) => println(s"unsubscribed from $channel and count $no")
case M(channel, msg) => msg match {
case "exit" => client.unsubscribe()
case jsonString => // do the job
}
case E(e) => println(s"ERR = ${e.getMessage}")
}
}
}
and the RedisService
object RedisService {
val system = ActorSystem("RedisServiceSubscriber")
val subscriber = system.actorOf(Props(new Subscriber(client)))
subscriber ! Register(callback)
subscriber ! Subscribe("channelName")
// So, here I'm expecting that subscriber to have a life-cycle as the backend instance
}
from an api endpoint, I push data calling Pub publish method as:
def reloadData(request: AnyType) {
Pub.publish("channelName", requestAsString)
}
Can be possible as Publisher/Subscriber Actors to be killed after a while and due of that to throw in some errors for redis clients Pub/Sub?
For Publisher, I must say that I'm thinking to create the client each time when the api call is made, but for the Subscriber, I can not use another way that a singleton object which will listen the Redis entire life of the backend.
thanks
edit: used library:
"net.debasishg" %% "redisclient" % "3.41"
After some researches, I found another scala redis lib which seems to do exactly what I need in an easier maner
"com.github.etaty" %% "rediscala" % "1.9.0"
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
