想象一下以下场景:Spark应用程序(Java实现)正在使用Cassandra数据库加载,转换为RDD并处理数据.该应用程序还从数据库中蒸出新数据,这些数据也由自定义接收器处理.流处理的输出存储在数据库中.该实现使用Spring Data Cassandra与数据库集成.
CassandraConfig:
@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {
@Autowired
private Environment env;
@Bean
public CassandraClusterFactoryBean cluster() {
CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));
return cluster;
}
@Bean
public CassandraMappingContext mappingContext() {
return new BasicCassandraMappingContext();
}
@Bean
public CassandraConverter converter() {
return new MappingCassandraConverter(mappingContext());
}
@Bean
public CassandraSessionFactoryBean session() throws Exception {
CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
session.setCluster(cluster().getObject());
session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
session.setConverter(converter());
session.setSchemaAction(SchemaAction.NONE);
return session;
}
@Bean
public CassandraOperations cassandraTemplate() throws Exception { …Run Code Online (Sandbox Code Playgroud) 我有一些与使用Apache Spark进行实时分析相关的问题.提交Spark应用程序时,存储在Cassandra数据库中的数据将通过机器学习算法(支持向量机)加载和处理.在新数据到达时,通过Spark的流式传输扩展,它们将保留在数据库中,重新训练现有数据集并执行SVM算法.此过程的输出也存储在数据库中.