小编maz*_*cha的帖子

在 spark 2.3.0 中的结构化流中禁用 _spark_metadata

我的结构化流应用程序正在写入 parquet,我想摆脱它创建的 _spark_metadata 文件夹。我使用了下面的属性,看起来不错

--conf "spark.hadoop.parquet.enable.summary-metadata=false"

当应用程序启动时,不会_spark_metadata生成文件夹。但是一旦它移动到 RUNNING 状态并开始处理消息,它就会失败并显示以下错误,提示_spark_metadata文件夹不存在。似乎结构化流依赖于这个文件夹,没有它我们就无法运行。只是想知道在这种情况下禁用元数据属性是否有意义。这是流不是指 conf 的错误吗?

Caused by: java.io.FileNotFoundException: File /_spark_metadata does not exist.
        at org.apache.hadoop.fs.Hdfs.listStatus(Hdfs.java:261)
        at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1765)
        at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1761)
        at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
        at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1761)
        at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1726)
        at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1685)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.list(HDFSMetadataLog.scala:370)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:231)
        at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:99)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
Run Code Online (Sandbox Code Playgroud)

apache-spark parquet spark-streaming apache-spark-sql spark-structured-streaming

5
推荐指数
1
解决办法
2297
查看次数

YARN FairScheduler配置

Hadoop 3中的资源模型允许我们定义自定义资源类型。我进行了一些谷歌搜索,但是找不到任何可以告诉我们如何配置YARN FairScheduler以便在池中分配/隔离这些资源的信息。

hadoop hadoop-yarn hadoop3

5
推荐指数
1
解决办法
72
查看次数

使用 delta 编码 coulmns 编写 parquet 文件

我试图用增量编码编写镶木地板文件。 此页面指出 parquet 支持三种类型的 delta 编码:

    (DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY).
Run Code Online (Sandbox Code Playgroud)

由于spark或不允许我们指定编码方法,我很好奇如何编写启用增量编码的文件pysparkpyarrow

但是,我在互联网上发现,如果我有TimeStamp镶木地板类型的列,将使用增量编码。所以我使用以下代码来scala创建镶木地板文件。但编码不是增量。


    val df = Seq(("2018-05-01"),
                ("2018-05-02"),
                ("2018-05-03"),
                ("2018-05-04"),
                ("2018-05-05"),
                ("2018-05-06"),
                ("2018-05-07"),
                ("2018-05-08"),
                ("2018-05-09"),
                ("2018-05-10")
            ).toDF("Id")
    val df2 = df.withColumn("Timestamp", (col("Id").cast("timestamp")))
    val df3 = df2.withColumn("Date", (col("Id").cast("date")))

    df3.coalesce(1).write.format("parquet").mode("append").save("date_time2")
Run Code Online (Sandbox Code Playgroud)

parquet-tools显示有关写入的 parquet 文件的以下信息。

file schema: spark_schema 
--------------------------------------------------------------------------------
Id:          OPTIONAL BINARY L:STRING R:0 D:1
Timestamp:   OPTIONAL INT96 R:0 D:1
Date:        OPTIONAL INT32 L:DATE R:0 D:1

row group 1: RC:31 TS:1100 OFFSET:4 
--------------------------------------------------------------------------------
Id:           BINARY …
Run Code Online (Sandbox Code Playgroud)

scala apache-spark parquet pyspark pyarrow

4
推荐指数
1
解决办法
3496
查看次数

从 Spark 数据框中选择不同值的最有效方法是什么?

在您尝试过的各种方法中,例如df.select('column').distinct()df.groupby('column').count(),从列中提取不同值的最有效方法是什么?

apache-spark apache-spark-sql pyspark

4
推荐指数
1
解决办法
5198
查看次数

Spark“无法通过 SSL 构建 kafka 消费者”

我正在尝试设置 Spark 作业来使用来自 Kafka 的数据。Kafka 代理已设置 SSL,但我无法正确构建/验证消费者。

spark-shell命令:

spark-2.3.4-bin-hadoop2.7/bin/spark-shell 
    --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.4 
    --files "spark-kafka.jaas"   
    --driver-java-options "-Djava.security.auth.login.config=./spark-kafka.jaas"   
    --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./spark-kafka.jaas"
Run Code Online (Sandbox Code Playgroud)

spark-kafka.jaas

KafkaClient {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="myusername"
   password="mypwd"
};
Run Code Online (Sandbox Code Playgroud)

外壳命令:

val df = spark
    .read
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1, host2:port2")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.ssl.truststore.location", "./truststore.jks")
    .option("kafka.ssl.truststore.password", "truststore-pwd")
    .option("kafka.ssl.endpoint.identification.algorithm", "")
    .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
    .option("subscribe", "mytopic")
    .option("startingOffsets", "earliest")
    .load()

df.show()
Run Code Online (Sandbox Code Playgroud)

错误:

2019-09-23 16:32:19 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
  at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
  at …
Run Code Online (Sandbox Code Playgroud)

ssl jaas apache-kafka apache-spark

3
推荐指数
1
解决办法
4803
查看次数

有没有办法确定 Kafka 主题中消息的来源?

有大量数据被推送到我们的 Kafka 主题之一,有没有办法确定这些数据来自哪个生产者?

apache-kafka

3
推荐指数
1
解决办法
846
查看次数

`hive.exec.max.dynamic.partitions` 和 `hive.exec.max.dynamic.partitions.pernode` 的区别

我要寻找一些文档来了解之间的差异hive.exec.max.dynamic.partitionshive.exec.max.dynamic.partitions.pernode

我们什么时候需要设置这些参数,这些参数有什么用?

hive.exec.max.dynamic.partitions=500
hive.exec.max.dynamic.partitions.pernode=500
Run Code Online (Sandbox Code Playgroud)

hadoop hive

3
推荐指数
1
解决办法
673
查看次数

HBase split 命令抛出 NOT splittable 错误

我尝试强制分割一个区域并收到以下错误。

ERROR: org.apache.hadoop.hbase.DoNotRetryIOException: 3dd9ec2b32c98131b39fbfa8266881f9 NOT splittable                                                                                       
        at org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure.checkSplittable(SplitTableRegionProcedure.java:193)                                                          
        at org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure.<init>(SplitTableRegionProcedure.java:115)                                                                   
        at org.apache.hadoop.hbase.master.assignment.AssignmentManager.createSplitProcedure(AssignmentManager.java:750)                                                                     
        at org.apache.hadoop.hbase.master.HMaster$3.run(HMaster.java:1859)                                                                                                                  
        at org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.submitProcedure(MasterProcedureUtil.java:134)                                                                       
        at org.apache.hadoop.hbase.master.HMaster.splitRegion(HMaster.java:1851)                                                                                                            
        at org.apache.hadoop.hbase.master.MasterRpcServices.splitRegion(MasterRpcServices.java:808)                                                                                         
        at org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos$MasterService$2.callBlockingMethod(MasterProtos.java)                                                             
        at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:413)                                                                                                                   
        at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:130)                                                                                                                  
        at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:324)                                                                                                        
        at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:304)
Run Code Online (Sandbox Code Playgroud)

有人对这个错误有深入的了解吗?

我正在使用 Cloudera 6.1.1 和 HBase 2.1.0。

hadoop hbase cloudera

3
推荐指数
1
解决办法
909
查看次数

Spark - 使用堆外内存

当 时spark.memory.offheap.enabled=true,Spark 可以利用堆外内存进行洗牌和缓存 ( StorageLevel.OFF_HEAP)。堆外内存可以用来存储广播变量吗?如何?

apache-spark

3
推荐指数
1
解决办法
3078
查看次数

如何用 Aeron 替换 Kafka

目前我们生产环境的交易系统使用的是Kafka。由于Kafka延迟太高,我们希望用Aeron替代Kafka。如何正确使用Aeron?

apache-kafka aeron

2
推荐指数
1
解决办法
2391
查看次数