小编fem*_*yte的帖子

使用包含嵌入逗号的引用字段读取csv文件

我正在Pyspark中读取一个csv文件,如下所示:

df_raw=spark.read.option("header","true").csv(csv_path)
Run Code Online (Sandbox Code Playgroud)

但是,数据文件引用了带有嵌入式逗号的字段,不应将其视为逗号.我如何在Pyspark处理这个问题?我知道熊猫可以解决这个问题,但是Spark可以吗?我使用的版本是Spark 2.0.0.

这是一个在Pandas中工作的示例但是使用Spark失败:

In [1]: import pandas as pd

In [2]: pdf = pd.read_csv('malformed_data.csv')

In [3]: sdf=spark.read.format("org.apache.spark.csv").csv('malformed_data.csv',header=True)

In [4]: pdf[['col12','col13','col14']]
Out[4]:
                    col12                                             col13  \
0  32 XIY "W"   JK, RE LK  SOMETHINGLIKEAPHENOMENON#YOUGOTSOUL~BRINGDANOISE
1                     NaN                     OUTKAST#THROOTS~WUTANG#RUNDMC

   col14
0   23.0
1    0.0

In [5]: sdf.select("col12","col13",'col14').show()
+------------------+--------------------+--------------------+
|             col12|               col13|               col14|
+------------------+--------------------+--------------------+
|"32 XIY ""W""   JK|              RE LK"|SOMETHINGLIKEAPHE...|
|              null|OUTKAST#THROOTS~W...|                 0.0|
+------------------+--------------------+--------------------+
Run Code Online (Sandbox Code Playgroud)

文件内容:

    col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13,col14,col15,col16,col17,col18,col19
80015360210876000,11.22,X,4076710258,,,sxsw,,"32 YIU ""A""",S5,,"32 XIY ""W""   JK, RE LK",SOMETHINGLIKEAPHENOMENON#YOUGOTSOUL~BRINGDANOISE,23.0,cyclingstats,2012-25-19,432,2023-05-17,CODERED
61670000229561918,137.12,U,8234971771,,,woodstock,,,T4,,,OUTKAST#THROOTS~WUTANG#RUNDMC,0.0,runstats,2013-21-22,1333,2019-11-23,CODEBLUE
Run Code Online (Sandbox Code Playgroud)

csv apache-spark apache-spark-sql pyspark apache-spark-2.0

21
推荐指数
4
解决办法
3万
查看次数

数据帧上的多个条件过滤器

任何人都可以向我解释为什么我会得到这两个表达式的不同结果?我想在两个日期之间过滤:

df.filter("act_date <='2017-04-01'" and "act_date >='2016-10-01'")\
  .select("col1","col2").distinct().count()
Run Code Online (Sandbox Code Playgroud)

结果:37M

VS

df.filter("act_date <='2017-04-01'").filter("act_date >='2016-10-01'")\
  .select("col1","col2").distinct().count()
Run Code Online (Sandbox Code Playgroud)

结果:25M

他们有什么不同?在我看来,他们应该产生相同的结果

python dataframe apache-spark apache-spark-sql pyspark

17
推荐指数
1
解决办法
4万
查看次数

在Spark 2.0中加载压缩的gzip压缩文件

如何在Spark 2.0上的Pyspark中加载gzip压缩的csv文件?

我知道可以按如下方式加载未压缩的csv文件:

spark.read.format("csv").option("header",          
                                "true").load("myfile.csv")
Run Code Online (Sandbox Code Playgroud)

要么

spark.read.option("header", "true").csv("myfile.csv")
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

7
推荐指数
3
解决办法
2万
查看次数

如何在dask DataFrame上调用unique()

如何在dask DataFrame上调用unique?

如果我尝试以与常规pandas数据帧相同的方式调用它,则会出现以下错误:

In [27]: len(np.unique(ddf[['col1','col2']].values))

AttributeError                            Traceback (most recent call last)
<ipython-input-27-34c0d3097aab> in <module>()
----> 1 len(np.unique(ddf[['col1','col2']].values))

/dir/anaconda2/lib/python2.7/site-packages/dask/dataframe/core.pyc in __getattr__(self, key)
1924             return self._constructor_sliced(merge(self.dask, dsk), name,
1925                                             meta, self.divisions)
-> 1926         raise AttributeError("'DataFrame' object has no attribute %r" % key)
1927
1928     def __dir__(self):

AttributeError: 'DataFrame' object has no attribute 'values'
Run Code Online (Sandbox Code Playgroud)

pandas dask

7
推荐指数
1
解决办法
4041
查看次数

写入s3时在Spark 2.1.0中设置spark.speculation

我正在运行一个大型Spark 2.1.0,最后将结果写入s3.它运行在30节点集群上,大部分工作正常.但是,偶尔我必须停止工作并再次运行它,因为即使在完成所有计算之后,单个节点也会在写入时卡住.我想知道我是否可以通过改变猜测来缓解这个问题.我在另一篇文章中读到这可能有害并导致重复结果或数据损坏.任何人都可以建议吗?我还被建议通过在我的指定中指定以下设置来使用hadoop默认提交者spark-defaults.conf.我正在运行Spark standalone.

 spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version 2
Run Code Online (Sandbox Code Playgroud)

对此问题的任何澄清将不胜感激.

amazon-s3 apache-spark

7
推荐指数
1
解决办法
1839
查看次数

使用AQGridView进行不同的屏幕子布局

我希望使用AQGridView来布局具有3种不同子布局的屏幕.屏幕的顶部应该以2x5网格布局,每个单元格由不同的图形占据.这个我成功完成了.屏幕的中间部分应该是占据屏幕整个宽度的单个图像,跨越列和高度为2的单元格.

屏幕的底部应该是另一个网格布局,其尺寸可能会有所不同,但应填满剩余的空间.我的问题是,实现这一目标的最佳方法是什么?我应该使用3个独立的AQGridViews,每个部分一个,因为网格布局不同,或者我可以将所有布局都放在同一个AQGridView中吗?我尝试了后一种方法,但是我很难在屏幕的中间部分获取图像以正确地跨越细胞.它偏向一边.

iphone xcode objective-c ipad aqgridview

6
推荐指数
0
解决办法
525
查看次数

多处理模块和不同的 psycopg2 连接

我对一些使用psycopg2与 postgres 数据库并行查询的多处理代码的行为感到非常困惑。

本质上,我正在对更大表的各个分区进行相同的查询(使用不同的参数)。我正在使用 multiprocessing.Pool 来分叉一个单独的查询。

我的多处理调用如下所示:

pool = Pool(processes=num_procs)
results=pool.map(run_sql, params_list)
Run Code Online (Sandbox Code Playgroud)

我的 run_sql 代码如下所示:

def run_sql(zip2):
    conn = get_connection()
    curs = conn.cursor()
    print "conn: %s curs:%s pid=%s" % (id(conn), id(curs), os.getpid())
    ...
    curs.execute(qry)
    records = curs.fetchall()

def get_connection()
    ...
    conn = psycopg2.connect(user=db_user, host=db_host, 
                         dbname=db_name, password=db_pwd)

    return conn
Run Code Online (Sandbox Code Playgroud)

所以我的期望是每个进程都会通过调用获得一个单独的数据库连接,get_connection()并且print id(conn)会显示一个不同的值。然而,情况似乎并非如此,我无法解释它。甚至print id(curs)是一样的。只print os.getpid()显示不同。它是否以某种方式为每个分叉进程使用相同的连接?

conn: 4614554592 curs:4605160432 pid=46802
conn: 4614554592 curs:4605160432 pid=46808
conn: 4614554592 curs:4605160432 pid=46810
conn: 4614554592 curs:4605160432 pid=46784
conn: 4614554592 …
Run Code Online (Sandbox Code Playgroud)

python postgresql psycopg2

6
推荐指数
1
解决办法
2936
查看次数

使用选项启动 pyspark 时出错(没有 Spack 包)

谁能告诉我为什么会出现以下错误?根据 pyspark-cassandra 连接器的 README,我在下面尝试的应该可以工作(没有 Spark 包):https : //github.com/TargetHolding/pyspark-cassandra

$ pyspark_jar="$HOME/devel/sandbox/Learning/Spark/pyspark-cassandra/target/scala-2.10/pyspark-cassandra-assembly-0.2.2.jar"

$ pyspark_egg="$HOME/devel/sandbox/Learning/Spark/pyspark-cassandra/target/pyspark_cassandra-0.2.2-py2.7.egg"
$ pyspark --jars $pyspark_jar --py_files $pyspark_egg --conf spark.cassandra.connection.host=localhost 
Run Code Online (Sandbox Code Playgroud)

这导致:

Exception in thread "main" java.lang.IllegalArgumentException: pyspark does not support any application options.
at org.apache.spark.launcher.CommandBuilderUtils.checkArgument(CommandBuilderUtils.java:222)
at org.apache.spark.launcher.SparkSubmitCommandBuilder.buildPySparkShellCommand(SparkSubmitCommandBuilder.java:239)
at org.apache.spark.launcher.SparkSubmitCommandBuilder.buildCommand(SparkSubmitCommandBuilder.java:113)
at org.apache.spark.launcher.Main.main(Main.java:74)
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

5
推荐指数
1
解决办法
8088
查看次数

函数与函数指针

有人可以向我解释以下内容:

scala> def squared(x: Int) = x * x
squared: (x: Int)Int
scala> val sq : (Int) => Int = squared
sq: Int => Int = <function1>
scala>  sq.getClass
res111: Class[_ <: Int => Int] = class $anonfun$1
Run Code Online (Sandbox Code Playgroud)

到目前为止我理解这一点,sqared是一个函数,而sq是一个函数指针.

但后来我这样做:

scala> squared.getClass
<console>:13: error: missing arguments for method squared;
follow this method with `_' if you want to treat it as a partially applied function
   squared.getClass
   ^
Run Code Online (Sandbox Code Playgroud)

为什么我不能在平方上调用getClass?毕竟,不是函数的第一类对象吗?为什么我需要这样做呢?

scala> squared(7).getClass
res113: Class[Int] = int
Run Code Online (Sandbox Code Playgroud)

我也得到了相同的结果

scala> sq(5).getClass
res115: Class[Int] = …
Run Code Online (Sandbox Code Playgroud)

scala

5
推荐指数
2
解决办法
575
查看次数

dask DataFrame相当于pandas DataFrame sort_values

对于dask DataFrame,pandas中的sort_values相当于什么?我正在尝试扩展一些具有内存问题的Pandas代码,而不是使用dask DataFrame.

相当于:

ddf.set_index([col1, col2], sorted=True)
Run Code Online (Sandbox Code Playgroud)

python dask

5
推荐指数
2
解决办法
3764
查看次数