quick fix 2
This commit is contained in:
998
Lib/site-packages/discord/audit_logs.py
Normal file
998
Lib/site-packages/discord/audit_logs.py
Normal file
@ -0,0 +1,998 @@
|
||||
"""
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2015-present Rapptz
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a
|
||||
copy of this software and associated documentation files (the "Software"),
|
||||
to deal in the Software without restriction, including without limitation
|
||||
the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
and/or sell copies of the Software, and to permit persons to whom the
|
||||
Software is furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
||||
OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
DEALINGS IN THE SOFTWARE.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, Any, Callable, ClassVar, Mapping, Generator, List, Optional, Tuple, Type, TypeVar, Union
|
||||
|
||||
from . import enums, flags, utils
|
||||
from .asset import Asset
|
||||
from .colour import Colour
|
||||
from .invite import Invite
|
||||
from .mixins import Hashable
|
||||
from .object import Object
|
||||
from .permissions import PermissionOverwrite, Permissions
|
||||
from .automod import AutoModTrigger, AutoModRuleAction, AutoModRule
|
||||
from .role import Role
|
||||
from .emoji import Emoji
|
||||
from .partial_emoji import PartialEmoji
|
||||
from .member import Member
|
||||
from .scheduled_event import ScheduledEvent
|
||||
from .stage_instance import StageInstance
|
||||
from .sticker import GuildSticker
|
||||
from .threads import Thread
|
||||
from .integrations import PartialIntegration
|
||||
from .channel import ForumChannel, StageChannel, ForumTag
|
||||
from .onboarding import OnboardingPrompt, OnboardingPromptOption
|
||||
|
||||
__all__ = (
|
||||
'AuditLogDiff',
|
||||
'AuditLogChanges',
|
||||
'AuditLogEntry',
|
||||
)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import datetime
|
||||
|
||||
from . import abc
|
||||
from .guild import Guild
|
||||
from .state import ConnectionState
|
||||
from .types.audit_log import (
|
||||
AuditLogChange as AuditLogChangePayload,
|
||||
AuditLogEntry as AuditLogEntryPayload,
|
||||
_AuditLogChange_TriggerMetadata as AuditLogChangeTriggerMetadataPayload,
|
||||
)
|
||||
from .types.channel import (
|
||||
PermissionOverwrite as PermissionOverwritePayload,
|
||||
ForumTag as ForumTagPayload,
|
||||
DefaultReaction as DefaultReactionPayload,
|
||||
)
|
||||
from .types.invite import Invite as InvitePayload
|
||||
from .types.role import Role as RolePayload, RoleColours
|
||||
from .types.snowflake import Snowflake
|
||||
from .types.command import ApplicationCommandPermissions
|
||||
from .types.automod import AutoModerationAction
|
||||
from .types.onboarding import Prompt as PromptPayload, PromptOption as PromptOptionPayload
|
||||
from .user import User
|
||||
from .app_commands import AppCommand
|
||||
from .webhook import Webhook
|
||||
|
||||
TargetType = Union[
|
||||
Guild,
|
||||
abc.GuildChannel,
|
||||
Member,
|
||||
User,
|
||||
Role,
|
||||
Invite,
|
||||
Emoji,
|
||||
StageInstance,
|
||||
GuildSticker,
|
||||
Thread,
|
||||
Object,
|
||||
PartialIntegration,
|
||||
AutoModRule,
|
||||
ScheduledEvent,
|
||||
Webhook,
|
||||
AppCommand,
|
||||
None,
|
||||
]
|
||||
|
||||
|
||||
def _transform_timestamp(entry: AuditLogEntry, data: Optional[str]) -> Optional[datetime.datetime]:
|
||||
return utils.parse_time(data)
|
||||
|
||||
|
||||
def _transform_color(entry: AuditLogEntry, data: int) -> Colour:
|
||||
return Colour(data)
|
||||
|
||||
|
||||
def _transform_snowflake(entry: AuditLogEntry, data: Snowflake) -> int:
|
||||
return int(data)
|
||||
|
||||
|
||||
def _transform_channel(entry: AuditLogEntry, data: Optional[Snowflake]) -> Optional[Union[abc.GuildChannel, Object]]:
|
||||
if data is None:
|
||||
return None
|
||||
return entry.guild.get_channel(int(data)) or Object(id=data)
|
||||
|
||||
|
||||
def _transform_channels_or_threads(
|
||||
entry: AuditLogEntry, data: List[Snowflake]
|
||||
) -> List[Union[abc.GuildChannel, Thread, Object]]:
|
||||
return [entry.guild.get_channel_or_thread(int(data)) or Object(id=data) for data in data]
|
||||
|
||||
|
||||
def _transform_member_id(entry: AuditLogEntry, data: Optional[Snowflake]) -> Union[Member, User, None]:
|
||||
if data is None:
|
||||
return None
|
||||
return entry._get_member(int(data))
|
||||
|
||||
|
||||
def _transform_guild_id(entry: AuditLogEntry, data: Optional[Snowflake]) -> Optional[Guild]:
|
||||
if data is None:
|
||||
return None
|
||||
return entry._state._get_guild(int(data))
|
||||
|
||||
|
||||
def _transform_roles(entry: AuditLogEntry, data: List[Snowflake]) -> List[Union[Role, Object]]:
|
||||
return [entry.guild.get_role(int(role_id)) or Object(role_id, type=Role) for role_id in data]
|
||||
|
||||
|
||||
def _transform_applied_forum_tags(entry: AuditLogEntry, data: List[Snowflake]) -> List[Union[ForumTag, Object]]:
|
||||
thread = entry.target
|
||||
if isinstance(thread, Thread) and isinstance(thread.parent, ForumChannel):
|
||||
return [thread.parent.get_tag(tag_id) or Object(id=tag_id, type=ForumTag) for tag_id in map(int, data)]
|
||||
return [Object(id=tag_id, type=ForumTag) for tag_id in data]
|
||||
|
||||
|
||||
def _transform_overloaded_flags(entry: AuditLogEntry, data: int) -> Union[int, flags.ChannelFlags, flags.InviteFlags]:
|
||||
# The `flags` key is definitely overloaded. Right now it's for channels, threads and invites but
|
||||
# I am aware of `member.flags` and `user.flags` existing. However, this does not impact audit logs
|
||||
# at the moment but better safe than sorry.
|
||||
channel_audit_log_types = (
|
||||
enums.AuditLogAction.channel_create,
|
||||
enums.AuditLogAction.channel_update,
|
||||
enums.AuditLogAction.channel_delete,
|
||||
enums.AuditLogAction.thread_create,
|
||||
enums.AuditLogAction.thread_update,
|
||||
enums.AuditLogAction.thread_delete,
|
||||
)
|
||||
invite_audit_log_types = (
|
||||
enums.AuditLogAction.invite_create,
|
||||
enums.AuditLogAction.invite_update,
|
||||
enums.AuditLogAction.invite_delete,
|
||||
)
|
||||
|
||||
if entry.action in channel_audit_log_types:
|
||||
return flags.ChannelFlags._from_value(data)
|
||||
elif entry.action in invite_audit_log_types:
|
||||
return flags.InviteFlags._from_value(data)
|
||||
return data
|
||||
|
||||
|
||||
def _transform_forum_tags(entry: AuditLogEntry, data: List[ForumTagPayload]) -> List[ForumTag]:
|
||||
return [ForumTag.from_data(state=entry._state, data=d) for d in data]
|
||||
|
||||
|
||||
def _transform_default_reaction(entry: AuditLogEntry, data: DefaultReactionPayload) -> Optional[PartialEmoji]:
|
||||
if data is None:
|
||||
return None
|
||||
|
||||
emoji_name = data.get('emoji_name') or ''
|
||||
emoji_id = utils._get_as_snowflake(data, 'emoji_id') or None # Coerce 0 -> None
|
||||
return PartialEmoji.with_state(state=entry._state, name=emoji_name, id=emoji_id)
|
||||
|
||||
|
||||
def _transform_overwrites(
|
||||
entry: AuditLogEntry, data: List[PermissionOverwritePayload]
|
||||
) -> List[Tuple[Object, PermissionOverwrite]]:
|
||||
overwrites = []
|
||||
for elem in data:
|
||||
allow = Permissions(int(elem['allow']))
|
||||
deny = Permissions(int(elem['deny']))
|
||||
ow = PermissionOverwrite.from_pair(allow, deny)
|
||||
|
||||
ow_type = elem['type']
|
||||
ow_id = int(elem['id'])
|
||||
target = None
|
||||
if ow_type == '0':
|
||||
target = entry.guild.get_role(ow_id)
|
||||
elif ow_type == '1':
|
||||
target = entry._get_member(ow_id)
|
||||
|
||||
if target is None:
|
||||
target = Object(id=ow_id, type=Role if ow_type == '0' else Member)
|
||||
|
||||
overwrites.append((target, ow))
|
||||
|
||||
return overwrites
|
||||
|
||||
|
||||
def _transform_icon(entry: AuditLogEntry, data: Optional[str]) -> Optional[Asset]:
|
||||
if data is None:
|
||||
return None
|
||||
if entry.action is enums.AuditLogAction.guild_update:
|
||||
return Asset._from_guild_icon(entry._state, entry.guild.id, data)
|
||||
else:
|
||||
return Asset._from_icon(entry._state, entry._target_id, data, path='role') # type: ignore # target_id won't be None in this case
|
||||
|
||||
|
||||
def _transform_avatar(entry: AuditLogEntry, data: Optional[str]) -> Optional[Asset]:
|
||||
if data is None:
|
||||
return None
|
||||
return Asset._from_avatar(entry._state, entry._target_id, data) # type: ignore # target_id won't be None in this case
|
||||
|
||||
|
||||
def _transform_cover_image(entry: AuditLogEntry, data: Optional[str]) -> Optional[Asset]:
|
||||
if data is None:
|
||||
return None
|
||||
return Asset._from_scheduled_event_cover_image(entry._state, entry._target_id, data) # type: ignore # target_id won't be None in this case
|
||||
|
||||
|
||||
def _guild_hash_transformer(path: str) -> Callable[[AuditLogEntry, Optional[str]], Optional[Asset]]:
|
||||
def _transform(entry: AuditLogEntry, data: Optional[str]) -> Optional[Asset]:
|
||||
if data is None:
|
||||
return None
|
||||
return Asset._from_guild_image(entry._state, entry.guild.id, data, path=path)
|
||||
|
||||
return _transform
|
||||
|
||||
|
||||
def _transform_automod_actions(entry: AuditLogEntry, data: List[AutoModerationAction]) -> List[AutoModRuleAction]:
|
||||
return [AutoModRuleAction.from_data(action) for action in data]
|
||||
|
||||
|
||||
def _transform_default_emoji(entry: AuditLogEntry, data: str) -> PartialEmoji:
|
||||
return PartialEmoji(name=data)
|
||||
|
||||
|
||||
def _transform_onboarding_prompts(entry: AuditLogEntry, data: List[PromptPayload]) -> List[OnboardingPrompt]:
|
||||
return [OnboardingPrompt.from_dict(data=prompt, state=entry._state, guild=entry.guild) for prompt in data]
|
||||
|
||||
|
||||
def _transform_onboarding_prompt_options(
|
||||
entry: AuditLogEntry, data: List[PromptOptionPayload]
|
||||
) -> List[OnboardingPromptOption]:
|
||||
return [OnboardingPromptOption.from_dict(data=option, state=entry._state, guild=entry.guild) for option in data]
|
||||
|
||||
|
||||
E = TypeVar('E', bound=enums.Enum)
|
||||
|
||||
|
||||
def _enum_transformer(enum: Type[E]) -> Callable[[AuditLogEntry, int], E]:
|
||||
def _transform(entry: AuditLogEntry, data: int) -> E:
|
||||
return enums.try_enum(enum, data)
|
||||
|
||||
return _transform
|
||||
|
||||
|
||||
F = TypeVar('F', bound=flags.BaseFlags)
|
||||
|
||||
|
||||
def _flag_transformer(cls: Type[F]) -> Callable[[AuditLogEntry, Union[int, str]], F]:
|
||||
def _transform(entry: AuditLogEntry, data: Union[int, str]) -> F:
|
||||
return cls._from_value(int(data))
|
||||
|
||||
return _transform
|
||||
|
||||
|
||||
def _transform_type(
|
||||
entry: AuditLogEntry, data: Union[int, str]
|
||||
) -> Union[enums.ChannelType, enums.StickerType, enums.WebhookType, str, enums.OnboardingPromptType]:
|
||||
if entry.action.name.startswith('sticker_'):
|
||||
return enums.try_enum(enums.StickerType, data)
|
||||
elif entry.action.name.startswith('integration_'):
|
||||
return data # type: ignore # integration type is str
|
||||
elif entry.action.name.startswith('webhook_'):
|
||||
return enums.try_enum(enums.WebhookType, data)
|
||||
elif entry.action.name.startswith('onboarding_prompt_'):
|
||||
return enums.try_enum(enums.OnboardingPromptType, data)
|
||||
else:
|
||||
return enums.try_enum(enums.ChannelType, data)
|
||||
|
||||
|
||||
class AuditLogDiff:
|
||||
def __len__(self) -> int:
|
||||
return len(self.__dict__)
|
||||
|
||||
def __iter__(self) -> Generator[Tuple[str, Any], None, None]:
|
||||
yield from self.__dict__.items()
|
||||
|
||||
def __repr__(self) -> str:
|
||||
values = ' '.join('%s=%r' % item for item in self.__dict__.items())
|
||||
return f'<AuditLogDiff {values}>'
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
||||
def __getattr__(self, item: str) -> Any: ...
|
||||
|
||||
def __setattr__(self, key: str, value: Any) -> Any: ...
|
||||
|
||||
|
||||
Transformer = Callable[['AuditLogEntry', Any], Any]
|
||||
|
||||
|
||||
class AuditLogChanges:
|
||||
# fmt: off
|
||||
TRANSFORMERS: ClassVar[Mapping[str, Tuple[Optional[str], Optional[Transformer]]]] = {
|
||||
'verification_level': (None, _enum_transformer(enums.VerificationLevel)),
|
||||
'explicit_content_filter': (None, _enum_transformer(enums.ContentFilter)),
|
||||
'allow': (None, _flag_transformer(Permissions)),
|
||||
'deny': (None, _flag_transformer(Permissions)),
|
||||
'permissions': (None, _flag_transformer(Permissions)),
|
||||
'id': (None, _transform_snowflake),
|
||||
'color': ('colour', _transform_color),
|
||||
'owner_id': ('owner', _transform_member_id),
|
||||
'inviter_id': ('inviter', _transform_member_id),
|
||||
'channel_id': ('channel', _transform_channel),
|
||||
'afk_channel_id': ('afk_channel', _transform_channel),
|
||||
'system_channel_id': ('system_channel', _transform_channel),
|
||||
'system_channel_flags': (None, _flag_transformer(flags.SystemChannelFlags)),
|
||||
'widget_channel_id': ('widget_channel', _transform_channel),
|
||||
'rules_channel_id': ('rules_channel', _transform_channel),
|
||||
'public_updates_channel_id': ('public_updates_channel', _transform_channel),
|
||||
'permission_overwrites': ('overwrites', _transform_overwrites),
|
||||
'splash_hash': ('splash', _guild_hash_transformer('splashes')),
|
||||
'banner_hash': ('banner', _guild_hash_transformer('banners')),
|
||||
'discovery_splash_hash': ('discovery_splash', _guild_hash_transformer('discovery-splashes')),
|
||||
'icon_hash': ('icon', _transform_icon),
|
||||
'avatar_hash': ('avatar', _transform_avatar),
|
||||
'rate_limit_per_user': ('slowmode_delay', None),
|
||||
'default_thread_rate_limit_per_user': ('default_thread_slowmode_delay', None),
|
||||
'guild_id': ('guild', _transform_guild_id),
|
||||
'tags': ('emoji', None),
|
||||
'default_message_notifications': ('default_notifications', _enum_transformer(enums.NotificationLevel)),
|
||||
'video_quality_mode': (None, _enum_transformer(enums.VideoQualityMode)),
|
||||
'privacy_level': (None, _enum_transformer(enums.PrivacyLevel)),
|
||||
'format_type': (None, _enum_transformer(enums.StickerFormatType)),
|
||||
'type': (None, _transform_type),
|
||||
'communication_disabled_until': ('timed_out_until', _transform_timestamp),
|
||||
'expire_behavior': (None, _enum_transformer(enums.ExpireBehaviour)),
|
||||
'mfa_level': (None, _enum_transformer(enums.MFALevel)),
|
||||
'status': (None, _enum_transformer(enums.EventStatus)),
|
||||
'entity_type': (None, _enum_transformer(enums.EntityType)),
|
||||
'preferred_locale': (None, _enum_transformer(enums.Locale)),
|
||||
'image_hash': ('cover_image', _transform_cover_image),
|
||||
'trigger_type': (None, _enum_transformer(enums.AutoModRuleTriggerType)),
|
||||
'event_type': (None, _enum_transformer(enums.AutoModRuleEventType)),
|
||||
'actions': (None, _transform_automod_actions),
|
||||
'exempt_channels': (None, _transform_channels_or_threads),
|
||||
'exempt_roles': (None, _transform_roles),
|
||||
'applied_tags': (None, _transform_applied_forum_tags),
|
||||
'available_tags': (None, _transform_forum_tags),
|
||||
'flags': (None, _transform_overloaded_flags),
|
||||
'default_reaction_emoji': (None, _transform_default_reaction),
|
||||
'emoji_name': ('emoji', _transform_default_emoji),
|
||||
'user_id': ('user', _transform_member_id),
|
||||
'options': (None, _transform_onboarding_prompt_options),
|
||||
'prompts': (None, _transform_onboarding_prompts),
|
||||
'default_channel_ids': ('default_channels', _transform_channels_or_threads),
|
||||
'mode': (None, _enum_transformer(enums.OnboardingMode)),
|
||||
}
|
||||
# fmt: on
|
||||
|
||||
def __init__(self, entry: AuditLogEntry, data: List[AuditLogChangePayload]):
|
||||
self.before: AuditLogDiff = AuditLogDiff()
|
||||
self.after: AuditLogDiff = AuditLogDiff()
|
||||
# special case entire process since each
|
||||
# element in data is a different target
|
||||
# key is the target id
|
||||
if entry.action is enums.AuditLogAction.app_command_permission_update:
|
||||
self.before.app_command_permissions = []
|
||||
self.after.app_command_permissions = []
|
||||
|
||||
for elem in data:
|
||||
self._handle_app_command_permissions(
|
||||
self.before,
|
||||
entry,
|
||||
elem.get('old_value'), # type: ignore # value will be an ApplicationCommandPermissions if present
|
||||
)
|
||||
|
||||
self._handle_app_command_permissions(
|
||||
self.after,
|
||||
entry,
|
||||
elem.get('new_value'), # type: ignore # value will be an ApplicationCommandPermissions if present
|
||||
)
|
||||
return
|
||||
|
||||
for elem in data:
|
||||
attr = elem['key']
|
||||
|
||||
# special cases for role add/remove
|
||||
if attr == '$add':
|
||||
self._handle_role(self.before, self.after, entry, elem['new_value']) # type: ignore # new_value is a list of roles in this case
|
||||
continue
|
||||
elif attr == '$remove':
|
||||
self._handle_role(self.after, self.before, entry, elem['new_value']) # type: ignore # new_value is a list of roles in this case
|
||||
continue
|
||||
|
||||
# special case for automod trigger
|
||||
if attr == 'trigger_metadata':
|
||||
# given full metadata dict
|
||||
self._handle_trigger_metadata(entry, elem, data) # type: ignore # should be trigger metadata
|
||||
continue
|
||||
elif entry.action is enums.AuditLogAction.automod_rule_update and attr.startswith('$'):
|
||||
# on update, some trigger attributes are keys and formatted as $(add/remove)_{attribute}
|
||||
action, _, trigger_attr = attr.partition('_')
|
||||
# new_value should be a list of added/removed strings for keyword_filter, regex_patterns, or allow_list
|
||||
if action == '$add':
|
||||
self._handle_trigger_attr_update(self.before, self.after, entry, trigger_attr, elem['new_value']) # type: ignore
|
||||
elif action == '$remove':
|
||||
self._handle_trigger_attr_update(self.after, self.before, entry, trigger_attr, elem['new_value']) # type: ignore
|
||||
continue
|
||||
|
||||
# special case for colors to set secondary and tertiary colos/colour attributes
|
||||
if attr == 'colors':
|
||||
self._handle_colours(self.before, elem.get('old_value')) # type: ignore # should be a RoleColours dict
|
||||
self._handle_colours(self.after, elem.get('new_value')) # type: ignore # should be a RoleColours dict
|
||||
continue
|
||||
|
||||
try:
|
||||
key, transformer = self.TRANSFORMERS[attr]
|
||||
except (ValueError, KeyError):
|
||||
transformer = None
|
||||
else:
|
||||
if key:
|
||||
attr = key
|
||||
|
||||
transformer: Optional[Transformer]
|
||||
|
||||
try:
|
||||
before = elem['old_value']
|
||||
except KeyError:
|
||||
before = None
|
||||
else:
|
||||
if transformer:
|
||||
before = transformer(entry, before)
|
||||
|
||||
setattr(self.before, attr, before)
|
||||
|
||||
try:
|
||||
after = elem['new_value']
|
||||
except KeyError:
|
||||
after = None
|
||||
else:
|
||||
if transformer:
|
||||
after = transformer(entry, after)
|
||||
|
||||
setattr(self.after, attr, after)
|
||||
|
||||
# add an alias
|
||||
if hasattr(self.after, 'colour'):
|
||||
self.after.color = self.after.colour
|
||||
self.before.color = self.before.colour
|
||||
if hasattr(self.after, 'expire_behavior'):
|
||||
self.after.expire_behaviour = self.after.expire_behavior
|
||||
self.before.expire_behaviour = self.before.expire_behavior
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'<AuditLogChanges before={self.before!r} after={self.after!r}>'
|
||||
|
||||
def _handle_role(self, first: AuditLogDiff, second: AuditLogDiff, entry: AuditLogEntry, elem: List[RolePayload]) -> None:
|
||||
if not hasattr(first, 'roles'):
|
||||
setattr(first, 'roles', [])
|
||||
|
||||
data = []
|
||||
g: Guild = entry.guild
|
||||
|
||||
for e in elem:
|
||||
role_id = int(e['id'])
|
||||
role = g.get_role(role_id)
|
||||
|
||||
if role is None:
|
||||
role = Object(id=role_id, type=Role)
|
||||
role.name = e['name'] # type: ignore # Object doesn't usually have name
|
||||
|
||||
data.append(role)
|
||||
|
||||
setattr(second, 'roles', data)
|
||||
|
||||
def _handle_app_command_permissions(
|
||||
self,
|
||||
diff: AuditLogDiff,
|
||||
entry: AuditLogEntry,
|
||||
data: Optional[ApplicationCommandPermissions],
|
||||
):
|
||||
if data is None:
|
||||
return
|
||||
|
||||
# avoid circular import
|
||||
from discord.app_commands import AppCommandPermissions
|
||||
|
||||
state = entry._state
|
||||
guild = entry.guild
|
||||
diff.app_command_permissions.append(AppCommandPermissions(data=data, guild=guild, state=state))
|
||||
|
||||
def _handle_trigger_metadata(
|
||||
self,
|
||||
entry: AuditLogEntry,
|
||||
data: AuditLogChangeTriggerMetadataPayload,
|
||||
full_data: List[AuditLogChangePayload],
|
||||
):
|
||||
trigger_value: Optional[int] = None
|
||||
trigger_type: Optional[enums.AutoModRuleTriggerType] = None
|
||||
|
||||
# try to get trigger type from before or after
|
||||
trigger_type = getattr(self.before, 'trigger_type', getattr(self.after, 'trigger_type', None))
|
||||
|
||||
if trigger_type is None:
|
||||
if isinstance(entry.target, AutoModRule):
|
||||
# Trigger type cannot be changed, so it should be the same before and after updates.
|
||||
# Avoids checking which keys are in data to guess trigger type
|
||||
trigger_value = entry.target.trigger.type.value
|
||||
else:
|
||||
# found a trigger type from before or after
|
||||
trigger_value = trigger_type.value
|
||||
|
||||
if trigger_value is None:
|
||||
# try to find trigger type in the full list of changes
|
||||
_elem = utils.find(lambda elem: elem['key'] == 'trigger_type', full_data)
|
||||
if _elem is not None:
|
||||
trigger_value = _elem.get('old_value', _elem.get('new_value')) # type: ignore # trigger type values should be int
|
||||
|
||||
if trigger_value is None:
|
||||
# try to infer trigger_type from the keys in old or new value
|
||||
combined = (data.get('old_value') or {}).keys() | (data.get('new_value') or {}).keys()
|
||||
if not combined:
|
||||
trigger_value = enums.AutoModRuleTriggerType.spam.value
|
||||
elif 'presets' in combined:
|
||||
trigger_value = enums.AutoModRuleTriggerType.keyword_preset.value
|
||||
elif 'keyword_filter' in combined or 'regex_patterns' in combined:
|
||||
trigger_value = enums.AutoModRuleTriggerType.keyword.value
|
||||
elif 'mention_total_limit' in combined or 'mention_raid_protection_enabled' in combined:
|
||||
trigger_value = enums.AutoModRuleTriggerType.mention_spam.value
|
||||
else:
|
||||
# some unknown type
|
||||
trigger_value = -1
|
||||
|
||||
self.before.trigger = AutoModTrigger.from_data(trigger_value, data.get('old_value'))
|
||||
self.after.trigger = AutoModTrigger.from_data(trigger_value, data.get('new_value'))
|
||||
|
||||
def _handle_trigger_attr_update(
|
||||
self, first: AuditLogDiff, second: AuditLogDiff, entry: AuditLogEntry, attr: str, data: List[str]
|
||||
):
|
||||
self._create_trigger(first, entry)
|
||||
trigger = self._create_trigger(second, entry)
|
||||
try:
|
||||
# guard unexpecte non list attributes or non iterable data
|
||||
getattr(trigger, attr).extend(data)
|
||||
except (AttributeError, TypeError):
|
||||
pass
|
||||
|
||||
def _handle_colours(self, diff: AuditLogDiff, colours: Optional[RoleColours]):
|
||||
if colours is not None:
|
||||
# handle colours to multiple colour attributes
|
||||
colour = Colour(colours['primary_color'])
|
||||
secondary_colour = colours['secondary_color']
|
||||
tertiary_colour = colours['tertiary_color']
|
||||
else:
|
||||
colour = None
|
||||
secondary_colour = None
|
||||
tertiary_colour = None
|
||||
|
||||
diff.color = diff.colour = colour
|
||||
diff.secondary_color = diff.secondary_colour = Colour(secondary_colour) if secondary_colour is not None else None
|
||||
diff.tertiary_color = diff.tertiary_colour = Colour(tertiary_colour) if tertiary_colour is not None else None
|
||||
|
||||
def _create_trigger(self, diff: AuditLogDiff, entry: AuditLogEntry) -> AutoModTrigger:
|
||||
# check if trigger has already been created
|
||||
if not hasattr(diff, 'trigger'):
|
||||
# create a trigger
|
||||
if isinstance(entry.target, AutoModRule):
|
||||
# get trigger type from the automod rule
|
||||
trigger_type = entry.target.trigger.type
|
||||
else:
|
||||
# unknown trigger type
|
||||
trigger_type = enums.try_enum(enums.AutoModRuleTriggerType, -1)
|
||||
|
||||
diff.trigger = AutoModTrigger(type=trigger_type)
|
||||
return diff.trigger
|
||||
|
||||
|
||||
class _AuditLogProxy:
|
||||
def __init__(self, **kwargs: Any) -> None:
|
||||
for k, v in kwargs.items():
|
||||
setattr(self, k, v)
|
||||
|
||||
|
||||
class _AuditLogProxyMemberPrune(_AuditLogProxy):
|
||||
delete_member_days: int
|
||||
members_removed: int
|
||||
|
||||
|
||||
class _AuditLogProxyMemberMoveOrMessageDelete(_AuditLogProxy):
|
||||
channel: Union[abc.GuildChannel, Thread]
|
||||
count: int
|
||||
|
||||
|
||||
class _AuditLogProxyMemberDisconnect(_AuditLogProxy):
|
||||
count: int
|
||||
|
||||
|
||||
class _AuditLogProxyPinAction(_AuditLogProxy):
|
||||
channel: Union[abc.GuildChannel, Thread]
|
||||
message_id: int
|
||||
|
||||
|
||||
class _AuditLogProxyStageInstanceAction(_AuditLogProxy):
|
||||
channel: abc.GuildChannel
|
||||
|
||||
|
||||
class _AuditLogProxyMessageBulkDelete(_AuditLogProxy):
|
||||
count: int
|
||||
|
||||
|
||||
class _AuditLogProxyAutoModAction(_AuditLogProxy):
|
||||
automod_rule_name: str
|
||||
automod_rule_trigger_type: str
|
||||
channel: Optional[Union[abc.GuildChannel, Thread]]
|
||||
|
||||
|
||||
class _AuditLogProxyAutoModActionQuarantineUser(_AuditLogProxy):
|
||||
automod_rule_name: str
|
||||
automod_rule_trigger_type: str
|
||||
|
||||
|
||||
class _AuditLogProxyMemberKickOrMemberRoleUpdate(_AuditLogProxy):
|
||||
integration_type: Optional[str]
|
||||
|
||||
|
||||
class AuditLogEntry(Hashable):
|
||||
r"""Represents an Audit Log entry.
|
||||
|
||||
You retrieve these via :meth:`Guild.audit_logs`.
|
||||
|
||||
.. container:: operations
|
||||
|
||||
.. describe:: x == y
|
||||
|
||||
Checks if two entries are equal.
|
||||
|
||||
.. describe:: x != y
|
||||
|
||||
Checks if two entries are not equal.
|
||||
|
||||
.. describe:: hash(x)
|
||||
|
||||
Returns the entry's hash.
|
||||
|
||||
.. versionchanged:: 1.7
|
||||
Audit log entries are now comparable and hashable.
|
||||
|
||||
Attributes
|
||||
-----------
|
||||
action: :class:`AuditLogAction`
|
||||
The action that was done.
|
||||
user: Optional[:class:`abc.User`]
|
||||
The user who initiated this action. Usually a :class:`Member`\, unless gone
|
||||
then it's a :class:`User`.
|
||||
user_id: Optional[:class:`int`]
|
||||
The user ID who initiated this action.
|
||||
|
||||
.. versionadded:: 2.2
|
||||
id: :class:`int`
|
||||
The entry ID.
|
||||
guild: :class:`Guild`
|
||||
The guild that this entry belongs to.
|
||||
target: Any
|
||||
The target that got changed. The exact type of this depends on
|
||||
the action being done.
|
||||
reason: Optional[:class:`str`]
|
||||
The reason this action was done.
|
||||
extra: Any
|
||||
Extra information that this entry has that might be useful.
|
||||
For most actions, this is ``None``. However in some cases it
|
||||
contains extra information. See :class:`AuditLogAction` for
|
||||
which actions have this field filled out.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
users: Mapping[int, User],
|
||||
integrations: Mapping[int, PartialIntegration],
|
||||
app_commands: Mapping[int, AppCommand],
|
||||
automod_rules: Mapping[int, AutoModRule],
|
||||
webhooks: Mapping[int, Webhook],
|
||||
data: AuditLogEntryPayload,
|
||||
guild: Guild,
|
||||
):
|
||||
self._state: ConnectionState = guild._state
|
||||
self.guild: Guild = guild
|
||||
self._users: Mapping[int, User] = users
|
||||
self._integrations: Mapping[int, PartialIntegration] = integrations
|
||||
self._app_commands: Mapping[int, AppCommand] = app_commands
|
||||
self._automod_rules: Mapping[int, AutoModRule] = automod_rules
|
||||
self._webhooks: Mapping[int, Webhook] = webhooks
|
||||
self._from_data(data)
|
||||
|
||||
def _from_data(self, data: AuditLogEntryPayload) -> None:
|
||||
self.action: enums.AuditLogAction = enums.try_enum(enums.AuditLogAction, data['action_type'])
|
||||
self.id: int = int(data['id'])
|
||||
|
||||
# this key is technically not usually present
|
||||
self.reason: Optional[str] = data.get('reason')
|
||||
extra = data.get('options')
|
||||
|
||||
# fmt: off
|
||||
self.extra: Union[
|
||||
_AuditLogProxyMemberPrune,
|
||||
_AuditLogProxyMemberMoveOrMessageDelete,
|
||||
_AuditLogProxyMemberDisconnect,
|
||||
_AuditLogProxyPinAction,
|
||||
_AuditLogProxyStageInstanceAction,
|
||||
_AuditLogProxyMessageBulkDelete,
|
||||
_AuditLogProxyAutoModAction,
|
||||
_AuditLogProxyAutoModActionQuarantineUser,
|
||||
_AuditLogProxyMemberKickOrMemberRoleUpdate,
|
||||
Member, User, None, PartialIntegration,
|
||||
Role, Object
|
||||
] = None
|
||||
# fmt: on
|
||||
|
||||
if isinstance(self.action, enums.AuditLogAction) and extra:
|
||||
if self.action is enums.AuditLogAction.member_prune:
|
||||
# member prune has two keys with useful information
|
||||
self.extra = _AuditLogProxyMemberPrune(
|
||||
delete_member_days=int(extra['delete_member_days']),
|
||||
members_removed=int(extra['members_removed']),
|
||||
)
|
||||
elif self.action is enums.AuditLogAction.member_move or self.action is enums.AuditLogAction.message_delete:
|
||||
channel_id = int(extra['channel_id'])
|
||||
self.extra = _AuditLogProxyMemberMoveOrMessageDelete(
|
||||
count=int(extra['count']),
|
||||
channel=self.guild.get_channel_or_thread(channel_id) or Object(id=channel_id),
|
||||
)
|
||||
elif self.action is enums.AuditLogAction.member_disconnect:
|
||||
# The member disconnect action has a dict with some information
|
||||
self.extra = _AuditLogProxyMemberDisconnect(count=int(extra['count']))
|
||||
elif self.action is enums.AuditLogAction.message_bulk_delete:
|
||||
# The bulk message delete action has the number of messages deleted
|
||||
self.extra = _AuditLogProxyMessageBulkDelete(count=int(extra['count']))
|
||||
elif self.action in (enums.AuditLogAction.kick, enums.AuditLogAction.member_role_update):
|
||||
# The member kick action has a dict with some information
|
||||
integration_type = extra.get('integration_type')
|
||||
self.extra = _AuditLogProxyMemberKickOrMemberRoleUpdate(integration_type=integration_type)
|
||||
elif self.action.name.endswith('pin'):
|
||||
# the pin actions have a dict with some information
|
||||
channel_id = int(extra['channel_id'])
|
||||
self.extra = _AuditLogProxyPinAction(
|
||||
channel=self.guild.get_channel_or_thread(channel_id) or Object(id=channel_id),
|
||||
message_id=int(extra['message_id']),
|
||||
)
|
||||
elif (
|
||||
self.action is enums.AuditLogAction.automod_block_message
|
||||
or self.action is enums.AuditLogAction.automod_flag_message
|
||||
or self.action is enums.AuditLogAction.automod_timeout_member
|
||||
):
|
||||
channel_id = utils._get_as_snowflake(extra, 'channel_id')
|
||||
channel = None
|
||||
|
||||
# May be an empty string instead of None due to a Discord issue
|
||||
if channel_id:
|
||||
channel = self.guild.get_channel_or_thread(channel_id) or Object(id=channel_id)
|
||||
|
||||
self.extra = _AuditLogProxyAutoModAction(
|
||||
automod_rule_name=extra['auto_moderation_rule_name'],
|
||||
automod_rule_trigger_type=enums.try_enum(
|
||||
enums.AutoModRuleTriggerType, int(extra['auto_moderation_rule_trigger_type'])
|
||||
),
|
||||
channel=channel,
|
||||
)
|
||||
elif self.action is enums.AuditLogAction.automod_quarantine_user:
|
||||
self.extra = _AuditLogProxyAutoModActionQuarantineUser(
|
||||
automod_rule_name=extra['auto_moderation_rule_name'],
|
||||
automod_rule_trigger_type=enums.try_enum(
|
||||
enums.AutoModRuleTriggerType, int(extra['auto_moderation_rule_trigger_type'])
|
||||
),
|
||||
)
|
||||
|
||||
elif self.action.name.startswith('overwrite_'):
|
||||
# the overwrite_ actions have a dict with some information
|
||||
instance_id = int(extra['id'])
|
||||
the_type = extra.get('type')
|
||||
if the_type == '1':
|
||||
self.extra = self._get_member(instance_id)
|
||||
elif the_type == '0':
|
||||
role = self.guild.get_role(instance_id)
|
||||
if role is None:
|
||||
role = Object(id=instance_id, type=Role)
|
||||
role.name = extra.get('role_name') # type: ignore # Object doesn't usually have name
|
||||
self.extra = role
|
||||
elif self.action.name.startswith('stage_instance'):
|
||||
channel_id = int(extra['channel_id'])
|
||||
self.extra = _AuditLogProxyStageInstanceAction(
|
||||
channel=self.guild.get_channel(channel_id) or Object(id=channel_id, type=StageChannel)
|
||||
)
|
||||
elif self.action.name.startswith('app_command'):
|
||||
app_id = int(extra['application_id'])
|
||||
self.extra = self._get_integration_by_app_id(app_id) or Object(app_id, type=PartialIntegration)
|
||||
|
||||
# this key is not present when the above is present, typically.
|
||||
# It's a list of { new_value: a, old_value: b, key: c }
|
||||
# where new_value and old_value are not guaranteed to be there depending
|
||||
# on the action type, so let's just fetch it for now and only turn it
|
||||
# into meaningful data when requested
|
||||
self._changes = data.get('changes', [])
|
||||
|
||||
self.user_id: Optional[int] = utils._get_as_snowflake(data, 'user_id')
|
||||
self.user: Optional[Union[User, Member]] = self._get_member(self.user_id)
|
||||
self._target_id = utils._get_as_snowflake(data, 'target_id')
|
||||
|
||||
def _get_member(self, user_id: Optional[int]) -> Union[Member, User, None]:
|
||||
if user_id is None:
|
||||
return None
|
||||
|
||||
return self.guild.get_member(user_id) or self._users.get(user_id)
|
||||
|
||||
def _get_integration(self, integration_id: Optional[int]) -> Optional[PartialIntegration]:
|
||||
if integration_id is None:
|
||||
return None
|
||||
|
||||
return self._integrations.get(integration_id)
|
||||
|
||||
def _get_integration_by_app_id(self, application_id: Optional[int]) -> Optional[PartialIntegration]:
|
||||
if application_id is None:
|
||||
return None
|
||||
|
||||
# get PartialIntegration by application id
|
||||
return utils.get(self._integrations.values(), application_id=application_id)
|
||||
|
||||
def _get_app_command(self, app_command_id: Optional[int]) -> Optional[AppCommand]:
|
||||
if app_command_id is None:
|
||||
return None
|
||||
|
||||
return self._app_commands.get(app_command_id)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f'<AuditLogEntry id={self.id} action={self.action} user={self.user!r}>'
|
||||
|
||||
@utils.cached_property
|
||||
def created_at(self) -> datetime.datetime:
|
||||
""":class:`datetime.datetime`: Returns the entry's creation time in UTC."""
|
||||
return utils.snowflake_time(self.id)
|
||||
|
||||
@utils.cached_property
|
||||
def target(self) -> TargetType:
|
||||
if self.action.target_type is None:
|
||||
return None
|
||||
|
||||
try:
|
||||
converter = getattr(self, '_convert_target_' + self.action.target_type)
|
||||
except AttributeError:
|
||||
if self._target_id is None:
|
||||
return None
|
||||
return Object(id=self._target_id)
|
||||
else:
|
||||
return converter(self._target_id)
|
||||
|
||||
@utils.cached_property
|
||||
def category(self) -> Optional[enums.AuditLogActionCategory]:
|
||||
"""Optional[:class:`AuditLogActionCategory`]: The category of the action, if applicable."""
|
||||
return self.action.category
|
||||
|
||||
@utils.cached_property
|
||||
def changes(self) -> AuditLogChanges:
|
||||
""":class:`AuditLogChanges`: The list of changes this entry has."""
|
||||
obj = AuditLogChanges(self, self._changes)
|
||||
del self._changes
|
||||
return obj
|
||||
|
||||
@utils.cached_property
|
||||
def before(self) -> AuditLogDiff:
|
||||
""":class:`AuditLogDiff`: The target's prior state."""
|
||||
return self.changes.before
|
||||
|
||||
@utils.cached_property
|
||||
def after(self) -> AuditLogDiff:
|
||||
""":class:`AuditLogDiff`: The target's subsequent state."""
|
||||
return self.changes.after
|
||||
|
||||
def _convert_target_guild(self, target_id: int) -> Guild:
|
||||
return self.guild
|
||||
|
||||
def _convert_target_channel(self, target_id: int) -> Union[abc.GuildChannel, Object]:
|
||||
return self.guild.get_channel(target_id) or Object(id=target_id)
|
||||
|
||||
def _convert_target_user(self, target_id: Optional[int]) -> Optional[Union[Member, User, Object]]:
|
||||
# For some reason the member_disconnect and member_move action types
|
||||
# do not have a non-null target_id so safeguard against that
|
||||
if target_id is None:
|
||||
return None
|
||||
|
||||
return self._get_member(target_id) or Object(id=target_id, type=Member)
|
||||
|
||||
def _convert_target_role(self, target_id: int) -> Union[Role, Object]:
|
||||
return self.guild.get_role(target_id) or Object(id=target_id, type=Role)
|
||||
|
||||
def _convert_target_invite(self, target_id: None) -> Invite:
|
||||
# invites have target_id set to null
|
||||
# so figure out which change has the full invite data
|
||||
changeset = self.before if self.action is enums.AuditLogAction.invite_delete else self.after
|
||||
|
||||
fake_payload: InvitePayload = {
|
||||
'max_age': changeset.max_age,
|
||||
'max_uses': changeset.max_uses,
|
||||
'code': changeset.code,
|
||||
'temporary': changeset.temporary,
|
||||
'uses': changeset.uses,
|
||||
'channel': None, # type: ignore # the channel is passed to the Invite constructor directly
|
||||
}
|
||||
|
||||
obj = Invite(state=self._state, data=fake_payload, guild=self.guild, channel=changeset.channel)
|
||||
try:
|
||||
obj.inviter = changeset.inviter
|
||||
except AttributeError:
|
||||
pass
|
||||
return obj
|
||||
|
||||
def _convert_target_emoji(self, target_id: int) -> Union[Emoji, Object]:
|
||||
return self._state.get_emoji(target_id) or Object(id=target_id, type=Emoji)
|
||||
|
||||
def _convert_target_message(self, target_id: Optional[int]) -> Optional[Union[Member, User, Object]]:
|
||||
# The message_pin and message_unpin action types do not have a
|
||||
# non-null target_id so safeguard against that
|
||||
|
||||
if target_id is None:
|
||||
return None
|
||||
|
||||
return self._get_member(target_id) or Object(id=target_id, type=Member)
|
||||
|
||||
def _convert_target_stage_instance(self, target_id: int) -> Union[StageInstance, Object]:
|
||||
return self.guild.get_stage_instance(target_id) or Object(id=target_id, type=StageInstance)
|
||||
|
||||
def _convert_target_sticker(self, target_id: int) -> Union[GuildSticker, Object]:
|
||||
return self._state.get_sticker(target_id) or Object(id=target_id, type=GuildSticker)
|
||||
|
||||
def _convert_target_thread(self, target_id: int) -> Union[Thread, Object]:
|
||||
return self.guild.get_thread(target_id) or Object(id=target_id, type=Thread)
|
||||
|
||||
def _convert_target_guild_scheduled_event(self, target_id: int) -> Union[ScheduledEvent, Object]:
|
||||
return self.guild.get_scheduled_event(target_id) or Object(id=target_id, type=ScheduledEvent)
|
||||
|
||||
def _convert_target_integration(self, target_id: int) -> Union[PartialIntegration, Object]:
|
||||
return self._get_integration(target_id) or Object(target_id, type=PartialIntegration)
|
||||
|
||||
def _convert_target_app_command(self, target_id: int) -> Union[AppCommand, Object]:
|
||||
target = self._get_app_command(target_id)
|
||||
if not target:
|
||||
# circular import
|
||||
from .app_commands import AppCommand
|
||||
|
||||
target = Object(target_id, type=AppCommand)
|
||||
|
||||
return target
|
||||
|
||||
def _convert_target_integration_or_app_command(self, target_id: int) -> Union[PartialIntegration, AppCommand, Object]:
|
||||
target = self._get_integration_by_app_id(target_id) or self._get_app_command(target_id)
|
||||
if not target:
|
||||
try:
|
||||
# circular import
|
||||
from .app_commands import AppCommand
|
||||
|
||||
# get application id from extras
|
||||
# if it matches target id, type should be integration
|
||||
target_app = self.extra
|
||||
# extra should be an Object or PartialIntegration
|
||||
app_id = target_app.application_id if isinstance(target_app, PartialIntegration) else target_app.id # type: ignore
|
||||
type = PartialIntegration if target_id == app_id else AppCommand
|
||||
except AttributeError:
|
||||
return Object(target_id)
|
||||
else:
|
||||
return Object(target_id, type=type)
|
||||
|
||||
return target
|
||||
|
||||
def _convert_target_auto_moderation(self, target_id: int) -> Union[AutoModRule, Object]:
|
||||
return self._automod_rules.get(target_id) or Object(target_id, type=AutoModRule)
|
||||
|
||||
def _convert_target_webhook(self, target_id: int) -> Union[Webhook, Object]:
|
||||
# circular import
|
||||
from .webhook import Webhook
|
||||
|
||||
return self._webhooks.get(target_id) or Object(target_id, type=Webhook)
|
||||
|
||||
def _convert_target_onboarding_prompt(self, target_id: int) -> Object:
|
||||
return Object(target_id, type=OnboardingPrompt)
|
||||
Reference in New Issue
Block a user