From dc061b869c6144ab4365c7cc0d4745485deadbd9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A9r=C3=B4me=20Deuchnord?= Date: Sun, 24 Oct 2021 16:06:33 +0200 Subject: [PATCH] Add Flood moderator --- _twitchbot/config.py | 54 ++++++++++++++-------- _twitchbot/moderator.py | 89 +++++++++++++++++++++++++++++++++--- _twitchbot/twitch_message.py | 20 ++++++++ _twitchbot/twitchbot.py | 26 +++++++++-- 4 files changed, 158 insertions(+), 31 deletions(-) create mode 100644 _twitchbot/twitch_message.py diff --git a/_twitchbot/config.py b/_twitchbot/config.py index 97121f6..ba1773e 100644 --- a/_twitchbot/config.py +++ b/_twitchbot/config.py @@ -21,7 +21,7 @@ from os import environ from enum import Enum from typing import Union -from .moderator import Moderator, CapsLockModerator, ModerationDecision +from . import moderator class Command: @@ -93,7 +93,7 @@ class Config: command_prefix: str commands: [Command] timer: Timer - moderators: [Moderator] + moderators: [moderator.Moderator] def __init__( self, @@ -103,7 +103,7 @@ class Config: command_prefix: str, commands: [Command], timer: Timer, - moderators: [Moderator] + moderators: [moderator.Moderator] ): self.nickname = nickname self.channel = channel @@ -137,23 +137,26 @@ class Config: commands.append(command) moderators = [] - for moderator in params.get('moderator', []): - decision_str = params['moderator']['caps-lock'].get("decision", "delete") - if decision_str == "delete": - decision = ModerationDecision.DELETE_MSG - elif decision_str == "timeout": - decision = ModerationDecision.TIMEOUT_USER - else: - print("WARNING: %s moderator's decision is invalid, it has been deactivated!") - decision = ModerationDecision.ABSTAIN - - if moderator == 'caps-lock': - moderators.append(CapsLockModerator( - params['moderator']['caps-lock'].get("message", "{author}, stop the caps lock!"), - params['moderator']['caps-lock'].get("min-size", 5), - params['moderator']['caps-lock'].get("threshold", 50), - decision, - params['moderator']['caps-lock'].get("duration", None) + for mod in params.get('moderator', []): + moderator_config = params['moderator'][mod] + if mod == 'caps-lock': + moderators.append(moderator.CapsLockModerator( + moderator_config.get("message", "{author}, stop the caps lock!"), + cls.parse_decision(moderator_config.get("decision", "delete")), + moderator_config.get("duration", None), + moderator_config.get("min-size", 5), + moderator_config.get("threshold", 50) + )) + if mod == 'flood': + moderators.append(moderator.FloodModerator( + moderator_config.get("message", "{author}, stop the flood!"), + cls.parse_decision(moderator_config.get("decision", "timeout")), + moderator_config.get("duration", None), + moderator_config.get("max-word-length", None), + moderator_config.get("raid-cooldown", None), + moderator_config.get("ignore-hashtags", False), + moderator_config.get("max-msg-occurrences", None), + moderator_config.get("min-time-between-occurrence", None) )) # Generate help command @@ -173,6 +176,17 @@ class Config: moderators ) + @classmethod + def parse_decision(cls, decision_str) -> moderator.ModerationDecision: + if decision_str == "delete": + decision = moderator.ModerationDecision.DELETE_MSG + elif decision_str == "timeout": + decision = moderator.ModerationDecision.TIMEOUT_USER + else: + print("WARNING: %s moderator's decision is invalid, it has been deactivated!") + decision = moderator.ModerationDecision.ABSTAIN + return decision + def find_command(self, command: str) -> Union[None, Command]: if not command.startswith(self.command_prefix): return None diff --git a/_twitchbot/moderator.py b/_twitchbot/moderator.py index 575a28e..219edc7 100644 --- a/_twitchbot/moderator.py +++ b/_twitchbot/moderator.py @@ -18,6 +18,9 @@ from abc import ABC, abstractmethod from enum import Enum from typing import Union +from datetime import datetime, timedelta + +EPOCH = datetime(1970, 1, 1) class ModerationDecision(Enum): @@ -30,31 +33,38 @@ class Moderator(ABC): message: str decision: ModerationDecision - def __init__(self, message: str, timeout_duration: Union[None, int]): + def __init__(self, message: str, decision: ModerationDecision, timeout_duration: Union[None, int]): self.message = message self.timeout_duration = timeout_duration + self.decision = decision @abstractmethod def get_name(self) -> str: pass @abstractmethod - def vote(self, msg) -> ModerationDecision: + def vote(self, msg: str, author: str) -> ModerationDecision: pass class CapsLockModerator(Moderator): - def __init__(self, message: str, min_size: int, threshold: int, decision: ModerationDecision, timeout_duration: Union[None, int]): - super().__init__(message, timeout_duration) + def __init__( + self, + message: str, + decision: ModerationDecision, + timeout_duration: Union[None, int], + min_size: int, + threshold: int + ): + super().__init__(message, decision, timeout_duration) self.min_size = min_size self.threshold = threshold / 100 - self.decision = decision def get_name(self) -> str: return 'Caps Lock' - def vote(self, msg: str) -> ModerationDecision: + def vote(self, msg: str, author: str) -> ModerationDecision: msg = ''.join(filter(str.isalpha, msg)) if len(msg) < self.min_size: @@ -71,3 +81,70 @@ class CapsLockModerator(Moderator): return self.decision return ModerationDecision.ABSTAIN + + +class FloodModerator(Moderator): + def __init__( + self, + message: str, + decision: ModerationDecision, + timeout_duration: Union[None, int], + max_word_length: Union[None, int], + raid_cooldown: Union[None, int], + ignore_hashtags: bool, + max_msg_occurrences: Union[None, int], + min_time_between_occurrence: Union[None, int] + ): + super().__init__(message, decision, timeout_duration) + self.max_word_length = max_word_length + self.raid_cooldown = raid_cooldown + self.last_raid = EPOCH + self.ignore_hashtags = ignore_hashtags + self.max_msg_occurrences = max_msg_occurrences + self.min_time_between_occurrence = min_time_between_occurrence + self.last_msgs = [] + + def get_name(self) -> str: + return 'Flood' + + def vote(self, msg: str, author: str) -> ModerationDecision: + if self.raid_cooldown is not None and self.last_raid + timedelta(minutes=self.raid_cooldown) > datetime.now(): + return ModerationDecision.ABSTAIN + + if self.max_word_length is not None: + for word in msg.split(' '): + if word.startswith('#'): + continue + if len(word) > self.max_word_length: + return ModerationDecision.TIMEOUT_USER + + if self.max_msg_occurrences is None or self.min_time_between_occurrence is None: + return ModerationDecision.ABSTAIN + + clean_msg = None + for last_msg in self.last_msgs: + if last_msg['first-occurrence'] + timedelta(seconds=self.min_time_between_occurrence) <= datetime.now(): + clean_msg = last_msg + break + + if author != last_msg['author'] or msg != last_msg['message']: + break + + last_msg['occurrences'] += 1 + if last_msg['occurrences'] >= self.max_msg_occurrences: + return ModerationDecision.TIMEOUT_USER + + if clean_msg is not None: + self.last_msgs.remove(clean_msg) + + self.last_msgs.append({ + 'first-occurrence': datetime.now(), + 'author': author, + 'message': msg, + 'occurrences': 1 + }) + + return ModerationDecision.ABSTAIN + + def declare_raid(self): + self.last_raid = datetime.now() diff --git a/_twitchbot/twitch_message.py b/_twitchbot/twitch_message.py new file mode 100644 index 0000000..7f94c88 --- /dev/null +++ b/_twitchbot/twitch_message.py @@ -0,0 +1,20 @@ +# Twason - The KISS Twitch bot +# Copyright (C) 2021 Jérôme Deuchnord +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from irc3.rfc import raw + +USERNOTICE = r'^(@(?P\S+) )?:(?P\S+) (?P(USERNOTICE)) (?P\S+)$' + diff --git a/_twitchbot/twitchbot.py b/_twitchbot/twitchbot.py index a21dc2f..0d75905 100644 --- a/_twitchbot/twitchbot.py +++ b/_twitchbot/twitchbot.py @@ -17,13 +17,15 @@ import irc3 +from random import shuffle +from datetime import datetime, timedelta + from . import utils from .config import TimerStrategy -from .moderator import ModerationDecision, Moderator +from .moderator import ModerationDecision, Moderator, FloodModerator +from . import twitch_message -from random import shuffle -from datetime import datetime, timedelta config = None @@ -55,6 +57,10 @@ class TwitchBot: return in_str + # @irc3.event(r'^(?P.+)$') + # def on_all(self, data): + # print(data) + @irc3.event(irc3.rfc.PRIVMSG) def on_msg(self, mask: str = None, target: str = None, data: str = None, tags: str = None, **_): author = mask.split('!')[0] @@ -70,6 +76,17 @@ class TwitchBot: self.nb_messages_since_timer += 1 self.play_timer() + @irc3.event(twitch_message.USERNOTICE) + def on_user_notice(self, tags: str, **_): + tags = utils.parse_tags(tags) + if tags.get('msg-id', None) == 'raid': + # Notice the Flood moderator a raid has just happened + for moderator in self.config.moderators: + if isinstance(moderator, FloodModerator) and moderator.raid_cooldown is not None: + print("Raid received from %s. Disabling the Flood moderator." % tags.get('display-name')) + moderator.declare_raid() + break + def play_timer(self): if not self.messages_stack: print('Filling the timer messages stack in') @@ -91,7 +108,6 @@ class TwitchBot: self.last_timer_date = datetime.now() def moderate(self, tags: {str: str}, msg: str, author: str, channel: str): - print(tags) def delete_msg(mod: Moderator): print("[DELETE (reason: %s)] %s: %s" % (mod.get_name(), author, msg)) self.bot.privmsg( @@ -130,7 +146,7 @@ class TwitchBot: message_to_moderate = message_to_moderate[:first - 1] + message_to_moderate[last + 1:] for moderator in self.config.moderators: - vote = moderator.vote(message_to_moderate) + vote = moderator.vote(message_to_moderate, author) if vote == ModerationDecision.ABSTAIN: continue if vote == ModerationDecision.DELETE_MSG: