Akka actor with two group routers: only one works normally, although the other is set up in basically the same way

I ran into a weird problem that I can't explain. I created an Akka cluster with 2 nodes on my local PC, 127.0.0.1:2551 and 127.0.0.1:2552.

Below is my main actor:

public class AkkaDispatcherActor  extends AbstractBehavior<AkkaDispatcherActor.Message> {

public interface Message extends CborSerializable {
}

public static final class DispatchAllRuleFilesMessage implements Message {
    public DispatchAllRuleFilesMessage() {
    }
}

public static final class CollectAllRuleRuntimeScheduleMessage implements Message {
    public CollectAllRuleRuntimeScheduleMessage() {
    }
}

public static class Reply implements CborSerializable {
    public static String DISPATCH_SUCCESS = "DISPATCH_SUCCESS";
    public static String DISPATCH_FAILED = "DISPATCH_FAILED";
    private String status;
    private String details;

    public Reply(String status, String details) {
        this.status = status;
    }

    public String getStatus() {
        return status;
    }

    public String getDetails() {
        return details;
    }
}

public static class RuleRunTimeReply implements CborSerializable {
    private String ruleRunTimeScheduleInfo;
    private boolean successOrNot;

    public RuleRunTimeReply(String ruleRunTimeScheduleInfo, boolean successOrNot) {
        this.ruleRunTimeScheduleInfo = ruleRunTimeScheduleInfo;
        this.successOrNot = successOrNot;
    }

    public String getRuleRunTimeScheduleInfo() {
        return ruleRunTimeScheduleInfo;
    }

    public boolean getSuccessOrNot() {
        return successOrNot;
    }

}

private ActorRef<AkkaRuleFileHandlerActor.Command> router;
private ActorRef<AkkaRuleHandlerActor.Command> ruleRouter;

public static Behavior<Message> create() {
    return Behaviors.setup(AkkaDispatcherActor::new);
}

AkkaDispatcherActor(ActorContext<Message> context) {
    super(context);
    ServiceKey<AkkaRuleFileHandlerActor.Command> serviceKey = ServiceKey.create(AkkaRuleFileHandlerActor.Command.class, "ruleFileService");
    // each node spawn only one worker
    ActorRef<AkkaRuleFileHandlerActor.Command> worker = context.spawn(AkkaRuleFileHandlerActor.create(), "ruleFileHandler");
    context.getSystem().receptionist().tell(Receptionist.register(serviceKey, worker));
    GroupRouter<AkkaRuleFileHandlerActor.Command> group = Routers.group(serviceKey);
    group = group.withRoundRobinRouting();
    router = context.spawn(group, "ruleFileHandler-group");

    ServiceKey<AkkaRuleHandlerActor.Command> ruleServiceKey = ServiceKey.create(AkkaRuleHandlerActor.Command.class, "ruleService");
    ActorRef<AkkaRuleHandlerActor.Command> ruleWorker = context.spawn(AkkaRuleHandlerActor.create(), "ruleHandler");
    context.getSystem().receptionist().tell(Receptionist.register(ruleServiceKey, ruleWorker));
    GroupRouter<AkkaRuleHandlerActor.Command> ruleGroup = Routers.group(ruleServiceKey);
    ruleGroup = ruleGroup.withRoundRobinRouting();
    ruleRouter = context.spawn(ruleGroup, "ruleHandler-group");
}

@Override
public Receive<Message> createReceive() {
    return newReceiveBuilder()
            .onMessage(CollectAllRuleRuntimeScheduleMessage.class, message -> doCollectRuleRuntimeScheduleInfo())
            .onMessage(DispatchAllRuleFilesMessage.class, message -> doDispatch())
            .build();
}

private void dispatchRulefile(String rulefile) {
    try {
        System.out.println("AkkaDispatcherActor dispatchRulefile " + rulefile);
        CompletionStage<Reply> result =
                AskPattern.ask(
                        router,
                        replyTo -> new AkkaRuleFileHandlerActor.DispatchOneRuleFileMessage(replyTo),
                        Duration.ofSeconds(30),
                        getContext().getSystem().scheduler());
        Reply reply = result.toCompletableFuture().join();
        System.out.println("Dispatch job " + rulefile + " with result");
    } catch (Exception ex) {
        System.out.println("Dispatch job " + rulefile + "failed with exception " + ex.getMessage()+ " with result");
    }
}

private void collectRuleRuntimeScheduleInfo() {
    try {
        CompletionStage<RuleRunTimeReply> result =
                AskPattern.ask(
                        ruleRouter,
                        replyTo -> new AkkaRuleHandlerActor.RuleRuntimeScheduleInfoMsg(replyTo),
                        Duration.ofSeconds(120),
                        getContext().getSystem().scheduler());
        RuleRunTimeReply reply = result.toCompletableFuture().join();
        System.out.println("Collect rule run time schedule successOrNot " + reply.getSuccessOrNot());
    } catch (Exception ex) {
        System.out.println("Sending requests to cluster nodes with errors " + ex.getMessage());
    }
}

public Behavior<Message> doCollectRuleRuntimeScheduleInfo() {
    for (int i = 0; i < 2; i++) {
        collectRuleRuntimeScheduleInfo();
    }
    return this;
}

public Behavior<Message> doDispatch() {
    for (int i = 0; i < 2; i++) {
        dispatchRulefile(String.valueOf(i));
    }
    return this;
}

}

I have 2 child actors whose structure is basically the same, differing only in naming, like below:

ublic class AkkaRuleFileHandlerActor extends AbstractBehavior<AkkaRuleFileHandlerActor.Command> {
interface Command extends CborSerializable {}

public final static class DispatchOneRuleFileMessage implements Command {
    public ActorRef<AkkaDispatcherActor.Reply> replyTo;

    public DispatchOneRuleFileMessage(ActorRef<AkkaDispatcherActor.Reply> replyTo) {
        this.replyTo = replyTo;
    }

    public DispatchOneRuleFileMessage() {
    }
}

public static Behavior<Command> create() {
    return Behaviors.setup(AkkaRuleFileHandlerActor::new);
}

private AkkaRuleFileHandlerActor(ActorContext<Command> context) {
    super(context);
}

private Behavior<AkkaRuleFileHandlerActor.Command> onDispatchRuleFile(DispatchOneRuleFileMessage message) {
    System.out.println("Got message DispatchRuleFile");
    String scheduleInfo = "hello";
    AkkaDispatcherActor.Reply reply = new AkkaDispatcherActor.Reply(AkkaDispatcherActor.Reply.DISPATCH_SUCCESS, scheduleInfo);
    message.replyTo.tell(reply);
    System.out.println("Success");

    return this;
}

@Override
public Receive<Command> createReceive() {
    return newReceiveBuilder()
            .onMessage(DispatchOneRuleFileMessage.class, this::onDispatchRuleFile).build();
}

}

The weird thing is that DispatchAllRuleFilesMessage messages get sent to both nodes (2551 and 2552), while CollectAllRuleRuntimeScheduleMessage messages don't.

The behavior stays the same even when I delete the DispatchAllRuleFilesMessage part: 2552 never receives any RuleRuntimeScheduleInfoMsg at all.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source