'BulkIndexError: ('2 document(s) failed to index.') - Elasticsearch + Python
At first I found some null values in my preprocessed data, so removed those. (Here's my Data Cleaning Code - and the respective outputs enclosed in '''Comments''')
Cleaning and Preprocessing
df_merged[df_merged.abstract_x != df_merged.abstract_y].shape
#this means out of the 25000 samples, abstract is not matching between metadata and pdf data
'''(22728, 22)'''
# check metadata abstract column to see if null values exist
df_merged.abstract_x.isnull().sum()
'''3363'''
# Check pdf_json abstract to see if null values exist
df_merged.abstract_y.isnull().sum()
'''0'''
#Since the abstract_x from metadata is more reliable , we will use it but only fill by abstract_y text when abstract_x value is null
# Convert all columns to string and then replace abstract_y values
#df = df.astype(str)
df_merged['abstract_y'] = df_merged['abstract_y'].astype(str)
df_merged['abstract_y'] = np.where(df_merged['abstract_y'].map(len) > 50, df_merged['abstract_y'], 'na')
df_merged.loc[df_merged.abstract_x.isnull() & (df_merged.abstract_y != 'na'), 'abstract_x'] = df_merged[df_merged.abstract_x.isnull() & (df_merged.abstract_y != 'na')].abstract_y #we want to overwrite the abstract_x column and abstract_y has to be not na
df_merged.abstract_x.isnull().sum()
'''
2745
'''
df_merged.rename(columns={'abstract_x': 'abstract'}, inplace=True)
df_merged.columns
'''
Index(['cord_uid', 'sha', 'source_x', 'title', 'doi', 'pmcid', 'pubmed_id',
'license', 'abstract', 'publish_time', 'authors', 'journal', 'mag_id',
'who_covidence_id', 'arxiv_id', 'pdf_json_files', 'pmc_json_files',
'url', 's2_id', 'abstract_y', 'body_text_x', 'body_text_y'],
dtype='object')
'''
df_merged = df_merged.drop(['abstract_y'], axis=1)
df_merged.columns
'''
Index(['cord_uid', 'sha', 'source_x', 'title', 'doi', 'pmcid', 'pubmed_id',
'license', 'abstract', 'publish_time', 'authors', 'journal', 'mag_id',
'who_covidence_id', 'arxiv_id', 'pdf_json_files', 'pmc_json_files',
'url', 's2_id', 'body_text_x', 'body_text_y'],
dtype='object')
'''
(df_merged.body_text_x != df_merged.body_text_y).sum()
'''25000'''
df_merged.body_text_x.isnull().sum()
'''1526'''
df_merged.body_text_y.isnull().sum()
'''5238'''
df_merged[df_merged.body_text_x.isnull() & df_merged.body_text_y.notnull()].shape
'''(1447, 21)'''
#when the body_text_y is not null, we'll be putting, bodytext y into x
df_merged.loc[df_merged.body_text_y.notnull(), 'body_text_x'] = df_merged.loc[df_merged.body_text_y.notnull(), 'body_text_y']
df_merged.body_text_x.isnull().sum()
'''79'''
df_merged.columns
'''
Index(['cord_uid', 'sha', 'source_x', 'title', 'doi', 'pmcid', 'pubmed_id',
'license', 'abstract', 'publish_time', 'authors', 'journal', 'mag_id',
'who_covidence_id', 'arxiv_id', 'pdf_json_files', 'pmc_json_files',
'url', 's2_id', 'body_text_x', 'body_text_y'],
dtype='object')
'''
df_merged.rename(columns={'body_text_x': 'body_text'}, inplace=True)
df_merged = df_merged.drop(['body_text_y'], axis=1)
df_merged.columns
'''
Index(['cord_uid', 'sha', 'source_x', 'title', 'doi', 'pmcid', 'pubmed_id',
'license', 'abstract', 'publish_time', 'authors', 'journal', 'mag_id',
'who_covidence_id', 'arxiv_id', 'pdf_json_files', 'pmc_json_files',
'url', 's2_id', 'body_text'],
dtype='object')
'''
df_final = df_merged[['sha', 'title', 'abstract', 'publish_time', 'authors', 'url', 'body_text']]
df_final.head()
sha title abstract publish_time authors url body_text
0 1cbf95a2c3a39e5cc80a5c4c6dbcec7cc718fd59 Genomic Evolution of Severe Acute Respiratory ... Abstract Recent emergence of severe acute resp... 2020-08-31 Jacob, Jobin John; Vasudevan, Karthick; Veerar... https://api.elsevier.com/content/article/pii/S... The outbreak of severe acute respiratory syndr...
1 7dc6943ca46a1093ece2594002d61efdf9f51f28 Impact of COVID-19 on COPD and Asthma admissio... Asthma and Chronic Obstructive Pulmonary Disea... 2020-12-10 Sykes, Dominic L; Faruqi, Shoaib; Holdsworth, ... https://www.ncbi.nlm.nih.gov/pubmed/33575313/;... The COVID-19 pandemic has led to an overall re...
2 5b127336f68f3dca83981d0142eda472634378f0 Programmable System of Cas13-Mediated RNA Modi... Clustered regularly interspaced short palindro... 2021-07-27 Tang, Tian; Han, Yingli; Wang, Yuran; Huang, H... https://www.ncbi.nlm.nih.gov/pubmed/34386490/;... Prokaryotic clustered regularly interspaced sh...
3 aafbe282248436380dd737bae844725882df2249 Are You Tired of Working amid the Pandemic? Th... With the outbreak of novel coronavirus in 2019... 2020-12-09 Chen, Huaruo; Liu, Fan; Pang, Liman; Liu, Fei;... https://doi.org/10.3390/ijerph17249188; https:... In the outbreak of novel coronavirus pneumonia...
4 4013a7e351c40d2bb7fdfe7f185d2ef9b1a872e6 Viral Sepsis in Children Sepsis in children is typically presumed to be... 2018-09-18 Gupta, Neha; Richter, Robert; Robert, Stephen;... https://www.ncbi.nlm.nih.gov/pubmed/30280095/;... The true incidence of viral sepsis, particular...
df_final = df_final.dropna(axis=0,subset=['abstract', 'body_text'])
df_final.isnull().sum()
'''
sha 0
title 0
abstract 0
publish_time 0
authors 104
url 0
body_text 0
dtype: int64
'''
df_final.shape
'''(22186, 7)'''
df_final.to_csv('FINAL_CORD_DATA.csv', index=False)
''')
Whenever I try to use the Sample Dataset that I created, in my es_populate notebook, using the sparse retriever, I keep getting
BulkIndexError Traceback (most recent call last)
~\AppData\Local\Temp/ipykernel_19912/2533749049.py in <module>
----> 1 document_store.write_documents(final_dicts)
~\anaconda3\lib\site-packages\haystack\document_store\elasticsearch.py in write_documents(self, documents, index, batch_size, duplicate_documents)
426 # Pass batch_size number of documents to bulk
427 if len(documents_to_index) % batch_size == 0:
--> 428 bulk(self.client, documents_to_index, request_timeout=300, refresh=self.refresh_type)
429 documents_to_index = []
430
~\anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in bulk(client, actions, stats_only, *args, **kwargs)
388 # make streaming_bulk yield successful results so we can count them
389 kwargs["yield_ok"] = True
--> 390 for ok, item in streaming_bulk(client, actions, *args, **kwargs):
391 # go through request-response pairs and detect failures
392 if not ok:
~\anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in streaming_bulk(client, actions, chunk_size, max_chunk_bytes, raise_on_error, expand_action_callback, raise_on_exception, max_retries, initial_backoff, max_backoff, yield_ok, *args, **kwargs)
309
310 try:
--> 311 for data, (ok, info) in zip(
312 bulk_data,
313 _process_bulk_chunk(
~\anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in _process_bulk_chunk(client, bulk_actions, bulk_data, raise_on_exception, raise_on_error, *args, **kwargs)
245 resp=resp, bulk_data=bulk_data, raise_on_error=raise_on_error
246 )
--> 247 for item in gen:
248 yield item
249
~\anaconda3\lib\site-packages\elasticsearch\helpers\actions.py in _process_bulk_chunk_success(resp, bulk_data, raise_on_error)
186
187 if errors:
--> 188 raise BulkIndexError("%i document(s) failed to index." % len(errors), errors)
189
190
BulkIndexError: ('2 document(s) failed to index.', [{'index': {'_index': 'document', '_type': '_doc', '_id': '9d04e1c37a299818d82416898ffe22d6', 'status': 400, 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'json_parse_exception', 'reason': "Non-standard token 'NaN': enable JsonParser.Feature.ALLOW_NON_NUMERIC_NUMBERS to allow\n at [Source: (ByteArrayInputStream); line: 1, column: 217076]"}}, 'data': {'text': 'Increase
My method of using the document store was.
# Connect to Elasticsearch
from haystack.document_store import ElasticsearchDocumentStore
document_store = ElasticsearchDocumentStore(host="localhost", username="", password="", index="document")
C:\Users\manan\anaconda3\lib\site-packages\elasticsearch\connection\base.py:190: ElasticsearchDeprecationWarning: Elasticsearch built-in security features are not enabled. Without authentication, your cluster could be accessible to anyone. See https://www.elastic.co/guide/en/elasticsearch/reference/7.17/security-minimal-setup.html to enable security.
warnings.warn(message, category=ElasticsearchDeprecationWarning)
02/20/2022 00:58:28 - INFO - elasticsearch - HEAD http://localhost:9200/ [status:200 request:0.227s]
02/20/2022 00:58:28 - INFO - elasticsearch - HEAD http://localhost:9200/document [status:200 request:0.015s]
02/20/2022 00:58:28 - INFO - elasticsearch - GET http://localhost:9200/document [status:200 request:0.011s]
02/20/2022 00:58:28 - INFO - elasticsearch - PUT http://localhost:9200/document/_mapping [status:200 request:0.087s]
02/20/2022 00:58:28 - INFO - elasticsearch - HEAD http://localhost:9200/label [status:200 request:0.006s]
document_store.write_documents(final_dicts)
02/20/2022 00:58:34 - INFO - elasticsearch - POST http://localhost:9200/_bulk?refresh=wait_for [status:200 request:3.887s]
02/20/2022 00:58:38 - INFO - elasticsearch - POST http://localhost:9200/_bulk?refresh=wait_for [status:200 request:3.464s]
followed by the above error. I'm very new to this, and would appreciate any help that could come my way.
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|
