我是Spark的新手,我无法找到这个...我有很多镶木地板文件上传到s3位置:
s3://a-dps/d-l/sco/alpha/20160930/parquet/
Run Code Online (Sandbox Code Playgroud)
此文件夹的总大小为20+ Gb,.如何将其分块并将其读入数据帧如何将所有这些文件加载到数据帧中?
分配给火花集群的内存为6 GB.
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
import pandas
# SparkConf().set("spark.jars.packages","org.apache.hadoop:hadoop-aws:3.0.0-alpha3")
sc = SparkContext.getOrCreate()
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", 'A')
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", 's')
sqlContext = SQLContext(sc)
df2 = sqlContext.read.parquet("s3://sm/data/scor/alpha/2016/parquet/*")
Run Code Online (Sandbox Code Playgroud)
错误:
Py4JJavaError: An error occurred while calling o33.parquet.
: java.io.IOException: No FileSystem for scheme: s3
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:372)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:370)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) … 我在python中编写spark代码.如何在spark.sql查询中传递变量?
q25 = 500
Q1 = spark.sql("SELECT col1 from table where col2>500 limit $q25 , 1")
Run Code Online (Sandbox Code Playgroud)
目前上面的代码不起作用?我们如何传递变量?
我也尝试过,
Q1 = spark.sql("SELECT col1 from table where col2>500 limit q25='{}' , 1".format(q25))
Run Code Online (Sandbox Code Playgroud) 数据并不完全干净,但在使用熊猫时没有问题。pandas 库为 EDA 提供了许多非常有用的函数。
但是,当我对大数据(即 10 亿条记录和 10 列)使用分析时,从数据库表中读取它时,它没有完成并且我的笔记本电脑内存不足,csv 中的数据大小约为 6 GB,我的 RAM 为 14 GB 我的空闲使用量大约是 3 - 4 GB。
df = pd.read_sql_query("select * from table", conn_params)
profile = pandas.profiling.ProfileReport(df)
profile.to_file(outputfile="myoutput.html")
Run Code Online (Sandbox Code Playgroud)
我也尝试过使用check_recoded = False选项。但这对完全分析没有帮助。有什么办法可以对数据进行分块读取,最终生成一个整体的汇总报告?或任何其他方法将此函数用于大型数据集。
我已经解决了多个问题,这些问题有助于将数据帧分为训练和测试,使用 scikit 或不使用 scikit 等。
但我的问题是我有 2 个不同的 csv(来自不同年份的 2 个不同的数据帧)。我想用一个作为火车,另一个作为测试?
对于线性回归/任何模型如何做到这一点?
python linear-regression training-data scikit-learn data-science
关于如何从 sqlAlchemy 读取数据到 dask 数据帧的文档中没有足够的示例。
我看到的一些例子是:
df = dd.read_sql_table(table='my_table_name', uri=my_sqlalchemy_con_url, index_col='id')
Run Code Online (Sandbox Code Playgroud)
但是我的查询不是一次获取整个表,而是只获取 (select * from ....table A where .....) 类型的查询作为参数传递.. 另外,如何传递连接参数是没有提到。
如何从 sqlAlchemy 连接引擎传递连接参数并传递自定义查询而不是获取整个表。
我知道这是基本的,但在 docs/web 中找不到关于此的参考
编辑 :
d100 = 'mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}'.format("xx", "xx", "xxxxx.com", "3306", "xxxx")
df = dd.read_sql_table(table='select * from ps_g9 limit 10;', uri=d100, index_col='uuid')
Run Code Online (Sandbox Code Playgroud)
我得到表不存在错误
Excel 的第一行包含每个单元格中带有 \n 字符的单词。例如:
月份 “中东北部\n(NSA)” “中东北部\n(SA)” “中东南部\n(NSA)”
因此,在使用以下代码转换为 csv 时:
data_xls = pd.read_excel('/home/scripts/usless/HP_PO_hist.xls', 'sheet1', index_col=4,skiprows=3)
data_xls.to_csv('HH_PO_output.csv', encoding='utf-8')
Run Code Online (Sandbox Code Playgroud)
它将 \n 之后的字符转换为新行,例如:
,月,“中东北部 (国家安全局)”,“中东北部 (SA)","中东南部 (美国国家安全局)”,“中东南部
但预期的输出是这样的:
月份 中东北部 (NSA) 中东北部 (SA) 中东南部 (NSA) 中东南部 (SA)
在 Python df 中转换为 csv 时,如何仅从此索引行中删除此 \n 字符?
我可以使用带有单个条件的数据帧连接语句(在 pyspark 中)但是,如果我尝试添加多个条件,那么它就会失败。
代码 :
summary2 = summary.join(county_prop, ["category_id", "bucket"], how = "leftouter").
Run Code Online (Sandbox Code Playgroud)
上面的代码有效。但是,如果我为列表添加一些其他条件,例如 Summary.bucket == 9 或其他条件,则会失败。请帮我解决这个问题。
The error for the statement
summary2 = summary.join(county_prop, ["category_id", (summary.bucket)==9], how = "leftouter")
ERROR : TypeError: 'Column' object is not callable
Run Code Online (Sandbox Code Playgroud)
编辑 :
添加完整的工作示例。
schema = StructType([StructField("category", StringType()), StructField("category_id", StringType()), StructField("bucket", StringType()), StructField("prop_count", StringType()), StructField("event_count", StringType()), StructField("accum_prop_count",StringType())])
bucket_summary = sqlContext.createDataFrame([],schema)
temp_county_prop = sqlContext.createDataFrame([("nation","nation",1,222,444,555),("nation","state",2,222,444,555)],schema)
bucket_summary = bucket_summary.unionAll(temp_county_prop)
county_prop = sqlContext.createDataFrame([("nation","state",2,121,221,551)],schema)
Run Code Online (Sandbox Code Playgroud)
想要加入:
category_id 和bucket 列,我想替换bucket_summary 上的county_prop 的值。
cond = [bucket_summary.bucket == county_prop.bucket, bucket_summary.bucket == …Run Code Online (Sandbox Code Playgroud) 如何使用 where 子句更新 Pyspark 数据框中的列?
这类似于此 SQL 操作:
UPDATE table1 SET alpha1= x WHERE alpha2< 6;
Run Code Online (Sandbox Code Playgroud)
其中 alpha1 和 alpha2 是 table1 的列。
例如:我有一个数据框 table1,其值如下:
表格1 阿尔法1 阿尔法2 3 7 4 5 5 4 6 8 更新后的数据框表1: 阿尔法1 阿尔法2 3 7 x 5 x 4 6 8
如何在 pyspark 数据框中执行此操作?
我有一个标题行的 xls 为:
AZ-凤凰城加州-洛杉矶加州-圣地亚哥 年份 PHXR LXXR SDXR 1987 年 1 月 59.33 54.67 77 1987 年 2 月 59.65 54.89 78 1987 年 3 月 59.99 55.16 79
注意:第一行在“YEAR 列”上方没有名称。如何将此行的名称设置为 YEAR?
I have tried : data_xls = data_xls.rename(columns={data_xls.columns[0]: 'YEAR'})
Run Code Online (Sandbox Code Playgroud)
但它正在用 YEAR 替换 AZ-Phoenix 行。而且我真的无法更改我想要的列。
如何更改这一行?
python ×8
apache-spark ×4
pandas ×4
dataframe ×3
pyspark ×3
csv ×2
amazon-s3 ×1
dask ×1
data-science ×1
excel ×1
profiling ×1
scikit-learn ×1