小编Koe*_*dlt的帖子

使用 Zef (Raku) 安装 Inline::Perl5 时出错

我正在尝试在 Debian 11 x64 上安装 Raku。我以前从未安装过(perl5也是如此)

这是一个带有一些 Perl5 脚本的服务器,我想在 Raku 中通过 Inline::Perl5 “使用”它(如果我在 Raku 模块中找不到我想要的东西,我也想使用 Perl5 模块),但我不能。

perl5是通过APT安装的,rakudo也是如此。

运行时zef install Inline::Perl5,我得到以下信息:

===> Searching for: Inline::Perl5
===> Searching for missing dependencies: Distribution::Builder::MakeFromJSON:ver<0.6+>
===> Searching for missing dependencies: System::Query
===> Failed to find dependencies: System::Query Failed to resolve some missing dependencies
Run Code Online (Sandbox Code Playgroud)

如果我尝试安装System::Queryzef我会得到:

===> Searching for: System::Query
No candidates found matching identity: System::Query
Run Code Online (Sandbox Code Playgroud)

你有什么想法可以帮助我吗?

多谢

rakudo zef raku

9
推荐指数
1
解决办法
178
查看次数

如果两个阶段使用相同的 DataFrame,spark 是否会读取同一文件两次?

以下代码读取相同的 csv 两次,即使只调用一个操作

端到端可运行示例:

import pandas as pd
import numpy as np

df1=  pd.DataFrame(np.arange(1_000).reshape(-1,1))
df1.index = np.random.choice(range(10),size=1000)
df1.to_csv("./df1.csv",index_label = "index")
############################################################################

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StringType, StructField

spark = SparkSession.builder.config("spark.sql.autoBroadcastJoinThreshold","-1").\
config("spark.sql.adaptive.enabled","false").getOrCreate()

schema = StructType([StructField('index', StringType(), True),
                     StructField('0', StringType(), True)])

df1 = spark.read.csv("./df1.csv", header=True, schema = schema)

df2 = df1.groupby("index").agg(F.mean("0"))
df3 = df1.join(df2,on='index')

df3.explain()
df3.count()
Run Code Online (Sandbox Code Playgroud)

Web UI 中的 sql 选项卡显示以下内容:

在此输入图像描述

如您所见,df1 文件被读取了两次。这是预期的行为吗?为什么会发生这种情况?我只有一项操作,因此管道的同一部分不应运行多次。

我已经在这里阅读了答案。问题几乎是相同的,但是在该问题中使用了 RDD,并且我在 pyspark API 中使用了数据帧。在这个问题中,建议如果要避免多个文件扫描,那么 DataFrames API 会有所帮助,这就是 …

apache-spark apache-spark-sql pyspark

7
推荐指数
1
解决办法
710
查看次数

连接数据框并重命名具有相同名称的结果列

缩短的示例:

vals1 = [(1, "a"), 
        (2, "b"), 
      ]
columns1 = ["id","name"]
df1 = spark.createDataFrame(data=vals1, schema=columns1)

vals2 = [(1, "k"), 
      ]
columns2 = ["id","name"]
df2 = spark.createDataFrame(data=vals2, schema=columns2)

df1 = df1.alias('df1').join(df2.alias('df2'), 'id', 'full')
df1.show()
Run Code Online (Sandbox Code Playgroud)

结果具有名为 的一列id和名为 的两列name。假设真实的数据帧有数十个这样的列,如何重命名具有重复名称的列?

python apache-spark pyspark

6
推荐指数
1
解决办法
530
查看次数

为什么spark.memory.fraction的默认值这么低?

Spark配置文档中,我们了解了以下有关spark.memory.fraction配置参数的信息:

用于执行和存储的(堆空间 - 300MB)的一部分。该值越低,溢出和缓存数据驱逐发生的频率就越高。此配置的目的是为内部元数据、用户数据结构以及稀疏、异常大的记录的情况下的不精确大小估计留出内存。建议将此值保留为默认值。

在撰写此问题时,此配置参数的默认值为 0.6。这意味着,对于具有 32GB 堆空间和默认配置的执行器,我们有:

  • 300MB保留空间(行上的硬编码值)
  • (32GB - 300MB) * 0.6 = 19481MB用于执行+存储的共享内存
  • (32GB - 300MB) * 0.4 = 12987MB用户内存

这个“用户内存”(根据文档用于以下用途:

其余空间 (40%) 保留用于用户数据结构、Spark 中的内部元数据,以及在稀疏和异常大的记录情况下防止 OOM 错误。

在具有 32GB 堆空间的执行器上,我们为此分配 12.7GB 内存,这感觉相当大!

这些用户数据结构/内部元数据/防止 OOM 错误真的需要那么多空间吗?是否有一些引人注目的用户内存使用示例可以说明如此大的用户内存区域的需求?

apache-spark

6
推荐指数
1
解决办法
1109
查看次数

mysqli_connect(): (HY000/2002): 无法通过套接字连接到本地 MySQL 服务器

我需要帮助...我尝试使用 php 显示数据库时遇到此错误:

mysqli_connect(): (HY000/2002): 无法通过 /Applications/MAMP/htdocs/databases.php 中的套接字 '/Applications/MAMP/tmp/mysql/mysql.sock' (2) 连接到本地 MySQL 服务器7 数据库连接失败:无法通过套接字“/Applications/MAMP/tmp/mysql/mysql.sock”连接到本地 MySQL 服务器 (2) (2002)

我已经尝试修复它好几天了,阅读了这个论坛上有相同问题的人的文章,并应用了修复程序(就像这个),但完全没有运气。

这是我尝试过的一些步骤: 进入 php.ini 文件并将套接字更改为 /tmp/mysql.sock (我也尝试将其更改为 /Applications/MAMP/tmp/mysql/mysql.sock 看看会发生什么,没有好的)。

尝试对 /etc/my.cnf 文件执行相同的操作。不好。

尝试了很多其他我什至不记得了的事情。

另外,我使用的是 Mac 10.10.2,并且安装了 MAMP。根据 MA​​MP,只有 Apache 正在工作(Apache 亮了绿灯,MySQL 上什么也没有),但我已经下载了 MySQL,并且根据系统首选项它正在运行,尽管它不会让我因某种原因停止 MySQL 服务器(当我单击stop 它会停止然后自行重新启动)。

请帮忙...

如果重要的话,这是 php:

<?php
  // 1. Create a database connection
  $dbhost = "localhost";
  $dbuser = "widget_cms";
  $dbpass = "*********";
  $dbname = "widget_corp";
  $connection = mysqli_connect($dbhost, $dbuser, $dbpass, $dbname);
  // Test if connection succeeded
  if(mysqli_connect_errno()) …
Run Code Online (Sandbox Code Playgroud)

php mysql sockets apache

5
推荐指数
1
解决办法
1万
查看次数

Spark.read.parquet 和 Spark.read.format.load 之间的速度差异

我试图了解是什么导致了阅读速度的巨大差异。我有一个包含 3000 万行和 38 列的数据框。

final_df=spark.read.parquet("/dbfs/FileStore/path/to/file.parquet")
Run Code Online (Sandbox Code Playgroud)

读取该文件需要 14 分钟。

尽管

final_df = spark.read.format("parquet").load("/dbfs/FileStore/path/to/file.parquet")
Run Code Online (Sandbox Code Playgroud)

读取文件仅需2秒。

apache-spark apache-spark-sql pyspark

5
推荐指数
1
解决办法
2605
查看次数

Spark 任务失败并出现 java.lang.ClassNotFoundException:com.datastax.spark.connector.rdd.partitioner.CassandraPartition

我正在尝试创建一个 Spring Boot (2.7.5) 项目,该项目写入 Cassandra (使用 Spring Data cassandra)并使用 Spark 从 Cassandra 读取。当我将 Spark Master 作为 local[*] 提供时,它工作正常,但当我尝试连接到 Spark Cluster Master URL 时,出现如下错误。

卡桑德拉版本 - 4.0.1

火花版本 - 3.1.3

2022-11-17 11:45:03 - Code generated in 2093.165617 ms
2022-11-17 11:45:03 - Starting job: show at HomeController.java:26
2022-11-17 11:45:03 - Got job 0 (show at HomeController.java:26) with 1 output partitions
2022-11-17 11:45:03 - Final stage: ResultStage 0 (show at HomeController.java:26)
2022-11-17 11:45:03 - Parents of final stage: List()
2022-11-17 11:45:03 - …
Run Code Online (Sandbox Code Playgroud)

cassandra apache-spark spring-data-cassandra spark-cassandra-connector

5
推荐指数
0
解决办法
116
查看次数

hadoop-common / hadoop-aws / aws-java-sdk-bundle version compatibility?

I'm getting this exception on a worker when trying to read from S3:

java.lang.NoSuchMethodError: 'java.lang.Object org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(org.apache.hadoop.fs.statistics.DurationTracker, org.apache.hadoop.util.functional.CallableRaisingIOE)'
Run Code Online (Sandbox Code Playgroud)

The troubleshooting page as well as many other answers to this question I found all say to check my JAR versions. They are:

Spark 3.5.0

java.lang.NoSuchMethodError: 'java.lang.Object org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(org.apache.hadoop.fs.statistics.DurationTracker, org.apache.hadoop.util.functional.CallableRaisingIOE)'
Run Code Online (Sandbox Code Playgroud)

And hadoop-aws/3.3.6 identifies aws-java-sdk-bundle 1.12.367, which is what I have.

Do the versions look right? Is there something else I'm missing?

Thanks!

Command line:

~/spark/spark-3.5.0-bin-hadoop3/bin/spark-submit \
    --jars ~/mysql-connector/mysql-connector-j-8.1.0/mysql-connector-j-8.1.0.jar,/home/pav/.cache/coursier/v1/https/repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/3.3.6/hadoop-common-3.3.6.jar,/home/pav/.cache/coursier/v1/https/repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.6/hadoop-aws-3.3.6.jar,/home/pav/.cache/coursier/v1/https/repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.12.367/aws-java-sdk-bundle-1.12.367.jar \
    --driver-class-path ~/mysql-connector/mysql-connector-j-8.1.0/mysql-connector-j-8.1.0.jar \
    --master …
Run Code Online (Sandbox Code Playgroud)

amazon-s3 apache-spark

5
推荐指数
1
解决办法
1675
查看次数

AWS Glue ApplyMapping 从双精度数到字符串

我在胶水作业方面遇到了一些令人沮丧的问题。

我有一个从爬虫创建的表。它检查了一些 CSV 数据并创建了一个架构。需要修改模式的某些元素,例如将数字更改为字符串并应用标头。

我似乎在这里遇到了一些问题 - 某些字段的模式似乎已被选择为双精度。当我尝试将其转换为我需要的字符串时,它包含一些空精度,例如 1234 --> 1234.0。

我的映射代码类似于:

applymapping1 = ApplyMapping.apply(
    frame = datasource0, 
    mappings = [
        ("col1","double","first_column_name","string"),
        ("col2","double","second_column_name","string")
    ], 
    transformation_ctx = "applymapping1"
 )
Run Code Online (Sandbox Code Playgroud)

爬取数据后得到的结果表类似于:

first_column_name    second_column_name
1234.0               4321.0
5678.0               8765.0
Run Code Online (Sandbox Code Playgroud)

相对于

first_column_name    second_column_name
1234                 4321
5678                 8765
Run Code Online (Sandbox Code Playgroud)

有没有好的方法来解决这个问题?我尝试将爬虫程序最初创建的表中的模式更改为 bigint 而不是 double,但是当我将映射代码更新为 ("col1","bigint","first_column_name","string" )该表最终会变为空。

amazon-web-services aws-glue

3
推荐指数
1
解决办法
2万
查看次数

是否有任何函数可以定位 PySpark 数据帧列中的所有出现次数?

我有以下 PySpark 数据框

A
1001
1110
1000

我想找1到此列中所有出现的情况,并以如下方式将其放入新列中:

A 发生次数
1001 0,3
1110 0,1,2
1000 0

我尝试了locate如下功能:

from pyspark.sql import functions as F
dfa_occ = dfa.withColumn('Occurrences', F.locate('1', (F.col('A'))-1) )
Run Code Online (Sandbox Code Playgroud)

然而,这仅提取第一次出现的1,而我需要找到所有的!

dataframe apache-spark apache-spark-sql pyspark

3
推荐指数
1
解决办法
374
查看次数

了解 Spark 中序列化结果的总大小

我正在对 Databricks 中托管的巨大 Delta 表的数据框进行非常简单的操作。我面临的问题是,运行几个小时后,代码失败,并显示错误“作业因阶段失败而中止:59014 个任务的序列化结果的总大小 (4.0 GiB) 大于spark.driver.maxResultSize 4.0 GiB” ”。

我正在执行的任务是,我读取数据框中的增量表,添加一个新列用于创建存储桶(20个存储桶),并以覆盖模式保存表,将存储桶添加为另一个分区(已经有3个分区,这个新分区)列将是第四个分区)。因此,我没有在应用程序代码中执行任何会导致大量数据返回驱动程序的操作。下面给出的是示例代码

bucket_number = 20
inputDataframe = spark.read.table("huge_delta_table")
inputDataframe = inputDataframe.withColumn("bucket_key", (translate( substring(col("some_column"), 0, 16), "abcdefghijklmnopqrstuvwxyz", "01234567890123456789012345").cast(LongType()) %buckets_number)+1)
inputDatafrme.write.format("delta")
input.write.format("delta").mode("overwrite").option("overwriteSchema", "true").partitionBy("existing_partition_column1","existing_partition_column2","existing_partition_column3","bucket_key") \
.saveAsTable("huge_delta_table")
Run Code Online (Sandbox Code Playgroud)

我想知道,是否是因为大量的任务导致 Spark 的内部结果元数据变得巨大(当出于协调目的而传回驱动程序时)?

apache-spark pyspark databricks

1
推荐指数
1
解决办法
1313
查看次数

将一种类型的 Spark scala 数据集转换为另一种类型

我有一个具有以下案例类类型的数据集:

  case class AddressRawData(
                         addressId: String,
                         customerId: String,
                         address: String
                       )
Run Code Online (Sandbox Code Playgroud)

我想将其转换为:

case class AddressData(
                          addressId: String,
                          customerId: String,
                          address: String,
                          number: Option[Int], //i.e. it is optional
                          road: Option[String],
                          city: Option[String],
                          country: Option[String]
                        )
Run Code Online (Sandbox Code Playgroud)

使用解析器函数:

  def addressParser(unparsedAddress: Seq[AddressData]): Seq[AddressData] = {
    unparsedAddress.map(address => {
      val split = address.address.split(", ")
      address.copy(
        number = Some(split(0).toInt),
        road = Some(split(1)),
        city = Some(split(2)),
        country = Some(split(3))
      )
    }
    )
  }
Run Code Online (Sandbox Code Playgroud)

我是 Scala 和 Spark 的新手。谁能告诉我如何做到这一点?

scala apache-spark apache-spark-sql scala-spark

0
推荐指数
1
解决办法
434
查看次数