SparkSQL CLI内部使用HiveQL,如果是Hive on spark(HIVE-7292),则hive使用spark作为后端引擎.有人可以提供更多的亮点,这两种情况究竟有何不同以及两种方法的利弊?
很难给出正确的标题。好的,就到这里了。我正在按照本教程在我的 Mac(Mojave 版本)上安装 Apache Airflow -
https://towardsdatascience.com/getting-started-with-apache-airflow-df1aa77d7b1b
在执行 pip install 气流任务后的第一步,当我运行气流版本命令时,我收到以下错误,然后出现气流版本 -
错误 - 使用 Traceback 执行前回调失败(最近一次调用最后一次):文件“/Users/karthikv/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/base.py”,第 1244 行,在 _execute_context游标、语句、参数、上下文 文件“/Users/karthikv/anaconda3/lib/python3.7/site-packages/sqlalchemy/engine/default.py”,第 552 行,在 do_execute cursor.execute(statement, parameters) sqlite3 中。 OperationalError:没有这样的表:日志
上述异常是以下异常的直接原因:
回溯(最近一次调用最后一次):文件“/Users/karthikv/anaconda3/lib/python3.7/site-packages/airflow/utils/cli_action_loggers.py”,第 68 行,在 on_pre_execution cb(**kwargs) 文件“/ Users/karthikv/anaconda3/lib/python3.7/site-packages/airflow/utils/cli_action_loggers.py”,第 99 行,在 default_action_log session.add(log) 文件中“/Users/karthikv/anaconda3/lib/python3.7 /contextlib.py”,第 119 行,在 退出 next(self.gen) 文件“/Users/karthikv/anaconda3/lib/python3.7/site-packages/airflow/utils/db.py”,第 45 行,在 create_session session.commit() 文件“/Users/karthikv /anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”,第 1026 行,提交 self.transaction.commit() 文件“/Users/karthikv/anaconda3/lib/python3.7/site -packages/sqlalchemy/orm/session.py”,第 493 行,提交 self._prepare_impl() 文件“/Users/karthikv/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”,第 472 行,在 _prepare_impl self.session.flush() 文件“/Users/karthikv/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”中,第 2451 行,冲洗 self._flush(objects ) 文件 "/Users/karthikv/anaconda3/lib/python3.7/site-packages/sqlalchemy/orm/session.py”,第 2589 行,在 _flush transaction.rollback(_capture_exception=True) 文件“/Users/karthikv/anaconda3/lib/python3.7/site-packages/sqlalchemy/util /langhelpers.py”,第 …
我尝试将“small_radio_json.json”加载到 Delta Lake 表。在此代码之后我将创建表。
我尝试创建 Delta 表,但收到错误“写入 Delta 表时检测到架构不匹配”。可能与分区有关 events.write.format("delta").mode("overwrite").partitionBy("artist").save("/delta/events/")
如何修复或修改代码。
//https://learn.microsoft.com/en-us/azure/azure-databricks/databricks-extract-load-sql-data-warehouse
//https://learn.microsoft.com/en-us/azure/databricks/_static/notebooks/delta/quickstart-scala.html
//Session configuration
val appID = "123558b9-3525-4c62-8c48-d3d7e2c16a6a"
val secret = "123[xEPjpOIBJtBS-W9B9Zsv7h9IF:qw"
val tenantID = "12344839-0afa-4fae-a34a-326c42112bca"
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type",
"org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<appID>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<secret>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<tenant-
id>/oauth2/token")
spark.conf.set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
//Account Information
val storageAccountName = "mydatalake"
val fileSystemName = "fileshare1"
spark.conf.set("fs.azure.account.auth.type." + storageAccountName + ".dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type." + storageAccountName +
".dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id." + storageAccountName + ".dfs.core.windows.net",
"" + appID + "")
spark.conf.set("fs.azure.account.oauth2.client.secret." + storageAccountName +
".dfs.core.windows.net", "" + secret …
Run Code Online (Sandbox Code Playgroud) 如何在我的 py Spark 代码中检查 SparkSession 状态?要求是检查sparkSession是否处于活动状态。如果sparksession未激活,创建另一个spark会话并调用一些函数
我正在 Jupyter 笔记本中编写并运行此代码。
spark = SparkSession.builder.master("yarn") \
.config("spark.dynamicAllocation.enabled", "true") \
.config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
.config("spark.shuffle.spill.compress", "true")
.config("spark.shuffle.service.enabled", "true")
.config("spark.io.compression.codec", "snappy")
.config("spark.kryoserializer.buffer.max", "250m") \
.config("spark.driver.memory", memory) \
.config("spark.driver.cores", cores) \
.config("spark.executor.cores", cores) \
.config("spark.executor.memory", memory) \
.config("spark.executor.instances", cores) \
.enableHiveSupport()\
.getOrCreate()
Run Code Online (Sandbox Code Playgroud)
3.
if(spark):
print("yes")
else:
print("no")
Run Code Online (Sandbox Code Playgroud)
打印“是”
spark.stop()
它停止了 Spark 应用程序——我签入了 UI
但是当我再次在第三步运行代码时
5.
if(spark):
print("yes")
else:
print("no")
Run Code Online (Sandbox Code Playgroud)
打印“yes”作为输出
error : AttributeError: 'NoneType' object has no attribute 'sc'
Run Code Online (Sandbox Code Playgroud)
df = spark.read.csv(file_name) …
Run Code Online (Sandbox Code Playgroud) 我正在探索以可扩展且经济高效的方式存储来自传感器的大量数据(时间序列数据)的方法。
目前,我正在为每个传感器编写一个 CSV 文件,按日期分区,因此我的文件系统层次结构如下所示:
client_id/sensor_id/year/month/day.csv
我的目标是能够对此数据执行 SQL 查询(通常获取特定客户端/传感器的时间范围、执行聚合等)我尝试将其加载到 和Postgres
,timescaledb
但数据量太大并且查询速度慢得不合理。
我现在正在尝试使用Spark
和Parquet
文件来执行这些查询,但我有一些问题无法从我对该主题的研究中得到解答,即:
我正在将此数据转换为镶木地板文件,所以我现在有这样的内容:
client_id/sensor_id/year/month/day.parquet
但我担心的是,当Spark
加载包含许多Parquet
文件的顶部文件夹时,行组信息的元数据并不像我使用包含所有数据(按client/sensor/year/month/day
. 这是真的?或者拥有多个 Parquet 文件或单个分区的 Parquet 文件是否相同?我知道镶木地板文件在内部存储在像我正在使用的文件夹层次结构中,但我不清楚这如何影响文件的元数据。
我无法执行此操作的原因是我不断接收新数据,并且根据我的理解,由于页脚元数据工作的性质,我无法附加到镶木地板文件。它是否正确?现在,我只需将前一天的数据转换为镶木地板,并为每个客户端的每个传感器创建一个新文件。
谢谢。
我正在通过将“源”、“目标”、“关系”作为数据帧传递来创建网络图,我想向上图中的某些节点添加属性,因此我将属性作为字典传递给图表,我不这样做知道如何向已生成的图形添加属性,因为我有多个属性,所以我应该使用字典。
graph = nx.from_pandas_edgelist(main_df, source='Source', target='Target',
edge_attr='Relationship')
nx.set_node_attributes(graph, node_dict)
Run Code Online (Sandbox Code Playgroud)
首先,我通过传递以下参数来运行图表,然后对于该图表,我传递具有属性的字典。如何将属性字典添加到“图表”?
嗨,我对 apache Spark 比较陌生。我想了解 RDD、数据帧和数据集之间的区别。
例如,我正在从 s3 存储桶中提取数据。
df=spark.read.parquet("s3://output/unattributedunattributed*")
Run Code Online (Sandbox Code Playgroud)
在这种情况下,当我从 s3 加载数据时,RDD 是什么?另外,由于 RDD 是不可变的,我可以更改 df 的值,因此 df 不能是 rdd。
如果有人能解释 RDD、数据帧和数据集之间的区别,我将不胜感激。
我正在运行一个 EMR 实例。它工作正常,但当我尝试从 Python Spark 脚本访问 S3 文件时,它突然开始出现以下错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o36.json.:
java.lang.RuntimeException:
java.lang.ClassNotFoundException:
Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
Run Code Online (Sandbox Code Playgroud)
我们如何解决这个问题?
提前致谢。
apache-spark ×4
pyspark ×3
python ×3
airflow ×1
amazon-emr ×1
amazon-s3 ×1
delta-lake ×1
graph ×1
hadoop ×1
hive ×1
networkx ×1
parquet ×1
pip ×1
scala ×1
sqlalchemy ×1
time-series ×1