Parallelizing operations within a ConnectionIO transaction

So I have a program in which I get a list of file paths from a database, delete those files on the filesystem, and finally delete the file paths from the database. I put all the operations inside a transaction to ensure that the paths are deleted from the database if and only if all of the files are deleted from the filesystem.

Something like this:

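import cats.effect.IO
import doobie.free.connection.AsyncConnectionIO
import doobie.implicits._
import fs2.Stream
import java.nio.file.{Files, Paths}
val result = for {
  deletePath <- (fr""" select path from files""").query[String].stream //Stream[doobie.ConnectionIO,String]
  _ <- Stream.eval(AsyncConnectionIO.liftIO(IO(Files.delete(Paths.get(deletePath))))) //Stream[doobie.ConnectionIO,Unit]
  _ <- Stream.eval(sql"delete from files where path = ${deletePath}".update.run) //Stream[doobie.ConnectionIO,Int]
} yield ()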

result.compile.drain.transact(transactor)

Unfortunately, the filesystem is distributed, so each individual operation is slow, but many operations can run at the same time.

So my question is, how do I parallelize the filesystem deletion operation here?



Solution 1:[1]

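Yes, you can. Instead of the for-comprehension, use an fs2 combinator that runs effects concurrently, such as parEvalMapUnordered, which evaluates the given function on up to maxConcurrent elements at a time and emits the results in completion order.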

val result =
  (fr""" select path from files""")
    .query[String]
    .stream
    .parEvalMapUnordered(maxConcurrent = 64) { deletePath =>
      AsyncConnectionIO.liftIO(IO(Files.delete(Paths.get(deletePath)))) >>
      sql"delete from files where path = ${deletePath}".update.run
    }

result.compile.drain.transact(transactor)

Remember to set maxConcurrent to a value that makes sense for your use case; it caps how many deletions run at the same time.


(I couldn't test the code, so it may contain some typos.)
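Two caveats are worth checking before relying on this. First, in fs2, parEvalMapUnordered requires a Concurrent instance for the stream's effect type, and doobie's ConnectionIO may not provide one; in that case the parallel file deletions would have to run in IO and be lifted into ConnectionIO. Second, java.io.File.delete() reports failure by returning false rather than throwing, so on its own it will not abort the transaction, whereas java.nio.file.Files.delete throws. The following is a stdlib-only sketch, not taken from the answer above, of bounded-concurrency deletion on a fixed thread pool where any failed deletion makes the whole result fail. BoundedDelete and deleteAllBounded are hypothetical names, and plain Scala Futures stand in for fs2/cats-effect here.

```scala
import java.nio.file.{Files, Path}
import java.util.concurrent.{Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.util.Try

object BoundedDelete {
  // Hypothetical helper: deletes every path, running at most `maxConcurrent`
  // deletions at once (one per pool thread). Files.delete throws on failure
  // (unlike java.io.File.delete, which returns false), so a single failed
  // deletion turns the whole result into a Failure.
  def deleteAllBounded(paths: Seq[Path], maxConcurrent: Int): Try[Unit] = {
    val pool = Executors.newFixedThreadPool(maxConcurrent)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val all = Future.traverse(paths)(p => Future(Files.delete(p)))
      // Await.result rethrows the failure, which Try captures.
      Try(Await.result(all, Duration.Inf)).map(_ => ())
    } finally {
      // Let already-submitted deletions finish before returning.
      pool.shutdown()
      pool.awaitTermination(1, TimeUnit.MINUTES)
    }
  }

  def main(args: Array[String]): Unit = {
    val dir = Files.createTempDirectory("bounded-delete")
    val files = (1 to 10).map(i => Files.createFile(dir.resolve(s"f$i.txt")))
    val ok = deleteAllBounded(files, maxConcurrent = 4)
    println(s"success=${ok.isSuccess} remaining=${files.count(Files.exists(_))}")
    // Deleting the same files again fails because they no longer exist.
    val again = deleteAllBounded(files, maxConcurrent = 4)
    println(s"second run success=${again.isSuccess}")
    Files.delete(dir)
  }
}
```

With maxConcurrent = 4, at most four deletions are in flight at any time because every Future is scheduled on a four-thread pool. The first run prints success=true remaining=0, and the second prints second run success=false because the files are already gone.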

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution 1: Luis Miguel Mejía Suárez