小编Bry*_*ind的帖子

从列表列表创建dict

我有一个我读过的文本文件.这是一个日志文件,因此它遵循特定的模式.我最终需要创建一个JSON,但是从研究这个问题开始,一旦它出现在dict中,它将是一个使用json.loads()或者问题json.dumps().

下面是文本文件的示例.

INFO:20180606_141527:submit:is_test=False
INFO:20180606_141527:submit:username=Mary
INFO:20180606_141527:env:sys.platform=linux2
INFO:20180606_141527:env:os.name=ubuntu
Run Code Online (Sandbox Code Playgroud)

我最后寻找的字典结构是

{
  "INFO": {
    "submit": {
      "is_test": false,
      "username": "Mary"
    },
    "env": {
      "sys.platform": "linux2",
      "os.name": "ubuntu"
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

我暂时忽略每个列表中的时间戳信息.

这是我正在使用的代码的片段,

import csv
tree_dict = {}
with open('file.log') as file:
    for row in file:
        for key in reversed(row.split(":")):
            tree_dict = {key: tree_dict}
Run Code Online (Sandbox Code Playgroud)

这导致不希望的输出,

{'INFO': {'20180606_141527': {'submit': {'os.name=posix\n': {'INFO': {'20180606_141527': {'submit': {'sys.platform=linux2\n': {'INFO': {'20180606_141527': {'submit': {'username=a227874\n': {'INFO': {'20180606_141527': {'submit': {'is_test=False\n': {}}}}}}}}}}}}}}}}}
Run Code Online (Sandbox Code Playgroud)

我需要动态填充dict,因为我不知道实际的字段/键名.

python dictionary file

16
推荐指数
2
解决办法
2424
查看次数

PySpark - 在数据帧中对一列进行求和,并将结果作为int返回

我有一个带有一列数字的pyspark数据框.我需要对该列求和,然后将结果返回为python变量中的int.

df = spark.createDataFrame([("A", 20), ("B", 30), ("D", 80)],["Letter", "Number"])
Run Code Online (Sandbox Code Playgroud)

我执行以下操作来对列进行求和.

df.groupBy().sum()
Run Code Online (Sandbox Code Playgroud)

但是我得到了一个数据帧.

+-----------+
|sum(Number)|
+-----------+
|        130|
+-----------+
Run Code Online (Sandbox Code Playgroud)

我将130作为存储在变量中的int返回,以便在程序中使用.

result = 130
Run Code Online (Sandbox Code Playgroud)

python sum dataframe pyspark

11
推荐指数
4
解决办法
3万
查看次数

PySpark - 将列表作为参数传递给UDF

我需要将列表传递给UDF,列表将确定距离的分数/类别.就目前而言,我很难将所有距离编码为第4分.

a= spark.createDataFrame([("A", 20), ("B", 30), ("D", 80)],["Letter", "distances"])

from pyspark.sql.functions import udf
def cate(label, feature_list):
    if feature_list == 0:
        return label[4]
label_list = ["Great", "Good", "OK", "Please Move", "Dead"]
udf_score=udf(cate, StringType())
a.withColumn("category", udf_score(label_list,a["distances"])).show(10)
Run Code Online (Sandbox Code Playgroud)

当我尝试这样的事情时,我得到了这个错误.

Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.util.ArrayList]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
    at py4j.Gateway.invoke(Gateway.java:274)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud)

python user-defined-functions pyspark spark-dataframe

11
推荐指数
2
解决办法
1万
查看次数

PySpark - 逐行转换为 JSON

我有一个非常大的 pyspark 数据框。我需要将数据帧转换为每一行的 JSON 格式的字符串,然后将该字符串发布到 Kafka 主题。我最初使用了以下代码。

for message in df.toJSON().collect():
        kafkaClient.send(message) 
Run Code Online (Sandbox Code Playgroud)

但是,数据框非常大,因此在尝试collect().

我正在考虑使用 aUDF因为它逐行处理它。

from pyspark.sql.functions import udf, struct

def get_row(row):
    json = row.toJSON()
    kafkaClient.send(message) 
    return "Sent"

send_row_udf = F.udf(get_row, StringType())
df_json = df.withColumn("Sent", get_row(struct([df[x] for x in df.columns])))
df_json.select("Sent").show()
Run Code Online (Sandbox Code Playgroud)

但是我收到一个错误,因为列被输入到函数而不是行。

出于说明目的,我们可以使用下面的 df,我们可以假设必须发送 Col1 和 Col2。

df= spark.createDataFrame([("A", 1), ("B", 2), ("D", 3)],["Col1", "Col2"])
Run Code Online (Sandbox Code Playgroud)

每行的 JSON 字符串:

'{"Col1":"A","Col2":1}'
'{"Col1":"B","Col2":2}'
'{"Col1":"D","Col2":3}'
Run Code Online (Sandbox Code Playgroud)

python json pyspark spark-dataframe

7
推荐指数
1
解决办法
1万
查看次数

PySpark - 将地图功能添加为列

我有一个pyspark DataFrame

a = [
    ('Bob', 562),
    ('Bob',880),
    ('Bob',380),
    ('Sue',85),
    ('Sue',963)
] 
df = spark.createDataFrame(a, ["Person", "Amount"])
Run Code Online (Sandbox Code Playgroud)

我需要创建一个哈希Amount并返回金额的列.问题是我无法使用,UDF所以我使用了映射功能.

df.rdd.map(lambda x: hash(x["Amount"]))
Run Code Online (Sandbox Code Playgroud)

rdd apache-spark-sql pyspark

5
推荐指数
1
解决办法
8592
查看次数

Plotly:如何使用下拉菜单过滤 Pandas 数据框?

我有一个数据框并使用plotly我想可视化数据。我有以下代码

fig = px.line(df, x="row_num", y="audienceWatchRatio", color='vid_id')
fig.show() 
Run Code Online (Sandbox Code Playgroud)

在此处输入图片说明

它真的很乱,所以我想要一个下拉菜单,用户可以在其中选择vid_id并且它只显示 1 个图形。

python plotly plotly-python

5
推荐指数
1
解决办法
9700
查看次数

PySpark-从Numpy矩阵创建DataFrame

我有一个numpy的矩阵:

arr = np.array([[2,3], [2,8], [2,3],[4,5]])
Run Code Online (Sandbox Code Playgroud)

我需要从创建一个PySpark数据框arr。我无法手动输入值,因为的长度/值arr将动态变化,因此我需要转换arr为数据框。

我尝试以下代码未成功。

df= sqlContext.createDataFrame(arr,["A", "B"])
Run Code Online (Sandbox Code Playgroud)

但是,出现以下错误。

TypeError: Can not infer schema for type: <type 'numpy.ndarray'>
Run Code Online (Sandbox Code Playgroud)

python arrays numpy pyspark spark-dataframe

4
推荐指数
2
解决办法
4026
查看次数

PySpark-将列表的列转换为行

我有一个pyspark数据框。我必须进行分组,然后将某些列聚合到列表中,以便可以在数据框架上应用UDF。

例如,我创建了一个数据框,然后按人员分组。

df = spark.createDataFrame(a, ["Person", "Amount","Budget", "Date"])
df = df.groupby("Person").agg(F.collect_list(F.struct("Amount", "Budget", "Date")).alias("data"))
df.show(truncate=False)
+------+----------------------------------------------------------------------------+
|Person|data                                                                        |
+------+----------------------------------------------------------------------------+
|Bob   |[[85.8,Food,2017-09-13], [7.8,Household,2017-09-13], [6.52,Food,2017-06-13]]|
+------+----------------------------------------------------------------------------+ 
Run Code Online (Sandbox Code Playgroud)

我省略了UDF,但下面是UDF的结果数据框。

+------+--------------------------------------------------------------+
|Person|res                                                           |
+------+--------------------------------------------------------------+
|Bob   |[[562,Food,June,1], [380,Household,Sept,4], [880,Food,Sept,2]]|
+------+--------------------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)

我需要将结果数据帧转换为行,其中列表中的每个元素都是带有新列的新行。可以在下面看到。

+------+------------------------------+
|Person|Amount|Budget   |Month|Cluster|
+------+------------------------------+
|Bob   |562   |Food     |June |1      |
|Bob   |380   |Household|Sept |4      |
|Bob   |880   |Food     |Sept |2      |
+------+------------------------------+
Run Code Online (Sandbox Code Playgroud)

python group-by pyspark spark-dataframe

4
推荐指数
1
解决办法
2750
查看次数

在列表的数据框中查找最常见的对

我有一个带有列的数据框ID, Product.例如,

ID  Product
1   ['a','b']
2   ['a','b','e']
3   ['c','d']
4   ['a','b','c','d']
Run Code Online (Sandbox Code Playgroud)

Product是一个列表字段,其中每个列表包含一个人拥有的产品.例如,1人ID有产品ab.我需要找到最受欢迎/最常见的产品对.在这个例子中,产品[a,b]是最受欢迎的.它必须是最常见的产品对,因为没有人可以拥有1种产品.

python list dataframe pandas

3
推荐指数
1
解决办法
638
查看次数

检查输入是否在 Conda 环境中

我有一个 bash 文件,需要包含验证检查。基本上,该文件的目的是接受输入并激活虚拟环境。这是用这一行完成的:

source activate $1
Run Code Online (Sandbox Code Playgroud)

但是,我需要检查输入是否是有效的 conda 环境。当我执行以下命令时:

conda env list
Run Code Online (Sandbox Code Playgroud)

它将返回:

# conda environments:
#
py2713                   /opt/anaconda2/envs/py2713
py341                    /opt/anaconda2/envs/py341
py345                    /opt/anaconda2/envs/py345
root                  *  /opt/anaconda2
Run Code Online (Sandbox Code Playgroud)

即脚本必须检查输入是否为py2713py341或。如果它不在环境列表中,则应返回一条消息并退出。py345root

unix bash shell

3
推荐指数
1
解决办法
4841
查看次数