小编maz*_*cha的帖子

是否可以在 PySpark Dataframe 中定义递归 DataType?

我想创建一个像这个例子这样的模式:

friendSchema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("friends",**friendSchema**,True)
Run Code Online (Sandbox Code Playgroud)

我知道数据必须标准化,但我想知道 Spark 是否具有创建如上所述的模式的功能。如果可以的话,怎样才能做到呢?使用UDT可行吗?

apache-spark apache-spark-sql pyspark

6
推荐指数
2
解决办法
895
查看次数

Collect() 调用用于 UDF 函数,该函数返回相当大的列(总共 2 列)以避免 crossJoin

问题如下:

'genre', 'top_tags' (250 rows)
----------------
Action, Array('bleeding', 'dying', 'guns', ...) - can hold up to 50k max. (avg is 4000)
Drama,  Array('crying', 'hard life', 'street') 
Run Code Online (Sandbox Code Playgroud)

另一个表格包含电影、类型及其相关标签

'movie', 'genre', 'tags'. (DataFrame size, around 23M Rows)
------------------------
M1        Action,  'guns', 'dying', 'bleeding', 'outside', 'worldwide'.  approx ~10 records for each movie
Run Code Online (Sandbox Code Playgroud)

我想迭代每部电影,并尝试通过比较标签来扩展其类型相似性。 没有模糊算法,只是精确匹配。

我想返回一个数据帧(相同的电影、流派、标签数据帧),其中包含名为的新列potentially_related_genres和流派列表。

在我看来,我有两个选择:

  1. crossJoin并使用 UDF 比较两列,但这会很糟糕,因为它会让我迭代 5,750,000,000 行。(交叉连接输出)

  2. 腌制结果(collect()对第一个数据帧(250 行)进行操作,然后在 UDF 中使用所有逻辑,这通过调用原始数据帧并使用 withColumn

DF.withColumn('potentially_related_genres', my_udf('genre', 'tags'))

该方法的问题在于,传递给驱动程序是一个相当大的收集(记住胖列 top_tags )。并将其转移给所有要使用的工人。(酸洗和脱酸)

有什么建议吗?

提前致谢。

python apache-spark apache-spark-sql pyspark

6
推荐指数
0
解决办法
244
查看次数

在 spark 2.3.0 中的结构化流中禁用 _spark_metadata

我的结构化流应用程序正在写入 parquet,我想摆脱它创建的 _spark_metadata 文件夹。我使用了下面的属性,看起来不错

--conf "spark.hadoop.parquet.enable.summary-metadata=false"

当应用程序启动时,不会_spark_metadata生成文件夹。但是一旦它移动到 RUNNING 状态并开始处理消息,它就会失败并显示以下错误,提示_spark_metadata文件夹不存在。似乎结构化流依赖于这个文件夹,没有它我们就无法运行。只是想知道在这种情况下禁用元数据属性是否有意义。这是流不是指 conf 的错误吗?

Caused by: java.io.FileNotFoundException: File /_spark_metadata does not exist.
        at org.apache.hadoop.fs.Hdfs.listStatus(Hdfs.java:261)
        at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1765)
        at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1761)
        at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
        at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1761)
        at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1726)
        at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1685)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.list(HDFSMetadataLog.scala:370)
        at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.getLatest(HDFSMetadataLog.scala:231)
        at org.apache.spark.sql.execution.streaming.FileStreamSink.addBatch(FileStreamSink.scala:99)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
Run Code Online (Sandbox Code Playgroud)

apache-spark parquet spark-streaming apache-spark-sql spark-structured-streaming

5
推荐指数
1
解决办法
2297
查看次数

YARN FairScheduler配置

Hadoop 3中的资源模型允许我们定义自定义资源类型。我进行了一些谷歌搜索,但是找不到任何可以告诉我们如何配置YARN FairScheduler以便在池中分配/隔离这些资源的信息。

hadoop hadoop-yarn hadoop3

5
推荐指数
1
解决办法
72
查看次数

来自ESLint插件的Vue.js消息中的“ LHS”是什么意思?

来自Vue.js的官方ESLint插件的消息说:

“ v-model”指令要求使用与LHS有效的属性值。

触发该消息的示例是

<input v-model="foo() + bar()">
Run Code Online (Sandbox Code Playgroud)

LHS是什么意思?

eslint vue.js v-model

5
推荐指数
1
解决办法
1116
查看次数

仅在Hadoop集群中的特定节点上存储HDFS数据

我们有30个节点的生产集群。我们要添加5个数据节点以进行额外的存储,以处理数据的临时峰值(大约2 TB)。该数据将被临时存储,我们希望在15天后删除它。

是否可以确保传入的临时数据(2 TB)仅存储在新添加的数据节点上?

我正在寻找类似于YARN节点标签的内容。

先感谢您。

hadoop data-storage hdfs hortonworks-data-platform

5
推荐指数
1
解决办法
92
查看次数

Java 堆空间 - 内存不足错误 - 带有 SASL_SSL 的 Kafka Broker

当我在带有 PLAIN_TEXT 端口 9092 的 Kafka 代理中使用以下“/usr/bin/kafka-delete-records”命令时,该命令工作正常,但是当我使用 SASL_SSL 端口 9094 时,该命令会引发以下错误。有人知道将 Kafka 代理端口 9094 与 SASL_SSL 一起使用的解决方案吗?

$ssh **** ****@<IP address> /usr/bin/kafka-delete-records --bootstrap-server localhost:9094 --offset-json-file /kafka/records.json`

[2019-10-14 04:15:49,891] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)

java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:390)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:351)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:609)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:541)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:467)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:535)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1125)
    at java.lang.Thread.run(Thread.java:748)
Executing records delete operation
Records delete operation completed:

Run Code Online (Sandbox Code Playgroud)

注意:-Xmx 为 8GB,服务器的总内存为 16 GB

请检查下面的当前堆值:

$ …
Run Code Online (Sandbox Code Playgroud)

java ssl out-of-memory sasl apache-kafka

5
推荐指数
1
解决办法
4863
查看次数

卡夫卡生产者抛出错误尝试从状态 IN_TRANSACTION 到状态 IN_TRANSACTION 的无效转换

我的 kafka 生产者抛出错误“尝试从状态 IN_TRANSACTION 到状态 IN_TRANSACTION 的无效转换”。这是我正在努力实现的目标 -

KafkaProducer producer = new KafkaProducer<>(props);
producer.initTransactions();
//transaction 1
producer.beginTransaction();
//send some messages
producer.commitTransaction();

//transaction 2
producer.beginTransaction(); //here it throws an exception "Invalid transition attempted from state IN_TRANSACTION to state IN_TRANSACTION".
//send some messages
producer.commitTransaction();

producer.close();
Run Code Online (Sandbox Code Playgroud)

如果我producer.initTransactions();在开始事务 2 之前再次调用,它会抛出异常“尝试从状态 READY 到状态 INITIALIZING 的无效转换”。

我究竟做错了什么?

transactions apache-kafka kafka-producer-api

5
推荐指数
1
解决办法
533
查看次数

Spark的分区剪枝和谓词下推有什么区别?

我正在研究 Spark 优化方法,并遇到了各种实现优化的方法。但有两个名字引起了我的注意。

  1. 分区修剪
  2. 谓词下推

他们说:

分区修剪:

分区修剪是一种性能优化,它限制 Spark 查询时读取的文件和分区的数量。对数据进行分区后,匹配某些分区过滤条件的查询允许 Spark 仅读取目录和文件的子集,从而提高性能。

谓词下推:

Spark 将尝试将数据过滤移至尽可能靠近源的位置,以避免将不必要的数据加载到内存中。Parquet 和 ORC 文件维护不同块 aof 数据中每列的各种统计信息(例如最小值和最大值)。读取这些文件的程序可以使用这些索引来确定是否需要读取某些块甚至整个文件。这允许程序在处理过程中潜在地跳过大部分数据。

通过阅读上述概念,它们似乎做了同样的事情,即应用满足查询中给出的谓词的读取语句(查询)。分区修剪和谓词下推是不同的概念还是我以错误的方式看待它们?

apache-spark

5
推荐指数
1
解决办法
3876
查看次数

为什么 Impala 扫描节点非常慢(RowBatchQueueGetWaitTime)?

该查询大多数情况下会在 10 秒内返回,但有时需要 40 秒或更长时间。

swarm中有两个执行器节点,两个节点的配置文件没有显着差异,以下是其中之一:

      HDFS_SCAN_NODE (id=0):(Total: 39s818ms, non-child: 39s818ms, % non-child: 100.00%)
     - AverageHdfsReadThreadConcurrency: 0.07 
     - AverageScannerThreadConcurrency: 1.47 
     - BytesRead: 563.73 MB (591111366)
     - BytesReadDataNodeCache: 0
     - BytesReadLocal: 0
     - BytesReadRemoteUnexpected: 0
     - BytesReadShortCircuit: 0
     - CachedFileHandlesHitCount: 0 (0)
     - CachedFileHandlesMissCount: 560 (560)
     - CollectionItemsRead: 0 (0)
     - DecompressionTime: 1s501ms
     - MaterializeTupleTime(*): 11s685ms
     - MaxCompressedTextFileLength: 0
     - NumColumns: 9 (9)
     - NumDictFilteredRowGroups: 0 (0)
     - NumDisksAccessed: 1 (1)
     - NumRowGroups: 56 (56)
     - NumScannerThreadMemUnavailable: 0 (0)
     - NumScannerThreadReservationsDenied: …
Run Code Online (Sandbox Code Playgroud)

olap hadoop hdfs impala

5
推荐指数
1
解决办法
1568
查看次数