Problem with implementing the Datasource V2 API for Salesforce

I am trying to implement a custom Spark Datasource for Salesforce by implementing the Spark Datasource V2 interfaces. To query Salesforce data in parallel, I am planning to use the Salesforce Bulk V1 API: https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/api_asynch_introduction_bulk_api.htm. My plan is as follows:

  1. The user specifies the options numPartitions and dbtable. Using these, I will internally fetch the record count for that Salesforce object and deduce the chunk size to use when querying the data with PK chunking: https://developer.salesforce.com/docs/atlas.en-us.api_asynch.meta/api_asynch/asynch_api_code_curl_walkthrough_pk_chunking.htm
  2. The Bulk API is asynchronous. You need to create a job and then create query batches within the job. If you enable PK chunking and specify a chunk size, Salesforce automatically creates the batches for you. You then need to poll for and fetch the results of each batch using a batch-specific URL.
  3. I am planning to pass a batch ID to each executor using an InputPartition object. Each executor will then poll for and fetch the results of its batch.
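To make step 1 concrete, the chunk-size deduction might look roughly like the sketch below. The class and method names are hypothetical, and the 250,000 ceiling is an assumption based on the maximum chunk size I remember from the PK chunking documentation, so it should be checked against the current docs. Since PK chunking splits on record ID ranges, the number of batches Salesforce actually creates may not match numPartitions exactly.

```java
// Hypothetical helper: derives a PK chunking chunk size from the record
// count of the object and the user-supplied numPartitions option.
final class PkChunkPlanner {
    // Assumed upper bound for chunkSize; verify against the Bulk API docs.
    static final long MAX_CHUNK_SIZE = 250_000L;

    // Aims for roughly one chunk per requested partition, clamped to
    // the range [1, MAX_CHUNK_SIZE].
    static long chunkSize(long recordCount, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (recordCount < 0) {
            throw new IllegalArgumentException("recordCount must not be negative");
        }
        // Ceiling division so that no records are left without a chunk.
        long perPartition = (recordCount + numPartitions - 1) / numPartitions;
        return Math.max(1L, Math.min(perPartition, MAX_CHUNK_SIZE));
    }

    // Builds the request header line that enables PK chunking on a Bulk job.
    static String pkChunkingHeader(long chunkSize) {
        return "Sforce-Enable-PKChunking: chunkSize=" + chunkSize;
    }
}
```

For example, `PkChunkPlanner.chunkSize(1_000_000L, 8)` gives 125000, while a 10,000,000-record object with 4 partitions is clamped to 250000 and would therefore yield more batches than requested partitions.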

I am having trouble deciding how to create the Bulk job and submit the batches on the driver node before dispatching the batch IDs to the executors. I tried doing this in the implementation of the planInputPartitions method of the Batch interface, but it turns out that the method is called 2-3 times per action (show, collect, etc.), which creates unnecessary Bulk jobs.

One potential solution is to maintain a set of hashed user options in static scope for the implementation of the Batch interface (using a companion object) and to create the job only if its hash is not already in the set. However, I find this solution very clumsy. Also, what happens if a user submits multiple actions on the same DataFrame? I could maybe add a TTL to the set entries, but you can see how it gets complicated. What would be the ideal way to achieve what I want?
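For reference, the static-scope idea with a TTL could be sketched roughly as below. This is only an illustration of the workaround, not a recommended design: `BulkJobRegistry`, `jobFor`, and the `createJob` supplier are hypothetical names, the supplier stands in for the real Bulk API call, and the entries are keyed by a copy of the options map rather than by a bare hash to avoid collisions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical driver-side registry; a single instance would live in
// static scope (e.g. a Scala companion object or a Java static field).
final class BulkJobRegistry {
    // A cached job ID together with the time at which it was created.
    private static final class Entry {
        final String jobId;
        final long createdAtMillis;

        Entry(String jobId, long createdAtMillis) {
            this.jobId = jobId;
            this.createdAtMillis = createdAtMillis;
        }
    }

    private final Map<Map<String, String>, Entry> jobs = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final LongSupplier clock; // injectable so expiry can be tested

    BulkJobRegistry(long ttlMillis, LongSupplier clock) {
        this.ttlMillis = ttlMillis;
        this.clock = clock;
    }

    // Returns the cached job ID for these options. createJob is invoked
    // only when there is no entry or the existing entry has expired.
    String jobFor(Map<String, String> options, Supplier<String> createJob) {
        // Copy the options so later mutation by the caller cannot corrupt the key.
        Map<String, String> key = new HashMap<>(options);
        // compute() is atomic per key, so concurrent callers with the same
        // options cannot both create a job. The job is created while the map
        // entry is locked, which is acceptable only for a sketch.
        Entry entry = jobs.compute(key, (k, existing) -> {
            long now = clock.getAsLong();
            boolean fresh = existing != null
                    && now - existing.createdAtMillis < ttlMillis;
            return fresh ? existing : new Entry(createJob.get(), now);
        });
        return entry.jobId;
    }
}
```

With a registry built as `new BulkJobRegistry(60_000L, System::currentTimeMillis)`, repeated planInputPartitions calls with the same options inside the TTL would get the same job ID back. The sketch does not resolve the multiple-actions concern: a second action on the same DataFrame within the TTL would also reuse the earlier job, which may or may not be desirable.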

I am aware that there is an existing Salesforce Spark library by SpringML, but it doesn't support the authentication types I need and isn't compatible with Spark 3.x.

I am not using the Bulk V2 Salesforce API as it is serial in nature. You submit a query, Salesforce automatically creates the batches, you then need to poll for results and iterate over the batches using batch locators returned in the header.
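To show why I consider this flow serial, here is a rough sketch of the locator loop. `LocatorPager`, `Page`, and `fetchPage` are hypothetical names, and `fetchPage` stands in for the HTTP request that returns one set of results plus the locator for the next set. Each request needs the locator from the previous response, so the requests cannot be spread across executors up front.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch of locator-based paging over query results.
final class LocatorPager {
    // One page of rows plus the locator for the next page
    // (null here means there are no further pages).
    static final class Page {
        final List<String> rows;
        final String nextLocator;

        Page(List<String> rows, String nextLocator) {
            this.rows = rows;
            this.nextLocator = nextLocator;
        }
    }

    // Fetches every page in order. fetchPage receives null for the first
    // request and the previously returned locator for each later request.
    static List<String> fetchAll(Function<String, Page> fetchPage) {
        List<String> all = new ArrayList<>();
        String locator = null;
        do {
            Page page = fetchPage.apply(locator);
            all.addAll(page.rows);
            locator = page.nextLocator; // only known after the response arrives
        } while (locator != null);
        return all;
    }
}
```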



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
