我正在尝试使用Spark数据帧而不是RDD,因为它们看起来比RDD更高级,并且往往会产生更易读的代码.
在一个14节点的Google Dataproc集群中,我有大约6百万个名称被两个不同的系统转换为ID:sa和sb.每个Row包含name,id_sa和id_sb.我的目标是从生产映射id_sa到id_sb使得对于每id_sa时,相应的id_sb是连接到所有名称中最常见的ID id_sa.
让我们试着用一个例子来澄清.如果我有以下行:
[Row(name='n1', id_sa='a1', id_sb='b1'),
Row(name='n2', id_sa='a1', id_sb='b2'),
Row(name='n3', id_sa='a1', id_sb='b2'),
Row(name='n4', id_sa='a2', id_sb='b2')]
Run Code Online (Sandbox Code Playgroud)
我的目标是从生产映射a1到b2.事实上,相关的名称a1是n1,n2和n3,分别映射b1,b2和b2,因此b2是相关联的名称最常见的映射a1.以同样的方式,a2将映射到b2.可以假设总有一个胜利者:不需要打破关系.
我希望我可以使用groupBy(df.id_sa)我的数据帧,但我不知道接下来该做什么.我希望最终会产生以下行的聚合:
[Row(id_sa=a1, max_id_sb=b2),
Row(id_sa=a2, max_id_sb=b2)]
Run Code Online (Sandbox Code Playgroud)
但也许我正在尝试使用错误的工具,我应该回到使用RDD.
假设我有一个函数来生成 (py)spark 数据帧,将数据帧缓存到内存中作为最后一个操作。
def gen_func(inputs):
df = ... do stuff...
df.cache()
df.count()
return df
Run Code Online (Sandbox Code Playgroud)
根据我的理解,Spark 的缓存工作如下:
cache/persistplus 一个动作 ( count()) 时,它是从它的 DAG 计算出来的,并缓存到内存中,附加到引用它的对象上。我的问题是,假设我gen_func用来生成一个数据框,然后覆盖原始数据框引用(可能是 afilter或 a withColumn)。
df=gen_func(inputs)
df=df.filter("some_col = some_val")
Run Code Online (Sandbox Code Playgroud)
在 Spark 中,RDD/DF 是不可变的,因此过滤器后重新分配的 df 和过滤器前的 df 指的是两个完全不同的对象。在这种情况下,对原始 df 的引用cache/counted已被覆盖。这是否意味着缓存的数据帧不再可用并将被垃圾收集?这是否意味着新的后置过滤器df将从头开始计算所有内容,尽管是从先前缓存的数据帧生成的?
我问这个是因为我最近正在修复我的代码的一些内存不足问题,在我看来,缓存可能是问题所在。然而,我还没有真正理解什么是使用缓存的安全方法的全部细节,以及人们如何可能不小心使自己的缓存内存无效。我的理解中缺少什么?我在执行上述操作时是否偏离了最佳实践?
我是火花新手。我正在使用以下配置集在 Spark 独立版 (v3.0.0) 中编写机器学习算法:
SparkConf conf = new SparkConf();
conf.setMaster("local[*]");
conf.set("spark.driver.memory", "8g");
conf.set("spark.driver.maxResultSize", "8g");
conf.set("spark.memory.fraction", "0.6");
conf.set("spark.memory.storageFraction", "0.5");
conf.set("spark.sql.shuffle.partitions", "5");
conf.set("spark.memory.offHeap.enabled", "false");
conf.set("spark.reducer.maxSizeInFlight", "96m");
conf.set("spark.shuffle.file.buffer", "256k");
conf.set("spark.sql.debug.maxToStringFields", "100");
Run Code Online (Sandbox Code Playgroud)
这就是我创建 CrossValidator 的方式
ParamMap[] paramGrid = new ParamGridBuilder()
.addGrid(gbt.maxBins(), new int[]{50})
.addGrid(gbt.maxDepth(), new int[]{2, 5, 10})
.addGrid(gbt.maxIter(), new int[]{5, 20, 40})
.addGrid(gbt.minInfoGain(), new double[]{0.0d, .1d, .5d})
.build();
CrossValidator gbcv = new CrossValidator()
.setEstimator(gbt)
.setEstimatorParamMaps(paramGrid)
.setEvaluator(gbevaluator)
.setNumFolds(5)
.setParallelism(8)
.setSeed(session.getArguments().getTrainingRandom());
Run Code Online (Sandbox Code Playgroud)
问题是,当(在 paramGrid 中) maxDepth 只是 {2, 5} 和 maxIter {5, 20} 时,一切都工作得很好,但是当它像上面的代码中那样时,它会不断记录: ,其中 …
我在查询 ORC 文件格式表时遇到问题
我正在尝试以下查询:
INSERT INTO TABLE <db_name>.<table_name> SELECT FROM <db_name>.<table_name> WHERE CONDITIONS;
Run Code Online (Sandbox Code Playgroud)
这导致:
TaskAttempt 2 failed, info=[Error: Failure while running task:java.lang.RuntimeException: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveExceptio
Hive Runtime Error while processing row
at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:186)
at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.run(TezProcessor.java:138)
at org.apache.tez.runtime.LogicalIOProcessorRuntimeTask.run(LogicalIOProcessorRuntimeTask.java:324)
at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:176)
at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable$1.run(TezTaskRunner.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call(TezTaskRunner.java:168)
at org.apache.tez.runtime.task.TezTaskRunner$TaskRunnerCallable.call(TezTaskRunner.java:163)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
used by: java.lang.RuntimeException: org.apache.hadoop.hive.ql.metadata.HiveException: Hive Runtime Error while processing row
at org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.processRow(MapRecordSource.java:91)
at org.apache.hadoop.hive.ql.exec.tez.MapRecordSource.pushRecord(MapRecordSource.java:68)
at org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor.run(MapRecordProcessor.java:294)
at org.apache.hadoop.hive.ql.exec.tez.TezProcessor.initializeAndRunProcessor(TezProcessor.java:163)
... 13 more …Run Code Online (Sandbox Code Playgroud) 当我尝试将图像上传到存储桶时,它会引发错误"Invalid bucket name "thum.images ": Bucket name must match the regex "^[a-zA-Z0-9.\-_]{1,255}$""。
我认为存储桶名称没有任何问题。
这是我上传图片的代码:
def upload_thumbnail_image(image_key, thumbnail_image):
thumbnail_image_bucket = os.environ['thumbnail_bucket']
thumbnail_image = #image path
image_key = EFE3-27C8-EEB3-4987/3612d0bc-bdfd-49de-82ee-3e66cbb06807.jpg
try:
new_object = client.upload_file(thumbnail_image, thumbnail_image_bucket, image_key)
return new_object
except Exception as Exc:
set_log(Exc.args[0],True)
Run Code Online (Sandbox Code Playgroud) 通常在 Impala 中,我们在将数据插入基础文件为 Parquet 格式的表之前使用 COMPRESSION_CODEC。
用于设置 COMPRESSION_CODEC 的命令:
set compression_codec=snappy;
set compression_codec=gzip;
Run Code Online (Sandbox Code Playgroud)
是否可以通过对 Parquet 文件执行任何类型的操作来找出所使用的压缩编解码器的类型?
据我所知,Spark 支持分区发现,其中目录名称遵循固定模式:column_name=column_value。链接网页的示例:
path -> to -> table -> gender=male -> country=US -> data.parquet
我想使用此功能,但不幸的是我当前正在处理的文件结构不遵循此模式。我无法改变它并且转换它是不可行的。就我而言,目录纯粹是列值,如下所示:
path -> to -> table -> male -> US -> data.parquet
理想情况下,我想配置 Spark 以指示“table”目录的子目录包含“gender”,然后是“country”子目录,依此类推。
我正在使用 Java 的 Spark 2.11。我也在使用 Parquet 文件。
我有一个包含3个经纪人的kafka集群。我最近开始遇到问题,经纪人退出集群,生产者/消费者抛出领导者不可用的错误。
在检查日志时,我看到以下事件序列:
//许多副本获取程序线程开始/停止
[2017-10-09 14:48:50,600] INFO [ReplicaFetcherManager on broker 6] Removed fetcher for partitions
[2017-10-09 14:48:50,608] INFO [ReplicaFetcherThread-0-7], Shutting down (kafka.server.ReplicaFetcherThread)
[2017-10-09 14:48:50,918] INFO [ReplicaFetcherThread-0-7], Stopped (kafka.server.ReplicaFetcherThread)
[2017-10-09 14:48:50,918] INFO [ReplicaFetcherThread-0-7], Shutdown completed (kafka.server.ReplicaFetcherThread)
Run Code Online (Sandbox Code Playgroud)
//不断扩大/缩小ISR
[2017-10-09 14:48:51,037] INFO Partition [__consumer_offsets,8] on broker 6: Expanding ISR for partition __consumer_offsets-8 from 6,8 to 6,8,7 (kafka.cluster.Partition)
[2017-10-09 14:48:51,038] INFO Partition [__consumer_offsets,35] on broker 6: Expanding ISR for partition __consumer_offsets-35 from 6,8 to 6,8,7 (kafka.cluster.Partition)
[2017-10-09 14:49:01,702] INFO Partition [t1,1] on broker 6: Shrinking …Run Code Online (Sandbox Code Playgroud) 我已经按照https://kafka.apache.org/24/documentation/streams/developer-guide/testing.html为 kafka 流应用程序编写了一个测试类 ,其代码是
import com.EventSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.util.Properties;
public class KafkaStreamsConfigTest {
private TopologyTestDriver testDriver;
private TestInputTopic<String, Object> inputTopic;
private TestOutputTopic<String, Object> outputTopic;
private Serde<String> stringSerde = new Serdes.StringSerde();
private EventSerde eventSerde= new EventSerde();
private String key="test";
private Object value = "some value";
private Object expected_value = "real value";
String kafkaEventSourceTopic = "raw_events";
String kafkaEventSinkTopic = "processed_events";
String kafkaCacheSinkTopic = "cache_objects";
String applicationId = "my-app";
String …Run Code Online (Sandbox Code Playgroud) unit-testing apache-kafka apache-kafka-streams kafka-topic spring-kafka-test
我们使用 Spark 2.4 处理大约 445 GB 的数据。我们的集群有 150 个工人,每个工人有 7 个 CPU 和 127 GB。Spark 以独立模式部署。下面是我们的配置:每个 worker 一个 executor,分配了 7 个 CPU 和 120 GB。RDD 中有 2000 个分区。
我看到有时由于执行人丢失而导致工作失败。以下是错误:
驱动日志:
ExecutorLostFailure (executor 82 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.\nDriver
Run Code Online (Sandbox Code Playgroud)
执行者日志:
2020-07-03 01:53:10 INFO Worker:54 - Executor app-20200702155258-0011/13 finished with state EXITED message Command exited with code 137 exitStatus …Run Code Online (Sandbox Code Playgroud) apache-spark ×5
apache-kafka ×2
hadoop ×2
parquet ×2
pyspark ×2
python ×2
amazon-s3 ×1
bucket ×1
hadoop-yarn ×1
hadoop2 ×1
hive ×1
impala ×1
java ×1
kafka-topic ×1
regex ×1
unit-testing ×1