Scala — использование фильтра DF для нескольких полей

Я искал через stackoverflow уже несколько дней, и я просто не нашел ответа на следующий вопрос. Я действительно новичок в кодировании Scala, так что это может быть очень простой вопрос. Любая помощь будет высоко ценится.

Проблема, с которой я столкнулся (ошибка), связана с последним битом кода.
Я пытаюсь получить отфильтрованное подмножество записей из фрейма данных, где во всех отфильтрованных записях отсутствуют данные из одного или нескольких из указанные поля.

Я использую Scala IDE Build 4.7.0 в Eclipse.
В файле pom.xml, который я использую, есть spark-core_2.11, версия 2.0.0.

Спасибо.
Джесси

val source_path = args(0)
val source_file = args(1)

val vFile = sc.textFile(source_path + "/" + source_file)

val vSchema = StructType(
            StructField("FIELD_1",LongType,false)::
            StructField("FIELD_2",LongType,false)::
            StructField("FIELD_3",StringType,true)::
            StructField("FIELD_4",StringType,false)::
            StructField("FIELD_ADD_1",StringType,false)::
            StructField("FIELD_ADD_2",StringType,false)::
            StructField("FIELD_ADD_3",StringType,false)::
            StructField("FIELD_ADD_4",StringType,false)::
            StructField("FIELD_5",StringType,false)::
            StructField("FIELD_6",StringType,false)::
            StructField("FIELD_7",StringType,false)::
            StructField("FIELD_8",StringType,false)::
            Nil)

// val vRow = vFile.map(x=>x.split((char)30, -1)).map(x=> Row(
val vRow = vFile.map(x=>x.split("", -1)).map(x=> Row(
                            x(1).toLong,
                            x(2).toLong,
                            x(3).toString.trim(),
                            x(4).toString.trim(),
                            x(5).toString.trim(),
                            x(6).toString.trim(),
                            x(7).toString.trim(),
                            x(8).toString.trim(),
                            x(9).toString.trim(),
                            x(10).toString.trim(),
                            x(11).toString.trim(),
                            x(12).toString.trim()
                        ))

val dfData = sqlContext.createDataFrame(vRow.distinct(),vSchema)

val dfBlankRecords = dfData.filter(x => (
                    x.trim(col("FIELD_ADD_1")) == "" ||
                    x.trim(col("FIELD_ADD_2")) == "" ||
                    x.trim(col("FIELD_ADD_3")) == "" ||
                    x.trim(col("FIELD_ADD_4")) == ""
                ))

person Zugabo    schedule 21.06.2018    source источник
comment
Я бы добавил тег apache-spark для большей наглядности. В строке val vRow ... вы делаете x.split("", -1). Является ли пустая строка преднамеренной? Это разбивается на массив отдельных символов.   -  person Travis Hegner    schedule 21.06.2018
comment
Кроме того, какая версия искры? Если >= 1.6, есть лучшие методы для чтения текстовых файлов непосредственно в наборы данных.   -  person Travis Hegner    schedule 21.06.2018
comment
@TravisHegner, на самом деле между этими кавычками есть непечатаемый символ, который существует в качестве разделителя столбцов в файле. Я собирался попробовать использовать следующую строку, чтобы было немного понятнее, но пока не придумал, как ее правильно написать. val vSrcRow = vSrcFile.map(x=›x.split((char)30, -1)).map(x=›Row( Кроме того, согласно файлу pom.xml, который я использую вместе с .scala code, у меня есть spark-core_2.11, версия 2.0.0. Я был бы более чем рад увидеть лучший метод чтения текстовых файлов в наборы данных, если вы готовы поделиться.   -  person Zugabo    schedule 22.06.2018


Ответы (1)


Функции spark.read.* будут считывать данные непосредственно в Dataset/Dataframe API, избегая (в некоторой степени) необходимости в определениях схемы и вообще работая с RDD API.

val source_path = args(0)
val source_file = args(1)

val dfData = spark.read.textFile(source_path + "/" + source_file)
  .flatMap(l => {
    val a = l.split('\u001e'.toString, -1).map(_.trim())
    val f1 = a(0).toLong
    val f2 = a(1).toLong
    val Array(f3, f4, fa1, fa2, fa3, fa4, f5, f6, f7, f8) = a.slice(2,12)

    if (fa1 == "" ||
        fa2 == "" ||
        fa3 == "" ||
        fa4 == "") {
      Some(f1, f2, f3, f4, fa1, fa2, fa3, fa4, f5, f6, f7, f8)
    } else {
      None
    }
  }).toDF("FIELD_1", "FIELD_2", "FIELD_3", "FIELD_4",
          "FIELD_ADD_1", "FIELD_ADD_2", "FIELD_ADD_3", "FIELD_ADD_4",
          "FIELD_5", "FIELD_6", "FIELD_7", "FIELD_8")

Я думаю, что это приведет к тому, что вы хотите. Я уверен, что кто-то лучше меня мог бы оптимизировать это больше и с более кратким кодом.

Обратите внимание, что массив нулевой индексации, если вы намеренно выбирали определенные поля, вам придется их настроить. Я также не уверен, является ли '\u001e' (шестнадцатеричное значение 30) подходящим значением, которое вам нужно для вашей разделенной строки.

person Travis Hegner    schedule 22.06.2018
comment
это выглядит отлично. Я постараюсь включить его в остальную часть моего кода и дам вам знать, как это работает. Нужно ли определять только длинные поля? Прошу прощения, забыл указать базу персонажа. Я ищу HEX 1E или ASCII 30. Похоже, это тоже должно работать. Ваша помощь очень ценится!! - person Zugabo; 22.06.2018
comment
Без проблем. .split() уже возвращает Array[String], поэтому преобразование строковых полей не требуется. - person Travis Hegner; 22.06.2018
comment
Большое спасибо за информацию. Я адаптировал большую часть его к тому, что я пытаюсь сделать. Однако файл, который я пытаюсь использовать, довольно большой (в нем 268 столбцов). Итак, когда я помещаю их все в метод Some(), я получаю следующую ошибку: слишком много аргументов для применения метода: (x: A)Some[A] в объекте Some . Поскольку я не знаком с методом Some(), я не совсем уверен, как это исправить. Каков предел аргументов для этого метода? - person Zugabo; 25.06.2018
comment
Метод Some() на самом деле генерирует Option, который в основном представляет собой Iterable длины 1. Опция сглаживается при выполнении .flatMap(). Таким образом, параметры на самом деле представлены как Some[TupleX], однако длина TupleX ограничена 22 элементами (читай: Tuple2-Tuple22). - person Travis Hegner; 25.06.2018
comment
К сожалению, это решение вообще не будет работать с таким количеством столбцов. Можно ли оставить остальные столбцы как Strings? Если это так, вы можете выпустить Some[(Long, Long, Array[String])], и это будет более чистое решение. Если нет, то какие типы столбцов у вас есть? - person Travis Hegner; 25.06.2018
comment
Большое спасибо за Вашу помощь!! Мне удалось успешно использовать предложенное вами решение для одного из файлов, и я смог выяснить, как использовать мой первоначальный вариант для второго файла, сократив количество столбцов до тех, которые мне действительно нужны. Это раздутый файл, и не все данные были необходимы. - person Zugabo; 26.06.2018