Asked by use*_*714 (6 votes) · tags: python, hdfs, apache-spark, pyspark
I want to do some cleanup at the start of my Spark (PySpark) program. For example, I want to delete data left in HDFS by previous runs. In Pig this can be done with commands such as:

```
fs -copyFromLocal ....
rmf /path/to-/hdfs
```
or locally with an sh command.

I would like to know how to do the same thing from PySpark.
Answer by Moh*_*OUI (17 votes):
You can delete an HDFS path in PySpark without pulling in any third-party dependencies, as follows:
```python
from pyspark.sql import SparkSession

# example of preparing a spark session
spark = SparkSession.builder.appName('abc').getOrCreate()
sc = spark.sparkContext

# Prepare a FileSystem manager
fs = (sc._jvm.org
      .apache.hadoop
      .fs.FileSystem
      .get(sc._jsc.hadoopConfiguration())
      )

path = "Your/hdfs/path"
# use the FileSystem manager to remove the path
fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)
```
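If the path may not exist on a given run, the same JVM gateway also exposes the rest of the Hadoop FileSystem API, so you can guard the delete with an existence check. A minimal sketch reusing the `fs` and `path` names from the snippet above:

```python
hadoop_path = sc._jvm.org.apache.hadoop.fs.Path(path)
# exists() and delete() come from the Hadoop FileSystem API;
# delete(path, True) removes directories recursively
if fs.exists(hadoop_path):
    fs.delete(hadoop_path, True)
```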
To take this a step further, you can wrap the idea in a helper function that you can reuse across jobs/packages:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('abc').getOrCreate()

def delete_path(spark, path):
    sc = spark.sparkContext
    fs = (sc._jvm.org
          .apache.hadoop
          .fs.FileSystem
          .get(sc._jsc.hadoopConfiguration())
          )
    fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)

delete_path(spark, "Your/hdfs/path")
```
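This matches the original use case of cleaning up at the start of a job before writing fresh output. A hypothetical usage sketch (`output_path` and the DataFrame are placeholders, not part of the answer):

```python
output_path = "hdfs:///tmp/my_job/output"  # hypothetical output location
# remove leftovers from the previous run, then write the new results
delete_path(spark, output_path)
df = spark.range(10)  # stand-in for the job's real DataFrame
df.write.parquet(output_path)
```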
Answer by zer*_*323 (14 votes):
You can execute arbitrary shell commands using, for example, subprocess.call or the sh library, so something like this should work fine:
```python
import subprocess

some_path = ...
subprocess.call(["hadoop", "fs", "-rm", "-f", some_path])
```
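The sh library mentioned above works along the same lines; a minimal sketch, assuming the sh package is installed and the hadoop binary is on the PATH:

```python
import sh

# sh resolves the hadoop executable dynamically and raises ErrorReturnCode
# on a non-zero exit status, so failures are not silently ignored
sh.hadoop("fs", "-rm", "-r", "-f", some_path)
```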
If you are on Python 2.x, you can try spotify/snakebite:
```python
from snakebite.client import Client

host = ...
port = ...
client = Client(host, port)
# snakebite calls take a list of paths and return lazy generators,
# so consume the result to actually perform the delete
list(client.delete([some_path], recurse=True))
```
hdfs3 is another library that can be used to do the same thing:
```python
from hdfs3 import HDFileSystem

hdfs = HDFileSystem(host=host, port=port)
hdfs.rm(some_path)
```
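As with the other approaches, you can make this idempotent; a sketch based on hdfs3's filesystem-style interface (the exists check and recursive flag are taken from its documented API, not from the answer itself):

```python
# delete only if the path is present, removing directory trees recursively
if hdfs.exists(some_path):
    hdfs.rm(some_path, recursive=True)
```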
From https://diogoalexandrefranco.github.io/interacting-with-hdfs-from-pyspark/, using only PySpark:
```python
######
# Get fs handler from java gateway
######
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
fs = FileSystem.get(URI("hdfs://somehost:8020"), sc._jsc.hadoopConfiguration())

# We can now use the Hadoop FileSystem API
# (https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html)
fs.listStatus(Path('/user/hive/warehouse'))
# or
fs.delete(Path('some_path'))
```
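Compared with the first answer, this variant passes an explicit URI to FileSystem.get, so it targets a specific namenode (the hdfs://somehost:8020 above is a placeholder) rather than whatever fs.defaultFS the session is configured with.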
The other solutions didn't work in my case, but this blog post helped :)