我知道如何在Spark SQL中编写UDF:
def belowThreshold(power: Int): Boolean = {
return power < -40
}
sqlContext.udf.register("belowThreshold", belowThreshold _)
Run Code Online (Sandbox Code Playgroud)
我可以做类似的定义聚合函数吗?这是怎么做到的?
对于上下文,我想运行以下SQL查询:
val aggDF = sqlContext.sql("""SELECT span, belowThreshold(opticalReceivePower), timestamp
FROM ifDF
WHERE opticalReceivePower IS NOT null
GROUP BY span, timestamp
ORDER BY span""")
Run Code Online (Sandbox Code Playgroud)
它应该返回类似的东西
Row(span1, false, T0)
我希望聚合函数告诉我opticalReceivePower在定义的组中是否有任何值span,timestamp哪些值低于阈值.我是否需要以不同的方式将UDAF写入上面粘贴的UDF?
scala aggregate-functions user-defined-functions apache-spark apache-spark-sql
每次将一个角色输入一个Text小部件时,我想获取该小部件的内容并从某个数字中减去它的长度(基本上是"你有x个字符剩下的"交易).
但StringVar()总是落后于一件事.这是我收集的内容,因为在将字符输入到Text小部件之前处理了该事件.这意味着如果我在该字段中有3个字符并且我输入第4个,StringVar则更新但仍然是3个字符长,然后当我输入第5个字符时它更新为4.
有没有办法让两者保持一致?
这是一些代码.我删除了不相关的部分.
def __init__(self, master):
self.char_count = StringVar()
self.char_count.set("140 chars left")
self.post_tweet = Text(self.master)
self.post_tweet.bind("<Key>", self.count)
self.post_tweet.grid(...)
self.char_count = Label(self.master, textvariable=self.foo)
self.char_count.grid(...)
def count(self):
self.x = len(self.post_tweet.get(1.0, END))
self.char_count.set(str(140 - self.x))
Run Code Online (Sandbox Code Playgroud) 我的星火公司的理解fileStream()方法是,它需要三种类型作为参数:Key,Value,和Format.在文本文件的情况下,适当的类型有:LongWritable,Text,和TextInputFormat.
首先,我想了解这些类型的性质.直觉上,我猜想Key在这种情况下,文件的行号是该行Value的文本.因此,在以下文本文件示例中:
Hello
Test
Another Test
Run Code Online (Sandbox Code Playgroud)
的第一行DStream会拥有Key的1(0?)和Value的Hello.
它是否正确?
我的问题的第二部分:我看了反编译的实现,ParquetInputFormat我发现了一些好奇的东西:
public class ParquetInputFormat<T>
extends FileInputFormat<Void, T> {
//...
public class TextInputFormat
extends FileInputFormat<LongWritable, Text>
implements JobConfigurable {
//...
Run Code Online (Sandbox Code Playgroud)
TextInputFormat延伸FileInputFormat的类型LongWritable和Text,而ParquetInputFormat扩展了同一类的类型Void和T.
这是否意味着我必须创建一个Value类来保存我的镶木地板数据的整行,然后将类型传递<Void, MyClass, …
我已经按照使用EMR控制台上的说明设置了SSH隧道ssh -i ~/SparkTest.pem -ND 8157 hadoop@ec2-52-1-245-67.compute-1.amazonaws.com。我还按照说明设置了FoxyProxy。
我可以在上访问Hadoop ResourceManager http://master-public-dns-name:8088/,并且可以看到我的应用程序正在运行。
单击任何应用程序的主URL或任何节点HTTP URL都将导致“问题加载页面”,其中SSH隧道提供以下输出channel 2: open failed: connect failed: Connection refused。
我应该改用YARN ResourceManager吗?在哪里可以找到该网址?设置代理和SSH隧道时,是否有某种配置步骤会以某种方式错过?
干杯。
我在EMR上运行Spark.我用以下内容提交EMR的步骤:
spark-submit --deploy-mode cluster --master yarn --num-executors 15 --executor-cores 3 --executor-memory 3G
尽管如此,我的资源管理器UI显示3个节点中的每个节点都有4个或6个YARN容器,每个容器有1个内核和3个内存.
每个节点都有16个内核和30G的RAM.
似乎YARN创建了尽可能多的1核/ 3GB容器,直到节点上的内存不足为止.这使得10多个核心未使用.
为什么Spark不尊重我的--executor-cores设定?
apache-spark ×4
hadoop ×2
scala ×2
amazon-ec2 ×1
amazon-emr ×1
emr ×1
hadoop-yarn ×1
hadoop2 ×1
python ×1
ssh ×1
text-widget ×1
tkinter ×1