Circuit breaker misbehaving

I have the following circuit breaker applied:

public Mono<List<Long>> getAssociatedCmaCampaigns(String userId, List<Long> cmaCampaignIds) {
    if (cmaCampaignIds.isEmpty()) {
        log.error(LOG_GET_ASSOCIATED_CMA_CAMPAIGNS + "cmaCampaignIds list is empty. Skipping the calculation of associated cma campaigns.");
        return Mono.error(new AudienceException("CMA Campaign Ids empty."));
    } else {
        return circuitBreaker.run(audienceClient.checkCampaignAssociation(userId, cmaCampaignIds)
                .map(CampaignAssociationResponseDto::getResponse)
                .map(campaignAssociations -> {
                    log.info(LOG_GET_ASSOCIATED_CMA_CAMPAIGNS + "Retrieved the associated campaigns successfully: [{}]", campaignAssociations);
                    List<Long> collect = campaignAssociations.stream().parallel()
                            .filter(CampaignAssociation::isAssociated)
                            .map(CampaignAssociation::getCampaignId)
                            .collect(Collectors.toList());

                    if (CollectionUtils.isNotEmpty(collect)) {
                        recordAudienceMetrics(meterRegistry, SUCCESS, cmaCampaignIds);
                        return collect;
                    } else {
                        recordAudienceMetrics(meterRegistry, FAILURE, cmaCampaignIds);
                        // TODO: Check whether the exception is caught by the circuit-breaker
                        throw new AudienceException("Associations retrieved but none are applicable for the user: " + userId);
                    }
                }).doOnError(throwable -> {
                    log.error(LOG_GET_ASSOCIATED_CMA_CAMPAIGNS_ERROR, throwable.getMessage());
                    recordAudienceMetrics(meterRegistry, FAILURE, cmaCampaignIds);
                })
                .onErrorResume(throwable -> Mono.error(new AudienceException(LOG_GET_ASSOCIATED_CMA_CAMPAIGNS_ERROR + throwable)))
                .switchIfEmpty(Mono.error(new AudienceException(LOG_GET_ASSOCIATED_CMA_CAMPAIGNS_ERROR + "; No associations found for the user: " + userId))), throwable -> {
            recordAudienceMetrics(meterRegistry, TIMEOUT, cmaCampaignIds);
            log.error(LOG_GET_ASSOCIATED_CMA_CAMPAIGNS_ERROR, throwable.getMessage());
            return Mono.error(new AudienceCircuitBreakerException(LOG_GET_ASSOCIATED_CMA_CAMPAIGNS_ERROR + throwable.getMessage()));
        }).retryWhen(Retry.backoff(audienceRetryProperties.getMaxAttempts(), audienceRetryProperties.getWaitDuration())
                .jitter(audienceRetryProperties.getBackoffJitter()));
    }
}

The circuit breaker bean configuration can be seen below:

@Bean
public Customizer<ReactiveResilience4JCircuitBreakerFactory> audienceCustomizer() {
    CircuitBreakerConfig circuitBreakerConfig = CircuitBreakerConfig.custom()
            .failureRateThreshold(audienceCircuitBreakerProperties.getFailureRateThreshold())
            .slowCallDurationThreshold(audienceCircuitBreakerProperties.getSlowCallDurationThreshold())
            .slowCallRateThreshold(audienceCircuitBreakerProperties.getSlowCallRateThreshold())
            .slidingWindowSize(audienceCircuitBreakerProperties.getSlidingWindowSize())
            .permittedNumberOfCallsInHalfOpenState(audienceCircuitBreakerProperties.getPermittedNumberOfCallsInHalfOpenState())
            .waitDurationInOpenState(audienceCircuitBreakerProperties.getWaitDurationInOpenState())
            .build();

    TimeLimiterConfig timeLimiterConfig = TimeLimiterConfig.custom()
            .timeoutDuration(audienceCircuitBreakerProperties.getTimeoutDuration())
            .build();

    return factory -> factory.configure(resilience4JConfigBuilder -> resilience4JConfigBuilder
            .timeLimiterConfig(timeLimiterConfig)
            .circuitBreakerConfig(circuitBreakerConfig)
            .build(), CIRCUIT_BREAKER_AUDIENCE);
}

I also have the following properties defined for the circuit breaker:

cma:
    client:
        endpoint: ${AUDIENCE_ENDPOINT:<<some endpoint>>}
        tenant: ${AUDIENCE_TENANT:<<some tenant>>}
        timeout: ${AUDIENCE_TIMEOUT:100ms}
    circuit-breaker:
        automatic-transition-from-open-to-half-open-enabled: ${CMA_AUTOMATIC_TRANSITION_FROM_OPEN_TO_HALF_OPEN_ENABLED:true}
        failure-rate-threshold: ${CMA_FAILURE_RATE_THRESHOLD:100}
        max-wait-duration-in-half-open-state: ${CMA_MAX_WAIT_DURATION_IN_HALF_OPEN_STATE:5s}
        permitted-number-of-calls-in-half-open-state: ${CMA_PERMITTED_NUM_OF_CALLS_IN_HALF_OPEN_STATE:5}
        register-health-indicator: ${CMA_REGISTER_HEALTH_INDICATOR:true}
        sliding-window-size: ${CMA_SLIDING_WINDOW_SIZE:5}
        sliding-window-type: ${CMA_SLIDING_WINDOW_TYPE:count_based}
        slow-call-duration-threshold: ${CMA_SLOW_CALL_DURATION_THRESHOLD:10s}
        slow-call-rate-threshold: ${CMA_SLOW_CALL_RATE_THRESHOLD:5}
        wait-duration-in-open-state: ${CMA_WAIT_DURATION_IN_OPEN_STATE:5s}
        writable-stack-trace-enabled: ${CMA_WRITABLE_STACK_TRACE_ENABLED:true}
        timeout-duration: ${CMA_TIMEOUT_DURATION:100ms}
    retry:
        wait-duration: ${CMA_RETRY_WAIT_DURATION:1ms}
        max-attempts: ${CMA_RETRY_MAX_ATTEMPTS:2}
        backoff-jitter: ${CMA_RETRY_BACKOFF_JITTER:0.5}

My first question: once the circuit breaker opens, it stays open forever and never closes again. Every request we make fails with [CircuitBreaker 'audience' is OPEN and does not permit further calls].

My second question: if you look closely at the getAssociatedCmaCampaigns method, we throw AudienceException from inside the call wrapped by the circuit breaker. Will this exception be counted as a failure and cause the circuit breaker to trip?
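To make the expected state transitions concrete, here is a minimal sketch of a count-based breaker using the default values from the properties above (window of 5 calls, 100% failure threshold, 5s wait in the open state, 5 trial calls when half-open). It is a simplified model and not Resilience4j's implementation: the class `BreakerSketch` and its methods `tryAcquire` and `record` are hypothetical names, time is passed in explicitly, and slow-call tracking and the cap on concurrent half-open calls are left out. Like Resilience4j's documented default (when neither recordExceptions nor ignoreExceptions is configured), it treats every failed call the same, whatever the exception type.

```java
/** Simplified count-based circuit breaker model. NOT the Resilience4j implementation. */
class BreakerSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int windowSize;              // like sliding-window-size
    private final double failureRateThreshold; // percent, like failure-rate-threshold
    private final long waitInOpenMillis;       // like wait-duration-in-open-state
    private final int halfOpenCalls;           // like permitted-number-of-calls-in-half-open-state
    private final boolean[] outcomes;          // ring buffer, true = failed call

    private State state = State.CLOSED;
    private int recorded;                      // calls recorded since the last transition
    private int failures;                      // failed calls currently inside the window
    private long openedAt;

    BreakerSketch(int windowSize, double failureRateThreshold, long waitInOpenMillis, int halfOpenCalls) {
        this.windowSize = windowSize;
        this.failureRateThreshold = failureRateThreshold;
        this.waitInOpenMillis = waitInOpenMillis;
        this.halfOpenCalls = halfOpenCalls;
        this.outcomes = new boolean[Math.max(windowSize, halfOpenCalls)];
    }

    State state() {
        return state;
    }

    /** Returns false when the call is rejected because the breaker is OPEN. */
    boolean tryAcquire(long nowMillis) {
        if (state == State.OPEN && nowMillis - openedAt >= waitInOpenMillis) {
            transitionTo(State.HALF_OPEN, nowMillis); // the wait has elapsed, allow trial calls
        }
        return state != State.OPEN;
    }

    /** Records the outcome of a permitted call; any exception counts as a failure here. */
    void record(boolean success, long nowMillis) {
        if (state == State.OPEN) {
            return;
        }
        int size = (state == State.HALF_OPEN) ? halfOpenCalls : windowSize;
        int slot = recorded % size;
        if (recorded >= size && outcomes[slot]) {
            failures--; // the oldest outcome slides out of the window
        }
        outcomes[slot] = !success;
        if (!success) {
            failures++;
        }
        recorded++;
        if (recorded < size) {
            return; // not enough calls yet to compute a failure rate
        }
        double failureRate = 100.0 * failures / size;
        if (failureRate >= failureRateThreshold) {
            transitionTo(State.OPEN, nowMillis);
        } else if (state == State.HALF_OPEN) {
            transitionTo(State.CLOSED, nowMillis);
        }
    }

    private void transitionTo(State next, long nowMillis) {
        state = next;
        recorded = 0;
        failures = 0;
        if (next == State.OPEN) {
            openedAt = nowMillis;
        }
    }

    public static void main(String[] args) {
        BreakerSketch breaker = new BreakerSketch(5, 100.0, 5_000, 5);
        for (int i = 0; i < 5; i++) {
            breaker.record(false, 0);                     // five failed calls in a row
        }
        System.out.println(breaker.state());              // OPEN
        System.out.println(breaker.tryAcquire(1_000));    // false, still inside the 5s wait
        System.out.println(breaker.tryAcquire(5_000));    // true, the wait has elapsed
        System.out.println(breaker.state());              // HALF_OPEN
    }
}
```

In this model the breaker only returns to CLOSED if the trial calls made in the half-open state stay below the failure threshold. If every trial call fails, it reopens and the wait starts again, which from the caller's side can look like a breaker that never closes. Whether that is what happens in the setup above is an assumption to verify, not a confirmed diagnosis.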



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
