小编Mar*_*kus的帖子

如何将两列合并到一个新的 DataFrame 中?

我有两个数据帧(Spark 2.2.0 和 Scala 2.11.8)。第一个 DataFramedf1有一列称为col1,第二个df2也有 1 列称为col2。两个 DataFrame 中的行数相等。

如何将这两列合并到一个新的 DataFrame 中?

我试过了join,但我认为应该有其他方法来做到这一点。

另外,我尝试应用withColumm,但它无法编译。

val result = df1.withColumn(col("col2"), df2.col1)
Run Code Online (Sandbox Code Playgroud)

更新:

例如:

df1 = 
col1
1
2
3

df2 = 
col2
4
5
6

result = 
col1  col2
1     4
2     5
3     6
Run Code Online (Sandbox Code Playgroud)

scala dataframe apache-spark

3
推荐指数
1
解决办法
3933
查看次数

如何更新现有 SparkSession 实例或在 spark-shell 中创建一个新实例?

当我启动时spark-shell,它会创建一个SparkSession. 但是,我应该按如下方式创建它:

val spark = SparkSession.builder()
                        .config("es.nodes",elasticHost)
                        .config("es.port",elasticPort)
                        .config("es.nodes.wan.only","true")
                        .appName("Test")
                        .getOrCreate()
Run Code Online (Sandbox Code Playgroud)

如上所示,如何更新现有sparkspark-shell或创建新的?

scala apache-spark apache-spark-sql

3
推荐指数
1
解决办法
6716
查看次数

如何计算DataFrame列的2的幂

我需要p使用Spark 2.2和Scala 计算列的2的幂:

但如果我这样做,我得到错误,因为($"ki" / $"ni")是列,而不是Double.

df.withColumn("p",(lit(1) - scala.math.pow(($"ki" / $"ni").as[Double],2))
Run Code Online (Sandbox Code Playgroud)

scala apache-spark apache-spark-sql

3
推荐指数
1
解决办法
1414
查看次数

如何将pandas DataFrame的行保存为JSON字符串?

我有一个pandas DataFrame df,我将每一行转换为JSON字符串,如下所示:

df = pd.DataFrame(np.random.randn(50, 4), columns=list('ABCD'))
df_as_json = df.to_json(orient='records')
Run Code Online (Sandbox Code Playgroud)

然后我想迭代JSON字符串(行)df_as_json并进行进一步处理,如下所示:

for json_document in df_as_json.split('\n'):
    jdict = json.loads(json_document)
    //...
Run Code Online (Sandbox Code Playgroud)

问题是df_as_json.split('\n')并没有真正拆分df_as_json成单独的JSON字符串.

我该怎么做我需要的东西?

python json dictionary pandas

3
推荐指数
2
解决办法
3082
查看次数

如何使用条件删除重复项

我有以下数据帧df

如何删除重复项,同时保持level每个重复的item_id和的最小值country_id

+-----------+----------+---------------+                                        
|item_id    |country_id|level          |
+-----------+----------+---------------+
|     312330|  13535670|             82|
|     312330|  13535670|            369|
|     312330|  13535670|            376|
|     319840|  69731210|            127|
|     319840|  69730600|            526|
|     311480|  69628930|            150|
|     311480|  69628930|            138|
|     311480|  69628930|            405|
+-----------+----------+---------------+
Run Code Online (Sandbox Code Playgroud)

预期输出:

+-----------+----------+---------------+                                        
|item_id    |country_id|level          |
+-----------+----------+---------------+
|     312330|  13535670|             82|
|     319840|  69731210|            127|
|     319840|  69730600|            526|
|     311480|  69628930|            138|
+-----------+----------+---------------+
Run Code Online (Sandbox Code Playgroud)

我知道如何使用 无条件删除重复项dropDuplicates,但我不知道如何针对我的特定情况执行此操作。

scala apache-spark apache-spark-sql

2
推荐指数
1
解决办法
5185
查看次数

为什么这个PySpark加入会失败?

在下面的例子中,我误解了PySpark的性能.

我有几个DataFrame,因此我加入了它们.

print"users_data"
print users_data.show()
print"calc"
print calc.show()
print"users_cat_data"
print users_cat_data.show()

data1 = calc.join(users_data, ['category_pk','item_pk'], 'leftouter')
print "DATA1"
print data1.show()
data2 = data1.join(users_cat_data, ['category_pk'], 'leftouter')
print "DATA2"
print data2.show()
data3 = data2.join(category_data, ['category_pk'], 'leftouter')
print "DATA3"
print data3.show()
data4 = data3.join(clicks_data, ['category_pk','item_pk'], 'leftouter')
print "DATA4"
print data4.show()

data4.write.parquet(output + '/test.parquet', mode="overwrite")
Run Code Online (Sandbox Code Playgroud)

我希望leftouter加入会从右侧DataFrame返回带有匹配项(如果有)的左侧DataFrame.

Soma样本输出:

users_data
+--------------+----------+-------------------------+
|   category_pk|   item_pk|             unique_users|
+--------------+----------+-------------------------+
|           321|       460|                        1|
|           730|       740|                        2|
|           140|       720|                       10|


users_cat_data
+--------------+-----------------------+
|   category_pk|   unique_users_per_cat|
+--------------+-----------------------+ …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark pyspark-sql

2
推荐指数
1
解决办法
1582
查看次数

如何生成具有随机内容和N行的DataFrame?

如何在Scala中创建一个包含100行和3列的Spark DataFrame,这些行具有范围(1,100)中的随机整数值?

我知道如何手动创建DataFrame,但我不能自动化它:

val df = sc.parallelize(Seq((1,20, 40), (60, 10, 80), (30, 15, 30))).toDF("col1", "col2", "col3") 
Run Code Online (Sandbox Code Playgroud)

scala apache-spark spark-dataframe

2
推荐指数
2
解决办法
4463
查看次数

未找到 Spark AnalysisException 全局表或视图

我收到以下错误:

18/03/14 15:31:11 错误 ApplicationMaster:用户类抛出异常:org.apache.spark.sql.AnalysisException:找不到表或视图:产品;第 1 行 位置 42

这是我的代码:

val spark = SparkSession
                .builder()
                .appName("Test")
                .getOrCreate()

val products = spark.read.parquet(productsPath)
products.createGlobalTempView("products")

val q1 = spark.sql("SELECT PERCENTILE(product_price, 0.25) FROM products").map(_.getAs[Double](0)).collect.apply(0)
Run Code Online (Sandbox Code Playgroud)

我究竟做错了什么?是否可以在不使用的情况下在 Spark 中做同样的事情sql

scala apache-spark apache-spark-sql spark-dataframe

2
推荐指数
1
解决办法
6496
查看次数

如何将每一行 JSON 解析为 Spark 2 DataFrame 的列?

在我的 Spark (2.2) DataFrame 中,每一行都是 JSON:

df.head()
//output
//[{"key":"111","event_name":"page-visited","timestamp":1517814315}]

df.show()
//output
//+--------------+
//|         value|
//+--------------+
//|{"key":"111...|
//|{"key":"222...|
Run Code Online (Sandbox Code Playgroud)

我想将每个 JSON 行传递给列以获得这个result

key   event_name     timestamp
111   page-visited   1517814315
...
Run Code Online (Sandbox Code Playgroud)

我试过这种方法,但它没有给我预期的结果:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val schema = StructType(Seq(
     StructField("key", StringType, true), StructField("event_name", StringType, true), StructField("timestamp", IntegerType, true)
))

val result = df.withColumn("value", from_json($"value", schema))
Run Code Online (Sandbox Code Playgroud)

和:

result.printSchema()
root
 |-- value: struct (nullable = true)
 |    |-- key: string (nullable = true)
 |    |-- event_name: string (nullable = true)
 |    |-- timestamp: …
Run Code Online (Sandbox Code Playgroud)

json scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
4165
查看次数

countDistinct 和 distinct.count 的区别

为什么我得到不同的输出..agg(countDistinct("member_id") as "count")..distinct.count?的区别是一样的之间select count(distinct member_id)select distinct count(member_id)

sql scala apache-spark apache-spark-sql

1
推荐指数
1
解决办法
1041
查看次数