在我的spark-shell中,当我执行一个函数时,下面的条目是什么意思?
[Stage7:===========> (14174 + 5) / 62500]
Run Code Online (Sandbox Code Playgroud) Python有一个标志-O,你可以用它来执行解释器.该选项将生成"优化"字节码(写入.pyo文件),并给出两次,它将丢弃文档字符串.从Python的手册页:
-O启用基本优化.这会将已编译(字节码)文件的文件扩展名从.pyc更改为.pyo.给定两次,导致文档字符串被丢弃.
我认为这个选项的两个主要特点是:
删除所有断言语句.为了速度,这可以防止腐败的程序状态.但是,你不需要大量的断言声明才能有所作为吗?你有任何值得的代码(并且理智吗?)
剥离所有文档字符串.什么应用程序的内存使用如此关键,这是一个胜利?为什么不将所有内容都推入用C编写的模块?
这个选项有什么用?它有真实世界的价值吗?
此命令适用于HiveQL:
insert overwrite directory '/data/home.csv' select * from testtable;
Run Code Online (Sandbox Code Playgroud)
但是使用Spark SQL我收到了一个org.apache.spark.sql.hive.HiveQl堆栈跟踪错误:
java.lang.RuntimeException: Unsupported language features in query:
insert overwrite directory '/data/home.csv' select * from testtable
Run Code Online (Sandbox Code Playgroud)
请指导我在Spark SQL中编写导出到CSV功能.
我已经尝试升级到Apache Spark 1.6.0 RC3.我的应用程序现在几乎每个任务都会发现这些错误:
Managed memory leak detected; size = 15735058 bytes, TID = 830
Run Code Online (Sandbox Code Playgroud)
我已经设置日志记录级别org.apache.spark.memory.TaskMemoryManager来DEBUG看看在日志中:
I2015-12-18 16:54:41,125 TaskSetManager: Starting task 0.0 in stage 7.0 (TID 6, localhost, partition 0,NODE_LOCAL, 3026 bytes)
I2015-12-18 16:54:41,125 Executor: Running task 0.0 in stage 7.0 (TID 6)
I2015-12-18 16:54:41,130 ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
I2015-12-18 16:54:41,130 ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
D2015-12-18 16:54:41,188 TaskMemoryManager: Task 6 acquire 5.0 MB for null
I2015-12-18 16:54:41,199 …Run Code Online (Sandbox Code Playgroud) 我刚刚创建了python列表range(1,100000).
使用SparkContext完成以下步骤:
a = sc.parallelize([i for i in range(1, 100000)])
b = sc.parallelize([i for i in range(1, 100000)])
c = a.zip(b)
>>> [(1, 1), (2, 2), -----]
sum = sc.accumulator(0)
c.foreach(lambda (x, y): life.add((y-x)))
Run Code Online (Sandbox Code Playgroud)
其中发出如下警告:
ARN TaskSetManager:阶段3包含一个非常大的任务(4644 KB).建议的最大任务大小为100 KB.
如何解决此警告?有没有办法处理大小?而且,它会影响大数据的时间复杂度吗?
我想在每天运行Hadoop作业时覆盖/重用现有的输出目录.实际上,输出目录将存储每天作业运行结果的汇总输出.如果我指定相同的输出目录,则会给出错误"输出目录已存在".
如何绕过此验证?
rdd1.join(rdd2)如果rdd1并rdd2拥有相同的分区,会导致洗牌吗?
众所周知,Spark中的分区器对任何"广泛"操作都会产生巨大的性能影响,因此通常会在操作中进行自定义.我正在尝试以下代码:
val rdd1 =
sc.parallelize(1 to 50).keyBy(_ % 10)
.partitionBy(new HashPartitioner(10))
val rdd2 =
sc.parallelize(200 to 230).keyBy(_ % 13)
val cogrouped = rdd1.cogroup(rdd2)
println("cogrouped: " + cogrouped.partitioner)
val unioned = rdd1.union(rdd2)
println("union: " + unioned.partitioner)
Run Code Online (Sandbox Code Playgroud)
我看到默认情况下cogroup()总是会生成带有自定义分区程序的RDD,但union()不会,它将始终恢复为默认值.这是违反直觉的,因为我们通常假设PairRDD应该使用其第一个元素作为分区键.有没有办法"强制"Spark合并2个PairRDD以使用相同的分区键?
我有一个结果RDD labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions).这有以这种格式输出:
[(0.0, 0.08482142857142858), (0.0, 0.11442786069651742),.....]
Run Code Online (Sandbox Code Playgroud)
我想要的是创建一个CSV文件,其中一列labels(上面输出中的元组的第一部分)和一列predictions(元组输出的第二部分).但我不知道如何使用Python在Spark中写入CSV文件.
如何使用上述输出创建CSV文件?
我已经使用Spark SQL注册了一个临时表,如[本节]中所述:
people.registerTempTable("people")
// I can run queries on it all right.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
Run Code Online (Sandbox Code Playgroud)
现在我想通过JDBC远程访问该表.我按照[其他部分]中的描述启动Thrift服务器.
./sbin/start-thriftserver.sh --master spark://same-master-as-above:7077
Run Code Online (Sandbox Code Playgroud)
但桌子不可见.
0: jdbc:hive2://localhost:10000> show tables;
+---------+
| result |
+---------+
+---------+
No rows selected (2.216 seconds)
Run Code Online (Sandbox Code Playgroud)
我想这是因为表是"临时的"(即与SqlContext对象的生命周期相关).但是我如何制作非临时表?
我可以通过Thrift服务器看到Hive表,但我不知道如何公开这样的RDD.我发现了一条评论,暗示我做不到.
或者我应该使用自己的应用程序在我的应用程序中运行Thrift服务器SqlContext?几乎所有的类都是private,并且这个代码不在Maven Central中(据我所知).我应该用HiveThriftServer2.startWithContext吗?它没有文档@DeveloperApi,但可能有用.
apache-spark ×8
hadoop ×2
python ×2
assert ×1
bytecode ×1
csv ×1
file-writing ×1
hiveql ×1
optimization ×1
partitioning ×1
pyspark ×1
rdd ×1
rewrite ×1