От zip до seq с помощью SPARK

Каждый день я получаю zip-архив «2018-06-26.zip» размером ок. Сжатый 250 Мб, содержащий 165-170 000 небольших файлов XML (КБ). Я загружаю zip-архив в HDFS (избегая проблемы с небольшими файлами) и использую SPARK для их извлечения из zip (zip-архивы не разделяются), создавая парный RDD с именем файла в качестве ключа и содержимым в качестве значения и сохраняю их как Sequence-файл через парный RDD. Все работает гладко с небольшим zip-архивом, содержащим всего 3 XML-файла для целей тестирования, но когда я загружаю его большой архив, я получаю

   java.lang.OutOfMemoryError: GC overhead limit exceeded
   at java.util.Arrays.copyOf(Arrays.java:2367)
   at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
   ...
   ...

Я использую Cloudera Quickstart VM: CDH 5.13.3 (HDFS: 2.60, JDK: 1.7.0.67, SPARK: 1.6.0, Scala 2.10)

Я еще не запускал его на полномасштабном кластере, так как хотел убедиться, что мой код верен, прежде чем развертывать его ...

Сборщик мусора продолжает работу OOM с превышением предела накладных расходов. Я знаю об увеличении объема памяти для драйвера и Java Heap Space, но подозреваю, что мой подход забирает слишком много памяти .... Мониторинг использования памяти не обнаруживает утечки памяти ...

Вот код:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import java.util.zip.{ZipEntry, ZipInputStream}
import org.apache.spark.input.PortableDataStream
import scala.collection.mutable
val conf = new SparkConf().setMaster("local").setAppName("ZipToSeq")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
var xml_map = new mutable.HashMap[String, String]()
sc.binaryFiles("/user/cloudera/test/2018-06-26.zip", 10).collect
   .foreach { zip_file : (String, PortableDataStream) =>
    val zip_stream : ZipInputStream = new ZipInputStream(zip_file._2.open)
    var zip_entry : ZipEntry = null
    while ({zip_entry = zip_stream.getNextEntry; zip_entry != null}) {
      if (!zip_entry.isDirectory) {
        val key_file_name = zip_entry.getName
        val value_file_content = scala.io.Source.fromInputStream(zip_stream, "iso-8859-1").getLines.mkString("\n")
        xml_map += ( key_file_name -> value_file_content )
      }
      zip_stream.closeEntry()
    }
    zip_stream.close()
  }
val xml_rdd = sc.parallelize(xml_map.toSeq).saveAsSequenceFile("/user/cloudera/2018_06_26")

Любая помощь или идеи приветствуются.


person Dan Kjeldstrøm Hansen    schedule 10.09.2018    source источник
comment
Вы проверяли, когда возникает ошибка? После 1.000, 10.000 или 100.000 файлов? Когда вы распаковываете файл, сколько места он занимает?   -  person boje    schedule 10.09.2018
comment
Привет @boje, спасибо за комментарий .... я обнаружил, что виноват размер разделов (!) .... так что мое исправление заключалось в том, чтобы оптимизировать размер / количество разделов, изменив это число: sc.parallelize (xml_map.toSeq, 150) .saveAsSequenceFile (c: / temp / today), в результате получается 150 разделов каждый примерно по 18 Мбайт, или 50 разделов каждый примерно по 50 Мбайт, или что-то среднее между ними. Это зависит от индивидуальной задачи, размера торговли на скорость и т. Д.   -  person Dan Kjeldstrøm Hansen    schedule 11.09.2018


Ответы (1)


Мое окончательное решение:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import java.util.zip.{ZipEntry, ZipInputStream}
import org.apache.spark.input.PortableDataStream
import scala.collection.mutable
val conf = new SparkConf().setMaster("local").setAppName("ZipToSeq")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
var xml_map = new mutable.HashMap[String, String]()
sc.binaryFiles("/user/cloudera/test/2018-06-26.zip").collect
   .foreach { zip_file : (String, PortableDataStream) =>
   val zip_stream : ZipInputStream = new ZipInputStream(zip_file._2.open)
   var zip_entry : ZipEntry = null
   while ({zip_entry = zip_stream.getNextEntry; zip_entry != null}) {
      if (!zip_entry.isDirectory) {
      val key_file_name = zip_entry.getName
      val value_file_content = scala.io.Source.fromInputStream(zip_stream, "iso-8859-1").getLines.mkString("\n")
      xml_map += ( key_file_name -> value_file_content )
   }
   zip_stream.closeEntry()
  }
  zip_stream.close()
}
val xml_rdd = sc.parallelize(xml_map.toSeq, 75).saveAsSequenceFile("/user/cloudera/2018_06_26")

Исходный zip-файл 325 Мб, содержащий 170 000 файлов XML. В результате получается 75 разделов, каждый размером ок. 35 Мб. Всего ~ 2,5 Гб Время работы локально на моем ПК с Windows: 1,2 минуты :-)

person Dan Kjeldstrøm Hansen    schedule 11.09.2018