jupyterhub-python-text-classification

艾泰
2023-12-01

##This post loads a corpus and performs text classification on it. Language: Python; environment: JupyterHub. The NLTK library is used (alongside scikit-learn).

##First, about the corpus dataset: it is stored as a zip archive. The example dataset in this post consists of speeches to the United Nations General Assembly, divided into those given by Australia and those given by New Zealand. Accordingly, the corpus folder inside the zip contains two sub-corpora, "AU" and "NZ". Each sub-corpus stores its documents as plain-text files, one .txt per speech; in this example, "AU" and "NZ" each contain 98 .txt files. Because individual terms in the corpus were edited afterwards to avoid further misrepresentation, the example dataset itself is not shared here.
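
##For reference, based on the extraction and loading code later in this post, the extracted corpus is laid out roughly like this (the exact file names inside the class folders do not matter):

UN-AUvNZ.zip
└── UN-AUvNZ/
    ├── AU/   98 speeches, one .txt file per document
    └── NZ/   98 speeches, one .txt file per document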

##First, the setup.

##The code is as follows:

from sklearn.datasets import load_files
from sklearn.model_selection import train_test_split
from sklearn import metrics
from sklearn.pipeline import Pipeline
from sklearn.naive_bayes import MultinomialNB
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.datasets import fetch_20newsgroups

import matplotlib.pyplot as plt
from wordcloud import WordCloud

import re

import nltk
from nltk import word_tokenize
from nltk.tokenize import wordpunct_tokenize
from nltk.tokenize import RegexpTokenizer
from nltk.stem import WordNetLemmatizer 
from nltk.stem import SnowballStemmer 
from nltk.stem import PorterStemmer 
from nltk.corpus import wordnet


from nltk.corpus import movie_reviews

from sklearn.feature_selection import SelectKBest, mutual_info_classif
from sklearn.feature_selection import chi2
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import make_scorer
from sklearn.metrics import accuracy_score
import numpy as np
import pandas as pd
import random

# these are added for importing a corpus from a zipfile
from zipfile import ZipFile
import os.path
from os import path

##Run the following commands to confirm that the stop word lists and other data needed for preprocessing are available.

nltk.download('stopwords')
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')
nltk.download('punkt')

##Each download should report "True".

stop_words = None
from sklearn.feature_extraction.text import ENGLISH_STOP_WORDS as sklearn_stop_words
nltk_stop_words = nltk.corpus.stopwords.words('english')

##The code above loads some default stop word lists.
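
##As a quick illustrative aside (not part of the original pipeline), you can compare the two default lists like this; the exact counts depend on your NLTK and scikit-learn versions.

print('nltk stop words:', len(nltk_stop_words))
print('sklearn stop words:', len(sklearn_stop_words))
print('only in the nltk list:', sorted(set(nltk_stop_words) - set(sklearn_stop_words))[:10])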

##Next, define some basic helper functions (a short usage check follows after the definitions).

# nice preview of document
def get_preview(docs, targets, target_names, doc_id, max_len=0):
    preview = ''
    if max_len < 1:
        preview += 'Label\n'
        preview += '=====\n'
    else:
        preview += str(doc_id)
        preview += '\t'
    if isinstance(targets[doc_id], str):
        preview += targets[doc_id]
    else:
        preview += target_names[targets[doc_id]]
    if max_len < 1:
        preview += '\n\nFull Text\n'
        preview += '=========\n'
        preview += docs[doc_id]
        preview += '\n'
    else:
        excerpt = get_excerpt(docs[doc_id], max_len)
        preview += '\t' + excerpt
    return preview

_RE_COMBINE_WHITESPACE = re.compile(r"\s+")

# generate an excerpt
def get_excerpt(text, max_len):
    excerpt = _RE_COMBINE_WHITESPACE.sub(' ',text[0:max_len])
    if max_len < len(text):
        excerpt += '...'
    return excerpt.strip()

# combine a defined stop word list (or no stop word list) with any extra stop words defined
def set_stop_words(stop_word_list, extra_stop_words):
    if len(extra_stop_words) > 0:
        if stop_word_list is None:
            stop_word_list = []
        stop_words = list(stop_word_list) + extra_stop_words
    else:
        stop_words = stop_word_list
        
    return stop_words

# initiate stemming or lemmatising
def set_normaliser(normalise):
    if normalise == 'PorterStemmer':
        normaliser = PorterStemmer()
    elif normalise == 'SnowballStemmer':
        normaliser = SnowballStemmer('english')
    elif normalise == 'WordNetLemmatizer':
        normaliser = WordNetLemmatizer()
    else:
        normaliser = None
    return normaliser

# using a custom tokenisation process to allow different tokenisers and stemming/lemmatising ...
def tokenise(doc):
    global tokeniser, normalise, normaliser
    
    # you could obviously add more tokenisers here if you wanted ...
    if tokeniser == 'sklearn':
        tokenizer = RegexpTokenizer(r"(?u)\b\w\w+\b") # this is copied straight from sklearn source
        tokens = tokenizer.tokenize(doc)
    elif tokeniser == 'word_tokenize':
        tokens = word_tokenize(doc)
    elif tokeniser == 'wordpunct':
        tokens = wordpunct_tokenize(doc)
    elif tokeniser == 'nopunct':
        tokenizer = RegexpTokenizer(r'\w+')
        tokens = tokenizer.tokenize(doc)
    else:
        tokens = word_tokenize(doc)
        
    # if using a normaliser then iterate through tokens and return the normalised tokens ...
    if normalise == 'PorterStemmer':
        return [normaliser.stem(t) for t in tokens]
    elif normalise == 'SnowballStemmer':
        return [normaliser.stem(t) for t in tokens]
    elif normalise == 'WordNetLemmatizer':
        # NLTK's lemmatiser needs parts of speech, otherwise assumes everything is a noun
        pos_tokens = nltk.pos_tag(tokens)
        lemmatised_tokens = []
        for token in pos_tokens:
            # NLTK's lemmatiser needs specific values for pos tags - this rewrites them ...
            # default to noun
            tag = wordnet.NOUN
            if token[1].startswith('J'):
                tag = wordnet.ADJ
            elif token[1].startswith('V'):
                tag = wordnet.VERB
            elif token[1].startswith('R'):
                tag = wordnet.ADV
            lemmatised_tokens.append(normaliser.lemmatize(token[0],tag))
        return lemmatised_tokens
    else:
        # no normaliser so just return tokens
        return tokens

# CountVectorizer pre-processor - remove numerics.
def preprocess_text(text):
    text = text.lower()
    text = re.sub(r'\d+', '', text)
    return text
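
##Before moving on, here is a quick sanity check of these helpers on a made-up sentence (illustrative only; the sentence and settings below are not part of the corpus, and the real settings are chosen in the feature-extraction section further down).

# illustrative settings - tokenise() reads these module-level globals
tokeniser = 'sklearn'
normalise = 'PorterStemmer'
normaliser = set_normaliser(normalise)

sample = 'The delegates DELIVERED 3 speeches to the Assembly in 1995.'
print(preprocess_text(sample))                  # lowercased, digits removed
print(tokenise(preprocess_text(sample)))        # tokenised and stemmed
print(set_stop_words(nltk_stop_words, ['assembly'])[:5])  # combined stop word list (first 5 shown)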

##Load the corpus from a zip file. The corpus should keep the documents of each class in a separate directory, and the zip file should sit in the same folder as this notebook. Run the following to extract it. (The example dataset used here is named 'UN-AUvNZ.zip'.)

corpus_filename = 'UN-AUvNZ.zip'

# Create a ZipFile Object and load sample.zip in it
with ZipFile(corpus_filename, 'r') as zipObj:
   # Extract all the contents of zip file in current directory
   zipObj.extractall()

##Check that the extracted directory was created.

corpus_dirname = 'UN-AUvNZ'

if path.isdir(corpus_dirname):
    print('Directory exists:', corpus_dirname)
else:
    print('Directory does not exist!: ', corpus_dirname)

##If it was created, this prints 'Directory exists:' followed by the directory name.
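
##Optionally (not in the original notebook), you can also count how many documents each class folder contains; for this corpus, both AU and NZ should report 98.

import os

for class_dir in sorted(os.listdir(corpus_dirname)):
    class_path = os.path.join(corpus_dirname, class_dir)
    if os.path.isdir(class_path):
        n_txt = len([f for f in os.listdir(class_path) if f.endswith('.txt')])
        print(class_dir, ':', n_txt, 'documents')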

##Next, load the data and display the class labels. In this corpus the subfolders are AU and NZ, so those are the two class labels.

dataset = load_files(corpus_dirname, encoding='utf-8')
dataset_target_names = dataset.target_names
print(dataset_target_names)

##The print output is ['AU', 'NZ'].

##Then split the dataset into training and test sets. In this example, 80% of the data is used for training and 20% for testing. Documents are assigned to each set at random. It can be useful to re-run this cell to reshuffle the dataset, so that the model is trained and evaluated on different data.

# assign the train/test split - 0.2 is 80% for training, 20% for testing
test_size = 0.2

# keeping data structure compatible with previous notebook
dataset_target = []
for target in dataset.target:
    dataset_target.append(dataset_target_names[target])
    
# do the train test split ...
# docs_train and docs_test are the documents
# y_train and y_test are the labels

docs_train, docs_test, y_train, y_test = train_test_split(dataset.data, 
                                                          dataset_target, 
                                                          test_size = test_size, 
                                                          random_state=None)
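
##With random_state=None the split changes on every run. If you want a repeatable split (for example while debugging), you can pass a fixed seed to train_test_split instead; the quick check below also confirms the split sizes and class balance. This is an optional variation, not part of the original setup.

# optional: pass a fixed seed such as random_state=42 to train_test_split for a reproducible split

# quick check of split sizes and class balance
print('train size:', len(docs_train), {c: y_train.count(c) for c in dataset_target_names})
print('test size: ', len(docs_test), {c: y_test.count(c) for c in dataset_target_names})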

##The following checks the imported data. First, we preview each document's label together with a short excerpt.

for train_id in range(len(docs_train)):
    print(get_preview(docs_train, y_train, dataset_target_names, train_id, max_len=80))

##Running the code above displays a summary of the training set.

##We can use the following to inspect a specific document and its label by its index in the training set. Note: because of the random split, the indices change every time the data above is re-imported.

train_id = 11
print(get_preview(docs_train, y_train, dataset_target_names, train_id))

##Next comes feature extraction / preprocessing. The following part uses Naive Bayes with a bag-of-words model. The settings are explained together after this block of code.

Vectorizer = CountVectorizer # Choose between token counts or tf-idf weights, valid values are CountVectorizer or TfidfVectorizer without quotes
lowercase = True # True converts all document text to lowercase before tokenising; False leaves case unchanged
tokeniser = 'sklearn' # should be one of the following (with quotes): 'sklearn', 'word_tokenize', 'wordpunct', 'nopunct'
normalise = None # should be one of the following: None or 'PorterStemmer' or 'SnowballStemmer' or 'WordNetLemmatizer'
stop_word_list = None # should be either None or nltk_stop_words or sklearn_stop_words
extra_stop_words = [] #list of any extra stop words e.g. ['one' , 'two']
min_df = 0.0 # ignore terms that occur in fewer than this proportion of documents.
max_df = 1.0 # ignore terms that occur in more than this proportion of documents.
max_features = 1000 #set this to None for no limit or set to the maximum number of the most frequent features.
ngram_range = (1, 1) #Ngram*
encoding = 'utf-8'
decode_error = 'ignore' # what to do if contains characters not of the given encoding - options 'strict', 'ignore', 'replace'

##Ngram*: when ngram_range is set to (1, 1), you use unigrams as features, i.e. each feature is a single token. Setting it to (1, 2) uses unigrams and bigrams, and (1, 3) uses unigrams, bigrams, and trigrams. If you want only bigrams, use (2, 2). Note: increasing the ngram range beyond (1, 1) increases preprocessing time, because there are more features. A short illustration follows.
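
##To see what ngram_range does, here is a tiny illustration on two made-up sentences (not from the corpus); with (1, 2) the vocabulary contains both single tokens and adjacent token pairs.

demo_docs = ['the general assembly met today', 'the assembly adopted the resolution']
demo_vect = CountVectorizer(ngram_range=(1, 2))
demo_vect.fit(demo_docs)
print(sorted(demo_vect.vocabulary_.keys()))  # unigrams and bigrams, e.g. 'assembly', 'general assembly'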

##The value below sets how many features are selected and used in our classifier. In this example, we start with 100 features chosen by mutual information score; a small toy example of the selector follows the setting below.

kbest = 100
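
##As a small illustration of what the selector will do (with made-up data, not the corpus): given a document-term count matrix and class labels, SelectKBest keeps the k columns whose estimated mutual information with the labels is highest. Note that mutual_info_classif uses a nearest-neighbour estimate with some randomness, so the scores can vary slightly between runs.

# toy document-term matrix: 4 documents x 3 terms, two classes
toy_X = np.array([[3, 0, 1],
                  [2, 0, 2],
                  [0, 4, 1],
                  [0, 3, 2]])
toy_y = ['AU', 'AU', 'NZ', 'NZ']
toy_selector = SelectKBest(score_func=mutual_info_classif, k=2)
toy_selector.fit(toy_X, toy_y)
print('MI scores:   ', toy_selector.scores_)
print('kept columns:', toy_selector.get_support(indices=True))  # columns 0 and 1 separate the classes; column 2 does not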

##Feature extraction, selection, and classification

# If anything in the earlier cells has changed (there are random elements), this cell needs to be re-run.

stop_words = set_stop_words(stop_word_list, extra_stop_words)
normaliser = set_normaliser(normalise)

pipeline = Pipeline([
    ('vectorizer', Vectorizer(tokenizer = tokenise,
                              lowercase = lowercase,
                              min_df = min_df, 
                              max_df = max_df, 
                              max_features = max_features,
                              stop_words = stop_words, 
                              ngram_range = ngram_range,
                              encoding = encoding, 
                              preprocessor = preprocess_text,
                              decode_error = decode_error)),
    ('selector', SelectKBest(score_func = mutual_info_classif, k=kbest)),
    ('classifier', MultinomialNB()), #here is where you would specify an alternative classifier
])

print('Classifier settings')
print('===================')
print('classifier:', type(pipeline.steps[2][1]).__name__)
print('selector:', type(pipeline.steps[1][1]).__name__)
print('vectorizer:', type(pipeline.steps[0][1]).__name__)
print('classes:', dataset_target_names)
print('lowercase:', lowercase)
print('tokeniser:', tokeniser)
print('normalise:', normalise)
print('min_df:', min_df)
print('max_df:', max_df)
print('max_features:', max_features)
if stop_word_list == nltk_stop_words:
    print('stop_word_list:', 'nltk_stop_words')
elif stop_word_list == sklearn_stop_words:
    print('stop_word_list:', 'sklearn_stop_words')
else:
    print('stop_word_list:', 'None')
print('extra_stop_words:', extra_stop_words)
print('ngram_range:', ngram_range)
print('encoding:', encoding)
print('decode_error:', decode_error)
print('kbest:', kbest)

##In this example, running this cell produces the following output:

Classifier settings
===================
classifier: MultinomialNB
selector: SelectKBest
vectorizer: CountVectorizer
classes: ['AU', 'NZ']
lowercase: True
tokeniser: sklearn
normalise: None
min_df: 0.0
max_df: 1.0
max_features: 1000
stop_word_list: None
extra_stop_words: []
ngram_range: (1, 1)
encoding: utf-8
decode_error: ignore
kbest: 100

##Train the classifier and predict labels for the test data. Adding the feature selection step slows the classifier down, because it has to compute an MI score for every extracted feature and rank them; the more features are extracted, the longer this takes.

pipeline.fit(docs_train, y_train)
y_predicted = pipeline.predict(docs_test)

# print report
print('Evaluation metrics')
print('==================')
print(metrics.classification_report(y_test, y_predicted, target_names = dataset_target_names))
cm = metrics.confusion_matrix(y_true=y_test, y_pred=y_predicted, labels=dataset_target_names)

disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=dataset_target_names)
disp = disp.plot(include_values=True, cmap='Blues', ax=None, xticks_rotation='vertical')
plt.show()

vect = pipeline.steps[0][1]
clf = pipeline.steps[2][1]

logodds=clf.feature_log_prob_[1]-clf.feature_log_prob_[0]
    
lookup = dict((v,k) for k,v in vect.vocabulary_.items())

##This cell outputs the evaluation metrics and the confusion matrix.

##Now we look at the features ranked by information gain (mutual information) and by class.

# Get SelectKBest feature scores
features = pipeline.named_steps['selector']
# get top k feature indices
cols = features.get_support(indices=True)
# get corresponding feature scores 
top_k_feature_scores  = [features.scores_[i] for i in cols]

# get vectorizer
featnames = pipeline.named_steps['vectorizer']
# get all feature names
fred = featnames.get_feature_names()
# get corresponding feature names
top_k_feature_names = [fred[i] for i in cols]

names_scores = list(zip(top_k_feature_names, top_k_feature_scores, logodds))
ns_df = pd.DataFrame(data = names_scores, columns=['Feature_names', 'Feature_Scores', 'Log odds'])

# Sort the dataframe for better visualisation - change the sort columns below to reorder the output
print('Top features by information gain')
ns_df_sorted = ns_df.sort_values(['Feature_Scores', 'Log odds', 'Feature_names'], ascending = [False, False, False])
display(ns_df_sorted[0:10])

print('Features most indicative of', dataset_target_names[0])
ns_df_sorted = ns_df.sort_values(['Log odds', 'Feature_Scores', 'Feature_names'], ascending = [True, False, False])
display(ns_df_sorted[0:10])

print('Features most indicative of', dataset_target_names[1])
ns_df_sorted = ns_df.sort_values(['Log odds', 'Feature_Scores', 'Feature_names'], ascending = [False, False, False])
display(ns_df_sorted[0:10])

##The code above outputs ranked word lists of informative features (overall and per class). The code below lists all of the vocabulary features.

print('Total Features: ',len(vect.get_feature_names()))
print(vect.get_feature_names())

##The following code outputs the list of all features selected by information gain.

print('Total Features: ',len(ns_df['Feature_names']))
print(list(ns_df['Feature_names']))

##Next, we analyse which documents were classified correctly and which incorrectly, and why.

# setup a counter for each cell in the confusion matrix
counter = {}
previews = {}
for true_target, target_name in enumerate(dataset_target_names):
    counter[target_name] = {}
    previews[target_name] = {}
    for predicted_target, ptarget_name in enumerate(dataset_target_names):
        counter[target_name][ptarget_name] = {}
        previews[target_name][ptarget_name] = ''

# get doc-term matrix for test docs
doc_terms = vect.transform(docs_test)

# iterate through all predictions, building the counter and preview of docs
# there is a better way to do this, but this will do!

for doc_id, prediction in enumerate(pipeline.predict(docs_test)):
    for k, v in enumerate(doc_terms[doc_id].toarray()[0]):
        if v > 0 and lookup[k] in list(ns_df['Feature_names']):
            if lookup[k] not in counter[y_test[doc_id]][prediction]:
                counter[y_test[doc_id]][prediction][lookup[k]] = 0
            counter[y_test[doc_id]][prediction][lookup[k]] += v
    
    previews[y_test[doc_id]][prediction] += get_preview(docs_test, y_test, dataset_target_names, doc_id, max_len=80) + '\n'

# output a wordcloud and preview of docs for each cell of confusion matrix ...
for true_target, target_name in enumerate(dataset_target_names):
    for predicted_target, ptarget_name in enumerate(dataset_target_names):
        if true_target == predicted_target:
            print(dataset_target_names[true_target],'Correctly classified')
        else:
            print(dataset_target_names[true_target],'incorrectly classified as',dataset_target_names[predicted_target])
        print('=================================================================')

        # guard against empty cells (e.g. no misclassified documents), which would crash WordCloud
        if counter[target_name][ptarget_name]:
            wordcloud = WordCloud(background_color="white", width=800, height=600,
                                  color_func=lambda *args, **kwargs: "black").generate_from_frequencies(counter[target_name][ptarget_name])
            plt.figure(figsize=(16, 8), dpi=600)
            plt.imshow(wordcloud, interpolation="bilinear")
            plt.axis("off")
            plt.show()
        else:
            print('(no documents in this cell)')

        print(previews[target_name][ptarget_name])

##The following code previews a document and its features. You can see this particular document's predicted label, actual label, full text, and features.

test_id = 4

print('Prediction')
print('==========')
print(pipeline.predict([docs_test[test_id]])[0])
print()

print(get_preview(docs_test, y_test, dataset_target_names, test_id))

print('Features')
print('========')
for k, v in enumerate(vect.transform([docs_test[test_id]]).toarray()[0]):
    if v > 0 and lookup[k] in list(ns_df['Feature_names']):
        print(v, '\t', lookup[k])
