Pyspark:获取HDFS路径上的文件/目录列表

Fed*_*nzi 17 hadoop apache-spark pyspark

如标题.我知道textFile,但顾名思义,它仅适用于文本文件.我需要访问HDFS(或本地路径)上的路径内的文件/目录.我正在使用pyspark

感谢帮助

vol*_*lhv 35

使用JVM网关可能不是那么优雅,但在某些情况下,下面的代码可能会有所帮助:

URI           = sc._gateway.jvm.java.net.URI
Path          = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem    = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration


fs = FileSystem.get(URI("hdfs://somehost:8020"), Configuration())

status = fs.listStatus(Path('/some_dir/yet_another_one_dir/'))

for fileStatus in status:
    print(fileStatus.getPath())
Run Code Online (Sandbox Code Playgroud)

  • 如果要过滤结果,请使用`globStatus`而不是`fileStatus`,例如`status = fs.globStatus(Path('/ some_dir/yet_another_one_dir/*.csv'))` (3认同)
  • 这非常好,因为它不需要我上传额外的库到 spark-submit。 (2认同)

Dar*_* M. 15

如果使用PySpark,则可以交互方式执行命令:


列出所选目录中的所有文件:

hdfs dfs -ls <path>例如hdfs dfs -ls /user/path::

import os
import subprocess

cmd = 'hdfs dfs -ls /user/path'
files = subprocess.check_output(cmd, shell=True).strip().split('\n')
for path in files:
  print path
Run Code Online (Sandbox Code Playgroud)

或者搜索所选目录中的文件:

hdfs dfs -find <path> -name <expression>例如hdfs dfs -find /user/path -name *.txt::

import os
import subprocess

cmd = 'hdfs dfs -find {} -name *.txt'.format(source_dir)
files = subprocess.check_output(cmd, shell=True).strip().split('\n')
for path in files:
  filename = path.split(os.path.sep)[-1].split('.txt')[0]
  print path, filename
Run Code Online (Sandbox Code Playgroud)


Tri*_*eid 12

我认为将Spark仅视为一种数据处理工具是有帮助的,其中一个域开始加载数据.它可以读取多种格式,并且它支持Hadoop glob表达式,这对于从HDFS中的多个路径读取非常有用,但是它没有我知道的用于遍历目录或文件的内置工具,也没有特定于与Hadoop或HDFS交互的实用程序.

有一些可用的工具可以做你想要的,包括esutilhdfs.HDFS的LIB支持CLI和API都,您可以直接跳转到"我怎么列出在Python HDFS文件的正确位置.它看起来像这样:

from hdfs import Config
client = Config().get_client('dev')
files = client.list('the_dir_path')
Run Code Online (Sandbox Code Playgroud)

  • 嗨,您能指导我如何制作该hdfscli.cfg文件,我不知道要放置哪个端口号。[global] default.alias = dev [dev.alias] url = http://dev.namenode:port用户= ann (3认同)

Bry*_*ain 6

这可能对你有用:

import subprocess, re
def listdir(path):
    files = str(subprocess.check_output('hdfs dfs -ls ' + path, shell=True))
    return [re.search(' (/.+)', i).group(1) for i in str(files).split("\\n") if re.search(' (/.+)', i)]

listdir('/user/')
Run Code Online (Sandbox Code Playgroud)

这也有效:

hadoop = sc._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration()
path = hadoop.fs.Path('/user/')
[str(f.getPath()) for f in fs.get(conf).listStatus(path)]
Run Code Online (Sandbox Code Playgroud)