小编Ben*_*Kok的帖子

从Spark Sql中的字符串列表创建文字和列的数组

我试图在Scala中定义函数,将函数列表作为输入,并将它们转换为传递给下面代码中使用的dataframe数组参数的列.

val df = sc.parallelize(Array((1,1),(2,2),(3,3))).toDF("foo","bar")
val df2 = df
        .withColumn("columnArray",array(df("foo").cast("String"),df("bar").cast("String")))
        .withColumn("litArray",array(lit("foo"),lit("bar")))
Run Code Online (Sandbox Code Playgroud)

更具体地说,我想创建函数colFunction和litFunction(或者只是一个函数,如果可能的话),它将字符串列表作为输入参数,可以按如下方式使用:

val df = sc.parallelize(Array((1,1),(2,2),(3,3))).toDF("foo","bar")
val colString = List("foo","bar")
val df2 = df
         .withColumn("columnArray",array(colFunction(colString))
         .withColumn("litArray",array(litFunction(colString)))
Run Code Online (Sandbox Code Playgroud)

我已经尝试将colString映射到具有所有转换的列数组,但这不起作用.关于如何实现这一点的任何想法?非常感谢您阅读该问题以及任何建议/解决方案.

arrays scala apache-spark apache-spark-sql

8
推荐指数
1
解决办法
2万
查看次数

单Windows机器上的多个Spark Worker

我正在尝试在Windows上使用Intellij通过Scala自学Spark。我正在单台计算机上执行此操作,并且我想在单台计算机上启动多个工作程序来模拟集群。我在此页上读到说

“启动脚本当前不支持Windows。要在Windows上运行Spark集群,请手动启动master和worker。”

我不知道手工启动主人和工人意味着什么。有人可以帮忙吗?非常感谢您的帮助/建议。

scala cluster-computing apache-spark

5
推荐指数
1
解决办法
665
查看次数

Scala Tail Recursion java.lang.StackOverflowError

我正在迭代地查询一个名为txqueue的mysql表,它正在不断增长.

每个连续查询仅考虑在上一次迭代中执行查询后插入到txqueue表中的行.

为实现此目的,每个连续查询从表中选择主键(下例中的seqno字段)超过上一个查询中观察到的最大seqno的行.

以这种方式标识的任何新插入的行都将写入csv文件.

目的是使这个过程无限期地运行.

下面的尾递归函数可以正常工作,但过了一段时间它会遇到java.lang.StackOverflowError.每个迭代查询的结果包含两到三行,每秒左右返回一次结果.

关于如何避免java.lang.StackOverflowError的任何想法?

这实际上是可以/应该通过流媒体实现的吗?

非常感谢任何建议.

这是有效的代码:

object TXQImport {

  val driver = "com.mysql.jdbc.Driver"
  val url = "jdbc:mysql://mysqlserveraddress/mysqldb"
  val username = "username"
  val password = "password"
  var connection:Connection = null

  def txImportLoop(startID : BigDecimal) : Unit = {

      try {

        Class.forName(driver)
        connection = DriverManager.getConnection(url, username, password)
        val statement = connection.createStatement()
        val newMaxID = statement.executeQuery("SELECT max(seqno) as maxid from txqueue")

        val maxid = new Iterator[BigDecimal] {
          def hasNext = newMaxID.next()
          def next() = newMaxID.getBigDecimal(1)
        }.toStream.max

        val selectStatement = …
Run Code Online (Sandbox Code Playgroud)

scala tail-recursion

0
推荐指数
1
解决办法
248
查看次数