Preface
This post only records my learning process; questions and discussion are welcome.
Goal: use an RNN to implement Chinese word segmentation (the result feels heavily dependent on the word data).
Sentences segmented by jieba are used as the training samples.
- pip install jieba
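As a quick illustration of the labeling scheme used later in sequence_to_label (this assumes jieba cuts the sample sentence as 我 / 爱 / 你们):

import jieba

print(jieba.lcut("我爱你们"))  # ['我', '爱', '你们']  (assumed cut)
# per-character labels, 1 = last character of a word:
# 我 -> 1, 爱 -> 1, 你 -> 0, 们 -> 1   =>   [1, 1, 0, 1]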
About pooling layers: a pooling layer (Pooling Layer) is normally used in convolutional neural networks (CNNs) to shrink the spatial size of the data (dimensionality reduction) while keeping the important information. In tasks such as image recognition, pooling extracts local features effectively and reduces computation.
However, in an NLP task like Chinese word segmentation, a pooling layer is usually not used directly. Segmentation is a sequence-labeling task: every character of the input needs its own label prediction. Pooling was designed for data with spatial structure (such as images) and collapses the sequence dimension, so it is not directly applicable to sequence data.
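A minimal shape check (my own sketch with arbitrary sizes, not from the original code) of why average pooling over the sequence axis breaks per-character prediction, while a linear layer on the RNN output keeps one prediction per character:

import torch
import torch.nn as nn

x = torch.randn(2, 20, 100)                       # (batch, seq_len, hidden) from an RNN
pooled = nn.AvgPool1d(20)(x.transpose(1, 2)).squeeze(-1)
print(pooled.shape)                               # torch.Size([2, 100])  - one vector per sentence
print(nn.Linear(100, 2)(x).shape)                 # torch.Size([2, 20, 2]) - one 0/1 logit pair per character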
Code
import jieba
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader

"""
Build a word-segmentation model with a PyTorch network.
The output of jieba segmentation is used as training data,
to see whether a neural network can reach a comparable result.

Drawbacks of dictionary-based Chinese word segmentation:
1. It depends heavily on the word dictionary; without one it cannot work,
   and if a needed word is missing, the result will be wrong.
2. While cutting, it does not consider the meaning of the whole sentence;
   the sentence is treated as a sequence of fragments.
3. Typos in the text cause a chain of errors.
4. Entities that cannot be enumerated, such as person names, are not handled well.
"""

class TorchModel(nn.Module):
    def __init__(self, vocab, input_dim, hidden_size, rnn_layer_size):
        super(TorchModel, self).__init__()
        self.emb = nn.Embedding(len(vocab) + 1, input_dim)
        # a multi-layer RNN works better than a single layer
        self.rnn = nn.RNN(input_size=input_dim,
                          hidden_size=hidden_size,
                          num_layers=rnn_layer_size,
                          batch_first=True)
        # a pooling layer cannot be used here (see the note above)
        # self.pool = nn.AvgPool1d(sentence_length)
        # two-class output: 0/1 per character
        self.classify = nn.Linear(hidden_size, 2)
        # positions labeled -1 are excluded from the loss
        self.loss = nn.CrossEntropyLoss(ignore_index=-1)

    def forward(self, x, y=None):
        x = self.emb(x)
        x, _ = self.rnn(x)
        # pooling layer, not used
        # x = self.pool(x.transpose(1, 2)).squeeze()
        y_pred = self.classify(x)
        if y is not None:
            # y_pred: (batch, seq_len, 2)   e.g. [[1, 2, 3], [3, 2, 1]]
            # y     : (batch, seq_len)      e.g. [0, 1]
            # (20, 20, 2) --view--> (400, 2); y --view--> (400,)
            return self.loss(y_pred.view(-1, 2), y.view(-1))
        else:
            return y_pred
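
# Shape walk-through of forward() with the settings used further below
# (batch_size=20, max_length=20, input_dim=20, hidden_size=100; my own annotation):
#   emb      : (20, 20)      -> (20, 20, 20)
#   rnn      : (20, 20, 20)  -> (20, 20, 100)
#   classify : (20, 20, 100) -> (20, 20, 2)   one logit pair per character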

# Use jieba to obtain pre-segmented data as the samples
# e.g. 我爱你们 -> 1, 1, 0, 1  (1 marks the last character of each word)
def sequence_to_label(sentence):
    words = jieba.lcut(sentence)
    labels = [0] * len(sentence)
    pointer = 0
    for word in words:
        pointer += len(word)
        labels[pointer - 1] = 1
    return labels

# Read the given character list file and build the vocabulary
def build_vocab(path):
    vocab = {}
    with open(path, encoding="utf8") as f:
        for index, line in enumerate(f):
            char = line.strip()
            vocab[char] = index + 1
    vocab['unk'] = len(vocab) + 1
    return vocab
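
# For illustration only (a hypothetical chars.txt containing the lines 我 / 爱 / 你 / 们):
# build_vocab(...) -> {'我': 1, '爱': 2, '你': 3, '们': 4, 'unk': 5}
# index 0 is reserved for padding, which is why the Embedding has len(vocab) + 1 rows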

class Dataset:
    def __init__(self, vocab, corpus_path, max_length):
        self.vocab = vocab
        self.corpus_path = corpus_path
        self.max_length = max_length
        self.load()

    # build the dataset
    def load(self):
        # each element of data is a pair [x, y]
        self.data = []
        with open(self.corpus_path, encoding="utf8") as f:
            for line in f:
                line = line.strip()  # drop the trailing newline so it does not get its own label
                vocab = self.vocab
                # labels y derived from the jieba cut
                y = sequence_to_label(line)
                # characters converted to indices
                x = [vocab.get(char, vocab['unk']) for char in line]
                # pad/truncate both to the maximum length
                x, y = self.padding(x, y)
                self.data.append([torch.LongTensor(x), torch.LongTensor(y)])
                # use only part of the data for the demo; training on all of it takes longer
                if len(self.data) > 10000:
                    break

    def padding(self, x, y):
        # truncate if too long
        x = x[:self.max_length]
        # pad with 0 if too short
        x += [0] * (self.max_length - len(x))
        y = y[:self.max_length]
        # y cannot be padded with 0 (0 is a real label), use -1 instead
        y += [-1] * (self.max_length - len(y))
        return x, y

    # needed by the DataLoader to split into mini-batches
    def __len__(self):
        return len(self.data)

    def __getitem__(self, item):
        return self.data[item]

def build_dataset(vocab, corpus_path, max_length, batch_size):
    dataset = Dataset(vocab, corpus_path, max_length)
    # shuffle=True randomizes the sample order
    data_loader = DataLoader(dataset, shuffle=True, batch_size=batch_size)  # torch
    return data_loader
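
# Each batch yielded by the DataLoader is a pair of LongTensors (my own note):
#   x of shape (batch_size, max_length), padded with 0
#   y of shape (batch_size, max_length), padded with -1 (ignored by the loss)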

def main():
    batch_size = 20
    lr = 1e-3
    epoch_size = 10
    vocab = build_vocab("D:\\NLP\\test\\week4\\chars.txt")
    hidden_size = 100
    # dimension of each character embedding
    input_dim = 20
    rnn_layer_size = 2
    # maximum sample length
    max_length = 20
    model = TorchModel(vocab, input_dim, hidden_size, rnn_layer_size)
    optim = torch.optim.Adam(model.parameters(), lr=lr)
    # path of the corpus (sample data)
    corpus_path = "D:\\NLP\\test\\week4\\corpus.txt"
    dataiter = build_dataset(vocab, corpus_path, max_length, batch_size)
    for epoch in range(epoch_size):
        epoch_loss = []
        model.train()
        for x, y_true in dataiter:
            loss = model(x, y_true)
            loss.backward()
            optim.step()
            optim.zero_grad()
            epoch_loss.append(loss.item())
        print("epoch %d  loss = %f" % (epoch + 1, np.mean(epoch_loss)))
    # save the model
    torch.save(model.state_dict(), "model.pth")
    return

# final prediction
def predict(model_path, vocab_path, input_strings):
    # keep the configuration identical to training
    char_dim = 20        # dimension of each character
    hidden_size = 100    # hidden layer size
    num_rnn_layers = 2   # number of RNN layers
    vocab = build_vocab(vocab_path)                                   # build the vocabulary (character set)
    model = TorchModel(vocab, char_dim, hidden_size, num_rnn_layers)  # build the model
    model.load_state_dict(torch.load(model_path))                     # load the trained weights
    model.eval()
    for input_string in input_strings:
        # predict one sentence at a time
        x = [vocab.get(char, vocab['unk']) for char in input_string]
        with torch.no_grad():
            result = model.forward(torch.LongTensor([x]))[0]
            result = torch.argmax(result, dim=-1)  # predicted 0/1 sequence
            # cut wherever the prediction is 1 and print the segmented text
            for index, p in enumerate(result):
                if p == 1:
                    print(input_string[index], end=" ")
                else:
                    print(input_string[index], end="")
            print()


if __name__ == '__main__':
    main()
    # input_strings = ["同时国内有望出台新汽车刺激方案",
    #                  "沪胶后市有望延续强势",
    #                  "经过两个交易日的强势调整后",
    #                  "昨日上海天然橡胶期货价格再度大幅上扬"]
    # predict("model.pth", "D:\\NLP\\test\\week4\\chars.txt", input_strings)
Below is a recursive implementation that enumerates every segmentation over a word dictionary. The key step is prepending the current word to each segmentation of the remaining text, i.e. [word] + cuts:
word = "hello"
cuts = ["world", "!"]
new_list = [word] + cuts
print(new_list)
Output: ['hello', 'world', '!']
"""" 实现分词的全排列 """
import jieba
Dict = {
"经常": 0.1,
"经": 0.1,
"有": 0.1,
"常": 0.1,
"有意见": 0.1,
"歧": 0.1,
"意见": 0.1,
"分歧": 0.1,
"见": 0.1,
"意": 0.1,
"见分歧": 0.1,
"分": 0.1,
}
# 文本
sentence = "经常有意见分歧"
# 实现全切分函数(14)
# 输出['经常',xxx] [['经',xxx],...]
def all_cut(sentence, Dict):
    results = []
    if not sentence:
        return [[]]
    for word in Dict:
        # if the sentence starts with the current word
        if sentence.startswith(word):
            # recurse on the remaining part
            remaining = all_cut(sentence[len(word):], Dict)
            for cuts in remaining:
                results.append([word] + cuts)
    return results


target = []
res = all_cut(sentence, Dict)
print(res)
print(len(res))
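
# Cross-check (my own sketch, not part of the original exercise): count the number of
# segmentations with the recurrence f(i) = sum of f(i + len(w)) over dict words w that
# match sentence[i:]; for this sentence and Dict it gives 14, matching len(res) above.
def count_cuts(sentence, Dict):
    n = len(sentence)
    f = [0] * (n + 1)
    f[n] = 1  # the empty suffix has exactly one segmentation
    for i in range(n - 1, -1, -1):
        for word in Dict:
            if sentence.startswith(word, i):
                f[i] += f[i + len(word)]
    return f[0]

print(count_cuts(sentence, Dict))  # 14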

# Full permutation of a list in Python (recursive)
def fun(lst):
    if len(lst) < 1:
        return [lst]
    else:
        result = []
        for i in range(len(lst)):
            remaining = lst[:i] + lst[i + 1:]
            for perm in fun(remaining):
                result.append([lst[i]] + perm)
        return result


# lst = [1, 2, 3]
# res = fun(lst)
# print(res)
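# For reference, fun([1, 2, 3]) returns the six permutations in this order:
# [[1, 2, 3], [1, 3, 2], [2, 1, 3], [2, 3, 1], [3, 1, 2], [3, 2, 1]]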