我正在Pyspark中读取一个csv文件,如下所示:
df_raw=spark.read.option("header","true").csv(csv_path)
Run Code Online (Sandbox Code Playgroud)
但是,数据文件引用了带有嵌入式逗号的字段,不应将其视为逗号.我如何在Pyspark处理这个问题?我知道熊猫可以解决这个问题,但是Spark可以吗?我使用的版本是Spark 2.0.0.
这是一个在Pandas中工作的示例但是使用Spark失败:
In [1]: import pandas as pd
In [2]: pdf = pd.read_csv('malformed_data.csv')
In [3]: sdf=spark.read.format("org.apache.spark.csv").csv('malformed_data.csv',header=True)
In [4]: pdf[['col12','col13','col14']]
Out[4]:
col12 col13 \
0 32 XIY "W" JK, RE LK SOMETHINGLIKEAPHENOMENON#YOUGOTSOUL~BRINGDANOISE
1 NaN OUTKAST#THROOTS~WUTANG#RUNDMC
col14
0 23.0
1 0.0
In [5]: sdf.select("col12","col13",'col14').show()
+------------------+--------------------+--------------------+
| col12| col13| col14|
+------------------+--------------------+--------------------+
|"32 XIY ""W"" JK| RE LK"|SOMETHINGLIKEAPHE...|
| null|OUTKAST#THROOTS~W...| 0.0|
+------------------+--------------------+--------------------+
Run Code Online (Sandbox Code Playgroud)
文件内容:
col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19
80015360210876000,11.22,X,4076710258,,,sxsw,,"32 YIU ""A""",S5,,"32 XIY ""W"" JK, RE LK",SOMETHINGLIKEAPHENOMENON#YOUGOTSOUL~BRINGDANOISE,23.0,cyclingstats,2012-25-19,432,2023-05-17,CODERED
61670000229561918,137.12,U,8234971771,,,woodstock,,,T4,,,OUTKAST#THROOTS~WUTANG#RUNDMC,0.0,runstats,2013-21-22,1333,2019-11-23,CODEBLUE
Run Code Online (Sandbox Code Playgroud) 任何人都可以向我解释为什么我会得到这两个表达式的不同结果?我想在两个日期之间过滤:
df.filter("act_date <='2017-04-01'" and "act_date >='2016-10-01'")\
.select("col1","col2").distinct().count()
Run Code Online (Sandbox Code Playgroud)
结果:37M
VS
df.filter("act_date <='2017-04-01'").filter("act_date >='2016-10-01'")\
.select("col1","col2").distinct().count()
Run Code Online (Sandbox Code Playgroud)
结果:25M
他们有什么不同?在我看来,他们应该产生相同的结果
如何在Spark 2.0上的Pyspark中加载gzip压缩的csv文件?
我知道可以按如下方式加载未压缩的csv文件:
spark.read.format("csv").option("header",
"true").load("myfile.csv")
Run Code Online (Sandbox Code Playgroud)
要么
spark.read.option("header", "true").csv("myfile.csv")
Run Code Online (Sandbox Code Playgroud) 如何在dask DataFrame上调用unique?
如果我尝试以与常规pandas数据帧相同的方式调用它,则会出现以下错误:
In [27]: len(np.unique(ddf[['col1','col2']].values))
AttributeError Traceback (most recent call last)
<ipython-input-27-34c0d3097aab> in <module>()
----> 1 len(np.unique(ddf[['col1','col2']].values))
/dir/anaconda2/lib/python2.7/site-packages/dask/dataframe/core.pyc in __getattr__(self, key)
1924 return self._constructor_sliced(merge(self.dask, dsk), name,
1925 meta, self.divisions)
-> 1926 raise AttributeError("'DataFrame' object has no attribute %r" % key)
1927
1928 def __dir__(self):
AttributeError: 'DataFrame' object has no attribute 'values'
Run Code Online (Sandbox Code Playgroud) 我正在运行一个大型Spark 2.1.0,最后将结果写入s3.它运行在30节点集群上,大部分工作正常.但是,偶尔我必须停止工作并再次运行它,因为即使在完成所有计算之后,单个节点也会在写入时卡住.我想知道我是否可以通过改变猜测来缓解这个问题.我在另一篇文章中读到这可能有害并导致重复结果或数据损坏.任何人都可以建议吗?我还被建议通过在我的指定中指定以下设置来使用hadoop默认提交者spark-defaults.conf.我正在运行Spark standalone.
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
Run Code Online (Sandbox Code Playgroud)
对此问题的任何澄清将不胜感激.
我希望使用AQGridView来布局具有3种不同子布局的屏幕.屏幕的顶部应该以2x5网格布局,每个单元格由不同的图形占据.这个我成功完成了.屏幕的中间部分应该是占据屏幕整个宽度的单个图像,跨越列和高度为2的单元格.
屏幕的底部应该是另一个网格布局,其尺寸可能会有所不同,但应填满剩余的空间.我的问题是,实现这一目标的最佳方法是什么?我应该使用3个独立的AQGridViews,每个部分一个,因为网格布局不同,或者我可以将所有布局都放在同一个AQGridView中吗?我尝试了后一种方法,但是我很难在屏幕的中间部分获取图像以正确地跨越细胞.它偏向一边.
我对一些使用psycopg2与 postgres 数据库并行查询的多处理代码的行为感到非常困惑。
本质上,我正在对更大表的各个分区进行相同的查询(使用不同的参数)。我正在使用 multiprocessing.Pool 来分叉一个单独的查询。
我的多处理调用如下所示:
pool = Pool(processes=num_procs)
results=pool.map(run_sql, params_list)
Run Code Online (Sandbox Code Playgroud)
我的 run_sql 代码如下所示:
def run_sql(zip2):
conn = get_connection()
curs = conn.cursor()
print "conn: %s curs:%s pid=%s" % (id(conn), id(curs), os.getpid())
...
curs.execute(qry)
records = curs.fetchall()
def get_connection()
...
conn = psycopg2.connect(user=db_user, host=db_host,
dbname=db_name, password=db_pwd)
return conn
Run Code Online (Sandbox Code Playgroud)
所以我的期望是每个进程都会通过调用获得一个单独的数据库连接,get_connection()并且print id(conn)会显示一个不同的值。然而,情况似乎并非如此,我无法解释它。甚至print id(curs)是一样的。只print os.getpid()显示不同。它是否以某种方式为每个分叉进程使用相同的连接?
conn: 4614554592 curs:4605160432 pid=46802
conn: 4614554592 curs:4605160432 pid=46808
conn: 4614554592 curs:4605160432 pid=46810
conn: 4614554592 curs:4605160432 pid=46784
conn: 4614554592 …Run Code Online (Sandbox Code Playgroud) 谁能告诉我为什么会出现以下错误?根据 pyspark-cassandra 连接器的 README,我在下面尝试的应该可以工作(没有 Spark 包):https : //github.com/TargetHolding/pyspark-cassandra
$ pyspark_jar="$HOME/devel/sandbox/Learning/Spark/pyspark-cassandra/target/scala-2.10/pyspark-cassandra-assembly-0.2.2.jar"
$ pyspark_egg="$HOME/devel/sandbox/Learning/Spark/pyspark-cassandra/target/pyspark_cassandra-0.2.2-py2.7.egg"
$ pyspark --jars $pyspark_jar --py_files $pyspark_egg --conf spark.cassandra.connection.host=localhost
Run Code Online (Sandbox Code Playgroud)
这导致:
Exception in thread "main" java.lang.IllegalArgumentException: pyspark does not support any application options.
at org.apache.spark.launcher.CommandBuilderUtils.checkArgument(CommandBuilderUtils.java:222)
at org.apache.spark.launcher.SparkSubmitCommandBuilder.buildPySparkShellCommand(SparkSubmitCommandBuilder.java:239)
at org.apache.spark.launcher.SparkSubmitCommandBuilder.buildCommand(SparkSubmitCommandBuilder.java:113)
at org.apache.spark.launcher.Main.main(Main.java:74)
Run Code Online (Sandbox Code Playgroud) 有人可以向我解释以下内容:
scala> def squared(x: Int) = x * x
squared: (x: Int)Int
scala> val sq : (Int) => Int = squared
sq: Int => Int = <function1>
scala> sq.getClass
res111: Class[_ <: Int => Int] = class $anonfun$1
Run Code Online (Sandbox Code Playgroud)
到目前为止我理解这一点,sqared是一个函数,而sq是一个函数指针.
但后来我这样做:
scala> squared.getClass
<console>:13: error: missing arguments for method squared;
follow this method with `_' if you want to treat it as a partially applied function
squared.getClass
^
Run Code Online (Sandbox Code Playgroud)
为什么我不能在平方上调用getClass?毕竟,不是函数的第一类对象吗?为什么我需要这样做呢?
scala> squared(7).getClass
res113: Class[Int] = int
Run Code Online (Sandbox Code Playgroud)
我也得到了相同的结果
scala> sq(5).getClass
res115: Class[Int] = …Run Code Online (Sandbox Code Playgroud) 对于dask DataFrame,pandas中的sort_values相当于什么?我正在尝试扩展一些具有内存问题的Pandas代码,而不是使用dask DataFrame.
相当于:
ddf.set_index([col1, col2], sorted=True)
Run Code Online (Sandbox Code Playgroud)
?
apache-spark ×5
pyspark ×4
python ×3
dask ×2
amazon-s3 ×1
aqgridview ×1
csv ×1
dataframe ×1
ipad ×1
iphone ×1
objective-c ×1
pandas ×1
postgresql ×1
psycopg2 ×1
scala ×1
xcode ×1