Мы разрабатываем IOT-приложение
Мы получаем следующий поток данных от каждого устройства, для которого мы хотим выполнить анализ,
[{"t":1481368346000,"sensors":[{"s":"s1","d":"+149.625"},{"s":"s2","d":"+23.062"},{"s":"s3","d":"+16.375"},{"s":"s4","d":"+235.937"},{"s":"s5","d":"+271.437"},{"s":"s6","d":"+265.937"},{"s":"s7","d":"+295.562"},{"s":"s8","d":"+301.687"}]}]
На начальном уровне я могу получить схему, используя искровой код Java, следующим образом:
root
|-- sensors: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- d: string (nullable = true)
| | |-- s: string (nullable = true)
|-- t: long (nullable = true)
Код, который я написал,
JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String,String>, String>() {
public String call(Tuple2<String,String> message) throws Exception {
return message._2();
};
});
SQLContext sqlContext = spark.sqlContext();
json.foreachRDD(new VoidFunction<JavaRDD<String>>() {
@Override
public void call(JavaRDD<String> jsonRecord) throws Exception {
Dataset<Row> row = sqlContext.read().json(jsonRecord).toDF();
row.createOrReplaceTempView("MyTable");
row.printSchema();
row.show();
Dataset<Row> sensors = row.select("sensors");
sensors.createOrReplaceTempView("sensors");
sensors.printSchema();
sensors.show();
}
});
Это дает мне и ошибку как «org.apache.spark.sql.AnalysisException: невозможно разрешить« sensors
»данные входные столбцы: [];»
Я новичок в искре и аналитике и не могу найти хороший пример в java для разбора вложенного json.
То, чего я пытаюсь достичь, и, возможно, мне понадобятся предложения от экспертов, вот что:
Я собираюсь извлечь значение каждого датчика, а затем запустить регрессионный анализ, используя библиотеку sparkML sparkML. Это поможет мне узнать, какая тенденция наблюдается в каждом потоке датчиков, а также я хочу обнаружить сбои, используя эти данные.
Я не уверен, что это лучший способ сделать это, и любые рекомендации, ссылки и информация будут действительно полезны.