假设我有一个大约有 10 个分区的 kafka 主题,我知道每个消费者组应该有 10 个消费者在任何给定时间从该主题中读取数据,以实现最大并行度。
然而,我想知道对于一个主题在任何给定时间点可以处理的消费者组的数量是否也有任何直接的规则。(最近在一次采访中我被问到了这个问题)。据我所知,这取决于代理的配置,以便它在任何给定时间点可以处理多少个连接。
但是,只是想知道在给定时间点可以扩展多少个最大消费者组(每个消费者组有 10 个消费者)?
我有一个 XLS/CSV 文件,正在将其读入 pandas 数据帧。我想从这个数据帧生成一个 avro 模式。
我对 python 和 pandas 都是新手。请帮忙。
data_frame = pd.read_excel(INPUT_PATH)
我想从这个数据帧动态生成 avro 模式。请帮忙
我使用 Spark 将 json 数据写入 s3。但是,我不断收到以下错误。我们使用 apache hudi 进行更新。这只发生在某些数据上,其他一切都正常。
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0
in file s3a://<path to parquet file>
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.ja va:251)
App > at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
App > at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
App > at com.uber.hoodie.func.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:45)
App > at com.uber.hoodie.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:44)
App > at com.uber.hoodie.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:94)
App > at java.util.concurrent.FutureTask.run(FutureTask.java:266)
App > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
App > ... 4 more
App > Caused by: java.lang.UnsupportedOperationException:org.apache.parquet.avro.AvroConverters$FieldLongConverter
Run Code Online (Sandbox Code Playgroud)
我无法理解。我跟踪了几个线程并在我的 Spark confs 中设置 --conf "spark.sql.parquet.writeLegacyFormat=true" 。但即使这样也无济于事。
我有一个 Spark 作业,它有一个具有以下值的 DataFrame:
{
"id": "abchchd",
"test_id": "ndsbsb",
"props": {
"type": {
"isMale": true,
"id": "dd",
"mcc": 1234,
"name": "Adam"
}
}
}
{
"id": "abc",
"test_id": "asf",
"props": {
"type2": {
"isMale": true,
"id": "dd",
"mcc": 12134,
"name": "Perth"
}
}
}
Run Code Online (Sandbox Code Playgroud)
我想优雅地将它展平(因为没有未知的键和类型等),这样道具仍然是一个,struct但里面的所有东西都被展平了(不管嵌套的级别如何)
所需的输出是:
{
"id": "abchchd",
"test_id": "ndsbsb",
"props": {
"type.isMale": true,
"type.id": "dd",
"type.mcc": 1234,
"type.name": "Adam"
}
}
{
"id": "abc",
"test_id": "asf",
"props": {
"type2.isMale": true,
"type2.id": "dd",
"type2.mcc": 12134,
"type2.name": "Perth" …Run Code Online (Sandbox Code Playgroud) 我有一个数据框,如下所示。除了字段之外,对应的所有值id都是相同的mappingcol。
+--------------------+----------------+--------------------+-------+
|misc |fruit |mappingcol |id |
+--------------------+----------------+--------------------+-------+
|ddd |apple |Map("name"->"Sameer"| 1 |
|ref |banana |Map("name"->"Riyazi"| 2 |
|ref |banana |Map("lname"->"Nikki"| 2 |
|ddd |apple |Map("lname"->"tenka"| 1 |
+--------------------+----------------+--------------------+-------+
Run Code Online (Sandbox Code Playgroud)
我想以这样的方式合并具有同一行的行,以便我精确地得到一行,并且需要合并id的值。mappingcol输出应如下所示:
+--------------------+----------------+--------------------+-------+
|misc |fruit |mappingcol |id |
+--------------------+----------------+--------------------+-------+
|ddd |apple |Map("name"->"Sameer"| 1 |
|ref |banana |Map("name"->"Riyazi"| 2 |
+--------------------+----------------+--------------------+-------+
Run Code Online (Sandbox Code Playgroud)
=mappingcol的值将是:id1
Map(
"name" -> "Sameer",
"lname" -> "tenka"
)
Run Code Online (Sandbox Code Playgroud)
我知道地图可以使用++运算符合并,所以这不是我担心的。我只是无法理解如何合并行,因为如果我使用 a groupBy,我就没有任何东西可以聚合行。
apache-spark ×3
scala ×2
apache-hudi ×1
apache-kafka ×1
avro ×1
dataframe ×1
flatten ×1
hoodie ×1
json ×1
kafka-topic ×1
pandas ×1
parquet ×1
python ×1
schema ×1
sql ×1