python

关注公众号 jb51net

关闭
首页 > 脚本专栏 > python > Pytorch数英验证码

教你如何用Pytorch搭建数英混合验证码图片识别模型

作者:西北一条虫

项目中checkpoints存放的是模型文件,data存放的是数据集。本文给大家分享如何利用PyTorch搭建数英混合验证码图片识别模型,模型包括普通卷积模块、深度可分离卷积模块、空间通道注意力模块和残差模块,感兴趣的朋友跟随小编一起看看吧。

项目结构如下

checkpoints存放的是模型文件,data存放的是数据集

一、数据集生成(create_data.py)

利用captcha模块,大小写26位字母和0-9十个数字共62个字符,以每个字符为开头、后三位字符随机选取的方式生成500张图片,一共大约62*500张图片数据集。

import os
import random
import sys
from captcha.image import ImageCaptcha
from tqdm import tqdm
# Character sets the captchas are drawn from: digits only, and digits plus
# upper- and lower-case letters (62 characters in total).
content_numb = '0123456789'
content_eng = '0123456789QWERTYUIOPASDFGHJKLZXCVBNMqwertyuiopasdfghjklzxcvbnm'
char_set_numb = [*content_numb]
char_set_eng = [*content_eng]
# Captcha length: every captcha text is 4 characters long.
CAPTCHA_LEN = 4
# Directories the generated captcha images are stored in.
CAPTCHA_IMAGE_ENG_PATH = 'data/en'
CAPTCHA_IMAGE_PATH = 'data/numb'
def create_captcha(captcha_text, path):
    """Render ``captcha_text`` as a captcha image and save it to ``path``.

    After the base image is generated, yellow noise dots and a blue noise
    curve are added before the image is written to disk.
    """
    generator = ImageCaptcha()
    picture = generator.generate_image(captcha_text)
    # The noise helpers are called through the class; their return values are
    # ignored, so they presumably draw on ``picture`` in place.
    ImageCaptcha.create_noise_dots(picture, color='yellow', width=3, number=30)
    ImageCaptcha.create_noise_curve(picture, color='blue')
    picture.save(path)
# Generate captcha images made of English letters and digits
def generate_en_captcha_image(charSet=char_set_eng, captchaImgPath=CAPTCHA_IMAGE_ENG_PATH, numbs=500):
    """Generate captcha images for every character in ``charSet``.

    For each character, ``numbs`` four-character texts are produced: the
    character itself followed by three characters drawn at random (with
    replacement) from ``charSet``. Each text is rendered with
    ``create_captcha`` and saved as ``<captchaImgPath>/<text>.jpg``. A text
    drawn more than once overwrites its earlier image, so the directory can
    end up with fewer than ``len(charSet) * numbs`` files.

    Args:
        charSet: Characters the captcha texts are built from.
        captchaImgPath: Output directory; created if it does not exist.
        numbs: Number of captchas generated per leading character.
    """
    char_list = list(charSet)
    os.makedirs(captchaImgPath, exist_ok=True)
    for first_char in tqdm(char_list):
        for _ in range(numbs):
            tail = random.choices(char_list, k=3)
            captcha_text = str(first_char) + ''.join(tail)
            # Join with a path separator: plain string concatenation produced
            # 'data/en<text>.jpg', i.e. files next to the directory instead
            # of inside it.
            file_path = os.path.join(captchaImgPath, captcha_text + '.jpg')
            try:
                create_captcha(captcha_text, file_path)
            except Exception as exc:
                # Generation stays best-effort (one bad image must not abort
                # the run), but the failure is reported instead of hidden.
                print('failed to create captcha %s: %s' % (file_path, exc), file=sys.stderr)

二、数据预处理 (utils.py)

读取图片并灰度化,将图片尺寸统一成 [高, 宽] = [60, 160],并进行数据增强(小幅随机旋转和仿射变换)。

class CaptchaSet(Dataset):
    """Captcha image dataset whose labels are read from the file names.

    Every file in ``root_path`` is treated as an image; the part of its name
    before the first ``.`` is the captcha text. The file list is sorted and
    then shuffled with ``seed`` so that the 'train' and 'val' instances built
    from the same directory always see the same, disjoint split.

    Args:
        mode: 'train' keeps the first ``split_size`` fraction of the shuffled
            files, 'val' keeps the rest; any other value keeps all files.
        root_path: Directory containing the captcha images.
        split_size: Fraction of the files assigned to the training split.
        size: Target image size passed to ``transforms.Resize``.
        seed: Seed for the shuffle that defines the split.
        char_set: 'en' (digits + letters, 62 classes) or 'numb' (digits).

    Raises:
        ValueError: If ``char_set`` is not a known character set.
    """

    _CHAR_SETS = {
        'en': '0123456789QWERTYUIOPASDFGHJKLZXCVBNMqwertyuiopasdfghjklzxcvbnm',
        'numb': '0123456789',
    }

    def __init__(self, mode='train', root_path='data/en', split_size=0.8, size=(60, 160), seed=666, char_set='en'):
        super().__init__()
        # Fail early with a clear message instead of an AttributeError on
        # the missing ``char_list`` further down.
        if char_set not in self._CHAR_SETS:
            raise ValueError(
                "char_set must be one of %s, got %r" % (sorted(self._CHAR_SETS), char_set))
        self.char_list = list(self._CHAR_SETS[char_set])
        # Maps each character to its class index.
        self.char_dict = {char: idx for idx, char in enumerate(self.char_list)}

        # os.listdir order is unspecified, so sort before the seeded shuffle.
        # A private Random instance yields the same permutation as seeding
        # the module-level generator, without reseeding it for everyone else.
        self.paths = sorted(os.listdir(root_path))
        random.Random(seed).shuffle(self.paths)
        self.images = [os.path.join(root_path, name) for name in self.paths]
        self.labels = [name.split('.')[0] for name in self.paths]

        split_at = int(len(self.images) * split_size)
        if mode == 'train':
            self.images = self.images[:split_at]
            self.labels = self.labels[:split_at]
        elif mode == 'val':
            self.images = self.images[split_at:]
            self.labels = self.labels[split_at:]

        steps = [self._load_image, transforms.Grayscale()]
        if mode != 'val':
            # Random augmentation is skipped for validation so that the
            # validation metrics are deterministic.
            steps += [transforms.RandomRotation(0.1), transforms.RandomAffine(0.1)]
        steps += [transforms.Resize(size), transforms.ToTensor()]
        self.transform = transforms.Compose(steps)

    @staticmethod
    def _load_image(path):
        """Open the image at ``path`` and return an RGB copy of it."""
        # A named function (rather than a lambda) keeps the dataset picklable;
        # the context manager closes the file once the copy has been made.
        with Image.open(path) as img:
            return img.convert('RGB')

    def __getitem__(self, idx):
        """Return ``(image, label)``; the label holds one class index per character."""
        img = self.transform(self.images[idx])
        label = torch.tensor([self.char_dict[char] for char in self.labels[idx]],
                             dtype=torch.long)
        return img, label

    def __len__(self):
        return len(self.images)

三、模型搭建 (models.py)

数据经过模型的输入输出形状如下

数据输入维度:[batchsize, 1, h, w] # h, w 代表图片的高和宽

数据输出维度:[batchsize, 4, n_classes] # n_classes 代表字符类别数量

模型中构造了普通卷积模块,深度可分离卷积模块,空间通道注意力模块,残差模块。

利用空间通道注意力学习字符的分布位置,最后直接输出每个字符的类别。

各个模块代码如下:

1)普通卷积模块

class ConvBlock(nn.Module):
    """Plain convolution block: Conv2d -> InstanceNorm2d -> ReLU."""

    def __init__(self, in_ch, out_ch, kernel_size=3, stride=1, padding=1):
        super().__init__()
        conv = nn.Conv2d(in_ch, out_ch, kernel_size, stride, padding)
        norm = nn.InstanceNorm2d(out_ch)
        activation = nn.ReLU(inplace=True)
        # Layer order fixes the ``sequential.<n>`` names used in checkpoints.
        self.sequential = nn.Sequential(conv, norm, activation)

    def forward(self, x):
        return self.sequential(x)

2) 深度可分离卷积模块

class DepthConv(nn.Module):
    """Depthwise convolution: one filter per input channel (``groups=in_ch``).

    The number of channels is unchanged.
    """

    def __init__(self, in_ch, kernel_size=3, stride=1, padding=1):
        super().__init__()
        self.depth_conv = nn.Conv2d(
            in_channels=in_ch,
            out_channels=in_ch,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
            groups=in_ch,
        )

    def forward(self, x):
        return self.depth_conv(x)
class DepthConvBlock(nn.Module):
    """Depthwise-separable convolution block.

    A depthwise convolution (``DepthConv``) is followed by a 1x1 pointwise
    convolution that maps ``in_ch`` to ``out_ch`` channels, then
    InstanceNorm2d and ReLU.
    """

    def __init__(self, in_ch, out_ch, kernel_size=3, stride=1, padding=1):
        super().__init__()
        # Depthwise stage: carries the kernel size, stride and padding.
        self.depth = DepthConv(in_ch, kernel_size, stride, padding)
        # Pointwise stage: mixes channels with a 1x1 convolution.
        pointwise = nn.Conv2d(in_ch, out_ch, kernel_size=1, stride=1, padding=0)
        self.sequential = nn.Sequential(
            pointwise,
            nn.InstanceNorm2d(out_ch),
            nn.ReLU(inplace=True),
        )

    def forward(self, x):
        return self.sequential(self.depth(x))

3) 空间通道注意力模块:

class ChannelAttention(nn.Module):
    '''
    func: channel attention.
    parameters:
        in_channels: number of channels of the input; the input is laid out
            as (batch, channel, ...) if batch_first else (channel, batch, ...)
        reduction: default 4. The shared MLP maps
            in_channels --> in_channels // reduction --> in_channels
        batch_first: default True. Pass False when the input is channel-first.
    '''
    def __init__(self, in_channels, reduction=4, batch_first=True):
        super().__init__()
        self.batch_first = batch_first
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.max_pool = nn.AdaptiveMaxPool2d(1)
        hidden = in_channels // reduction
        # The same bias-free 1x1-conv MLP is applied to both pooled tensors.
        self.sharedMLP = nn.Sequential(
            nn.Conv2d(in_channels, hidden, kernel_size=1, bias=False),
            nn.ReLU(inplace=True),
            nn.Conv2d(hidden, in_channels, kernel_size=1, bias=False),
        )
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        channel_first = not self.batch_first
        if channel_first:
            x = x.permute(1, 0, 2, 3)
        from_avg = self.sharedMLP(self.avg_pool(x))  # (batch, in_channels, 1, 1)
        from_max = self.sharedMLP(self.max_pool(x))  # (batch, in_channels, 1, 1)
        gate = self.sigmoid(from_avg + from_max)     # per-channel weights
        out = x * gate.expand_as(x)                  # same size as x
        if channel_first:
            # Restore the caller's (channel, batch, ...) layout.
            return out.permute(1, 0, 2, 3)
        return out
class SpatialAttention(nn.Module):
    '''
    func: spatial attention.
    parameters:
        kernel_size: convolution kernel size, one of 3, 5, 7
        batch_first: default True. Pass False when the input is channel-first,
            i.e. laid out as (channels, batch, ...).
    '''
    def __init__(self, kernel_size=3, batch_first = True):
        super(SpatialAttention, self).__init__()
        # The message now lists every accepted value (it used to omit 5).
        assert kernel_size in (3, 5, 7), "kernel size must be 3, 5 or 7"
        padding = kernel_size // 2  # keeps the spatial size unchanged
        self.batch_first = batch_first
        self.conv = nn.Conv2d(2, 1, kernel_size, padding=padding, bias=False)
        self.sigmoid = nn.Sigmoid()
    def forward(self, x):
        if not self.batch_first:
            x = x.permute(1, 0, 2, 3)  # size = (batch, channels, ...)
        avgout = torch.mean(x, dim=1, keepdim=True)    # channel-wise mean, 1 channel
        maxout, _ = torch.max(x, dim=1, keepdim=True)  # channel-wise max, 1 channel
        x1 = torch.cat([avgout, maxout], dim=1)        # 2 channels
        x1 = self.conv(x1)                             # 1 channel
        w = self.sigmoid(x1)                           # per-position weights
        out = x * w                                    # same size as x
        if not self.batch_first:
            out = out.permute(1, 0, 2, 3)  # back to (channels, batch, ...)
        return out
class CBAtten_Res(nn.Module):
    '''
    func: channel attention + spatial attention + residual shortcut.
        Main path: Conv2d -> BatchNorm2d -> ReLU -> ChannelAttention
        -> SpatialAttention. Shortcut: MaxPool2d -> 1x1 Conv2d. The two are
        added and passed through ReLU.
    parameters:
        in_channels: number of channels of the input; the input is laid out
            as (batch, in_channels, ...) if batch_first else
            (in_channels, batch, ...)
        out_channels: number of output channels
        kernel_size: default 3, one of [3, 5, 7]
        stride: default 2; both paths downsample the spatial size by stride.
        reduction: default 4. The channel-attention MLP maps
            out_channels --> out_channels // reduction --> out_channels
        batch_first: default True. Pass False when the input is channel-first.
    '''
    def __init__(self, in_channels, out_channels, kernel_size=3,
                 stride=2, reduction=4, batch_first=True):
        super(CBAtten_Res, self).__init__()
        self.batch_first = batch_first
        self.reduction = reduction
        self.padding = kernel_size // 2
        # Shortcut downsampling. The pooling window must match kernel_size:
        # with a fixed window of 3, the padding for kernel_size 5 or 7
        # exceeds the half-window limit of MaxPool2d and forward() fails.
        # Matching the window also keeps the shortcut the same size as the
        # conv1 output.
        self.max_pool = nn.MaxPool2d(kernel_size, stride=stride, padding=self.padding)
        self.conv_res = nn.Conv2d(in_channels, out_channels,
                                  kernel_size=1,
                                  stride=1,
                                  bias=True)
        # Main-path downsampling convolution.
        self.conv1 = nn.Conv2d(in_channels, out_channels,
                               kernel_size=kernel_size,
                               stride=stride,
                               padding=self.padding,
                               bias=True)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU(inplace=True)
        self.ca = ChannelAttention(out_channels, reduction=self.reduction,
                                   batch_first=self.batch_first)
        self.sa = SpatialAttention(kernel_size=kernel_size,
                                   batch_first=self.batch_first)
    def forward(self, x):
        if not self.batch_first:
            x = x.permute(1, 0, 2, 3)  # size = (batch, in_channels, ...)
        residual = x
        out = self.conv1(x)   # out_channels, spatial size divided by stride
        out = self.bn1(out)
        out = self.relu(out)
        out = self.ca(out)
        out = self.sa(out)
        residual = self.max_pool(residual)  # in_channels, downsampled
        residual = self.conv_res(residual)  # out_channels, downsampled
        out += residual                     # residual connection
        out = self.relu(out)
        if not self.batch_first:
            out = out.permute(1, 0, 2, 3)   # back to (out_channels, batch, ...)
        return out

4) 残差模块

class IRBlock(nn.Module):
    """
    Residual block: ConvBlock -> depthwise conv -> InstanceNorm2d
    -> LeakyReLU -> 1x1 Conv2d -> InstanceNorm2d, plus a skip connection.

    rate: the block widens to ``in_ch * rate`` channels internally.
    The output has ``in_ch`` channels again; with ``stride > 1`` the skip
    path is downsampled by an extra convolution.
    """
    def __init__(self, in_ch, rate=2, kernel_size=1, stride=1, padding=0):
        super().__init__()
        wide_ch = in_ch * rate
        self.conv1 = ConvBlock(in_ch, wide_ch, kernel_size=kernel_size, stride=stride, padding=padding)
        self.dw1 = DepthConv(wide_ch)
        tail = [
            nn.InstanceNorm2d(wide_ch),
            nn.LeakyReLU(),
            nn.Conv2d(wide_ch, in_ch, kernel_size=1, stride=1, padding=0),
            nn.InstanceNorm2d(in_ch),
        ]
        self.sequential = nn.Sequential(*tail)
        # ``False`` means the skip path uses the input unchanged.
        if stride > 1:
            self.down_conv = nn.Conv2d(in_ch, in_ch, kernel_size=kernel_size, stride=stride, padding=padding)
        else:
            self.down_conv = False

    def forward(self, x):
        branch = self.dw1(self.conv1(x))
        shortcut = self.down_conv(x) if self.down_conv else x
        return self.sequential(branch) + shortcut

5)利用各个模块搭建模型

class Net1(nn.Module):
    """Captcha recognition network.

    A convolutional backbone with attention and residual blocks is followed
    by an adaptive max pool to a 6 x 16 grid. The 96 grid cells are then
    projected to ``out_ch`` character positions and the 512 channels to
    ``n_classes`` scores per position.

    Args:
        in_ch: Number of input image channels.
        out_ch: Number of character positions predicted per image.
        n_classes: Number of character classes.

    forward() returns a tensor of size [b, out_ch, n_classes] holding
    softmax probabilities over the classes.
    """

    def __init__(self, in_ch=1, out_ch=4, n_classes=10):
        super(Net1, self).__init__()
        self.sequential = nn.Sequential(
            ConvBlock(in_ch, 64, kernel_size=3, stride=1, padding=1),
            ConvBlock(64, 64, kernel_size=1, stride=1, padding=0),
            CBAtten_Res(64, 64, kernel_size=3, reduction=1, stride=2),         # /2
            ConvBlock(64, 128, kernel_size=3, stride=1, padding=1),
            DepthConvBlock(128, 128, kernel_size=1, stride=1, padding=0),
            ConvBlock(128, 128, kernel_size=3, stride=1, padding=1),
            CBAtten_Res(128, 128, kernel_size=3, reduction=1, stride=2),       # /2
            ConvBlock(128, 256, kernel_size=1, stride=1, padding=0),
            IRBlock(256, 2),
            IRBlock(256, 2),
            IRBlock(256, 2),
            IRBlock(256, 2),
            ConvBlock(256, 256, kernel_size=1, stride=1, padding=0),
            CBAtten_Res(256, 256, kernel_size=3, reduction=1, stride=2),       # /2
            ConvBlock(256, 512, kernel_size=3, stride=1, padding=1),
            DepthConvBlock(512, 512, kernel_size=1, stride=1, padding=0),
            CBAtten_Res(512, 512, kernel_size=3, reduction=1, stride=1),
        )
        # Despite the attribute name this is a max pool; the name is kept so
        # that existing attribute references keep working.
        self.avg = nn.AdaptiveMaxPool2d((6, 16))        # [b, 512, 6, 16]
        self.linear1 = nn.Linear(96, out_ch)            # 96 = 6 * 16 grid cells
        self.linear2 = nn.Linear(512, n_classes)
        self.drop = nn.Dropout(0.3)
        self.softmax = nn.Softmax(dim=2)

    def forward(self, x):
        out = self.sequential(x)
        out = self.avg(out)                  # [b, 512, 6, 16]
        b, c, h, w = out.size()
        out = out.view((b, c, -1))           # [b, 512, 96]
        out = self.drop(out)
        out = self.linear1(out)              # [b, 512, out_ch]
        out = torch.transpose(out, 1, 2)     # [b, out_ch, 512]
        out = self.linear2(out)              # [b, out_ch, n_classes]
        # NOTE(review): the training code feeds this output to
        # nn.CrossEntropyLoss, which applies log-softmax itself; returning
        # raw scores here is probably what that loss expects. Left unchanged
        # because callers may rely on receiving probabilities.
        out = self.softmax(out)
        return out

    def initialize(self):
        """Re-initialise Linear and Conv2d weights (normal) and biases (zero)."""
        for m in self.modules():
            if isinstance(m, (nn.Linear, nn.Conv2d)):
                nn.init.normal_(m.weight)
                # The attention modules create convolutions with bias=False,
                # whose ``bias`` attribute is None.
                if m.bias is not None:
                    nn.init.zeros_(m.bias)

模型参数量,权重占比信息:

四、模型训练 (train.py)

Loss:采用交叉熵损失,对批次中每个样本的4个字符位置计算交叉熵,最后对整个批次取平均。

def loss3d(input, target, criteon):
    """Average ``criteon`` over the first dimension of ``input``.

    ``criteon`` is called once per index ``i`` with ``input[i]`` and
    ``target[i]``; the results are summed and divided by ``len(input)``.

    Args:
        input: Predictions, indexable along the first (batch) dimension.
        target: Labels, indexed the same way as ``input``.
        criteon: Loss callable returning a scalar tensor.

    Returns:
        A scalar tensor on the device of the per-sample losses (NaN for an
        empty batch).
    """
    losses = [criteon(input[idx], target[idx]) for idx in range(len(input))]
    if not losses:
        # Same value the accumulate-then-divide form gave for 0 / 0.
        return torch.tensor(float('nan'))
    # Summing the losses themselves keeps the result on their device instead
    # of picking one from global CUDA availability.
    return sum(losses) / len(losses)

训练代码如下:

def train(net_path, n_classes=62, epochs=50, batch_size=32, lr=1e-4, root_path='data/en'):
    """Train the captcha model and checkpoint it after every epoch.

    If ``net_path`` exists, the model and best accuracy are resumed from it;
    otherwise a fresh ``Net1`` is created. After each epoch the current model
    is written to 'net.h5', and to 'best_net.h5' whenever the validation
    accuracy improves. Sample images, predictions and labels from the last
    validation batch are sent to Visdom.

    Args:
        net_path: Checkpoint to resume from (a dict with 'model' and
            'best_acc' entries).
        n_classes: Number of character classes of a freshly created model.
        epochs: Number of training epochs.
        batch_size: Batch size for both loaders.
        lr: Learning rate for Adam.
        root_path: Dataset directory; its last path component selects the
            character set ('en' or 'numb').
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    if os.path.exists(net_path):
        # map_location lets a checkpoint saved on GPU be resumed on a
        # CPU-only machine.
        # NOTE(review): the checkpoint pickles the whole model; recent
        # PyTorch releases may need weights_only=False here - confirm
        # against the installed version.
        net_dict = torch.load(net_path, map_location=device)
        model = net_dict['model']
        best_acc = net_dict['best_acc']
    else:
        # Must be passed by keyword: the first positional parameter of Net1
        # is in_ch, so Net1(n_classes) built a model with the wrong input
        # channels and the default 10 classes.
        model = Net1(n_classes=n_classes).to(device)
        best_acc = 0
    char_set = os.path.split(root_path)[-1]
    train_set = CaptchaSet(mode='train', root_path=root_path, char_set=char_set)
    train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
    val_set = CaptchaSet(mode='val', root_path=root_path, char_set=char_set)
    val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)
    model = model.to(device)
    criteon = nn.CrossEntropyLoss().to(device)
    optim = torch.optim.Adam(model.parameters(), lr=lr)
    vis = Visdom()
    # Invert the dataset mapping: str(class index) -> character.
    char_dict = train_set.char_dict
    char_dict = {str(key): value for value, key in char_dict.items()}
    for epoch in tqdm(range(1, epochs+1)):
        train_correct = 0
        train_result = 0
        val_correct = 0
        val_result = 0
        model.train()
        for i, (data, label) in enumerate(train_loader):
            data, label = data.to(device), label.to(device)
            pred = model(data)
            train_loss = loss3d(pred, label, criteon)
            optim.zero_grad()
            train_loss.backward()
            optim.step()
            preds = torch.argmax(pred, dim=2)
            # ``calculate`` is defined outside this excerpt; presumably it
            # returns (number correct, number evaluated) - verify.
            correct, result = calculate(preds, label)
            train_correct += correct
            train_result += result
            if i % 100 == 0:
                print('epoch:%s, step: %s, train_loss: %s' % (epoch, i, train_loss.mean().detach().cpu().item()))
        train_acc = train_correct / train_result
        model.eval()
        # No gradients are needed for validation; skipping the autograd
        # graph saves memory and time.
        with torch.no_grad():
            for data, label in val_loader:
                data, label = data.to(device), label.to(device)
                pred = model(data)
                val_loss = loss3d(pred, label, criteon)
                preds = torch.argmax(pred, dim=2)
                correct, result = calculate(preds, label)
                val_correct += correct
                val_result += result
        val_acc = val_correct / val_result
        if val_acc > best_acc:
            best_acc = val_acc
            net_dict = {
                'model': model,
                'char_dict': char_dict,
                'best_acc': best_acc,
            }
            torch.save(net_dict, 'best_net.h5')
        # The losses printed here are those of the last batch of each loop.
        print('epoch: %s, train_loss: %s, train_acc: %s, val_loss: %s, val_acc: %s, best_acc: %s' % (epoch,
                                                                                                     train_loss.detach().cpu().item(),
                                                                                                     train_acc,
                                                                                                     val_loss.detach().cpu().item(),
                                                                                                     val_acc,
                                                                                                     best_acc
                                                                                                 ))
        # Visualise the first 8 samples of the last validation batch.
        data = data*255
        vis.images(data[:8], win='x')
        pred_text = preds[:8]
        pred_text = [[char_dict[str(char.item())] for char in chars] for chars in pred_text.detach().cpu()]
        label_text = label[:8]
        label_text = [[char_dict[str(char.item())] for char in chars] for chars in label_text.detach().cpu()]
        vis.text(str(pred_text), win='y')
        vis.text(str(label_text), win='true')
        net_dict = {
            'model': model,
            'char_dict': char_dict,
            'best_acc': best_acc,
        }
        torch.save(net_dict, 'net.h5')

经过训练,在大小写识别错误也算错误的情况下,准确率在90%以上;如果忽略大小写,准确率会更高。纯数字验证码的识别准确率在98%以上。

五、模型应用 (predict.py)

python predict.py  -f data/en/0A3s.jpg

识别结果:

到此这篇关于教你如何用Pytorch搭建数英混合验证码图片识别模型的文章就介绍到这了,更多相关Pytorch数英验证码内容请搜索脚本之家以前的文章或继续浏览下面的相关文章,希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文