我想更准确地了解在pyspark中使用方法缓存for dataframe
当我运行df.cache()它时返回一个数据帧.因此,如果我这样做df2 = df.cache(),哪个数据帧在缓存中?难道df,df2或两者兼而有之?
当我df.show()在 jupyter notebook 中查看 pyspark 数据框时
它告诉我:
+---+-------+-------+-------+------+-----------+-----+-------------+-----+---------+----------+-----+-----------+-----------+--------+---------+-------+------------+---------+------------+---------+---------------+------------+---------------+---------+------------+
| Id|groupId|matchId|assists|boosts|damageDealt|DBNOs|headshotKills|heals|killPlace|killPoints|kills|killStreaks|longestKill|maxPlace|numGroups|revives|rideDistance|roadKills|swimDistance|teamKills|vehicleDestroys|walkDistance|weaponsAcquired|winPoints|winPlacePerc|
+---+-------+-------+-------+------+-----------+-----+-------------+-----+---------+----------+-----+-----------+-----------+--------+---------+-------+------------+---------+------------+---------+---------------+------------+---------------+---------+------------+
| 0| 24| 0| 0| 5| 247.3000| 2| 0| 4| 17| 1050| 2| 1| 65.3200| 29| 28| 1| 591.3000| 0| 0.0000| 0| 0| 782.4000| 4| 1458| 0.8571|
| 1| 440875| 1| 1| 0| 37.6500| 1| 1| 0| 45| 1072| 1| 1| 13.5500| 26| 23| 0| 0.0000| 0| 0.0000| 0| 0| 119.6000| 3| 1511| 0.0400|
| 2| 878242| 2| 0| 1| 93.7300| 1| 0| 2| 54| 1404| 0| …Run Code Online (Sandbox Code Playgroud) 我正在尝试比较 2 个数据帧的架构。基本上,列和类型是相同的,但“可为空”可以不同:
数据框A
StructType(List(
StructField(ClientId,StringType,True),
StructField(PublicId,StringType,True),
StructField(ExternalIds,ArrayType(StructType(List(
StructField(AppId,StringType,True),
StructField(ExtId,StringType,True),
)),True),True),
....
Run Code Online (Sandbox Code Playgroud)
数据框B
StructType(List(
StructField(ClientId,StringType,True),
StructField(PublicId,StringType,False),
StructField(ExternalIds,ArrayType(StructType(List(
StructField(AppId,StringType,True),
StructField(ExtId,StringType,False),
)),True),True),
....
Run Code Online (Sandbox Code Playgroud)
当我这样做时df_A.schema == df_B.schema,结果False很明显。但我想忽略“nullable”参数,无论它是 false 还是 true,如果结构相同,它应该返回True。
是否可以 ?
目前,我正在使用 Pythonsubprocess.POPEN和 PuTTY进行 SFTP 传输psftp.exe。
它正在工作,但不是很干净,也不是很便携。
我想使用 Python pysftp 重现相同的行为,但我不知道在哪里输入所有参数。我在 PuTTY 中有以下配置:
我应该如何在 pysftp 中输入所有这些参数,以便我可以检索我的文件?
编辑:使用 Martin Prikryl 的答案,我发现了一些新的东西可以探索。如果我理解得很好,我需要使用套接字。把我输入我需要的所有信息仍然有一些问题。
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
proxy = ("gw.proxy.fr",1234)
sock.connect(proxy)
target=("123.123.123.255",23)
cmd_connect = "CONNECT {}:{} HTTP/1.1\r\n\r\n".format(*target)
sock.sendall(cmd_connect)
Run Code Online (Sandbox Code Playgroud)
我收到的响应是HTTP/1.0 407 Proxy Authentication Required,这很正常,因为我没有在任何地方使用代理身份验证信息。那么,您知道我如何使用它们并将它们输入到我的套接字中吗?
我是 Spark 新手。我正在尝试创建 Spark 会话pyspark.sql以加载 .csv 文件。但是,每次我尝试执行第二行(如下所示)时,该命令都会继续执行几个小时并且似乎永远不会生成代码的其他行。代码如下:
from pyspark.sql import SparkSession
sp = SparkSession.builder.appName("solution").config("spark.some.config.option", "some-value").getOrCreate()
df = sp.read.csv('walmart_stock.csv', header= True, inferSchema= True)
df.columns
Run Code Online (Sandbox Code Playgroud)
另外,如果我等待很长时间后杀死内核,则会出现以下异常:
<ipython-input-23-16c3797ce83f> in <module>
----> 1 sp = SparkSession.builder.appName("solution").config("spark.some.config.option", "some-value").getOrCreate()
~\anaconda3\lib\site-packages\pyspark\sql\session.py in getOrCreate(self)
184 sparkConf.set(key, value)
185 # This SparkContext may be an existing one.
--> 186 sc = SparkContext.getOrCreate(sparkConf)
187 # Do not update `SparkConf` for existing `SparkContext`, as it's shared
188 # by all sessions.
~\anaconda3\lib\site-packages\pyspark\context.py in getOrCreate(cls, conf)
369 with SparkContext._lock:
370 …Run Code Online (Sandbox Code Playgroud) 这是我的数据框的虚拟样本
data = [
[3273, "city y", [["ids", 27], ["smf", 13], ["tlk", 35], ["thr", 24]]],
[3213, "city x", [["smf", 23], ["tlk", 15], ["ids", 17], ["thr", 34]]],
]
df = spark.createDataFrame(
data, "city_id:long, city_name:string, cel:array<struct<carr:string, subs:int>>"
)
df.show(2, False)
+-------+---------+--------------------------------------------+
|city_id|city_name|cel |
+-------+---------+--------------------------------------------+
|3273 |city y |[[ids, 27], [smf, 13], [tlk, 35], [thr, 24]]|
|3213 |city x |[[smf, 23], [tlk, 15], [ids, 17], [thr, 34]]|
+-------+---------+--------------------------------------------+
Run Code Online (Sandbox Code Playgroud)
我需要根据其subs值对列cel的数组进行降序排序。会是这样的
+-------+---------+--------------------------------------------+
|city_id|city_name|cel |
+-------+---------+--------------------------------------------+
|3273 |city y |[[tlk, 35], [ids, 27], …Run Code Online (Sandbox Code Playgroud) 假设我创建了一个类:
class SomeClass:
def __init__(self, some_attribute):
self._attribute = some_attribute
@property
def attribute(self):
return self._attribute
Run Code Online (Sandbox Code Playgroud)
然后,我new_method向我的对象添加一个方法,该方法将使用"属性".因此,我应该使用self._attribute或self.attribute?:
def new_method(self):
DoSomething(self.attribute) # or DoSomething(self._attribute)
Run Code Online (Sandbox Code Playgroud)
它会产生任何影响或差异吗?
我有代码:
from functools import reduce
public_ids = [1,2,3,4,5]
filepath = '/path/to/file/'
rdd = sc.textFile(
filepath
)
new_rdd = reduce(
lambda a, b: a.filter(
lambda x: b not in x
),
public_ids,
rdd
)
Run Code Online (Sandbox Code Playgroud)
此代码假定根据id列表过滤rdd中的行.rdd是使用spark context sc的textFile方法从位于filepath中的文件创建的.
此代码工作正常,但是pylint会引发错误:
E:未定义变量'b'(未定义变量)
我相信我编码它的方式不是正确的方法.如何更改它,以便pylint不会再次引发错误?或者它只是一个pylint无法正确识别的结构?
如何转置只有一列和多行的 Dataframe 表,例如:
1
2
3
5
6
7
...
Run Code Online (Sandbox Code Playgroud)
到只有一行和多列的数据框,例如:
1,2,3,4,5,6,7,8,9,10,...
Run Code Online (Sandbox Code Playgroud) 我想使用 JDBC 从 Postgresql 读取数据并将其存储在 pyspark dataframe 中。当我想使用 df.show()、df.take() 等方法预览数据框中的数据时,它们返回一个错误,指出原因是:java.lang.ClassNotFoundException: org.postgresql.Driver。但是 df.printschema() 会完美地返回数据库表的信息。这是我的代码:
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.master("spark://spark-master:7077")
.appName("read-postgres-jdbc")
.config("spark.driver.extraClassPath", "/opt/workspace/postgresql-42.2.18.jar")
.config("spark.executor.memory", "1g")
.getOrCreate()
)
sc = spark.sparkContext
df = (
spark.read.format("jdbc")
.option("driver", "org.postgresql.Driver")
.option("url", "jdbc:postgresql://postgres/postgres")
.option("table", 'public."ASSET_DATA"')
.option("dbtable", _select_sql)
.option("user", "airflow")
.option("password", "airflow")
.load()
)
df.show(1)
Run Code Online (Sandbox Code Playgroud)
错误日志:
Py4JJavaError: An error occurred while calling o44.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 …Run Code Online (Sandbox Code Playgroud)