小编sji*_*han的帖子

Pyspark:在UDF中传递多个列

我正在编写一个用户定义的函数,它将获取除数据帧中第一个之外的所有列并进行求和(或任何其他操作).现在数据框有时可以有3列或4列或更多列.它会有所不同.

我知道我可以硬编码4个列名作为UDF传递,但在这种情况下它会有所不同所以我想知道如何完成它?

以下是第一个示例中的两个示例,我们有两列要添加,第二个示例中我们有三列要添加.

在此输入图像描述

apache-spark pyspark spark-dataframe

30
推荐指数
4
解决办法
3万
查看次数

PySpark:when子句中的多个条件

我想修改数据帧列(Age)的单元格值,其中当前它是空白的,我只会在另一列(Survived)的值为0时为相应的行进行修改,其中Age为空白.如果它在Survived列中为1但在Age列中为空,那么我将它保持为null.

我试图使用&&运算符,但它没有用.这是我的代码:

tdata.withColumn("Age",  when((tdata.Age == "" && tdata.Survived == "0"), mean_age_0).otherwise(tdata.Age)).show()
Run Code Online (Sandbox Code Playgroud)

任何建议如何处理?谢谢.

错误信息:

SyntaxError: invalid syntax
  File "<ipython-input-33-3e691784411c>", line 1
    tdata.withColumn("Age",  when((tdata.Age == "" && tdata.Survived == "0"), mean_age_0).otherwise(tdata.Age)).show()
                                                    ^
Run Code Online (Sandbox Code Playgroud)

python dataframe apache-spark apache-spark-sql pyspark

29
推荐指数
2
解决办法
8万
查看次数

Kinesis Firehose 在没有分隔符逗号的情况下将 JSON 对象放入 S3

在发送数据之前,我使用 JSON.stringify 到数据,它看起来像这样

{"data": [{"key1": value1, "key2": value2}, {"key1": value1, "key2": value2}]}
Run Code Online (Sandbox Code Playgroud)

但是一旦它通过 AWS API Gateway 并且 Kinesis Firehose 将它放入 S3,它看起来像这样

    {
     "key1": value1, 
     "key2": value2
    }{
     "key1": value1, 
     "key2": value2
    }
Run Code Online (Sandbox Code Playgroud)

JSON 对象之间的分隔符逗号不见了,但我需要它来正确处理数据。

API 网关中的模板:

#set($root = $input.path('$'))
{
    "DeliveryStreamName": "some-delivery-stream",
    "Records": [
#foreach($r in $root.data)
#set($data = "{
    ""key1"": ""$r.value1"",
    ""key2"": ""$r.value2""
}")
    {
        "Data": "$util.base64Encode($data)"
    }#if($foreach.hasNext),#end
#end
    ]
}
Run Code Online (Sandbox Code Playgroud)

json amazon-web-services amazon-kinesis aws-api-gateway amazon-kinesis-firehose

11
推荐指数
1
解决办法
6284
查看次数

Pyspark:将列中的json爆炸为多列

数据看起来像这样 -

+-----------+-----------+-----------------------------+
|         id|      point|                         data|
+-----------------------------------------------------+
|        abc|          6|{"key1":"124", "key2": "345"}|
|        dfl|          7|{"key1":"777", "key2": "888"}|
|        4bd|          6|{"key1":"111", "key2": "788"}|
Run Code Online (Sandbox Code Playgroud)

我试图将其分解为以下格式.

+-----------+-----------+-----------+-----------+
|         id|      point|       key1|       key2|
+------------------------------------------------
|        abc|          6|        124|        345|
|        dfl|          7|        777|        888|
|        4bd|          6|        111|        788|
Run Code Online (Sandbox Code Playgroud)

explode函数将数据框分解为多行.但这不是理想的解决方案.

注意:此解决方案不能回答我的问题. PySpark在列中"爆炸"字典

python apache-spark apache-spark-sql pyspark

10
推荐指数
2
解决办法
6676
查看次数

Chrome:使用javascript在输入文本字段上模拟按键事件

堆栈溢出中有很多内容,但似乎没有一个适用于我的情况.我有一个输入文本字段,我想模拟按键事件来填充文本字段.

原因:我在Web界面上自动执行大量数据输入任务,该界面不提供API.使用更改输入字段.value不会触发界面的JS侧(角度).这就是我想要模拟按键事件的原因.

首先我尝试了这个:

var inp = document.getElementById('rule-type');
inp.dispatchEvent(new KeyboardEvent('keypress',{'key':'a'}));
Run Code Online (Sandbox Code Playgroud)

然后我在Chrome中学习keycode保持0并且不会改变KeyBoardEvent.

所以我创建了单独的事件 ev = new KeyboardEvent('keypress',{'key':'a', 'code': 'KeyA'})

然后我再次调度,返回语句true但它不会更改输入字段.

解决方案需要是纯javascript而不是jQuery.

javascript

7
推荐指数
3
解决办法
4787
查看次数

redis:达到最大客户端数

我有这个 redis 缓存,其中的值每天设置大约 100 次。完美运行几天后,我收到连接错误“已达到最大客户端数”。重新启动服务器后,它现在工作正常,但是我想在将来避免这个问题。

在我看来,一旦我创建了一个客户端对象,它就会留在连接池中并且永远不会被杀死或删除。

这是我的代码

r = redis.StrictRedis(host= host, port=6379, db=0)
r.set(key_name, data)
Run Code Online (Sandbox Code Playgroud)

这是在迭代中。而且,我在 python 中使用 redis。

python redis

7
推荐指数
2
解决办法
2万
查看次数

查找特定节点的连接组件而不是整个图(GraphFrame/GraphX)

我在 Spark 中创建了一个 GraphFrame,该图目前如下所示:

基本上,会有很多这样的子图,其中每个子图都将彼此断开。给定一个特定的节点 ID,我想在子图中找到所有其他节点。例如,如果给定节点 ID 1,则图将遍历并返回 2,10,20,3,30。

我创建了一个主题,但它没有给出正确的结果。

testgraph.find("(a)-[]->(b); (c)-[]->(b)").filter("(a.id = '1')").show()
Run Code Online (Sandbox Code Playgroud)

不幸的是,连通分量函数考虑了整个图。是否可以使用GraphFrame/GraphX在给定特定节点 ID 的情况下获取断开连接的子图中的所有节点?

apache-spark spark-graphx spark-dataframe graphframes

5
推荐指数
1
解决办法
1950
查看次数

全局变量更改时更新反应组件状态

我有一个经常更改的全局变量。假设它存储在中window.something。在反应中,我需要将此更改反映到组件及其状态中。

示例代码:

class Example extends React.Component {

  constructor(props) {
        super(props);
        this.state = { something: '1'}
    }


  render() {
     return (
      <div>
      <input value={window.something}
             onChange={event => {this.setState({'something': event.target.value})}} 
      />
      </div>
    )
  }
}
Run Code Online (Sandbox Code Playgroud)

但是,该值仅是第一次设置,并且随着变量的更新而没有变化。

javascript reactjs

5
推荐指数
1
解决办法
5164
查看次数

PySpark:不使用循环将 DataFrame 拆分为多个 DataFrame

嗨,我有一个如图所示的 DataFrame -

ID       X        Y

1      1234      284

1      1396      179

2      8620      178

3      1620      191

3      8820      828
Run Code Online (Sandbox Code Playgroud)

我想根据 ID 将此 DataFrame 拆分为多个 DataFrame。因此,对于此示例,将有 3 个 DataFrame。实现它的一种方法是在循环中运行过滤器操作。但是,我想知道是否可以以更有效的方式完成。

python apache-spark pyspark spark-dataframe

3
推荐指数
2
解决办法
1万
查看次数

Pyspark:读取对象之间没有分隔符的 JSON 数据文件

我有一个将数据放入 S3 的 kinesis firehose 传输流。但是在数据文件中,json 对象之间没有分隔符。所以它看起来像这样,

{
  "key1" : "value1",
  "key2" : "value2"
}{
  "key1" : "value1",
  "key2" : "value2"
}
Run Code Online (Sandbox Code Playgroud)

在 Apache Spark 中,我这样做是为了读取数据文件,

df = spark.read.schema(schema).json(path, multiLine=True)
Run Code Online (Sandbox Code Playgroud)

这只能读取文件中的第一个 json 对象,其余的将被忽略,因为没有分隔符。

如何在 spark 中使用解决此问题?

json apache-spark pyspark databricks amazon-kinesis-firehose

3
推荐指数
1
解决办法
1631
查看次数

Pyspark 按列分区数据并写入 parquet

我需要按列中的值在单独的 s3 键中写入镶木地板文件。该列city有数千个值。使用 for 循环进行迭代,按每个列值过滤数据帧,然后写入镶木地板非常慢。有什么方法可以按列对数据帧进行分区city并写入镶木地板文件吗?

我目前正在做的事情——

for city in cities:
  print(city)
  spark_df.filter(spark_df.city == city).write.mode('overwrite').parquet(f'reporting/date={date_string}/city={city}')
Run Code Online (Sandbox Code Playgroud)

dataframe apache-spark pyspark

3
推荐指数
1
解决办法
5868
查看次数

NumPy:从每一行中找到最大值,将其设置为1并保持为0

我有一个2D numpy数组,

array([[ 0.49596769,  1.15846407, -1.38944733],
       [-0.47042814, -0.07512128 , 1.90417981]], dtype=float32)
Run Code Online (Sandbox Code Playgroud)

我想找到每一行的最大值并将其更改为1并保持为0.就像这样.

array([[ 0.,  1.,  0.],
       [ 0.,  0.,  1.]], dtype=float32)
Run Code Online (Sandbox Code Playgroud)

使用numpy完成任务的最有效方法是什么?

python numpy

2
推荐指数
1
解决办法
1579
查看次数