Applying rack awareness to a PySpark Structured Streaming application running on Kubernetes and reading from AWS MSK

I have a PySpark Structured Streaming application with the following setup:
PySpark - version 3.0.1, running on AWS EKS using the Spark operator.
Kafka - running on AWS MSK, with the cluster on Apache Kafka 2.8.1 and replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector set in the cluster configuration (i.e. rack awareness is enabled on the broker side).

The flow:
The application reads from Kafka, performs batch processing in 5-minute intervals, and writes back to Kafka. Both my MSK cluster and the ASG running the instances for my Spark executors are spread across the same AZs. I want to leverage the rack-awareness mechanism so that the Spark executors read from the closest replica.
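For context, a minimal sketch of that flow might look like the following (bootstrap servers, topic names, and the checkpoint path are placeholders, not my real values):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("msk-structured-streaming").getOrCreate()

# Read from the MSK cluster (bootstrap servers and topic are placeholders).
source = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "b-1.my-msk:9092,b-2.my-msk:9092")
    .option("subscribe", "input-topic")
    .load()
)

# ... the actual batch processing happens here ...
processed = source.selectExpr("key", "value")

# Write back to Kafka, triggering every 5 minutes.
query = (
    processed.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "b-1.my-msk:9092,b-2.my-msk:9092")
    .option("topic", "output-topic")
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/app")  # placeholder path
    .trigger(processingTime="5 minutes")
    .start()
)
query.awaitTermination()
```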
I wish to do something like the following:

  1. When spawning new executors on new pods, extract the broker.rack that corresponds to the pod's AZ.
  2. Inject that broker.rack value as an environment variable and initialize the Spark Kafka consumer with a client.rack matching it (see the sketch after this list).
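
Conceptually, assuming each executor pod ends up with an environment variable such as CLIENT_RACK (a hypothetical name) holding the AZ ID of the node it runs on, I imagine the consumer side looking roughly like this; kafka.-prefixed options are how consumer properties such as client.rack are passed to Spark's Kafka source:

```python
import os
import urllib.request

from pyspark.sql import SparkSession

# Hypothetical helper: look up the AZ ID of the current node via the EC2
# instance metadata service (IMDSv1-style call shown for brevity), as a
# fallback if no env var was injected into the pod.
def current_az_id():
    url = "http://169.254.169.254/latest/meta-data/placement/availability-zone-id"
    with urllib.request.urlopen(url, timeout=2) as resp:
        return resp.read().decode("utf-8").strip()

spark = SparkSession.builder.appName("msk-rack-aware").getOrCreate()

# CLIENT_RACK is the hypothetical env var from step 2 above.
client_rack = os.environ.get("CLIENT_RACK") or current_az_id()

df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "b-1.my-msk:9092,b-2.my-msk:9092")  # placeholder
    .option("subscribe", "input-topic")                                    # placeholder
    .option("kafka.client.rack", client_rack)  # Kafka consumer property via the kafka. prefix
    .load()
)
```

One thing I am unsure about: the option string is resolved where the reader is defined (on the driver), so getting each executor's consumer to use the rack of its own pod is exactly the part in question.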

Is this possible? Or is there another way to achieve this?


