Python multithreading session with MySQL pooling

import requests
import pymysql
from dbutils.pooled_db import PooledDB  # PooledDB is provided by the DBUtils package
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import partial
from tenacity import retry, TryAgain, stop_after_attempt

pool = PooledDB(creator=pymysql,
                maxconnections=0,   # 0 means no limit on the number of pooled connections
                autocommit=True,
                host='localhost',
                user='someuser',
                passwd='somepwd',
                database='database')

# Module-level connection and cursor, shared by every thread
db = pool.connection()
cur = db.cursor()

# Placeholder mapping of keys to the URLs that will be scraped
url_dict = {'key1': 'url1',
            'key2': 'url2'}

@retry(stop=stop_after_attempt(10))
def get_id_list_from_a_website(session, dict_item):
    key, url = dict_item

    id_list_return = []
    ...
    # scrape the website with the shared `session` to fill id_list_return here
    ...
    if not id_list_return:
        raise TryAgain()
    return id_list_return

def insert_sql(id_after_check, list_of_variable_after_check):
    id = id_after_check
    list_of_variable = list_of_variable_after_check
    if id == 'exist':
        print('key_exist')
    else:
        try:
            # build the INSERT statement from id and list_of_variable here
            sql1 = ...
            cur.execute(sql1)
            cur.close()
            db.commit()
            print('successfully insert')
        except Exception:
            print('except')

def get_some_data(session, executor):
    dict_items = url_dict.items()
    id_list_return = list(executor.map(partial(get_id_list_from_a_website, session), dict_items))
    raw_id_list_return = []

    # flatten the per-key id lists into a single list
    for i in range(len(id_list_return)):
        raw_id_list_return = raw_id_list_return + id_list_return[i]

    futures = {executor.submit(partial(check_stuff, session), stuff_id): stuff_id
               for stuff_id in raw_id_list_return}
    for future in as_completed(futures):
        stuff_id_and_variables = future.result()
        stuff_id = stuff_id_and_variables[0]
        stuff_variables = stuff_id_and_variables[1:]
        if stuff_variables == ['stuff_id existed']:
            print('stuff_id_exist')
        else:
            print('go_insert')
            executor.submit(partial(insert_sql, stuff_id), stuff_variables)
            db.close()

@retry(stop=stop_after_attempt(10))
def check_stuff(session, stuff_id):
    sql2 = "SELECT * FROM `tbl1` WHERE stuff_id = '" + str(stuff_id) + "';"
    cur.execute(sql2)
    rows = cur.fetchall()

    if rows:
        # stuff_id is already in the database: just refresh its search date
        # (search_date is defined elsewhere in the real script)
        sql3 = "UPDATE `tbl1` SET latest_search_date='%s' WHERE stuff_id='%s';" % (search_date, stuff_id)
        cur.execute(sql3)
        cur.close()
        stuff_id_and_variables = [stuff_id, 'stuff_id existed']
    else:
        # scrape the data here and build 'stuff_id_and_variables'
        stuff_id_and_variables = []
        ...
        if not stuff_id_and_variables:
            raise TryAgain()

    return stuff_id_and_variables

N_THREADS=50
with requests.Session() as session:
    with ThreadPoolExecutor(max_workers=N_THREADS) as executor:
        get_some_data(session, executor)

The problem is: when I check an id in the database (executing sql2), a few of the sql2 queries run properly and return 'stuff_id existed'. After some queries, however, sql2 no longer runs properly even though the stuff_id already exists in the database, so check_stuff() scrapes the data again. As you can imagine, insert_sql() then hits an exception on the duplicate insert. I could change 'INSERT INTO' to 'REPLACE INTO' to get the data inserted, but I would like the code to run without exceptions and to spend less time on duplicate scraping. Thanks.
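For reference, this is roughly the direction I have in mind (a rough sketch, not tested): give each task its own connection taken from the pool defined above instead of sharing the module-level cur, and let MySQL resolve duplicates in a single statement. The table tbl1 and the stuff_id / latest_search_date columns come from the code above; col1 and col2 are placeholder column names, and this assumes stuff_id has a UNIQUE index.

def save_stuff(stuff_id, stuff_variables):
    conn = pool.connection()          # dedicated connection for this task
    try:
        with conn.cursor() as c:
            # insert a new row; on a duplicate stuff_id just refresh the search date
            c.execute(
                "INSERT INTO `tbl1` (stuff_id, col1, col2) "
                "VALUES (%s, %s, %s) "
                "ON DUPLICATE KEY UPDATE latest_search_date = NOW();",
                (stuff_id, stuff_variables[0], stuff_variables[1]))
        conn.commit()
    finally:
        conn.close()                  # hands the connection back to the pool

Would that be the right way to avoid both the duplicate-key exceptions and the re-scraping, or is there a better pattern for sharing the pool across threads?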



Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
