Get CSV to Spark dataframe

Ale*_*ont 20 apache-spark pyspark

I'm using python on Spark and would like to get a csv into a dataframe.

Oddly, the Spark SQL documentation does not explain CSV as a source.

I have found Spark-CSV, but I have issues with two parts of its documentation:

  • "This package can be added to Spark using the --jars command line option. For example, to include it when starting the spark shell: $ bin/spark-shell --packages com.databricks:spark-csv_2.10:1.0.3" 我每次启动pyspark或spark-submit时是否真的需要添加此参数?它似乎非常不优雅.有没有办法在python中导入它而不是每次重新加载它?

  • df = sqlContext.load(source="com.databricks.spark.csv", header="true", path = "cars.csv") Even when I do this, it does not work. What does the "source" argument stand for in this line of code, and how do I simply load a local file on linux, say "/Spark_Hadoop/spark-1.3.1-bin-cdh4/cars.csv"?
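A sketch of one possible workaround for the first point, under the assumption that the PYSPARK_SUBMIT_ARGS environment variable is used with the spark-csv coordinates quoted above: set the submit arguments from python before the SparkContext is created, so that --packages does not have to be typed on every launch.

import os

# Sketch only: register the spark-csv package before pyspark starts its JVM,
# so --packages does not need to be repeated on each pyspark/spark-submit run.
# The trailing "pyspark-shell" token is required by PYSPARK_SUBMIT_ARGS.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages com.databricks:spark-csv_2.10:1.0.3 pyspark-shell"
)

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext("local", "csv-example")
sqlContext = SQLContext(sc)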

ohr*_*uus 31

With more recent versions of Spark this has become a lot easier: the expression sqlContext.read (available since Spark 1.4) gives you a DataFrameReader instance, and as of Spark 2.0 it has a built-in .csv() method:

df = sqlContext.read.csv("/path/to/your.csv")

Note that you can also indicate that the csv file has a header by passing the keyword argument header=True to the .csv() call. A handful of other options are available as well; they are described in the DataFrameReader documentation.
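For example (a minimal sketch, assuming the same sqlContext, a header row in the file, and a placeholder path):

df = sqlContext.read.csv("/path/to/your.csv", header=True, inferSchema=True)
df.printSchema()
df.show(5)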

  • This should be the new accepted answer. (4 upvotes)

Ara*_*mar 22

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()          # the SparkContext (already available as `sc` in the pyspark shell)
sqlContext = SQLContext(sc)  # needed so that toDF() works on the RDD

Employee_rdd = sc.textFile("\..\Employee.csv") \
                 .map(lambda line: line.split(","))

Employee_df = Employee_rdd.toDF(['Employee_ID', 'Employee_name'])

Employee_df.show()

  • You did not show what `sc` is. If it is a SparkContext(), you should show the assignment in the code example. (2 upvotes)

Non*_*one 13

1. Read the csv file into an RDD and then generate a RowRDD from the original RDD.

2. Create the schema, represented by a StructType, matching the structure of the rows in the RDD created in step 1.

3. Apply the schema to the RDD of rows via the createDataFrame method provided by SQLContext.

from pyspark.sql.types import StructType, StructField, StringType

lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))

# The schema is encoded in a string.
schemaString = "name age"

fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)

# Apply the schema to the RDD (`spark` is the SparkSession; sqlContext.createDataFrame works the same way).
schemaPeople = spark.createDataFrame(people, schema)
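The programming guide then registers the DataFrame as a temporary view so it can be queried with SQL; a short continuation sketch, assuming the `spark` session used above:

# Register the DataFrame as a temporary view and query it with SQL.
schemaPeople.createOrReplaceTempView("people")
results = spark.sql("SELECT name FROM people")
results.show()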

Source: SPARK PROGRAMMING GUIDE

  • This answer is old, and newer versions of Spark offer easier ways to achieve this. See the answers /sf/answers/2914683971/ and /sf/answers/3257793101/ (6 upvotes)

abb*_*obh 11

If you do not mind the extra package dependency, you can use Pandas to parse the CSV file. It handles internal commas just fine.

Dependencies:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

Read the whole file into a Spark DataFrame at once:

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# If no header:
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) 
s_df = sql_sc.createDataFrame(pandas_df)

Or, to be more conscious of the data size, you can chunk the data into a Spark RDD and then convert it to a DataFrame:

chunk_100k = pd.read_csv('file.csv', chunksize=100000)

for chunky in chunk_100k:
    Spark_temp_rdd = sc.parallelize(chunky.values.tolist())
    try:
        Spark_full_rdd += Spark_temp_rdd
    except NameError:
        Spark_full_rdd = Spark_temp_rdd
    del Spark_temp_rdd

Spark_DF = Spark_full_rdd.toDF(['column 1','column 2'])
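As a side note, the running union with try/except can be avoided by collecting the chunk RDDs in a list and combining them with a single sc.union call (a sketch using the same sc, file name, and column names as above):

chunk_rdds = [sc.parallelize(chunky.values.tolist())
              for chunky in pd.read_csv('file.csv', chunksize=100000)]
Spark_DF = sc.union(chunk_rdds).toDF(['column 1', 'column 2'])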


Gra*_*non 9

For Pyspark, assuming that the first row of the csv file contains a header:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('chosenName').getOrCreate()
df = spark.read.csv('fileNameWithPath', mode="DROPMALFORMED", inferSchema=True, header=True)


Flo*_*ent 6

From Spark 2.0 onward, using a Spark session is recommended:

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a SparkSession
spark = SparkSession \
    .builder \
    .appName("basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

def mapper(line):
    fields = line.split(',')
    return Row(ID=int(fields[0]), field1=str(fields[1].encode("utf-8")), field2=int(fields[2]), field3=int(fields[3]))

lines = spark.sparkContext.textFile("file.csv")
rows = lines.map(mapper)  # still an RDD of Row objects at this point

# Infer the schema, and register the DataFrame as a table.
schemaDf = spark.createDataFrame(rows).cache()
schemaDf.createOrReplaceTempView("tablename")
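Once the view is registered, it can be queried through the session, e.g. (a sketch, assuming the column names produced by mapper above):

results = spark.sql("SELECT ID, field1 FROM tablename WHERE field2 > 0")
results.show()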