'Couchbase Runtime Exception
My project use couchbase client version 2.7.2.
I want to Job through the scheduler, I try to search and filter the event list in couchbase and send it to kafka.
class Service{
DataAccessObject dao = new DataAccessObject();
public void find(){
dao.find(WHERE)
.flatMap(data -> Observable.just(Gson.fromJson(SomeClass)))
.filter(data ->{
if(data.type == "A"){
return dao.isVaild(data);
}else{
return true;
}
})
.flatMap(data ->{
sendKafka(data);
return Observable.from(data);
})
.collect(() -> new ArrayList<Campaign>(), List::add)
.flatMap(data -> {
updateDocument(data); //couchbase document update
return Observable.just(data);})
.subscribe();
}
public void updateDocument(data){ //couchbase document update
....
}
}
class DataAccessObject{
private CouchbaseConnector couchbaseConnector;
public Observable<JsonDocument> find(currentTime){
ViewQuery query = ViewQuery.from(".....");
return couchbaseConnector.getBucket(CouchbaseBucket).async()
.query(query)
.retryWhen(
RetryBuilder.anyOf(RuntimeException.class)
.delay(Delay.fixed(1000L, TimeUnit.MILLISECONDS))
.max(5)
.build())
.flatMap(AsyncViewResult::rows)
.flatMap(AsyncViewRow::document);
}
public boolean isVaild(data){
ViewQuery query = ViewQuery.from(".....");
List<View> result = couchbaseConnector.getBucket(".....")
.query(query)
.allRows();
if(result.size() > 0){
return true;
}else{
return false;
}
}
}
The above code is the flow of logic.
At Service.find() is called, Runtime Expression often occurs in isValid() or updateDocument(). Is it a structure that can cause problems related to synchronization?
Can I know what caused it?
java.lang.RuntimeException: java.util.concurrent.TimeoutException: {"b":"SomeObject","r":"127.0.0.1:11210","s":"kv","c":"7F1F0C2F0510E119/00000000087DF4F6","t":1000000,"i":"0x3dc","l":"127.0.0.1:59825"}
at rx.exceptions.Exceptions.propagate(Exceptions.java:57)
at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:463)
at rx.observables.BlockingObservable.single(BlockingObservable.java:340)
at
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
