小编maz*_*cha的帖子

在Spark DataFrame中查找每个组的最大行数

我正在尝试使用Spark数据帧而不是RDD,因为它们看起来比RDD更高级,并且往往会产生更易读的代码.

在一个14节点的Google Dataproc集群中,我有大约6百万个名称被两个不同的系统转换为ID:sasb.每个Row包含name,id_said_sb.我的目标是从生产映射id_said_sb使得对于每id_sa时,相应的id_sb是连接到所有名称中最常见的ID id_sa.

让我们试着用一个例子来澄清.如果我有以下行:

[Row(name='n1', id_sa='a1', id_sb='b1'),
 Row(name='n2', id_sa='a1', id_sb='b2'),
 Row(name='n3', id_sa='a1', id_sb='b2'),
 Row(name='n4', id_sa='a2', id_sb='b2')]
Run Code Online (Sandbox Code Playgroud)

我的目标是从生产映射a1b2.事实上,相关的名称a1n1,n2n3,分别映射b1,b2b2,因此b2是相关联的名称最常见的映射a1.以同样的方式,a2将映射到b2.可以假设总有一个胜利者:不需要打破关系.

我希望我可以使用groupBy(df.id_sa)我的数据帧,但我不知道接下来该做什么.我希望最终会产生以下行的聚合:

[Row(id_sa=a1, max_id_sb=b2),
 Row(id_sa=a2, max_id_sb=b2)]
Run Code Online (Sandbox Code Playgroud)

但也许我正在尝试使用错误的工具,我应该回到使用RDD.

apache-spark apache-spark-sql pyspark

42
推荐指数
2
解决办法
5万
查看次数

如果我缓存一个 Spark 数据帧然后覆盖引用,原始数据帧还会被缓存吗?

假设我有一个函数来生成 (py)spark 数据帧,将数据帧缓存到内存中作为最后一个操作。

def gen_func(inputs):
   df = ... do stuff...
   df.cache()
   df.count()
   return df
Run Code Online (Sandbox Code Playgroud)

根据我的理解,Spark 的缓存工作如下:

  1. 当在数据帧上调用cache/persistplus 一个动作 ( count()) 时,它是从它的 DAG 计算出来的,并缓存到内存中,附加到引用它的对象上。
  2. 只要存在对该对象的引用,可能在其他函数/其他范围内,df 将继续被缓存,并且所有依赖于 df 的 DAG 将使用内存中缓存的数据作为起点。
  3. 如果对 df 的所有引用都被删除,Spark 会将缓存作为内存进行垃圾回收。它可能不会立即被垃圾回收,导致一些短期内存块(特别是如果生成缓存数据并过快丢弃它们会导致内存泄漏),但最终会被清除。

我的问题是,假设我gen_func用来生成一个数据框,然后覆盖原始数据框引用(可能是 afilter或 a withColumn)。

df=gen_func(inputs)
df=df.filter("some_col = some_val")
Run Code Online (Sandbox Code Playgroud)

在 Spark 中,RDD/DF 是不可变的,因此过滤器后重新分配的 df 和过滤器前的 df 指的是两个完全不同的对象。在这种情况下,对原始 df 的引用cache/counted已被覆盖。这是否意味着缓存的数据帧不再可用并将被垃圾收集?这是否意味着新的后置过滤器df将从头开始计算所有内容,尽管是从先前缓存的数据帧生成的?

我问这个是因为我最近正在修复我的代码的一些内存不足问题,在我看来,缓存可能是问题所在。然而,我还没有真正理解什么是使用缓存的安全方法的全部细节,以及人们如何可能不小心使自己的缓存内存无效。我的理解中缺少什么?我在执行上述操作时是否偏离了最佳实践?

python apache-spark apache-spark-sql pyspark

15
推荐指数
2
解决办法
2456
查看次数

Spark v3.0.0 - 警告 DAGScheduler:广播大小为 xx 的大型任务二进制文件

我是火花新手。我正在使用以下配置集在 Spark 独立版 (v3.0.0) 中编写机器学习算法:

SparkConf conf = new SparkConf();
conf.setMaster("local[*]");
conf.set("spark.driver.memory", "8g");
conf.set("spark.driver.maxResultSize", "8g");
conf.set("spark.memory.fraction", "0.6");
conf.set("spark.memory.storageFraction", "0.5");
conf.set("spark.sql.shuffle.partitions", "5");
conf.set("spark.memory.offHeap.enabled", "false");
conf.set("spark.reducer.maxSizeInFlight", "96m");
conf.set("spark.shuffle.file.buffer", "256k");
conf.set("spark.sql.debug.maxToStringFields", "100");
Run Code Online (Sandbox Code Playgroud)

这就是我创建 CrossValidator 的方式

ParamMap[] paramGrid = new ParamGridBuilder()
            .addGrid(gbt.maxBins(), new int[]{50})
            .addGrid(gbt.maxDepth(), new int[]{2, 5, 10})
            .addGrid(gbt.maxIter(), new int[]{5, 20, 40})
            .addGrid(gbt.minInfoGain(), new double[]{0.0d, .1d, .5d})
            .build();

    CrossValidator gbcv = new CrossValidator()
            .setEstimator(gbt)
            .setEstimatorParamMaps(paramGrid)
            .setEvaluator(gbevaluator)
            .setNumFolds(5)
            .setParallelism(8)
            .setSeed(session.getArguments().getTrainingRandom());
Run Code Online (Sandbox Code Playgroud)

问题是,当(在 paramGrid 中) maxDepth 只是 {2, 5} 和 maxIter {5, 20} 时,一切都工作得很好,但是当它像上面的代码中那样时,它会不断记录: ,其中 …

java apache-spark apache-spark-ml apache-spark-mllib

11
推荐指数
1
解决办法
2万
查看次数

在 Hive 中处理行时出现 Hive 运行时错误

我在查询 ORC 文件格式表时遇到问题

我正在尝试以下查询:

INSERT INTO TABLE <db_name>.<table_name> SELECT FROM <db_name>.<table_name> WHERE CONDITIONS;
Run Code Online (Sandbox Code Playgroud)

这导致:

TaskAttempt 2 failed, info=[Error: Failure while running task:java.lang.RuntimeException: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveExceptio
 Hive Runtime Error while processing row
      at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:186)
      at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:138)
      at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
      at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:176)
      at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:168)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
      at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call(TezTaskRunner.java:168)
      at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call(TezTaskRunner.java:163)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
used by: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row
      at org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.processRow(MapRecordSource.java:91)
      at org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68)
      at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run(MapRecordProcessor.java:294)
      at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:163)
      ... 13 more …
Run Code Online (Sandbox Code Playgroud)

hadoop hive hadoop-yarn hadoop2

8
推荐指数
1
解决办法
2万
查看次数

错误:存储桶名称必须与正则表达式“^[a-zA-Z0-9.\-_]{1,255}$”匹配

当我尝试将图像上传到存储桶时,它会引发错误"Invalid bucket name "thum.images ": Bucket name must match the regex "^[a-zA-Z0-9.\-_]{1,255}$""

我认为存储桶名称没有任何问题。

这是我上传图片的代码:

def upload_thumbnail_image(image_key, thumbnail_image):
    thumbnail_image_bucket = os.environ['thumbnail_bucket']
    thumbnail_image = #image path
    image_key = EFE3-27C8-EEB3-4987/3612d0bc-bdfd-49de-82ee-3e66cbb06807.jpg
    try:
        new_object = client.upload_file(thumbnail_image, thumbnail_image_bucket, image_key)
        return new_object
    except Exception as Exc:
        set_log(Exc.args[0],True)
Run Code Online (Sandbox Code Playgroud)

python regex amazon-s3 bucket amazon-web-services

7
推荐指数
1
解决办法
2万
查看次数

如何查找 Parquet 文件生成时使用的 COMPRESSION_CODEC?

通常在 Impala 中,我们在将数据插入基础文件为 Parquet 格式的表之前使用 COMPRESSION_CODEC。

用于设置 COMPRESSION_CODEC 的命令:

set compression_codec=snappy;
set compression_codec=gzip;
Run Code Online (Sandbox Code Playgroud)

是否可以通过对 Parquet 文件执行任何类型的操作来找出所使用的压缩编解码器的类型?

hadoop impala parquet

7
推荐指数
1
解决办法
4907
查看次数

Spark分区发现的自定义配置

据我所知,Spark 支持分区发现,其中目录名称遵循固定模式:column_name=column_value。链接网页的示例:

path -> to -> table -> gender=male -> country=US -> data.parquet

我想使用此功能,但不幸的是我当前正在处理的文件结构不遵循此模式。我无法改变它并且转换它是不可行的。就我而言,目录纯粹是列值,如下所示:

path -> to -> table -> male -> US -> data.parquet

理想情况下,我想配置 Spark 以指示“table”目录的子目录包含“gender”,然后是“country”子目录,依此类推。

我正在使用 Java 的 Spark 2.11。我也在使用 Parquet 文件。

apache-spark parquet

7
推荐指数
0
解决办法
445
查看次数

Kafka缓存的zkVersion不等于Zookeeper经纪人无法恢复的zkVersion

我有一个包含3个经纪人的kafka集群。我最近开始遇到问题,经纪人退出集群,生产者/消费者抛出领导者不可用的错误。

在检查日志时,我看到以下事件序列:

//许多副本获取程序线程开始/停止

[2017-10-09 14:48:50,600] INFO [ReplicaFetcherManager on broker 6] Removed fetcher for partitions

[2017-10-09 14:48:50,608] INFO [ReplicaFetcherThread-0-7], Shutting down (kafka.server.ReplicaFetcherThread)
[2017-10-09 14:48:50,918] INFO [ReplicaFetcherThread-0-7], Stopped  (kafka.server.ReplicaFetcherThread)
[2017-10-09 14:48:50,918] INFO [ReplicaFetcherThread-0-7], Shutdown completed (kafka.server.ReplicaFetcherThread)
Run Code Online (Sandbox Code Playgroud)

//不断扩大/缩小ISR

[2017-10-09 14:48:51,037] INFO Partition [__consumer_offsets,8] on broker 6: Expanding ISR for partition __consumer_offsets-8 from 6,8 to 6,8,7 (kafka.cluster.Partition)
[2017-10-09 14:48:51,038] INFO Partition [__consumer_offsets,35] on broker 6: Expanding ISR for partition __consumer_offsets-35 from 6,8 to 6,8,7 (kafka.cluster.Partition)

[2017-10-09 14:49:01,702] INFO Partition [t1,1] on broker 6: Shrinking …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-zookeeper

6
推荐指数
1
解决办法
5483
查看次数

Kafka Streams 测试:java.util.NoSuchElementException:未初始化的主题:“output_topic_name”

我已经按照https://kafka.apache.org/24/documentation/streams/developer-guide/testing.html为 kafka 流应用程序编写了一个测试类 ,其代码是

import com.EventSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;

public class KafkaStreamsConfigTest {

private TopologyTestDriver testDriver;
private TestInputTopic<String, Object> inputTopic;
private TestOutputTopic<String, Object> outputTopic;

private Serde<String> stringSerde = new Serdes.StringSerde();
private EventSerde eventSerde= new EventSerde();

private String key="test";
private Object value = "some value";
private Object expected_value = "real value";

String kafkaEventSourceTopic = "raw_events";
String kafkaEventSinkTopic = "processed_events";
String kafkaCacheSinkTopic = "cache_objects";

String applicationId = "my-app";
String …
Run Code Online (Sandbox Code Playgroud)

unit-testing apache-kafka apache-kafka-streams kafka-topic spring-kafka-test

6
推荐指数
1
解决办法
1458
查看次数

Spark ExecutorLostFailure- 原因:远程 RPC 客户端解除关联。可能是由于容器超过阈值或网络问题

我们使用 Spark 2.4 处理大约 445 GB 的数据。我们的集群有 150 个工人,每个工人有 7 个 CPU 和 127 GB。Spark 以独立模式部署。下面是我们的配置:每个 worker 一个 executor,分配了 7 个 CPU 和 120 GB。RDD 中有 2000 个分区。

我看到有时由于执行人丢失而导致工作失败。以下是错误:

驱动日志:

ExecutorLostFailure (executor 82 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.\nDriver 
Run Code Online (Sandbox Code Playgroud)

执行者日志:

 2020-07-03 01:53:10 INFO  Worker:54 - Executor app-20200702155258-0011/13 finished with state EXITED message Command exited with code 137 exitStatus …
Run Code Online (Sandbox Code Playgroud)

apache-spark

6
推荐指数
1
解决办法
1463
查看次数