from __future__ import annotations
import asyncio
import inspect
from typing import Any, AsyncIterator, Callable, Dict, Generator, List, Optional, TYPE_CHECKING, Union
from aiocache import cached
from aiocache.factory import Cache
from mi import Instance, InstanceMeta, User
from mi.actions import DriveActions, NoteActions, UserActions
from mi.api.emoji import EmojiManager
from mi.api.reaction import ReactionManager
from mi.chat import Chat
from mi.drive import File
from mi.emoji import Emoji
from mi.exception import InvalidParameters, NotExistRequiredParameters
from mi.http import Route
from mi.iterators import InstanceIterator
from mi.models.chat import RawChat
from mi.models.drive import RawFile
from mi.models.note import RawNote
from mi.models.user import RawUser
from mi.note import Note, Reaction
from mi.user import FollowRequest, Followee
from mi.utils import check_multi_arg, get_cache_key, get_module_logger, key_builder, remove_dict_empty, str_lower, \
upper_to_lower
if TYPE_CHECKING:
from mi import HTTPClient, Client
from mi.types import (Note as NotePayload, Chat as ChatPayload)
class ClientActions:
def __init__(self, state: 'ConnectionState', http: HTTPClient, loop: asyncio.AbstractEventLoop):
self.__state = state
self.__http = http
self.__loop = loop
self.note: NoteActions = NoteActions(state, http, loop)
self.user: UserActions = UserActions(state, http, loop)
self.drive: DriveActions = DriveActions(state, http, loop)
self.emoji: EmojiManager = EmojiManager(state, http, loop)
self.reaction: ReactionManager = ReactionManager(state, http, loop)
def get_user_instance(self, user_id: Optional[str] = None, user: Optional[User] = None) -> UserActions:
return UserActions(self.__state, self.__http, self.__loop, user_id=user_id, user=user)
def get_note_instance(self, note_id: str) -> NoteActions:
return NoteActions(self.__state, self.__http, self.__loop, note_id=note_id)
[ドキュメント]class ConnectionState(ClientActions):
def __init__(self, dispatch: Callable[..., Any], http: HTTPClient, loop: asyncio.AbstractEventLoop, client: Client):
super().__init__(self, http, loop)
self.client: Client = client
self.dispatch = dispatch
self.http: HTTPClient = http
self.logger = get_module_logger(__name__)
self.loop: asyncio.AbstractEventLoop = loop
self.parsers = parsers = {}
for attr, func in inspect.getmembers(self):
if attr.startswith('parse'):
parsers[attr[6:].upper()] = func
def get_client_actions(self) -> ClientActions:
return ClientActions(self, self.http, self.loop)
def parse_emoji_added(self, message: Dict[str, Any]):
self.dispatch('emoji_add', Emoji(message['body']['emoji'], state=self))
[ドキュメント] def parse_channel(self, message: Dict[str, Any]) -> None:
"""parse_channel is a function to parse channel event
チャンネルタイプのデータを解析後適切なパーサーに移動させます
Parameters
----------
message : Dict[str, Any]
Received message
"""
base_msg = upper_to_lower(message['body'])
channel_type = str_lower(base_msg.get('type'))
self.logger.debug(f'ChannelType: {channel_type}')
self.logger.debug(f'recv event type: {channel_type}')
getattr(self, f'parse_{channel_type}')(base_msg['body'])
def parse_renote(self, message: Dict[str, Any]):
pass
[ドキュメント] def parse_unfollow(self, message: Dict[str, Any]):
"""
フォローを解除した際のイベントを解析する関数
"""
[ドキュメント] def parse_signin(self, message: Dict[str, Any]):
"""
ログインが発生した際のイベント
"""
[ドキュメント] def parse_receive_follow_request(self, message: Dict[str, Any]):
"""
フォローリクエストを受け取った際のイベントを解析する関数
"""
self.dispatch('follow_request', FollowRequest(message, state=self))
def parse_me_updated(self, message: Dict[str, Any]):
pass
def parse_read_all_announcements(self, message: Dict[str, Any]) -> None:
pass # TODO: 実装
[ドキュメント] def parse_reply(self, message: NotePayload) -> None:
"""
リプライ
"""
self.dispatch('message', Note(RawNote(message), state=self))
[ドキュメント] def parse_follow(self, message: Dict[str, Any]) -> None:
"""
ユーザーをフォローした際のイベントを解析する関数
"""
self.dispatch('user_follow', User(RawUser(message), state=self))
[ドキュメント] def parse_followed(self, message: Dict[str, Any]) -> None:
"""
フォローイベントを解析する関数
"""
self.dispatch('follow', User(RawUser(message), state=self))
[ドキュメント] def parse_mention(self, message: Dict[str, Any]) -> None:
"""
メンションイベントを解析する関数
"""
self.dispatch('mention', Note(RawNote(message), state=self))
def parse_drive_file_created(self, message: Dict[str, Any]) -> None:
pass # TODO: 実装
def parse_read_all_unread_mentions(self, message: Dict[str, Any]) -> None:
pass # TODO:実装
def parse_read_all_unread_specified_notes(self, message: Dict[str, Any]) -> None:
pass # TODO:実装
def parse_read_all_channels(self, message: Dict[str, Any]) -> None:
pass # TODO:実装
def parse_read_all_notifications(self, message: Dict[str, Any]) -> None:
pass # TODO:実装
def parse_unread_mention(self, message: Dict[str, Any]) -> None:
pass
def parse_unread_specified_note(self, message: Dict[str, Any]) -> None:
pass
def parse_read_all_messaging_messages(self, message: Dict[str, Any]) -> None:
pass
[ドキュメント] def parse_messaging_message(self, message: ChatPayload) -> None:
"""
チャットが来た際のデータを処理する関数
"""
self.dispatch('message', Chat(RawChat(message), state=self))
[ドキュメント] def parse_unread_messaging_message(self, message: Dict[str, Any]) -> None:
"""
チャットが既読になっていない場合のデータを処理する関数
"""
self.dispatch('message', Chat(RawChat(message), state=self))
[ドキュメント] def parse_notification(self, message: Dict[str, Any]) -> None:
"""
通知イベントを解析する関数
Parameters
----------
message: Dict[str, Any]
Received message
"""
accept_type = ['reaction']
notification_type = str_lower(message['type'])
if notification_type in accept_type:
getattr(self, f'parse_{notification_type}')(message)
def parse_follow_request_accepted(self, message: Dict[str, Any]) -> None:
pass
def parse_poll_vote(self, message: Dict[str, Any]) -> None:
pass # TODO: 実装
[ドキュメント] def parse_unread_notification(self, message: Dict[str, Any]) -> None:
"""
未読の通知を解析する関数
Parameters
----------
message : Dict[str, Any]
Received message
"""
# notification_type = str_lower(message['type'])
# getattr(self, f'parse_{notification_type}')(message)
[ドキュメント] def parse_reaction(self, message: Dict[str, Any]) -> None:
"""
リアクションに関する情報を解析する関数
"""
self.dispatch('reaction', Reaction(message, state=self))
[ドキュメント] def parse_note(self, message: NotePayload) -> None:
"""
ノートイベントを解析する関数
"""
note = Note(RawNote(message), state=self)
# Router(self.http.ws).capture_message(note.id) TODO: capture message
self.client._on_message(note)
async def get_i(self) -> User:
res = await self.http.request(Route('POST', '/api/i'), auth=True, lower=True)
return User(RawUser(res), state=self)
def get_users(self,
limit: int = 10,
*,
offset: int = 0,
sort: Optional[str] = None,
state: str = 'all',
origin: str = 'local',
username: Optional[str] = None,
hostname: Optional[str] = None,
get_all: bool = False) -> AsyncIterator[User]:
return InstanceIterator(self).get_users(limit=limit, offset=offset, sort=sort, state=state, origin=origin,
username=username, hostname=hostname, get_all=get_all)
[ドキュメント] @cached(ttl=10, namespace='get_user', key_builder=key_builder)
async def get_user(self, user_id: Optional[str] = None, username: Optional[str] = None,
host: Optional[str] = None) -> User:
"""
ユーザーのプロフィールを取得します。一度のみサーバーにアクセスしキャッシュをその後は使います。
fetch_userを使った場合はキャッシュが廃棄され再度サーバーにアクセスします。
Parameters
----------
user_id : str
取得したいユーザーのユーザーID
username : str
取得したいユーザーのユーザー名
host : str, default=None
取得したいユーザーがいるインスタンスのhost
Returns
-------
dict
ユーザー情報
"""
field = remove_dict_empty({"userId": user_id, "username": username, "host": host})
data = await self.http.request(Route('POST', '/api/users/show'), json=field, auth=True, lower=True)
return User(RawUser(data), state=self)
[ドキュメント] async def post_chat(self, content: str, *, user_id: str = None, group_id: str = None, file_id=None) -> Chat:
"""
チャットを送信します。
Parameters
----------
content : str
送信する内容
user_id : str, optional
ユーザーid, default=None
group_id : str, optional
グループid, default=None
file_id : str, optional
添付するファイルid, efault=None
Returns
-------
Chat
チャットの内容
"""
args = remove_dict_empty({'userId': user_id, 'groupId': group_id, 'text': content, 'fileId': file_id})
data = await self.http.request(Route('POST', '/api/messaging/messages/create'), json=args, auth=True, lower=True)
return Chat(RawChat(data), state=self)
[ドキュメント] async def delete_chat(self, message_id: str) -> bool:
"""
指定したidのメッセージを削除します。
Parameters
----------
message_id : str
メッセージid
Returns
-------
bool
成功したか否か
"""
args = {'messageId': f'{message_id}'}
data = await self.http.request(Route('POST', '/api/messaging/messages/delete'), json=args, auth=True)
return bool(data)
@get_cache_key
async def fetch_user(self, user_id: Optional[str] = None, username: Optional[str] = None,
host: Optional[str] = None, **kwargs) -> User:
"""
サーバーにアクセスし、ユーザーのプロフィールを取得します。基本的には get_userをお使いください。
Parameters
----------
user_id : str
取得したいユーザーのユーザーID
username : str
取得したいユーザーのユーザー名
host : str, default=None
取得したいユーザーがいるインスタンスのhost
Returns
-------
dict
ユーザー情報
"""
if not check_multi_arg(user_id, username):
raise NotExistRequiredParameters("user_id, usernameどちらかは必須です")
field = remove_dict_empty({"userId": user_id, "username": username, "host": host})
data = await self.http.request(Route('POST', '/api/users/show'), json=field, auth=True, lower=True)
old_cache = Cache(namespace='get_user')
await old_cache.delete(kwargs['cache_key'].format('get_user'))
return User(RawUser(data), state=self)
[ドキュメント] async def get_followers(
self,
user_id: Optional[str] = None,
username: Optional[str] = None,
host: Optional[str] = None,
since_id: Optional[str] = None,
until_id: Optional[str] = None,
limit: int = 10,
get_all: bool = False,
) -> AsyncIterator[Followee]:
"""
与えられたユーザーのフォロワーを取得します
Parameters
----------
user_id : str, default=None
ユーザーのid
username : str, default=None
ユーザー名
host : str, default=None
ユーザーがいるインスタンスのhost名
since_id : str, default=None
until_id : str, default=None
前回の最後の値を与える(既に実行し取得しきれない場合に使用)
limit : int, default=10
取得する情報の最大数 max: 100
get_all : bool, default=False
全てのフォロワーを取得する
Yields
------
dict
フォロワーの情報
Raises
------
InvalidParameters
limit引数が不正な場合
"""
if not check_multi_arg(user_id, username):
raise NotExistRequiredParameters("user_id, usernameどちらかは必須です")
if limit > 100:
raise InvalidParameters("limit は100以上を受け付けません")
data = remove_dict_empty(
{
"userId": user_id,
"username": username,
"host": host,
"sinceId": since_id,
"untilId": until_id,
"limit": limit,
}
)
if get_all:
loop = True
while loop:
get_data = await self.http.request(Route('POST', '/api/users/followers'), json=data, auth=True, lower=True)
if len(get_data) > 0:
data["untilId"] = get_data[-1]["id"]
else:
break
for i in [Followee(i, state=self) for i in get_data]:
yield i
else:
get_data = await self.http.request(Route('POST', '/api/users/followers'), json=data, auth=True, lower=True)
for i in [Followee(i, state=self) for i in get_data]:
yield i
@cached(ttl=10, key_builder=key_builder, key='get_instance')
async def get_instance(self, host: Optional[str] = None, detail: bool = False) -> Union[InstanceMeta, Instance]:
if host is None:
data = await self.http.request(Route('POST', '/api/meta'), json={'detail': detail}, auth=True, lower=True)
return InstanceMeta(data, state=self)
data = await self.http.request(Route('POST', '/api/federation/show-instance'), json={'host': host}, auth=True,
lower=True)
return Instance(data, state=self)
@get_cache_key
async def fetch_instance(self, host: Optional[str] = None, **kwargs):
old_cache = Cache(namespace='get_instance')
await old_cache.delete(kwargs['cache_key'].format('get_instance'))
return await self.get_instance(host=host)
async def remove_emoji(self, emoji_id: str) -> bool:
return bool(await self.http.request(Route('POST', '/api/admin/emoji/remove'), json={'id': emoji_id}, auth=True))
async def get_user_notes(
self,
user_id: str,
*,
since_id: Optional[str] = None,
include_my_renotes: bool = True,
include_replies: bool = True,
with_files: bool = False,
until_id: Optional[str] = None,
limit: int = 10,
get_all: bool = False,
exclude_nsfw: bool = True,
file_type: Optional[List[str]] = None,
since_date: int = 0,
until_data: int = 0
) -> Generator[Note]:
if limit > 100:
raise InvalidParameters("limit は100以上を受け付けません")
args = remove_dict_empty(
{
"userId": user_id,
"includeReplies": include_replies,
"limit": limit,
"sinceId": since_id,
"untilId": until_id,
"sinceDate": since_date,
"untilDate": until_data,
"includeMyRenotes": include_my_renotes,
"withFiles": with_files,
"fileType": file_type,
"excludeNsfw": exclude_nsfw
}
)
if get_all:
loop = True
while loop:
get_data = await self.http.request(Route('POST', '/api/users/notes'), json=args, auth=True, lower=True)
if len(list(get_data)) <= 0:
break
args["untilId"] = get_data[-1]["id"]
for data in get_data:
yield Note(RawNote(data), state=self)
else:
get_data = await self.http.request(Route('POST', '/api/users/notes'), json=args, auth=True, lower=True)
for data in get_data:
yield Note(RawNote(**upper_to_lower(data)), state=self)
[ドキュメント] async def get_announcements(self, limit: int, with_unreads: bool, since_id: str, until_id: str):
"""
Parameters
----------
limit: int
最大取得数
with_unreads: bool
既読済みか否か
since_id: str
until_id: str
前回の最後の値を与える(既に実行し取得しきれない場合に使用)
"""
if limit > 100:
raise InvalidParameters("limit は100以上を受け付けません")
args = {
"limit": limit,
"withUnreads": with_unreads,
"sinceId": since_id,
"untilId": until_id,
}
return await self.http.request(Route('POST', '/api/announcements'), json=args, auth=True, lower=True)
async def file_upload(
self,
name: Optional[str] = None,
to_file: Optional[str] = None,
to_url: Optional[str] = None,
*,
force: bool = False,
is_sensitive: bool = False,
) -> File:
if to_file and to_url is None: # ローカルからアップロードする
with open(to_file, "rb") as f:
args = remove_dict_empty({"isSensitive": is_sensitive, "force": force, "name": f"{name}", 'file': f})
res = await self.http.request(Route('POST', '/api/drive/files/create'), data=args, auth=True, lower=True)
elif to_file is None and to_url: # URLからアップロードする
args = {"url": to_url, "force": force, "isSensitive": is_sensitive}
res = await self.http.request(Route('POST', '/api/drive/files/upload-from-url'), json=args, auth=True, lower=True)
else:
raise InvalidParameters("path または url のどちらかは必須です")
return File(RawFile(res), state=self)