'Detect if hazard lights are on using python to control SK6812 LEDs

In our travel trailer, I have an ESP32 device that monitors the inputs from the 7 pin trailer harness. The 7 pin trailer harness is used by the tow vehicle to turn on/off various lights, charge the battery, and control the trailer brakes. The ESP32 sends MQTT messages topics based on the state of each pin. For example, when the left turn or right turn signal & brake light is on, I'll get:

Topic: harness/left_turn
Payload: True/False

Topic: harness/right_turn
Payload: True/False

In my python3 code, I have a class that collects these states:

self.left_turn
self.right_turn

When both left_turn and right_turn are on, then it's assumed that the brakes are on. I did not connect the brake control pin to the ESP32 as I do not want to mess with the brakes at all.

I already have a method to detect if the brake lights are on and sets:

self.brake_lights_state  #  bool

The problem I'm having now is to detect if the hazard lights are on, that is if 'self.brake_lights_state' is changing at a regular cadence.

Assuming the hazard lights flash on for 400ms and off for 400ms, how would I detect this? I want to prevent false positives from tapping the brakes. For example when backing up the travel trailer, it's common to feather the brakes/gas a bit. So, I'd only want the hazards to turn on say after it detects the lights flashing 3 or 4 times within that nearly exact 400ms time interval. I say nearly exact, because sometimes for some weird reason, every 5th blink the tow vehicle has a speed of 400ms on, and 325ms speed off.

I'm completely over my head. I'm guessing I could use deque collection and time() somehow to track when the brake_lights_state changes.

Does anyone have any ideas on where to get started? I'm running this all within a thread and use the run() method when the trailer is being towed.

def run(self):
    """
    Called when the thread is running.
    Monitors the 7 pin harness state and updates the SK6812 leds as needed.
    """
    while not self.die:
        if self.is_driving_state:
            if self.brake_lights_state:
                self.do_brake()
            elif self.right_blinker_state:
                self.do_right_blinker()
            elif self.left_blinker_state:
                self.do_right_blinker()

            if self.reverse_lights_state:
                self.do_reverse()
            if self.marker_lights_state:
                self.do_marker()
            sleep(0.05)
        else:
            sleep(1)

def do_brake(self):
    # check if hazard lights are on first... 
    # Then call self.do_hazard(), else do this:
    if self.brake_lights_state:
        self.set_lights(self.leds['rear_right_blinker'], BRAKE_COLOR)
        self.set_lights(self.leds['rear_left_blinker'], BRAKE_COLOR)
    else:
        self.set_lights(self.leds['rear_right_blinker'], OFF_COLOR)
        self.set_lights(self.leds['rear_left_blinker'], OFF_COLOR)


Solution 1:[1]

I'm not a python developer, but this is how I managed to get it working. It's not graceful, but it works.

from time import time, sleep
from collections import deque

BLINKER_ON_TIME = [350, 400]   # Range of time the blinker can be on
BLINKER_OFF_TIME = [350, 400]  # Range of time the blinker can be off

HAZARD_MIN_CYCLE_COUNT = 5

MAX_BRAKE_HISTORY_TIME = ((((BLINKER_ON_TIME[1]+5)*HAZARD_MIN_CYCLE_COUNT)+((BLINKER_OFF_TIME[1]+5)*HAZARD_MIN_CYCLE_COUNT)) - BLINKER_ON_TIME[0]) / 1000
MIN_BRAKE_HISTORY_TIME = ((((BLINKER_ON_TIME[0])*HAZARD_MIN_CYCLE_COUNT)+((BLINKER_OFF_TIME[0])*HAZARD_MIN_CYCLE_COUNT)) - BLINKER_ON_TIME[1])/ 1000

brake_history = deque([], maxlen=HAZARD_MIN_CYCLE_COUNT*2)

brake = False
previous_brake = False
hazard = False

timer = time()
change_timeout = .400

while True:
    # Simulate changing the brakes on/off after every 'change_timeout'
    if time() - timer > change_timeout:
        timer = time()
        brake = not brake

    # If the brake state has changed, lets check if hazards should be on/off
    if brake != previous_brake:
        brake_history.append([time(), brake])
        previous_brake = brake
        if len(brake_history) == HAZARD_MIN_CYCLE_COUNT*2:  # must have some data.

            if MIN_BRAKE_HISTORY_TIME < time() - brake_history[0][0] < MAX_BRAKE_HISTORY_TIME:
                if hazard is False:
                    print(f"{time()}: hazard on....")
                hazard = True
                change_timeout = .300  # Lets simulate the timing being way off, to clear the hazard state.
            else:
                if hazard is True:
                    print(f"{time()}: hazard off....")
                hazard = False
                brake_history.clear()
                change_timeout = .375

    sleep(0.05)

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 tazzytazzy