Running a websockets client and server in one script with asyncio

I currently have two Python scripts: one runs as a WebSocket client and the other as a WebSocket server. Both work fine on their own, but I want to combine them into one script to optimize data transfer. The scripts are below.

Run as Client Side

async def init_wss_as_client():
    global SERVER
    try:
        async with websockets.connect(URI) as websocket:
            while True:
                response = await websocket.recv()
                ...
    finally:
        ...

SERVER = Server(DEVICE_ID)
asyncio.run(init_wss_as_client())

Run as Server Side

async def init_wss_as_server(websocket, path):
    try:
        async for message in websocket:
            ...
    finally:
        ...

SESSION = Session(DEVICE_ID)
start_server = websockets.serve(init_wss_as_server, "0.0.0.0", 8443)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

Below are my attempts.

  1. Just combine 2 scripts

    SERVER = Server(DEVICE_ID)
    asyncio.run(init_wss_as_client())
    SESSION = Session(DEVICE_ID)
    start_server = websockets.serve(init_wss_as_server, "0.0.0.0", 8443)
    asyncio.get_event_loop().run_until_complete(start_server)
    asyncio.get_event_loop().run_forever()
    

    This only executes the first function.

  2. Merge 2 into 1 with run_forever by referring to this post.

    loop = asyncio.new_event_loop()
    loop.run_until_complete(init_wss_as_client())
    loop.run_until_complete(websockets.serve(init_wss_as_server, "0.0.0.0", 8443))
    loop.run_forever()
    

    Again, only the first function is executed.

  3. I think 1 script can handle client and server side together if this post is correct.

Any suggestions? Thanks.



Solution 1:[1]

According to the asyncio and websockets documentation, one of the two should be scheduled with create_task and the other started with run_until_complete. The code below works in my case.

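loop = asyncio.get_event_loop()
# Schedule the client as a task; it starts running once the loop runs.
loop.create_task(init_wss_as_client())
# Start the server; this returns as soon as the server is listening.
loop.run_until_complete(websockets.serve(init_wss_as_server, "0.0.0.0", 8443))
# Keep the loop alive so the client task and the server both keep running.
loop.run_forever()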
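
The earlier attempts stall because asyncio.run(...) and loop.run_until_complete(...) block until the coroutine they are given returns, and init_wss_as_client() never returns because of its while True loop. As a rough illustration of why scheduling with create_task avoids this, here is a minimal standard-library sketch. The names client_side, server_side and events are hypothetical stand-ins, not part of the original scripts or of the websockets API, and the loops only record that they ran instead of touching a real socket.

```python
import asyncio

events = []  # records which side made progress (hypothetical demo state)

async def client_side():
    # Stand-in for init_wss_as_client(): never returns on its own,
    # like `while True: response = await websocket.recv()`.
    while True:
        events.append("client")
        await asyncio.sleep(0.01)

async def server_side():
    # Stand-in for the serving side: also runs until cancelled.
    while True:
        events.append("server")
        await asyncio.sleep(0.01)

async def main():
    # create_task only schedules the coroutines; neither call blocks,
    # so both loops share the same event loop and interleave.
    tasks = [
        asyncio.create_task(client_side()),
        asyncio.create_task(server_side()),
    ]
    # The demo stops after a short while; a long-running script would
    # instead keep awaiting the tasks.
    await asyncio.sleep(0.1)
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)

asyncio.run(main())
print(sorted(set(events)))  # ['client', 'server']
```

Replacing `await asyncio.sleep(0.1)` with a direct `await client_side()` before the server is scheduled would reproduce the behaviour of attempts 1 and 2: the second coroutine would never get a chance to start.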

Solution 2:[2]

Python 3.10.2 is not supported by Kivy right now, so install Python 3.9 (Download Python 3.9.4) and then run the command pip install Kivy==2.0.0. I'm using Python 3.9.4 with Kivy and it works fine.

Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow

Solution Source
Solution 1 Pak Ho Cheung
Solution 2 Lakshmi Vallabh