Использование машинного обучения для анализа больших данных с помощью Spark

С развитием более дешевых хранилищ предприятия производят все больше и больше данных, связанных с их деятельностью. Компания может получить конкурентное преимущество, внедрив процессы для анализа огромного количества данных, которые она производит. Тем не менее, для эффективного использования таких данных требуется много вычислительной мощности.

Spark предлагает решение для компаний, которые хотят активно контролировать свой бизнес и принимать взвешенные решения. Возможности параллельной обработки Spark делают его хорошим кандидатом для более быстрой работы с крупномасштабными данными. Spark также включает библиотеку машинного обучения, что делает его полезным инструментом для ученых, работающих с большими данными.

В этой статье мы будем использовать набор данных журнала с веб-сайта потоковой передачи музыки (Sparkify), чтобы предсказать, уйдет ли пользователь. Для этого анализа мы будем использовать Spark с платформой облачных решений IBM Watson. Давайте посмотрим, как Spark работает с набором данных.

Исследование данных

Давайте начнем наш анализ, открыв сеанс Spark, импортировав наш набор данных и просмотрев данные, доступные в нашем наборе данных:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.json(cos.url('medium-sparkify-event-data.json', 'sparkify-donotdelete-pr-n3mm1l5ansd2bd'))
df.printSchema()
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)

Набор данных содержит 543 705строк, и быстрая проверка значений na показывает, что он не содержит естественных нулевых значений. Тем не менее есть строки с пустым userId. После удаления этих строк в наборе данных осталось 528 005  строк.

Взаимодействие пользователя на веб-сайте зависит от страниц, которые он посещает, поэтому давайте посмотрим, какие у нас есть разные типы страниц:

df.select(df.page).dropDuplicates().sort(["page"]).show()
+-------------------+
|                page|
+--------------------+
|               About|
|          Add Friend|
|     Add to Playlist|
|              Cancel|
|Cancellation Conf...|
|           Downgrade|
|               Error|
|                Help|
|                Home|
|              Logout|
|            NextSong|
|         Roll Advert|
|       Save Settings|
|            Settings|
|    Submit Downgrade|
|      Submit Upgrade|
|         Thumbs Down|
|           Thumbs Up|
|             Upgrade|
+--------------------+

Глядя на доступные страницы, мы можем определить отток как действие пользователя по посещению страницы «Подтверждение отмены». Используя эту информацию, давайте создадим столбец оттока:

churned_user=[x.userId for x in df.filter(df.page=='Cancellation Confirmation').select('userId').dropDuplicates().collect()]
df=df.withColumn('churn',df.userId.isin(churned_user).cast('Integer'
))

Каково распределение оттока?

Ниже мы видим, что распределение перераспределения оттока несбалансировано:

Обработка данных и разработка функций

Прежде чем приступить к созданию моделей машинного обучения, мы проведем некоторую обработку данных и разработку функций:

Демографические данные

  • пол: M или F мы закодировали как 0 или 1;
  • уровень: бесплатно или платно мы закодировали как 0 или 1;
  • isCA : 1, если пользователь находится в Калифорнии, и 0 в противном случае;
df_user=df_user.withColumn("isCA",get_localisation(df_user.location_level1).cast(IntegerType()))
    df_user=df_user.replace(["M", "F"], ["0", "1"], "gender")
    df_user=df_user.replace(["free", "paid"], ["0", "1"], "level")
    df_user=df_user.withColumn("gender",df_user.level.cast(IntegerType()))
    df_user=df_user.withColumn("level",df_user.level.cast(IntegerType()))

Взаимодействие пользователя с веб-сайтом

  • Среднее количество сыгранных песен за сеанс и общее количество сыгранных песен;
  • Среднее время между сеансами;
  • Среднее количество подъемов и опусканий за сеанс;
  • Время в часах между первым и последним сеансом;
#feature 2 - user's interactions
    # Average song played in one session and number of songs played
    df_user_inter=df.filter((df.page=='NextSong')) \
            .groupBy('userID','sessionId') \
            .agg(count('sessionId').alias('nb_song')) \
            .groupBy('userID') \
            .agg(round(mean('nb_song'),0).alias('average_nb_song'),sum('nb_song').alias('nb_song'))
    # average time  between sessions
    df_time_session=df.select('userID','sessionId','duree').dropDuplicates() \
            .groupBy('userID','duree') \
            .agg(count('sessionId').alias('number_of_session'))

    df_time_session=df_time_session.withColumn('avg_time_session',df_time_session.duree/df_time_session.number_of_session)
    # Tumps up/down and upvote/downvote
    df_user_inter_tUP=df.filter((df.page=='Thumbs Up')) \
            .groupBy('userID','sessionId') \
            .agg(count('sessionId').alias('step')) \
            .groupBy('userID') \
            .agg(round(mean('step'),0).alias('av_tumpup'))
    df_user_inter_tDOWN=df.filter((df.page=='Thumbs Down')) \
            .groupBy('userID','sessionId') \
            .agg(count('sessionId').alias('step')) \
            .groupBy('userID') \
            .agg(round(mean('step'),0).alias('av_tumpdown'))

Окончательный набор данных содержит следующие данные:

root
 |-- label: integer (nullable = true)
 |-- gender: integer (nullable = true)
 |-- level: integer (nullable = true)
 |-- isCA: integer (nullable = true)
 |-- average_nb_song: double (nullable = true)
 |-- av_tumpup: double (nullable = false)
 |-- av_tumpdown: double (nullable = false)
 |-- duree: double (nullable = true)
 |-- avg_time_session: double (nullable = true)
 |-- nb_song: long (nullable = true)

Давайте посмотрим, как эти функции связаны друг с другом:

df_pds = df_model.toPandas()[df_model.columns[1:]]
axs = pd.plotting.scatter_matrix(df_pds, figsize=(10, 10));
n = len(df_pds.columns)
for i in range(n):
    v = axs[i, 0]
    v.yaxis.label.set_rotation(0)
    v.yaxis.label.set_ha('right')
    v.set_yticks(())
    h = axs[n-1, i]
    h.xaxis.label.set_rotation(90)
    h.set_xticks(())

В завершение мы масштабировали функции с помощью standardScaler и объединили все функции в один столбец с помощью vectorAssembler.

assembler=VectorAssembler(inputCols=df_model.columns[1:],outputCol='features_int')
    scaler = StandardScaler(inputCol="features_int", outputCol="features", withStd=True)

Обучение и настройка наших моделей

Мы протестируем пять моделей на нашем наборе данных:

  • Логистическая регрессия;
  • Классификатор дерева решений;
  • Классификатор случайного леса;
  • Дерево повышения градиента; и
  • Наивный Байес.

Мы разделили наш набор данных на обучающий, тестовый и проверочный наборы:

train, test_valid=df_model.randomSplit([0.7, 0.3], seed=42)
    test, valid=test_valid.randomSplit([0.5, 0.5], seed=42)

Каждая модель будет реализована с использованием конвейера и CrossValidator для настройки параметров:

#Five model to test
    models={
        'Logistic Regression':LogisticRegression(maxIter=10),
        'Decission Tree Classifier':DecisionTreeClassifier(),
        'Random Forest Classifier':RandomForestClassifier(),
        'Gradient Boosted Tree':GBTClassifier(maxIter=10),
        'Naive Bayes':NaiveBayes()
    }
    
    #paramGrid for tuning
    paramGrid={
        'Logistic Regression':ParamGridBuilder() \
            .addGrid( models['Logistic Regression'].regParam, [0,0.01,0.001,0.0001]) \
            .addGrid(models['Logistic Regression'].family, ['binomial','multinomial']) \
            .build(),
        'Decission Tree Classifier':ParamGridBuilder()
             .addGrid(models['Decission Tree Classifier'].maxDepth, [2,  10, 30])
             .addGrid(models['Decission Tree Classifier'].maxBins, [10, 40, 100])
             .build(),
        'Random Forest Classifier': ParamGridBuilder() \
            .addGrid(models['Random Forest Classifier'].numTrees, [10, 30, 50]) \
            .addGrid(models['Random Forest Classifier'].maxDepth, [2, 10, 30]) \
            .build(),
        'Gradient Boosted Tree':ParamGridBuilder() \
            .addGrid(models['Gradient Boosted Tree'].maxBins, [10, 40, 100]) \
            .addGrid(models['Gradient Boosted Tree'].maxDepth, [2, 10, 30]) \
            .build(),
        'Naive Bayes':ParamGridBuilder() \
            .addGrid(models['Naive Bayes'].smoothing, [0.0,0.6,1.0])\
            .build()
    }
    
    trainned_model={}
    for mdl in models:
        print('Trainning model ' + mdl)
        print('...')
        pipeline = Pipeline(stages = [assembler,scaler, models[mdl]])
        crossval= CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid[mdl],
                          evaluator=MulticlassClassificationEvaluator(metricName='f1'),
                          numFolds=3)
        model_test = crossval.fit(train)

Точность и F-оценка пяти моделей приведены ниже:

Trainning model Logistic Regression
...
Logistic Regression metrics:
Train - Accuracy: 84.18% and F-1 Score: 82.71%
Test  - Accuracy: 78.12% and F-1 Score: 80.44%
Valid - Accuracy: 84.00% and F-1 Score: 82.81%
Trainning model Decission Tree Classifier
...
Decission Tree Classifier metrics:
Train - Accuracy: 87.66% and F-1 Score: 87.25%
Test  - Accuracy: 84.38% and F-1 Score: 81.25%
Valid - Accuracy: 84.00% and F-1 Score: 84.00%
Trainning model Random Forest Classifier
...
Random Forest Classifier metrics:
Train - Accuracy: 98.10% and F-1 Score: 98.09%
Test  - Accuracy: 87.50% and F-1 Score: 84.26%
Valid - Accuracy: 87.00% and F-1 Score: 86.53%
Trainning model Gradient Boosted Tree
...
Gradient Boosted Tree metrics:
Train - Accuracy: 90.19% and F-1 Score: 89.64%
Test  - Accuracy: 81.25% and F-1 Score: 81.25%
Valid - Accuracy: 87.00% and F-1 Score: 87.38%
Trainning model Naive Bayes
...
Naive Bayes metrics:
Train - Accuracy: 79.75% and F-1 Score: 73.28%
Test  - Accuracy: 65.62% and F-1 Score: 56.88%
Valid - Accuracy: 85.00% and F-1 Score: 80.27%

Классификатор случайного леса лучше работает на обучающих, тестовых и проверочных наборах, поэтому мы оставим его в качестве лучшей модели.

Spark позволил нам относительно быстро вычислить наши пять моделей машинного обучения в нашем большом наборе данных. Для специалиста по данным важно иметь базовые знания об этой технологии, поскольку все больше компаний готовы интегрировать ее в свои процессы.

Спасибо, что прочитали мою статью, и не стесняйтесь обращаться ко мне за любой вклад или вопрос.

Подробный код вы можете найти на моем Github.