标签: pyspark

为什么registerTempTable会从数据框中删除一些行?

我尝试使用HDInsight上的Spark数据帧以下列方式创建一个pandas数据帧:

tmp = sqlContext.createDataFrame(sparkDf)
tmp.registerTempTable('temp') 
Run Code Online (Sandbox Code Playgroud)

它看起来像是registerTempTable从数据框中删除了一些行.

以下命令返回11000

sparkDf.count()
Run Code Online (Sandbox Code Playgroud)

虽然tmp只有2500行.

我按照这里描述的步骤.

python azure hdinsight apache-spark pyspark

0
推荐指数
1
解决办法
250
查看次数

如何使用(Py)Spark汇总数据集中数据点之间的距离?

我有一个时间段内用户的Lat/Lon格式的位置数据集.我想计算这些用户旅行的距离.样本数据集:

| 时间戳| 用户| 纬度|经度| | 1462838468 | 49B4361512443A4DA ... | 39.777982 | -7.054599 | | 1462838512 | 49B4361512443A4DA ... | 39.777982 | -7.054599 | | 1462838389 | 49B4361512443A4DA ... | 39.777982 | -7.054599 | | 1462838497 | 49B4361512443A4DA ... | 39.777982 | -7.054599 | | 1465975885 | 6E9E0581E2A032FD8 ... | 37.118362 | -8.205041 | | 1457723815 | 405C238E25FE0B9E7 ... | 37.177322 | -7.426781 | | 1457897289 | 405C238E25FE0B9E7 ... | 37.177922 | -7.447443 | | 1457899229 | 405C238E25FE0B9E7 ... …

apache-spark apache-spark-sql pyspark

0
推荐指数
1
解决办法
1926
查看次数

在Spark中关闭MYSQL JDBC连接

我通过JDBC将数据从MYSQL服务器加载到Spark,但我需要在加载数据后关闭该连接.关闭连接的确切语法是什么?

df_mysql = sqlContext.read.format("jdbc").options(
  url="jdbc:mysql://***/****”,
  driver="com.mysql.jdbc.Driver",
  dbtable="((SELECT jobid, system, FROM Jobs LIMIT 500)  as T)",
  user=“*****”,
  password=“*****”).load()
Run Code Online (Sandbox Code Playgroud)

我试过dbtable.close().那不起作用.

python mysql jdbc apache-spark pyspark

0
推荐指数
1
解决办法
1572
查看次数

如何在范围内划分数字列并为Apache Spark中的每个范围分配标签?

我有以下sparkdataframe:

id weekly_sale
1    40000
2    120000
3    135000
4    211000
5    215000
6    331000
7    337000
Run Code Online (Sandbox Code Playgroud)

我需要查看weekly_sale列中以下哪些间隔项属于:

under 100000
between 100000 and 200000
between 200000 and 300000
more than 300000
Run Code Online (Sandbox Code Playgroud)

所以我想要的输出将是:

id weekly_sale  label
1    40000       under 100000    
2    120000      between 100000 and 200000
3    135000      between 100000 and 200000
4    211000      between 200000 and 300000
5    215000      between 200000 and 300000
6    331000      more than 300000
7    337000      more than 300000
Run Code Online (Sandbox Code Playgroud)

任何pyspark,spark.sql和Hive上下文实现都将对我有所帮助。

dataframe apache-spark apache-spark-sql pyspark hivecontext

0
推荐指数
1
解决办法
1154
查看次数

MacOS上的Spark安装和配置ImportError:没有名为pyspark的模块

我正在尝试在MacOS上配置apache-spark.所有在线指南要求下载火花焦油并设置一些env变量或使用brew install apache-spark然后设置一些env变量.

现在我用apache-spark安装了brew install apache-spark.我pyspark在终端运行,我得到一个python提示,表明安装成功.

现在,当我尝试import pyspark进入我的python文件时,我正面临着错误的说法ImportError: No module named pyspark

我无法理解的最奇怪的事情是它如何启动pyspark的REPL并且无法将模块导入python代码.

我也尝试过,pip install pyspark但它也无法识别模块.

除了用自制软件安装apache-spark之外,我还设置了以下env变量.

if which java > /dev/null; then export JAVA_HOME=$(/usr/libexec/java_home); fi

if which pyspark > /dev/null; then
  export SPARK_HOME="/usr/local/Cellar/apache-spark/2.1.0/libexec/"
  export PYSPARK_SUBMIT_ARGS="--master local[2]"
fi
Run Code Online (Sandbox Code Playgroud)

请在我的本地计算机上运行pyspark代码,建议我的设置缺少什么.

python apache-spark pyspark

0
推荐指数
1
解决办法
2386
查看次数

在pyspark.ml中的多个功能上运行的变压器

我想在中创建自己的功能转换器DataFrame,以便添加一列,例如,这是其他两列之间的差异。我遵循了这个问题,但是那里的变压器只能在一个列上运行。pyspark.ml.Transformer以字符串作为参数inputCol,因此我当然不能指定多个列。

因此,基本上,我想要实现的是一种_transform()类似于该方法的方法:

def _transform(self, dataset):
    out_col = self.getOutputCol()
    in_col = dataset.select([self.getInputCol()])

    # Define transformer logic
    def f(col1, col2):
        return col1 - col2
    t = IntegerType()

    return dataset.withColumn(out_col, udf(f, t)(in_col))
Run Code Online (Sandbox Code Playgroud)

这怎么可能呢?

apache-spark pyspark apache-spark-ml

0
推荐指数
2
解决办法
788
查看次数

TypeError:'map'类型的对象没有len()Python3

我正在尝试使用Pyspark实现KMeans算法,它在while循环的最后一行给出了上述错误.它在循环外工作正常,但在我创建循环后它给了我这个错误我该怎么解决这个问题?

#  Find K Means of Loudacre device status locations
#
# Input data: file(s) with device status data (delimited by '|')
# including latitude (13th field) and longitude (14th field) of device locations
# (lat,lon of 0,0 indicates unknown location)
# NOTE: Copy to pyspark using %paste

# for a point p and an array of points, return the index in the array of the point closest to p
def closestPoint(p, points):
    bestIndex = 0
    closest = float("+inf")
    # …
Run Code Online (Sandbox Code Playgroud)

python k-means python-3.x apache-spark pyspark

0
推荐指数
1
解决办法
9241
查看次数

找不到'cairo.Context'的外来struct转换器

又是我.这是一个与我正在做的项目相关的代码,称为Twitter数据上的情感分析.以下代码主要用于显示正负推文的数量,我在下面给出了错误.

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import operator
import numpy as np
import matplotlib.pyplot as plt


def main():
        conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
        sc = SparkContext(conf=conf)

        # Creating a streaming context with batch interval of 10 sec
        ssc = StreamingContext(sc, 10)
        ssc.checkpoint("checkpoint")
        pwords = load_wordlist("positive.txt")
        nwords = load_wordlist("negative.txt")
        counts = stream(ssc, pwords, nwords, 100)
        make_plot(counts)


def make_plot(counts):
        """
        This function plots the counts of positive and negative words for each timestep.
        """
        positiveCounts = …
Run Code Online (Sandbox Code Playgroud)

python pycairo pyspark

0
推荐指数
2
解决办法
4960
查看次数

如何将json转换为pyspark dataframe(更快的实现)

我有{'abc':1,'def':2,'ghi':3}形式的json数据,如何在python中将其转换为pyspark数据框?

json pyspark spark-dataframe pyspark-sql

0
推荐指数
1
解决办法
9109
查看次数

替换Pyspark中数据框中值的SubString

我有一个带有一些属性的数据框,它具有下一个外观:

+-------+-------+
| Atr1  | Atr2  |
+-------+-------+
|  3,06 |  4,08 |
|  3,03 |  4,08 |
|  3,06 |  4,08 |
|  3,06 |  4,08 |
|  3,06 |  4,08 |
|  ...  |  ...  |
+-------+-------+
Run Code Online (Sandbox Code Playgroud)

如您所见,数据框的Atr1和Atr2的值是具有","字符的数字.这是因为我从CSV加载了这些数据,其中DoubleType数字的小数用','表示.

当我将数据加载到数据框中时,值被强制转换为String,因此我将String的类型转换为DoubleType,如下所示:

df = df.withColumn("Atr1", df["Atr1"].cast(DoubleType()))
df = df.withColumn("Atr2", df["Atr2"].cast(DoubleType()))
Run Code Online (Sandbox Code Playgroud)

但是当我这样做时,值将转换为null

+-------+-------+
| Atr1  | Atr2  |
+-------+-------+
|  null |  null |
|  null |  null |
|  null |  null |
|  null |  null |
|  null | …
Run Code Online (Sandbox Code Playgroud)

python casting dataframe apache-spark pyspark

0
推荐指数
1
解决办法
4715
查看次数