我尝试使用HDInsight上的Spark数据帧以下列方式创建一个pandas数据帧:
tmp = sqlContext.createDataFrame(sparkDf)
tmp.registerTempTable('temp')
Run Code Online (Sandbox Code Playgroud)
它看起来像是registerTempTable从数据框中删除了一些行.
以下命令返回11000
sparkDf.count()
Run Code Online (Sandbox Code Playgroud)
虽然tmp只有2500行.
我按照这里描述的步骤.
我有一个时间段内用户的Lat/Lon格式的位置数据集.我想计算这些用户旅行的距离.样本数据集:
| 时间戳| 用户| 纬度|经度| | 1462838468 | 49B4361512443A4DA ... | 39.777982 | -7.054599 | | 1462838512 | 49B4361512443A4DA ... | 39.777982 | -7.054599 | | 1462838389 | 49B4361512443A4DA ... | 39.777982 | -7.054599 | | 1462838497 | 49B4361512443A4DA ... | 39.777982 | -7.054599 | | 1465975885 | 6E9E0581E2A032FD8 ... | 37.118362 | -8.205041 | | 1457723815 | 405C238E25FE0B9E7 ... | 37.177322 | -7.426781 | | 1457897289 | 405C238E25FE0B9E7 ... | 37.177922 | -7.447443 | | 1457899229 | 405C238E25FE0B9E7 ... …
我通过JDBC将数据从MYSQL服务器加载到Spark,但我需要在加载数据后关闭该连接.关闭连接的确切语法是什么?
df_mysql = sqlContext.read.format("jdbc").options(
url="jdbc:mysql://***/****”,
driver="com.mysql.jdbc.Driver",
dbtable="((SELECT jobid, system, FROM Jobs LIMIT 500) as T)",
user=“*****”,
password=“*****”).load()
Run Code Online (Sandbox Code Playgroud)
我试过dbtable.close().那不起作用.
我有以下sparkdataframe:
id weekly_sale
1 40000
2 120000
3 135000
4 211000
5 215000
6 331000
7 337000
Run Code Online (Sandbox Code Playgroud)
我需要查看weekly_sale列中以下哪些间隔项属于:
under 100000
between 100000 and 200000
between 200000 and 300000
more than 300000
Run Code Online (Sandbox Code Playgroud)
所以我想要的输出将是:
id weekly_sale label
1 40000 under 100000
2 120000 between 100000 and 200000
3 135000 between 100000 and 200000
4 211000 between 200000 and 300000
5 215000 between 200000 and 300000
6 331000 more than 300000
7 337000 more than 300000
Run Code Online (Sandbox Code Playgroud)
任何pyspark,spark.sql和Hive上下文实现都将对我有所帮助。
我正在尝试在MacOS上配置apache-spark.所有在线指南要求下载火花焦油并设置一些env变量或使用brew install apache-spark然后设置一些env变量.
现在我用apache-spark安装了brew install apache-spark.我pyspark在终端运行,我得到一个python提示,表明安装成功.
现在,当我尝试import pyspark进入我的python文件时,我正面临着错误的说法ImportError: No module named pyspark
我无法理解的最奇怪的事情是它如何启动pyspark的REPL并且无法将模块导入python代码.
我也尝试过,pip install pyspark但它也无法识别模块.
除了用自制软件安装apache-spark之外,我还设置了以下env变量.
if which java > /dev/null; then export JAVA_HOME=$(/usr/libexec/java_home); fi
if which pyspark > /dev/null; then
export SPARK_HOME="/usr/local/Cellar/apache-spark/2.1.0/libexec/"
export PYSPARK_SUBMIT_ARGS="--master local[2]"
fi
Run Code Online (Sandbox Code Playgroud)
请在我的本地计算机上运行pyspark代码,建议我的设置缺少什么.
我想在中创建自己的功能转换器DataFrame,以便添加一列,例如,这是其他两列之间的差异。我遵循了这个问题,但是那里的变压器只能在一个列上运行。pyspark.ml.Transformer以字符串作为参数inputCol,因此我当然不能指定多个列。
因此,基本上,我想要实现的是一种_transform()类似于该方法的方法:
def _transform(self, dataset):
out_col = self.getOutputCol()
in_col = dataset.select([self.getInputCol()])
# Define transformer logic
def f(col1, col2):
return col1 - col2
t = IntegerType()
return dataset.withColumn(out_col, udf(f, t)(in_col))
Run Code Online (Sandbox Code Playgroud)
这怎么可能呢?
我正在尝试使用Pyspark实现KMeans算法,它在while循环的最后一行给出了上述错误.它在循环外工作正常,但在我创建循环后它给了我这个错误我该怎么解决这个问题?
# Find K Means of Loudacre device status locations
#
# Input data: file(s) with device status data (delimited by '|')
# including latitude (13th field) and longitude (14th field) of device locations
# (lat,lon of 0,0 indicates unknown location)
# NOTE: Copy to pyspark using %paste
# for a point p and an array of points, return the index in the array of the point closest to p
def closestPoint(p, points):
bestIndex = 0
closest = float("+inf")
# …Run Code Online (Sandbox Code Playgroud) 又是我.这是一个与我正在做的项目相关的代码,称为Twitter数据上的情感分析.以下代码主要用于显示正负推文的数量,我在下面给出了错误.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import operator
import numpy as np
import matplotlib.pyplot as plt
def main():
conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
sc = SparkContext(conf=conf)
# Creating a streaming context with batch interval of 10 sec
ssc = StreamingContext(sc, 10)
ssc.checkpoint("checkpoint")
pwords = load_wordlist("positive.txt")
nwords = load_wordlist("negative.txt")
counts = stream(ssc, pwords, nwords, 100)
make_plot(counts)
def make_plot(counts):
"""
This function plots the counts of positive and negative words for each timestep.
"""
positiveCounts = …Run Code Online (Sandbox Code Playgroud) 我有{'abc':1,'def':2,'ghi':3}形式的json数据,如何在python中将其转换为pyspark数据框?
我有一个带有一些属性的数据框,它具有下一个外观:
+-------+-------+
| Atr1 | Atr2 |
+-------+-------+
| 3,06 | 4,08 |
| 3,03 | 4,08 |
| 3,06 | 4,08 |
| 3,06 | 4,08 |
| 3,06 | 4,08 |
| ... | ... |
+-------+-------+
Run Code Online (Sandbox Code Playgroud)
如您所见,数据框的Atr1和Atr2的值是具有","字符的数字.这是因为我从CSV加载了这些数据,其中DoubleType数字的小数用','表示.
当我将数据加载到数据框中时,值被强制转换为String,因此我将String的类型转换为DoubleType,如下所示:
df = df.withColumn("Atr1", df["Atr1"].cast(DoubleType()))
df = df.withColumn("Atr2", df["Atr2"].cast(DoubleType()))
Run Code Online (Sandbox Code Playgroud)
但是当我这样做时,值将转换为null
+-------+-------+
| Atr1 | Atr2 |
+-------+-------+
| null | null |
| null | null |
| null | null |
| null | null |
| null | …Run Code Online (Sandbox Code Playgroud) pyspark ×10
apache-spark ×8
python ×6
dataframe ×2
azure ×1
casting ×1
hdinsight ×1
hivecontext ×1
jdbc ×1
json ×1
k-means ×1
mysql ×1
pycairo ×1
pyspark-sql ×1
python-3.x ×1