The KISS Twitch bot
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 

202 lines
6.0 KiB

  1. # Twason - The KISS Twitch bot
  2. # Copyright (C) 2021 Jérôme Deuchnord
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as
  6. # published by the Free Software Foundation, either version 3 of the
  7. # License, or (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from abc import ABC, abstractmethod
  17. from enum import Enum
  18. from typing import Union
  19. from datetime import datetime, timedelta
  20. from urlextract import URLExtract
  21. from fnmatch import fnmatch
  22. import re
  23. EPOCH = datetime(1970, 1, 1)
  24. class ModerationDecision(Enum):
  25. ABSTAIN = -1
  26. DELETE_MSG = 0
  27. TIMEOUT_USER = 1
  28. class Moderator(ABC):
  29. message: str
  30. decision: ModerationDecision
  31. def __init__(self, message: str, decision: ModerationDecision, timeout_duration: Union[None, int]):
  32. self.message = message
  33. self.timeout_duration = timeout_duration
  34. self.decision = decision
  35. @abstractmethod
  36. def get_name(self) -> str:
  37. pass
  38. @abstractmethod
  39. def vote(self, msg: str, author: str) -> ModerationDecision:
  40. pass
  41. class CapsLockModerator(Moderator):
  42. def __init__(
  43. self,
  44. message: str,
  45. decision: ModerationDecision,
  46. timeout_duration: Union[None, int],
  47. min_size: int,
  48. threshold: int
  49. ):
  50. super().__init__(message, decision, timeout_duration)
  51. self.min_size = min_size
  52. self.threshold = threshold / 100
  53. def get_name(self) -> str:
  54. return 'Caps Lock'
  55. def vote(self, msg: str, author: str) -> ModerationDecision:
  56. msg = ''.join(filter(str.isalpha, msg))
  57. if len(msg) < self.min_size:
  58. return ModerationDecision.ABSTAIN
  59. n = 0
  60. for char in msg:
  61. if char.strip() == '':
  62. continue
  63. if char == char.upper():
  64. n += 1
  65. if n / len(msg) >= self.threshold:
  66. return self.decision
  67. return ModerationDecision.ABSTAIN
  68. class FloodModerator(Moderator):
  69. def __init__(
  70. self,
  71. message: str,
  72. decision: ModerationDecision,
  73. timeout_duration: Union[None, int],
  74. max_word_length: Union[None, int],
  75. raid_cooldown: Union[None, int],
  76. ignore_hashtags: bool,
  77. max_msg_occurrences: Union[None, int],
  78. min_time_between_occurrence: Union[None, int]
  79. ):
  80. super().__init__(message, decision, timeout_duration)
  81. self.max_word_length = max_word_length
  82. self.raid_cooldown = raid_cooldown
  83. self.last_raid = EPOCH
  84. self.ignore_hashtags = ignore_hashtags
  85. self.max_msg_occurrences = max_msg_occurrences
  86. self.min_time_between_occurrence = min_time_between_occurrence
  87. self.last_msgs = []
  88. def get_name(self) -> str:
  89. return 'Flood'
  90. def vote(self, msg: str, author: str) -> ModerationDecision:
  91. if self.raid_cooldown is not None and self.last_raid + timedelta(minutes=self.raid_cooldown) > datetime.now():
  92. return ModerationDecision.ABSTAIN
  93. if self.max_word_length is not None:
  94. for word in msg.split(' '):
  95. if word.startswith('#'):
  96. continue
  97. if len(word) > self.max_word_length:
  98. return ModerationDecision.TIMEOUT_USER
  99. if self.max_msg_occurrences is None or self.min_time_between_occurrence is None:
  100. return ModerationDecision.ABSTAIN
  101. clean_msg = None
  102. for last_msg in self.last_msgs:
  103. if last_msg['first-occurrence'] + timedelta(seconds=self.min_time_between_occurrence) <= datetime.now():
  104. clean_msg = last_msg
  105. break
  106. if author != last_msg['author'] or msg != last_msg['message']:
  107. break
  108. last_msg['occurrences'] += 1
  109. if last_msg['occurrences'] >= self.max_msg_occurrences:
  110. return ModerationDecision.TIMEOUT_USER
  111. if clean_msg is not None:
  112. self.last_msgs.remove(clean_msg)
  113. self.last_msgs.append({
  114. 'first-occurrence': datetime.now(),
  115. 'author': author,
  116. 'message': msg,
  117. 'occurrences': 1
  118. })
  119. return ModerationDecision.ABSTAIN
  120. def declare_raid(self):
  121. self.last_raid = datetime.now()
  122. class LinksModerator(Moderator):
  123. def __init__(
  124. self,
  125. message: str,
  126. decision: ModerationDecision,
  127. timeout_duration: Union[None, int],
  128. authorized_urls: [str]
  129. ):
  130. super().__init__(message, decision, timeout_duration)
  131. self.authorized_urls = authorized_urls
  132. def get_name(self) -> str:
  133. return 'Link'
  134. def vote(self, msg: str, author: str) -> ModerationDecision:
  135. url_extractor = URLExtract()
  136. links = url_extractor.find_urls(msg)
  137. if len(links) == 0:
  138. return ModerationDecision.ABSTAIN
  139. if not self.are_urls_authorized(links):
  140. return self.decision
  141. return ModerationDecision.ABSTAIN
  142. def are_urls_authorized(self, links: [str]) -> bool:
  143. for link in links:
  144. is_link_authorized = False
  145. print(link)
  146. for pattern in self.authorized_urls:
  147. print(pattern)
  148. if dump(fnmatch(link, pattern)) or dump(fnmatch(f"http://{link}", pattern)) or dump(fnmatch(f"https://{link}", pattern)):
  149. is_link_authorized = True
  150. break
  151. if not is_link_authorized:
  152. return False
  153. return True
  154. def dump(what):
  155. print(what)
  156. return what