Why can't APScheduler's SQLAlchemyJobStore pickle the job when the default MemoryJobStore works?

I have a job that performs SQLite database operations, and I use APScheduler to run it in the background. It works fine with the default memory job store, but with SQLAlchemyJobStore it throws the following error:

  File "/usr/lib/python3.8/site-packages/apscheduler/jobstores/sqlalchemy.py", line 95, in add_job
    'job_state': pickle.dumps(job.__getstate__(), self.pickle_protocol)
_pickle.PicklingError: Can't pickle <class 'sqlalchemy.orm.decl_api.Base'>: attribute lookup Base on sqlalchemy.orm.decl_api failed

Here is how I initialise the scheduler:

class DataManager:
    
    def __init__(self, periodic_refresh_db = False):
        self.data_hdl = DataHandler()
        self.db = Database(Base)
        self.scheduler = BackgroundScheduler(
            jobstores={
                'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')})
        self.refresh_interval_in_hr = 1
        self.scheduler.start()
        if periodic_refresh_db:
            self.start_periodic_refresh_db()
    
    def refresh_db(self):
        self.data_hdl.fetch_data()
        self.db.delete_unoccupied_nft()
        
        db_ready_data = self.data_hdl.db_ready_data
        db_ready_data = db_ready_data[~db_ready_data["slug"].isin(self.db.remain_slugs)]
        
        db_ready_data.to_sql(
            name = NftCollection.__tablename__,
            con = self.db.engine,
            if_exists="append",
            index = False
        )

NftCollection is an ORM model, defined as follows:

class NftCollection(Base):

    __tablename__ = "nft_collection"
    __table_args__ = {'sqlite_autoincrement': True}

    id = Column(Integer, primary_key=True, nullable=False)
    slug = Column(String(30), nullable=False)
    name = Column(String, nullable=False)
    floor = Column(Float, nullable=False, default=0.0)
    vol_1day = Column(Integer, nullable=False, default=0.0)
    vol_7day = Column(Integer, nullable=False, default=0.0)
    occupier = Column(String(100), nullable=True)
    occupy_min = Column(Integer, nullable=True)
    occupy_until = Column(DateTime, nullable=True)
    lock = Column(Boolean, default=False)

start_periodic_refresh_db schedules the job that performs the database operations. It is written as follows:

    def start_periodic_refresh_db(self):
        self.scheduler.add_job(
            func=self.refresh_db,
            trigger="interval",
            hours= self.refresh_interval_in_hr,
            id = self.refresh_db.__name__,
            replace_existing = True,
            max_instances = 1,
            misfire_grace_time = None,
            name = "refresh database by crawling",
            next_run_time = datetime.now()
        )
        print(f"refresh_db scheduled periodically, next run time: {self.next_refresh_time}")

The above code does not work unless I remove the jobstores argument, i.e. use the default MemoryJobStore.

Why is that, and how can I make the SQLAlchemy job store behave as expected? Thank you.



Solution 1:[1]

This is because MemoryJobStore does not pickle anything. Quote from the documentation:

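MemoryJobStore stores jobs in memory as-is, without serializing them. This allows you to schedule callables that are unreachable globally and use non-serializable job arguments.

SQLAlchemyJobStore, by contrast, pickles the job state before writing it to the database (the pickle.dumps(job.__getstate__(), ...) call in the traceback), so everything the job references must be picklable. Here the job is the bound method self.refresh_db, and the error shows that pickling reached the declarative Base class, most likely through the DataManager instance and its self.db = Database(Base) attribute.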
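The failure can be reproduced with the standard library alone. The following is a minimal sketch, not APScheduler's actual code: the dictionaries only approximate the shape of a pickled job state, and Base, manager, and the "mymodule:..." references are hypothetical stand-ins for the objects in the question.

```python
import pickle
from types import SimpleNamespace

# Stand-in for the class returned by declarative_base(): it claims to live in
# a module ("collections" here) that has no attribute named "Base", so pickle
# cannot store it by reference.
Base = type("Base", (), {"__module__": "collections"})


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False on PicklingError."""
    try:
        pickle.dumps(obj)
        return True
    except pickle.PicklingError:
        return False


# Stand-in for a DataManager instance that holds a Database(Base) attribute.
manager = SimpleNamespace(db=SimpleNamespace(base=Base), refresh_interval_in_hr=1)

# Assumed shape of the state for a bound method: the instance travels along
# with the job, so everything it references gets pickled too.
state_bound_method = {"func": "mymodule:DataManager.refresh_db", "args": (manager,)}

# Assumed shape of the state for a module-level function with no arguments:
# only a textual reference is stored.
state_plain_function = {"func": "mymodule:refresh_db", "args": ()}

bound_ok = can_pickle(state_bound_method)
plain_ok = can_pickle(state_plain_function)
print("bound method state picklable:", bound_ok)
print("plain function state picklable:", plain_ok)
```

If this matches what happens in your setup, one way around it is to schedule a globally importable function (or a textual reference of the form "package.module:function") that creates its own DataHandler and Database when it runs, and to pass only picklable arguments to add_job.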

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Alex Grönholm