Couchbase Runtime Exception

My project uses Couchbase client version 2.7.2. I want to run a job through a scheduler that searches and filters the event list in Couchbase and sends the results to Kafka.

    class Service {

        DataAccessObject dao = new DataAccessObject();

        public void find() {
            dao.find(WHERE)
               .flatMap(data -> Observable.just(Gson.fromJson(SomeClass)))
               .filter(data -> {
                   // Compare strings with equals(); == compares references.
                   if ("A".equals(data.type)) {
                       return dao.isVaild(data); // blocking Couchbase view query
                   } else {
                       return true;
                   }
               })
               .flatMap(data -> {
                   sendKafka(data);
                   // just() wraps a single item; from() expects an Iterable or array.
                   return Observable.just(data);
               })
               .collect(() -> new ArrayList<Campaign>(), List::add)
               .flatMap(data -> {
                   updateDocument(data); // couchbase document update
                   return Observable.just(data);
               })
               .subscribe();
        }

        public void updateDocument(data) { // couchbase document update
            ....
        }

    }

    class DataAccessObject {

        private CouchbaseConnector couchbaseConnector;

        public Observable<JsonDocument> find(currentTime) {

            ViewQuery query = ViewQuery.from(".....");

            return couchbaseConnector.getBucket(CouchbaseBucket).async()
                .query(query)
                .retryWhen(
                    RetryBuilder.anyOf(RuntimeException.class)
                        .delay(Delay.fixed(1000L, TimeUnit.MILLISECONDS))
                        .max(5)
                        .build())
                .flatMap(AsyncViewResult::rows)
                .flatMap(AsyncViewRow::document);
        }

        public boolean isVaild(data) {
            ViewQuery query = ViewQuery.from(".....");

            // Synchronous (blocking) view query; allRows() returns List<ViewRow>.
            List<ViewRow> result = couchbaseConnector.getBucket(".....")
                    .query(query)
                    .allRows();

            return !result.isEmpty();
        }
    }






The code above shows the flow of the logic. When Service.find() is called, a RuntimeException often occurs in isVaild() or updateDocument(). Could this structure cause synchronization-related problems? What is causing the exception?

    java.lang.RuntimeException: java.util.concurrent.TimeoutException: {"b":"SomeObject","r":"127.0.0.1:11210","s":"kv","c":"7F1F0C2F0510E119/00000000087DF4F6","t":1000000,"i":"0x3dc","l":"127.0.0.1:59825"}
        at rx.exceptions.Exceptions.propagate(Exceptions.java:57)
        at rx.observables.BlockingObservable.blockForSingle(BlockingObservable.java:463)
        at rx.observables.BlockingObservable.single(BlockingObservable.java:340)
        at
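
The trace shows the timeout surfacing from `BlockingObservable.single`, which means a blocking call was waiting for a result. One pattern that can produce this kind of timeout is making a blocking call from inside an asynchronous callback, as `filter` does with `isVaild()`. Whether that is the actual cause here is an assumption, since the trace is truncated. The sketch below is only a standard-library analogy using `CompletableFuture` and a single-threaded pool, not the Couchbase SDK or RxJava. The class `BlockingInCallbackDemo` and the helper `isValidAsync` are hypothetical names introduced for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class BlockingInCallbackDemo {

    // Hypothetical stand-in for an asynchronous lookup such as a view query.
    static CompletableFuture<Boolean> isValidAsync(String id, ExecutorService pool) {
        return CompletableFuture.supplyAsync(() -> !id.isEmpty(), pool);
    }

    // Anti-pattern: the task occupies the pool's only thread while it waits for
    // a second task that needs the same thread, so the wait can only time out.
    static String blockingInsideCallback(ExecutorService pool) throws Exception {
        CompletableFuture<String> outer = CompletableFuture.supplyAsync(() -> {
            try {
                isValidAsync("doc-1", pool).get(200, TimeUnit.MILLISECONDS);
                return "ok";
            } catch (TimeoutException e) {
                return "timeout";
            } catch (Exception e) {
                return "error";
            }
        }, pool);
        return outer.get(2, TimeUnit.SECONDS);
    }

    // Alternative: compose the second call asynchronously so no thread waits.
    static String composedAsync(ExecutorService pool) throws Exception {
        return CompletableFuture.supplyAsync(() -> "doc-1", pool)
                .thenCompose(id -> isValidAsync(id, pool))
                .thenApply(valid -> valid ? "ok" : "invalid")
                .get(2, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            System.out.println("blocking inside callback: " + blockingInsideCallback(pool));
            System.out.println("composed asynchronously:  " + composedAsync(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

In this analogy the first method reports a timeout and the second completes normally. If the same effect applies to the code in the question, the equivalent change would be to keep `isVaild()` and `updateDocument()` on the asynchronous API and chain them with `flatMap` instead of blocking inside `filter`.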


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
