我们有一个 spark 流应用程序(spark 2.1 在 Hortonworks 2.6 上运行)并使用DataSet.repartition(在DataSet<Row>从 Kafka 读取的a上)DataSet<Row>'s根据给定的列(称为block_id)重新分区分区。
我们从一个DataSet<Row>包含 50 个分区的分区开始,最后(在调用 之后DataSet.repartition)分区数量等于唯一block_id 的数量。
问题是它的DataSet.repartition行为不像我们预期的那样——当我们查看运行 的 spark 作业的事件时间线时repartition,我们看到有几个任务处理 1 个block_id,而处理 2 个block_id甚至 3 或 4 个block_id 的任务较少。
似乎可以DataSet.repartition确保所有Rows具有相同block_id 的都在单个分区内,但并不是每个创建分区的任务都只处理一个 block_id。
结果是重新分区作业(在流应用程序内部运行)花费的时间与其最长的任务(处理最多block_id 的任务)一样多。
我们尝试使用提供给流媒体应用程序的 Vcores 数量 - 从 10 到 25 到 50(我们在从 Kafka 读取的原始 RDD 中有 50 …
我正在为火花流的实现而苦苦挣扎。
来自 kafka 的消息看起来像这样,但有更多的字段
{"event":"sensordata", "source":"sensors", "payload": {"actual data as a json}}
{"event":"databasedata", "mysql":"sensors", "payload": {"actual data as a json}}
{"event":"eventApi", "source":"event1", "payload": {"actual data as a json}}
{"event":"eventapi", "source":"event2", "payload": {"actual data as a json}}
Run Code Online (Sandbox Code Playgroud)
我正在尝试从 Kafka 主题(具有多个模式)读取消息。我需要阅读每条消息并查找事件和源字段并决定将其存储为数据集的位置。实际数据在字段有效负载中作为 JSON,它只是一个记录。
有人可以帮助我实施这个或任何其他替代方案吗?
在同一主题中发送具有多个模式的消息并使用它是一种好方法吗?
提前致谢,
apache-kafka apache-spark spark-streaming apache-spark-dataset
试图了解 Hive 分区与 Spark 分区的关系,最终解决了一个关于连接的问题。
我有 2 个外部 Hive 表;均由 S3 存储桶支持并由 分区date;所以在每个存储桶中都有名称为 format 的键date=<yyyy-MM-dd>/<filename>。
问题 1:
如果我将此数据读入 Spark:
val table1 = spark.table("table1").as[Table1Row]
val table2 = spark.table("table2").as[Table2Row]
Run Code Online (Sandbox Code Playgroud)
那么结果数据集将分别有多少个分区?分区等于 S3 中的对象数量?
问题2:
假设这两种行类型具有以下架构:
Table1Row(date: Date, id: String, ...)
Table2Row(date: Date, id: String, ...)
Run Code Online (Sandbox Code Playgroud)
并且我想加入table1和table2在领域date和id:
table1.joinWith(table2,
table1("date") === table2("date") &&
table1("id") === table2("id")
)
Run Code Online (Sandbox Code Playgroud)
Spark 是否能够利用被连接的字段之一是 Hive 表中的分区键来优化连接?如果是这样怎么办?
问题 3:
假设现在我正在使用RDDs 代替:
val rdd1 = table1.rdd …Run Code Online (Sandbox Code Playgroud) 我有一个包含多列的数据框:
| a | b | c | d |
-----------------
| 0 | 4 | 3 | 6 |
| 1 | 7 | 0 | 4 |
| 2 | 4 | 3 | 6 |
| 3 | 9 | 5 | 9 |
Run Code Online (Sandbox Code Playgroud)
我现在想合并[b,c,d]到一个列中。但是,我不知道列的列表有多大,否则我只能使用 UDF3 将三者结合起来。
所以想要的结果是:
| a | combined |
-----------------
| 0 | [4, 3, 6] |
| 1 | [7, 0, 4] |
| 2 | [4, 3, …Run Code Online (Sandbox Code Playgroud) 我想做一个简单的 Spark SQL 代码,读取一个名为 的文件u.data,其中包含电影评级,创建一个Datasetof Rows,然后打印数据集的第一行。
作为前提,我将文件读取到 a JavaRDD,并根据 a 映射 RDD ratingsObject(该对象有两个参数movieID和rating)。所以我只想打印这个数据集中的第一行。
我使用 Java 语言和 Spark SQL。
public static void main(String[] args){
App obj = new App();
SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example").getOrCreate();
Map<Integer,String> movieNames = obj.loadMovieNames();
JavaRDD<String> lines = spark.read().textFile("hdfs:///ml-100k/u.data").javaRDD();
JavaRDD<MovieRatings> movies = lines.map(line -> {
String[] parts = line.split(" ");
MovieRatings ratingsObject = new MovieRatings();
ratingsObject.setMovieID(Integer.parseInt(parts[1].trim()));
ratingsObject.setRating(Integer.parseInt(parts[2].trim()));
return ratingsObject;
});
Dataset<Row> movieDataset = spark.createDataFrame(movies, …Run Code Online (Sandbox Code Playgroud) 我有以下案例类:
case class Person(name: String, lastname: Option[String] = None, age: BigInt) {}
Run Code Online (Sandbox Code Playgroud)
以及以下 json:
{ "name": "bemjamin", "age" : 1 }
Run Code Online (Sandbox Code Playgroud)
当我尝试将数据框转换为数据集时:
spark.read.json("example.json")
.as[Person].show()
Run Code Online (Sandbox Code Playgroud)
它向我显示以下错误:
线程“main”org.apache.spark.sql.AnalysisException 中的异常:无法解析
lastname给定的输入列“ ”:[年龄,名称];
我的问题是:如果我的架构是我的案例类并且它定义姓氏是可选的,那么 as() 不应该进行转换吗?
我可以使用 .map 轻松解决此问题,但我想知道是否有另一种更清洁的替代方法。
我有一个由三列组成的 Spark DataFrame:
id | col1 | col2
-----------------
x | p1 | a1
-----------------
x | p2 | b1
-----------------
y | p2 | b2
-----------------
y | p2 | b3
-----------------
y | p3 | c1
Run Code Online (Sandbox Code Playgroud)
申请后,df.groupBy("id").pivot("col1").agg(collect_list("col2"))我得到以下数据帧(aggDF):
+---+----+--------+----+
| id| p1| p2| p3|
+---+----+--------+----+
| x|[a1]| [b1]| []|
| y| []|[b2, b3]|[c1]|
+---+----+--------+----+
Run Code Online (Sandbox Code Playgroud)
然后我找到除了列之外的id列的名称。
val cols = aggDF.columns.filter(x => x != "id")
Run Code Online (Sandbox Code Playgroud)
之后我cols.foldLeft(aggDF)((df, x) => df.withColumn(x, when(size(col(x)) > 0, col(x)).otherwise(lit(null))))用null. …
我在Zeppelin笔记本上使用Spark,而groupByKey()似乎不起作用.
这段代码:
df.groupByKey(row => row.getLong(0))
.mapGroups((key, iterable) => println(key))
Run Code Online (Sandbox Code Playgroud)
给我这个错误(可能是一个编译错误,因为它在我正在处理的数据集很大的时候很快出现):
error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
Run Code Online (Sandbox Code Playgroud)
我尝试添加一个case类并将所有行映射到它中,但仍然遇到了同样的错误
import spark.implicits._
case class DFRow(profileId: Long, jobId: String, state: String)
def getDFRow(row: Row):DFRow = {
return DFRow(row.getLong(row.fieldIndex("item0")),
row.getString(row.fieldIndex("item1")),
row.getString(row.fieldIndex("item2")))
}
df.map(DFRow(_))
.groupByKey(row => row.getLong(0))
.mapGroups((key, iterable) => println(key))
Run Code Online (Sandbox Code Playgroud)
我的Dataframe的架构是:
root
|-- item0: …Run Code Online (Sandbox Code Playgroud) scala apache-spark apache-spark-dataset apache-spark-encoders
我有一个类似于以下示例的Spark数据集:
0 1 2 3
+------+------------+--------------------+---+
|ItemID|Manufacturer| Category |UPC|
+------+------------+--------------------+---+
| 804| ael|Brush & Broom Han...|123|
| 805| ael|Wheel Brush Parts...|124|
+------+------------+--------------------+---+
Run Code Online (Sandbox Code Playgroud)
我需要通过搜索列标题来查找列的位置.
例如:
int position=getColumnPosition("Category");
Run Code Online (Sandbox Code Playgroud)
这应该返回2.
是否有Dataset<Row>数据类型支持的Spark函数来查找列索引或可以在Spark数据集上运行的任何java函数?
我想使用Spark(2.2)数据集提供下面提到的数据
Name Age Age+5
A 10 15
B 5 10
C 25 30
Run Code Online (Sandbox Code Playgroud)
我尝试使用以下内容:
dataset.select(
dataset.col("Name"),
dataset.col("Age),
dataset.col( dataset.selectExpr("Age"+5).toString() )
);
Run Code Online (Sandbox Code Playgroud)
这会抛出异常,因为Age找不到列.