我打算使用 apache flink 使用 flink 将数据读/写到 cassandra 中。我希望使用flink-connector-cassandra,我没有找到连接器的好的文档/示例。
您能否指出使用 Apache Flink 从 cassandra 读取和写入数据的正确方法。我只看到纯粹用于写入的接收器示例?apache flink 是否也用于从类似于 apache spark 的 cassandra 读取数据?
我已经看到了许多与此错误相关的答案,但是所有答案都重定向到了scala版本等。但是我认为我的情况有所不同。
我有一个使用2.10版设置的远程Spark主工作者集群。我能够通过列出所有工作程序节点的http:// master-ip:8080进行验证
在我的应用程序中,我尝试使用Java 7代码创建SparkConf。以下是代码
sparkConf = new SparkConf(true)
.set("spark.cassandra.connection.host", "localhost")
.set("spark.cassandra.auth.username", "username")
.set("spark.cassandra.auth.password", "pwd")
.set("spark.master", "spark://master-ip:7077")
.set("spark.app.name","Test App");
Run Code Online (Sandbox Code Playgroud)
以下是我添加的Maven依赖项
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.10</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.1.0</version>
<exclusions>
<exclusion>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
</exclusion>
</exclusions>
</dependency>
Run Code Online (Sandbox Code Playgroud)
我收到以下错误
Caused by: java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
at org.apache.spark.util.Utils$.getSystemProperties(Utils.scala:1710)
at org.apache.spark.SparkConf.loadFromSystemProperties(SparkConf.scala:73)
at org.apache.spark.SparkConf.<init>(SparkConf.scala:68)
Run Code Online (Sandbox Code Playgroud)
来自工作节点之一的Spark版本
./spark-shell --version
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.1.0
/_/
Using Scala version 2.11.8, Java …Run Code Online (Sandbox Code Playgroud) 我有雪花列,其中包含 yyyy-mm-dd hh:MM:ss 格式的日期。
我使用下面的函数
date_trunc('DAY', '2019-09-23 12:33:25')
Run Code Online (Sandbox Code Playgroud)
输出 :2019-09-23 00:00:00
根据文档预计:2019-09-23
这是一个错误还是有其他方法可以完全删除时间组件?
我们的应用程序(MapReduce 作业、微服务)完全在 AWS 上运行。
我们打算使用单一服务来查看(出于调试目的)、监控和警报(基于阈值的通知)日志。
与 AWS 本身提供的服务(本例中为 cloudwatch)相比,使用相扑逻辑等外部服务提供商有什么具体好处吗?
Datastax的cassandra的java驱动程序提供了Accessor.请参考这里
参考下面的例子,他们是分批进行分页和获取记录还是存在查询超时的风险?
@Accessor
public interface UserAccessor {
@Query("SELECT * FROM user")
Result<User> getAll();
}
Run Code Online (Sandbox Code Playgroud)
当我说分页时,他们会在内部做类似下面的事情
Statement stmt = new SimpleStatement("SELECT * FROM user");
stmt.setFetchSize(24);
ResultSet rs = session.execute(stmt);
Run Code Online (Sandbox Code Playgroud) 我希望停用 cassandra 节点,因为该节点需要操作系统升级。
我登录到该节点。执行退役命令,几分钟后我在控制台上收到此异常
nodetool decommission
error: Stream failed
-- StackTrace --
org.apache.cassandra.streaming.StreamException: Stream failed
at org.apache.cassandra.streaming.management.StreamEventJMXNotifier.onFailure(StreamEventJMXNotifier.java:85)
at com.google.common.util.concurrent.Futures$4.run(Futures.java:1172)
at com.google.common.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:297)
at com.google.common.util.concurrent.ExecutionList.executeListener(ExecutionList.java:156)
at com.google.common.util.concurrent.ExecutionList.execute(ExecutionList.java:145)
at com.google.common.util.concurrent.AbstractFuture.setException(AbstractFuture.java:202)
at org.apache.cassandra.streaming.StreamResultFuture.maybeComplete(StreamResultFuture.java:208)
at org.apache.cassandra.streaming.StreamResultFuture.handleSessionComplete(StreamResultFuture.java:184)
at org.apache.cassandra.streaming.StreamSession.closeSession(StreamSession.java:412)
at org.apache.cassandra.streaming.StreamSession.onError(StreamSession.java:507)
at org.apache.cassandra.streaming.StreamSession.start(StreamSession.java:229)
at org.apache.cassandra.streaming.StreamCoordinator$StreamSessionConnector.run(StreamCoordinator.java:208)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)
我不确定那个异常是什么意思,我打算通过nodetool netstats和nodetool status来验证退役是否成功,其输出在下面。在两个地方都说LEAVING,我如何确认它是否完整。
nodetool netstats
Mode: LEAVING
Not sending any streams.
Read Repair Statistics:
Attempted: 1
Mismatch (Blocking): 0
Mismatch (Background): 0
Pool Name Active Pending Completed
Commands n/a …Run Code Online (Sandbox Code Playgroud) cassandra datastax-enterprise nodetool datastax cassandra-2.1
在 cassandra 中,我的列类型之一是 timeuuid,其根据文档等效的 java 类型是 java.util.UUID
我的输入时间为 java.sql.Timestamp ,为了在实体类中设置它,我想转换java.sql.Timestamp -> java.util.UUID。
我有一个有 4 个分区的 kafka 主题,因为我有一个有 4 个消费者的消费者组。
我的目的是确保消息在分区之间平均分布。
有没有办法验证跨 kafka 主题分区的消息分布?
我有 1 个主节点,其中 3 个工作节点与主节点通信。
作为灾难恢复,我们创建了 2 个 Master,让 Zookeeper 选举 Master。我正在使用 datastax 的 spark Cassandra 连接器。有没有办法传递多个 Spark Master URL 以连续尝试成功。
new SparkConf(true)
.set("spark.cassandra.connection.host", "10.3.2.1")
.set("spark.cassandra.auth.username","cassandra")
.set("spark.cassandra.auth.password",cassandra"))
.set("spark.master", "spark://1.1.2.2:7077") // Can I give multiple Urls here?
.set("spark.app.name","Sample App");
Run Code Online (Sandbox Code Playgroud) 我有一个 Spring Boot 应用程序,我们将它部署为 AWS 上的 lambda 函数。是否有类似的方式将 Sprint 启动应用程序部署为 GCP 云功能?
对不起,如果这个问题很幼稚,但我无法从 GCP 文档中找到任何直接的方法。
cassandra ×4
apache-spark ×3
datastax ×2
java ×2
apache-flink ×1
apache-kafka ×1
maven ×1
nodetool ×1
scala ×1
snowflake-cloud-data-platform ×1
spring-boot ×1
sumologic ×1