Creating a DataFrame from Elasticsearch Results

I am trying to build a DataFrame in pandas using the results of a very basic query to Elasticsearch. I am getting the data I need, but it's a matter of slicing the results in a way that builds the proper DataFrame. I really only care about getting the timestamp and path of each result. I have tried a few different es.search patterns.

Code:

from datetime import datetime
from elasticsearch import Elasticsearch
from pandas import DataFrame, Series
import pandas as pd
import matplotlib.pyplot as plt
es = Elasticsearch(host="192.168.121.252")
res = es.search(index="_all", doc_type='logs', body={"query": {"match_all": {}}}, size=2, fields=('path','@timestamp'))

This returns four top-level keys: [u'hits', u'_shards', u'took', u'timed_out']. My results are inside 'hits'.

res['hits']['hits']
Out[47]: 
[{u'_id': u'a1XHMhdHQB2uV7oq6dUldg',
  u'_index': u'logstash-2014.08.07',
  u'_score': 1.0,
  u'_type': u'logs',
  u'fields': {u'@timestamp': u'2014-08-07T12:36:00.086Z',
   u'path': u'app2.log'}},
 {u'_id': u'TcBvro_1QMqF4ORC-XlAPQ',
  u'_index': u'logstash-2014.08.07',
  u'_score': 1.0,
  u'_type': u'logs',
  u'fields': {u'@timestamp': u'2014-08-07T12:36:00.200Z',
   u'path': u'app1.log'}}]

The only things I care about are the timestamp and path of each hit.

res['hits']['hits'][0]['fields']
Out[48]: 
{u'@timestamp': u'2014-08-07T12:36:00.086Z',
 u'path': u'app1.log'}

I cannot for the life of me figure out how to get that result into a DataFrame in pandas. For the two results returned above, I would expect a DataFrame like:

   timestamp                   path
0  2014-08-07T12:36:00.086Z    app1.log
1  2014-08-07T12:36:00.200Z    app2.log


Solution 1:[1]

You could use the json_normalize function of pandas:

from pandas.io.json import json_normalize
df = json_normalize(res['hits']['hits'])

Then filter the resulting DataFrame by column names, as sketched below.
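
For the hits shown in the question, where the data sits under each hit's fields key, a minimal sketch of that filtering might look like this (the flattened column names follow json_normalize's dotted convention; the renames are my own choice):

df = json_normalize(res['hits']['hits'])  # in pandas >= 1.0, use pd.json_normalize
# the question's data is nested under 'fields', so it lands in dotted columns
df = df[['fields.@timestamp', 'fields.path']].rename(
    columns={'fields.@timestamp': 'timestamp', 'fields.path': 'path'})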

Solution 2:[2]

Better yet, you can use the fantastic pandasticsearch library:

from elasticsearch import Elasticsearch
es = Elasticsearch('http://localhost:9200')
result_dict = es.search(index="recruit", body={"query": {"match_all": {}}})

from pandasticsearch import Select
pandas_df = Select.from_dict(result_dict).to_pandas()
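
Note that pandasticsearch builds its rows from each hit's _source, so for a response like the one in the question, where the data comes back under fields, one of the other approaches may be a better fit.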

Solution 3:[3]

I tested all the answers for performance and I found that the pandasticsearch approach is the fastest by a large margin:

tests:

test1 (using from_dict)

%timeit -r 2 -n 5 teste1(resp)

10.5 s ± 247 ms per loop (mean ± std. dev. of 2 runs, 5 loops each)

test2 (using a list)

%timeit -r 2 -n 5 teste2(resp)

2.05 s ± 8.17 ms per loop (mean ± std. dev. of 2 runs, 5 loops each)

test3 (using import pandasticsearch as pdes)

%timeit -r 2 -n 5 teste3(resp)

39.2 ms ± 5.89 ms per loop (mean ± std. dev. of 2 runs, 5 loops each)

test4 (using from pandas.io.json import json_normalize)

%timeit -r 2 -n 5 teste4(resp)

387 ms ± 19 ms per loop (mean ± std. dev. of 2 runs, 5 loops each)

I hope this is useful for someone.

CODE:

# assumes esc is an existing Elasticsearch client, plus the imports used below
import pandas as pd
import pandasticsearch as pdes
from pandas.io.json import json_normalize  # pd.json_normalize in pandas >= 1.0

index = 'teste_85'
size = 10000
fields = True
sort = ['col1', 'desc']
query = 'teste'
range_gte = '2016-01-01'
range_lte = 'now'
resp = esc.search(index=index,
                  size=size,
                  scroll='2m',
                  _source=fields,
                  doc_type='_doc',
                  body={
                      "sort": {sort[0]: {"order": sort[1]}},
                      "query": {
                          "bool": {
                              "must": [
                                  {"query_string": {"query": query}},
                                  {"range": {"anomes": {"gte": range_gte, "lte": range_lte}}},
                              ]
                          }
                      }
                  })

def teste1(resp):
    # row-by-row DataFrame.append: copies the whole frame on every call
    df = pd.DataFrame(columns=list(resp['hits']['hits'][0]['_source'].keys()))
    for hit in resp['hits']['hits']:
        df = df.append(pd.DataFrame.from_dict(hit['_source'], orient='index').T)
    return df

def teste2(resp):
    # note: as written, this overwrites df on every iteration and returns only
    # the last hit; kept as in the original benchmark
    col = list(resp['hits']['hits'][0]['_source'].keys())
    for hit in resp['hits']['hits']:
        df = pd.DataFrame(list(hit['_source'].values()), col).T
    return df

def teste3(resp):
    df = pdes.Select.from_dict(resp).to_pandas()
    return df

def teste4(resp):
    df = json_normalize(resp['hits']['hits'])
    return df
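
For comparison, the list-of-dicts pattern from Solution 8 below is usually the simplest fast alternative: collect the rows first and build the frame once, instead of appending row by row as teste1 does. A minimal sketch (teste5 is a hypothetical addition, not part of the original benchmark):

def teste5(resp):
    # build the frame in one pass from a list of _source dicts
    rows = [hit['_source'] for hit in resp['hits']['hits']]
    return pd.DataFrame(rows)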

Solution 4:[4]

If your request is likely to return more than 10,000 documents from Elasticsearch, you will need to use the scrolling function of Elasticsearch. Documentation and examples for this function are rather difficult to find, so I will provide you with a full, working example:

import pandas as pd
from elasticsearch import Elasticsearch
import elasticsearch.helpers


es = Elasticsearch('http://localhost:9200')

body={"query": {"match_all": {}}}
results = elasticsearch.helpers.scan(es, query=body, index="my_index")
df = pd.DataFrame.from_dict([document['_source'] for document in results])

Simply edit the values that start with "my_" (here, "my_index") to correspond to your own.
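
If you also want the document ids in the frame, a small variant of the same pattern keeps them (a sketch, assuming the same client and index as above):

results = elasticsearch.helpers.scan(es, query=body, index="my_index")
# scan() yields one dict per document; keep the id next to the source fields
df = pd.DataFrame.from_dict([dict(_id=doc['_id'], **doc['_source']) for doc in results])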

Solution 5:[5]

For anyone who encounters this question as well: @CT Zhu has a nice answer, but I think it is a bit outdated. When you are using the elasticsearch_dsl package, the result is a bit different. Try this in that case:

import elasticsearch_dsl as es_dsl
import pandas as pd

# Obtain the results (con is an existing Elasticsearch connection)
res = es_dsl.Search(using=con, index='_all')
res_content = res[0:100].execute()
# convert each hit's _source to a plain dict with .to_dict()
res_filtered = [x['_source'].to_dict() for x in res_content['hits']['hits']]

# pass this on to the 'from_dict' function
A = pd.DataFrame.from_dict(res_filtered)

Solution 6:[6]

With elasticsearch_dsl you can search documents, get them by id, etc.

For instance

from elasticsearch_dsl import Document

# retrieve document whose _id is in the list of ids
s = Document.mget(ids, using=es_connection, index=myindex)

or

from elasticsearch_dsl import Search

# get (up to) 100 documents from a given index
s = Search(using=es_connection, index=myindex).extra(size=100)

Then, if you want to create a DataFrame that uses the Elasticsearch ids as its index, you can do as follows:

df = pd.DataFrame([{'id': r.meta.id, **r.to_dict()}
                   for r in s.execute()]).set_index('id', drop=True)

Solution 7:[7]

Here's a bit of code you might find useful for your work. It's simple and extensible, and it has saved me a lot of time when faced with just "grabbing" some data from Elasticsearch to analyze.

If you just want to grab all the data of a given index and doc_type on your localhost, you can do:

df = ElasticCom(index='index', doc_type='doc_type').search_and_export_to_df()

You can use any of the arguments you'd usually use in elasticsearch.search(), or specify a different host. You can also choose whether to include the _id or not, and specify whether the data is in '_source' or 'fields' (it tries to guess). It also tries to convert the field values by default (but you can switch that off).

Here's the code:

from elasticsearch import Elasticsearch
import pandas as pd


class ElasticCom(object):

    def __init__(self, index, doc_type, hosts='localhost:9200', **kwargs):
        self.index = index
        self.doc_type = doc_type
        self.es = Elasticsearch(hosts=hosts, **kwargs)

    def search_and_export_to_dict(self, *args, **kwargs):
        _id = kwargs.pop('_id', True)
        data_key = kwargs.pop('data_key', kwargs.get('fields')) or '_source'
        kwargs = dict({'index': self.index, 'doc_type': self.doc_type}, **kwargs)
        if kwargs.get('size', None) is None:
            # first ask for a single hit just to learn the total, then re-query
            kwargs['size'] = 1
            t = self.es.search(*args, **kwargs)
            # note: in Elasticsearch >= 7, 'total' is a dict; use t['hits']['total']['value']
            kwargs['size'] = t['hits']['total']

        return get_search_hits(self.es.search(*args, **kwargs), _id=_id, data_key=data_key)

    def search_and_export_to_df(self, *args, **kwargs):
        convert_numeric = kwargs.pop('convert_numeric', True)
        convert_dates = kwargs.pop('convert_dates', 'coerce')
        df = pd.DataFrame(self.search_and_export_to_dict(*args, **kwargs))
        # convert_objects was removed in pandas 1.0; on modern pandas, convert
        # columns explicitly with pd.to_numeric / pd.to_datetime instead
        if convert_numeric:
            df = df.convert_objects(convert_numeric=convert_numeric, copy=True)
        if convert_dates:
            df = df.convert_objects(convert_dates=convert_dates, copy=True)
        return df

def get_search_hits(es_response, _id=True, data_key=None):
    response_hits = es_response['hits']['hits']
    if len(response_hits) > 0:
        if data_key is None:
            for hit in response_hits:
                if '_source' in hit.keys():
                    data_key = '_source'
                    break
                elif 'fields' in hit.keys():
                    data_key = 'fields'
                    break
            if data_key is None:
                raise ValueError("Neither _source nor fields were in response hits")

        if _id is False:
            return [x.get(data_key, None) for x in response_hits]
        else:
            return [dict(_id=x['_id'], **x.get(data_key, {})) for x in response_hits]
    else:
        return []
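
For instance, with the question's logstash data, a hypothetical call using those keyword arguments might look like this (the index name, data_key='fields', and the query body come from the question; everything else is an assumption, and the class targets the older pandas/Elasticsearch APIs it was written against):

df = ElasticCom(index='logstash-2014.08.07', doc_type='logs').search_and_export_to_df(
    _id=False,             # leave out the Elasticsearch _id column
    data_key='fields',     # the question's data lives under 'fields', not '_source'
    body={'query': {'match_all': {}}},
)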

Solution 8:[8]

redata = [hit['_source'] for hit in res['hits']['hits']]
pd.DataFrame(redata)

If you just use the pandas module, this is the best solution; in my case, this code took 20.5 ms.

Using pandas.io.json.json_normalize(res['hits']['hits']) instead took 291 ms, and the result is different: json_normalize keeps the hit metadata and flattens nested keys into dotted column names.
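
Adapted to the question's response, where the data sits under fields rather than _source, the same idea yields exactly the frame the question asked for (a sketch based on the response shown in the question; the column rename is my own choice):

redata = [hit['fields'] for hit in res['hits']['hits']]
df = pd.DataFrame(redata).rename(columns={'@timestamp': 'timestamp'})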

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Brown nightingale
Solution 2 Phil B
Solution 3 Rami
Solution 4 Phil B
Solution 5 zwep
Solution 6 HerrIvan
Solution 7
Solution 8 demented hedgehog