小编fly*_*all的帖子

Pyspark将多个csv文件读入数据帧(或RDD?)

我有一个Spark 2.0.2集群,我通过Puppark通过Jupyter Notebook进行攻击.我有多个管道分隔的txt文件(加载到HDFS.但也可在本地目录中使用),我需要使用spark-csv加载到三个独立的数据帧中,具体取决于文件的名称.

我看到我可以采取的三种方法 - 要么我可以使用python以某种方式遍历HDFS目录(尚未弄清楚如何执行此操作,加载每个文件然后执行联合.

我也知道在spark中存在一些通配符功能(见这里) - 我可以利用它

最后,我可以使用pandas从磁盘加载vanilla csv文件作为pandas数据帧,然后创建一个spark数据帧.这里的缺点是这些文件很大,并且在单个节点上加载到内存中可能需要大约8GB.(这就是为什么它首先转移到集群).

这是我到目前为止的代码和两个方法的一些伪代码:

import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import pandas as pd

sc = pyspark.SparkContext(appName = 'claims_analysis', master='spark://someIP:7077')

spark = SparkSession(sc)

#METHOD 1 - iterate over HDFS directory
for currFile in os.listdir(HDFS:///someDir//):
    if #filename contains 'claim':
        #create or unionAll to merge claim_df
    if #filename contains 'pharm':
        #create or unionAll to merge pharm_df
    if #filename contains 'service':
        #create or unionAll to merge service_df

#Method 2 - …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark spark-dataframe jupyter-notebook

3
推荐指数
1
解决办法
1万
查看次数

Dataframe 上的 Pyspark UDF 列

我正在尝试根据某些列的值在数据帧上创建一个新列。在所有情况下它都返回 null。有人知道这个简单的例子出了什么问题吗?

df = pd.DataFrame([[0,1,0],[1,0,0],[1,1,1]],columns = ['Foo','Bar','Baz'])

spark_df = spark.createDataFrame(df)

def get_profile():
    if 'Foo'==1:
        return 'Foo'
    elif 'Bar' == 1:
        return 'Bar'
    elif 'Baz' ==1 :
        return 'Baz'

spark_df = spark_df.withColumn('get_profile', lit(get_profile()))
spark_df.show()

   Foo  Bar  Baz get_profile
    0    1    0        None
    1    0    0        None
    1    1    1        None
Run Code Online (Sandbox Code Playgroud)

我希望所有行都填写 get_profile 列。

我也尝试过这个:

spark_udf = udf(get_profile,StringType())

spark_df = spark_df.withColumn('get_profile', spark_udf())
print(spark_df.toPandas())
Run Code Online (Sandbox Code Playgroud)

达到同样的效果。

python apache-spark pyspark

2
推荐指数
1
解决办法
1万
查看次数

Pandas应用lambda函数null值

我试图将列拆分为两个,但我知道我的数据中有空值.想象一下这个数据帧:

df = pd.DataFrame(['fruit: apple','vegetable: asparagus',None, 'fruit: pear'], columns = ['text'])

df

                   text
0          fruit: apple
1  vegetable: asparagus
2                   None
3           fruit: pear
Run Code Online (Sandbox Code Playgroud)

我想把它分成多列,如下所示:

df['cat'] = df['text'].apply(lambda x: 'unknown' if x == None else x.split(': ')[0])
df['value'] = df['text'].apply(lambda x: 'unknown' if x == None else x.split(': ')[1])

print df

                   text        cat      value
0          fruit: apple      fruit      apple
1  vegetable: asparagus  vegetable  asparagus
2                  None    unknown    unknown
3           fruit: pear      fruit       pear
Run Code Online (Sandbox Code Playgroud)

但是,如果我有以下df代替:

df = pd.DataFrame(['fruit: apple','vegetable: asparagus',np.nan, 'fruit: …
Run Code Online (Sandbox Code Playgroud)

python pandas

0
推荐指数
1
解决办法
8597
查看次数

xlwings从Python中隐藏工作表并且书籍不可见

我使用 xlwings 来控制工作簿,因为我需要保持现有图表完好无损(否则我会使用 openpyxl)。有没有办法隐藏工作表并在不可见的应用程序中打开一本书?

隐藏我尝试过的工作表:

 wb.sheets[ws].Visible = False
 wb.sheets[ws].hidden= True
 wb.sheets[ws].Visible= xlveryhidden #vba syntax - bombs
Run Code Online (Sandbox Code Playgroud)

前两名跑了,但什么也没做,第三名就被炸飞了。

我可以创建一个新的 Excel 应用程序,我可以创建一本新书(它创建的是新应用程序)。但是,我想打开一本特定的书,但书不采用可见参数 - 如何打开现有工作簿而不使其可见?

excel  = xw.App(visible = False) #creates a new app that is visible
wb= xw.Book(filepath) #opens the already existing workbook, but it is visible - how do I make it part of the app above?
Run Code Online (Sandbox Code Playgroud)

python xlwings

0
推荐指数
1
解决办法
4212
查看次数

基于广播变量的pyspark过滤器数据帧

我有一个pyspark 2.0数据框,我试图根据(相对)较短的列表进行过滤-长度可能为50-100。

filterList = ['A','B','C']
Run Code Online (Sandbox Code Playgroud)

我想将该列表广播到我的每个节点,并使用它删除列表中没有两列之一的记录。

此操作有效:

filter_df= df.where((df['Foo'].isin(filterList )) | (df['Bar'].isin(filterList)))
Run Code Online (Sandbox Code Playgroud)

但是当我广播列表时,我得到一个错误:

filterListB= sc.broadcast(filterList)

filter_df= df.where((df['Foo'].isin(filterListB)) | (df['Bar'].isin(filterListB)))

---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-99-1b972cf29148> in <module>()
----> 1 filter_df= df.where((df['Foo'].isin(filterListB)) | (df['Bar'].isin(filterListB)))

/usr/local/spark/python/pyspark/sql/column.pyc in isin(self, *cols)
    284         if len(cols) == 1 and isinstance(cols[0], (list, set)):
    285             cols = cols[0]
--> 286         cols = [c._jc if isinstance(c, Column) else _create_column_from_literal(c) for c in cols]
    287         sc = SparkContext._active_spark_context
    288         jc = getattr(self._jc, "isin")(_to_seq(sc, cols))

/usr/local/spark/python/pyspark/sql/column.pyc in _create_column_from_literal(literal)
     33 …
Run Code Online (Sandbox Code Playgroud)

python broadcast pyspark

0
推荐指数
1
解决办法
1136
查看次数