Explorar el Código

chore: fix lint

pull/7/head
Jérôme Deuchnord hace 6 meses
padre
commit
1f7e0149b2
Se han modificado 5 ficheros con 179 adiciones y 130 borrados
  1. +13
    -11
      twason/__main__.py
  2. +59
    -45
      twason/config.py
  3. +47
    -33
      twason/moderator.py
  4. +3
    -2
      twason/twitch_message.py
  5. +57
    -39
      twason/twitchbot.py

+ 13
- 11
twason/__main__.py Ver fichero

@@ -32,15 +32,17 @@ def main() -> int:
args = get_arguments()

twitchbot.config = get_config(args.config)
bot = irc3.IrcBot.from_config({
'nick': twitchbot.config.nickname,
'password': twitchbot.config.token,
'autojoins': [],
'host': TWITCH_IRC_SERVER,
'port': TWITCH_IRC_PORT,
'ssl': True,
'includes': [twitchbot.__name__]
})
bot = irc3.IrcBot.from_config(
{
"nick": twitchbot.config.nickname,
"password": twitchbot.config.token,
"autojoins": [],
"host": TWITCH_IRC_SERVER,
"port": TWITCH_IRC_PORT,
"ssl": True,
"includes": [twitchbot.__name__],
}
)

bot.run(forever=True)

@@ -49,10 +51,10 @@ def main() -> int:

def get_arguments():
parser = argparse.ArgumentParser()
parser.add_argument('--config', '-c', type=str, default='config.json')
parser.add_argument("--config", "-c", type=str, default="config.json")

return parser.parse_args()


if __name__ == '__main__':
if __name__ == "__main__":
exit(main())

+ 59
- 45
twason/config.py Ver fichero

@@ -30,7 +30,9 @@ class Command:
aliases: [str]
disabled: bool

def __init__(self, name: str, message: str, aliases: [str] = None, disabled: bool = False):
def __init__(
self, name: str, message: str, aliases: [str] = None, disabled: bool = False
):
self.name = name
self.message = message
self.aliases = aliases if aliases is not None else []
@@ -39,10 +41,10 @@ class Command:
@classmethod
def from_dict(cls, params: dict):
return Command(
params.get('name'),
params['message'],
params.get('aliases', []),
params.get('disabled', False)
params.get("name"),
params["message"],
params.get("aliases", []),
params.get("disabled", False),
)


@@ -62,8 +64,8 @@ class Timer:
time_between: int = 10,
msgs_between: int = 10,
strategy: TimerStrategy = TimerStrategy.ROUND_ROBIN,
pool: [Command] = None
):
pool: [Command] = None,
):
self.time_between = time_between
self.msgs_between = msgs_between
self.strategy = strategy
@@ -73,16 +75,16 @@ class Timer:
def from_dict(cls, param: dict):
pool = []

for c in param.get('pool', []):
for c in param.get("pool", []):
command = Command.from_dict(c)
if not command.disabled:
pool.append(command)

return Timer(
time_between=param.get('between', {}).get('time', 10),
msgs_between=param.get('between', {}).get('messages', 10),
strategy=TimerStrategy(param.get('strategy', 'round-robin')),
pool=pool
time_between=param.get("between", {}).get("time", 10),
msgs_between=param.get("between", {}).get("messages", 10),
strategy=TimerStrategy(param.get("strategy", "round-robin")),
pool=pool,
)


@@ -103,7 +105,7 @@ class Config:
command_prefix: str,
commands: [Command],
timer: Timer,
moderators: [moderator.Moderator]
moderators: [moderator.Moderator],
):
self.nickname = nickname
self.channel = channel
@@ -115,14 +117,14 @@ class Config:

@classmethod
def from_dict(cls, params: dict, token: str):
timer = Timer.from_dict(params.get('timer', {}))
timer = Timer.from_dict(params.get("timer", {}))

commands_prefix = params.get('command_prefix', '!')
commands_prefix = params.get("command_prefix", "!")
commands = []

help_command = Command("help", "Voici les commandes disponibles : ")

for command in params.get('commands', []):
for command in params.get("commands", []):
command = Command.from_dict(command)

if command.disabled:
@@ -137,43 +139,53 @@ class Config:
commands.append(command)

moderators = []
for mod in params.get('moderator', []):
moderator_config = params['moderator'][mod]
if mod == 'caps-lock':
moderators.append(moderator.CapsLockModerator(
moderator_config.get("message", "{author}, stop the caps lock!"),
cls.parse_decision(moderator_config.get("decision", "delete")),
moderator_config.get("duration", None),
moderator_config.get("min-size", 5),
moderator_config.get("threshold", 50)
))
if mod == 'flood':
moderators.append(moderator.FloodModerator(
moderator_config.get("message", "{author}, stop the flood!"),
cls.parse_decision(moderator_config.get("decision", "timeout")),
moderator_config.get("duration", None),
moderator_config.get("max-word-length", None),
moderator_config.get("raid-cooldown", None),
moderator_config.get("ignore-hashtags", False),
moderator_config.get("max-msg-occurrences", None),
moderator_config.get("min-time-between-occurrence", None)
))
for mod in params.get("moderator", []):
moderator_config = params["moderator"][mod]
if mod == "caps-lock":
moderators.append(
moderator.CapsLockModerator(
moderator_config.get(
"message", "{author}, stop the caps lock!"
),
cls.parse_decision(moderator_config.get("decision", "delete")),
moderator_config.get("duration", None),
moderator_config.get("min-size", 5),
moderator_config.get("threshold", 50),
)
)
if mod == "flood":
moderators.append(
moderator.FloodModerator(
moderator_config.get("message", "{author}, stop the flood!"),
cls.parse_decision(moderator_config.get("decision", "timeout")),
moderator_config.get("duration", None),
moderator_config.get("max-word-length", None),
moderator_config.get("raid-cooldown", None),
moderator_config.get("ignore-hashtags", False),
moderator_config.get("max-msg-occurrences", None),
moderator_config.get("min-time-between-occurrence", None),
)
)

# Generate help command
if params.get('help', True):
if params.get("help", True):
for command in commands:
help_command.message = "%s %s%s" % (help_command.message, commands_prefix, command.name)
help_command.message = "%s %s%s" % (
help_command.message,
commands_prefix,
command.name,
)

commands.append(help_command)

return Config(
params.get('channel'),
params.get('nickname'),
params.get("channel"),
params.get("nickname"),
token,
commands_prefix,
commands,
timer,
moderators
moderators,
)

@classmethod
@@ -183,7 +195,9 @@ class Config:
elif decision_str == "timeout":
decision = moderator.ModerationDecision.TIMEOUT_USER
else:
print("WARNING: %s moderator's decision is invalid, it has been deactivated!")
print(
"WARNING: %s moderator's decision is invalid, it has been deactivated!"
)
decision = moderator.ModerationDecision.ABSTAIN
return decision

@@ -201,6 +215,6 @@ class Config:


def get_config(file_path: str):
with open(file_path, 'r') as config_file:
token = environ['TWITCH_TOKEN']
with open(file_path, "r") as config_file:
token = environ["TWITCH_TOKEN"]
return Config.from_dict(json.loads(config_file.read()), token)

+ 47
- 33
twason/moderator.py Ver fichero

@@ -33,7 +33,12 @@ class Moderator(ABC):
message: str
decision: ModerationDecision

def __init__(self, message: str, decision: ModerationDecision, timeout_duration: Union[None, int]):
def __init__(
self,
message: str,
decision: ModerationDecision,
timeout_duration: Union[None, int],
):
self.message = message
self.timeout_duration = timeout_duration
self.decision = decision
@@ -49,12 +54,12 @@ class Moderator(ABC):

class CapsLockModerator(Moderator):
def __init__(
self,
message: str,
decision: ModerationDecision,
timeout_duration: Union[None, int],
min_size: int,
threshold: int
self,
message: str,
decision: ModerationDecision,
timeout_duration: Union[None, int],
min_size: int,
threshold: int,
):
super().__init__(message, decision, timeout_duration)

@@ -62,17 +67,17 @@ class CapsLockModerator(Moderator):
self.threshold = threshold / 100

def get_name(self) -> str:
return 'Caps Lock'
return "Caps Lock"

def vote(self, msg: str, author: str) -> ModerationDecision:
msg = ''.join(filter(str.isalpha, msg))
msg = "".join(filter(str.isalpha, msg))

if len(msg) < self.min_size:
return ModerationDecision.ABSTAIN

n = 0
for char in msg:
if char.strip() == '':
if char.strip() == "":
continue
if char == char.upper():
n += 1
@@ -85,15 +90,15 @@ class CapsLockModerator(Moderator):

class FloodModerator(Moderator):
def __init__(
self,
message: str,
decision: ModerationDecision,
timeout_duration: Union[None, int],
max_word_length: Union[None, int],
raid_cooldown: Union[None, int],
ignore_hashtags: bool,
max_msg_occurrences: Union[None, int],
min_time_between_occurrence: Union[None, int]
self,
message: str,
decision: ModerationDecision,
timeout_duration: Union[None, int],
max_word_length: Union[None, int],
raid_cooldown: Union[None, int],
ignore_hashtags: bool,
max_msg_occurrences: Union[None, int],
min_time_between_occurrence: Union[None, int],
):
super().__init__(message, decision, timeout_duration)
self.max_word_length = max_word_length
@@ -105,15 +110,18 @@ class FloodModerator(Moderator):
self.last_msgs = []

def get_name(self) -> str:
return 'Flood'
return "Flood"

def vote(self, msg: str, author: str) -> ModerationDecision:
if self.raid_cooldown is not None and self.last_raid + timedelta(minutes=self.raid_cooldown) > datetime.now():
if (
self.raid_cooldown is not None
and self.last_raid + timedelta(minutes=self.raid_cooldown) > datetime.now()
):
return ModerationDecision.ABSTAIN

if self.max_word_length is not None:
for word in msg.split(' '):
if word.startswith('#'):
for word in msg.split(" "):
if word.startswith("#"):
continue
if len(word) > self.max_word_length:
return ModerationDecision.TIMEOUT_USER
@@ -123,26 +131,32 @@ class FloodModerator(Moderator):

clean_msg = None
for last_msg in self.last_msgs:
if last_msg['first-occurrence'] + timedelta(seconds=self.min_time_between_occurrence) <= datetime.now():
if (
last_msg["first-occurrence"]
+ timedelta(seconds=self.min_time_between_occurrence)
<= datetime.now()
):
clean_msg = last_msg
break

if author != last_msg['author'] or msg != last_msg['message']:
if author != last_msg["author"] or msg != last_msg["message"]:
break

last_msg['occurrences'] += 1
if last_msg['occurrences'] >= self.max_msg_occurrences:
last_msg["occurrences"] += 1
if last_msg["occurrences"] >= self.max_msg_occurrences:
return ModerationDecision.TIMEOUT_USER

if clean_msg is not None:
self.last_msgs.remove(clean_msg)

self.last_msgs.append({
'first-occurrence': datetime.now(),
'author': author,
'message': msg,
'occurrences': 1
})
self.last_msgs.append(
{
"first-occurrence": datetime.now(),
"author": author,
"message": msg,
"occurrences": 1,
}
)

return ModerationDecision.ABSTAIN



+ 3
- 2
twason/twitch_message.py Ver fichero

@@ -16,5 +16,6 @@

from irc3.rfc import raw

USERNOTICE = r'^(@(?P<tags>\S+) )?:(?P<mask>\S+) (?P<event>(USERNOTICE)) (?P<target>\S+)$'

USERNOTICE = (
r"^(@(?P<tags>\S+) )?:(?P<mask>\S+) (?P<event>(USERNOTICE)) (?P<target>\S+)$"
)

+ 57
- 39
twason/twitchbot.py Ver fichero

@@ -41,19 +41,19 @@ class TwitchBot:
self.nb_messages_since_timer = 0

def connection_made(self):
print('connected')
print("connected")

def server_ready(self):
print('ready')
print("ready")

def connection_lost(self):
print('connection lost')
print("connection lost")

@staticmethod
def _parse_variables(in_str: str, **kwargs):
for key in kwargs:
value = kwargs[key]
in_str = in_str.replace('{%s}' % key, value)
in_str = in_str.replace("{%s}" % key, value)

return in_str

@@ -62,15 +62,24 @@ class TwitchBot:
# print(data)

@irc3.event(irc3.rfc.PRIVMSG)
def on_msg(self, mask: str = None, target: str = None, data: str = None, tags: str = None, **_):
author = mask.split('!')[0]
command = self.config.find_command(data.lower().split(' ')[0])
def on_msg(
self,
mask: str = None,
target: str = None,
data: str = None,
tags: str = None,
**_
):
author = mask.split("!")[0]
command = self.config.find_command(data.lower().split(" ")[0])
tags_dict = utils.parse_tags(tags)

if command is not None:
print('%s: %s%s' % (author, self.config.command_prefix, command.name))
self.bot.privmsg(target, self._parse_variables(command.message, author=author))
elif tags_dict.get('mod') == '0':
print("%s: %s%s" % (author, self.config.command_prefix, command.name))
self.bot.privmsg(
target, self._parse_variables(command.message, author=author)
)
elif tags_dict.get("mod") == "0":
self.moderate(tags_dict, data, author, target)

self.nb_messages_since_timer += 1
@@ -79,30 +88,39 @@ class TwitchBot:
@irc3.event(twitch_message.USERNOTICE)
def on_user_notice(self, tags: str, **_):
tags = utils.parse_tags(tags)
if tags.get('msg-id', None) == 'raid':
if tags.get("msg-id", None) == "raid":
# Notice the Flood moderator a raid has just happened
for moderator in self.config.moderators:
if isinstance(moderator, FloodModerator) and moderator.raid_cooldown is not None:
print("Raid received from %s. Disabling the Flood moderator." % tags.get('display-name'))
if (
isinstance(moderator, FloodModerator)
and moderator.raid_cooldown is not None
):
print(
"Raid received from %s. Disabling the Flood moderator."
% tags.get("display-name")
)
moderator.declare_raid()
break

def play_timer(self):
if not self.messages_stack:
print('Filling the timer messages stack in')
print("Filling the timer messages stack in")
self.messages_stack = self.config.timer.pool.copy()
if self.config.timer.strategy == TimerStrategy.SHUFFLE:
print('Shuffle!')
print("Shuffle!")
shuffle(self.messages_stack)

if self.nb_messages_since_timer < self.config.timer.msgs_between or \
datetime.now() < self.last_timer_date + timedelta(minutes=self.config.timer.time_between):
if (
self.nb_messages_since_timer < self.config.timer.msgs_between
or datetime.now()
< self.last_timer_date + timedelta(minutes=self.config.timer.time_between)
):
return

command = self.messages_stack.pop(0)

print("Timer: %s" % command.message)
self.bot.privmsg('#%s' % self.config.channel, command.message)
self.bot.privmsg("#%s" % self.config.channel, command.message)

self.nb_messages_since_timer = 0
self.last_timer_date = datetime.now()
@@ -110,40 +128,41 @@ class TwitchBot:
def moderate(self, tags: {str: str}, msg: str, author: str, channel: str):
def delete_msg(mod: Moderator):
print("[DELETE (reason: %s)] %s: %s" % (mod.get_name(), author, msg))
self.bot.privmsg(
channel,
"/delete %s" % tags['id']
)
self.bot.privmsg(channel, "/delete %s" % tags["id"])

def timeout(mod: Moderator):
print("[TIMEOUT (reason: %s)] %s: %s" % (mod.get_name(), author, msg))
self.bot.privmsg(
channel,
"/timeout %s %d %s" % (
"/timeout %s %d %s"
% (
author,
mod.timeout_duration,
self._parse_variables(mod.message, author=author)
)
self._parse_variables(mod.message, author=author),
),
)

# Ignore emotes-only messages
if tags.get('emote-only', '0') == '1':
if tags.get("emote-only", "0") == "1":
return

message_to_moderate = msg

# Remove emotes from message before moderating
for emote in tags.get('emotes', '').split('/'):
if emote == '':
for emote in tags.get("emotes", "").split("/"):
if emote == "":
break

for indices in emote.split(':')[1].split(','):
[first, last] = indices.split('-')
for indices in emote.split(":")[1].split(","):
[first, last] = indices.split("-")
first, last = int(first), int(last)
if first == 0:
message_to_moderate = message_to_moderate[last + 1:]
message_to_moderate = message_to_moderate[last + 1 :]
else:
message_to_moderate = message_to_moderate[:first - 1] + message_to_moderate[last + 1:]
message_to_moderate = (
message_to_moderate[: first - 1]
+ message_to_moderate[last + 1 :]
)

for moderator in self.config.moderators:
vote = moderator.vote(message_to_moderate, author)
@@ -154,19 +173,18 @@ class TwitchBot:
if vote == ModerationDecision.TIMEOUT_USER:
timeout(moderator)

self.bot.privmsg(channel, self._parse_variables(moderator.message, author=author))
self.bot.privmsg(
channel, self._parse_variables(moderator.message, author=author)
)
break

@irc3.event(irc3.rfc.JOIN)
def on_join(self, mask, channel, **_):
print('JOINED %s as %s' % (channel, mask))
print("JOINED %s as %s" % (channel, mask))

@irc3.event(irc3.rfc.CONNECTED)
def on_connected(self, **_):
for line in [
"CAP REQ :twitch.tv/commands",
"CAP REQ :twitch.tv/tags"
]:
for line in ["CAP REQ :twitch.tv/commands", "CAP REQ :twitch.tv/tags"]:
self.bot.send_line(line)

self.bot.join('#%s' % self.config.channel)
self.bot.join("#%s" % self.config.channel)

Cargando…
Cancelar
Guardar