DJE*_*bow 9 mysql singlestore apache-spark spark-dataframe
使用Spark 1.4.0,我试图使用insertIntoJdbc()将数据从Spark DataFrame插入到MemSQL数据库中(应该与MySQL数据库交互).但是我一直得到Runtime TableAlreadyExists异常.
首先,我创建这样的MemSQL表:
CREATE TABLE IF NOT EXISTS table1 (id INT AUTO_INCREMENT PRIMARY KEY, val INT);
Run Code Online (Sandbox Code Playgroud)
然后我在Spark中创建一个简单的数据框,并尝试像这样插入到MemSQL中:
val df = sc.parallelize(Array(123,234)).toDF.toDF("val")
//df: org.apache.spark.sql.DataFrame = [val: int]
df.insertIntoJDBC("jdbc:mysql://172.17.01:3306/test?user=root", "table1", false)
java.lang.RuntimeException: Table table1 already exists.
Run Code Online (Sandbox Code Playgroud)
这个解决方案适用于一般的JDBC连接,尽管@wayne的答案可能是一个特别适用于memSQL的更好的解决方案.
从1.4.0开始,insertIntoJdbc似乎已被弃用,并且使用它实际上调用了write.jdbc().
write()返回一个DataFrameWriter对象.如果要将数据附加到表中,则必须将对象的保存模式更改为"append".
上述问题中的示例的另一个问题是DataFrame架构与目标表的架构不匹配.
下面的代码给出了Spark shell的一个工作示例.我正在使用spark-shell --driver-class-path mysql-connector-java-5.1.36-bin.jar启动我的spark-shell会话.
import java.util.Properties
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "")
val df = sc.parallelize(Array((1,234), (2,1233))).toDF.toDF("id", "val")
val dfWriter = df.write.mode("append")
dfWriter.jdbc("jdbc:mysql://172.17.01:3306/test", "table1", prop)
Run Code Online (Sandbox Code Playgroud)
| 归档时间: |
|
| 查看次数: |
6881 次 |
| 最近记录: |