Do NiFi controller services share state in a cluster?

Single NiFi node

We have a volatile variable in a custom controller service, and two custom processors use this service. When Processor1 changes the variable's value, Processor2 should see the updated value.
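The single-node expectation can be modelled in plain Java. This is a sketch, not NiFi code: `SharedService`, `processor2SeesUpdate` and the two threads are hypothetical stand-ins for the controller service, Processor1 and Processor2. Because both "processors" hold a reference to the same object in one JVM, the volatile write made by one is visible to the other.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

public class SingleNodeSharingDemo {

    // Stand-in for the custom controller service: one object in one JVM.
    static final class SharedService {
        private volatile UUID value = UUID.randomUUID();

        UUID get() {
            return value;
        }

        UUID update() {
            final UUID next = UUID.randomUUID();
            value = next; // volatile write: later reads on any thread see this value
            return next;
        }
    }

    // Runs "Processor1" (writer) and then "Processor2" (reader) on separate threads
    // against the same service object and reports whether the reader saw the write.
    static boolean processor2SeesUpdate() {
        final SharedService service = new SharedService();
        final AtomicReference<UUID> written = new AtomicReference<>();
        final AtomicReference<UUID> seen = new AtomicReference<>();
        try {
            Thread processor1 = new Thread(() -> written.set(service.update()));
            processor1.start();
            processor1.join();

            Thread processor2 = new Thread(() -> seen.set(service.get()));
            processor2.start();
            processor2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return written.get().equals(seen.get());
    }

    public static void main(String[] args) {
        System.out.println(processor2SeesUpdate()); // true
    }
}
```

In NiFi the framework, not the processors, creates the service instance; the model only captures the JVM-level visibility that makes the single-node case work.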

Cluster

What happens if we run the same example in a cluster? Does NiFi create a separate service instance on each node, so that no state is shared between them? Or does only the primary node hold an instance of this service?



Solution 1:[1]

I set up a service and a processor to test this.

The service holds a private volatile variable and exposes methods to read and update it:

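import java.util.UUID;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.reporting.InitializationException;

// ClusterStateTestService (not shown) is assumed to extend ControllerService and declare getTestValue() and updateTestValue()
public class ClusterStateTestServiceImpl extends AbstractControllerService implements ClusterStateTestService {

    // Plain in-memory field: it lives in the JVM of the node that runs this instance
    private volatile UUID testValue;

    @OnEnabled
    public void onEnabled(final ConfigurationContext context) throws InitializationException {
        this.testValue = UUID.randomUUID();
    }

    @Override
    public UUID getTestValue() {
        return testValue;
    }

    @Override
    public UUID updateTestValue() {
        // Return the local value: re-reading the field could pick up a concurrent update
        final UUID newValue = UUID.randomUUID();
        this.testValue = newValue;
        return newValue;
    }
}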

The processor reads the value, logs it, updates it, and logs the new value:

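import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;

public class ClusterStateTestProcessor extends AbstractProcessor {
    public static final PropertyDescriptor CLUSTER_STATE_TEST_SERVICE = new PropertyDescriptor.Builder()
            .name("Test Service")
            .required(true)
            .identifiesControllerService(ClusterStateTestService.class)
            .build();

    static final PropertyDescriptor HOSTNAME = ...
    static final Relationship SUCCESS = new Relationship.Builder().name("success").build(); // assumed relationship name
    private volatile ClusterStateTestService clusterStateTestService;
    private volatile String hostname;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return Arrays.asList(CLUSTER_STATE_TEST_SERVICE, HOSTNAME);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return Collections.singleton(SUCCESS);
    }

    @OnScheduled // resolve the service instance on this node; volatile fields publish it to worker threads
    public void onScheduled(final ProcessContext context) {
        clusterStateTestService = context.getProperty(CLUSTER_STATE_TEST_SERVICE).asControllerService(ClusterStateTestService.class);
        hostname = context.getProperty(HOSTNAME).getValue(); // assumed: HOSTNAME holds this node's name
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) return;
        UUID existingUuid = clusterStateTestService.getTestValue();
        getLogger().info("Host: " + hostname + " || Test value: " + existingUuid);
        UUID newUuid = clusterStateTestService.updateTestValue();
        getLogger().info("Host: " + hostname + " || Test value updated to: " + newUuid);
        session.transfer(flowFile, SUCCESS);
    }
}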

I triggered the processor on nifi1 and then on nifi2. This is the result:

2022-05-23 08:42:30,774 INFO [Timer-Driven Process Thread-16] c.t.ClusterStateTestProcessor ClusterStateTestProcessor[id=8b55386b-4459-3de3-82b7-83b7fbd9e771] Host: nifi1 || Test value: c1840b9b-5766-4341-851a-2065349017d3
2022-05-23 08:42:30,787 INFO [Timer-Driven Process Thread-16] c.t.ClusterStateTestProcessor ClusterStateTestProcessor[id=8b55386b-4459-3de3-82b7-83b7fbd9e771] Host: nifi1 || Test value updated to: a44b2119-b636-4230-8b38-1b35445146f9
                                                              
2022-05-23 08:42:30,776 INFO [Timer-Driven Process Thread-14] c.t.ClusterStateTestProcessor ClusterStateTestProcessor[id=8b55386b-4459-3de3-82b7-83b7fbd9e771] Host: nifi2 || Test value: cb8d79d2-1eb8-4269-a043-902936cb12e0
2022-05-23 08:42:30,776 INFO [Timer-Driven Process Thread-14] c.t.ClusterStateTestProcessor ClusterStateTestProcessor[id=8b55386b-4459-3de3-82b7-83b7fbd9e771] Host: nifi2 || Test value updated to: e1f76321-0e34-4aae-bd06-6b1c84419817
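The inference drawn from these four log lines can be checked mechanically. This sketch assumes the two triggers were the only updates since the service was enabled; under that assumption, with one shared instance, nifi2's first read could only return the value nifi1 read or the value nifi1 wrote (nifi2's own write happens after its read). `LogCheck` and its constant names are hypothetical; the UUIDs are copied from the log.

```java
public class LogCheck {

    // Values copied from the log lines above
    static final String NIFI1_READ = "c1840b9b-5766-4341-851a-2065349017d3";
    static final String NIFI1_WROTE = "a44b2119-b636-4230-8b38-1b35445146f9";
    static final String NIFI2_READ = "cb8d79d2-1eb8-4269-a043-902936cb12e0";

    // With one shared instance, nifi2's first read must match one of nifi1's values.
    static boolean consistentWithSharedInstance() {
        return NIFI2_READ.equals(NIFI1_READ) || NIFI2_READ.equals(NIFI1_WROTE);
    }

    public static void main(String[] args) {
        System.out.println(consistentWithSharedInstance()); // false
    }
}
```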

Conclusion

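Each node's processor read a different UUID. If both nodes shared one service instance, nifi2's first read would have returned either the value nifi1 read (c1840b9b-5766-4341-851a-2065349017d3) or the value nifi1 wrote (a44b2119-b636-4230-8b38-1b35445146f9); instead it returned cb8d79d2-1eb8-4269-a043-902936cb12e0. So each NiFi node has its own instance of the controller service, and its state is not shared between nodes in any way. This also rules out a single instance on the primary node only, since both nodes returned a value from an instance of their own.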
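The cluster behaviour observed above can be modelled the same way in plain Java. This is an illustrative sketch, not NiFi code: each node is represented by its own `TestService` object in a map keyed by a hypothetical node name, standing in for the separate JVMs that really run on each node.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;

public class PerNodeInstanceDemo {

    // Stand-in for ClusterStateTestServiceImpl: state lives only in this object.
    static final class TestService {
        private volatile UUID testValue = UUID.randomUUID(); // plays the role of @OnEnabled

        UUID getTestValue() {
            return testValue;
        }

        UUID updateTestValue() {
            final UUID next = UUID.randomUUID();
            testValue = next;
            return next;
        }
    }

    // Each node runs its own JVM, so each gets its own service object.
    static Map<String, TestService> startCluster(String... nodeNames) {
        final Map<String, TestService> services = new LinkedHashMap<>();
        for (String node : nodeNames) {
            services.put(node, new TestService());
        }
        return services;
    }

    // Update on nifi1, then read on nifi2: does the new value cross the node boundary?
    static boolean updateVisibleOnOtherNode() {
        final Map<String, TestService> cluster = startCluster("nifi1", "nifi2");
        final UUID writtenOnNifi1 = cluster.get("nifi1").updateTestValue();
        final UUID readOnNifi2 = cluster.get("nifi2").getTestValue();
        return writtenOnNifi1.equals(readOnNifi2);
    }

    public static void main(String[] args) {
        System.out.println(updateVisibleOnOtherNode());
    }
}
```

In a real cluster nothing links the per-node objects at all. If a value genuinely has to be shared across nodes, it has to live outside the JVM, for example in NiFi's cluster-scoped state (`StateManager` with `Scope.CLUSTER`) or an external cache; the test in this answer does not exercise either.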

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 luka_roves