OOM when reading data to Pandas from MongoDB using pymongo client
I have (900k, 300) records in a Mongo collection.
When I try to read the data into pandas, memory consumption increases dramatically until the process is killed.
I have to mention that the data fits in memory (~1.5 GB) when I read it from a CSV file.
My machine has 32 GB RAM and 16 CPUs, running CentOS 7.
My simple code:
from pymongo import MongoClient
import pandas as pd

client = MongoClient(host, port)
collection = client[db_name][collection_name]
cursor = collection.find()
df = pd.DataFrame(list(cursor))
My multiprocessing code:
import concurrent.futures
import multiprocessing

def read_mongo_parallel(skipses):
    print("Starting process")
    client = MongoClient(skipses[4], skipses[5])
    db = client[skipses[2]]
    collection = db[skipses[3]]
    print("range of {} to {}".format(skipses[0], skipses[0] + skipses[1]))
    cursor = collection.find().skip(skipses[0]).limit(skipses[1])
    return list(cursor)

all_lists = []
with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
    for rows in executor.map(read_mongo_parallel, skipesess):
        all_lists.extend(rows)

df = pd.DataFrame(all_lists)
Memory usage grows with both methods until the kernel is killed.
What am I doing wrong?
Solution 1:[1]
The problem is in the list usage when you build the DataFrame.
The cursor is consumed all at once, making a list with 900k dictionaries inside it, which takes a lot of memory.
You can avoid that if you create an empty DataFrame and then pull the documents in batches, a few documents at a time, appending them to the DataFrame.
def batched(cursor, batch_size):
    batch = []
    for doc in cursor:
        batch.append(doc)
        if batch and not len(batch) % batch_size:
            yield batch
            batch = []

    if batch:   # last documents
        yield batch

df = pd.DataFrame()
for batch in batched(cursor, 10000):
    df = df.append(batch, ignore_index=True)
10000 seems like a reasonable batch size, but you may want to change it according to your memory constraints: the higher it is, the faster this will end, but also the more memory it will use while running.
UPDATE: Adding some benchmarks
Note that this approach does not necessarily make the query take longer, but rather the opposite, since what actually takes time is pulling the documents out of MongoDB as dictionaries and allocating them into a list.
Here are some benchmarks with 300K documents that show how this approach, with the right batch_size, is actually even faster than pulling the whole cursor into a list:
- The whole cursor into a list
%%time
df = pd.DataFrame(list(db.test.find().limit(300000)))

CPU times: user 35.3 s, sys: 2.14 s, total: 37.5 s
Wall time: 37.7 s

- batch_size=10000 <- FASTEST
%%time
df = pd.DataFrame()
for batch in batched(db.test.find().limit(300000), 10000):
    df = df.append(batch, ignore_index=True)

CPU times: user 29.5 s, sys: 1.23 s, total: 30.7 s
Wall time: 30.8 s

- batch_size=1000
%%time
df = pd.DataFrame()
for batch in batched(db.test.find().limit(300000), 1000):
    df = df.append(batch, ignore_index=True)

CPU times: user 44.8 s, sys: 2.09 s, total: 46.9 s
Wall time: 46.9 s

- batch_size=100000
%%time
df = pd.DataFrame()
for batch in batched(db.test.find().limit(300000), 100000):
    df = df.append(batch, ignore_index=True)

CPU times: user 34.6 s, sys: 1.15 s, total: 35.8 s
Wall time: 36 s
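Note that DataFrame.append, used above, was deprecated in pandas 1.4 and removed in pandas 2.0. On newer pandas versions the same batched loop can be written with pd.concat; here is a minimal sketch that reuses the batched() helper and cursor from above:

frames = []
for batch in batched(cursor, 10000):
    # build one small DataFrame per batch instead of appending in place
    frames.append(pd.DataFrame(batch))
df = pd.concat(frames, ignore_index=True)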
Solution 2:[2]
This test harness creates 900k (albeit small) records and runs fine on my stock laptop. Give it a try.
import pymongo
import pandas as pd

db = pymongo.MongoClient()['mydatabase']
db.mycollection.drop()

operations = []
for i in range(900000):
    operations.append(pymongo.InsertOne({'a': i}))

db.mycollection.bulk_write(operations, ordered=False)

cursor = db.mycollection.find({})
df = pd.DataFrame(list(cursor))

print(df.count())
Solution 3:[3]
Load the data in chunks.
Using iterator2dataframes from https://stackoverflow.com/a/39446008/12015722
def iterator2dataframes(iterator, chunk_size: int):
    """Turn an iterator into multiple small pandas.DataFrame

    This is a balance between memory and efficiency
    """
    records = []
    frames = []
    for i, record in enumerate(iterator):
        records.append(record)
        if i % chunk_size == chunk_size - 1:
            frames.append(pd.DataFrame(records))
            records = []
    if records:
        frames.append(pd.DataFrame(records))
    return pd.concat(frames)

client = MongoClient(host, port)
collection = client[db_name][collection_name]
cursor = collection.find()
df = iterator2dataframes(cursor, 1000)
Solution 4:[4]
Just wanted to make y'all aware of pymongoarrow, which is officially developed by MongoDB and solves this problem. It can output query results to Arrow tables or pandas DataFrames and is, according to the docs, the preferred way of loading data from Mongo into pandas. It sure worked like a charm for me!
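Here is a minimal sketch of how that can look, based on the pymongoarrow quick start, assuming pymongoarrow is installed (pip install pymongoarrow) and reusing the host/db/collection placeholders from the question:

from pymongo import MongoClient
from pymongoarrow.monkey import patch_all

patch_all()  # adds find_pandas_all() and friends to pymongo's Collection

client = MongoClient(host, port)
collection = client[db_name][collection_name]

# materialize the query result via Apache Arrow directly into a DataFrame
df = collection.find_pandas_all({})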
Solution 5:[5]
You can try to get data from MongoDB in chunks using slice indexing, i.e. fetch 100,000 documents at a time, add them to a DataFrame, then fetch the next 100,000 documents and append those as well.
client = MongoClient(host, port)
collection = client[db_name][collection_name]

df = pd.DataFrame()
maxrows = 905679
for i in range(0, maxrows, 100000):
    if i + 100000 < maxrows:
        cursor = collection.find()[i:i + 100000]
    else:
        cursor = collection.find()[i:maxrows]
    df2 = pd.DataFrame(list(cursor))
    df = df.append(df2, ignore_index=True)
Refer to the link below to learn more about slice indexing on PyMongo cursors.
https://api.mongodb.com/python/current/api/pymongo/cursor.html
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
| Solution | Source |
|---|---|
| Solution 1 | |
| Solution 2 | Belly Buster |
| Solution 3 | ckedar |
| Solution 4 | Moritz Wilksch |
| Solution 5 | |
