Error after websocket connection disconnects from the Discord gateway

I've been trying to write code that connects to the Discord gateway over a websocket, but partway through a run I get the following error, caught by my logging setup:

ERROR:root:exception in send: sent 1011 (unexpected error) keepalive ping timeout; no close frame received

Everything before that works fine: I send a heartbeat to the websocket and it promptly replies with an opcode 11 ACK. It usually runs for about 30 minutes before it stops and shows the error. I'm not sure how to avoid the error, or how to recover once it happens. So far the only thing that works is restarting the whole program, which then runs for another half hour. I'm also confused about how to reconnect when I receive an opcode 7 (Reconnect). How would I disconnect from the websocket and then reconnect? Would I just restart my entire program?
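To make the reconnect question concrete, here is a rough sketch of what "disconnect and reconnect" could look like without restarting the program. It is not a definitive fix. `run_with_reconnect` and `connect_once` are hypothetical names: `connect_once` stands for a coroutine that opens a fresh websocket, runs one session, and raises when the connection drops or when the handler sees opcode 7.

```python
import asyncio


async def run_with_reconnect(connect_once, retry_on=(ConnectionError,),
                             max_attempts=5, base_delay=1.0):
    """Call connect_once() until it returns normally or attempts run out.

    connect_once is a hypothetical coroutine function: it opens a new
    connection, runs the session, and raises one of `retry_on` on a drop.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return await connect_once()
        except retry_on as exc:
            if attempt == max_attempts:
                raise  # give up and surface the last error
            delay = base_delay * 2 ** (attempt - 1)  # exponential backoff
            print(f"attempt {attempt} failed ({exc}); retrying in {delay}s")
            await asyncio.sleep(delay)
```

With the websockets library, `retry_on` would presumably include `websockets.exceptions.ConnectionClosed`, and each call to `connect_once` would enter a new `async with websockets.connect(...)` block, so the old socket is closed when the previous block exits. After reconnecting, the gateway protocol expects an opcode 6 Resume carrying the saved session ID and the last sequence number received, not a hard-coded one.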

For more information, below are the last messages my logger wrote just before the error begins. Also worth mentioning: after this happens, the while loop in the async function _send_loop stops, which confuses me, because the exception is caught and logged and there is no break statement in the loop. Here's what's logged in the log file:

DEBUG:websockets.client:> TEXT '{"op": 1, "d": null}' [20 bytes]
DEBUG:websockets.client:< TEXT '{"t":null,"s":null,"op":11,"d":null}' [36 bytes]
DEBUG:websockets.client:% sending keepalive ping
DEBUG:websockets.client:> PING 34 df f6 b1 [binary, 4 bytes]
DEBUG:websockets.client:< PONG 34 df f6 b1 [binary, 4 bytes]
DEBUG:websockets.client:% received keepalive pong
DEBUG:websockets.client:% sending keepalive ping
DEBUG:websockets.client:> PING 40 77 e9 64 [binary, 4 bytes]
DEBUG:websockets.client:< PONG 40 77 e9 64 [binary, 4 bytes]
DEBUG:websockets.client:% received keepalive pong
DEBUG:root:pushing msg to send
DEBUG:root:gateway send: {'op': 1, 'd': None}
DEBUG:websockets.client:> TEXT '{"op": 1, "d": null}' [20 bytes]
DEBUG:websockets.client:< TEXT '{"t":null,"s":null,"op":11,"d":null}' [36 bytes]
DEBUG:websockets.client:% sending keepalive ping
DEBUG:websockets.client:> PING 5f ac 1a 82 [binary, 4 bytes]
DEBUG:websockets.client:! timed out waiting for keepalive pong
DEBUG:websockets.client:! failing connection with code 1011
DEBUG:websockets.client:= connection is CLOSING
DEBUG:websockets.client:> CLOSE 1011 (unexpected error) keepalive ping timeout [24 bytes]
DEBUG:websockets.client:= connection is CLOSED
DEBUG:websockets.client:x closing TCP connection
DEBUG:root:pushing msg to send
DEBUG:root:gateway send: {'op': 1, 'd': None}
ERROR:root:exception in send: sent 1011 (unexpected error) keepalive ping timeout; no close frame received
DEBUG:root:pushing msg to send
DEBUG:root:gateway send: {'op': 1, 'd': None}
ERROR:root:exception in send: sent 1011 (unexpected error) keepalive ping timeout; no close frame received
DEBUG:root:pushing msg to send
DEBUG:root:gateway send: {'op': 1, 'd': None}
ERROR:root:exception in send: sent 1011 (unexpected error) keepalive ping timeout; no close frame received
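One detail in the log above may be worth checking: the opcode 11 ACK frames carry "d": null. Unpacking None with ** raises a TypeError, so a decoder that calls SimpleNamespace(**obj["d"]) unconditionally would fail on these frames. If that happens outside a try block in the receive loop, the reader task would end and incoming frames would stop being consumed, which might be related to the keepalive timeout. That is a guess, not a confirmed diagnosis. A minimal sketch of a decoder that tolerates null payloads follows. `decode_payload` is a hypothetical name and the HELLO frame is a made-up sample.

```python
import json
from types import SimpleNamespace


def decode_payload(raw):
    """Parse a gateway frame, tolerating a null or non-object "d" field."""
    obj = json.loads(raw)
    data = obj.get("d")
    if isinstance(data, dict):
        data = SimpleNamespace(**data)  # attribute access for object payloads
    return obj["op"], data, obj.get("s"), obj.get("t")


ack = '{"t":null,"s":null,"op":11,"d":null}'  # copied from the log above
hello = '{"op":10,"d":{"heartbeat_interval":41250}}'  # illustrative value only

print(decode_payload(ack))
print(decode_payload(hello)[1].heartbeat_interval)
```

Using obj.get() also covers frames where "s" or "t" is absent, so the separate membership checks are not needed.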

Here is my code for more context and understanding. Can you please tell me what is causing these errors?

import asyncio
import websockets
import json
import traceback

from dataclasses import dataclass
from types import SimpleNamespace

import logging as log

log.basicConfig(filename='example.log', level=log.DEBUG)

GATEWAY_URL = "wss://gateway.discord.gg/"

@dataclass
class GatewayMessage():
    op: int
    data: object
    sequence: int
    name: str

def decode_msg(msg):
    obj = json.loads(msg) 
    data = None
    seq = None
    name = None
    if "d" in obj:
        data = SimpleNamespace(**obj["d"])
    if "s" in obj:
        seq = obj["s"]
    if "t" in obj:
        name = obj["t"]
    return GatewayMessage(obj["op"], data, seq, name)


##############
#Sending Area    

class GatewayCon(object):
    def __init__(self, token):
        log.info("init Gateway Connection")
        self._token = token
        self._q = asyncio.Queue()
        self._pulse = 0.5

    def run(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self._run_connection())

    async def _run_connection(self):
        log.info("running Gateway")
        wsurl = f"{GATEWAY_URL}/?v=9&encoding=json"
        async with websockets.connect(wsurl) as ws:
            send = asyncio.create_task(self._send_loop(ws))
            recv = asyncio.create_task(self._recv_loop(ws))
            ping = asyncio.create_task(self._ping_loop(ws))
            await ping
            await send
            await recv

    async def _recv_loop(self, ws):
        async for msg in ws:
            decoded = decode_msg(msg)
            try:
                await self.handle_message(decoded)
            except Exception as e:
                log.error(f"exception in handler: {e}")
                traceback.print_exc()

    async def _send_loop(self, ws):
        while True:
            print("dodo")
            try:
                msg = await self._q.get()
                print(msg)
                strmsg = json.dumps(msg)
                log.debug(f"gateway send: {msg}")
                await ws.send(strmsg)

            except Exception as e:
                log.error(f"exception in send: {e}")
                traceback.print_exc()

                identity = {
                    "op": 2,
                    "d": {
                        "token": self._token,
                        "intents": 1 << 10,
                        "properties": {
                            "$os": "linux",
                            "$browser": "chrome",
                            "$device": "pc",
                        },
                    },
                }
                await self.send(identity)

                resume = {
                    "op": 6,
                    "d": {
                        "token": self._token,
                        "session_id": self._session_id,
                        "seq": 1337,
                    },
                }
                log.debug(f"gateway send: {resume}")
                await ws.send(resume)


    async def _ping_loop(self, ws):
        while True:
            await asyncio.sleep(self._pulse)
            ping = {"op": 1, "d": None}
            print("heyo")
            await self.send(ping)

    async def send(self, msg):
        log.debug("pushing msg to send")
        await self._q.put(msg)
        


#Specific receiving area

class Gateway(GatewayCon):

    def __init__(self, token):
        super().__init__(token)
        self._handlers = {}

    async def handle_message(self, msg):
        log.debug(f"gateway handle: {msg}")
        if msg.op == 10:
            log.info("received HELLO, sending identify")
            self._pulse = (msg.data.heartbeat_interval / 1000) + 5
            print(self._pulse)
            identity = {
                "op": 2,
                "d": {
                    "token": self._token,
                    "intents": 1 << 10,
                    "properties": {
                        "$os": "linux",
                        "$browser": "chrome",
                        "$device": "pc",
                    },
                }
            }
            await self.send(identity)
            log.info("done identify")
        elif msg.op == 0:
            event = msg.name.lower()
            self._session_id = msg.data.session_id
            if event in self._handlers:
                await self._handlers[event](msg)
            else:
                log.debug(f"unhandled event {event}")
        elif msg.op == 11:
            print("yo no way")
        else:
            raise Exception(f"unknown op in message {msg.op}")

    def event(self, f):
        self._handlers[f.__name__] = f





if __name__ == "__main__":
    g = Gateway("my token here")###

    @g.event
    async def ready(x):
        log.info("ready")

    @g.event
    async def message_reaction_add(x):
        log.info("reaction added")

    g.run()
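
On the question of why the while loop in _send_loop can stop without a break statement: a try/except only protects the code inside the try. If the recovery code in the except branch raises (for example a missing self._session_id, or ws.send() on a connection that is already closed), that second exception leaves the loop and ends the task. Because the task is only awaited after the ping task, which never finishes, nothing reports the failure. The following is a small standalone demonstration of that behaviour under those assumptions, not a reproduction of the exact failure. `send_loop` and `main` are illustrative names.

```python
import asyncio


async def send_loop(fail_in_handler):
    """Mimic a loop whose body always fails; bounded so the demo ends."""
    iterations = 0
    while iterations < 3:
        iterations += 1
        try:
            raise RuntimeError("send failed")
        except Exception:
            if fail_in_handler:
                # Raised inside the handler, so the try above does not
                # cover it: it propagates out of the loop and ends the task.
                raise ValueError("recovery failed")
    return iterations


async def main():
    completed = await send_loop(fail_in_handler=False)
    task = asyncio.create_task(send_loop(fail_in_handler=True))
    await asyncio.wait([task])  # wait without re-raising the task's error
    return completed, task.done(), type(task.exception()).__name__


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The first call completes all three iterations, while the second task finishes early with a ValueError that stays hidden until something inspects the task. Waiting on all three tasks together, for instance with asyncio.gather, would surface such a failure as soon as it happens.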


Sources

This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.

Source: Stack Overflow
