我的结构化流应用程序正在写入 parquet,我想摆脱它创建的 _spark_metadata 文件夹。我使用了下面的属性,看起来不错
--conf "spark.hadoop.parquet.enable.summary-metadata=false"
当应用程序启动时,不会_spark_metadata生成文件夹。但是一旦它移动到 RUNNING 状态并开始处理消息,它就会失败并显示以下错误,提示_spark_metadata文件夹不存在。似乎结构化流依赖于这个文件夹,没有它我们就无法运行。只是想知道在这种情况下禁用元数据属性是否有意义。这是流不是指 conf 的错误吗?
Caused by: java.io.FileNotFoundException: File /_spark_metadata does not exist.
at org.apache.hadoop.fs.Hdfs.listStatus(Hdfs.java:261)
at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1765)
at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1761)
at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1761)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1726)
at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1685)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.list(HDFSMetadataLog.scala:370)
at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:231)
at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:99)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
Run Code Online (Sandbox Code Playgroud) apache-spark parquet spark-streaming apache-spark-sql spark-structured-streaming
Hadoop 3中的资源模型允许我们定义自定义资源类型。我进行了一些谷歌搜索,但是找不到任何可以告诉我们如何配置YARN FairScheduler以便在池中分配/隔离这些资源的信息。
我试图用增量编码编写镶木地板文件。 此页面指出 parquet 支持三种类型的 delta 编码:
(DELTA_BINARY_PACKED, DELTA_LENGTH_BYTE_ARRAY, DELTA_BYTE_ARRAY).
Run Code Online (Sandbox Code Playgroud)
由于spark或不允许我们指定编码方法,我很好奇如何编写启用增量编码的文件pyspark?pyarrow
但是,我在互联网上发现,如果我有TimeStamp镶木地板类型的列,将使用增量编码。所以我使用以下代码来scala创建镶木地板文件。但编码不是增量。
val df = Seq(("2018-05-01"),
("2018-05-02"),
("2018-05-03"),
("2018-05-04"),
("2018-05-05"),
("2018-05-06"),
("2018-05-07"),
("2018-05-08"),
("2018-05-09"),
("2018-05-10")
).toDF("Id")
val df2 = df.withColumn("Timestamp", (col("Id").cast("timestamp")))
val df3 = df2.withColumn("Date", (col("Id").cast("date")))
df3.coalesce(1).write.format("parquet").mode("append").save("date_time2")
Run Code Online (Sandbox Code Playgroud)
parquet-tools显示有关写入的 parquet 文件的以下信息。
file schema: spark_schema
--------------------------------------------------------------------------------
Id: OPTIONAL BINARY L:STRING R:0 D:1
Timestamp: OPTIONAL INT96 R:0 D:1
Date: OPTIONAL INT32 L:DATE R:0 D:1
row group 1: RC:31 TS:1100 OFFSET:4
--------------------------------------------------------------------------------
Id: BINARY …Run Code Online (Sandbox Code Playgroud) 在您尝试过的各种方法中,例如df.select('column').distinct()等df.groupby('column').count(),从列中提取不同值的最有效方法是什么?
我正在尝试设置 Spark 作业来使用来自 Kafka 的数据。Kafka 代理已设置 SSL,但我无法正确构建/验证消费者。
spark-shell命令:
spark-2.3.4-bin-hadoop2.7/bin/spark-shell
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.4
--files "spark-kafka.jaas"
--driver-java-options "-Djava.security.auth.login.config=./spark-kafka.jaas"
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./spark-kafka.jaas"
Run Code Online (Sandbox Code Playgroud)
spark-kafka.jaas
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="myusername"
password="mypwd"
};
Run Code Online (Sandbox Code Playgroud)
外壳命令:
val df = spark
.read
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1, host2:port2")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", "./truststore.jks")
.option("kafka.ssl.truststore.password", "truststore-pwd")
.option("kafka.ssl.endpoint.identification.algorithm", "")
.option("kafka.sasl.mechanism", "SCRAM-SHA-256")
.option("subscribe", "mytopic")
.option("startingOffsets", "earliest")
.load()
df.show()
Run Code Online (Sandbox Code Playgroud)
错误:
2019-09-23 16:32:19 WARN ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:557)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:540)
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:62)
at …Run Code Online (Sandbox Code Playgroud) 有大量数据被推送到我们的 Kafka 主题之一,有没有办法确定这些数据来自哪个生产者?
我要寻找一些文档来了解之间的差异hive.exec.max.dynamic.partitions和hive.exec.max.dynamic.partitions.pernode。
我们什么时候需要设置这些参数,这些参数有什么用?
hive.exec.max.dynamic.partitions=500
hive.exec.max.dynamic.partitions.pernode=500
Run Code Online (Sandbox Code Playgroud) 我尝试强制分割一个区域并收到以下错误。
ERROR: org.apache.hadoop.hbase.DoNotRetryIOException: 3dd9ec2b32c98131b39fbfa8266881f9 NOT splittable
at org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure.checkSplittable(SplitTableRegionProcedure.java:193)
at org.apache.hadoop.hbase.master.assignment.SplitTableRegionProcedure.<init>(SplitTableRegionProcedure.java:115)
at org.apache.hadoop.hbase.master.assignment.AssignmentManager.createSplitProcedure(AssignmentManager.java:750)
at org.apache.hadoop.hbase.master.HMaster$3.run(HMaster.java:1859)
at org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.submitProcedure(MasterProcedureUtil.java:134)
at org.apache.hadoop.hbase.master.HMaster.splitRegion(HMaster.java:1851)
at org.apache.hadoop.hbase.master.MasterRpcServices.splitRegion(MasterRpcServices.java:808)
at org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos$MasterService$2.callBlockingMethod(MasterProtos.java)
at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:413)
at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:130)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:324)
at org.apache.hadoop.hbase.ipc.RpcExecutor$Handler.run(RpcExecutor.java:304)
Run Code Online (Sandbox Code Playgroud)
有人对这个错误有深入的了解吗?
我正在使用 Cloudera 6.1.1 和 HBase 2.1.0。
当 时spark.memory.offheap.enabled=true,Spark 可以利用堆外内存进行洗牌和缓存 ( StorageLevel.OFF_HEAP)。堆外内存可以用来存储广播变量吗?如何?
目前我们生产环境的交易系统使用的是Kafka。由于Kafka延迟太高,我们希望用Aeron替代Kafka。如何正确使用Aeron?
apache-spark ×5
apache-kafka ×3
hadoop ×3
parquet ×2
pyspark ×2
aeron ×1
cloudera ×1
hadoop-yarn ×1
hadoop3 ×1
hbase ×1
hive ×1
jaas ×1
pyarrow ×1
scala ×1
ssl ×1