

Multiple Approaches to Sensitive-Word Detection in Python

Author: llm大模型算法工程师weng


1. Introduction

In scenarios such as internet content moderation, social-platform supervision, and comment filtering, sensitive-word detection is an essential capability. Thanks to its rich ecosystem and concise syntax, Python offers several ways to implement it. This article walks through the mainstream approaches and analyzes the strengths, weaknesses, and applicable scenarios of each.

2. Basic Approach: Keyword Matching

2.1 Direct Traversal Matching

The simplest implementation stores the sensitive words in a list and scans the text against each one:

class SimpleSensitiveWordFilter:
    def __init__(self):
        self.sensitive_words = []
    def add_words(self, words):
        self.sensitive_words.extend(words)
    def contains_sensitive_word(self, text):
        for word in self.sensitive_words:
            if word in text:
                return True
        return False
    def replace_sensitive_words(self, text, replace_char='*'):
        result = text
        for word in self.sensitive_words:
            result = result.replace(word, replace_char * len(word))
        return result

Pros: simple to implement and easy to understand
Cons: inefficient (O(n·m) time), and sequential replacement cannot handle overlapping matches
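The overlapping-match weakness is easy to reproduce. A minimal sketch (word list and text are made up for illustration):

```python
# Naive sequential replacement: each word is replaced independently,
# so a match that overlaps an earlier replacement is missed entirely.
def naive_replace(text, words, repl='*'):
    for word in words:
        text = text.replace(word, repl * len(word))
    return text

# 'ab' and 'bc' overlap inside 'abc': once 'ab' is masked,
# 'bc' can no longer be found. The result also depends on word order.
print(naive_replace('abc', ['ab', 'bc']))  # '**c' — 'bc' was never masked
print(naive_replace('abc', ['bc', 'ab']))  # 'a**' — now 'ab' is the one missed
```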

2.2 Regular-Expression Matching

Regular expressions allow more flexible matching:

import re
class RegexSensitiveWordFilter:
    def __init__(self):
        self.pattern = None
    def add_words(self, words):
        # Join all escaped words with the regex alternation operator |
        pattern_str = '|'.join(re.escape(word) for word in words)
        self.pattern = re.compile(pattern_str, re.IGNORECASE)
    def contains_sensitive_word(self, text):
        return bool(self.pattern.search(text))
    def find_all_sensitive_words(self, text):
        return self.pattern.findall(text)
    def replace_sensitive_words(self, text, replace_char='*'):
        def replace_func(match):
            return replace_char * len(match.group())
        return self.pattern.sub(replace_func, text)

Pros: supports complex matching rules with concise code
Cons: performance degrades as the word list grows, and compiling the pattern has noticeable overhead
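One subtlety when building the alternation: Python's re engine tries alternatives left to right, so a shorter word listed first can shadow a longer one. Sorting the words by descending length before joining restores longest-match behavior (the word list here is illustrative):

```python
import re

words = ['bad', 'badword']

# Unsorted: 'bad' is tried first and wins, so 'badword' is never reported.
unsorted_pat = re.compile('|'.join(map(re.escape, words)))
# Sorted longest-first: the longer alternative gets a chance to match.
sorted_pat = re.compile('|'.join(map(re.escape,
                                     sorted(words, key=len, reverse=True))))

print(unsorted_pat.findall('badword'))  # ['bad']
print(sorted_pat.findall('badword'))    # ['badword']
```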

3. Intermediate Approach: Prefix Tree (Trie)

3.1 How It Works

A Trie (prefix tree) is a tree data structure designed for fast string matching. Its core idea is to exploit the common prefixes of the stored words to avoid redundant comparisons, giving scan time roughly linear in the text length (O(n·k) in the worst case, where k is the longest word length).
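The idea can be sketched with plain nested dicts before moving to the class-based version in 3.2 (the END sentinel and function names are illustrative):

```python
END = object()  # sentinel key marking "a word ends at this node"

def build_trie(words):
    """Build a nested-dict trie; words with shared prefixes share nodes."""
    root = {}
    for word in words:
        node = root
        for ch in word:
            node = node.setdefault(ch, {})
        node[END] = True
    return root

def contains(trie, text):
    """From each start position, walk the trie as far as the text allows."""
    for i in range(len(text)):
        node = trie
        for ch in text[i:]:
            if ch not in node:
                break
            node = node[ch]
            if END in node:
                return True
    return False

trie = build_trie(['abc', 'abd'])   # the 'ab' prefix is stored only once
print(contains(trie, 'xxabdyy'))    # True
print(contains(trie, 'xxabyy'))     # False
```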

3.2 Implementation

class TrieNode:
    __slots__ = ('children', 'is_end')
    def __init__(self):
        self.children = {}
        self.is_end = False
class TrieSensitiveWordFilter:
    def __init__(self):
        self.root = TrieNode()
    def add_word(self, word):
        """Add a single sensitive word."""
        node = self.root
        for char in word:
            if char not in node.children:
                node.children[char] = TrieNode()
            node = node.children[char]
        node.is_end = True
    def add_words(self, words):
        """Add sensitive words in bulk."""
        for word in words:
            self.add_word(word)
    def contains_sensitive_word(self, text):
        """Check whether the text contains a sensitive word."""
        for i in range(len(text)):
            node = self.root
            j = i
            while j < len(text) and text[j] in node.children:
                node = node.children[text[j]]
                if node.is_end:
                    return True
                j += 1
        return False
    def find_all_sensitive_words(self, text):
        """Find all sensitive words and their positions."""
        results = []
        for i in range(len(text)):
            node = self.root
            j = i
            while j < len(text) and text[j] in node.children:
                node = node.children[text[j]]
                if node.is_end:
                    results.append({
                        'word': text[i:j+1],
                        'start': i,
                        'end': j
                    })
                j += 1
        return results
    def replace_sensitive_words(self, text, replace_char='*'):
        """Replace sensitive words with the mask character."""
        sensitive_positions = self.find_all_sensitive_words(text)
        if not sensitive_positions:
            return text
        result = list(text)
        for pos in sensitive_positions:
            for k in range(pos['start'], pos['end'] + 1):
                result[k] = replace_char
        return ''.join(result)

3.3 Optimized: Aho-Corasick Automaton

The Aho-Corasick automaton extends the Trie with failure links, enabling efficient simultaneous matching of many patterns:

from collections import deque
class AhoCorasickNode:
    __slots__ = ('children', 'fail', 'output')
    def __init__(self):
        self.children = {}
        self.fail = None
        self.output = []  # sensitive words that end at this node
class AhoCorasickFilter:
    def __init__(self):
        self.root = AhoCorasickNode()
        self._built = False
    def add_word(self, word):
        """Add a sensitive word."""
        node = self.root
        for char in word:
            if char not in node.children:
                node.children[char] = AhoCorasickNode()
            node = node.children[char]
        node.output.append(word)
        self._built = False
    def add_words(self, words):
        for word in words:
            self.add_word(word)
    def _build_fail_pointers(self):
        """Build failure links via BFS."""
        queue = deque()
        # Depth-1 nodes fail back to the root
        for char, child in self.root.children.items():
            child.fail = self.root
            queue.append(child)
        while queue:
            current = queue.popleft()
            for char, child in current.children.items():
                queue.append(child)
                # Walk failure links until a node with this child is found
                fail_node = current.fail
                while fail_node is not None and char not in fail_node.children:
                    fail_node = fail_node.fail
                if fail_node is None:
                    child.fail = self.root
                else:
                    child.fail = fail_node.children[char]
                    # Inherit words that also end via the failure link
                    child.output.extend(child.fail.output)
    def search(self, text):
        """Find all sensitive-word occurrences in the text."""
        if not self._built:
            self._build_fail_pointers()
            self._built = True
        result = []
        node = self.root
        for i, char in enumerate(text):
            # Fall back along failure links until a transition exists
            while node is not self.root and char not in node.children:
                node = node.fail
            if char in node.children:
                node = node.children[char]
            else:
                node = self.root
            # Collect all words ending at the current state
            for word in node.output:
                result.append({
                    'word': word,
                    'position': i - len(word) + 1
                })
        return result
    def contains_sensitive_word(self, text):
        return len(self.search(text)) > 0
    def replace_sensitive_words(self, text, replace_char='*'):
        matches = self.search(text)
        if not matches:
            return text
        result = list(text)
        for match in matches:
            start = match['position']
            end = start + len(match['word'])
            for i in range(start, end):
                result[i] = replace_char
        return ''.join(result)

4. Third-Party Libraries

4.1 Using better_profanity

from better_profanity import profanity
# Load the default word list
profanity.load_censor_words()
# Detect
text = "You are a fool"
if profanity.contains_profanity(text):
    print("Contains profanity")
# Censor
censored_text = profanity.censor(text)
print(censored_text)  # You are a ****
# Load a custom word list
custom_badwords = ['badword1', 'badword2']
profanity.load_censor_words(custom_badwords)

4.2 Using the pyahocorasick Library

import ahocorasick
class PyAhocorasickFilter:
    def __init__(self):
        self.automaton = ahocorasick.Automaton()
        self._built = False
    def add_words(self, words):
        for idx, word in enumerate(words):
            self.automaton.add_word(word, (idx, word))
        self._built = False
    def build(self):
        self.automaton.make_automaton()
        self._built = True
    def search(self, text):
        if not self._built:
            self.build()
        result = []
        for end_index, (idx, word) in self.automaton.iter(text):
            start_index = end_index - len(word) + 1
            result.append({
                'word': word,
                'start': start_index,
                'end': end_index
            })
        return result

4.3 Other Related Libraries

| Library | Characteristics | Best For |
| --- | --- | --- |
| profanity-check | Machine-learning based | Scenarios needing semantic understanding |
| alt-profanity-check | Lightweight | Simple scenarios |
| ngram-profanity | Fuzzy matching | Detecting obfuscated variants |

5. Advanced Features

5.1 Skipping Noise Characters

class AdvancedFilter:
    def __init__(self, skip_chars=None):
        self.skip_chars = skip_chars or {' ', '-', '_', '.'}
        self.trie_filter = TrieSensitiveWordFilter()
    def _normalize(self, text):
        """Normalize the text by stripping noise characters."""
        return ''.join(c for c in text if c not in self.skip_chars)
    def contains_sensitive_word(self, text):
        normalized = self._normalize(text)
        return self.trie_filter.contains_sensitive_word(normalized)

5.2 Pinyin/Homophone Detection

# Use the pypinyin library to also index words by their pinyin spelling
from pypinyin import lazy_pinyin
class PinyinSensitiveFilter:
    def __init__(self):
        self.trie_filter = TrieSensitiveWordFilter()
    def add_word(self, word):
        # Index both the Chinese form and its pinyin spelling
        self.trie_filter.add_word(word)
        pinyin = ''.join(lazy_pinyin(word))
        self.trie_filter.add_word(pinyin)

5.3 English Case and Variant Handling

class CaseInsensitiveFilter:
    def __init__(self):
        self.trie_filter = TrieSensitiveWordFilter()
    def add_word(self, word):
        # Store the lowercase form
        self.trie_filter.add_word(word.lower())
    def contains_sensitive_word(self, text):
        return self.trie_filter.contains_sensitive_word(text.lower())
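For non-ASCII alphabets, `str.casefold()` is more aggressive than `str.lower()` and is Python's recommended method for caseless matching (the German ß example is illustrative):

```python
# lower() leaves 'ß' as-is, so 'STRASSE' and 'straße' do not compare equal;
# casefold() maps 'ß' to 'ss', which is what caseless matching needs.
print('straße'.lower())     # 'straße'
print('straße'.casefold())  # 'strasse'
print('STRASSE'.casefold() == 'straße'.casefold())  # True
```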

6. Performance Comparison

| Approach | Time Complexity | Space Complexity | Response Time (1000 words) | Best For |
| --- | --- | --- | --- | --- |
| Direct traversal | O(n·m) | O(m) | ~50 ms | Small word lists |
| Regular expression | O(n·m) | O(m) | ~30 ms | Medium word lists |
| Trie | O(n) | O(m·k) | ~5 ms | Large word lists |
| Aho-Corasick | O(n) | O(m·k) | ~3 ms | Very large word lists |
| better_profanity | O(n) | O(m) | ~2 ms | General use |

Note: n is the text length, m the number of sensitive words, and k the average word length.
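The timings above depend heavily on hardware, word list, and input text. A minimal harness for reproducing the naive-scan vs. compiled-regex comparison on your own data (the word list and text below are synthetic):

```python
import re
import time

words = [f'word{i:04d}' for i in range(1000)]           # synthetic 1000-word list
text = 'filler ' * 500 + 'word0999' + ' filler' * 500   # one hit near the middle

def naive_scan(text, words):
    """O(n·m) baseline: test every word against the text."""
    return [w for w in words if w in text]

pattern = re.compile('|'.join(map(re.escape, words)))

t0 = time.perf_counter()
naive_hits = naive_scan(text, words)
t1 = time.perf_counter()
regex_hits = pattern.findall(text)
t2 = time.perf_counter()

# Absolute numbers are machine-dependent; only the relative gap is meaningful.
print(f'naive: {(t1 - t0) * 1000:.2f} ms, hits={naive_hits}')
print(f'regex: {(t2 - t1) * 1000:.2f} ms, hits={regex_hits}')
```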

7. Complete Working Example

class SensitiveWordDetectionSystem:
    """A complete sensitive-word detection system."""
    def __init__(self, word_file=None, skip_chars=None):
        self.filter = AhoCorasickFilter()
        self.skip_chars = skip_chars or {' ', '.', '-', '_', '*', '#'}
        if word_file:
            self.load_words_from_file(word_file)
    def load_words_from_file(self, file_path):
        """Load the sensitive-word list from a file."""
        with open(file_path, 'r', encoding='utf-8') as f:
            words = [line.strip() for line in f if line.strip()]
        self.filter.add_words(words)
    def preprocess_text(self, text):
        """Preprocess the text before matching."""
        # Optional: strip noise characters
        # text = ''.join(c for c in text if c not in self.skip_chars)
        # Optional: lowercase
        text = text.lower()
        return text
    def detect(self, text):
        """Scan the text for sensitive words."""
        processed = self.preprocess_text(text)
        return self.filter.search(processed)
    def audit(self, text, strict=True):
        """Audit the content and suggest an action."""
        matches = self.detect(text)
        result = {
            'is_safe': len(matches) == 0,
            'sensitive_words': [m['word'] for m in matches],
            'count': len(matches),
            'suggested_action': 'block' if strict and len(matches) > 0 else 'review'
        }
        return result
    def censor(self, text, replace_char='*'):
        """Mask sensitive words in the text."""
        processed = self.preprocess_text(text)
        # Simplified: the mask is applied to the preprocessed text. A
        # production version would map match positions back to the original
        # string, since preprocessing may change casing or drop characters.
        return self.filter.replace_sensitive_words(processed, replace_char)
# Usage example
if __name__ == "__main__":
    # Initialize the system
    system = SensitiveWordDetectionSystem()
    # Example Chinese sensitive words: "sensitive word", "harmful information", "violating content"
    system.filter.add_words(['敏感词', '不良信息', '违规内容'])
    # Test texts (a normal message, then two that contain the words above)
    test_texts = [
        "这是一条正常的消息",
        "这里包含敏感词,需要处理",
        "用户发送了不良信息内容"
    ]
    for text in test_texts:
        result = system.audit(text)
        print(f"Text: {text}")
        print(f"Audit result: {result}")
        print("-" * 50)

8. Word-List Management Recommendations

8.1 Categorizing the Word List

sensitive_words/
├── political.txt     # politically sensitive
├── porn.txt          # pornographic / vulgar
├── violence.txt      # violence / terror
├── abuse.txt         # personal attacks
└── spam.txt          # spam / advertising
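Loading such a directory into a category-to-words mapping can be sketched as follows (the helper name is illustrative; the demo builds a throwaway directory so it can run anywhere):

```python
import tempfile
from pathlib import Path

def load_word_categories(directory):
    """Map each *.txt file's stem (its category name) to its non-empty lines."""
    categories = {}
    for path in sorted(Path(directory).glob('*.txt')):
        with open(path, encoding='utf-8') as f:
            categories[path.stem] = [line.strip() for line in f if line.strip()]
    return categories

# Demo with a throwaway directory standing in for sensitive_words/.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / 'abuse.txt').write_text('insult1\ninsult2\n', encoding='utf-8')
    (Path(tmp) / 'spam.txt').write_text('buy now\n\n', encoding='utf-8')
    cats = load_word_categories(tmp)
    print(cats)  # {'abuse': ['insult1', 'insult2'], 'spam': ['buy now']}
```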

8.2 Maintenance Strategy

  1. Regular updates: establish a mechanism for keeping the word list current
  2. Tiered management: handle words according to their severity level
  3. Whitelist mechanism: quickly restore words that were flagged by mistake
  4. Version control: record the history of word-list changes
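The whitelist mechanism in point 3 can be as simple as a post-filter over the match list (the match format mirrors the search results used earlier; the function name is illustrative):

```python
def apply_whitelist(matches, whitelist):
    """Drop matches whose word is whitelisted (false-positive rescue)."""
    allowed = set(whitelist)
    return [m for m in matches if m['word'] not in allowed]

# 'assist' was flagged by an over-broad word list; the whitelist rescues it.
matches = [{'word': 'assist', 'position': 0},
           {'word': 'badword', 'position': 10}]
print(apply_whitelist(matches, {'assist'}))
# [{'word': 'badword', 'position': 10}]
```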

9. Summary and Recommendations

9.1 Choosing an Approach

Based on the comparison in Section 6: small word lists (up to a few hundred words) are well served by direct traversal or a compiled regex; large lists call for a Trie; very large lists and high-throughput pipelines are best handled by an Aho-Corasick automaton (e.g., the pyahocorasick library); and scenarios that need semantic understanding rather than exact matching can turn to an ML-based library such as profanity-check.

9.2 Caveats

  1. Performance: use caching, asynchronous processing, and batch detection
  2. False positives: maintain a whitelist and a human-review process
  3. Encoding: standardize on UTF-8 throughout
  4. Obfuscated variants: account for pinyin, homophones, digit substitution, etc.
  5. Mixed languages: mixed Chinese/English text needs dedicated handling
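Digit and symbol substitutions (point 4) can be folded away with a translation table applied before matching; the substitution map below is a tiny illustrative sample, not a complete one:

```python
# A small, illustrative leetspeak-style substitution map (not exhaustive).
SUBSTITUTIONS = str.maketrans({'0': 'o', '1': 'i', '3': 'e', '@': 'a', '$': 's'})

def normalize_variants(text):
    """Fold common character substitutions back to plain lowercase letters."""
    return text.lower().translate(SUBSTITUTIONS)

print(normalize_variants('B@dW0rd'))  # 'badword'
```

Running the normalized text through any of the filters above then catches the disguised form.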

9.3 Further Directions

With a sensibly chosen and well-tuned sensitive-word detection scheme, a platform can keep its content safe while preserving a good user experience.

