| @@ -21,7 +21,7 @@ from os import environ | |||
| from enum import Enum | |||
| from typing import Union | |||
| from .moderator import Moderator, CapsLockModerator, ModerationDecision | |||
| from . import moderator | |||
| class Command: | |||
| @@ -93,7 +93,7 @@ class Config: | |||
| command_prefix: str | |||
| commands: [Command] | |||
| timer: Timer | |||
| moderators: [Moderator] | |||
| moderators: [moderator.Moderator] | |||
| def __init__( | |||
| self, | |||
| @@ -103,7 +103,7 @@ class Config: | |||
| command_prefix: str, | |||
| commands: [Command], | |||
| timer: Timer, | |||
| moderators: [Moderator] | |||
| moderators: [moderator.Moderator] | |||
| ): | |||
| self.nickname = nickname | |||
| self.channel = channel | |||
| @@ -137,23 +137,26 @@ class Config: | |||
| commands.append(command) | |||
| moderators = [] | |||
| for moderator in params.get('moderator', []): | |||
| decision_str = params['moderator']['caps-lock'].get("decision", "delete") | |||
| if decision_str == "delete": | |||
| decision = ModerationDecision.DELETE_MSG | |||
| elif decision_str == "timeout": | |||
| decision = ModerationDecision.TIMEOUT_USER | |||
| else: | |||
| print("WARNING: %s moderator's decision is invalid, it has been deactivated!") | |||
| decision = ModerationDecision.ABSTAIN | |||
| if moderator == 'caps-lock': | |||
| moderators.append(CapsLockModerator( | |||
| params['moderator']['caps-lock'].get("message", "{author}, stop the caps lock!"), | |||
| params['moderator']['caps-lock'].get("min-size", 5), | |||
| params['moderator']['caps-lock'].get("threshold", 50), | |||
| decision, | |||
| params['moderator']['caps-lock'].get("duration", None) | |||
| for mod in params.get('moderator', []): | |||
| moderator_config = params['moderator'][mod] | |||
| if mod == 'caps-lock': | |||
| moderators.append(moderator.CapsLockModerator( | |||
| moderator_config.get("message", "{author}, stop the caps lock!"), | |||
| cls.parse_decision(moderator_config.get("decision", "delete")), | |||
| moderator_config.get("duration", None), | |||
| moderator_config.get("min-size", 5), | |||
| moderator_config.get("threshold", 50) | |||
| )) | |||
| if mod == 'flood': | |||
| moderators.append(moderator.FloodModerator( | |||
| moderator_config.get("message", "{author}, stop the flood!"), | |||
| cls.parse_decision(moderator_config.get("decision", "timeout")), | |||
| moderator_config.get("duration", None), | |||
| moderator_config.get("max-word-length", None), | |||
| moderator_config.get("raid-cooldown", None), | |||
| moderator_config.get("ignore-hashtags", False), | |||
| moderator_config.get("max-msg-occurrences", None), | |||
| moderator_config.get("min-time-between-occurrence", None) | |||
| )) | |||
| # Generate help command | |||
| @@ -173,6 +176,17 @@ class Config: | |||
| moderators | |||
| ) | |||
| @classmethod | |||
| def parse_decision(cls, decision_str) -> moderator.ModerationDecision: | |||
| if decision_str == "delete": | |||
| decision = moderator.ModerationDecision.DELETE_MSG | |||
| elif decision_str == "timeout": | |||
| decision = moderator.ModerationDecision.TIMEOUT_USER | |||
| else: | |||
| print("WARNING: %s moderator's decision is invalid, it has been deactivated!") | |||
| decision = moderator.ModerationDecision.ABSTAIN | |||
| return decision | |||
| def find_command(self, command: str) -> Union[None, Command]: | |||
| if not command.startswith(self.command_prefix): | |||
| return None | |||
| @@ -18,6 +18,9 @@ | |||
| from abc import ABC, abstractmethod | |||
| from enum import Enum | |||
| from typing import Union | |||
| from datetime import datetime, timedelta | |||
| EPOCH = datetime(1970, 1, 1) | |||
| class ModerationDecision(Enum): | |||
| @@ -30,31 +33,38 @@ class Moderator(ABC): | |||
| message: str | |||
| decision: ModerationDecision | |||
| def __init__(self, message: str, timeout_duration: Union[None, int]): | |||
| def __init__(self, message: str, decision: ModerationDecision, timeout_duration: Union[None, int]): | |||
| self.message = message | |||
| self.timeout_duration = timeout_duration | |||
| self.decision = decision | |||
| @abstractmethod | |||
| def get_name(self) -> str: | |||
| pass | |||
| @abstractmethod | |||
| def vote(self, msg) -> ModerationDecision: | |||
| def vote(self, msg: str, author: str) -> ModerationDecision: | |||
| pass | |||
| class CapsLockModerator(Moderator): | |||
| def __init__(self, message: str, min_size: int, threshold: int, decision: ModerationDecision, timeout_duration: Union[None, int]): | |||
| super().__init__(message, timeout_duration) | |||
| def __init__( | |||
| self, | |||
| message: str, | |||
| decision: ModerationDecision, | |||
| timeout_duration: Union[None, int], | |||
| min_size: int, | |||
| threshold: int | |||
| ): | |||
| super().__init__(message, decision, timeout_duration) | |||
| self.min_size = min_size | |||
| self.threshold = threshold / 100 | |||
| self.decision = decision | |||
| def get_name(self) -> str: | |||
| return 'Caps Lock' | |||
| def vote(self, msg: str) -> ModerationDecision: | |||
| def vote(self, msg: str, author: str) -> ModerationDecision: | |||
| msg = ''.join(filter(str.isalpha, msg)) | |||
| if len(msg) < self.min_size: | |||
| @@ -71,3 +81,70 @@ class CapsLockModerator(Moderator): | |||
| return self.decision | |||
| return ModerationDecision.ABSTAIN | |||
| class FloodModerator(Moderator): | |||
| def __init__( | |||
| self, | |||
| message: str, | |||
| decision: ModerationDecision, | |||
| timeout_duration: Union[None, int], | |||
| max_word_length: Union[None, int], | |||
| raid_cooldown: Union[None, int], | |||
| ignore_hashtags: bool, | |||
| max_msg_occurrences: Union[None, int], | |||
| min_time_between_occurrence: Union[None, int] | |||
| ): | |||
| super().__init__(message, decision, timeout_duration) | |||
| self.max_word_length = max_word_length | |||
| self.raid_cooldown = raid_cooldown | |||
| self.last_raid = EPOCH | |||
| self.ignore_hashtags = ignore_hashtags | |||
| self.max_msg_occurrences = max_msg_occurrences | |||
| self.min_time_between_occurrence = min_time_between_occurrence | |||
| self.last_msgs = [] | |||
| def get_name(self) -> str: | |||
| return 'Flood' | |||
| def vote(self, msg: str, author: str) -> ModerationDecision: | |||
| if self.raid_cooldown is not None and self.last_raid + timedelta(minutes=self.raid_cooldown) > datetime.now(): | |||
| return ModerationDecision.ABSTAIN | |||
| if self.max_word_length is not None: | |||
| for word in msg.split(' '): | |||
| if word.startswith('#'): | |||
| continue | |||
| if len(word) > self.max_word_length: | |||
| return ModerationDecision.TIMEOUT_USER | |||
| if self.max_msg_occurrences is None or self.min_time_between_occurrence is None: | |||
| return ModerationDecision.ABSTAIN | |||
| clean_msg = None | |||
| for last_msg in self.last_msgs: | |||
| if last_msg['first-occurrence'] + timedelta(seconds=self.min_time_between_occurrence) <= datetime.now(): | |||
| clean_msg = last_msg | |||
| break | |||
| if author != last_msg['author'] or msg != last_msg['message']: | |||
| break | |||
| last_msg['occurrences'] += 1 | |||
| if last_msg['occurrences'] >= self.max_msg_occurrences: | |||
| return ModerationDecision.TIMEOUT_USER | |||
| if clean_msg is not None: | |||
| self.last_msgs.remove(clean_msg) | |||
| self.last_msgs.append({ | |||
| 'first-occurrence': datetime.now(), | |||
| 'author': author, | |||
| 'message': msg, | |||
| 'occurrences': 1 | |||
| }) | |||
| return ModerationDecision.ABSTAIN | |||
| def declare_raid(self): | |||
| self.last_raid = datetime.now() | |||
| @@ -0,0 +1,20 @@ | |||
| # Twason - The KISS Twitch bot | |||
| # Copyright (C) 2021 Jérôme Deuchnord | |||
| # | |||
| # This program is free software: you can redistribute it and/or modify | |||
| # it under the terms of the GNU Affero General Public License as | |||
| # published by the Free Software Foundation, either version 3 of the | |||
| # License, or (at your option) any later version. | |||
| # | |||
| # This program is distributed in the hope that it will be useful, | |||
| # but WITHOUT ANY WARRANTY; without even the implied warranty of | |||
| # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |||
| # GNU Affero General Public License for more details. | |||
| # | |||
| # You should have received a copy of the GNU Affero General Public License | |||
| # along with this program. If not, see <https://www.gnu.org/licenses/>. | |||
| from irc3.rfc import raw | |||
| USERNOTICE = r'^(@(?P<tags>\S+) )?:(?P<mask>\S+) (?P<event>(USERNOTICE)) (?P<target>\S+)$' | |||
| @@ -17,13 +17,15 @@ | |||
| import irc3 | |||
| from random import shuffle | |||
| from datetime import datetime, timedelta | |||
| from . import utils | |||
| from .config import TimerStrategy | |||
| from .moderator import ModerationDecision, Moderator | |||
| from .moderator import ModerationDecision, Moderator, FloodModerator | |||
| from . import twitch_message | |||
| from random import shuffle | |||
| from datetime import datetime, timedelta | |||
| config = None | |||
| @@ -55,6 +57,10 @@ class TwitchBot: | |||
| return in_str | |||
| # @irc3.event(r'^(?P<data>.+)$') | |||
| # def on_all(self, data): | |||
| # print(data) | |||
| @irc3.event(irc3.rfc.PRIVMSG) | |||
| def on_msg(self, mask: str = None, target: str = None, data: str = None, tags: str = None, **_): | |||
| author = mask.split('!')[0] | |||
| @@ -70,6 +76,17 @@ class TwitchBot: | |||
| self.nb_messages_since_timer += 1 | |||
| self.play_timer() | |||
| @irc3.event(twitch_message.USERNOTICE) | |||
| def on_user_notice(self, tags: str, **_): | |||
| tags = utils.parse_tags(tags) | |||
| if tags.get('msg-id', None) == 'raid': | |||
| # Notice the Flood moderator a raid has just happened | |||
| for moderator in self.config.moderators: | |||
| if isinstance(moderator, FloodModerator) and moderator.raid_cooldown is not None: | |||
| print("Raid received from %s. Disabling the Flood moderator." % tags.get('display-name')) | |||
| moderator.declare_raid() | |||
| break | |||
| def play_timer(self): | |||
| if not self.messages_stack: | |||
| print('Filling the timer messages stack in') | |||
| @@ -91,7 +108,6 @@ class TwitchBot: | |||
| self.last_timer_date = datetime.now() | |||
| def moderate(self, tags: {str: str}, msg: str, author: str, channel: str): | |||
| print(tags) | |||
| def delete_msg(mod: Moderator): | |||
| print("[DELETE (reason: %s)] %s: %s" % (mod.get_name(), author, msg)) | |||
| self.bot.privmsg( | |||
| @@ -130,7 +146,7 @@ class TwitchBot: | |||
| message_to_moderate = message_to_moderate[:first - 1] + message_to_moderate[last + 1:] | |||
| for moderator in self.config.moderators: | |||
| vote = moderator.vote(message_to_moderate) | |||
| vote = moderator.vote(message_to_moderate, author) | |||
| if vote == ModerationDecision.ABSTAIN: | |||
| continue | |||
| if vote == ModerationDecision.DELETE_MSG: | |||