我有一个我读过的文本文件.这是一个日志文件,因此它遵循特定的模式.我最终需要创建一个JSON,但是从研究这个问题开始,一旦它出现在dict中,它将是一个使用json.loads()或者问题json.dumps().
下面是文本文件的示例.
INFO:20180606_141527:submit:is_test=False
INFO:20180606_141527:submit:username=Mary
INFO:20180606_141527:env:sys.platform=linux2
INFO:20180606_141527:env:os.name=ubuntu
Run Code Online (Sandbox Code Playgroud)
我最后寻找的字典结构是
{
"INFO": {
"submit": {
"is_test": false,
"username": "Mary"
},
"env": {
"sys.platform": "linux2",
"os.name": "ubuntu"
}
}
}
Run Code Online (Sandbox Code Playgroud)
我暂时忽略每个列表中的时间戳信息.
这是我正在使用的代码的片段,
import csv
tree_dict = {}
with open('file.log') as file:
for row in file:
for key in reversed(row.split(":")):
tree_dict = {key: tree_dict}
Run Code Online (Sandbox Code Playgroud)
这导致不希望的输出,
{'INFO': {'20180606_141527': {'submit': {'os.name=posix\n': {'INFO': {'20180606_141527': {'submit': {'sys.platform=linux2\n': {'INFO': {'20180606_141527': {'submit': {'username=a227874\n': {'INFO': {'20180606_141527': {'submit': {'is_test=False\n': {}}}}}}}}}}}}}}}}}
Run Code Online (Sandbox Code Playgroud)
我需要动态填充dict,因为我不知道实际的字段/键名.
我有一个带有一列数字的pyspark数据框.我需要对该列求和,然后将结果返回为python变量中的int.
df = spark.createDataFrame([("A", 20), ("B", 30), ("D", 80)],["Letter", "Number"])
Run Code Online (Sandbox Code Playgroud)
我执行以下操作来对列进行求和.
df.groupBy().sum()
Run Code Online (Sandbox Code Playgroud)
但是我得到了一个数据帧.
+-----------+
|sum(Number)|
+-----------+
| 130|
+-----------+
Run Code Online (Sandbox Code Playgroud)
我将130作为存储在变量中的int返回,以便在程序中使用.
result = 130
Run Code Online (Sandbox Code Playgroud) 我需要将列表传递给UDF,列表将确定距离的分数/类别.就目前而言,我很难将所有距离编码为第4分.
a= spark.createDataFrame([("A", 20), ("B", 30), ("D", 80)],["Letter", "distances"])
from pyspark.sql.functions import udf
def cate(label, feature_list):
if feature_list == 0:
return label[4]
label_list = ["Great", "Good", "OK", "Please Move", "Dead"]
udf_score=udf(cate, StringType())
a.withColumn("category", udf_score(label_list,a["distances"])).show(10)
Run Code Online (Sandbox Code Playgroud)
当我尝试这样的事情时,我得到了这个错误.
Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.col. Trace:
py4j.Py4JException: Method col([class java.util.ArrayList]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:339)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Run Code Online (Sandbox Code Playgroud) 我有一个非常大的 pyspark 数据框。我需要将数据帧转换为每一行的 JSON 格式的字符串,然后将该字符串发布到 Kafka 主题。我最初使用了以下代码。
for message in df.toJSON().collect():
kafkaClient.send(message)
Run Code Online (Sandbox Code Playgroud)
但是,数据框非常大,因此在尝试collect().
我正在考虑使用 aUDF因为它逐行处理它。
from pyspark.sql.functions import udf, struct
def get_row(row):
json = row.toJSON()
kafkaClient.send(message)
return "Sent"
send_row_udf = F.udf(get_row, StringType())
df_json = df.withColumn("Sent", get_row(struct([df[x] for x in df.columns])))
df_json.select("Sent").show()
Run Code Online (Sandbox Code Playgroud)
但是我收到一个错误,因为列被输入到函数而不是行。
出于说明目的,我们可以使用下面的 df,我们可以假设必须发送 Col1 和 Col2。
df= spark.createDataFrame([("A", 1), ("B", 2), ("D", 3)],["Col1", "Col2"])
Run Code Online (Sandbox Code Playgroud)
每行的 JSON 字符串:
'{"Col1":"A","Col2":1}'
'{"Col1":"B","Col2":2}'
'{"Col1":"D","Col2":3}'
Run Code Online (Sandbox Code Playgroud) 我有一个pyspark DataFrame
a = [
('Bob', 562),
('Bob',880),
('Bob',380),
('Sue',85),
('Sue',963)
]
df = spark.createDataFrame(a, ["Person", "Amount"])
Run Code Online (Sandbox Code Playgroud)
我需要创建一个哈希Amount并返回金额的列.问题是我无法使用,UDF所以我使用了映射功能.
df.rdd.map(lambda x: hash(x["Amount"]))
Run Code Online (Sandbox Code Playgroud) 我有一个数据框并使用plotly我想可视化数据。我有以下代码
fig = px.line(df, x="row_num", y="audienceWatchRatio", color='vid_id')
fig.show()
Run Code Online (Sandbox Code Playgroud)
它真的很乱,所以我想要一个下拉菜单,用户可以在其中选择vid_id并且它只显示 1 个图形。
我有一个numpy的矩阵:
arr = np.array([[2,3], [2,8], [2,3],[4,5]])
Run Code Online (Sandbox Code Playgroud)
我需要从创建一个PySpark数据框arr。我无法手动输入值,因为的长度/值arr将动态变化,因此我需要转换arr为数据框。
我尝试以下代码未成功。
df= sqlContext.createDataFrame(arr,["A", "B"])
Run Code Online (Sandbox Code Playgroud)
但是,出现以下错误。
TypeError: Can not infer schema for type: <type 'numpy.ndarray'>
Run Code Online (Sandbox Code Playgroud) 我有一个pyspark数据框。我必须进行分组,然后将某些列聚合到列表中,以便可以在数据框架上应用UDF。
例如,我创建了一个数据框,然后按人员分组。
df = spark.createDataFrame(a, ["Person", "Amount","Budget", "Date"])
df = df.groupby("Person").agg(F.collect_list(F.struct("Amount", "Budget", "Date")).alias("data"))
df.show(truncate=False)
+------+----------------------------------------------------------------------------+
|Person|data |
+------+----------------------------------------------------------------------------+
|Bob |[[85.8,Food,2017-09-13], [7.8,Household,2017-09-13], [6.52,Food,2017-06-13]]|
+------+----------------------------------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)
我省略了UDF,但下面是UDF的结果数据框。
+------+--------------------------------------------------------------+
|Person|res |
+------+--------------------------------------------------------------+
|Bob |[[562,Food,June,1], [380,Household,Sept,4], [880,Food,Sept,2]]|
+------+--------------------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)
我需要将结果数据帧转换为行,其中列表中的每个元素都是带有新列的新行。可以在下面看到。
+------+------------------------------+
|Person|Amount|Budget |Month|Cluster|
+------+------------------------------+
|Bob |562 |Food |June |1 |
|Bob |380 |Household|Sept |4 |
|Bob |880 |Food |Sept |2 |
+------+------------------------------+
Run Code Online (Sandbox Code Playgroud) 我有一个带有列的数据框ID, Product.例如,
ID Product
1 ['a','b']
2 ['a','b','e']
3 ['c','d']
4 ['a','b','c','d']
Run Code Online (Sandbox Code Playgroud)
Product是一个列表字段,其中每个列表包含一个人拥有的产品.例如,1人ID有产品a和b.我需要找到最受欢迎/最常见的产品对.在这个例子中,产品[a,b]是最受欢迎的.它必须是最常见的产品对,因为没有人可以拥有1种产品.
我有一个 bash 文件,需要包含验证检查。基本上,该文件的目的是接受输入并激活虚拟环境。这是用这一行完成的:
source activate $1
Run Code Online (Sandbox Code Playgroud)
但是,我需要检查输入是否是有效的 conda 环境。当我执行以下命令时:
conda env list
Run Code Online (Sandbox Code Playgroud)
它将返回:
# conda environments:
#
py2713 /opt/anaconda2/envs/py2713
py341 /opt/anaconda2/envs/py341
py345 /opt/anaconda2/envs/py345
root * /opt/anaconda2
Run Code Online (Sandbox Code Playgroud)
即脚本必须检查输入是否为py2713、py341或。如果它不在环境列表中,则应返回一条消息并退出。py345root