EntityManager closed when executing queries on different threads
I am trying to execute a couple of queries on different threads. There are 2 top-level queries, each executed against different tables at runtime. For the first set of queries (executeQuery1()), I spawn 2 different threads and they are processed fine. From the output of these queries, I have to extract a list of ids and then fire another set of queries (executeQuery2()) on entirely different threads. As soon as the second set of queries is about to be submitted to the database, I see that the EntityManager is closed and the application shuts down.
2022-04-28 22:34:29.529 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter : Invoking destroy() on bean with name 'springApplicationAdminRegistrar'
2022-04-28 22:34:29.529 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter : Invoking destroy() on bean with name 'mbeanExporter'
2022-04-28 22:34:29.529 DEBUG 48403 --- [main] o.s.j.e.a.AnnotationMBeanExporter : Unregistering JMX-exposed beans on shutdown
2022-04-28 22:34:29.529 DEBUG 48403 --- [main] o.s.j.e.a.AnnotationMBeanExporter : Unregistering JMX-exposed beans
2022-04-28 22:34:29.529 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter : Invoking destroy() on bean with name 'defaultValidator'
2022-04-28 22:34:29.529 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter : Invoking destroy() on bean with name 'org.springframework.data.jpa.util.JpaMetamodelCacheCleanup'
2022-04-28 22:34:29.530 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter : Invoking destroy() on bean with name 'threadPoolTaskExecutor'
2022-04-28 22:34:29.530 DEBUG 48403 --- [main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'threadPoolTaskExecutor'
2022-04-28 22:34:29.531 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory : Retrieved dependent beans for bean 'verticaEntityManagerFactory': [verticaTransactionManager]
2022-04-28 22:34:29.531 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory : Retrieved dependent beans for bean 'verticaTransactionManager': [transactionTemplate]
2022-04-28 22:34:29.531 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter : Invoking destroy() on bean with name 'verticaEntityManagerFactory'
2022-04-28 22:34:29.531 INFO 48403 --- [main] c.b.a.n.h.c.VerticaDataSourceConfig$1 : Closing JPA EntityManagerFactory for persistence unit 'vertica'
2022-04-28 22:34:29.531 DEBUG 48403 --- [main] o.hibernate.internal.SessionFactoryImpl : HHH000031: Closing
2022-04-28 22:34:29.531 TRACE 48403 --- [main] o.h.engine.query.spi.QueryPlanCache : Cleaning QueryPlan Cache
2022-04-28 22:34:29.531 TRACE 48403 --- [main] o.h.type.spi.TypeConfiguration$Scope : Handling #sessionFactoryClosed from [org.hibernate.internal.SessionFactoryImpl@77774571] for TypeConfiguration
2022-04-28 22:34:29.532 DEBUG 48403 --- [main] o.h.type.spi.TypeConfiguration$Scope : Un-scoping TypeConfiguration [org.hibernate.type.spi.TypeConfiguration$Scope@44af588b] from SessionFactory [org.hibernate.internal.SessionFactoryImpl@77774571]
2022-04-28 22:34:29.532 DEBUG 48403 --- [main] o.h.s.i.AbstractServiceRegistryImpl : Implicitly destroying ServiceRegistry on de-registration of all child ServiceRegistries
2022-04-28 22:34:29.532 DEBUG 48403 --- [main] o.h.b.r.i.BootstrapServiceRegistryImpl : Implicitly destroying Boot-strap registry on de-registration of all child ServiceRegistries
2022-04-28 22:34:29.532 TRACE 48403 --- [MyAsyncThread-4] j.i.AbstractLogicalConnectionImplementor : Preparing to begin transaction via JDBC Connection.setAutoCommit(false)
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory : Retrieved dependent beans for bean 'verticaDataSource': [dataSourceScriptDatabaseInitializer, org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaConfiguration, jdbcTemplate]
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory : Retrieved dependent beans for bean 'dataSourceScriptDatabaseInitializer': [jdbcTemplate, namedParameterJdbcTemplate]
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory : Retrieved dependent beans for bean 'jdbcTemplate': [namedParameterJdbcTemplate]
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory : Retrieved dependent beans for bean 'org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaConfiguration': [jpaVendorAdapter, entityManagerFactoryBuilder]
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.s.DefaultListableBeanFactory : Retrieved dependent beans for bean 'jpaVendorAdapter': [entityManagerFactoryBuilder]
2022-04-28 22:34:29.532 TRACE 48403 --- [main] o.s.b.f.support.DisposableBeanAdapter : Invoking close() on bean with name 'verticaDataSource'
2022-04-28 22:34:29.533 INFO 48403 --- [main] com.zaxxer.hikari.HikariDataSource : vertica-db-pool - Shutdown initiated...
2022-04-28 22:34:29.533 DEBUG 48403 --- [main] com.zaxxer.hikari.pool.HikariPool : vertica-db-pool - Before shutdown stats (total=20, active=2, idle=18, waiting=0)
I have 2 datasources in my Spring Boot app and therefore have to configure datasources programmatically.
AsyncConfiguration.java
@EnableAsync
@Configuration
public class AsyncConfiguration implements AsyncConfigurer {

    @Bean(name = "threadPoolTaskExecutor")
    public Executor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors());
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors());
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("MyAsyncThread-");
        executor.initialize();
        return executor;
    }

    @Override
    public Executor getAsyncExecutor() {
        // Default executor for @Async methods that do not name one.
        // Note: this instance is not a bean and is never initialized.
        return new ThreadPoolTaskExecutor();
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new CustomAsyncExceptionHandler();
    }
}
AsyncService.java
@Slf4j
@Service
public class AsyncService {

    @Autowired VerticaRepository verticaRepo;

    @Async("threadPoolTaskExecutor")
    public CompletableFuture<List<Entity1>> execute1(String query) {
        List<Entity1> result = verticaRepo.executeQuery1(query);
        return CompletableFuture.completedFuture(result);
    }

    @Async("threadPoolTaskExecutor")
    public CompletableFuture<List<Entity2>> execute2(List<BigInteger> ids, String query) {
        List<Entity2> result = verticaRepo.executeQuery2(ids, query);
        return CompletableFuture.completedFuture(result);
    }
}
VerticaDataSourceConfig.java
@Configuration
@ConfigurationProperties("vertica.datasource")
@EnableTransactionManagement
@EnableJpaRepositories(
    entityManagerFactoryRef = "verticaEntityManagerFactory",
    transactionManagerRef = "verticaTransactionManager",
    basePackages = { "mypackage.repository" }
)
public class VerticaDataSourceConfig /*extends HikariConfig*/ {

    public final static String PERSISTENCE_UNIT_NAME = "vertica";
    public final static String PACKAGES_TO_SCAN = "mypackage.entity";

    @Autowired
    private Environment env;

    @Bean
    public HikariDataSource verticaDataSource() {
        HikariConfig hikariConfig = new HikariConfig();
        hikariConfig.setJdbcUrl(env.getProperty("vertica.datasource.jdbc-url"));
        hikariConfig.setUsername(env.getProperty("vertica.datasource.username"));
        hikariConfig.setPassword(env.getProperty("vertica.datasource.password"));
        hikariConfig.setDriverClassName(env.getProperty("vertica.datasource.driver-class-name"));
        hikariConfig.setConnectionTimeout(Long.parseLong(env.getProperty("vertica.datasource.hikari.connectionTimeout")));
        hikariConfig.setIdleTimeout(Long.parseLong(env.getProperty("vertica.datasource.hikari.idleTimeout")));
        hikariConfig.setMaxLifetime(Long.parseLong(env.getProperty("vertica.datasource.hikari.maxLifetime")));
        hikariConfig.setKeepaliveTime(Long.parseLong(env.getProperty("vertica.datasource.hikari.keepaliveTime")));
        hikariConfig.setMaximumPoolSize(Integer.parseInt(env.getProperty("vertica.datasource.hikari.maximumPoolSize")));
        hikariConfig.setPoolName(env.getProperty("vertica.datasource.hikari.poolName"));
        hikariConfig.setValidationTimeout(Integer.parseInt(env.getProperty("vertica.datasource.hikari.validationTimeout")));
        return new HikariDataSource(hikariConfig);
    }
    @Bean
    public LocalContainerEntityManagerFactoryBean verticaEntityManagerFactory(
            final HikariDataSource verticaDataSource) {
        return new LocalContainerEntityManagerFactoryBean() {{
            setDataSource(verticaDataSource);
            setPersistenceProviderClass(HibernatePersistenceProvider.class);
            setPersistenceUnitName(PERSISTENCE_UNIT_NAME);
            setPackagesToScan(PACKAGES_TO_SCAN);
            Properties jpaProperties = new Properties();
            // Hibernate's own keys are hibernate.hbm2ddl.auto and hibernate.show_sql;
            // the Spring Boot style names (ddl-auto, show-sql) are not recognized here.
            jpaProperties.put("hibernate.hbm2ddl.auto", env.getProperty("vertica.jpa.hibernate.ddl-auto"));
            jpaProperties.put("hibernate.show_sql", env.getProperty("vertica.jpa.hibernate.show-sql"));
            jpaProperties.put("hibernate.format_sql", env.getProperty("vertica.jpa.hibernate.format_sql"));
            jpaProperties.put("hibernate.dialect", env.getProperty("vertica.jpa.properties.hibernate.dialect"));
            setJpaProperties(jpaProperties);
            afterPropertiesSet();
        }};
    }
    @Bean
    public PlatformTransactionManager verticaTransactionManager(EntityManagerFactory verticaEntityManagerFactory) {
        return new JpaTransactionManager(verticaEntityManagerFactory);
    }
}
VerticaRepository.java
@Repository
public class VerticaRepository {

    //@Autowired
    @PersistenceContext(unitName = "vertica")
    private EntityManager em;

    @Transactional
    public List<Entity1> executeQuery1(String queryStr) {
        // query.setParameter() can only replace parameters in the WHERE clause of a query;
        // it cannot replace table or column names
        String replacedQuery = // replace table name and column name
        Query query = em.createNativeQuery(replacedQuery);
        List<Object[]> result = query.getResultList();
        List<Entity1> entities = new ArrayList<>();
        // fill entities list with result
        return entities;
    }

    @Transactional
    public List<Entity2> executeQuery2(List<BigInteger> ids, String queryStr) {
        String replacedQuery = // replace table name and column name; the table and col names are different from the ones in executeQuery1()
        Query query = em.createNativeQuery(replacedQuery);
        List<Object[]> result = query.getResultList();
        List<Entity2> entities = new ArrayList<>();
        // fill entities list with result
        return entities;
    }
}
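The comment in executeQuery1() notes that bind parameters cannot stand in for table or column names, so those have to be substituted into the SQL string itself. A minimal sketch of one way to do that substitution defensively, assuming a `{table}` placeholder and an allow-list of table names (both hypothetical, since the question does not show the real query templates or table names):

```java
import java.util.Set;
import java.util.regex.Pattern;

class IdentifierSubstitution {

    // Hypothetical allow-list; the real table names are not shown in the question.
    private static final Set<String> ALLOWED_TABLES = Set.of("table_a", "table_b");

    // Conservative shape check for a plain SQL identifier.
    private static final Pattern IDENTIFIER = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    // Replaces the {table} placeholder only if the name is a known, well-formed identifier.
    static String withTable(String template, String tableName) {
        if (!IDENTIFIER.matcher(tableName).matches() || !ALLOWED_TABLES.contains(tableName)) {
            throw new IllegalArgumentException("Unexpected table name: " + tableName);
        }
        return template.replace("{table}", tableName);
    }

    public static void main(String[] args) {
        System.out.println(withTable("SELECT id FROM {table} WHERE id > ?", "table_a"));
    }
}
```

Values such as the ids can still go through query.setParameter(); only the identifiers need this kind of string substitution.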
BusinessService.java
@Slf4j
@Component("businessService")
public class BusinessService {

    @Autowired
    private String query1;
    @Autowired
    private String query2;
    @Autowired private AsyncService asyncService;

    public Void serve() throws Exception {
        List<CompletableFuture<List<Entity1>>> violationFutures = new ArrayList<>();
        for (iterate over some list not shown here; this will loop 2 times with different table and col name substitutions in the query) {
            violationFutures.add(asyncService.execute1(query1));
        }
        CompletableFuture<List<List<Entity1>>> vcf = sequence(violationFutures);
        List<Entity1> aggregatedViolations = new ArrayList<>();
        // Blocks until the first set of queries has finished
        for (List<Entity1> list : vcf.get()) {
            aggregatedViolations.addAll(list);
        }
        int numProcessors = Runtime.getRuntime().availableProcessors();
        List<BigInteger> idList = // somehow get a list of ids from aggregatedViolations
        List<List<BigInteger>> partitionedList = ListUtils.partition(idList, numProcessors);
        List<CompletableFuture<List<Entity2>>> trendFutures = new ArrayList<>();
        for (List<BigInteger> ids : partitionedList) {
            for (iterate over some list not shown here; this will loop 2 times with different table and col name substitutions in the query) {
                trendFutures.add(asyncService.execute2(ids, query2));
            }
        }
        CompletableFuture<List<List<Entity2>>> tcf = sequence(trendFutures);
        // rest of the business logic is dependent on the above queries execution
        return null;
    }

    // Combines a list of futures into one future that completes with all of their results
    private static <T> CompletableFuture<List<List<T>>> sequence(List<CompletableFuture<List<T>>> futures) {
        CompletableFuture<Void> allDoneFuture =
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
        return allDoneFuture.thenApply(v ->
            futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        );
    }
}
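The call ListUtils.partition(idList, numProcessors) is worth a second look. Assuming ListUtils is the Apache Commons Collections class, its second argument is the size of each sublist rather than the number of sublists, so the number of execute2() tasks grows with the length of idList. The following stdlib-only sketch (the class name ChunkBySize is hypothetical) reproduces that size-based behaviour:

```java
import java.util.ArrayList;
import java.util.List;

class ChunkBySize {

    // Splits a list into consecutive sublists of at most `size` elements;
    // only the last sublist may be shorter.
    static <T> List<List<T>> partition(List<T> list, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> chunks = new ArrayList<>();
        for (int start = 0; start < list.size(); start += size) {
            int end = Math.min(start + size, list.size());
            chunks.add(new ArrayList<>(list.subList(start, end)));
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= 10; i++) {
            ids.add(i);
        }
        // With size 4, ten ids become three chunks of sizes 4, 4 and 2.
        System.out.println(partition(ids, 4));
    }
}
```

If the intent is one chunk per processor, the chunk size would instead have to be derived from the list length, for example by dividing it by numProcessors and rounding up.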
I believe this has something to do with EntityManagers being used in a multi-threaded environment. However, according to the documentation, @Transactional will supply a new EM every time. If executeQuery1() was able to run 2 queries in parallel on different threads, why is executeQuery2() closing the EM?
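One reading of the log above is that the [main] thread is already destroying beans while MyAsyncThread-4 is only beginning its transaction. In the code shown, serve() blocks on vcf.get() for the first batch but never calls get() or join() on tcf, so the caller may return, and the context may start closing, before the second batch has run. The following plain-Java sketch illustrates that difference under this assumption; JoinBeforeReturn, slowQuery and the 200 ms delay are hypothetical stand-ins, not part of the original application.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class JoinBeforeReturn {

    // Stand-in for a slow query; the sleep simulates database latency.
    static int slowQuery(int id) {
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return id * 10;
    }

    // Submits one task per id and, if waitForResults is true, blocks until all finish.
    // Returns how many tasks had completed at the moment the method returns.
    static long serve(ExecutorService pool, List<Integer> ids, boolean waitForResults) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int id : ids) {
            futures.add(CompletableFuture.supplyAsync(() -> slowQuery(id), pool));
        }
        if (waitForResults) {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        }
        return futures.stream().filter(CompletableFuture::isDone).count();
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println("completed without join: " + serve(pool, List.of(1, 2), false));
            System.out.println("completed with join: " + serve(pool, List.of(3, 4), true));
        } finally {
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
```

If this assumption holds, blocking on tcf (for example with tcf.get()) before serve() returns would keep the calling thread from reaching shutdown while queries are still in flight.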
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
