小编Jas*_*son的帖子

根据RDD/Spark DataFrame中的特定列从行中删除重复项

假设我有一个相当大的数据集,形式如下:

data = sc.parallelize([('Foo',41,'US',3),
                       ('Foo',39,'UK',1),
                       ('Bar',57,'CA',2),
                       ('Bar',72,'CA',2),
                       ('Baz',22,'US',6),
                       ('Baz',36,'US',6)])
Run Code Online (Sandbox Code Playgroud)

我想要做的是仅根据第一,第三和第四列的值删除重复的行.

删除完全重复的行很简单:

data = data.distinct()
Run Code Online (Sandbox Code Playgroud)

第5行或第6行将被删除

但是,我如何仅删除基于第1,3和4列的重复行?即删除以下任何一个:

('Baz',22,'US',6)
('Baz',36,'US',6)
Run Code Online (Sandbox Code Playgroud)

在Python中,这可以通过使用指定列来完成.drop_duplicates().我怎样才能在Spark/Pyspark中实现同样的目标?

apache-spark apache-spark-sql pyspark

53
推荐指数
6
解决办法
9万
查看次数

通过以字符串格式减去两个日期时间列来计算持续时间

我有一个Spark Dataframe,其中包含一系列日期:

from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.sql.types import *
sqlContext = SQLContext(sc)
import pandas as pd

rdd = sc.parallelizesc.parallelize([('X01','2014-02-13T12:36:14.899','2014-02-13T12:31:56.876','sip:4534454450'),
                                    ('X02','2014-02-13T12:35:37.405','2014-02-13T12:32:13.321','sip:6413445440'),
                                    ('X03','2014-02-13T12:36:03.825','2014-02-13T12:32:15.229','sip:4534437492'),
                                    ('XO4','2014-02-13T12:37:05.460','2014-02-13T12:32:36.881','sip:6474454453'),
                                    ('XO5','2014-02-13T12:36:52.721','2014-02-13T12:33:30.323','sip:8874458555')])
schema = StructType([StructField('ID', StringType(), True),
                     StructField('EndDateTime', StringType(), True),
                     StructField('StartDateTime', StringType(), True)])
df = sqlContext.createDataFrame(rdd, schema)
Run Code Online (Sandbox Code Playgroud)

我想做的是duration通过减去EndDateTime和找到StartDateTime.我想我会尝试使用函数执行此操作:

# Function to calculate time delta
def time_delta(y,x): 
    end = pd.to_datetime(y)
    start = pd.to_datetime(x)
    delta = (end-start)
    return delta

# create new RDD and add new column 'Duration' by applying …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark

28
推荐指数
3
解决办法
5万
查看次数

在Spark RDD和/或Spark DataFrame中重新整形/透视数据

我有以下格式的数据(RDD或Spark DataFrame):

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

 rdd = sc.parallelize([('X01',41,'US',3),
                       ('X01',41,'UK',1),
                       ('X01',41,'CA',2),
                       ('X02',72,'US',4),
                       ('X02',72,'UK',6),
                       ('X02',72,'CA',7),
                       ('X02',72,'XX',8)])

# convert to a Spark DataFrame                    
schema = StructType([StructField('ID', StringType(), True),
                     StructField('Age', IntegerType(), True),
                     StructField('Country', StringType(), True),
                     StructField('Score', IntegerType(), True)])

df = sqlContext.createDataFrame(rdd, schema)
Run Code Online (Sandbox Code Playgroud)

我想做的是'重塑'数据,将Country(特别是美国,英国和CA)中的某些行转换为列:

ID    Age  US  UK  CA  
'X01'  41  3   1   2  
'X02'  72  4   6   7   
Run Code Online (Sandbox Code Playgroud)

从本质上讲,我需要Python的pivot工作流程:

categories = ['US', 'UK', 'CA']
new_df = df[df['Country'].isin(categories)].pivot(index = 'ID', 
                                                  columns = 'Country',
                                                  values = 'Score')
Run Code Online (Sandbox Code Playgroud)

我的数据集相当大,所以我不能真正地collect()将数据摄取到内存中来进行Python本身的重塑.有没有办法 …

python pivot apache-spark apache-spark-sql pyspark

25
推荐指数
3
解决办法
2万
查看次数

sklearn.cross_validation.StratifiedShuffleSplit - 错误:"索引超出范围"

我试图使用Scikit-learn的Stratified Shuffle Split来分割样本数据集.我跟着Scikit学习文档上显示的例子在这里

import pandas as pd
import numpy as np
# UCI's wine dataset
wine = pd.read_csv("https://s3.amazonaws.com/demo-datasets/wine.csv")

# separate target variable from dataset
target = wine['quality']
data = wine.drop('quality',axis = 1)

# Stratified Split of train and test data
from sklearn.cross_validation import StratifiedShuffleSplit
sss = StratifiedShuffleSplit(target, n_iter=3, test_size=0.2)

for train_index, test_index in sss:
    xtrain, xtest = data[train_index], data[test_index]
    ytrain, ytest = target[train_index], target[test_index]

# Check target series for distribution of classes
ytrain.value_counts()
ytest.value_counts()
Run Code Online (Sandbox Code Playgroud)

但是,运行此脚本时,我收到以下错误:

IndexError: indices are out-of-bounds …
Run Code Online (Sandbox Code Playgroud)

python pandas scikit-learn

23
推荐指数
1
解决办法
1万
查看次数

使用StructType为Pyspark.sql设置模式时的语法

我是新来的火花,正在玩Pyspark.sql.根据这里的pyspark.sql文档,可以像这样设置Spark数据帧和架构:

rdd = sc.textFile('./some csv_to_play_around.csv'

schema = StructType([StructField('Name', StringType(), True),
                     StructField('DateTime', TimestampType(), True)
                     StructField('Age', IntegerType(), True)])

# create dataframe
df3 = sqlContext.createDataFrame(rdd, schema)
Run Code Online (Sandbox Code Playgroud)

我的问题是,上面列表True中的含义是什么schema?我似乎无法在文档中找到它.提前致谢

apache-spark pyspark

17
推荐指数
2
解决办法
3万
查看次数

将特殊情况习惯添加到Python Vader情感中

我一直在使用Vader Sentiment进行一些文本情绪分析,我注意到我的数据有很多"路要走"的短语被错误地归类为中性:

In[11]: sentiment('way to go John')
Out[11]: {'compound': 0.0, 'neg': 0.0, 'neu': 1.0, 'pos': 0.0}
Run Code Online (Sandbox Code Playgroud)

在深入了解Vader源代码后,我找到了以下字典:

# check for special case idioms using a sentiment-laden keyword known to SAGE
SPECIAL_CASE_IDIOMS = {"the shit": 3, "the bomb": 3, "bad ass": 1.5, "yeah right": -2,
                       "cut the mustard": 2, "kiss of death": -1.5, "hand to mouth": -2,
                       "way to go": 3}
Run Code Online (Sandbox Code Playgroud)

如您所见,我手动添加了"方式去"条目.但是,似乎没有效果:

In [12]: sentiment('way to go John')
Out[12]: {'compound': 0.0, 'neg': 0.0, 'neu': 1.0, 'pos': 0.0}
Run Code Online (Sandbox Code Playgroud)

知道我错过了什么吗?或者更具体地说,我需要做些什么来添加自定义习语?这是Vader Sentiment源代码:

#######################################################################################################################
# …
Run Code Online (Sandbox Code Playgroud)

python sentiment-analysis text-classification

5
推荐指数
1
解决办法
1664
查看次数

为什么dropna()不起作用?

系统:Cloudera Quickstart VM 5.4上的Spark 1.3.0(Anaconda Python dist.)

这是一个Spark DataFrame:

from pyspark.sql import SQLContext
from pyspark.sql.types import *
sqlContext = SQLContext(sc)

data = sc.parallelize([('Foo',41,'US',3),
                       ('Foo',39,'UK',1),
                       ('Bar',57,'CA',2),
                       ('Bar',72,'CA',3),
                       ('Baz',22,'US',6),
                       (None,75,None,7)])

schema = StructType([StructField('Name', StringType(), True),
                     StructField('Age', IntegerType(), True),
                     StructField('Country', StringType(), True),
                     StructField('Score', IntegerType(), True)])

df = sqlContext.createDataFrame(data,schema)
Run Code Online (Sandbox Code Playgroud)

data.show()

Name Age Country Score
Foo  41  US      3    
Foo  39  UK      1    
Bar  57  CA      2    
Bar  72  CA      3    
Baz  22  US      6    
null 75  null    7 
Run Code Online (Sandbox Code Playgroud)

然而,这些都不起作用!

df.dropna()
df.na.drop()
Run Code Online (Sandbox Code Playgroud)

我收到这条消息:

>>> df.show() …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark cloudera-quickstart-vm

3
推荐指数
1
解决办法
6908
查看次数

将日期拆分为新列的自定义转换器

我正在关注在 github上的sklearn_pandas README 中找到的sklearn_pandas 演练,并尝试修改 DateEncoder() 自定义转换器示例以执行另外两件事:

  • 以日期格式为参数,将字符串类型的列转换为日期时间
  • 吐出新列时附加原始列名。例如:如果输入列:Date1 则输出:Date1_year、Date1_month、Date_1 天。

这是我的尝试(对 sklearn 管道有相当初步的了解):

import pandas as pd
import numpy as np
from sklearn.base import TransformerMixin, BaseEstimator
from sklearn_pandas import DataFrameMapper

class DateEncoder(TransformerMixin):

    '''
    Specify date format using python strftime formats
    '''

    def __init__(self, date_format='%Y-%m-%d'):
        self.date_format = date_format

    def fit(self, X, y=None):
        self.dt = pd.to_datetime(X, format=self.date_format)
        return self

    def transform(self, X):
        dt = X.dt
        return pd.concat([dt.year, dt.month, dt.day], axis=1)


data = pd.DataFrame({'dates1': ['2001-12-20','2002-10-21','2003-08-22','2004-08-23', 
                                 '2004-07-20','2007-12-21','2006-12-22','2003-04-23'], …
Run Code Online (Sandbox Code Playgroud)

pandas scikit-learn sklearn-pandas

2
推荐指数
1
解决办法
2665
查看次数