Tags: java, cassandra, apache-spark, spring-data-cassandra
Imagine the following scenario: a Spark application (Java implementation) uses a Cassandra database to load data, transform it into RDDs, and process it. The application also streams new data from the database, which is handled by a custom receiver. The output of the stream processing is stored back in the database. The implementation uses Spring Data Cassandra for the database integration.
CassandraConfig:
// Imports assumed for Spring Data Cassandra 1.x
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.core.env.Environment;
import org.springframework.data.cassandra.config.CassandraClusterFactoryBean;
import org.springframework.data.cassandra.config.CassandraSessionFactoryBean;
import org.springframework.data.cassandra.config.SchemaAction;
import org.springframework.data.cassandra.convert.CassandraConverter;
import org.springframework.data.cassandra.convert.MappingCassandraConverter;
import org.springframework.data.cassandra.core.CassandraOperations;
import org.springframework.data.cassandra.core.CassandraTemplate;
import org.springframework.data.cassandra.mapping.BasicCassandraMappingContext;
import org.springframework.data.cassandra.mapping.CassandraMappingContext;

@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {

    @Autowired
    private Environment env;

    @Bean
    public CassandraClusterFactoryBean cluster() {
        CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
        cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
        cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));
        return cluster;
    }

    @Bean
    public CassandraMappingContext mappingContext() {
        return new BasicCassandraMappingContext();
    }

    @Bean
    public CassandraConverter converter() {
        return new MappingCassandraConverter(mappingContext());
    }

    @Bean
    public CassandraSessionFactoryBean session() throws Exception {
        CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
        session.setCluster(cluster().getObject());
        session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
        session.setConverter(converter());
        session.setSchemaAction(SchemaAction.NONE);
        return session;
    }

    @Bean
    public CassandraOperations cassandraTemplate() throws Exception {
        return new CassandraTemplate(session().getObject());
    }
}
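The configuration reads three keys from cassandra.properties on the classpath. A minimal example of that file; the values are placeholders (9042 is the default CQL native transport port):

cassandra.contactpoints=127.0.0.1
cassandra.port=9042
cassandra.keyspace=ks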
The DataProcessor.main method:
// Initialize the Spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);

// Initialize the Spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);

// Load the first page of data
List<Event> pagingResults = cassandraOperations.select(
        "select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE,
        Event.class);

// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);

while (pagingResults != null && !pagingResults.isEmpty()) {
    Event lastEvent = pagingResults.get(pagingResults.size() - 1);
    pagingResults = cassandraOperations.select(
            "select * from event where event_type = 'event_type1' and creation_time < "
                    + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE,
            Event.class);
    // Parallelize the page and union it with the existing RDD
    rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}

// data processing
...
A large amount of data is expected for the initial load. For this reason, the data is paged, loaded, and distributed into rddBuffer.
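For context, the queries above imply that Event has a composite primary key: event_type as the partition key and creation_time as a descending clustering column (which is what makes the equality filter plus ORDER BY ... DESC valid CQL). The class itself is not shown in the question; a hypothetical sketch using Spring Data Cassandra 1.x mapping annotations might look like this:

// EventPK.java -- hypothetical composite key, matching getPk().getCreationTime() above
import java.io.Serializable;

import org.springframework.cassandra.core.Ordering;
import org.springframework.cassandra.core.PrimaryKeyType;
import org.springframework.data.cassandra.mapping.PrimaryKeyClass;
import org.springframework.data.cassandra.mapping.PrimaryKeyColumn;

@PrimaryKeyClass
public class EventPK implements Serializable {

    @PrimaryKeyColumn(name = "event_type", ordinal = 0, type = PrimaryKeyType.PARTITIONED)
    private String eventType;

    @PrimaryKeyColumn(name = "creation_time", ordinal = 1, type = PrimaryKeyType.CLUSTERED,
            ordering = Ordering.DESCENDING)
    private long creationTime;

    public String getEventType() { return eventType; }
    public long getCreationTime() { return creationTime; }
}

// Event.java -- hypothetical entity; Serializable so Spark can ship instances to executors
import java.io.Serializable;

import org.springframework.data.cassandra.mapping.PrimaryKey;
import org.springframework.data.cassandra.mapping.Table;

@Table("event")
public class Event implements Serializable {

    @PrimaryKey
    private EventPK pk;

    public EventPK getPk() { return pk; }
}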
There are also other options available for the integration. I would like to know what the best practices are for integrating Spark with Cassandra. What would be the best option to follow in my implementation?
Apache Spark 1.0.0, Apache Cassandra 2.0.8
The easiest way to work with Cassandra and Spark is to use the official open-source Cassandra driver for Spark developed by DataStax: https://github.com/datastax/spark-cassandra-connector

This driver is built on top of the Cassandra Java Driver and provides a direct bridge between Cassandra and Spark. Unlike Calliope, it does not use the Hadoop interface. In addition, it offers a number of unique features (see the project README for the full list).
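For illustration, a minimal sketch of loading the same event table through the connector's Java API. This assumes a connector version where the japi package is available (1.1+); the keyspace name "ks", the contact point, and the creation_time column access are assumptions, not taken from the question:

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import com.datastax.spark.connector.japi.CassandraRow;

public class ConnectorExample {
    public static void main(String[] args) {
        // The connector picks up the Cassandra contact point from the Spark configuration
        SparkConf conf = new SparkConf()
                .setAppName("test-spark")
                .setMaster("local[2]")
                .set("spark.cassandra.connection.host", "127.0.0.1"); // assumed contact point

        JavaSparkContext sc = new JavaSparkContext(conf);

        // Expose the Cassandra table directly as an RDD. The connector splits the table
        // into Spark partitions by token range, so the manual page/parallelize/union
        // loop from the question is no longer needed. "ks" is an assumed keyspace name.
        JavaRDD<CassandraRow> events = javaFunctions(sc).cassandraTable("ks", "event");

        // Columns are accessed by name on each CassandraRow
        JavaRDD<Long> creationTimes = events.map(row -> row.getLong("creation_time"));

        System.out.println("Loaded " + events.count() + " events");
        sc.stop();
    }
}

Compared with paging through CassandraOperations on the driver and repeatedly unioning small RDDs, this keeps the data loading distributed across the executors.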