diff --git a/_twitchbot/config.py b/_twitchbot/config.py
index 97121f6..ba1773e 100644
--- a/_twitchbot/config.py
+++ b/_twitchbot/config.py
@@ -21,7 +21,7 @@ from os import environ
from enum import Enum
from typing import Union
-from .moderator import Moderator, CapsLockModerator, ModerationDecision
+from . import moderator
class Command:
@@ -93,7 +93,7 @@ class Config:
command_prefix: str
commands: [Command]
timer: Timer
- moderators: [Moderator]
+ moderators: [moderator.Moderator]
def __init__(
self,
@@ -103,7 +103,7 @@ class Config:
command_prefix: str,
commands: [Command],
timer: Timer,
- moderators: [Moderator]
+ moderators: [moderator.Moderator]
):
self.nickname = nickname
self.channel = channel
@@ -137,23 +137,26 @@ class Config:
commands.append(command)
moderators = []
- for moderator in params.get('moderator', []):
- decision_str = params['moderator']['caps-lock'].get("decision", "delete")
- if decision_str == "delete":
- decision = ModerationDecision.DELETE_MSG
- elif decision_str == "timeout":
- decision = ModerationDecision.TIMEOUT_USER
- else:
- print("WARNING: %s moderator's decision is invalid, it has been deactivated!")
- decision = ModerationDecision.ABSTAIN
-
- if moderator == 'caps-lock':
- moderators.append(CapsLockModerator(
- params['moderator']['caps-lock'].get("message", "{author}, stop the caps lock!"),
- params['moderator']['caps-lock'].get("min-size", 5),
- params['moderator']['caps-lock'].get("threshold", 50),
- decision,
- params['moderator']['caps-lock'].get("duration", None)
+ for mod in params.get('moderator', []):
+ moderator_config = params['moderator'][mod]
+ if mod == 'caps-lock':
+ moderators.append(moderator.CapsLockModerator(
+ moderator_config.get("message", "{author}, stop the caps lock!"),
+ cls.parse_decision(moderator_config.get("decision", "delete")),
+ moderator_config.get("duration", None),
+ moderator_config.get("min-size", 5),
+ moderator_config.get("threshold", 50)
+ ))
+ if mod == 'flood':
+ moderators.append(moderator.FloodModerator(
+ moderator_config.get("message", "{author}, stop the flood!"),
+ cls.parse_decision(moderator_config.get("decision", "timeout")),
+ moderator_config.get("duration", None),
+ moderator_config.get("max-word-length", None),
+ moderator_config.get("raid-cooldown", None),
+ moderator_config.get("ignore-hashtags", False),
+ moderator_config.get("max-msg-occurrences", None),
+ moderator_config.get("min-time-between-occurrence", None)
))
# Generate help command
@@ -173,6 +176,17 @@ class Config:
moderators
)
+ @classmethod
+ def parse_decision(cls, decision_str) -> moderator.ModerationDecision:
+ if decision_str == "delete":
+ decision = moderator.ModerationDecision.DELETE_MSG
+ elif decision_str == "timeout":
+ decision = moderator.ModerationDecision.TIMEOUT_USER
+ else:
+ print("WARNING: %s moderator's decision is invalid, it has been deactivated!")
+ decision = moderator.ModerationDecision.ABSTAIN
+ return decision
+
def find_command(self, command: str) -> Union[None, Command]:
if not command.startswith(self.command_prefix):
return None
diff --git a/_twitchbot/moderator.py b/_twitchbot/moderator.py
index 575a28e..219edc7 100644
--- a/_twitchbot/moderator.py
+++ b/_twitchbot/moderator.py
@@ -18,6 +18,9 @@
from abc import ABC, abstractmethod
from enum import Enum
from typing import Union
+from datetime import datetime, timedelta
+
+EPOCH = datetime(1970, 1, 1)
class ModerationDecision(Enum):
@@ -30,31 +33,38 @@ class Moderator(ABC):
message: str
decision: ModerationDecision
- def __init__(self, message: str, timeout_duration: Union[None, int]):
+ def __init__(self, message: str, decision: ModerationDecision, timeout_duration: Union[None, int]):
self.message = message
self.timeout_duration = timeout_duration
+ self.decision = decision
@abstractmethod
def get_name(self) -> str:
pass
@abstractmethod
- def vote(self, msg) -> ModerationDecision:
+ def vote(self, msg: str, author: str) -> ModerationDecision:
pass
class CapsLockModerator(Moderator):
- def __init__(self, message: str, min_size: int, threshold: int, decision: ModerationDecision, timeout_duration: Union[None, int]):
- super().__init__(message, timeout_duration)
+ def __init__(
+ self,
+ message: str,
+ decision: ModerationDecision,
+ timeout_duration: Union[None, int],
+ min_size: int,
+ threshold: int
+ ):
+ super().__init__(message, decision, timeout_duration)
self.min_size = min_size
self.threshold = threshold / 100
- self.decision = decision
def get_name(self) -> str:
return 'Caps Lock'
- def vote(self, msg: str) -> ModerationDecision:
+ def vote(self, msg: str, author: str) -> ModerationDecision:
msg = ''.join(filter(str.isalpha, msg))
if len(msg) < self.min_size:
@@ -71,3 +81,70 @@ class CapsLockModerator(Moderator):
return self.decision
return ModerationDecision.ABSTAIN
+
+
+class FloodModerator(Moderator):
+ def __init__(
+ self,
+ message: str,
+ decision: ModerationDecision,
+ timeout_duration: Union[None, int],
+ max_word_length: Union[None, int],
+ raid_cooldown: Union[None, int],
+ ignore_hashtags: bool,
+ max_msg_occurrences: Union[None, int],
+ min_time_between_occurrence: Union[None, int]
+ ):
+ super().__init__(message, decision, timeout_duration)
+ self.max_word_length = max_word_length
+ self.raid_cooldown = raid_cooldown
+ self.last_raid = EPOCH
+ self.ignore_hashtags = ignore_hashtags
+ self.max_msg_occurrences = max_msg_occurrences
+ self.min_time_between_occurrence = min_time_between_occurrence
+ self.last_msgs = []
+
+ def get_name(self) -> str:
+ return 'Flood'
+
+ def vote(self, msg: str, author: str) -> ModerationDecision:
+ if self.raid_cooldown is not None and self.last_raid + timedelta(minutes=self.raid_cooldown) > datetime.now():
+ return ModerationDecision.ABSTAIN
+
+ if self.max_word_length is not None:
+ for word in msg.split(' '):
+ if word.startswith('#'):
+ continue
+ if len(word) > self.max_word_length:
+ return ModerationDecision.TIMEOUT_USER
+
+ if self.max_msg_occurrences is None or self.min_time_between_occurrence is None:
+ return ModerationDecision.ABSTAIN
+
+ clean_msg = None
+ for last_msg in self.last_msgs:
+ if last_msg['first-occurrence'] + timedelta(seconds=self.min_time_between_occurrence) <= datetime.now():
+ clean_msg = last_msg
+ break
+
+ if author != last_msg['author'] or msg != last_msg['message']:
+ break
+
+ last_msg['occurrences'] += 1
+ if last_msg['occurrences'] >= self.max_msg_occurrences:
+ return ModerationDecision.TIMEOUT_USER
+
+ if clean_msg is not None:
+ self.last_msgs.remove(clean_msg)
+
+ self.last_msgs.append({
+ 'first-occurrence': datetime.now(),
+ 'author': author,
+ 'message': msg,
+ 'occurrences': 1
+ })
+
+ return ModerationDecision.ABSTAIN
+
+ def declare_raid(self):
+ self.last_raid = datetime.now()
diff --git a/_twitchbot/twitch_message.py b/_twitchbot/twitch_message.py
new file mode 100644
index 0000000..7f94c88
--- /dev/null
+++ b/_twitchbot/twitch_message.py
@@ -0,0 +1,20 @@
+# Twason - The KISS Twitch bot
+# Copyright (C) 2021 Jérôme Deuchnord
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+from irc3.rfc import raw
+
+USERNOTICE = r'^(@(?P\S+) )?:(?P\S+) (?P(USERNOTICE)) (?P\S+)$'
+
diff --git a/_twitchbot/twitchbot.py b/_twitchbot/twitchbot.py
index a21dc2f..0d75905 100644
--- a/_twitchbot/twitchbot.py
+++ b/_twitchbot/twitchbot.py
@@ -17,13 +17,15 @@
import irc3
+from random import shuffle
+from datetime import datetime, timedelta
+
from . import utils
from .config import TimerStrategy
-from .moderator import ModerationDecision, Moderator
+from .moderator import ModerationDecision, Moderator, FloodModerator
+from . import twitch_message
-from random import shuffle
-from datetime import datetime, timedelta
config = None
@@ -55,6 +57,10 @@ class TwitchBot:
return in_str
+ # @irc3.event(r'^(?P.+)$')
+ # def on_all(self, data):
+ # print(data)
+
@irc3.event(irc3.rfc.PRIVMSG)
def on_msg(self, mask: str = None, target: str = None, data: str = None, tags: str = None, **_):
author = mask.split('!')[0]
@@ -70,6 +76,17 @@ class TwitchBot:
self.nb_messages_since_timer += 1
self.play_timer()
+ @irc3.event(twitch_message.USERNOTICE)
+ def on_user_notice(self, tags: str, **_):
+ tags = utils.parse_tags(tags)
+ if tags.get('msg-id', None) == 'raid':
+ # Notice the Flood moderator a raid has just happened
+ for moderator in self.config.moderators:
+ if isinstance(moderator, FloodModerator) and moderator.raid_cooldown is not None:
+ print("Raid received from %s. Disabling the Flood moderator." % tags.get('display-name'))
+ moderator.declare_raid()
+ break
+
def play_timer(self):
if not self.messages_stack:
print('Filling the timer messages stack in')
@@ -91,7 +108,6 @@ class TwitchBot:
self.last_timer_date = datetime.now()
def moderate(self, tags: {str: str}, msg: str, author: str, channel: str):
- print(tags)
def delete_msg(mod: Moderator):
print("[DELETE (reason: %s)] %s: %s" % (mod.get_name(), author, msg))
self.bot.privmsg(
@@ -130,7 +146,7 @@ class TwitchBot:
message_to_moderate = message_to_moderate[:first - 1] + message_to_moderate[last + 1:]
for moderator in self.config.moderators:
- vote = moderator.vote(message_to_moderate)
+ vote = moderator.vote(message_to_moderate, author)
if vote == ModerationDecision.ABSTAIN:
continue
if vote == ModerationDecision.DELETE_MSG: