I want to declare a udf that returns either two 1D arrays or one 2D array (an example of either would be fine). I know this works for 1D:
@udf("array<int>")
However, I have tried many variations, such as the following, with no luck:
@udf("array<int>,array<int>")
@udf("array<int>","array<int>")
@udf("array<int,int>")
etc.
user-defined-functions apache-spark apache-spark-sql pyspark
I have a Spark DataFrame df1:
id transactions
1 [1, 2, 3, 5]
2 [1, 2, 3, 6]
3 [1, 2, 9, 8]
4 [1, 2, 5, 6]
root
 |-- id: int (nullable = true)
 |-- transactions: array (nullable = false)
 |    |-- element: int (containsNull = true)
None
I have a Spark DataFrame df2:
items cost
[1] 1.0
[2] 1.0
[2, 1] 2.0
[6, 1] 2.0
root
 |-- items: array (nullable = false)
 |    |-- element: int (containsNull = true)
 |-- cost: int (nullable = true) …
Given:
val df = Seq((1L, "04-04-2015")).toDF("id", "date")
val df2 = df.withColumn("month", from_unixtime(unix_timestamp($"date", "dd/MM/yy"), "MMMMM"))
df2.show()
I get this output:
+---+----------+-----+
| id| date|month|
+---+----------+-----+
| 1|04-04-2015| null|
+---+----------+-----+
However, I want the output to be as follows:
+---+----------+-----+
| id| date|month|
+---+----------+-----+
| 1|04-04-2015|April|
+---+----------+-----+
How can I do this in Spark SQL using Scala?
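One likely cause of the null, offered as a guess rather than a confirmed diagnosis: the pattern "dd/MM/yy" expects slashes and a two-digit year, while the value "04-04-2015" uses hyphens and a four-digit year, so parsing fails. The principle can be reproduced outside Spark. The sketch below uses Python's standard datetime module purely as an illustration; it is not Spark code, and the helper name month_name and the MONTHS list are hypothetical. In Spark the analogous change would presumably be a parse pattern such as "dd-MM-yyyy" and an output pattern such as "MMMM".

```python
from datetime import datetime

# Explicit English month names, so the result does not depend on the locale.
MONTHS = [
    "January", "February", "March", "April", "May", "June",
    "July", "August", "September", "October", "November", "December",
]


def month_name(text, pattern):
    """Return the full month name, or None if text does not match pattern.

    Hypothetical helper: returning None on a failed parse mimics the null
    shown in the question's output.
    """
    try:
        parsed = datetime.strptime(text, pattern)
    except ValueError:
        return None
    return MONTHS[parsed.month - 1]


# Pattern with slashes and a two-digit year: does not match the data.
mismatched = month_name("04-04-2015", "%d/%m/%y")

# Pattern with hyphens and a four-digit year: matches the data.
matched = month_name("04-04-2015", "%d-%m-%Y")
```

The point of the sketch is only that the parse pattern has to mirror the separators and year width of the input string before any month formatting can happen.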
I have a DataFrame like this:
data = [(("ID1", "A", 1)), (("ID1", "B", 5)), (("ID2", "A", 12)),
(("ID3", "A", 3)), (("ID3", "B", 3)), (("ID3", "C", 5)), (("ID4", "A", 10))]
df = spark.createDataFrame(data, ["ID", "Type", "Value"])
df.show()
+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID1| A| 1|
|ID1| B| 5|
|ID2| A| 12|
|ID3| A| 3|
|ID3| B| 3|
|ID3| C| 5|
|ID4| A| 10|
+---+----+-----+
I want to extract only those rows (or IDs) that contain exactly one specific type, "A".
So my expected output would contain the following rows:
+---+----+-----+
| ID|Type|Value|
+---+----+-----+
|ID2| A| 12|
|ID4| A| 10|
+---+----+-----+
Each ID can contain any type: A, B, C, and so on. I want to extract the IDs that contain one and only one type …
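The selection rule described above can be sketched in plain Python to make the logic explicit: collect the set of types seen for each ID, then keep only the IDs whose set is exactly {"A"}. This is an illustration of the rule, not Spark code; the helper names ids_with_only_type and rows_with_only_type are hypothetical. In Spark, one plausible counterpart (an assumption, not confirmed by the question) would be a groupBy on ID with collect_set over Type, joined back to the original rows.

```python
from collections import defaultdict

# Same rows as the DataFrame in the question: (ID, Type, Value).
rows = [
    ("ID1", "A", 1), ("ID1", "B", 5), ("ID2", "A", 12),
    ("ID3", "A", 3), ("ID3", "B", 3), ("ID3", "C", 5), ("ID4", "A", 10),
]


def ids_with_only_type(rows, wanted):
    """Return the IDs whose rows all carry the single type `wanted`."""
    types_by_id = defaultdict(set)
    for row_id, row_type, _value in rows:
        types_by_id[row_id].add(row_type)
    # An ID qualifies only if its set of types is exactly {wanted}.
    return {row_id for row_id, types in types_by_id.items() if types == {wanted}}


def rows_with_only_type(rows, wanted):
    """Return the original rows, in order, that belong to qualifying IDs."""
    keep = ids_with_only_type(rows, wanted)
    return [row for row in rows if row[0] in keep]


only_a = rows_with_only_type(rows, "A")
```

Comparing the whole set against {"A"} rather than counting rows means an ID with several "A" rows still qualifies, while an ID with "A" plus any other type does not.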
I am trying to convert my Spark Scala project into a Spark Java project. I have logging in Scala as follows:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class ClassName{
val logger = LoggerFactory.getLogger("ClassName")
...
val dataframe1 = ....///read dataframe from text file.
...
logger.debug("dataframe1.printSchema : \n " + dataframe1.printSchema); //this is working fine.
}
Now I am trying to write it in Java 1.8 as follows:
public class ClassName{
public static final Logger logger = LoggerFactory.getLogger("ClassName");
...
Dataset<Row> dataframe1 = ....///read dataframe from text file.
...
logger.debug("dataframe1.printSchema : \n " + dataframe1.printSchema()); //this is not working
}
I tried several approaches, but none of them manages to log printSchema at debug/info level.
dataframe1.printSchema() // this actually returns void …
I am using Spark Structured Streaming, and I am working with Scala. I want to pass a configuration file to my Spark application. This configuration file is hosted on HDFS. For example:
spark_job.conf (HOCON)
spark {
appName: "",
master: "",
shuffle.size: 4
etc..
}
kafkaSource {
servers: "",
topic: "",
etc..
}
redisSink {
host: "",
port: 999,
timeout: 2000,
checkpointLocation: "hdfs location",
etc..
}
How can I pass it to the Spark application? How can I read this file (hosted on HDFS) in Spark?
configuration hadoop apache-spark apache-spark-sql spark-structured-streaming
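I have the following job in AWS Glue, which basically reads data from one table and extracts it as a CSV file in S3. However, I want to run a query on this table (a SELECT, SUM and GROUP BY) and get that output as CSV. How can I do this in AWS Glue? I am new to Spark, so please help.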
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database =
"db1", table_name = "dbo1_expdb_dbo_stg_plan", transformation_ctx =
"datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings =
[("plan_code", "int", "plan_code", "int"), ("plan_id", "int", "plan_id",
"int")], transformation_ctx = "applymapping1")
datasink2 = glueContext.write_dynamic_frame.from_options(frame =
applymapping1, connection_type = "s3", connection_options = {"path":
"s3://bucket"}, format = "csv", transformation_ctx = …
val postsQuantiles = posts.stat.approxQuantile("_score", Array(0.25, 0.75), 0.0) fails with the following error. I can obviously set spark.driver.maxResultSize to get past this error, but I am curious why this collects data to the driver at all.
[Stage 3:==================> (7 + 15) / 22]19/06/01 20:46:30 ERROR TaskSetManager: Total size of serialized results of 18 tasks (1030.8 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
19/06/01 20:46:30 ERROR TaskSetManager: Total size of serialized results of 19 tasks (1087.7 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
19/06/01 20:46:30 ERROR TaskSetManager: Total size of serialized results of 20 tasks (1145.6 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
19/06/01 20:46:30 ERROR TaskSetManager: …
I need to write data to S3 based on a particular partition key, which I can do with write.partitionBy. However, in this case I need to write only one file in each path. I am using the code below to do this.
orderFlow.coalesce(1).write.partitionBy("SellerYearMonthWeekKey")
.mode(SaveMode.Overwrite)
.format("com.databricks.spark.csv")
.option("delimiter", ",")
.option("header", "true")
.save(outputS3Path + "/")
Can you help me find the best way to achieve this? With the code above I get an OutOfMemory error.
I have the following code:
from pyspark.sql import functions as func
cols = ("id","size")
result = df.groupby(*cols).agg({
func.max("val1"),
func.median("val2"),
func.std("val2")
})
But it fails on the line func.median("val2") with a message that median cannot be found in func. The same happens with std.
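To make the intended result concrete without depending on which function names a given pyspark version exposes, here is a plain-Python sketch of the same per-group aggregation: the maximum of val1 and the median and standard deviation of val2, grouped by (id, size). The sample rows and the helper name summarize are hypothetical, and the use of the sample standard deviation (statistics.stdev) is an assumption, since the question does not say which variant is wanted. In pyspark itself, functions such as stddev and percentile_approx may be the closest equivalents, depending on the version.

```python
from collections import defaultdict
from statistics import median, stdev

# Hypothetical sample data with the column names used in the question.
rows = [
    {"id": 1, "size": "S", "val1": 3, "val2": 10.0},
    {"id": 1, "size": "S", "val1": 7, "val2": 14.0},
    {"id": 1, "size": "S", "val1": 5, "val2": 12.0},
    {"id": 2, "size": "L", "val1": 4, "val2": 1.0},
    {"id": 2, "size": "L", "val1": 9, "val2": 3.0},
]


def summarize(rows):
    """Group rows by (id, size) and compute max(val1), median(val2), std(val2)."""
    groups = defaultdict(list)
    for row in rows:
        groups[(row["id"], row["size"])].append(row)

    result = {}
    for key, members in groups.items():
        val2 = [m["val2"] for m in members]
        result[key] = {
            "max_val1": max(m["val1"] for m in members),
            "median_val2": median(val2),
            # Sample standard deviation needs at least two values.
            "std_val2": stdev(val2) if len(val2) > 1 else None,
        }
    return result


summary = summarize(rows)
```

Each aggregate is passed as its own named entry here; the set literal in the question's agg call mixes two calling styles, which may be worth checking separately from the missing function names.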