小编Pab*_*blo的帖子

如何从Google Dataflow中的PCollection中获取元素列表并在管道中使用它来循环写入转换?

我正在使用带有Python SDK的Google Cloud Dataflow.

我想要 :

  • 从主PCollection中获取唯一日期列表
  • 循环遍历该列表中的日期以创建过滤的PCollections(每个都具有唯一的日期),并将每个过滤的PCollection写入BigQuery中时间分区表中的分区.

我怎样才能获得该列表?在下面的combine变换之后,我创建了一个ListPCollectionView对象但是我无法迭代该对象:

class ToUniqueList(beam.CombineFn):

    def create_accumulator(self):
        return []

    def add_input(self, accumulator, element):
        if element not in accumulator:
            accumulator.append(element)
        return accumulator

    def merge_accumulators(self, accumulators):
        return list(set(accumulators))

    def extract_output(self, accumulator):
        return accumulator


def get_list_of_dates(pcoll):

    return (pcoll
            | 'get the list of dates' >> beam.CombineGlobally(ToUniqueList()))
Run Code Online (Sandbox Code Playgroud)

我做错了吗?最好的方法是什么?

谢谢.

python google-bigquery google-cloud-dataflow apache-beam

2
推荐指数
1
解决办法
4427
查看次数

threading.local是否使用锁进行访问?

Lock.acquire/release在以下声明中是否有一个隐藏的内容:

my_thread_local = threading.local()
my_thread_local.some_value = 1
Run Code Online (Sandbox Code Playgroud)

这个怎么样:

local_variable = my_thread_local.some_value
Run Code Online (Sandbox Code Playgroud)

python multithreading python-multithreading

2
推荐指数
1
解决办法
93
查看次数

在 Apache Beam PCollection 中使用 AutoValueSchema 会出现“RuntimeException:创建者参数 arg0 不对应于架构字段”

我试图拥有我创建的 AutoValue 定义对象的 PCollection,并且我添加了适当的注释以通过DefaultSchema(AutoValueSchema.class). 就像这样:

@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class MyAutoClas {
  public abstract String getMyStr();
  public abstract Integer getMyInt();

  @CreateSchema
  public static MyAutoClass create(String myStr, Integer myInt) {
    return new AutoValue_MyAutoClass(myStr, myInt);
  }
}
Run Code Online (Sandbox Code Playgroud)

我有一个小测试用例,如下所示:

PCollection<KV<String, MyAutoClass>> result = pipeline
    .apply(Create.of(MyAutoClass.create("abc", 1)))
    .apply(WithKeys.of(in -> in.getMyStr()));

PAssert.that(result).containsInAnyOrder(KV.of("abc", MyAutoClass.create("abc", 1)));
pipeline.run().waitUntilFinish();
Run Code Online (Sandbox Code Playgroud)

当我尝试运行此程序时,我看到以下错误:

[ERROR] testMyAutoValueClass(.....)  Time elapsed: 1.891 s  <<< ERROR!
java.lang.RuntimeException: Creator parameter arg0 Doesn't correspond to a schema field
    at org.apache.beam.sdk.schemas.utils.ByteBuddyUtils$InvokeUserCreateInstruction.<init>(ByteBuddyUtils.java:717)
    at org.apache.beam.sdk.schemas.utils.ByteBuddyUtils$StaticFactoryMethodInstruction.<init>(ByteBuddyUtils.java:660)
    at org.apache.beam.sdk.schemas.utils.JavaBeanUtils.createStaticCreator(JavaBeanUtils.java:284)
    at org.apache.beam.sdk.schemas.utils.JavaBeanUtils.lambda$getStaticCreator$4(JavaBeanUtils.java:273)
    at …
Run Code Online (Sandbox Code Playgroud)

java auto-value apache-beam

2
推荐指数
1
解决办法
1626
查看次数

ls在目录中的文件列表

我有一个C代码库,都位于同一目录中.我想找到所有具有相同名称的代码文件的头文件.

现在,我有以下命令:

ls *.h | sed s/.h/.c/
Run Code Online (Sandbox Code Playgroud)

这将返回我要搜索的文件名"列表".如何将此列表传递给另一个命令,以便我可以看到哪些头文件具有共享相同名称的代码文件?

unix linux shell ls

1
推荐指数
1
解决办法
490
查看次数

使用 Python 的 argparse 添加复杂的默认值

我用来argparse解析控制台输入。对于我的输入选项之一,我想添加一个依赖于其他选项的默认选项:

parser.add_argument('--job_name')
parser.add_argument('--user_name',
                    default='guest')
Run Code Online (Sandbox Code Playgroud)

我希望用户能够定义 a job_name,但如果用户没有定义,我希望是job_nameuser_name + datetime.date.today()(例如'guest2016-10-23')。

可以在 之内做到这一点吗argparse?也许通过给予lambda...?

python

1
推荐指数
1
解决办法
100
查看次数

ParDo中的侧输出 Apache Beam Python SDK

由于文档仅适用于JAVA,我无法理解其含义.

它声明 - "虽然ParDo总是产生一个主输出PCollection(作为应用的返回值),你也可以让你的ParDo产生任意数量的额外输出PCollections.如果你选择有多个输出,你的ParDo将返回所有的输出PCollections(包括主输出)捆绑在一起.例如,在Java中,输出PCollections捆绑在一个类型安全的PCollectionTuple中."

我理解捆绑在一起意味着什么,但如果我在我的DoFn中产生一个标签,它是否会产生一个包含所有其他输出的空包,并在代码中遇到它们时产生其他输出?或者它等待所有产量准备好输入并将它们全部输出到一起?

文档中没有太多清晰度.虽然我认为它不会等待,只是遇到收益,但我仍然需要了解发生了什么.

google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
1796
查看次数

在 Google Dataflow 中使用 FireStore

我想在带有 python 的数据流模板中使用 FireStore。

我做了这样的事情:

with beam.Pipeline(options=options) as p:
(p
| 'Read from PubSub' >> beam.io.ReadFromPubSub(sub).with_output_types(bytes)
| 'String to dictionary' >> beam.Map(firestore_update_multiple)
)
Run Code Online (Sandbox Code Playgroud)

这是使用它的适当方式吗?


额外的信息

def firestore_update_multiple(row):
    from google.cloud import firestore
    db = firestore.Client()
    doc_ref = db.collection(u'data').document(u'one')

    doc_ref.update({
        u'arrayExample': u'DataflowRunner',
        u'booleanExample': True
    })
Run Code Online (Sandbox Code Playgroud)

python google-cloud-platform google-cloud-dataflow google-cloud-firestore

1
推荐指数
1
解决办法
2141
查看次数

在 Python Apache Beam 中使用 value provider 参数的方法

现在我只能使用 ParDo 获取类中的 RunTime 值,还有其他方法可以像在我的函数中一样使用运行时参数吗?

这是我现在得到的代码:

class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument('--firestore_document',default='')

def run(argv=None):

    parser = argparse.ArgumentParser()

    pipeline_options = PipelineOptions()

    user_options = pipeline_options.view_as(UserOptions)

    pipeline_options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=pipeline_options) as p:

        rows = (p 
        | 'Create inputs' >> beam.Create(['']) 
        | 'Call Firestore' >> beam.ParDo(
                CallFirestore(user_options.firestore_document)) 
        | 'Read DB2' >> beam.Map(ReadDB2))
Run Code Online (Sandbox Code Playgroud)

我希望 user_options.firestore_document 无需执行 ParDo 即可在其他功能中使用

python parameters google-cloud-platform google-cloud-dataflow apache-beam

1
推荐指数
1
解决办法
1769
查看次数

apache 光束数据流中的外部 api 调用

我有一个用例,我读入存储在谷歌云存储中的换行 json 元素并开始处理每个 json。在处理每个 json 时,我必须调用外部 API 来执行重复数据删除,无论该 json 元素之前是否被发现。我在做一个ParDoDoFn每个JSON。

我还没有看到任何在线教程说明如何从 apache beam DoFnDataflow调用外部 API 端点。

我正在使用JAVABeam 的 SDK。我学习的一些教程解释了使用startBundleFinishBundle但我不清楚如何使用它

java google-cloud-dataflow apache-beam apache-beam-io

1
推荐指数
1
解决办法
3908
查看次数

如何以角度没有父子关系的方式向其他控制器广播和发送事件

我试图从这篇文章中获得提示 - 使用$ scope.$ emit和$ scope.$ on 但是当控制器彼此无关时,似乎没有任何工作.

那是 -

<div ng-controller="CtrlA">
</div>

<div ng-controller="CtrlB">
</div>
Run Code Online (Sandbox Code Playgroud)

在CtrlB中我会做这样的事情:

$rootScope.$broadcast('userHasLoggedIn', {})
Run Code Online (Sandbox Code Playgroud)

在CtrlA中,我会这样听:

$rootScope.$on('userHasLoggedIn, function(event, data){});
Run Code Online (Sandbox Code Playgroud)

并且没有 - 除非我在CtrlA div中嵌入CtrlB div,否则CtrlA永远不会收到广播事件

任何的想法?

angularjs angular-services angular-broadcast

0
推荐指数
1
解决办法
667
查看次数

在哪里处理线程中断

我有一个Java 8应用程序,我在一个线程上运行一个任务:

Thread t = new Thread(() -> { runTask();
                              finalizeTask(); });
t.start()
saveThread(t);
Run Code Online (Sandbox Code Playgroud)

在某些时候,从不同的线程,这个任务可能会被中断:

Thread t = getThread(); // Obtains thread t
t.interrupt();
Run Code Online (Sandbox Code Playgroud)

有了这个,我希望runTask();线路被中断,但我还是喜欢finalizeTask();运行.

如何捕获线程中断并处理它?

java multithreading java-8

0
推荐指数
1
解决办法
53
查看次数