How can I connect to an OpenSearch repository when using the watchdog module in Python?

With the watchdog module, you can detect events in local storage and send notifications, for example to Slack. The problem is that I want to detect events that occur in a remote repository such as OpenSearch, where data accumulates, and raise an alert. Can you tell me how to connect to OpenSearch from Python code? Below is the code that sends a Slack message when an event occurs locally.

import time
import os
import requests
import slack
import json
import pandas as pd
import datetime
from tabulate import tabulate

# Import watchdog, installing it on demand when it is missing.
try:
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler
except ModuleNotFoundError as e:
    print (e)
    # Install with the pip that belongs to the running interpreter, so the
    # package lands in the environment this script actually uses, and fail
    # loudly if the installation does not succeed.
    import subprocess
    import sys
    subprocess.check_call([sys.executable, "-m", "pip", "install", "watchdog"])
    # Installing alone does not bind the names; import them again so the
    # classes defined below can reference Observer / FileSystemEventHandler.
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler

# ------------------------------------------------
class Handler(FileSystemEventHandler):
    """Event handler that logs file-system events and reports new folders to Slack."""

    def on_created(self, event):
        """Print the event type and path; for a directory, also notify Slack."""
        details = (f'event type : {event.event_type}\n'
                   f'event src_path : {event.src_path}')
        print(details)
        # Only directory creations trigger a Slack notification.
        if not event.is_directory:
            return
        notice = 'A folder has been created.'
        print(notice)
        send_message_to_slack(notice)

    def on_deleted(self, event):
        """Print a fixed marker line when a deletion event is received."""
        marker = "invoke remove"
        print(marker)

    def on_moved(self, event):
        """Print the type of the received move event."""
        kind = event.event_type
        print(f'event type : {kind}\n')

def send_message_to_slack(text, url='SLACK CHANNEL URL', timeout=10):
    """Post ``text`` as a JSON payload ``{"text": text}`` to a webhook URL.

    Args:
        text: Message body to send.
        url: Destination URL. Defaults to the placeholder used by the
            original script; replace it with (or pass in) a real webhook URL.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests.post`` can block indefinitely, which would
            stall whatever thread calls this function.

    Raises:
        requests.exceptions.RequestException: Propagated from ``requests``
            (invalid URL, connection failure, timeout).
    """
    payload = {"text": text}

    response = requests.post(url, json=payload, timeout=timeout)
    # Report a rejected message instead of dropping it silently, but do not
    # raise: a failed notification should not abort the caller.
    if not response.ok:
        print(f'Slack notification failed with HTTP status {response.status_code}')


class Watcher:
    """Watch a single directory (non-recursively) and dispatch events to ``Handler``."""

    def __init__(self, path):
        """Create the observer and switch the working directory to ``path``.

        Args:
            path: Directory to monitor; may be relative or absolute.
        """
        print ("Monitoring ...")
        self.event_handler = None      # Set to a Handler instance in run()
        self.observer = Observer()
        # Resolve to an absolute path *before* changing directory. Otherwise a
        # relative path would later be resolved against the new working
        # directory when it is scheduled in run(), pointing at the wrong place.
        self.target_directory = os.path.abspath(path)
        self.currentDirectorySetting()

    def currentDirectorySetting(self):
        """Change the process working directory to the target and print it."""
        print ("====================================")
        print ("Current dir:  ", end=" ")
        os.chdir(self.target_directory)
        print ("{cwd}".format(cwd = os.getcwd()))
        print ("====================================")

    def run(self):
        """Start watching and block until interrupted with Ctrl+C.

        The observer is always stopped and joined on the way out, including
        when an unexpected exception escapes the wait loop.
        """
        self.event_handler = Handler()
        self.observer.schedule(
            self.event_handler,
            self.target_directory,
            recursive=False
        )

        self.observer.start()
        try:
            # Keep the main thread alive while the observer runs.
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print ("Stop...")
        finally:
            self.observer.stop()
            # Wait for the observer to finish before returning.
            self.observer.join()

# Script entry: build a watcher for the directory to monitor (placeholder
# path; replace with a real directory) and block in its event loop.
myWatcher = Watcher(path='My local directory address')
myWatcher.run()


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source