我想加入两次数据,如下所示:
rdd1 = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ['idx', 'val'])
rdd2 = spark.createDataFrame([(1, 2, 1), (1, 3, 0), (2, 3, 1)], ['key1', 'key2', 'val'])
res1 = rdd1.join(rdd2, on=[rdd1['idx'] == rdd2['key1']])
res2 = res1.join(rdd1, on=[res1['key2'] == rdd1['idx']])
res2.show()
Run Code Online (Sandbox Code Playgroud)
然后我收到一些错误:
pyspark.sql.utils.AnalysisException:u'Cartesian连接可能非常昂贵,默认情况下禁用.要明确启用它们,请设置spark.sql.crossJoin.enabled = true;'
但我认为这不是交叉连接
更新:
res2.explain()
== Physical Plan ==
CartesianProduct
:- *SortMergeJoin [idx#0L, idx#0L], [key1#5L, key2#6L], Inner
: :- *Sort [idx#0L ASC, idx#0L ASC], false, 0
: : +- Exchange hashpartitioning(idx#0L, idx#0L, 200)
: : +- *Filter isnotnull(idx#0L)
: : …
Run Code Online (Sandbox Code Playgroud) 我想做这个 :
# input:
A B
0 [1, 2] 10
1 [5, 6] -20
# output:
A B
0 1 10
1 2 10
2 5 -20
3 6 -20
Run Code Online (Sandbox Code Playgroud)
每列A的值都是一个列表
df = pd.DataFrame({'A':[[1,2],[5,6]],'B':[10,-20]})
df = pd.DataFrame([[item]+list(df.loc[line,'B':]) for line in df.index for item in df.loc[line,'A']],
columns=df.columns)
Run Code Online (Sandbox Code Playgroud)
上面的代码可以工作,但速度很慢
有什么聪明的方法吗?
谢谢
在官方文档中,只有一个简单的示例:
startTime是相对于1970-01-01 00:00:00 UTC的偏移量,使用该偏移量可以启动窗口间隔。例如,为了使每小时的滚动窗口从每小时的15分钟开始,例如12:15-13:15、13:15-14:15 ...提供
startTime
为15 minutes
。
但是我想知道它如何与所有参数一起工作。
例如:
ts_list = map(lambda x: datetime.datetime(2017, 1, 9, 9, 0, 10) + datetime.timedelta(seconds=x), range(30))
rdd = spark.sparkContext.parallelize(ts_list).map(lambda x: (x, 1))
df = spark.createDataFrame(rdd, schema=['dt', 'val'])
win = df.groupBy(window("dt", "5 seconds", '4 seconds', '3 seconds')).agg(sum("val").alias("sum"))
pprint.pprint(win.select(win['window']['start'].cast('string').alias('start'),
win['window']['end'].cast('string').alias('end')).collect())
Run Code Online (Sandbox Code Playgroud)
输出:
[Row(start=u'2017-01-09 09:00:19', end=u'2017-01-09 09:00:24'),
Row(start=u'2017-01-09 09:00:35', end=u'2017-01-09 09:00:40'),
Row(start=u'2017-01-09 09:00:27', end=u'2017-01-09 09:00:32'),
Row(start=u'2017-01-09 09:00:07', end=u'2017-01-09 09:00:12'),
Row(start=u'2017-01-09 09:00:31', end=u'2017-01-09 09:00:36'),
Row(start=u'2017-01-09 09:00:39', end=u'2017-01-09 09:00:44'),
Row(start=u'2017-01-09 09:00:11', end=u'2017-01-09 09:00:16'),
Row(start=u'2017-01-09 09:00:23', end=u'2017-01-09 09:00:28'), …
Run Code Online (Sandbox Code Playgroud) 我的代码是这样的:
sc = SparkContext()
ssc = StreamingContext(sc, 30)
initRDD = sc.parallelize('path_to_data')
lines = ssc.socketTextStream('localhost', 9999)
res = lines.transform(lambda x: x.join(initRDD))
res.pprint()
Run Code Online (Sandbox Code Playgroud)
我的问题是initRDD
需要每天午夜更新.
我试着这样:
sc = SparkContext()
ssc = StreamingContext(sc, 30)
lines = ssc.socketTextStream('localhost', 9999)
def func(rdd):
initRDD = rdd.context.parallelize('path_to_data')
return rdd.join(initRDD)
res = lines.transform(func)
res.pprint()
Run Code Online (Sandbox Code Playgroud)
但它似乎initRDD
将每30秒更新一次batchDuration
有什么好的理想吗?
一个非常庞大的 DataFrame with schema:
root
|-- id: string (nullable = true)
|-- ext: array (nullable = true)
| |-- element: integer (containsNull = true)
Run Code Online (Sandbox Code Playgroud)
到目前为止我尝试explode
数据,然后collect_list
:
select
id,
collect_list(cast(item as string))
from default.dual
lateral view explode(ext) t as item
group by
id
Run Code Online (Sandbox Code Playgroud)
但这种方式过于庞大.