我从我自己的工作站上的pycharm启动pyspark应用程序到8节点集群.此群集还具有以spark-defaults.conf和spark-env.sh编码的设置
这就是我获取spark上下文变量的方法.
spark = SparkSession \
.builder \
.master("spark://stcpgrnlp06p.options-it.com:7087") \
.appName(__SPARK_APP_NAME__) \
.config("spark.executor.memory", "50g") \
.config("spark.eventlog.enabled", "true") \
.config("spark.eventlog.dir", r"/net/share/grid/bin/spark/UAT/SparkLogs/") \
.config("spark.cores.max", 128) \
.config("spark.sql.crossJoin.enabled", "True") \
.config("spark.executor.extraLibraryPath","/net/share/grid/bin/spark/UAT/bin/vertica-jdbc-8.0.0-0.jar") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.config("spark.logConf", "true") \
.getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("INFO")
Run Code Online (Sandbox Code Playgroud)
我想看看我的日志中使用的有效配置.这条线
.config("spark.logConf", "true") \
Run Code Online (Sandbox Code Playgroud)
应该使spark api将其有效配置作为INFO记录到日志中,但是默认日志级别设置为WARN,因此我看不到任何消息.
设置这一行
sc.setLogLevel("INFO")
Run Code Online (Sandbox Code Playgroud)
显示INFO消息向前发展,但到那时为时已晚.
如何设置spark开始的默认日志记录级别?
我正在尝试从pyspark中的列表创建一个字典.我有以下列表清单:
rawPositions
Run Code Online (Sandbox Code Playgroud)
给
[[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3904.125, 390412.5],
[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3900.75, 390075.0],
[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3882.5625, 388256.25],
[1009794, 'LPF6 Comdty', 'BC22', 'Enterprise', 3.0, 3926.25, 392625.0],
[2766232,
'CDX IG CDSI S25 V1 5Y CBBT CORP',
'BC85',
'Enterprise',
30000000.0,
-16323.2439825,
30000000.0],
[2766232,
'CDX IG CDSI S25 V1 5Y CBBT CORP',
'BC85',
'Enterprise',
30000000.0,
-16928.620101900004,
30000000.0],
[1009804, 'LPM6 Comdty', 'BC29', 'Jet', 105.0, 129596.25, 12959625.0],
[1009804, 'LPM6 Comdty', 'BC29', 'Jet', 128.0, 162112.0, 16211200.0],
[1009804, …Run Code Online (Sandbox Code Playgroud) 我的火花集群中有一个24核和124Gb ram的节点.当我将spark.executor.memory字段设置为4g,然后广播一个需要3.5gb存储在ram中的变量时,核心是否共同拥有该变量的24个副本?还是一份?
我正在使用pyspark - v1.6.2
我有一组机器,必须与其他进程共享。可以说我不是一个好人,并且希望我的 Spark 执行程序进程比其他人的进程具有更高的优先级。我该如何设置呢?
我使用的是 StandAlone 模式,v2.01,在 RHEL7 上运行
我是 Spark 的新手并试图找到自己的方式。
我有一个 spark 应用程序,它在dataset 上运行一个复杂的 map 函数。此地图功能可能会因主要与数据相关的原因而失败。我怎样才能得到一些关于出了什么问题的有意义的信息?我不知道从哪里开始。
非常感谢!
我有一个 Spark 集群和一个 vertica 数据库。我用
spark.read.jdbc( # etc
Run Code Online (Sandbox Code Playgroud)
将 Spark 数据帧加载到集群中。当我执行某个 groupby 功能时
df2 = df.groupby('factor').agg(F.stddev('sum(PnL)'))
df2.show()
Run Code Online (Sandbox Code Playgroud)
然后我得到一个 vertica 语法异常
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
at org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
at org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2576)
at org.apache.spark.sql.Dataset.head(Dataset.scala:1934) …Run Code Online (Sandbox Code Playgroud) 我有 128 个核心、8 个节点、每个节点 186Gb 内存。
我有从 jdbc 源加载的数据帧 (Df)。它有一个分区。然后我打电话:
c = Df.repartition(128*3).cache().count()
Run Code Online (Sandbox Code Playgroud)
应用程序 Web UI 显示缓存的 rdd 有 384 个分区,但全部位于一个节点(我们称之为节点 1)上,RAM 大小为 57Mb。
当我查看计数阶段时,我看到 384 个任务,全部在节点 1 上执行。
为什么 Spark 不将数据帧均匀分布在所有节点上?
我在 pycharm 中运行这个。以下是我设置的配置值:
spark = SparkSession \
.builder \
.master("spark://sparkmaster:7087") \
.appName(__SPARK_APP_NAME__) \
.config("spark.executor.memory", "80g") \
.config("spark.eventlog.enabled", "True") \
.config("spark.eventlog.dir", r"C:\Temp\Athena\UAT\Logs") \
.config("spark.cores.max", 128) \
.config("spark.sql.crossJoin.enabled", "True") \
.config("spark.executor.extraLibraryPath","/net/share/grid/bin/spark/UAT/bin/vertica-jdbc-8.0.0-0.jar") \
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
.getOrCreate()
Run Code Online (Sandbox Code Playgroud)
这是我的火花属性
我有一个 concurrentHashMap 实例,一些线程向其中添加了条目。这些值是整数。
同时,其他线程希望检索映射中所有值的总和。我希望这些线程看到一致的值。但是,它们不必总是看到最新的值。
以下代码线程安全吗?
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class MyClass {
private Map<Integer, Integer> values = new ConcurrentHashMap<>();
public void addValue(Integer key, int value){
values.put(key, value);
}
public long sumOfValues(){
return values
.values()
.stream()
.mapToInt(Integer::intValue)
.sum();
}
}
Run Code Online (Sandbox Code Playgroud)
求和运算会在一组一致的值上计算吗?
当 sum 运算发生时,对 put() 的调用会被阻塞吗?
当然我可以自己同步访问,甚至可以拆分读写锁以允许并发读访问和同步写访问,但是我很好奇在使用 concurrentHashMap 作为集合实现时是否有必要。
当我运行以下命令时:
String s2 = "07:05:45PM";
LocalTime time = LocalTime.parse(s2, DateTimeFormatter.ofPattern("HH:mm:ssa"));
Run Code Online (Sandbox Code Playgroud)
我得到:
Exception in thread "main" java.time.format.DateTimeParseException:
Text '07:05:45PM' could not be parsed at index 8
Run Code Online (Sandbox Code Playgroud)
它似乎不喜欢 AM/PM 指示符,但据我所知,这应该有效。我究竟做错了什么?
apache-spark ×8
pyspark ×6
java ×2
ipython ×1
ipython-sql ×1
java-8 ×1
java-stream ×1
localtime ×1
pyspark-sql ×1
python ×1
python-3.x ×1
sqlalchemy ×1
vertica ×1