Joining rows from two dataframes with the closest point

Hi, I am fairly new to Spark and I am not sure how to approach this.

I have two tables (shown much smaller here to keep the explanation simple):

A: Weather data

B: Travel data

I need to join these tables by finding, for each trip, the closest weather station on the same date as the trip start, and then do the same for the trip end. In the end I want just one row per trip, holding the weather data from the closest station at the time the trip started and at the time it finished.
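The matching rule described above (same date first, then smallest distance) can be sketched in plain Python before porting it to Spark. This is only an illustration of the logic, not Spark code: the dictionary keys and the sample rows are hypothetical, and the haversine great-circle distance is an assumed choice of distance measure.

```python
from math import asin, cos, radians, sin, sqrt

EARTH_RADIUS_KM = 6371.0  # mean Earth radius


def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometres between two (lat, lon) points given in degrees."""
    phi1, phi2 = radians(lat1), radians(lat2)
    dphi = phi2 - phi1
    dlmb = radians(lon2 - lon1)
    a = sin(dphi / 2) ** 2 + cos(phi1) * cos(phi2) * sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * asin(sqrt(a))


def closest_station(lat, lon, date, stations):
    """Return the station row observed on `date` that is nearest to (lat, lon), or None."""
    same_day = [s for s in stations if s["date"] == date]
    if not same_day:
        return None
    return min(same_day, key=lambda s: haversine_km(lat, lon, s["lat"], s["lon"]))


# Hypothetical sample rows, only to exercise the functions.
stations = [
    {"station": "S1", "date": "2020-01-01", "lat": 40.0, "lon": -74.0},
    {"station": "S2", "date": "2020-01-01", "lat": 41.0, "lon": -75.0},
    {"station": "S3", "date": "2020-01-02", "lat": 40.1, "lon": -74.1},
]

# One lookup for the trip start and one for the trip end.
start_match = closest_station(40.2, -74.1, "2020-01-01", stations)
end_match = closest_station(40.9, -74.9, "2020-01-02", stations)
```

Running the lookup twice, once with the start coordinates and date and once with the end ones, yields the two station rows that would be placed side by side in a single output row per trip.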

I have done something similar with geopandas and a UDF, but that was much easier because I was only looking for an intersection, like this:

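from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from shapely.geometry import Point

def find_state_gps(lat, long):
    # Name of the first state polygon that intersects the point, if any.
    df = gdf_states.apply(lambda x: x["NAME"] if x["geometry"].intersects(Point(long, lat)) else None, axis=1)
    idx = df.first_valid_index()
    value = df.loc[idx] if idx is not None else "Not in USA territory"
    return value

state_gps = udf(find_state_gps, StringType())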

I am not sure how to handle the logic this time.

I also tried this query, with no luck.

query = "SELECT STATION,\
    NAME,\
    LATITUDE,\
    LONGITUDE,\
    AWND,\
    p.id_trip,\
    p.Latitude,\
    p.Longitude,\
    p.startDate,\
      Abs(p.latitude-LATITUDE)**2 + Abs(p.Longitude-LONGITUDE)**2\
      AS dd\
FROM df2\
CROSS JOIN (\
SELECT id AS id_trip,\
        station_id,\
        Latitude,\
        Longitude,\
        startDate\
 FROM df1\
) AS p ON 1=1\
 ORDER BY dd"

It failed with the following error: ParseException: mismatched input '2' expecting {<EOF>, ';'}(line 1, pos 189)
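The most likely cause of that parse error is the `**` operator, which is Python syntax and, as far as I know, not valid in Spark SQL; `POW(x, 2)` is the usual replacement. The sketch below rewrites the query on that assumption. It also aliases both tables so that `LATITUDE` and `Latitude` are not ambiguous, drops the redundant `ON 1=1`, and keeps only the nearest station per trip with `ROW_NUMBER()`. The output names `station_lat`, `station_lon`, `trip_lat`, `trip_lon` and `rn` are hypothetical, and the code assumes `df1` and `df2` are already registered as temporary views.

```python
# Assumes df1 (trips) and df2 (weather) were registered beforehand, e.g.
# df1.createOrReplaceTempView("df1") and df2.createOrReplaceTempView("df2").
# A triple-quoted string avoids the backslash line continuations.
query = """
SELECT *
FROM (
    SELECT d.*,
           ROW_NUMBER() OVER (PARTITION BY d.id_trip ORDER BY d.dd) AS rn
    FROM (
        SELECT w.STATION,
               w.NAME,
               w.LATITUDE  AS station_lat,
               w.LONGITUDE AS station_lon,
               w.AWND,
               p.id        AS id_trip,
               p.Latitude  AS trip_lat,
               p.Longitude AS trip_lon,
               p.startDate,
               POW(p.Latitude - w.LATITUDE, 2)
                 + POW(p.Longitude - w.LONGITUDE, 2) AS dd
        FROM df1 AS p
        CROSS JOIN df2 AS w
    ) AS d
) AS ranked
WHERE rn = 1
"""

# With an active SparkSession named `spark` (an assumption), this would be run as:
# closest = spark.sql(query).drop("rn")
```

The squared difference in degrees is only a rough stand-in for distance, since a degree of longitude shrinks away from the equator. It is usually good enough for ranking nearby stations, but a haversine expression could be swapped in for `dd` if accuracy matters. The cross join compares every trip with every station, so restricting the pairs to matching dates first would keep it much smaller.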

In the end I want something like this, with no repeated trips:

id  started_date  finish_date  finished  weather_station_start  weather_station_end  more columns about weather for starting and ending trip locations
1   bim           baz          bim       baz                    bim                  bim
2   bim           baz          bim       baz                    bim                  bim

I really appreciate your help guys.



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
