小编Ror*_*rne的帖子

如何在Spark SQL中定义和使用用户定义的聚合函数?

我知道如何在Spark SQL中编写UDF:

def belowThreshold(power: Int): Boolean = {
        return power < -40
      }

sqlContext.udf.register("belowThreshold", belowThreshold _)
Run Code Online (Sandbox Code Playgroud)

我可以做类似的定义聚合函数吗?这是怎么做到的?

对于上下文,我想运行以下SQL查询:

val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
                                    FROM ifDF
                                    WHERE opticalReceivePower IS NOT null
                                    GROUP BY span, timestamp
                                    ORDER BY span""")
Run Code Online (Sandbox Code Playgroud)

它应该返回类似的东西

Row(span1, false, T0)

我希望聚合函数告诉我opticalReceivePower在定义的组中是否有任何值span,timestamp哪些值低于阈值.我是否需要以不同的方式将UDAF写入上面粘贴的UDF?

scala aggregate-functions user-defined-functions apache-spark apache-spark-sql

37
推荐指数
1
解决办法
3万
查看次数

Tkinter:在<Key>事件后设置StringVar,包括按下的键

每次将一个角色输入一个Text小部件时,我想获取该小部件的内容并从某个数字中减去它的长度(基本上是"你有x个字符剩下的"交易).

StringVar()总是落后于一件事.这是我收集的内容,因为在将字符输入到Text小部件之前处理了该事件.这意味着如果我在该字段中有3个字符并且我输入第4个,StringVar则更新但仍然是3个字符长,然后当我输入第5个字符时它更新为4.

有没有办法让两者保持一致?

这是一些代码.我删除了不相关的部分.

def __init__(self, master):
    self.char_count = StringVar()
    self.char_count.set("140 chars left")

    self.post_tweet = Text(self.master)
    self.post_tweet.bind("<Key>", self.count)
    self.post_tweet.grid(...)

    self.char_count = Label(self.master, textvariable=self.foo)
    self.char_count.grid(...)

def count(self):
    self.x = len(self.post_tweet.get(1.0, END))
    self.char_count.set(str(140 - self.x))
Run Code Online (Sandbox Code Playgroud)

python tkinter text-widget

6
推荐指数
1
解决办法
2580
查看次数

如何使用`ssc.fileStream()`读取镶木地板文件?传递给`ssc.fileStream()`的类型是什么?

我的星火公司的理解fileStream()方法是,它需要三种类型作为参数:Key,Value,和Format.在文本文件的情况下,适当的类型有:LongWritable,Text,和TextInputFormat.

首先,我想了解这些类型的性质.直觉上,我猜想Key在这种情况下,文件的行号是该行Value的文本.因此,在以下文本文件示例中:

Hello
Test
Another Test
Run Code Online (Sandbox Code Playgroud)

的第一行DStream会拥有Key1(0?)和ValueHello.

它是否正确?


我的问题的第二部分:我看了反编译的实现,ParquetInputFormat我发现了一些好奇的东西:

public class ParquetInputFormat<T>
       extends FileInputFormat<Void, T> {
//...

public class TextInputFormat
       extends FileInputFormat<LongWritable, Text>
       implements JobConfigurable {
//...
Run Code Online (Sandbox Code Playgroud)

TextInputFormat延伸FileInputFormat的类型LongWritableText,而ParquetInputFormat扩展了同一类的类型VoidT.

这是否意味着我必须创建一个Value类来保存我的镶木地板数据的整行,然后将类型传递<Void, MyClass, …

hadoop scala apache-spark hadoop2 spark-streaming

6
推荐指数
1
解决办法
3038
查看次数

无法连接到EMR上的Spark UI

我已经按照使用EMR控制台上的说明设置了SSH隧道ssh -i ~/SparkTest.pem -ND 8157 hadoop@ec2-52-1-245-67.compute-1.amazonaws.com。我还按照说明设置了FoxyProxy。

我可以在上访问Hadoop ResourceManager http://master-public-dns-name:8088/,并且可以看到我的应用程序正在运行。

单击任何应用程序的主URL或任何节点HTTP URL都将导致“问题加载页面”,其中SSH隧道提供以下输出channel 2: open failed: connect failed: Connection refused

我应该改用YARN ResourceManager吗?在哪里可以找到该网址?设置代理和SSH隧道时,是否有某种配置步骤会以某种方式错过?

干杯。

ssh hadoop amazon-ec2 amazon-emr apache-spark

5
推荐指数
1
解决办法
636
查看次数

尽管--num-executors设置,但每个执行程序在Spark Yarn模式下只有1个核心

我在EMR上运行Spark.我用以下内容提交EMR的步骤:

spark-submit --deploy-mode cluster --master yarn --num-executors 15 --executor-cores 3 --executor-memory 3G

尽管如此,我的资源管理器UI显示3个节点中的每个节点都有4个或6个YARN容器,每个容器有1个内核和3个内存.

每个节点都有16个内核和30G的RAM.

似乎YARN创建了尽可能多的1核/ 3GB容器,直到节点上的内存不足为止.这使得10多个核心未使用.

为什么Spark不尊重我的--executor-cores设定?

emr hadoop-yarn apache-spark spark-streaming

5
推荐指数
0
解决办法
811
查看次数