
Sentiment analysis using an RNN (LSTM)

9 June 2020 at 23:45:16

#coding: utf-8
import pandas as pd
import numpy as np
import xml.etree.ElementTree as ET
from collections import Counter
import matplotlib.pyplot as plt
import torch
from torch.utils.data import DataLoader, TensorDataset
import torch.nn as nn

path_train = r"C:\ABSA16_Laptops_Train_English_SB2.xml"  # raw string so the backslash is not treated as an escape

def get_list(path):
    tree = ET.parse(path)
    root = tree.getroot()
    text_list = []
    opinion_list = []
    for review in root.findall("Review"):
        text_string = ""

        # concatenate all sentences of the review into a single string
        for sent in review.findall("./sentences/sentence"):
            text_string = text_string + " " + sent.find("text").text
        text_string = text_string.lower()
        text_string = text_string.strip()
        text_list.append(text_string + "\n")

        # keep only the polarity of the LAPTOP#GENERAL opinion
        for opinion in review.findall("./Opinions/Opinion"):
            if opinion.get("category") == "LAPTOP#GENERAL":
                opinion_list.append(opinion.get("polarity"))
    return text_list, opinion_list

reviews, labels = get_list(path_train)

#test_text_list, test_opinion_list = get_list(path_test)
print('train_text_list',reviews)
print()
print('train_opinion_list',labels)


from string import punctuation

# join all reviews into one string, then remove the punctuation character by character
all_text = ''.join(reviews)
all_text = ''.join([c for c in all_text if c not in punctuation])
print(all_text)


reviews_split = all_text.split('\n')
print('number of reviews:', len(reviews_split))
all_text2 = ' '.join(reviews_split)
words = all_text2.split()

# count the word frequencies
count_words = Counter(words)
total_words = len(words)
print(total_words)  # total number of words
sorted_words = count_words.most_common(total_words)
print(count_words)  # word frequencies
vocab_to_int = {w: i + 1 for i, (w, c) in enumerate(sorted_words)}  # index 0 is reserved for padding
print(vocab_to_int)

# build the list of reviews encoded as lists of integers
reviews_int = []
for review in reviews_split:
    r = [vocab_to_int[w] for w in review.split()]
    reviews_int.append(r)
print(reviews_int)
print("length of reviews_int", len(reviews_int))

# encode the labels
encoded_labels = [1 if label == 'positive' else 0 if label == 'negative' else 2 if label == 'conflict' else 3 for label in labels]
encoded_labels = np.array(encoded_labels)


# inspect the data
reviews_len = [len(x) for x in reviews_int]
pd.Series(reviews_len).hist()
plt.show()
print(pd.Series(reviews_len).describe())
# remove the reviews of length zero
print("number of labels:", len(encoded_labels))
reviews_int = [reviews_int[i] for i, l in enumerate(reviews_len) if l > 0]
encoded_labels = [encoded_labels[i] for i, l in enumerate(reviews_len) if l > 0 and i < 2082]
print("number of labels:", len(encoded_labels))
# build sequences of fixed length
def pad_features(reviews_int, seq_length):

    features = np.zeros((len(reviews_int), seq_length), dtype=int)

    for i, review in enumerate(reviews_int):
        review_len = len(review)  # length of the review

        if review_len <= seq_length:
            zeroes = list(np.zeros(seq_length - review_len))  # list of padding zeros
            new = zeroes + review  # left-pad with zeros
        elif review_len > seq_length:
            new = review[0:seq_length]  # truncate to seq_length

        features[i, :] = np.array(new)

    return features


seq_length = 395
features = pad_features(reviews_int, seq_length)
print(features)
len_feat = len(features)
print("len feat", len_feat)
split_frac = 0.8

# training data
train_x = features[0:int(split_frac * len_feat)]  # the reviews
print("train_x", train_x)
train_y = encoded_labels[0:int(split_frac * len_feat)]  # the labels
print("train_y", train_y)
print("length of reviews_split", len(reviews_split))
print("length of reviews_int", len(reviews_int))


# remaining data
remaining_x = features[int(split_frac * len_feat):]
remaining_y = encoded_labels[int(split_frac * len_feat):]
print(len(remaining_x))
print(len(remaining_y))

# validation data
valid_x = remaining_x[0:int(len(remaining_x) * 0.5)]
valid_y = remaining_y[0:int(len(remaining_y) * 0.5)]
print('len(valid_x)', len(valid_x))
print('len(valid_y)', len(valid_y))
# test data
test_x = remaining_x[int(len(remaining_x) * 0.5):]
test_y = remaining_y[int(len(remaining_y) * 0.5):]
print('len(test_x)', len(test_x))
print('len(test_y)', len(test_y))

train_data = TensorDataset(torch.tensor(np.asarray(train_x)), torch.tensor(np.asarray(train_y)))
valid_data = TensorDataset(torch.tensor(np.asarray(valid_x)), torch.tensor(np.asarray(valid_y)))
test_data = TensorDataset(torch.tensor(np.asarray(test_x)), torch.tensor(np.asarray(test_y)))


# dataloaders
batch_size = 20  # number of samples processed before the model is updated

train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size, drop_last=True)
valid_loader = DataLoader(valid_data, shuffle=True, batch_size=batch_size, drop_last=True)
test_loader = DataLoader(test_data, shuffle=True, batch_size=batch_size, drop_last=True)

dataiter = iter(train_loader)  # peek at the training data
sample_x, sample_y = next(dataiter)
print('Sample input size: ', sample_x.size())  # batch_size, seq_length
print('Sample input: \n', sample_x)
print()
print('Sample label size: ', sample_y.size())  # batch_size
print('Sample label: \n', sample_y)
train_on_gpu = False  # training on CPU
class SentimentLSTM(nn.Module):
    """
    The RNN model used to perform sentiment analysis.
    """

    def __init__(self, vocab_size, output_size, embedding_dim, hidden_dim, n_layers, drop_prob=0.5):
        """
        Initialize the model by setting up the layers.
        """
        super(SentimentLSTM, self).__init__()

        self.vocab_size = vocab_size
        self.output_size = output_size
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        # embedding and LSTM layers
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, n_layers,
                            dropout=drop_prob, batch_first=True)

        self.dropout = nn.Dropout(0.2)

        # linear and sigmoid layers
        self.fc = nn.Linear(hidden_dim, output_size)
        self.sig = nn.Sigmoid()

    def forward(self, x, hidden):
        """
        Perform a forward pass of the model on a batch of inputs and a hidden state.
        """
        batch_size = x.size(0)
        # embeddings and lstm_out
        embeds = self.embedding(x)
        lstm_out, hidden = self.lstm(embeds, hidden)

        # stack up the LSTM outputs
        lstm_out = lstm_out.contiguous().view(-1, self.hidden_dim)

        # dropout and fully connected layer
        out = self.dropout(lstm_out)
        out = self.fc(out)
        # sigmoid function
        sig_out = self.sig(out)

        # reshape so that batch_size comes first
        sig_out = sig_out.view(batch_size, -1)
        sig_out = sig_out[:, -1]  # keep the output of the last time step

        # return the last sigmoid output and the hidden state
        return sig_out, hidden

    def init_hidden(self, batch_size):
        # create two new zeroed tensors for the hidden state and cell state of the LSTM
        weight = next(self.parameters()).data
        hidden = (weight.new(self.n_layers, batch_size, self.hidden_dim).zero_(),
                  weight.new(self.n_layers, batch_size, self.hidden_dim).zero_())
        return hidden


# Instantiate the model
vocab_size = len(vocab_to_int) + 1  # (+1 for the 0 padding) size of our vocabulary, i.e. the range of values of our input
output_size = 1  # size of the desired output; the number of class scores we want to produce (pos/neg, conflict/neutral)
embedding_dim = 40  # number of columns in the embedding lookup table; size of our embeddings
hidden_dim = 25  # number of units in the hidden layers of our LSTM cells

n_layers = 4  # number of LSTM layers in the network

net = SentimentLSTM(vocab_size, output_size, embedding_dim, hidden_dim, n_layers)
print(net)

# loss and optimization functions
lr = 0.01  # learning rate
import torch.optim as optim

criterion = nn.BCELoss()  # loss used to measure the errors to optimize
optimizer = optim.Adam(net.parameters(), lr=lr)  # build the optimizer object

# training params
epochs = 6  # number of passes over the training dataset
counter = 0  # step counter
print_every = 10  # number of steps between two loss reports
clip = 5  # gradient clipping (maximum value of the gradient norm)
# move model to GPU, if available
if train_on_gpu:
    net.cuda()

net.train()

# train for some number of epochs
for e in range(epochs):
    # initialize the hidden state
    h = net.init_hidden(batch_size)

    # batch loop
    for inputs, labels in train_loader:
        counter += 1
        if train_on_gpu:
            inputs, labels = inputs.cuda(), labels.cuda()

        # Creating new variables for the hidden state, otherwise
        # we'd backprop through the entire training history
        h = tuple([each.data for each in h])

        # zero accumulated gradients
        net.zero_grad()

        # get the output from the model
        inputs = inputs.type(torch.LongTensor)
        output, h = net(inputs, h)

        # calculate the loss and perform backprop
        loss = criterion(output.squeeze(), labels.float())
        loss.backward()

        # clip the gradient to avoid exploding gradients
        nn.utils.clip_grad_norm_(net.parameters(), clip)
        optimizer.step()

        # loss stats
        if counter % print_every == 0:
            # Get validation loss
            val_h = net.init_hidden(batch_size)
            val_losses = []
            net.eval()
            for inputs, labels in valid_loader:

                # Creating new variables for the hidden state, otherwise
                # we'd backprop through the entire training history
                val_h = tuple([each.data for each in val_h])

                if train_on_gpu:
                    inputs, labels = inputs.cuda(), labels.cuda()

                inputs = inputs.type(torch.LongTensor)
                output, val_h = net(inputs, val_h)
                val_loss = criterion(output.squeeze(), labels.float())
                val_losses.append(val_loss.item())

            net.train()

            print("Epoch: {}/{}...".format(e + 1, epochs),
                  "Step: {}...".format(counter),
                  "Loss: {:.6f}...".format(loss.item()),
                  "Val Loss: {:.6f}".format(np.mean(val_losses)))


# evaluate on the test data

test_losses = []  # track loss
num_correct = 0
# init hidden state
h = net.init_hidden(batch_size)
net.eval()
# iterate over test data
for inputs, labels in test_loader:

    # Creating new variables for the hidden state, otherwise
    # we'd backprop through the entire training history
    h = tuple([each.data for each in h])

    if train_on_gpu:
        inputs, labels = inputs.cuda(), labels.cuda()

    # get predicted outputs
    inputs = inputs.type(torch.LongTensor)
    output, h = net(inputs, h)

    # calculate loss
    test_loss = criterion(output.squeeze(), labels.float())
    test_losses.append(test_loss.item())

    # convert output probabilities to predicted class (0 or 1)
    pred = torch.round(output.squeeze())  # rounds to the nearest integer
    # compare predictions to true labels
    correct_tensor = pred.eq(labels.float().view_as(pred))
    correct = np.squeeze(correct_tensor.cpu().numpy())
    num_correct += np.sum(correct)


# -- Statistics -- #
# avg test loss
print("Test loss: {:.3f}".format(np.mean(test_losses)))

# accuracy over all the test data
test_acc = num_correct / len(test_loader.dataset)
print("Test accuracy: {:.3f}%".format(test_acc * 100))

def tokenize_review(test_review):
    test_review = test_review.lower()  # lowercase
    # get rid of punctuation
    test_text = ''.join([c for c in test_review if c not in punctuation])

    # splitting by spaces
    test_words = test_text.split()

    # tokens (words unseen during training fall back to the padding index 0 to avoid a KeyError)
    test_ints = []
    test_ints.append([vocab_to_int.get(word, 0) for word in test_words])

    return test_ints

test_review_neg = 'It is the worst laptop ever.'  # the sentence to test

def predict(net, test_review, sequence_length=200):

    net.eval()
    # tokenize review
    test_ints = tokenize_review(test_review)

    # pad tokenized sequence
    seq_length = sequence_length
    features = pad_features(test_ints, seq_length)

    # convert to tensor to pass into the model
    feature_tensor = torch.from_numpy(np.asarray(features))
    batch_size = feature_tensor.size(0)

    # initialize hidden state
    h = net.init_hidden(batch_size)

    if train_on_gpu:
        feature_tensor = feature_tensor.cuda()

    # get the output from the model
    feature_tensor = feature_tensor.type(torch.LongTensor)
    output, h = net(feature_tensor, h)

    # convert output probabilities to predicted class (0 or 1)
    pred = torch.round(output.squeeze())
    # print the output value, before rounding
    print('Prediction value, pre-rounding: {:.6f}'.format(output.item()))

    # print custom response
    if pred.item() == 1:
        print("Positive review detected!")
    elif pred.item() == 0:
        print("Negative review detected.")
    elif pred.item() == 2:
        print("Conflict review detected.")
    else:
        print("Neutral review detected.")


seq_length = 200
predict(net, test_review_neg, seq_length)

Good evening,

I am a beginner in text processing with Python (PyTorch), and I wrote a sentiment analysis pipeline using recurrent neural networks (RNN) implemented with the Long Short-Term Memory (LSTM) architecture, but my code does not always return the correct result, and the accuracy changes on every run.

Question: what can be done to improve the accuracy and get the right result?
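
For what it's worth, one reason the accuracy changes on every run is that nothing in the script above fixes the random seeds, so the weight initialization and the DataLoader shuffling differ each time. A minimal sketch of how the seeds could be pinned near the top of the script (assuming the same imports as above, plus the standard random module; the value 42 is arbitrary):

import random

# assumption: fixing these seeds before building the DataLoaders and the model
# makes successive runs start from the same initial weights and batch order
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)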

Thanks in advance.

10 June 2020 at 2:35:08

Hello,

Duplicate

Duplicate topics disrupt the smooth running of the forum and are therefore not allowed. If you posted in the wrong section, simply report your topic to the staff so that they can move it to the right place.

I invite you to continue the discussion in the other topic: https://openclassrooms.com/forum/sujet/analyse-de-sentiment

I am closing this topic.
