mi.gateway のソースコード
from __future__ import annotations
import json
from typing import Any, Callable, Dict, Optional, TYPE_CHECKING
import aiohttp
from mi.utils import str_lower
from . import config
if TYPE_CHECKING:
from .client import Client
# Public names exported by a star-import of this module.
__all__ = ('MisskeyWebSocket', 'MisskeyClientWebSocketResponse')
class MisskeyClientWebSocketResponse(aiohttp.ClientWebSocketResponse):
    """aiohttp websocket response whose ``close`` defaults to code 4000."""

    async def close(self, *, code: int = 4000, message: bytes = b'') -> bool:
        """Close the websocket by delegating to the aiohttp base class.

        Args:
            code: Close code forwarded to the base implementation;
                defaults to 4000 here.
            message: Close payload forwarded to the base implementation.

        Returns:
            Whatever the base class ``close`` returns.
        """
        closed = await super().close(code=code, message=message)
        return closed
class MisskeyWebSocket:
    """Wrapper around an open websocket that routes messages to parsers.

    Each incoming message is looked up in a parser table keyed by a
    normalised form of the message's ``type`` field and handed to the
    matching callable.
    """

    def __init__(self, socket):
        """Store the socket and initialise the routing state to empty.

        Args:
            socket: The websocket object; ``poll_event`` awaits its
                ``receive(timeout=...)`` method.
        """
        self.socket = socket
        # No-op dispatcher until `from_client` installs the client's own.
        self._dispatch = lambda *args: None
        self._connection = None
        # Parser table; stays None unless the instance is built by
        # `from_client`, so `received_message` cannot be used before then.
        self._misskey_parsers: Optional[Dict[str, Callable[..., Any]]] = None

    @classmethod
    async def from_client(cls, client: Client, *, timeout: int = 60):
        """Open a websocket for ``client`` and return the wired-up wrapper.

        Connects through ``client.http.ws_connect``, copies the client's
        dispatcher, connection and parser table onto the new instance,
        dispatches ``'ready'`` with the raw socket, then polls one event.

        Args:
            client: Client providing ``http``, ``url``, ``dispatch`` and
                ``_connection``.
            timeout: Timeout passed to the first ``poll_event`` call.

        Returns:
            The new ``MisskeyWebSocket`` instance.
        """
        # NOTE(review): the token is inserted into the query string as-is
        # (no URL escaping) -- confirm tokens never need escaping.
        socket = await client.http.ws_connect(client.url + f'?i={config.i.token}')
        ws = cls(socket)
        ws._dispatch = client.dispatch
        ws._connection = client._connection
        ws._misskey_parsers = client._connection.parsers
        client.dispatch('ready', socket)
        await ws.poll_event(timeout=timeout)
        return ws

    @staticmethod
    def _load_payload(msg):
        """Return ``msg`` as a decoded JSON object.

        Raw ``bytes``/``bytearray``/``str`` payloads are parsed with
        ``json.loads``; anything else (normally an already-parsed dict)
        is returned unchanged.
        """
        if isinstance(msg, (bytes, bytearray, str)):
            return json.loads(msg)
        return msg

    async def received_message(self, msg, /):
        """Route one message to the parser registered for its type.

        Args:
            msg: The message, either already parsed (a mapping with a
                ``'type'`` key) or a raw JSON ``bytes``/``str`` payload.

        Raises:
            KeyError: If no parser is registered for the message type.
        """
        # Previously a bytes payload was only decoded to str and then
        # indexed with 'type', which always failed; parse it as JSON.
        msg = self._load_payload(msg)
        # The table key is the `str_lower`-normalised type, upper-cased.
        self._misskey_parsers[str_lower(msg['type']).upper()](msg)

    async def poll_event(self, *, timeout: int = 60):
        """Receive one websocket frame and process it if it is text.

        Args:
            timeout: Timeout passed to ``socket.receive``.

        Frames of any type other than ``TEXT`` are ignored.
        """
        msg = await self.socket.receive(timeout=timeout)  # TODO: turn this into a variable someday
        if msg.type is aiohttp.WSMsgType.TEXT:
            await self.received_message(json.loads(msg.data))