Как анализировать сложные данные JSON в потоковой передаче искры в Java

Мы разрабатываем IOT-приложение

Мы получаем следующий поток данных от каждого устройства, для которого мы хотим выполнить анализ,

[{"t":1481368346000,"sensors":[{"s":"s1","d":"+149.625"},{"s":"s2","d":"+23.062"},{"s":"s3","d":"+16.375"},{"s":"s4","d":"+235.937"},{"s":"s5","d":"+271.437"},{"s":"s6","d":"+265.937"},{"s":"s7","d":"+295.562"},{"s":"s8","d":"+301.687"}]}]

На начальном уровне я могу получить схему, используя искровой код Java, следующим образом:

    root
     |-- sensors: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- d: string (nullable = true)
     |    |    |-- s: string (nullable = true)
     |-- t: long (nullable = true)

Код, который я написал,

    JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String,String>, String>() {
        public String call(Tuple2<String,String> message) throws Exception {
            return message._2();
        };
    });

    SQLContext sqlContext = spark.sqlContext();
    json.foreachRDD(new VoidFunction<JavaRDD<String>>() {
        @Override
        public void call(JavaRDD<String> jsonRecord) throws Exception {

            Dataset<Row> row = sqlContext.read().json(jsonRecord).toDF();
            row.createOrReplaceTempView("MyTable");
            row.printSchema();
            row.show();

            Dataset<Row> sensors = row.select("sensors");
            sensors.createOrReplaceTempView("sensors");
            sensors.printSchema();
            sensors.show();

        }
    });

Это дает мне и ошибку как «org.apache.spark.sql.AnalysisException: невозможно разрешить« sensors »данные входные столбцы: [];»

Я новичок в искре и аналитике и не могу найти хороший пример в java для разбора вложенного json.

То, чего я пытаюсь достичь, и, возможно, мне понадобятся предложения от экспертов, вот что:

Я собираюсь извлечь значение каждого датчика, а затем запустить регрессионный анализ, используя библиотеку sparkML sparkML. Это поможет мне узнать, какая тенденция наблюдается в каждом потоке датчиков, а также я хочу обнаружить сбои, используя эти данные.

Я не уверен, что это лучший способ сделать это, и любые рекомендации, ссылки и информация будут действительно полезны.


person Rahul Borkar    schedule 12.12.2016    source источник


Ответы (1)


Вот как должен выглядеть ваш json.foreachRDD.

json.foreachRDD(new VoidFunction<JavaRDD<String>>() {
        @Override
        public void call(JavaRDD<String> rdd) {
            if(!rdd.isEmpty()){
                Dataset<Row> data = spark.read().json(rdd).select("sensors");
                data.printSchema();
                data.show(false);
                //DF in table
                Dataset<Row> df = data.select( org.apache.spark.sql.functions.explode(org.apache.spark.sql.functions.col("sensors"))).toDF("sensors").select("sensors.s","sensors.d");
                df.show(false);
            }
        }
    });

Образец регрессионного анализа можно найти по ссылке JavaRandomForestRegressorExample.java по адресу https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/ml/JavaRandomForestRegressorExample..java

Для анализа данных в реальном времени с использованием искрового машинного обучения и искровой потоковой передачи вы можете обратиться к статьям ниже.

person abaghel    schedule 12.12.2016
comment
Большое спасибо за вашу помощь и указание мне в правильном направлении. Теперь, используя это, я могу извлечь одно значение. Как вы знаете, есть несколько датчиков и несколько потоков. Таким образом, будут поступать тысячи потоков данных датчиков. Не могли бы вы порекомендовать способ обучения различных моделей для каждого датчика, а затем продолжать выполнять прогнозы.... - person Rahul Borkar; 13.12.2016
comment
Я отредактировал свой ответ выше и добавил несколько ссылок для анализа в реальном времени с использованием Spark ML и Streaming. - person abaghel; 13.12.2016