У меня проблема с получением данных из таблицы Cassandra с помощью sparkCassandraConnector. Я создал пространство имен «ks» и таблицу «student» в Cassandra. таблица выглядит следующим образом:
id | имя
----+-----------
10 | Екатерина
Я запустил Spark локально, запустив start-all.sh
Затем я создал класс «SparkCassandraConnector», в котором есть команда для соединения искры и Кассандры. Я пытаюсь извлечь данные из таблицы учеников и распечатать их на экране.
Я получаю сообщение об ошибке "java.lang.ClassNotFoundException: SparkCassandraConnector $ Student java.net.URLClassLoader $ 1.run (URLClassLoader.java:372) java.net.URLClassLoader $ 1.run (URLClassLoader.javasecurity:361.A java. .doPrivileged (собственный метод) java.net.URLClassLoader.findClass (URLClassLoader.java:360) java.lang.ClassLoader.loadClass (ClassLoader.java:424) java.lang.ClassLoader.loadClass (ClassLoader.java .:357) java. lang.Class.forName0 (собственный метод) java.lang.Class.forName (Class.java:340)
Это моя программа:
import org.apache.commons.lang.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import java.io.Serializable;
import static com.datastax.spark.connector.CassandraJavaUtil.javaFunctions;
public class SparkCassandraConnector implements Serializable {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("Simple Application");
conf.setMaster("spark://127.0.0.1:7077");
conf.set("spark.cassandra.connection.host", "127.0.0.1");
String[] jars = new String[10];
jars[0] = "~/.m2/repository/com/datastax/spark/spark-cassandra-connector-java_2.10/1.1.0-alpha4/spark-cassandra-connector-java_2.10-1.1.0-alpha4.jar";
jars[1] = "~/.m2/repository/com/datastax/cassandra/cassandra-driver-core/2.1.0/cassandra-driver-core-2.1.0.jar";
jars[3] = "~/.m2/repository/com/datastax/spark/spark-cassandra-connector_2.10/1.1.0-alpha4/spark-cassandra-connector_2.10-1.1.0-alpha4.jar";
jars[4] = "~/.m2/repository/com/datastax/cassandra/cassandra-driver-core/2.1.0/cassandra-driver-core-2.1.0.jar";
jars[5] = "~/.m2/repository/org/apache/cassandra/cassandra-thrift/2.1.0/cassandra-thrift-2.1.0.jar";
jars[6] = "~/.m2/repository/org/apache/cassandra/cassandra-clientutil/2.1.0/cassandra-clientutil-2.1.0.jar";
conf = conf.setJars(jars);
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("ks", "student", Student.class)
.map(new org.apache.spark.api.java.function.Function<Student, String>() {
@Override
public String call(Student person) throws Exception {
return person.toString();
}
});
System.out.println("Data as Person beans: \n" + StringUtils.join(rdd.collect(), "\n"));
}
public static class Student implements Serializable{
private Integer id;
private String name;
public Student(){
}
public Student(Integer id, String name) {
this.id = id;
this.name = name;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
}
это мой файл POM:
<dependencies>
<!--Spark-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.10</artifactId>
<version>1.1.0-alpha4</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector-java_2.10</artifactId>
<version>1.1.0-alpha4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_2.10</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>