NoReachableHostException when a reactive Spring Boot Java app tries to connect to Elasticsearch on AWS
I am writing a simple Spring Boot app that connects to Elasticsearch (OpenSearch) on AWS. Here are the versions of the dependencies added to the app:
spring-boot-starter Ver 2.6.2
spring-boot-starter-webflux Ver 5.3.14
spring-boot-starter-data-elasticsearch Ver 4.30
Spring-core Ver 5.3.14
org.elasticsearch Ver 7.15.2
io.netty Ver 4.1.72
reactor.core Ver 3.4.13
When I try to hit the endpoint I get the following error.
searchForPage failed:
org.springframework.data.elasticsearch.client.NoReachableHostException: Host 'vpc-dev-i6o7flf44543kxt36fhamxh3qy.us-east-1.es.amazonaws.com/200.212.124.178:443' not reachable. Cluster state is offline.
at org.springframework.data.elasticsearch.client.reactive.SingleNodeHostProvider.lambda$lookupActiveHost$3(SingleNodeHostProvider.java:101)
at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:103)
at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:220)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203)
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:89)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:150)
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onComplete(FluxOnAssembly.java:549)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:142)
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:400)
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:419)
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:473)
Here is the code that calls searchForPage():
private Flux<SearchResults> stream(SearchCriteria searchCriteria,
        ReactiveElasticsearchTemplate reactiveElasticsearchTemplate,
        Supplier<ReactiveElasticsearchTemplate> supplier) {
    NativeSearchQuery query = searchCriteriaToQueryConverterWithoutAggregations
            .convert(searchCriteria);
    query.setPageable(Pageable.unpaged());
    logger.info("search criteria for the streaming service: " + searchCriteria);
    return reactiveElasticsearchTemplate
            .search(query, SearchResults.class, IndexCoordinates.of(indexName))
            .doOnComplete(() -> logger.info("Results Returned from stream API"))
            .onErrorResume(error -> {
                logger.error(ErrorCodes.ES.toString(), error);
                return supplier.get().search(
                        query, SearchResults.class, IndexCoordinates.of(indexName));
            })
            .map(searchHit -> searchHit.getContent());
}
Here is the code that configures reactive client:
ReactiveElasticsearchClient initializeElasticRestClient() {
    String ENDPOINT_PORT
            = "vpc-dev-i6o7flf44543kxt36fhamxh3qy.us-east-1.es.amazonaws.com:443";
    ClientConfiguration clientConfiguration = ClientConfiguration.builder()
            .connectedTo(ENDPOINT_PORT)
            .usingSsl()
            .withConnectTimeout(Duration.ofSeconds(5000))
            .withSocketTimeout(Duration.ofSeconds(5000))
            // adding interceptor to sign the request for AWS
            .withHttpClientConfigurer(hacb -> hacb.addInterceptorLast(interceptor))
            .build();
    return DefaultReactiveElasticsearchClient.create(clientConfiguration);
}
In the logs I see the following, which makes me think the connection is established but fails during completion:
reactor.netty.http.server.HttpServerOperations [7f326663-1, L:/200.212.124.162:8443 - R:/200.212.124.172:16002] Last HTTP response frame
reactor.netty.http.server.HttpServerOperations [8bbbb248, L:/200.212.124.162:8443 - R:/200.212.124.203:1908] Increasing pending responses, now 1
reactor.netty.http.server.HttpServerOperations [9734afa6, L:/200.212.124.162:8443 - R:/200.212.124.172:16086] Increasing pending responses, now 1
reactor.netty.tcp.SslProvider [c04eba45, L:/200.212.124.201:8443 - R:/200.212.124.203:39526] SSL enabled using engine sun.security.ssl.SSLEngineImpl@762e3341
reactor.netty.http.server.HttpServerOperations [c04eba45, L:/200.212.124.201:8443 - R:/200.212.124.203:39526] New http connection, requesting read
reactor.netty.transport.TransportConfig [c04eba45, L:/200.212.124.201:8443 - R:/200.212.124.203:39526] Initialized pipeline DefaultChannelPipeline{(reactor.left.sslHandler = io.netty.handler.ssl.SslHandler), (reactor.left.sslReader = reactor.netty.tcp.SslProvider$SslReadHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
reactor.netty.tcp.SslProvider [b92129ad, L:/200.212.124.201:8443 - R:/200.212.124.172:41114] SSL enabled using engine sun.security.ssl.SSLEngineImpl@6ec2a10c
io.netty.handler.ssl.SslHandler [id: 0xc04eba45, L:/200.212.124.201:8443 - R:/200.212.124.203:39526] HANDSHAKEN: protocol:TLSv1.2 cipher suite:TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
reactor.netty.http.server.HttpServerOperations [b92129ad, L:/200.212.124.201:8443 - R:/200.212.124.172:41114] New http connection, requesting read
reactor.netty.transport.TransportConfig [b92129ad, L:/200.212.124.201:8443 - R:/200.212.124.172:41114] Initialized pipeline DefaultChannelPipeline{(reactor.left.sslHandler = io.netty.handler.ssl.SslHandler), (reactor.left.sslReader = reactor.netty.tcp.SslProvider$SslReadHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpServerCodec), (reactor.left.httpTrafficHandler = reactor.netty.http.server.HttpTrafficHandler), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
reactor.netty.http.server.HttpServerOperations [c04eba45, L:/200.212.124.201:8443 - R:/200.212.124.203:39526] Increasing pending responses, now 1
reactor.netty.http.server.HttpServer [c04eba45-1, L:/200.212.124.201:8443 - R:/200.212.124.203:39526] Handler is being applied: org.springframework.http.server.reactive.ReactorHttpHandlerAdapter@307f4807
reactor.netty.http.server.HttpServerOperations [c04eba45-1, L:/200.212.124.201:8443 - R:/200.212.124.203:39526] Detected non persistent http connection, preparing to close
I am new to both Spring WebFlux and Elasticsearch. Any help on this is greatly appreciated.
Solution 1:[1]
This exception is a generic one that the client throws whenever something goes wrong, which can be very confusing. See my answer to "Connecting to ES with Spring Data Elasticsearch (reactive) gives error host not reachable".
I would start by debugging it from localhost through an SSH tunnel; that should show you the real cause quickly.
My guess is that it is a permissions issue.
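Since the exception hides the underlying cause, one way to narrow it down is to send a plain request to the endpoint root yourself and look at the raw HTTP status. Below is a minimal sketch using only the JDK 11+ HttpClient. The class name EndpointProbe, the interpret helper and its messages are made up for illustration and are not part of Spring Data Elasticsearch. The request is unsigned, so against a domain that requires signed requests a 403 would be the expected outcome, which would point at permissions or signing rather than networking.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

// Hypothetical diagnostic helper, not part of any library.
class EndpointProbe {

    // Rough reading of a status code; the wording is illustrative only.
    static String interpret(int status) {
        if (status >= 200 && status < 300) {
            return "reachable: the endpoint accepted the request";
        }
        if (status == 401 || status == 403) {
            return "reachable but rejected: check permissions or request signing";
        }
        return "reachable but returned HTTP " + status;
    }

    // Sends an unsigned HEAD request to the given URL and interprets the status.
    static String probe(String url) throws Exception {
        HttpClient client = HttpClient.newBuilder()
                .connectTimeout(Duration.ofSeconds(5))
                .build();
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .timeout(Duration.ofSeconds(5))
                .method("HEAD", HttpRequest.BodyPublishers.noBody())
                .build();
        HttpResponse<Void> response =
                client.send(request, HttpResponse.BodyHandlers.discarding());
        return interpret(response.statusCode());
    }

    public static void main(String[] args) {
        if (args.length == 0) {
            System.out.println("usage: java EndpointProbe.java <https-url-of-endpoint>");
            return;
        }
        try {
            System.out.println(args[0] + " -> " + probe(args[0]));
        } catch (Exception e) {
            // DNS, TCP, TLS and timeout failures all end up here.
            System.out.println(args[0] + " -> not reachable: " + e);
        }
    }
}
```

Run it from a machine that can reach the VPC endpoint, passing the endpoint URL as the only argument. If it reports a 401 or 403, it is also worth checking whether the signing interceptor runs at all: as far as I know, withHttpClientConfigurer configures the Apache HTTP client used by the blocking REST client, while the reactive client is built on WebClient, so the interceptor in the question may never be applied. Treat that as an assumption to verify against the documentation for your Spring Data Elasticsearch version.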
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | KafKafOwn |
