小编Ang*_*Sen的帖子

Azure Blob - 使用Python读取

有人能告诉我是否可以直接从Azure blob存储中读取csv文件作为流并使用Python处理它?我知道它可以使用C#.Net(如下所示)完成,但想知道Python中的等效库来执行此操作.

CloudBlobClient client = storageAccount.CreateCloudBlobClient();
CloudBlobContainer container = client.GetContainerReference("outfiles");
CloudBlob blob = container.GetBlobReference("Test.csv");*
Run Code Online (Sandbox Code Playgroud)

python azure azure-storage-blobs

12
推荐指数
4
解决办法
1万
查看次数

将Python DataFrame作为CSV写入Azure Blob

我有两个关于从/向Azure blob读取和编写Python对象的问题.

1)有人能告诉我如何将Python数据帧作为csv文件直接写入Azure Blob而不在本地存储吗?

我尝试使用函数create_blob_from_text和create_blob_from_stream, 但它们都不起作用.

将数据帧转换为字符串并使用create_blob_from_text函数将文件写入blob但是作为普通字符串而不是csv.

    df_b = df.to_string()
    block_blob_service.create_blob_from_text('test', 'OutFilePy.csv', df_b)  
Run Code Online (Sandbox Code Playgroud)

2)如何直接将Azure blob存储中的json文件直接读入Python?

python azure azure-storage azure-blob-storage

6
推荐指数
2
解决办法
5406
查看次数

来自Python Dictionary的PySpark Dataframe没有Pandas

我试图将以下Python dict转换为PySpark DataFrame,但我没有获得预期的输出.

dict_lst = {'letters': ['a', 'b', 'c'], 
             'numbers': [10, 20, 30]}
df_dict = sc.parallelize([dict_lst]).toDF()  # Result not as expected
df_dict.show()
Run Code Online (Sandbox Code Playgroud)

有没有办法在不使用熊猫的情况下做到这一点?

pyspark pyspark-sql

4
推荐指数
1
解决办法
927
查看次数

SFTP、SSH 和 SSH 隧道

我想详细了解 SSH 隧道的概念,因为我正在围绕这个主题学习一些东西。我已经在公共论坛上浏览了一些细节,但仍然有一些问题。

  1. SFTP 服务正在远程服务器中运行,我已获得连接到它的凭据。我使用像 WinScp 这样的 GUI 来连接远程服务器。SSH 隧道在这里的作用是什么?
  2. 远程 SFTP 服务器管理员要求我从我的机器生成 RSA 公钥并将其添加到远程服务器。现在,我可以在没有密码的情况下从 SSH 终端直接连接到服务器。SSH 隧道在这里的作用是什么?
  3. 隧道是隐式的还是需要在某些情况下显式调用?

请说清楚。

ssh sftp ssh-tunnel

4
推荐指数
1
解决办法
687
查看次数

PySpark 并行读取多个文件

我的项目中有以下要求,我们正在尝试使用 PySpark 进行数据处理。

我们过去常常以 Parquet 文件的形式接收每辆车的传感器数据,以及每辆车的一个文件。该文件有很多传感器,但其结构化数据为 Parquet 格式。每个文件的平均文件大小为 200MB。

假设我在一批中收到如下文件并准备处理。

训练文件大小日期

X1 210MB 18 年 9 月 5 日上午 12:10

X1 280MB 18 年 9 月 5 日下午 05:10

Y1 220MB 18 年 9 月 5 日上午 04:10

Y1 241MB 18 年 9 月 5 日下午 06:10

在处理结束时,我需要从每个源文件或一个主文件中接收一个汇总的 .csv 文件,其中包含所有这些车辆的汇总数据。

我知道 HDFS 默认块大小是 128MB,每个文件将被分成 2 个块。我可以知道如何使用 PySpark 完成此要求吗?是否可以并行处理所有这些文件?

请让我知道你的想法

parquet apache-spark-sql pyspark pyspark-sql

3
推荐指数
2
解决办法
7448
查看次数

Azure 文件共享 - 递归目录搜索,如 os.walk

我正在编写一个 Python 脚本来从 Azure 文件共享下载文件。文件共享的结构如下:

/analytics/Part1/file1.txt
/analytics/Part1/file2.txt
/analytics/mainfile.txt
/analytics/Part1/Part1_1/file11.txt
Run Code Online (Sandbox Code Playgroud)

我尝试在脚本中使用以下几行,但它仅在根目录级别查找文件和目录。

/analytics/Part1/file1.txt
/analytics/Part1/file2.txt
/analytics/mainfile.txt
/analytics/Part1/Part1_1/file11.txt
Run Code Online (Sandbox Code Playgroud)

输出是:

/analytics/mainfile.txt  --> File
/analytics/Part1 --> Dir
Run Code Online (Sandbox Code Playgroud)

但是,我正在寻找类似os.walk()Python 中的函数的东西来实现这种递归目录遍历。知道 Azure 文件服务 Python API 中是否提供此类功能吗?

azure python-3.x azure-files azure-storage-files

3
推荐指数
1
解决办法
3161
查看次数

Python Azure Blob 直接上传

我正在尝试将 csv 作为流从 Azure blob 读取到 Python 中,然后直接将其写回 Azure blob。读取操作工作得很好,但写入输出流只是将一个空文件写入 blob。以下代码在 print(df) 之前有效,但在那之后就无效。

下面是代码:

代码:

from io import BytesIO, StringIO

with BytesIO() as input_blob:   

  with BytesIO() as output_blob:

    block_blob_service = BlockBlobService(account_name='aaaccc', account_key='*/*/*--')

    block_blob_service.get_blob_to_stream('test', 'Source.csv', input_blob)

    input_blob.seek(0)

    df=pd.read_csv(input_blob)

    print(df)

    copyfileobj(input_blob, output_blob)

    block_blob_service.create_blob_from_stream('test', 'OutFilePy.csv', output_blob)
Run Code Online (Sandbox Code Playgroud)

python azure-storage azure-blob-storage

2
推荐指数
1
解决办法
1400
查看次数

Apache Spark 2.0 (PySpark) - 为 csv 找到多个数据帧错误源

我正在尝试使用 Spark 2.0 中的以下代码创建数据帧。在 Jupyter/Console 中执行代码时,我面临以下错误。有人可以帮助我如何摆脱这个错误吗?

错误:

Py4JJavaError:调用 o34.csv 时发生错误。: java.lang.RuntimeException: 为 csv 找到多个源(org.apache.spark.sql.execution.datasources.csv.CSVFileFormat, com.databricks.spark.csv.DefaultSource15),请指定完全限定的类名。在 scala.sys.package$.error(package.scala:27​​)

代码:

   from pyspark.sql import SparkSession
   if __name__ == "__main__":
      session = SparkSession.builder.master('local')
                     .appName("RealEstateSurvey").getOrCreate()
      df = session \
           .read \
           .option("inferSchema", value = True) \
           .option('header','true') \
           .csv("/home/senthiljdpm/RealEstate.csv")

     print("=== Print out schema ===")
     session.stop()
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark pyspark-sql

2
推荐指数
1
解决办法
3658
查看次数

PySpark Spark-submit 命令带有 --files 参数错误

我正在 Spark 2.3 集群中使用以下命令运行 PySpark 作业。

spark-submit 
--deploy-mode cluster 
--master yarn 
--files ETLConfig.json 
PySpark_ETL_Job_v0.2.py
Run Code Online (Sandbox Code Playgroud)

ETLConfig.json 有一个传递给 PySpark 脚本的参数。我在主块中引用此配置 json 文件,如下所示:

spark-submit 
--deploy-mode cluster 
--master yarn 
--files ETLConfig.json 
PySpark_ETL_Job_v0.2.py
Run Code Online (Sandbox Code Playgroud)

但是,该命令会引发以下错误。

No such file or directory: u'/tmp/spark-7dbe9acd-8b02-403a-987d-3accfc881a98/userFiles-4df4-5460-bd9c-4946-b289-6433-drgs/ETLConfig.json'
Run Code Online (Sandbox Code Playgroud)

我可以知道我的脚本有什么问题吗?我也尝试过使用SparkFiles.get()命令,但它也不起作用。

apache-spark apache-spark-sql pyspark spark-submit

2
推荐指数
1
解决办法
9841
查看次数