我试图在Scala中定义函数,将函数列表作为输入,并将它们转换为传递给下面代码中使用的dataframe数组参数的列.
val df = sc.parallelize(Array((1,1),(2,2),(3,3))).toDF("foo","bar")
val df2 = df
.withColumn("columnArray",array(df("foo").cast("String"),df("bar").cast("String")))
.withColumn("litArray",array(lit("foo"),lit("bar")))
Run Code Online (Sandbox Code Playgroud)
更具体地说,我想创建函数colFunction和litFunction(或者只是一个函数,如果可能的话),它将字符串列表作为输入参数,可以按如下方式使用:
val df = sc.parallelize(Array((1,1),(2,2),(3,3))).toDF("foo","bar")
val colString = List("foo","bar")
val df2 = df
.withColumn("columnArray",array(colFunction(colString))
.withColumn("litArray",array(litFunction(colString)))
Run Code Online (Sandbox Code Playgroud)
我已经尝试将colString映射到具有所有转换的列数组,但这不起作用.关于如何实现这一点的任何想法?非常感谢您阅读该问题以及任何建议/解决方案.
我正在尝试在Windows上使用Intellij通过Scala自学Spark。我正在单台计算机上执行此操作,并且我想在单台计算机上启动多个工作程序来模拟集群。我在此页上读到说
“启动脚本当前不支持Windows。要在Windows上运行Spark集群,请手动启动master和worker。”
我不知道手工启动主人和工人意味着什么。有人可以帮忙吗?非常感谢您的帮助/建议。
我正在迭代地查询一个名为txqueue的mysql表,它正在不断增长.
每个连续查询仅考虑在上一次迭代中执行查询后插入到txqueue表中的行.
为实现此目的,每个连续查询从表中选择主键(下例中的seqno字段)超过上一个查询中观察到的最大seqno的行.
以这种方式标识的任何新插入的行都将写入csv文件.
目的是使这个过程无限期地运行.
下面的尾递归函数可以正常工作,但过了一段时间它会遇到java.lang.StackOverflowError.每个迭代查询的结果包含两到三行,每秒左右返回一次结果.
关于如何避免java.lang.StackOverflowError的任何想法?
这实际上是可以/应该通过流媒体实现的吗?
非常感谢任何建议.
这是有效的代码:
object TXQImport {
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://mysqlserveraddress/mysqldb"
val username = "username"
val password = "password"
var connection:Connection = null
def txImportLoop(startID : BigDecimal) : Unit = {
try {
Class.forName(driver)
connection = DriverManager.getConnection(url, username, password)
val statement = connection.createStatement()
val newMaxID = statement.executeQuery("SELECT max(seqno) as maxid from txqueue")
val maxid = new Iterator[BigDecimal] {
def hasNext = newMaxID.next()
def next() = newMaxID.getBigDecimal(1)
}.toStream.max
val selectStatement = …Run Code Online (Sandbox Code Playgroud)