小编Ole*_*siy的帖子

使用pyspark用以前已知的良好值填充null

有没有办法null用最后一个有效值替换pyspark数据帧中的值?如果您认为需要它们用于Windows分区和排序,则有附加列timestampsession列.更具体地说,我想实现以下转换:

+---------+-----------+-----------+      +---------+-----------+-----------+
| session | timestamp |         id|      | session | timestamp |         id|
+---------+-----------+-----------+      +---------+-----------+-----------+
|        1|          1|       null|      |        1|          1|       null|
|        1|          2|        109|      |        1|          2|        109|
|        1|          3|       null|      |        1|          3|        109|
|        1|          4|       null|      |        1|          4|        109|
|        1|          5|        109| =>   |        1|          5|        109|
|        1|          6|       null|      |        1|          6|        109|
|        1|          7|        110|      |        1|          7|        110| …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark

19
推荐指数
3
解决办法
8387
查看次数

如何使tqdm与詹金斯玩得很好?

喜欢tqdm进度条,但是当我在jenkins上使用它时,我会在stdout(特别是无所不在[A)中得到一堆奇怪的文物和太多的臃肿.在tqdm中是否有秘密模式使其与jenkins很好地协同工作?用于无缝检测jenkins等非交互式shell的奖励积分.这是我的典型输出:

label: 0it [00:00, ?it/s][A
[A
 16%|#6        | 5378/33302 [36:28<2:30:49,  3.09it/s]
[A
 16%|#6        | 5379/33302 [36:29<2:36:46,  2.97it/s]
[A
...
Run Code Online (Sandbox Code Playgroud)

jenkins tqdm

11
推荐指数
1
解决办法
364
查看次数

Jenkins 并行管道中的顺序阶段

我在 Jenkins 中有一个动态脚本化管道,它有许多并行阶段,但在每个阶段内都有多个串行步骤。我已经浪费了好几天的时间试图让它工作:无论我尝试什么,所有串行子阶段都集中到一个阶段!这是我现在所拥有的:

node () {
    stage("Parallel Demo") {
        // Canonical example to run steps in parallel

        // The map we'll store the steps in
        def stepsToRun = [:]

        for (int i = 1; i < 5; i++) {
            stepsToRun["Step${i}"] = { node {
                echo "start"
                sleep 1
                echo "done"
            }}
        }
        // Actually run the steps in parallel
        // parallel takes a map as an argument
        parallel stepsToRun
    }
}
Run Code Online (Sandbox Code Playgroud)

它给我带来了这个美丽的并行管道:

在此输入图像描述

然而,当我添加串行阶段时,又名:

node () {
    stage("Parallel Demo") {
        // …
Run Code Online (Sandbox Code Playgroud)

jenkins jenkins-groovy jenkins-pipeline

10
推荐指数
1
解决办法
5200
查看次数

使用mock模拟嵌套属性

我有一个函数调用返回一个对象:

r = Foo(x,y)
Run Code Online (Sandbox Code Playgroud)

哪里r有一组丰富的嵌套属性.例如,我可以访问r.prop_a.prop_b.prop_c.我想模拟Foo,以便r修改一个特定的叶属性,即r.prop_a.prop_b.prop_c返回一个我控制下的值:

>> r = Foo(x,y)
>> r.prop_a.prop_b.prop_c
'fish'
>> # some mock magic patching of Foo is taking place here
>> r = Foo(x,y)
>> r.prop_a.prop_b.prop_c
'my_fish'
Run Code Online (Sandbox Code Playgroud)

我不太关心中间性质.

有没有一种优雅的方式来模拟嵌套属性与mock

python unit-testing nested properties mocking

8
推荐指数
1
解决办法
4755
查看次数

重复使用Django代码自动重载功能以用于自定义管理命令

我喜欢django服务器如何在代码更改时自动重新加载自身,因此无需重新启动服务器。

当前,我们使用django自定义管理命令,这可能需要很长时间才能完成。

有什么方法可以对我们的管理命令使用django服务器的自动重载功能?

例如,如果检测到基础django代码库的更改,该命令将重新加载自身并恢复非常长(无状态)循环的执行。

django django-command-extensions

6
推荐指数
2
解决办法
1332
查看次数

当用户登录Windows时,保持jenkins slave脱机

当用户开始使用slave时,有没有办法配置Jenkins将Windows slave标记为脱机?

编辑:开始使用我的意思是用户登录.

windows continuous-integration jenkins jenkins-plugins

6
推荐指数
1
解决办法
678
查看次数

5
推荐指数
1
解决办法
1153
查看次数

mock.side_effect 中的临时“unpatching”功能

有什么方法可以使用mock内临时撤消修补side_effect吗?特别是我想做这样的工作:

from mock import patch
import urllib2
import unittest

class SimpleTest(unittest.TestCase):
    def setUp(self):
        self.urlpatcher = patch('urllib2.urlopen')
        self.urlopen = self.urlpatcher.start()

        def side_effect(url):
            #do some interesting stuff first

            #... temporary unpatch urllib2.urlopen so that we can call a real one here
            r = urllib2.urlopen(url) #this ought to be the real deal now
            #... patch it again

            return r

        self.urlopen.side_effect = side_effect

    def test_feature(self):
        #almost real urllib2.urlopen usage goes here
        p = urllib2.urlopen("www.google.com").read()


if __name__ == '__main__':
    unittest.main()
Run Code Online (Sandbox Code Playgroud)

python unit-testing mocking

5
推荐指数
1
解决办法
2607
查看次数

Scapy:添加具有复杂字段分组的新协议

我正在尝试使用指定新的数据包格式scapy.在数据包中有一个项目列表,项目由"分组字段"组成."分组字段"是指不同类型的字段的子序列.在scapy中创建我所知道的"分组字段"的唯一方法是使用Packet类和使用FieldLenField/ PacketListField来引用序列的长度和列表成员的类型.这是要走的路吗?看起来像这样的东西:

from scapy.packet import Packet
from scapy.fields import *

class RepeatingGroupedSequence(Packet):
    name = "Simple group of two fields"

    fields_desc = [IntField('field1', 1), 
                   IntField('field2', 2)]

class TopLayer(Packet):
    name = "Storage for Repeating Sequence"

    fields_desc = [FieldLenField("length", None, count_of='rep_seq'),
                   PacketListField('rep_seq', None, RepeatingGroupedSequence, 
                                   count_from = lambda pkt: pkt.length),
                  ]

#Now here is the problem that I have with assembling PacketListField: 

#craft TopLayer packet
p = TopLayer()

#add two "repeated sequences"
p.rep_seq = [ RepeatingGroupedSequence(), RepeatingGroupedSequence() …
Run Code Online (Sandbox Code Playgroud)

python scapy

5
推荐指数
1
解决办法
3689
查看次数

在python中按时联接两个Spark数据帧(TimestampType)

我有两个数据框,我想基于一个列将它们连接起来,但需要注意的是,该列是一个时间戳,并且该时间戳必须在一定的偏移量(5秒)内才能连接记录。更具体地讲,dates_dfwith中的记录date=1/3/2015:00:00:00应该与events_dfwith 一起加入,time=1/3/2015:00:00:01因为两个时间戳都在5秒钟之内。

我正在尝试让这种逻辑与python spark一起使用,这非常痛苦。人们如何像这样在火花中加入?

我的方法是向其添加两个额外的列,以5秒的偏移量dates_df确定lower_timestampupper_timestamp边界,并执行条件连接。这就是失败的地方,更具体地说:

joined_df = dates_df.join(events_df, 
    dates_df.lower_timestamp < events_df.time < dates_df.upper_timestamp)

joined_df.explain()
Run Code Online (Sandbox Code Playgroud)

仅捕获查询的最后一部分:

Filter (time#6 < upper_timestamp#4)
 CartesianProduct
 ....
Run Code Online (Sandbox Code Playgroud)

这给我一个错误的结果。

我是否真的需要对每个不等式进行完全笛卡尔联接,并在进行过程中删除重复项?

这是完整的代码:

from datetime import datetime, timedelta

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf


master = 'local[*]'
app_name = 'stackoverflow_join'

conf = SparkConf().setAppName(app_name).setMaster(master)
sc = SparkContext(conf=conf)

sqlContext = SQLContext(sc)

def lower_range_func(x, offset=5):
    return x …
Run Code Online (Sandbox Code Playgroud)

join apache-spark apache-spark-sql pyspark

5
推荐指数
1
解决办法
1万
查看次数