小编Que*_*ank的帖子

使用pySpark将DataFrame写入mysql表

我试图将记录插入MySql表中.该表包含idname作为列.

我在pysparkshell中做如下所示.

name = 'tester_1'
id = '103'  
import pandas as pd
l = [id,name]

df = pd.DataFrame([l])

df.write.format('jdbc').options(
      url='jdbc:mysql://localhost/database_name',
      driver='com.mysql.jdbc.Driver',
      dbtable='DestinationTableName',
      user='your_user_name',
      password='your_password').mode('append').save()
Run Code Online (Sandbox Code Playgroud)

我收到以下属性错误

AttributeError: 'DataFrame' object has no attribute 'write'

我究竟做错了什么?将记录插入MySql表中的正确方法是什么?pySpark

python mysql apache-spark apache-spark-sql pyspark

10
推荐指数
1
解决办法
9099
查看次数

pyspark数据框中的完全外连接

我在pyspark下面创建了两个数据框.在这些data frames我有专栏id.我想full outer join对这两个数据帧执行a .

valuesA = [('Pirate',1),('Monkey',2),('Ninja',3),('Spaghetti',4)]
a = sqlContext.createDataFrame(valuesA,['name','id'])

a.show()
+---------+---+
|     name| id|
+---------+---+
|   Pirate|  1|
|   Monkey|  2|
|    Ninja|  3|
|Spaghetti|  4|
+---------+---+


valuesB = [('dave',1),('Thor',2),('face',3), ('test',5)]
b = sqlContext.createDataFrame(valuesB,['Movie','id'])

b.show()
+-----+---+
|Movie| id|
+-----+---+
| dave|  1|
| Thor|  2|
| face|  3|
| test|  5|
+-----+---+


full_outer_join = a.join(b, a.id == b.id,how='full')
full_outer_join.show()

+---------+----+-----+----+
|     name|  id|Movie|  id|
+---------+----+-----+----+
|   Pirate|   1| dave|   1| …
Run Code Online (Sandbox Code Playgroud)

apache-spark pyspark

9
推荐指数
2
解决办法
1万
查看次数

使用python测试单元测试pyspark代码

我有pyspark下面的脚本.我想function在这个脚本中对单元进行单元测试.

def rename_chars(column_name):
    chars = ((' ', '_&'), ('.', '_$'))
    new_cols = reduce(lambda a, kv: a.replace(*kv), chars, column_name)
    return new_cols


def column_names(df):
    changed_col_names = df.schema.names
    for cols in changed_col_names:
        df = df.withColumnRenamed(cols, rename_chars(cols))
    return df   
Run Code Online (Sandbox Code Playgroud)

我在unittest下面写了一个测试函数.

但我不知道如何提交unittest.我做过spark-submit哪些都没有做任何事情.

import unittest
from my_script import column_names

from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)

cols = ['ID', 'NAME', 'last.name', 'abc test']
val = …
Run Code Online (Sandbox Code Playgroud)

python unit-testing pyspark

5
推荐指数
2
解决办法
2056
查看次数

将double值取整并转换为整数

我在PySpark中有一个数据框,如下所示。

import pyspark.sql.functions as func

df = sqlContext.createDataFrame(
        [(0.0, 0.2, 3.45631),
         (0.4, 1.4, 2.82945),
         (0.5, 1.9, 7.76261),
         (0.6, 0.9, 2.76790),
         (1.2, 1.0, 9.87984)],
         ["col1", "col2", "col3"])

df.show()
+----+----+-------+ 
|col1|col2|   col3|
+----+----+-------+
| 0.0| 0.2|3.45631| 
| 0.4| 1.4|2.82945|
| 0.5| 1.9|7.76261| 
| 0.6| 0.9| 2.7679| 
| 1.2| 1.0|9.87984| 
+----+----+-------+

# round 'col3' in a new column:
df2 = df.withColumn("col4", func.round(df["col3"], 2))
df2.show()

+----+----+-------+----+
|col1|col2|   col3|col4|
+----+----+-------+----+
| 0.0| 0.2|3.45631|3.46|
| 0.4| 1.4|2.82945|2.83|
| 0.5| 1.9|7.76261|7.76|
| 0.6| 0.9| 2.7679|2.77|
| 1.2| …
Run Code Online (Sandbox Code Playgroud)

python rounding apache-spark apache-spark-sql pyspark

5
推荐指数
1
解决办法
5951
查看次数

在熊猫数据框中使用正则表达式替换列值

我在pandas数据框中有一列,如下所示。列名是ABC

ABC
Fuel
FUEL
Fuel_12_ab
Fuel_1
Lube
Lube_1
Lube_12_a
cat_Lube
Run Code Online (Sandbox Code Playgroud)

现在我想使用正则表达式替换此列中的值,如下所示

ABC
Fuel
FUEL
Fuel
Fuel
Lube
Lube
Lube
cat_Lube
Run Code Online (Sandbox Code Playgroud)

我们如何在pandas数据帧中进行这种类型的字符串匹配。

python regex pandas

2
推荐指数
3
解决办法
2217
查看次数

在pyspark数据框中复制一列

我在pyspark下面的示例中有一个数据框.我想复制数据框中的列并重命名为另一个列名.

Name    Age    Rate
Aira     23     90
Ben      32     98
Cat      27     95
Run Code Online (Sandbox Code Playgroud)

期望的输出是:

Name    Age     Rate     Rate2
Aira    23      90       90
Ben     32      98       98
Cat     27      95       95
Run Code Online (Sandbox Code Playgroud)

我该怎么做?

apache-spark pyspark

1
推荐指数
1
解决办法
5147
查看次数