Akka actor with two group routers: only one works normally, although the other is basically the same
I ran into a weird problem that I cannot explain. I created an Akka cluster with 2 nodes on my local PC, 127.0.0.1:2551 and 127.0.0.1:2552.
Below is the content of my main actor:
public class AkkaDispatcherActor extends AbstractBehavior<AkkaDispatcherActor.Message> {
public interface Message extends CborSerializable {
}
public static final class DispatchAllRuleFilesMessage implements Message {
public DispatchAllRuleFilesMessage() {
}
}
public static final class CollectAllRuleRuntimeScheduleMessage implements Message {
public CollectAllRuleRuntimeScheduleMessage() {
}
}
public static class Reply implements CborSerializable {
public static String DISPATCH_SUCCESS = "DISPATCH_SUCCESS";
public static String DISPATCH_FAILED = "DISPATCH_FAILED";
private String status;
private String details;
public Reply(String status, String details) {
this.status = status;
}
public String getStatus() {
return status;
}
public String getDetails() {
return details;
}
}
public static class RuleRunTimeReply implements CborSerializable {
private String ruleRunTimeScheduleInfo;
private boolean successOrNot;
public RuleRunTimeReply(String ruleRunTimeScheduleInfo, boolean successOrNot) {
this.ruleRunTimeScheduleInfo = ruleRunTimeScheduleInfo;
this.successOrNot = successOrNot;
}
public String getRuleRunTimeScheduleInfo() {
return ruleRunTimeScheduleInfo;
}
public boolean getSuccessOrNot() {
return successOrNot;
}
}
private ActorRef<AkkaRuleFileHandlerActor.Command> router;
private ActorRef<AkkaRuleHandlerActor.Command> ruleRouter;
public static Behavior<Message> create() {
return Behaviors.setup(AkkaDispatcherActor::new);
}
AkkaDispatcherActor(ActorContext<Message> context) {
super(context);
ServiceKey<AkkaRuleFileHandlerActor.Command> serviceKey = ServiceKey.create(AkkaRuleFileHandlerActor.Command.class, "ruleFileService");
// each node spawn only one worker
ActorRef<AkkaRuleFileHandlerActor.Command> worker = context.spawn(AkkaRuleFileHandlerActor.create(), "ruleFileHandler");
context.getSystem().receptionist().tell(Receptionist.register(serviceKey, worker));
GroupRouter<AkkaRuleFileHandlerActor.Command> group = Routers.group(serviceKey);
group = group.withRoundRobinRouting();
router = context.spawn(group, "ruleFileHandler-group");
ServiceKey<AkkaRuleHandlerActor.Command> ruleServiceKey = ServiceKey.create(AkkaRuleHandlerActor.Command.class, "ruleService");
ActorRef<AkkaRuleHandlerActor.Command> ruleWorker = context.spawn(AkkaRuleHandlerActor.create(), "ruleHandler");
context.getSystem().receptionist().tell(Receptionist.register(ruleServiceKey, ruleWorker));
GroupRouter<AkkaRuleHandlerActor.Command> ruleGroup = Routers.group(ruleServiceKey);
ruleGroup = ruleGroup.withRoundRobinRouting();
ruleRouter = context.spawn(ruleGroup, "ruleHandler-group");
}
@Override
public Receive<Message> createReceive() {
return newReceiveBuilder()
.onMessage(CollectAllRuleRuntimeScheduleMessage.class, message -> doCollectRuleRuntimeScheduleInfo())
.onMessage(DispatchAllRuleFilesMessage.class, message -> doDispatch())
.build();
}
private void dispatchRulefile(String rulefile) {
try {
System.out.println("AkkaDispatcherActor dispatchRulefile " + rulefile);
CompletionStage<Reply> result =
AskPattern.ask(
router,
replyTo -> new AkkaRuleFileHandlerActor.DispatchOneRuleFileMessage(replyTo),
Duration.ofSeconds(30),
getContext().getSystem().scheduler());
Reply reply = result.toCompletableFuture().join();
System.out.println("Dispatch job " + rulefile + " with result");
} catch (Exception ex) {
System.out.println("Dispatch job " + rulefile + "failed with exception " + ex.getMessage()+ " with result");
}
}
private void collectRuleRuntimeScheduleInfo() {
try {
CompletionStage<RuleRunTimeReply> result =
AskPattern.ask(
ruleRouter,
replyTo -> new AkkaRuleHandlerActor.RuleRuntimeScheduleInfoMsg(replyTo),
Duration.ofSeconds(120),
getContext().getSystem().scheduler());
RuleRunTimeReply reply = result.toCompletableFuture().join();
System.out.println("Collect rule run time schedule successOrNot " + reply.getSuccessOrNot());
} catch (Exception ex) {
System.out.println("Sending requests to cluster nodes with errors " + ex.getMessage());
}
}
public Behavior<Message> doCollectRuleRuntimeScheduleInfo() {
for (int i = 0; i < 2; i++) {
collectRuleRuntimeScheduleInfo();
}
return this;
}
public Behavior<Message> doDispatch() {
for (int i = 0; i < 2; i++) {
dispatchRulefile(String.valueOf(i));
}
return this;
}
}
I have 2 child actors whose structure is basically the same, with only naming differences. One of them is shown below:
ublic class AkkaRuleFileHandlerActor extends AbstractBehavior<AkkaRuleFileHandlerActor.Command> {
interface Command extends CborSerializable {}
public final static class DispatchOneRuleFileMessage implements Command {
public ActorRef<AkkaDispatcherActor.Reply> replyTo;
public DispatchOneRuleFileMessage(ActorRef<AkkaDispatcherActor.Reply> replyTo) {
this.replyTo = replyTo;
}
public DispatchOneRuleFileMessage() {
}
}
public static Behavior<Command> create() {
return Behaviors.setup(AkkaRuleFileHandlerActor::new);
}
private AkkaRuleFileHandlerActor(ActorContext<Command> context) {
super(context);
}
private Behavior<AkkaRuleFileHandlerActor.Command> onDispatchRuleFile(DispatchOneRuleFileMessage message) {
System.out.println("Got message DispatchRuleFile");
String scheduleInfo = "hello";
AkkaDispatcherActor.Reply reply = new AkkaDispatcherActor.Reply(AkkaDispatcherActor.Reply.DISPATCH_SUCCESS, scheduleInfo);
message.replyTo.tell(reply);
System.out.println("Success");
return this;
}
@Override
public Receive<Command> createReceive() {
return newReceiveBuilder()
.onMessage(DispatchOneRuleFileMessage.class, this::onDispatchRuleFile).build();
}
}
The weird thing is that the requests triggered by DispatchAllRuleFilesMessage are sent to both nodes (2551 and 2552), while those triggered by CollectAllRuleRuntimeScheduleMessage are not.
The behavior stays the same even when I delete the DispatchAllRuleFilesMessage part: 2552 never receives any RuleRuntimeScheduleInfoMsg at all.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
