Using Tabula to pull tables out of PDFs
We have standard reports uploaded as PDFs on a daily basis. In the PDFs are some tables that we want to pull into datasets. I have tabula imported in Code Repositories, but I can't seem to get Code Repositories to bring in the PDF.
I receive this error:
"Error loading inputs"
"{"message":"Unable to resolve alias","alias":"/US Office/COO/pdf_tests/test.pdf","fallbackBranches":["master"]}"
And this is the very basic code I am using:
from transforms.api import transform_df, Input, Output
import tabula


@transform_df(
    Output("/US Office/COO/pdf_tests/datasets/pdf_read"),
    source_df=Input("/US Office/COO/pdf_tests/test.pdf"),
)
def compute(source_df):
    df = source_df
    df = tabula.read_pdf(df, pages='all')
    return df
I can easily make tabula output a CSV with my local Python install, just not in Foundry. Any help you can give would be great, as I am very new to Palantir Foundry and Code Repositories.
Thank you!
Solution 1:[1]
First things first, thank you to fmsf for getting me going in the right direction.
Next, the answer:
I was able to get it to work once I got rid of the .read() in the result variable. Tabula wants to see a path (which is hard in this situation) or a Python file-like object, which is what .open returns. Below is the code that works and will hopefully help others get started.
from transforms.api import transform, Input, Output
import tabula
from pyspark.sql.types import StructType, StructField, StringType
import pandas as pd


@transform(  # note that this is @transform and not @transform_df
    out=Output("/US Office/COO/pdf_tests/outdata"),  # the output dataset path
    raw=Input("/US Office/COO/pdf_tests/datasets/dataset_pdf"),  # path to the raw input dataset that contains the PDF
)
def compute(ctx, raw, out):
    rows = []
    result = []  # define results list
    fs = raw.filesystem()  # makes accessing the dataset filesystem easy
    latest_file = 'test.pdf'  # the name of the PDF inside the dataset
    df_schema = StructType([  # defining the Spark schema
        StructField("system", StringType(), True),
        StructField("status", StringType(), True),
        StructField("date", StringType(), True),
        StructField("user", StringType(), True),
        StructField("id", StringType(), True)
    ])
    with fs.open(latest_file, mode='rb') as f:  # open the PDF as f; 'rb' is essential because it has to be opened as a raw binary file
        result = tabula.read_pdf(f, pages='all', multiple_tables=True)  # basic tabula read that returns a list of tables (one DataFrame per table)
    rows.append(result[0])  # add the first table to the rows list
    df = pd.DataFrame(rows[0])  # create a pandas DataFrame from the first table in the PDF
    df = ctx.spark_session.createDataFrame(df, schema=df_schema)  # cast the pandas df to a PySpark df using the defined schema
    out.write_dataframe(df)  # write the PySpark DataFrame to the output dataset
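One thing to note: the file name is hard-coded above, but since the reports land daily you would normally want to grab the newest PDF in the dataset instead. A minimal, untested sketch of that, assuming the transforms FileSystem's ls() accepts a glob filter and returns entries with path and modified attributes, would replace the latest_file line with something like:

    files = list(fs.ls(glob='*.pdf'))  # every PDF in the input dataset (assumes ls() supports a glob filter)
    latest_file = max(files, key=lambda s: s.modified).path  # newest PDF by modification time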
Solution 2:[2]
The part that you are missing is the way to ingest and parse the PDF. Let's go step by step:
Your Input can only consume datasets, and they need to have at least one valid transaction on the fallbackBranch (normally master).
You can achieve this by using Data Connection to ingest your dataset; alternatively, you can create a dataset manually and drop your PDF into its contents.
The file will then show up in the dataset's files view, just like any other ingested piece of data.
Since this is now a dataset, you can read its contents from the Foundry filesystem within your transform. I guess it will look something like this (note the @transform instead of @transform_df):
import tabula
from transforms.api import transform, Input, Output
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType


@transform(
    out=Output("/US Office/COO/pdf_tests/datasets/pdf_read"),
    source_df=Input("/US Office/COO/pdf_tests/pdf_staset"),
)
def compute(ctx, source_df, out):
    rows = []
    # latest_file is assumed to be a FileStatus for the PDF, e.g. obtained from source_df.filesystem().ls()
    with source_df.filesystem().open(latest_file.path, 'rb') as f:
        result = tabula.read_pdf(f.read(), pages='all')
        # add your logic to populate rows here:
        rows.append(Row(
            col_a="a",
            col_b="row",
            col_c="some values"
        ))
    schema = StructType([
        StructField('col_a', StringType()),
        StructField('col_b', StringType()),
        StructField('col_c', StringType()),
    ])
    return out.write_dataframe(ctx.spark_session.createDataFrame(rows, schema))
I didn't test the code above.
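If you go this route, the placeholder rows.append(...) would be replaced by logic that walks the tables tabula returns. A minimal, untested sketch, assuming read_pdf gives back a list of pandas DataFrames whose columns happen to be named col_a, col_b and col_c:

    for table in result:  # tabula returns one pandas DataFrame per detected table
        for _, r in table.iterrows():  # walk the rows of each table
            rows.append(Row(
                col_a=str(r['col_a']),
                col_b=str(r['col_b']),
                col_c=str(r['col_c']),
            ))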
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | Connor |
| Solution 2 | fmsf |

