AWS Lambda function to insert a CSV into Elasticsearch
I'm trying to insert some basic .csv files directly from an S3 bucket into Elasticsearch. Each time a .csv is dropped into the bucket, it triggers my Lambda, which should feed the data from the .csv to Elasticsearch. Here's what I have so far:
import json
import os
import logging
import boto3
from datetime import datetime
import re
import csv
from aws_requests_auth.aws_auth import AWSRequestsAuth
from elasticsearch import RequestsHttpConnection, helpers, Elasticsearch
from core_libs.dynamodbtypescasters import DynamodbTypesCaster
from core_libs.streams import EsStream, EsClient
credentials = boto3.Session().get_credentials()
AWS_REGION = 'eu-west-3'
HOST = MY_PERSONAL_COMPANY_HOST
ES_SERVER = f"https://{HOST}"
AWS_ACCESS_KEY = credentials.access_key
AWS_SECRET_ACCESS_KEY = credentials.secret_key
AWS_SESSION_TOKEN = credentials.token
s3 = boto3.client('s3')
def lambda_handler(event, context):
    awsauth = AWSRequestsAuth(
        aws_access_key=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
        aws_token=AWS_SESSION_TOKEN,
        aws_host=HOST,
        aws_region=AWS_REGION,
        aws_service='es',
    )
    # An S3 notification delivers its records in event['Records'].
    for record in event['Records']:
        BUCKET_NAME = record['s3']['bucket']['name']
        SOURCE_PATH = record['s3']['object']['key']
        SOURCE_FILE = SOURCE_PATH.split('/')[-1]
        obj = s3.get_object(Bucket=BUCKET_NAME, Key=SOURCE_PATH)
        body = obj['Body'].read()
        lines = body.splitlines()
        for line in lines:
            print(line)
And this is where I'm stuck. I don't know whether I should use the bulk API, whether I can just insert a JSON version of my .csv as it is, or how to do either.
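For illustration, one possible way to go from the bytes read out of S3 to bulk requests is sketched below. It is only a sketch under a few assumptions: the file is UTF-8 with a header row, every row becomes one document whose fields are the column names, and the cluster accepts actions without a `_type` (Elasticsearch 7 or later). The names `csv_to_actions`, `index_csv`, and `index_name` are hypothetical and not part of the code above.

```python
import csv
import io


def csv_to_actions(body, index_name):
    """Yield one bulk action per CSV row; the header row supplies the field names."""
    # Assumption: the file is UTF-8. "utf-8-sig" also drops a leading BOM if present.
    text = body.decode("utf-8-sig")
    reader = csv.DictReader(io.StringIO(text))
    for row in reader:
        # Every value stays a string; cast here if the index mapping needs numbers or dates.
        yield {"_index": index_name, "_source": row}


def index_csv(es, body, index_name):
    """Send all rows through the bulk helper; `es` is an Elasticsearch client instance."""
    # Imported lazily so csv_to_actions can be tried without the package installed.
    from elasticsearch import helpers
    return helpers.bulk(es, csv_to_actions(body, index_name))
```

Inside the handler this could be called as `index_csv(es, body, "my-index")`, where `es` is a client built with the `awsauth` object and `"my-index"` is a placeholder index name. The bulk helper batches the actions into chunks, so the rows do not have to be sent one request at a time.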
Solution 1:[1]
Not really an answer to your specific question.
However, I am wondering why you are not using an SQS setup with Filebeat (recommended, out-of-the-box functionality) together with an ingest pipeline that uses the CSV processor?
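As a rough illustration of the ingest pipeline part, the body of a pipeline with a CSV processor could be built like this. This is a sketch, not a tested setup: it assumes the raw CSV line arrives in the `message` field (where Filebeat puts each line by default), and the column names and the `build_csv_pipeline` helper are hypothetical.

```python
def build_csv_pipeline(columns, source_field="message"):
    """Return an ingest pipeline body that splits one CSV line into named fields."""
    return {
        "description": "Parse one CSV line into separate fields",
        "processors": [
            {
                "csv": {
                    "field": source_field,     # field holding the raw CSV line
                    "target_fields": columns,  # one output field per CSV column, in order
                    "separator": ",",
                    "ignore_missing": True,    # skip documents that lack the source field
                }
            }
        ],
    }
```

The resulting dictionary can be registered through the ingest pipeline API (`PUT _ingest/pipeline/<pipeline-id>`), and Filebeat can then be pointed at that pipeline so that parsing happens inside Elasticsearch rather than in the Lambda.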
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | superstienos |
