В этой статье мы узнаем о PySpark и о том, как мы можем использовать его для выполнения любой задачи классификации в нашем случае «Классификация новостей».
Что такое PySpark:
PySpark — это API Python для Apache Spark, распределенная вычислительная среда с открытым исходным кодом и набор библиотек для крупномасштабной обработки данных в режиме реального времени. Если вы уже знакомы с Python и библиотеками, такими как Pandas, то PySpark — хороший язык для изучения, чтобы создавать более масштабируемые анализы и пайплайны. ["источник"]
Во-первых, нам нужно установить PySpark, для этого мы можем выполнить простую установку pip.
!pip install pyspark
Далее нам нужно импортировать пакеты
import pyspark from pyspark import SparkContext from pyspark.sql import SparkSession import warnings warnings.filterwarnings(‘ignore’) # Now every spark project has SparkContext and SparkSesion so we’re going to create that sc = SparkContext(master=’local[2]’) spark = SparkSession.builder.appName(“News Classification”).getOrCreate()
Здесь мы создаем SparkContext в мастере с двумя ядрами, а имя нашей сессии — «Классификация новостей».
Теперь пришло время прочитать набор данных, который вы можете скачать с Kaggle -> https://www.kaggle.com/datasets/rmisra/news-category-dataset
наши данные в формате JSON, к счастью для нас, у Spark есть метод для чтения данных JSON
# read dataset df = spark.read.json(“/kaggle/input/news-category-dataset/News_Category_Dataset_v2.json”) df.show(5)
Мы не собираемся брать все эти столбцы, мы берем только те столбцы, которые подходят для вашей Модели.
df = df.select(“headline”,”short_description”,”category”) df.show(5)
в наших данных у нас более 200 тысяч точек данных, поэтому я возьму только подмножество данных, чтобы продемонстрировать
df = df.limit(10000) df.count() # it’s time to do a little bit of data exploration df.groupby(‘category’).count().sort(‘count’, ascending=False).show()
Проверка отсутствующих значений
# check for missing value print(“Headline “, df.toPandas()[‘headline’].isnull().sum()) print(“short_description “, df.toPandas()[‘short_description’].isnull().sum()) print(“category “, df.toPandas()[‘category’].isnull().sum()
Таким образом, у нас нет нулевого значения в этом наборе данных, поскольку мы видим, что у нас есть два столбца с именами «headline» и «short_description», мы можем объединить их в один и создать столбец функций, который мы собираемся передать в нашем модель.
from pyspark.sql import functions as sf df = df.withColumn(‘description’, sf.concat(sf.col(‘headline’),sf.lit(‘ ‘), sf.col(‘short_description’))) # selecting only 2 columns df = df.select(“description”,”category”) df.show()
Удаление пустых данных
import pyspark.sql.functions as f df = df.where(f.col(‘description’) != “ “) df.show()
Наша модель машинного обучения не будет понимать строковые данные, которые нам нужны для передачи данных в числовую форму, поэтому в НЛП у нас есть много методов для преобразования строки в числовые значения.
мы можем использовать такие методы, как CountVectorizer, TFIDF, BagOfWords, OneHotEncoder, WordEmbeddings, HashingTF и другие…
from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer tokenizer = Tokenizer(inputCol=’description’,outputCol=’mytokens’) stopwordRemover = StopWordsRemover(inputCol=’mytokens’,outputCol=’filtered_tokens’) vectorizer = CountVectorizer(inputCol=’filtered_tokens’,outputCol=’rawFeatures’) idf = IDF(inputCol=’rawFeatures’,outputCol=’vectorizedFeatures’)
Итак, как вы можете видеть в приведенном выше коде, мы выполняем различные операции, такие как токенизатор, стоп-словоудаление, векторизатор и IDF.
И каждая из этих функций ожидает inputCol (в каком столбце вы хотите сделать преобразование) и outputCol (куда мы должны поместить преобразованные данные)
Далее нам нужно закодировать столбец метки в случае (категории)
# encode the label feature (category) labelEncoder = StringIndexer(inputCol=’category’,outputCol=’label’).fit(df) labelEncoder.transform(df).show(5)
Теперь мы собираемся сделать label_dict для проверки меток, которые пригодятся при прогнозировании.
Нам нужно разделить наши данные на наборы данных для обучения и тестирования
# split dataset (train_df,test_df) = df.randomSplit((0.7,0.3),seed=42)
Пришло время создать модель
from pyspark.ml.classification import LogisticRegression lr = LogisticRegression(featuresCol=’vectorizedFeatures’,labelCol=’label’)
Я делаю простую модель LogisticRegression, в которой нам нужно передать featureCol и labelCol.
# let’s create a pipeline for our model # building the pipeline from pyspark.ml import Pipeline pipeline = Pipeline( stages = [tokenizer, stopwordRemover, vectorizer,idf,lr] )
В нашем пайплайне у нас есть 5 шагов
4 шага (tokenizer, stopwordRemover, vectorizer) выполняют преобразования и 1 шаг (lr) для прогнозирования.
Если мы проверим этапы нашего пайплайна, мы получим
pipeline.stages
Пришло время обучить модель
lr_model = pipeline.fit(train_df)
Давайте посмотрим, что в нашей модели lr_model.
объект конвейера состоит из этапов
lr_model = pipeline.fit(train_df)
Пришло время сделать прогноз с помощью test_df
# get prediction on test data predictions = lr_model.transform(test_df)
Если мы посмотрим на столбец предсказания, вы увидите, что есть предсказание имени столбца, в котором у нас есть наше предсказанное значение. Теперь давайте посмотрим на наш прогноз
predictions.select(‘description’,’rawPrediction’, ‘probability’,’category’,’label’,’prediction’).show(10)
Посмотрите последние 2 столбца, в которых у нас есть реальная метка («метка»), и наш прогноз модели («прогноз») кажется очень хорошим результатом.
Пришло время оценить модель
from pyspark.ml.evaluation import MulticlassClassificationEvaluator evaluator = MulticlassClassificationEvaluator(predictionCol=’prediction’, labelCol=’label’) evaluator.evaluate(predictions)*100
точность, отчет о классификации f1score
from pyspark.mllib.evaluation import MulticlassMetrics lr_metric = MulticlassMetrics(predictions[‘label’,’prediction’].rdd) print(“Accuracy “, lr_metric.accuracy) print(“precision “, lr_metric.precision(1.0)) print(“f1Score “, lr_metric.fMeasure(1.0)) print(“recall “, lr_metric.recall(1.0))
Таким образом, вы можете видеть, что мы получаем около 63% точности, используя простую модель логистической регрессии, используя расширенную модель, мы можем достичь очень хорошей точности. Поэтому я призываю вас попробовать…
Исходный код: - Github, Kaggle
Посетите мой канал на Youtube, чтобы узнать о проектах, связанных с машинным обучением, искусственным интеллектом, НЛП и т. д. -› Youtube
На этом пока все, увидимся в следующей статье.
давайте подключаться в Linkedin, Twitter, Instagram, Github и Facebook.
Спасибо, что прочитали!
Подпишитесь на DDIntel Здесь.
Присоединяйтесь к нашей сети здесь: https://datadriveninvestor.com/collaborate