Spark: разделить CSV с новыми строками в поле потока октетов

Я использую Scala для анализа файлов CSV. Некоторые из этих файлов имеют поля, которые не являются текстовыми данными, такими как изображения или потоки октетов. Я хотел бы использовать метод Apache Spark textFile() для разделения CSV на строки и

split(",[ ]*(?=([^\"]*\"[^\"]*\")*[^\"]*$)")

чтобы разделить строку на поля. К сожалению, это не работает с файлами, в которых есть упомянутые бинарные поля. Есть две проблемы: 1) потоки октетов могут содержать новые строки, которые делают textFile() разделенных строк, которые должны быть одной, и 2) потоки октетов содержат запятые и/или двойные кавычки, которые не экранированы и портят мою схему.

Файлы обычно большие, от пары МБ до пары 100 МБ. Я должен взять CSV как есть, хотя я мог бы их предварительно обработать.

Все, чего я хочу добиться, - это работающая функция разделения, чтобы я мог игнорировать поле с потоком октетов. Тем не менее, большим бонусом было бы извлечение текстовой информации в потоке октетов.

Итак, как мне двигаться дальше, чтобы решить свои проблемы?

Изменить: типичная запись, полученная с помощью cat, новые строки взяты из файла, а не для косметических целей (сокращено):

7,url,user,02/24/2015 02:29:00 AM,03/22/2015 03:12:36 PM,octet-stream,27156,"MSCF^@^@^@^@�,^@^@^@^@^@^@D^@^@^@^@^@^@^@^C^A^A^@^C^@^D^@^@^@^@^@^T^@^@^@^@^@^P^@�,^@^@^X=^@^@^@^@^@^@^@^@^@^@�^@^@^@^E^@^A^@��^A^@^@^@^@^@^@^@WF6�!^@Info.txt^@=^B^@^@��^A^@^@^@WF7�^@^@List.xml^@^�^@^@��^A^@^@^@WF:�^@^@Filename.txt^@��>��
^@�CK�]�r��^Q��T�^O�^@�-�j�]��FI�Ky��Ei�Je^K""!�^Qx     @�*^U^?�^_�;��ħ�^LI^@$(�^Q���b��\N����t�����+������ȷgvM�^L̽�LǴL�^L��^ER��w^Ui^M��^X�Kޓ�^QJȧ��^N~��&�x�bB��D]1�^B|^G���g^SyG�����:����^_P�^T�^_�����U�|B�gH=��%Z^NY���,^U�^VI{��^S�^U�!�^Lpw�T���+�a�z�l������b����w^K��or��pH�   ��ܞ�l��z�^\i=�z�:^C�^S!_ESCW��ESC""��g^NY2��s�� u���X^?�^R^R+��b^]^Ro�r���^AR�h�^D��^X^M�^]ޫ���ܰ�^]���0^?��^]�92^GhCx�DN^?
mY<{��L^Zk�^\���M�^V^HE���-Ե�$f�f����^D�e�^R:�u����� ^E^A�Ȑ�^B�^E�sZ���Yo��8Eސ�}��&JY���^A9^P������^P����~Jʭy��`�^9«�""�U�      �:�}3���6�Hߧ�v���A7^Xi^L^]�sA�^Q�7�5d�^Xo˛�tY
Bp��4�Y���7DkV_���\^_q~�w�|�a�s̆���#�g�ӳu�^�!W}�n��Rgż_2�]�p�2}��b�G9�M^Q
�����:�X����bR[ԳZV!^G����^U�tq�&�Y6b��GR���s#mn6Z=^ZH^]�b��R^G�C�0R��{r1��4�#�
=r/X2�^O�����r^M�Rȕ�goG^X-����}���P+˥Qf�#��^C�Բ�z1�I�j����6�^Np���ܯ^P�[�^Tzԏ���^F2�e��\�E�߻6c�%���$�:E�*�*©t�y�J�,�S�2U�S�^X}ME�]��]�i��G�su�""��!�-��!r'ܷe_et Y^K^?0���l^A��^^�m�1/q����|�_r�5$�%�([x��W^E�G^^y���@����Z2^?ڠ�^_��^AҶ�OO��^]�vq%:j�^?�jX��\�]����^S�^^n�^C��>.^CY^O-� �_�\K����:p�<7Sֺnj���-Yk�r���^Q^M�n�J^B��^Z0^?�(^C��^W³!�g�Z�~R�A^M�^O^^�%;��Ԗ�p^S�w���*m^S���jڒ|�����<�^S�;Z^^Fc�1���^O�G_o����8��CS���w��^?��n�2~��m���G;��rx4�(�]�'��^E���eƧ�x��.�w�9WO�^^�י3��0,�y��H�Y�.H�x�""'���h}灢^T�Gm;^XE�̼�J��c�^^????;�^A�qZ1ׁBZ^Q�^A^FB�^QbQ�_�3|ƺ�EvZ���^S�w���^P���9^MT��ǩY[+�+�9�Ԩ�^O�^Q���Fy(+�9p�^^Mj�2��Y^?��ڞ��^Ķb�^Z�ψMр}�ڣ�^^S�^?��^U�^Wڻ����z�^@��uk��k^^�>^O�^W�ݤO�h�^G�����Kˇ�.�R|�)-��e^G�^]�/J����U�ϴ�a���i5HO�^L�ESCg�R'���.����d���+~�}��ڝ^Y5]l�3jg54M�������2t�5^Y}�q)��^O;�X\�q^Ox~Vۗ�t�^\f�       >k;^G�K5��,��X�t/�ǧ^G""5��4^MiΟ�n��^B^]�|�����V��ߌ֗Q~�H���8��t��5��ܗ�
�Z�^c�6N�ESCG����^_��>��t^L^R�^:�x���^]v�{^@+KM��qԎ�.^S�%&��=^W-�=�^S�����^CI���&^]_�s�˞�y�z�Jc^W�kڠ�^\��^]j�����^O��;�oY^^�^V59;�c��^B��T�nb����^C��^N��s�x�<{�9-�F�T�^N�5�^Se-���^T�Y[���`^ZsL��v�բ<C�+�~�^ۚ��""�Yκ2^_�^VxT�>��/ݳ^U�m�^@���3^Ge�n^Vc�V�^@�NVn�,�q��^^^]gy�R�S��Ȃ$���>A�d����xg�^GB3�M�J�^QJ^]�^\�{.�D��碎�^W�8a����qޠl?,'^R�^X�Cgy�P[����mڞ��H�Z�s�SD&蠤�s�E��nu�O@O<��3wj`C-%w�W�J�^WP^T�^]r^NT�TC�Lq�Z�f�!�;�l�Y��Gb��>�ud�hx�Ԭ^N)9�^N!k�҉s�35v������.�""^]��~4������۴�Z^]u�^Ti^^�i:�)K��P᳕!�@�^?�>��EE^VE-u�^SgV^L��<��^D�O<�+�J.�c�Z#>�.l����^S� 
ESC��(��E�j�π쬖���2{^U&b\��P^S�`^O^XdL�^ 6bu��FD��^@^@^@^@","field_x, data",field_y,field_z

Ожидаемый результат будет массивом

("7","url","user","02/24/2015 02:29:00 AM","03/22/2015 03:12:36 PM","octet-stream","27156","field_x, data",field_y",field_z")

Или, но это, вероятно, другой вопрос, такой массив (например, запуск strings в поле октета-потока):

("7","url","user","02/24/2015 02:29:00 AM","03/22/2015 03:12:36 PM","octet-stream","27156","Info.txt List.xml Filename.txt","field_x, data",field_y",field_z")

Редактировать 2: каждый файл с двоичным полем также содержит для него поле длины. Таким образом, вместо прямого разделения я могу пройти слева направо по своей записи и извлечь поля. Это, безусловно, большое улучшение моей текущей ситуации, но проблема 1) все еще сохраняется. Как я могу надежно разделить эти файлы?

Я внимательно посмотрел на файлы, и заголовок выглядит так:

RecordId, Field_A, Content_Type, Content_Length, Content, Field_B

(Где Content_Type может быть «октетным потоком», Content_Length — количеством байтов в поле Content, а Content — очевидно, данными). И хорошо для меня, значение Field_B предсказуемо, давайте предположим, что для определенного файла это всегда «Hello World».

Итак, вместо того, чтобы использовать разделение поведения Spark по умолчанию на новые строки, как я могу добиться того, чтобы Spark разделял только новые строки после «Hello World»? (Я также отредактировал заголовок вопроса, так как фокус вопроса изменился)


person flowit    schedule 18.11.2015    source источник
comment
Вы можете начать с предоставления примеров данных и ожидаемых результатов. Некоторые детали, такие как размер отдельных файлов, могут быть полезны. Если вы не можете понять, в чем проблема, когда у вас есть доступ к данным, как мы можем?   -  person zero323    schedule 18.11.2015
comment
Что такое 27156? Длина потока может быть?   -  person Nyavro    schedule 18.11.2015
comment
Да, это. Я мог бы прочитать весь файл с самого начала, пока не достигну этого числа, и прочитать следующие x байтов как поток октетов, но я бы отказался от параллелизма, который предлагает мне искра.   -  person flowit    schedule 18.11.2015
comment
Вы уверены, что .csv действительно можно разобрать? Если это так, то одно из следующих условий, по-видимому, обязательно преобладает: 1) все разделители в потоке октетов экранированы (например, каждому предшествует обратная косая черта или другой экранирующий символ), или 2) поток октетов имеет заголовок поле, которое указывает его длину (возможно, предыдущий столбец, 27156, как предложил Ньявро).   -  person philwalk    schedule 18.11.2015
comment
Хотя для варианта 2) потребуются некоторые метаданные для указания типов столбцов (возможно, первая строка содержит имена столбцов с неявными типами столбцов?)   -  person philwalk    schedule 18.11.2015
comment
Пожалуйста, смотрите Edit 2, спасибо, это действительно очень помогло.   -  person flowit    schedule 18.11.2015


Ответы (2)


Как указано в Spark: чтение файлов с использованием разделителя, отличного от новой строки, я использовал textinputformat.record.delimiter для разделения на "Hello World\n", потому что мне немного повезло, что последний столбец всегда содержит одно и то же значение. После этого я просто прохожу слева направо по записи, и когда я достигаю поля длины, я пропускаю следующие n байтов. Теперь все работает. Спасибо, что указали мне правильное направление.

person flowit    schedule 22.12.2015

Есть две проблемы: 1) потоки октетов могут содержать символы новой строки, из-за которых textFile() разделяет строки, которые должны быть равными одной, и 2) потоки октетов содержат запятые и/или двойные кавычки, которые не экранированы и портят мою схему.

Что ж, на самом деле этот CSV-файл экранирован должным образом:

  • многострочное поле заключено в двойные кавычки: "MSCF^@ .. ^@^@" (что также обрабатывает возможные разделители внутри поля)
  • двойные кавычки внутри поля экранируются другой двойной кавычкой, как и должно быть: Je^K""!

Конечно, простое разбиение в этом случае не сработает (и никогда не должно использоваться для данных csv), но любой считыватель csv, способный обрабатывать многострочные поля, должен правильно анализировать эти данные.

Также имейте в виду, что двойные кавычки внутри потока октетов должны быть неэкранированы, иначе эти данные будут недействительными (еще одна причина не использовать разделение, а средство чтения csv, которое обрабатывает это).

person Danny_ds    schedule 28.03.2018