Browse Source

Add Flood moderator

pull/4/head
Jérôme Deuchnord 2 years ago
parent
commit
dc061b869c
4 changed files with 158 additions and 31 deletions
  1. +34
    -20
      _twitchbot/config.py
  2. +83
    -6
      _twitchbot/moderator.py
  3. +20
    -0
      _twitchbot/twitch_message.py
  4. +21
    -5
      _twitchbot/twitchbot.py

+ 34
- 20
_twitchbot/config.py View File

@@ -21,7 +21,7 @@ from os import environ
from enum import Enum
from typing import Union

from .moderator import Moderator, CapsLockModerator, ModerationDecision
from . import moderator


class Command:
@@ -93,7 +93,7 @@ class Config:
command_prefix: str
commands: [Command]
timer: Timer
moderators: [Moderator]
moderators: [moderator.Moderator]

def __init__(
self,
@@ -103,7 +103,7 @@ class Config:
command_prefix: str,
commands: [Command],
timer: Timer,
moderators: [Moderator]
moderators: [moderator.Moderator]
):
self.nickname = nickname
self.channel = channel
@@ -137,23 +137,26 @@ class Config:
commands.append(command)

moderators = []
for moderator in params.get('moderator', []):
decision_str = params['moderator']['caps-lock'].get("decision", "delete")
if decision_str == "delete":
decision = ModerationDecision.DELETE_MSG
elif decision_str == "timeout":
decision = ModerationDecision.TIMEOUT_USER
else:
print("WARNING: %s moderator's decision is invalid, it has been deactivated!")
decision = ModerationDecision.ABSTAIN

if moderator == 'caps-lock':
moderators.append(CapsLockModerator(
params['moderator']['caps-lock'].get("message", "{author}, stop the caps lock!"),
params['moderator']['caps-lock'].get("min-size", 5),
params['moderator']['caps-lock'].get("threshold", 50),
decision,
params['moderator']['caps-lock'].get("duration", None)
for mod in params.get('moderator', []):
moderator_config = params['moderator'][mod]
if mod == 'caps-lock':
moderators.append(moderator.CapsLockModerator(
moderator_config.get("message", "{author}, stop the caps lock!"),
cls.parse_decision(moderator_config.get("decision", "delete")),
moderator_config.get("duration", None),
moderator_config.get("min-size", 5),
moderator_config.get("threshold", 50)
))
if mod == 'flood':
moderators.append(moderator.FloodModerator(
moderator_config.get("message", "{author}, stop the flood!"),
cls.parse_decision(moderator_config.get("decision", "timeout")),
moderator_config.get("duration", None),
moderator_config.get("max-word-length", None),
moderator_config.get("raid-cooldown", None),
moderator_config.get("ignore-hashtags", False),
moderator_config.get("max-msg-occurrences", None),
moderator_config.get("min-time-between-occurrence", None)
))

# Generate help command
@@ -173,6 +176,17 @@ class Config:
moderators
)

@classmethod
def parse_decision(cls, decision_str) -> moderator.ModerationDecision:
if decision_str == "delete":
decision = moderator.ModerationDecision.DELETE_MSG
elif decision_str == "timeout":
decision = moderator.ModerationDecision.TIMEOUT_USER
else:
print("WARNING: %s moderator's decision is invalid, it has been deactivated!")
decision = moderator.ModerationDecision.ABSTAIN
return decision

def find_command(self, command: str) -> Union[None, Command]:
if not command.startswith(self.command_prefix):
return None


+ 83
- 6
_twitchbot/moderator.py View File

@@ -18,6 +18,9 @@
from abc import ABC, abstractmethod
from enum import Enum
from typing import Union
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


class ModerationDecision(Enum):
@@ -30,31 +33,38 @@ class Moderator(ABC):
message: str
decision: ModerationDecision

def __init__(self, message: str, timeout_duration: Union[None, int]):
def __init__(self, message: str, decision: ModerationDecision, timeout_duration: Union[None, int]):
self.message = message
self.timeout_duration = timeout_duration
self.decision = decision

@abstractmethod
def get_name(self) -> str:
pass

@abstractmethod
def vote(self, msg) -> ModerationDecision:
def vote(self, msg: str, author: str) -> ModerationDecision:
pass


class CapsLockModerator(Moderator):
def __init__(self, message: str, min_size: int, threshold: int, decision: ModerationDecision, timeout_duration: Union[None, int]):
super().__init__(message, timeout_duration)
def __init__(
self,
message: str,
decision: ModerationDecision,
timeout_duration: Union[None, int],
min_size: int,
threshold: int
):
super().__init__(message, decision, timeout_duration)

self.min_size = min_size
self.threshold = threshold / 100
self.decision = decision

def get_name(self) -> str:
return 'Caps Lock'

def vote(self, msg: str) -> ModerationDecision:
def vote(self, msg: str, author: str) -> ModerationDecision:
msg = ''.join(filter(str.isalpha, msg))

if len(msg) < self.min_size:
@@ -71,3 +81,70 @@ class CapsLockModerator(Moderator):
return self.decision

return ModerationDecision.ABSTAIN


class FloodModerator(Moderator):
def __init__(
self,
message: str,
decision: ModerationDecision,
timeout_duration: Union[None, int],
max_word_length: Union[None, int],
raid_cooldown: Union[None, int],
ignore_hashtags: bool,
max_msg_occurrences: Union[None, int],
min_time_between_occurrence: Union[None, int]
):
super().__init__(message, decision, timeout_duration)
self.max_word_length = max_word_length
self.raid_cooldown = raid_cooldown
self.last_raid = EPOCH
self.ignore_hashtags = ignore_hashtags
self.max_msg_occurrences = max_msg_occurrences
self.min_time_between_occurrence = min_time_between_occurrence
self.last_msgs = []

def get_name(self) -> str:
return 'Flood'

def vote(self, msg: str, author: str) -> ModerationDecision:
if self.raid_cooldown is not None and self.last_raid + timedelta(minutes=self.raid_cooldown) > datetime.now():
return ModerationDecision.ABSTAIN

if self.max_word_length is not None:
for word in msg.split(' '):
if word.startswith('#'):
continue
if len(word) > self.max_word_length:
return ModerationDecision.TIMEOUT_USER

if self.max_msg_occurrences is None or self.min_time_between_occurrence is None:
return ModerationDecision.ABSTAIN

clean_msg = None
for last_msg in self.last_msgs:
if last_msg['first-occurrence'] + timedelta(seconds=self.min_time_between_occurrence) <= datetime.now():
clean_msg = last_msg
break

if author != last_msg['author'] or msg != last_msg['message']:
break

last_msg['occurrences'] += 1
if last_msg['occurrences'] >= self.max_msg_occurrences:
return ModerationDecision.TIMEOUT_USER

if clean_msg is not None:
self.last_msgs.remove(clean_msg)

self.last_msgs.append({
'first-occurrence': datetime.now(),
'author': author,
'message': msg,
'occurrences': 1
})

return ModerationDecision.ABSTAIN

def declare_raid(self):
self.last_raid = datetime.now()

+ 20
- 0
_twitchbot/twitch_message.py View File

@@ -0,0 +1,20 @@
# Twason - The KISS Twitch bot
# Copyright (C) 2021 Jérôme Deuchnord
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

from irc3.rfc import raw

USERNOTICE = r'^(@(?P<tags>\S+) )?:(?P<mask>\S+) (?P<event>(USERNOTICE)) (?P<target>\S+)$'


+ 21
- 5
_twitchbot/twitchbot.py View File

@@ -17,13 +17,15 @@

import irc3

from random import shuffle
from datetime import datetime, timedelta

from . import utils

from .config import TimerStrategy
from .moderator import ModerationDecision, Moderator
from .moderator import ModerationDecision, Moderator, FloodModerator
from . import twitch_message

from random import shuffle
from datetime import datetime, timedelta

config = None

@@ -55,6 +57,10 @@ class TwitchBot:

return in_str

# @irc3.event(r'^(?P<data>.+)$')
# def on_all(self, data):
# print(data)

@irc3.event(irc3.rfc.PRIVMSG)
def on_msg(self, mask: str = None, target: str = None, data: str = None, tags: str = None, **_):
author = mask.split('!')[0]
@@ -70,6 +76,17 @@ class TwitchBot:
self.nb_messages_since_timer += 1
self.play_timer()

@irc3.event(twitch_message.USERNOTICE)
def on_user_notice(self, tags: str, **_):
tags = utils.parse_tags(tags)
if tags.get('msg-id', None) == 'raid':
# Notice the Flood moderator a raid has just happened
for moderator in self.config.moderators:
if isinstance(moderator, FloodModerator) and moderator.raid_cooldown is not None:
print("Raid received from %s. Disabling the Flood moderator." % tags.get('display-name'))
moderator.declare_raid()
break

def play_timer(self):
if not self.messages_stack:
print('Filling the timer messages stack in')
@@ -91,7 +108,6 @@ class TwitchBot:
self.last_timer_date = datetime.now()

def moderate(self, tags: {str: str}, msg: str, author: str, channel: str):
print(tags)
def delete_msg(mod: Moderator):
print("[DELETE (reason: %s)] %s: %s" % (mod.get_name(), author, msg))
self.bot.privmsg(
@@ -130,7 +146,7 @@ class TwitchBot:
message_to_moderate = message_to_moderate[:first - 1] + message_to_moderate[last + 1:]

for moderator in self.config.moderators:
vote = moderator.vote(message_to_moderate)
vote = moderator.vote(message_to_moderate, author)
if vote == ModerationDecision.ABSTAIN:
continue
if vote == ModerationDecision.DELETE_MSG:


Loading…
Cancel
Save