小编Bar*_*zny的帖子

SparkSQL与Spark上的Hive - 差异和利弊?

SparkSQL CLI内部使用HiveQL,如果是Hive on spark(HIVE-7292),则hive使用spark作为后端引擎.有人可以提供更多的亮点,这两种情况究竟有何不同以及两种方法的利弊?

hadoop hive apache-spark apache-spark-sql

29
推荐指数
2
解决办法
4万
查看次数

为什么在 Mac 上安装 Apache Airflow 时没有出现此类表错误?

很难给出正确的标题。好的,就到这里了。我正在按照本教程在我的 Mac(Mojave 版本)上安装 Apache Airflow -

https://towardsdatascience.com/getting-started-with-apache-airflow-df1aa77d7b1b

在执行 pip install 气流任务后的第一步,当我运行气流版本命令时,我收到以下错误,然后出现气流版本 -

错误 - 使用 Traceback 执行前回调失败(最近一次调用最后一次):文件“/Users/karthikv/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py”,第 1244 行,在 _execute_context游标、语句、参数、上下文 文件“/Users/karthikv/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/default.py”,第 552 行,在 do_execute cursor.execute(statement, parameters) sqlite3 中。 OperationalError:没有这样的表:日志

上述异常是以下异常的直接原因:

回溯(最近一次调用最后一次):文件“/Users/karthikv/anaconda3/lib/python3.7/site-packages/airflow/utils/cli_action_loggers.py”,第 68 行,在 on_pre_execution cb(**kwargs) 文件“/ Users/karthikv/anaconda3/lib/python3.7/site-packages/airflow/utils/cli_action_loggers.py”,第 99 行,在 default_action_log session.add(log) 文件中“/Users/karthikv/anaconda3/lib/python3.7 /contextlib.py”,第 119 行,在 退出 next(self.gen) 文件“/Users/karthikv/anaconda3/lib/python3.7/site-packages/airflow/utils/db.py”,第 45 行,在 create_session session.commit() 文件“/Users/karthikv /anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”,第 1026 行,提交 self.transaction.commit() 文件“/Users/karthikv/anaconda3/lib/python3.7/site -packages/sqlalchemy/orm/session.py”,第 493 行,提交 self._prepare_impl() 文件“/Users/karthikv/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”,第 472 行,在 _prepare_impl self.session.flush() 文件“/Users/karthikv/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”中,第 2451 行,冲洗 self._flush(objects ) 文件 "/Users/karthikv/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”,第 2589 行,在 _flush transaction.rollback(_capture_exception=True) 文件“/Users/karthikv/anaconda3/lib/python3.7/site-packages/sqlalchemy/util /langhelpers.py”,第 …

python sqlalchemy pip airflow

20
推荐指数
1
解决办法
1万
查看次数

写入 Delta 表时检测到架构不匹配 - Azure Databricks

我尝试将“small_radio_json.json”加载到 Delta Lake 表。在此代码之后我将创建表。

我尝试创建 Delta 表,但收到错误“写入 Delta 表时检测到架构不匹配”。可能与分区有关 events.write.format("delta").mode("overwrite").partitionBy("artist").save("/delta/events/")

如何修复或修改代码。

    //https://learn.microsoft.com/en-us/azure/azure-databricks/databricks-extract-load-sql-data-warehouse
    //https://learn.microsoft.com/en-us/azure/databricks/_static/notebooks/delta/quickstart-scala.html
    
    //Session configuration
    val appID = "123558b9-3525-4c62-8c48-d3d7e2c16a6a"
    val secret = "123[xEPjpOIBJtBS-W9B9Zsv7h9IF:qw"
    val tenantID = "12344839-0afa-4fae-a34a-326c42112bca"

    spark.conf.set("fs.azure.account.auth.type", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type", 
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id", "<appID>")
    spark.conf.set("fs.azure.account.oauth2.client.secret", "<secret>")
   spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant- 
   id>/oauth2/token")
   spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")

   //Account Information
    val storageAccountName = "mydatalake"
   val fileSystemName = "fileshare1"

    spark.conf.set("fs.azure.account.auth.type." + storageAccountName + ".dfs.core.windows.net", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type." + storageAccountName + 
    ".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id." + storageAccountName + ".dfs.core.windows.net", 
    "" + appID + "")
    spark.conf.set("fs.azure.account.oauth2.client.secret." + storageAccountName + 
    ".dfs.core.windows.net", "" + secret …
Run Code Online (Sandbox Code Playgroud)

scala azure-databricks delta-lake

11
推荐指数
2
解决办法
4万
查看次数

Spark会话状态

如何在我的 py Spark 代码中检查 SparkSession 状态?要求是检查sparkSession是否处于活动状态。如果sparksession未激活,创建另一个spark会话并调用一些函数

我正在 Jupyter 笔记本中编写并运行此代码。

spark = SparkSession.builder.master("yarn") \                                 
    .config("spark.dynamicAllocation.enabled", "true") \
                                    .config("spark.serializer", 
    "org.apache.spark.serializer.KryoSerializer")
    .config("spark.shuffle.spill.compress", "true")
    .config("spark.shuffle.service.enabled", "true")
    .config("spark.io.compression.codec", "snappy")
    .config("spark.kryoserializer.buffer.max", "250m") \
    .config("spark.driver.memory", memory) \
    .config("spark.driver.cores", cores) \
    .config("spark.executor.cores", cores) \
    .config("spark.executor.memory", memory) \
    .config("spark.executor.instances", cores) \
    .enableHiveSupport()\
    .getOrCreate()
Run Code Online (Sandbox Code Playgroud)
  1. Spark 打印 SparkSession 详细信息

3.

if(spark):
    print("yes")

else:
    print("no")
Run Code Online (Sandbox Code Playgroud)

打印“是”

  1. spark.stop()

它停止了 Spark 应用程序——我签入了 UI

但是当我再次在第三步运行代码时

5.

if(spark):
    print("yes")

else:
    print("no")
Run Code Online (Sandbox Code Playgroud)

打印“yes”作为输出

  1. 但它确实会产生火花
error : AttributeError: 'NoneType' object has no attribute 'sc'
Run Code Online (Sandbox Code Playgroud)
  1. 但是当我运行下一个命令时我看到了奇怪的事情
df = spark.read.csv(file_name) …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

7
推荐指数
1
解决办法
7268
查看次数

关于使用 parquet 处理时间序列数据的问题

我正在探索以可扩展且经济高效的方式存储来自传感器的大量数据(时间序列数据)的方法。

目前,我正在为每个传感器编写一个 CSV 文件,按日期分区,因此我的文件系统层次结构如下所示:

client_id/sensor_id/year/month/day.csv

我的目标是能够对此数据执行 SQL 查询(通常获取特定客户端/传感器的时间范围、执行聚合等)我尝试将其加载到 和Postgrestimescaledb但数据量太大并且查询速度慢得不合理。

我现在正在尝试使用SparkParquet文件来执行这些查询,但我有一些问题无法从我对该主题的研究中得到解答,即:

我正在将此数据转换为镶木地板文件,所以我现在有这样的内容:

client_id/sensor_id/year/month/day.parquet

但我担心的是,当Spark加载包含许多Parquet文件的顶部文件夹时,行组信息的元数据并不像我使用包含所有数据(按client/sensor/year/month/day. 这是真的?或者拥有多个 Parquet 文件或单个分区的 Parquet 文件是否相同?我知道镶木地板文件在内部存储在像我正在使用的文件夹层次结构中,但我不清楚这如何影响文件的元数据。

我无法执行此操作的原因是我不断接收新数据,并且根据我的理解,由于页脚元数据工作的性质,我无法附加到镶木地板文件。它是否正确?现在,我只需将前一天的数据转换为镶木地板,并为每个客户端的每个传感器创建一个新文件。

谢谢。

time-series apache-spark parquet

5
推荐指数
1
解决办法
5276
查看次数

如何向现有图表添加属性

我正在通过将“源”、“目标”、“关系”作为数据帧传递来创建网络图,我想向上图中的某些节点添加属性,因此我将属性作为字典传递给图表,我不这样做知道如何向已生成的图形添加属性,因为我有多个属性,所以我应该使用字典。

graph = nx.from_pandas_edgelist(main_df, source='Source', target='Target', 
                                edge_attr='Relationship')

nx.set_node_attributes(graph, node_dict)
Run Code Online (Sandbox Code Playgroud)

首先,我通过传递以下参数来运行图表,然后对于该图表,我传递具有属性的字典。如何将属性字典添加到“图表”?

python graph networkx

4
推荐指数
1
解决办法
2064
查看次数

Spark 中的 RDD 和 Dataframe 有什么区别?

嗨,我对 apache Spark 比较陌生。我想了解 RDD、数据帧和数据集之间的区别。

例如,我正在从 s3 存储桶中提取数据。

df=spark.read.parquet("s3://output/unattributedunattributed*")
Run Code Online (Sandbox Code Playgroud)

在这种情况下,当我从 s3 加载数据时,RDD 是什么?另外,由于 RDD 是不可变的,我可以更改 df 的值,因此 df 不能是 rdd。

如果有人能解释 RDD、数据帧和数据集之间的区别,我将不胜感激。

apache-spark apache-spark-sql pyspark

3
推荐指数
1
解决办法
4040
查看次数

未找到 AWS EMR s3a 文件系统

我正在运行一个 EMR 实例。它工作正常,但当我尝试从 Python Spark 脚本访问 S3 文件时,它突然开始出现以下错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o36.json.: 
   java.lang.RuntimeException: 
     java.lang.ClassNotFoundException: 
       Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
Run Code Online (Sandbox Code Playgroud)

我们如何解决这个问题?

提前致谢。

amazon-s3 amazon-emr pyspark

1
推荐指数
1
解决办法
3710
查看次数