小编Zha*_*ong的帖子

为什么Spark认为这是一个交叉/笛卡尔联盟

我想加入两次数据,如下所示:

rdd1 = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ['idx', 'val'])
rdd2 = spark.createDataFrame([(1, 2, 1), (1, 3, 0), (2, 3, 1)], ['key1', 'key2', 'val'])

res1 = rdd1.join(rdd2, on=[rdd1['idx'] == rdd2['key1']])
res2 = res1.join(rdd1, on=[res1['key2'] == rdd1['idx']])
res2.show()
Run Code Online (Sandbox Code Playgroud)

然后我收到一些错误:

pyspark.sql.utils.AnalysisException:u'Cartesian连接可能非常昂贵,默认情况下禁用.要明确启用它们,请设置spark.sql.crossJoin.enabled = true;'

但我认为这不是交叉连接

更新:

res2.explain()

== Physical Plan ==
CartesianProduct
:- *SortMergeJoin [idx#0L, idx#0L], [key1#5L, key2#6L], Inner
:  :- *Sort [idx#0L ASC, idx#0L ASC], false, 0
:  :  +- Exchange hashpartitioning(idx#0L, idx#0L, 200)
:  :     +- *Filter isnotnull(idx#0L)
:  : …
Run Code Online (Sandbox Code Playgroud)

dataframe apache-spark apache-spark-sql pyspark

18
推荐指数
2
解决办法
2万
查看次数

如何在熊猫中进行'侧视爆炸()'

我想做这个 :

# input:
        A   B
0  [1, 2]  10
1  [5, 6] -20
# output:
   A   B
0  1  10
1  2  10
2  5 -20
3  6 -20
Run Code Online (Sandbox Code Playgroud)

每列A的值都是一个列表

df = pd.DataFrame({'A':[[1,2],[5,6]],'B':[10,-20]})
df = pd.DataFrame([[item]+list(df.loc[line,'B':]) for line in df.index for item in df.loc[line,'A']],
                  columns=df.columns)
Run Code Online (Sandbox Code Playgroud)

上面的代码可以工作,但速度很慢

有什么聪明的方法吗?

谢谢

python pandas

11
推荐指数
1
解决办法
4274
查看次数

pyspark.sql.functions.window函数的startTime参数有什么作用?

在官方文档中,只有一个简单的示例:

startTime是相对于1970-01-01 00:00:00 UTC的偏移量,使用该偏移量可以启动窗口间隔。例如,为了使每小时的滚动窗口从每小时的15分钟开始,例如12:15-13:15、13:15-14:15 ...提供startTime15 minutes

但是我想知道它如何与所有参数一起工作。

例如:

ts_list = map(lambda x: datetime.datetime(2017, 1, 9, 9, 0, 10) + datetime.timedelta(seconds=x), range(30))
rdd = spark.sparkContext.parallelize(ts_list).map(lambda x: (x, 1))
df = spark.createDataFrame(rdd, schema=['dt', 'val'])
win = df.groupBy(window("dt", "5 seconds", '4 seconds', '3 seconds')).agg(sum("val").alias("sum"))
pprint.pprint(win.select(win['window']['start'].cast('string').alias('start'),
                         win['window']['end'].cast('string').alias('end')).collect())
Run Code Online (Sandbox Code Playgroud)

输出:

[Row(start=u'2017-01-09 09:00:19', end=u'2017-01-09 09:00:24'),                 
 Row(start=u'2017-01-09 09:00:35', end=u'2017-01-09 09:00:40'),
 Row(start=u'2017-01-09 09:00:27', end=u'2017-01-09 09:00:32'),
 Row(start=u'2017-01-09 09:00:07', end=u'2017-01-09 09:00:12'),
 Row(start=u'2017-01-09 09:00:31', end=u'2017-01-09 09:00:36'),
 Row(start=u'2017-01-09 09:00:39', end=u'2017-01-09 09:00:44'),
 Row(start=u'2017-01-09 09:00:11', end=u'2017-01-09 09:00:16'),
 Row(start=u'2017-01-09 09:00:23', end=u'2017-01-09 09:00:28'), …
Run Code Online (Sandbox Code Playgroud)

dataframe apache-spark apache-spark-sql pyspark

5
推荐指数
2
解决办法
1450
查看次数

如何在spark流中定期更新rdd

我的代码是这样的:

sc = SparkContext()
ssc = StreamingContext(sc, 30)

initRDD = sc.parallelize('path_to_data')
lines = ssc.socketTextStream('localhost', 9999)
res = lines.transform(lambda x: x.join(initRDD))

res.pprint()
Run Code Online (Sandbox Code Playgroud)

我的问题是initRDD需要每天午夜更新.

我试着这样:

sc = SparkContext()
ssc = StreamingContext(sc, 30)

lines = ssc.socketTextStream('localhost', 9999)


def func(rdd):
    initRDD = rdd.context.parallelize('path_to_data')
    return rdd.join(initRDD)


res = lines.transform(func)

res.pprint()
Run Code Online (Sandbox Code Playgroud)

但它似乎initRDD将每30秒更新一次batchDuration

有什么好的理想吗?

apache-spark spark-streaming

5
推荐指数
1
解决办法
608
查看次数

有没有更好的方法在pyspark中将Array <int>转换为Array <String>

一个非常庞大的 DataFrame with schema:

root
 |-- id: string (nullable = true)
 |-- ext: array (nullable = true)
 |    |-- element: integer (containsNull = true)
Run Code Online (Sandbox Code Playgroud)

到目前为止我尝试explode数据,然后collect_list:

select
  id,
  collect_list(cast(item as string))
from default.dual
lateral view explode(ext) t as item
group by
  id
Run Code Online (Sandbox Code Playgroud)

但这种方式过于庞大.

apache-spark apache-spark-sql pyspark spark-dataframe

4
推荐指数
1
解决办法
2314
查看次数