我有两个数据帧(Spark 2.2.0 和 Scala 2.11.8)。第一个 DataFramedf1有一列称为col1,第二个df2也有 1 列称为col2。两个 DataFrame 中的行数相等。
如何将这两列合并到一个新的 DataFrame 中?
我试过了join,但我认为应该有其他方法来做到这一点。
另外,我尝试应用withColumm,但它无法编译。
val result = df1.withColumn(col("col2"), df2.col1)
Run Code Online (Sandbox Code Playgroud)
更新:
例如:
df1 =
col1
1
2
3
df2 =
col2
4
5
6
result =
col1 col2
1 4
2 5
3 6
Run Code Online (Sandbox Code Playgroud) 当我启动时spark-shell,它会创建一个SparkSession. 但是,我应该按如下方式创建它:
val spark = SparkSession.builder()
.config("es.nodes",elasticHost)
.config("es.port",elasticPort)
.config("es.nodes.wan.only","true")
.appName("Test")
.getOrCreate()
Run Code Online (Sandbox Code Playgroud)
如上所示,如何更新现有spark的spark-shell或创建新的?
我需要p使用Spark 2.2和Scala 计算列的2的幂:
但如果我这样做,我得到错误,因为($"ki" / $"ni")是列,而不是Double.
df.withColumn("p",(lit(1) - scala.math.pow(($"ki" / $"ni").as[Double],2))
Run Code Online (Sandbox Code Playgroud) 我有一个pandas DataFrame df,我将每一行转换为JSON字符串,如下所示:
df = pd.DataFrame(np.random.randn(50, 4), columns=list('ABCD'))
df_as_json = df.to_json(orient='records')
Run Code Online (Sandbox Code Playgroud)
然后我想迭代JSON字符串(行)df_as_json并进行进一步处理,如下所示:
for json_document in df_as_json.split('\n'):
jdict = json.loads(json_document)
//...
Run Code Online (Sandbox Code Playgroud)
问题是df_as_json.split('\n')并没有真正拆分df_as_json成单独的JSON字符串.
我该怎么做我需要的东西?
我有以下数据帧df:
如何删除重复项,同时保持level每个重复的item_id和的最小值country_id。
+-----------+----------+---------------+
|item_id |country_id|level |
+-----------+----------+---------------+
| 312330| 13535670| 82|
| 312330| 13535670| 369|
| 312330| 13535670| 376|
| 319840| 69731210| 127|
| 319840| 69730600| 526|
| 311480| 69628930| 150|
| 311480| 69628930| 138|
| 311480| 69628930| 405|
+-----------+----------+---------------+
Run Code Online (Sandbox Code Playgroud)
预期输出:
+-----------+----------+---------------+
|item_id |country_id|level |
+-----------+----------+---------------+
| 312330| 13535670| 82|
| 319840| 69731210| 127|
| 319840| 69730600| 526|
| 311480| 69628930| 138|
+-----------+----------+---------------+
Run Code Online (Sandbox Code Playgroud)
我知道如何使用 无条件删除重复项dropDuplicates,但我不知道如何针对我的特定情况执行此操作。
在下面的例子中,我误解了PySpark的性能.
我有几个DataFrame,因此我加入了它们.
print"users_data"
print users_data.show()
print"calc"
print calc.show()
print"users_cat_data"
print users_cat_data.show()
data1 = calc.join(users_data, ['category_pk','item_pk'], 'leftouter')
print "DATA1"
print data1.show()
data2 = data1.join(users_cat_data, ['category_pk'], 'leftouter')
print "DATA2"
print data2.show()
data3 = data2.join(category_data, ['category_pk'], 'leftouter')
print "DATA3"
print data3.show()
data4 = data3.join(clicks_data, ['category_pk','item_pk'], 'leftouter')
print "DATA4"
print data4.show()
data4.write.parquet(output + '/test.parquet', mode="overwrite")
Run Code Online (Sandbox Code Playgroud)
我希望leftouter加入会从右侧DataFrame返回带有匹配项(如果有)的左侧DataFrame.
Soma样本输出:
users_data
+--------------+----------+-------------------------+
| category_pk| item_pk| unique_users|
+--------------+----------+-------------------------+
| 321| 460| 1|
| 730| 740| 2|
| 140| 720| 10|
users_cat_data
+--------------+-----------------------+
| category_pk| unique_users_per_cat|
+--------------+-----------------------+ …Run Code Online (Sandbox Code Playgroud) 如何在Scala中创建一个包含100行和3列的Spark DataFrame,这些行具有范围(1,100)中的随机整数值?
我知道如何手动创建DataFrame,但我不能自动化它:
val df = sc.parallelize(Seq((1,20, 40), (60, 10, 80), (30, 15, 30))).toDF("col1", "col2", "col3")
Run Code Online (Sandbox Code Playgroud) 我收到以下错误:
18/03/14 15:31:11 错误 ApplicationMaster:用户类抛出异常:org.apache.spark.sql.AnalysisException:找不到表或视图:产品;第 1 行 位置 42
这是我的代码:
val spark = SparkSession
.builder()
.appName("Test")
.getOrCreate()
val products = spark.read.parquet(productsPath)
products.createGlobalTempView("products")
val q1 = spark.sql("SELECT PERCENTILE(product_price, 0.25) FROM products").map(_.getAs[Double](0)).collect.apply(0)
Run Code Online (Sandbox Code Playgroud)
我究竟做错了什么?是否可以在不使用的情况下在 Spark 中做同样的事情sql?
在我的 Spark (2.2) DataFrame 中,每一行都是 JSON:
df.head()
//output
//[{"key":"111","event_name":"page-visited","timestamp":1517814315}]
df.show()
//output
//+--------------+
//| value|
//+--------------+
//|{"key":"111...|
//|{"key":"222...|
Run Code Online (Sandbox Code Playgroud)
我想将每个 JSON 行传递给列以获得这个result:
key event_name timestamp
111 page-visited 1517814315
...
Run Code Online (Sandbox Code Playgroud)
我试过这种方法,但它没有给我预期的结果:
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val schema = StructType(Seq(
StructField("key", StringType, true), StructField("event_name", StringType, true), StructField("timestamp", IntegerType, true)
))
val result = df.withColumn("value", from_json($"value", schema))
Run Code Online (Sandbox Code Playgroud)
和:
result.printSchema()
root
|-- value: struct (nullable = true)
| |-- key: string (nullable = true)
| |-- event_name: string (nullable = true)
| |-- timestamp: …Run Code Online (Sandbox Code Playgroud) 为什么我得到不同的输出..agg(countDistinct("member_id") as "count")和..distinct.count?的区别是一样的之间select count(distinct member_id)和select distinct count(member_id)?
apache-spark ×9
scala ×8
json ×2
dataframe ×1
dictionary ×1
pandas ×1
pyspark ×1
pyspark-sql ×1
python ×1
sql ×1