小编Ste*_*ven的帖子

在pyspark中缓存数据帧

我想更准确地了解在pyspark中使用方法缓存for dataframe

当我运行df.cache()它时返回一个数据帧.因此,如果我这样做df2 = df.cache(),哪个数据帧在缓存中?难道df,df2或两者兼而有之?

caching pyspark

9
推荐指数
1
解决办法
9517
查看次数

如何在 jupyter 中像 Pyspark Dataframe 一样打印 Pyspark Dataframe

当我df.show()在 jupyter notebook 中查看 pyspark 数据框时

它告诉我:

+---+-------+-------+-------+------+-----------+-----+-------------+-----+---------+----------+-----+-----------+-----------+--------+---------+-------+------------+---------+------------+---------+---------------+------------+---------------+---------+------------+
| Id|groupId|matchId|assists|boosts|damageDealt|DBNOs|headshotKills|heals|killPlace|killPoints|kills|killStreaks|longestKill|maxPlace|numGroups|revives|rideDistance|roadKills|swimDistance|teamKills|vehicleDestroys|walkDistance|weaponsAcquired|winPoints|winPlacePerc|
+---+-------+-------+-------+------+-----------+-----+-------------+-----+---------+----------+-----+-----------+-----------+--------+---------+-------+------------+---------+------------+---------+---------------+------------+---------------+---------+------------+
|  0|     24|      0|      0|     5|   247.3000|    2|            0|    4|       17|      1050|    2|          1|    65.3200|      29|       28|      1|    591.3000|        0|      0.0000|        0|              0|    782.4000|              4|     1458|      0.8571|
|  1| 440875|      1|      1|     0|    37.6500|    1|            1|    0|       45|      1072|    1|          1|    13.5500|      26|       23|      0|      0.0000|        0|      0.0000|        0|              0|    119.6000|              3|     1511|      0.0400|
|  2| 878242|      2|      0|     1|    93.7300|    1|            0|    2|       54|      1404|    0| …
Run Code Online (Sandbox Code Playgroud)

view dataframe pandas pyspark jupyter

7
推荐指数
2
解决办法
5725
查看次数

比较模式忽略可为空

我正在尝试比较 2 个数据帧的架构。基本上,列和类型是相同的,但“可为空”可以不同:

数据框A

StructType(List(
StructField(ClientId,StringType,True),
StructField(PublicId,StringType,True),
StructField(ExternalIds,ArrayType(StructType(List(
    StructField(AppId,StringType,True),
    StructField(ExtId,StringType,True),
)),True),True),
....
Run Code Online (Sandbox Code Playgroud)

数据框B

StructType(List(
StructField(ClientId,StringType,True),
StructField(PublicId,StringType,False),
StructField(ExternalIds,ArrayType(StructType(List(
    StructField(AppId,StringType,True),
    StructField(ExtId,StringType,False),
)),True),True),
....
Run Code Online (Sandbox Code Playgroud)

当我这样做时df_A.schema == df_B.schema,结果False很明显。但我想忽略“nullable”参数,无论它是 false 还是 true,如果结构相同,它应该返回True

是否可以 ?

apache-spark pyspark

6
推荐指数
1
解决办法
1886
查看次数

使用 pysftp 通过 HTTP 代理进行 Python 连接

目前,我正在使用 Pythonsubprocess.POPEN和 PuTTY进行 SFTP 传输psftp.exe

它正在工作,但不是很干净,也不是很便携。

我想使用 Python pysftp 重现相同的行为,但我不知道在哪里输入所有参数。我在 PuTTY 中有以下配置:

  • 服务器 IP : 123.123.123.255
  • 服务器端口:22
  • 连接类型:SSH
  • 自动登录用户名:MyUser
  • 代理类型:HTTP
  • 代理主机名:gw.proxy.fr
  • 代理端口:1234
  • 代理用户名:ProxyUser
  • 代理密码:ProxyPass

我应该如何在 pysftp 中输入所有这些参数,以便我可以检索我的文件?

编辑:使用 Martin Prikryl 的答案,我发现了一些新的东西可以探索。如果我理解得很好,我需要使用套接字。把我输入我需要的所有信息仍然有一些问题。

import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
proxy = ("gw.proxy.fr",1234)
sock.connect(proxy)
target=("123.123.123.255",23)
cmd_connect = "CONNECT {}:{} HTTP/1.1\r\n\r\n".format(*target)
sock.sendall(cmd_connect)
Run Code Online (Sandbox Code Playgroud)

我收到的响应是HTTP/1.0 407 Proxy Authentication Required,这很正常,因为我没有在任何地方使用代理身份验证信息。那么,您知道我如何使用它们并将它们输入到我的套接字中吗?

python windows proxy putty pysftp

5
推荐指数
1
解决办法
5524
查看次数

使用 pyspark 创建 SparkSession 时出现问题

我是 Spark 新手。我正在尝试创建 Spark 会话pyspark.sql以加载 .csv 文件。但是,每次我尝试执行第二行(如下所示)时,该命令都会继续执行几个小时并且似乎永远不会生成代码的其他行。代码如下:

from pyspark.sql import SparkSession
sp = SparkSession.builder.appName("solution").config("spark.some.config.option", "some-value").getOrCreate()
df = sp.read.csv('walmart_stock.csv', header= True, inferSchema= True)
df.columns
Run Code Online (Sandbox Code Playgroud)

另外,如果我等待很长时间后杀死内核,则会出现以下异常:

<ipython-input-23-16c3797ce83f> in <module>
----> 1 sp = SparkSession.builder.appName("solution").config("spark.some.config.option", "some-value").getOrCreate()

~\anaconda3\lib\site-packages\pyspark\sql\session.py in getOrCreate(self)
    184                             sparkConf.set(key, value)
    185                         # This SparkContext may be an existing one.
--> 186                         sc = SparkContext.getOrCreate(sparkConf)
    187                     # Do not update `SparkConf` for existing `SparkContext`, as it's shared
    188                     # by all sessions.

~\anaconda3\lib\site-packages\pyspark\context.py in getOrCreate(cls, conf)
    369         with SparkContext._lock:
    370 …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark jupyter-notebook

5
推荐指数
1
解决办法
3630
查看次数

PySpark 对结构体数组进行排序

这是我的数据框的虚拟样本

data = [
    [3273, "city y", [["ids", 27], ["smf", 13], ["tlk", 35], ["thr", 24]]],
    [3213, "city x", [["smf", 23], ["tlk", 15], ["ids", 17], ["thr", 34]]],
]
df = spark.createDataFrame(
    data, "city_id:long, city_name:string, cel:array<struct<carr:string, subs:int>>"
)
df.show(2, False)

+-------+---------+--------------------------------------------+
|city_id|city_name|cel                                         |
+-------+---------+--------------------------------------------+
|3273   |city y   |[[ids, 27], [smf, 13], [tlk, 35], [thr, 24]]|
|3213   |city x   |[[smf, 23], [tlk, 15], [ids, 17], [thr, 34]]|
+-------+---------+--------------------------------------------+
Run Code Online (Sandbox Code Playgroud)

我需要根据其subs值对列cel的数组进行降序排序。会是这样的

+-------+---------+--------------------------------------------+
|city_id|city_name|cel                                         |
+-------+---------+--------------------------------------------+
|3273   |city y   |[[tlk, 35], [ids, 27], …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

5
推荐指数
1
解决办法
7474
查看次数

我应该使用属性还是私有属性?

假设我创建了一个类:

class SomeClass:    
    def __init__(self, some_attribute):
        self._attribute = some_attribute

    @property
    def attribute(self):
        return self._attribute
Run Code Online (Sandbox Code Playgroud)

然后,我new_method向我的对象添加一个方法,该方法将使用"属性".因此,我应该使用self._attributeself.attribute?:

def new_method(self):
    DoSomething(self.attribute) # or     DoSomething(self._attribute)
Run Code Online (Sandbox Code Playgroud)

它会产生任何影响或差异吗?

python

3
推荐指数
1
解决办法
84
查看次数

lambda函数中的未定义变量

我有代码:

from functools import reduce

public_ids = [1,2,3,4,5]
filepath = '/path/to/file/'

rdd = sc.textFile(
    filepath
)

new_rdd = reduce(
    lambda a, b: a.filter(
        lambda x: b not in x
    ),
    public_ids,
    rdd
)
Run Code Online (Sandbox Code Playgroud)

此代码假定根据id列表过滤rdd中的行.rdd是使用spark context sc的textFile方法从位于filepath中的文件创建的.

此代码工作正常,但是pylint会引发错误:

E:未定义变量'b'(未定义变量)

我相信我编码它的方式不是正确的方法.如何更改它,以便pylint不会再次引发错误?或者它只是一个pylint无法正确识别的结构?

python pylint pyspark

3
推荐指数
1
解决办法
268
查看次数

在 pyspark 中将行转置为列

如何转置只有一列和多行的 Dataframe 表,例如:

1
2
3
5
6
7
...
Run Code Online (Sandbox Code Playgroud)

到只有一行和多列的数据框,例如:

1,2,3,4,5,6,7,8,9,10,...
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

3
推荐指数
1
解决办法
1万
查看次数

由于 java.lang.ClassNotFoundException: org.postgresql.Driver 导致 pyspark 数据帧错误

我想使用 JDBC 从 Postgresql 读取数据并将其存储在 pyspark dataframe 中。当我想使用 df.show()、df.take() 等方法预览数据框中的数据时,它们返回一个错误,指出原因是:java.lang.ClassNotFoundException: org.postgresql.Driver。但是 df.printschema() 会完美地返回数据库表的信息。这是我的代码:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.master("spark://spark-master:7077")
    .appName("read-postgres-jdbc")
    .config("spark.driver.extraClassPath", "/opt/workspace/postgresql-42.2.18.jar")
    .config("spark.executor.memory", "1g")
    .getOrCreate()
)
sc = spark.sparkContext

df = (
    spark.read.format("jdbc")
    .option("driver", "org.postgresql.Driver")
    .option("url", "jdbc:postgresql://postgres/postgres")
    .option("table", 'public."ASSET_DATA"')
    .option("dbtable", _select_sql)
    .option("user", "airflow")
    .option("password", "airflow")
    .load()
)

df.show(1)
Run Code Online (Sandbox Code Playgroud)

错误日志:

Py4JJavaError: An error occurred while calling o44.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 …
Run Code Online (Sandbox Code Playgroud)

postgresql jdbc apache-spark pyspark

2
推荐指数
1
解决办法
1万
查看次数