В этой статье мы узнаем о PySpark и о том, как мы можем использовать его для выполнения любой задачи классификации в нашем случае «Классификация новостей».

Что такое PySpark:

PySpark — это API Python для Apache Spark, распределенная вычислительная среда с открытым исходным кодом и набор библиотек для крупномасштабной обработки данных в режиме реального времени. Если вы уже знакомы с Python и библиотеками, такими как Pandas, то PySpark — хороший язык для изучения, чтобы создавать более масштабируемые анализы и пайплайны. ["источник"]

Во-первых, нам нужно установить PySpark, для этого мы можем выполнить простую установку pip.

!pip install pyspark

Далее нам нужно импортировать пакеты

import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
import warnings
warnings.filterwarnings(‘ignore’)
# Now every spark project has SparkContext and SparkSesion so we’re going to create that
sc = SparkContext(master=’local[2]’)
spark = SparkSession.builder.appName(“News Classification”).getOrCreate()

Здесь мы создаем SparkContext в мастере с двумя ядрами, а имя нашей сессии — «Классификация новостей».

Теперь пришло время прочитать набор данных, который вы можете скачать с Kaggle -> https://www.kaggle.com/datasets/rmisra/news-category-dataset

наши данные в формате JSON, к счастью для нас, у Spark есть метод для чтения данных JSON

# read dataset
df = spark.read.json(“/kaggle/input/news-category-dataset/News_Category_Dataset_v2.json”)
df.show(5)

Мы не собираемся брать все эти столбцы, мы берем только те столбцы, которые подходят для вашей Модели.

df = df.select(“headline”,”short_description”,”category”)
df.show(5)

в наших данных у нас более 200 тысяч точек данных, поэтому я возьму только подмножество данных, чтобы продемонстрировать

df = df.limit(10000)
df.count()
# it’s time to do a little bit of data exploration
df.groupby(‘category’).count().sort(‘count’, ascending=False).show()

Проверка отсутствующих значений

# check for missing value
print(“Headline “, df.toPandas()[‘headline’].isnull().sum())
print(“short_description “, df.toPandas()[‘short_description’].isnull().sum())
print(“category “, df.toPandas()[‘category’].isnull().sum()

Таким образом, у нас нет нулевого значения в этом наборе данных, поскольку мы видим, что у нас есть два столбца с именами «headline» и «short_description», мы можем объединить их в один и создать столбец функций, который мы собираемся передать в нашем модель.

from pyspark.sql import functions as sf
df = df.withColumn(‘description’, sf.concat(sf.col(‘headline’),sf.lit(‘ ‘), sf.col(‘short_description’)))
# selecting only 2 columns
df = df.select(“description”,”category”)
df.show()

Удаление пустых данных

import pyspark.sql.functions as f
df = df.where(f.col(‘description’) != “ “)
df.show()

Наша модель машинного обучения не будет понимать строковые данные, которые нам нужны для передачи данных в числовую форму, поэтому в НЛП у нас есть много методов для преобразования строки в числовые значения.

мы можем использовать такие методы, как CountVectorizer, TFIDF, BagOfWords, OneHotEncoder, WordEmbeddings, HashingTF и другие…

from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer
tokenizer = Tokenizer(inputCol=’description’,outputCol=’mytokens’)
stopwordRemover = StopWordsRemover(inputCol=’mytokens’,outputCol=’filtered_tokens’)
vectorizer = CountVectorizer(inputCol=’filtered_tokens’,outputCol=’rawFeatures’)
idf = IDF(inputCol=’rawFeatures’,outputCol=’vectorizedFeatures’)

Итак, как вы можете видеть в приведенном выше коде, мы выполняем различные операции, такие как токенизатор, стоп-словоудаление, векторизатор и IDF.

И каждая из этих функций ожидает inputCol (в каком столбце вы хотите сделать преобразование) и outputCol (куда мы должны поместить преобразованные данные)

Далее нам нужно закодировать столбец метки в случае (категории)

# encode the label feature (category)
labelEncoder = StringIndexer(inputCol=’category’,outputCol=’label’).fit(df)
labelEncoder.transform(df).show(5)

Теперь мы собираемся сделать label_dict для проверки меток, которые пригодятся при прогнозировании.

Нам нужно разделить наши данные на наборы данных для обучения и тестирования

# split dataset
(train_df,test_df) = df.randomSplit((0.7,0.3),seed=42)

Пришло время создать модель

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol=’vectorizedFeatures’,labelCol=’label’)

Я делаю простую модель LogisticRegression, в которой нам нужно передать featureCol и labelCol.

# let’s create a pipeline for our model
# building the pipeline
from pyspark.ml import Pipeline
pipeline = Pipeline(
stages = [tokenizer, stopwordRemover, vectorizer,idf,lr]
)

В нашем пайплайне у нас есть 5 шагов

4 шага (tokenizer, stopwordRemover, vectorizer) выполняют преобразования и 1 шаг (lr) для прогнозирования.

Если мы проверим этапы нашего пайплайна, мы получим

pipeline.stages

Пришло время обучить модель

lr_model = pipeline.fit(train_df)

Давайте посмотрим, что в нашей модели lr_model.

объект конвейера состоит из этапов

lr_model = pipeline.fit(train_df)

Пришло время сделать прогноз с помощью test_df

# get prediction on test data
predictions = lr_model.transform(test_df)

Если мы посмотрим на столбец предсказания, вы увидите, что есть предсказание имени столбца, в котором у нас есть наше предсказанное значение. Теперь давайте посмотрим на наш прогноз

predictions.select(‘description’,’rawPrediction’, ‘probability’,’category’,’label’,’prediction’).show(10)

Посмотрите последние 2 столбца, в которых у нас есть реальная метка («метка»), и наш прогноз модели («прогноз») кажется очень хорошим результатом.

Пришло время оценить модель

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(predictionCol=’prediction’,
labelCol=’label’)
evaluator.evaluate(predictions)*100

точность, отчет о классификации f1score

from pyspark.mllib.evaluation import MulticlassMetrics
lr_metric = MulticlassMetrics(predictions[‘label’,’prediction’].rdd)
print(“Accuracy “, lr_metric.accuracy)
print(“precision “, lr_metric.precision(1.0))
print(“f1Score “, lr_metric.fMeasure(1.0))
print(“recall “, lr_metric.recall(1.0))

Таким образом, вы можете видеть, что мы получаем около 63% точности, используя простую модель логистической регрессии, используя расширенную модель, мы можем достичь очень хорошей точности. Поэтому я призываю вас попробовать…

Исходный код: - Github, Kaggle

Посетите мой канал на Youtube, чтобы узнать о проектах, связанных с машинным обучением, искусственным интеллектом, НЛП и т. д. -› Youtube

На этом пока все, увидимся в следующей статье.

давайте подключаться в Linkedin, Twitter, Instagram, Github и Facebook.

Спасибо, что прочитали!

Подпишитесь на DDIntel Здесь.

Присоединяйтесь к нашей сети здесь: https://datadriveninvestor.com/collaborate