TextCNN+TorchText实现恶意程序分类

本文介绍如何使用TextCNN实现恶意程序的分类任务。实验数据来自天池新人赛阿里云安全恶意程序检测,整个比赛实现了多种模型,最终通过模型融合实现分类任务,TextCNN为所用模型之一。本文实现了使用torchtext直接从列表加载和处理数据,设计了k-fold cross validation进行交叉验证,并使用torch Conv1d实现textcnn。

1 数据格式

通过对原始数据集进行处理之后,训练集和测试集分别用 .plk 文件保存在本地。训练集内容包括特征(恶意程序的API调用序列,每个序列是一个以空格分隔的字符串)和标签,均存储在list中:

import pickle
with open('../dataset/security_train.pkl', 'rb') as f:
labels = pickle.load(f)
train_api = pickle.load(f)
print(labels[0])
print(train_api[0][0:100])
==============output==============
5
LdrLoadDll LdrGetProcedureAddress LdrGetProcedureAddress...

测试集内容包括文件id(恶意程序)和特征,均存储在list中:

import pickle
with open('../dataset/security_test.pkl', 'rb') as f:
file_ids = pickle.load(f)
test_api = pickle.load(f)
print(file_ids[0])
print(test_api[0][0:100])
==============output==============
1
RegOpenKeyExA CopyFileA OpenSCManagerA CreateServiceA...

2 加载数据

这里我们使用文本处理工具 torchtext 来进行数据的加载和预处理。不同于从 .csv 文件加载数据的常规做法,我们直接从上述列表中加载数据。此外,我们还借助 sklearn 在加载数据过程中实现了对训练集的划分,便于后续的交叉验证。

2.1 加载数据

在 torchtext 中,所有 DataSet 对象中的数据都是一个 examples 列表(可以通过DataSet.examples 获得),而该列表中的每一条数据为一个 example 对象。因此,我们可以使用 data.Example 中的 fromlist 方法,来从列表中构建数据集。

class MyDataset(object):
def __init__(self, seed=1234, test=False):
if torch.cuda.is_available():
print("gpu cuda is available!")
torch.cuda.manual_seed(seed)
else:
print("cuda is not available! cpu is available!")
torch.manual_seed(seed)
self.seed = seed
self.train_examples = []
self.test_examples = []
# 分词器
self.tokenizer = lambda x: x.split()
# 定义如何处理特征字段和标签字段
self.TEXT = data.Field(sequential=True, tokenize=self.tokenizer, fix_length=8000)
self.LABEL = data.Field(sequential=False, use_vocab=False)

# file_id, api_seq, label为每个example中的字段(如exampl.label)
fields = [('file_id', None),
('api_seq', self.TEXT),
('label', self.LABEL)]

# 加载数据
if test:
with open('../dataset/security_test.pkl', 'rb') as f:
file_ids = pickle.load(f)
test_api = pickle.load(f)
for f_id, tes_api in zip(file_ids, test_api):
# fields中的三项分别对应列表中的三项
self.test_examples.append(Example.fromlist([f_id, tes_api, None], fields))
else:
with open('../dataset/security_train.pkl', 'rb') as f:
labels = pickle.load(f)
train_api = pickle.load(f)
for tra_api, label in zip(train_api, labels):
self.train_examples.append(Example.fromlist([None, tra_api, label], fields))

2.2 划分训练集

对训练集进行划分,便于后的的交叉验证:

def get_fold_data(self, num_folds=5):
fields = [('api_seq', self.TEXT), ('label', self.LABEL)]

kf = KFold(n_splits=num_folds, shuffle=True, random_state=self.seed)
train_data_arr = np.array(self.train_examples)

for train_index, val_index in kf.split(train_data_arr):
yield(
self.TEXT,
self.LABEL,
data.Dataset(train_data_arr[train_index], fields=fields),
data.Dataset(train_data_arr[val_index], fields=fields),
)

def get_test_data(self):
return self.test_examples

3 TextCNN 模型

TextCNN模型如下图所示,其过程非常简单。首先对一个 text_len * embedding_size 的矩阵做一维卷积,然后将卷积结果做max pooling后拼接,最后输入到全连接网络中进行分类。

TextCNN

这里我们首先介绍一下torch中Conv1dConv2d

  • Conv1d

Conv1d表示卷积的方向是一维的

class torch.nn.Conv1d(in_channels, out_channels, kernel_size, stride=1, padding=0, dilation=1, groups=1, bias=True)
#in_channels(int) 输入信号的通道。在文本分类中,即为词向量的维度;图像中为RGB三通道。
# out_channels(int) 卷积产生的通道。有多少个out_channels,就需要多少个1维卷积
# kernel_size(int or tuple) 卷积核的尺寸,卷积核的大小
# stride(int or tuple, optional) 卷积步长
# padding (int or tuple, optional) 输入的每一条边补充0的层数
# dilation(int or tuple, `optional``) 空洞卷积,卷积核元素之间的间距
# groups(int, optional) 从输入通道到输出通道的阻塞连接数
# bias(bool, optional) 如果bias=True,添加偏置

in_channels(int):输入信号的通道。在文本分类中,即为词向量的维度; out_channels(int):卷积产生的通道,即filter的数量 kernel_size(int or tuple): 卷积核的尺寸。kernel_size为int时,其大小kernel_size*in_channels;kernel_size为tuple时,其大小 kernel_size[0]*kernel_size[1] stride(int or tuple, optional) : 卷积步长 padding (int or tuple, optional):输入的每一条边补充0的层数 dilation(int or tuple, `optional``) :空洞卷积,卷积核元素之间的间距 groups(int, optional) :从输入通道到输出通道的阻塞连接数 bias(bool, optional) : 如果bias=True,添加偏置

  • Conv2d
class torch.nn.Conv2d(in_channels, out_channels, kernel_size, stride=1, padding=0, dilation=1, groups=1, bias=True)

Conv2d与Conv1d的参数完全相同,只不过其用在图像卷积中,含义与文本卷积不同。

in_channels:通道数,图像中为RGB三通道。channel本质上定义了一个数据点用几个值来描述 out_channels:同样是filter的数量,也即输出后的通道数 kernel_size:这里的kernel size可以是int或tuple的,如kernel_size=2,那么就是2*2大小的卷积核;如果是(2,3),那么就是一个non-squre的卷积核,相应的stride和padding也可以设置为tuple

所以这里我们可以使用Conv1d和Conv2d实现textcnn。如果使用Conv2d,那么要设置in_channels=1,kernel_size设置为tuple,kernel_size[0]为Conv1d中kernel_size的大小,而kernel_size[1]为Conv1d中in_channels的大小。(不明白为什么那么多教程用Conv2d实现textcnn)

  • Model
import torch as th
from torch import nn
from torch.nn import Conv2d

class TextCNN(nn.Module):
def __init__(self):
super(TextCNN, self).__init__()
vocab = 304
dim = 256
class_num = 8
filters = 64
kernels = [2, 3, 4, 5]
dilations = [1, 2, 3, 4]

self.embed = nn.Embedding(num_embeddings=vocab, embedding_dim=dim)
self.dp1 = nn.Dropout2d(0.25)
self.convs = nn.ModuleList()
for ks in kernels:
for dila in dilations:
self.convs.append(nn.Conv1d(in_channels=dim, out_channels=filters, kernel_size=ks, dilation=dila))
self.conv_ac = nn.ReLU()
# MaxPooling,在最后一维的行上做最大池化,1为输出的size
self.pool1 = nn.AdaptiveMaxPool1d(1)
self.dp2 = nn.Dropout(0.5)
self.fc1 = nn.Linear(1024, 256)
self.ac1 = nn.ReLU()
self.dp3 = nn.Dropout(0.3)
self.fc2 = nn.Linear(256, 8)


def forward(self, x):
# 输出后的x.shape = (batch_size * text_len * embedding_size)
x = self.embed(x)
x = self.dp1(x)
# Conv1d对输入数据的最后一维进行一维卷积
# 输出后的x.shape = (batch_size * embedding_size * text_len)
x = x.permute(0, 2, 1)
warppers = []
for conv1d in self.convs:
# 输出shape:batch_size * out_channels(64) * W(<=8000)
conv = conv1d(x)
conv_ac = self.conv_ac(conv)
# without permute
# 最后一维做最大池化,指定池化后的输出size为1
# pool shape:batch_size * out_channels(64) * 1
pool = self.pool1(conv_ac)
warppers.append(pool.squeeze(-1))
# 所有的卷积结果拼接成一个向量:shape:batch_size * 1024(64*16)
x = th.cat(warppers, dim=-1)
x = self.dp2(x)
x = self.fc1(x)
x = self.ac1(x)
x = self.dp3(x)
rst = self.fc2(x)
return rst

4 模型训练

在我们的数据中,假设每个词对应的词向量维度Dim=256Dim=256 ,每一个样本的分词后的长度已知设为W=8000W = 8000 (在加载数据时的TEXT=data.Field()中的fix_length参数指定),每个 mini-batch 的大小为 N=32N=32 。那么我们希望得到的是一个个维度为$N ∗ W ∗ D i m $的浮点数数据作为卷积层的输入

在数据加载时,我们已经实现了分词,且无需去除停用词,因此我们加载完数据后首先使用TEXT.build_vocab 建立词汇表(词汇表是词语到 index 的映射,index从0到M,M为已知词汇的个数,如{'我‘:0, ‘好’:1, ‘帅’:2…})。

然后我们调用 data.BucketIterator 将产生 Iterator 迭代数据,并指定batch_size,此时Iterator 中每个 batch 的大小即为 batch_size * text_len (32*8000)。将数据输入到模型中,经过nn.Embedding层之后,其shape变为 batch_size * text_len * embedding size (32*8000*256),即卷积层的输入

由于Torch没有实现cross-validation,因此我们用自定义的data_loader,在训练过程中使用5折交叉验证,即80%的数据训练,20%的数据验证,交叉验证五轮。训练与验证过程的main函数如下:

def main():
data_generator = MyDataset()
_history = []
device = None
model = None
fold_index = 0

for TEXT, LABEL, train_data, val_data in data_generator.get_fold_data():
logger.info("[+] Running Training ...")
logger.info(f"Now fold: {fold_index + 1} / {5}") # num_folds=5

# 构建词汇表(只在train_data上构建即可)
TEXT.build_vocab(train_data)
LABEL.build_vocab(train_data) # For converting str into float labels.

model = TextCNN()
optimizer = optim.Adam(model.parameters(), lr=0.00006, weight_decay=0.00006)

# set cuda
device = torch.device('cuda:0')
model = model.to(device)

train_iterator = data.BucketIterator(train_data, batch_size=32, sort_key=lambda x: len(x.api_seq), device=device)
val_iterator = data.BucketIterator(val_data, batch_size=32, sort_key=lambda x: len(x.api_seq), device=device)
for epoch in range(80):
train_loss, train_acc = train_run(model, train_iterator, optimizer, device)
val_loss, val_acc = eval_run(model, val_iterator, device)
logger.info(f'| Epoch: {epoch + 1:02} | Train Loss: {train_loss:.6f} | Train ACC: {train_acc:.6f} '
f'| Val Loss: {val_loss:.6f} | Val ACC: {val_acc:.6f}')
best = model
torch.save(model.state_dict(), f"best_model_{fold_index}.pkl")
fold_index += 1

train_run() 和 eval_run() 会返回训练集和测试集上的loss和accuracy,代码如下:

def train_run(model, iterator, optimizer, device):
epoch_loss = 0
corrects = 0

for batch in iterator:
model.train()
optimizer.zero_grad()
# shape: text_len * batch_size ==> batch_size * text_len
feat = batch.api_seq.t_()
output = model(feat)
target = batch.label
feat, target = feat.to(device), target.to(device)
# shape: output([batch_size, 8]); target([batch_size])
loss = F.cross_entropy(output, target)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
# [0]为最大值,[1]为最大值对应的index
result = torch.max(output, 1)[1]
corrects += (result.view(target.size()).data == target.data).sum()

return epoch_loss / len(iterator), corrects / (len(iterator) * 32)

def eval_run(model, iterator, device):
epoch_loss = 0
corrects = 0

with torch.no_grad():
for batch in iterator:
model.eval()
feat = batch.api_seq.t_()
predictions = model(feat)
target = batch.label
feat, target = feat.to(device), target.to(device)
loss = F.cross_entropy(predictions, target)
epoch_loss += loss.item()
result = torch.max(predictions, 1)[1]
corrects += (result.view(target.size()).data == target.data).sum()

return epoch_loss / len(iterator), corrects / (len(iterator) * 32)

每次训练结束后,保存最优模型,最后将5个模型在test_data上的输出取平均,即最终结果。

本实验过程仅在测试集数据上做了交叉验证,在真正比赛时,需要在所有数据上构建词典(TEXT.build_vocab(train_data, test_data)),以保证模型在测试集上的精度。

此外,在代码运行过程中,GPU并没有跑满,显存占用8G/10G,GPU利用率80%,内存占满。瓶颈之一是内存不够(16G),另外textcnn的data.BucketIterator不像Dataloader一样可以配置num_workers实现多线程,导致性能没有发挥到最佳。

参考

[1]:TorchText用法示例及完整代码_nlpuser的博客-CSDN博客_torchtext

[2]:pytorch实现textCNN_无所知的博客-CSDN博客

[3]:pytorch TextCNN笔记

[4]:用机器学习进行恶意软件检测–以阿里云恶意软件检测比赛为例 - 先知社区

文章作者: Alston
文章链接: https://lizitong67.github.io/2021/05/17/TextCNN-TorchText%E5%AE%9E%E7%8E%B0%E6%81%B6%E6%84%8F%E7%A8%8B%E5%BA%8F%E5%88%86%E7%B1%BB/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 Alston's blog