How to extract the Google PubSub publish time in Apache Beam

Evg*_*ich 2 google-cloud-platform google-cloud-dataflow apache-beam

My goal is to access, in Apache Beam (Dataflow), the PubSub message publish time as recorded and set by Google PubSub.

    PCollection<PubsubMessage> pubsubMsg
            = pipeline.apply("Read Messages From PubSub",
            PubsubIO.readMessagesWithAttributes()
                .fromSubscription(pocOptions.getInputSubscription()));

The message does not seem to carry the publish time as an attribute. I also tried

 .withTimestampAttribute("publish_time")

with no luck either. What am I missing? Is it possible to extract the Google PubSub publish time in Dataflow?

Gui*_*ins 5

Java version

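PubsubIO reads the messages from Pub/Sub and assigns each message's publish time to the element as its record timestamp. You can therefore access it with ProcessContext.timestamp(). For example: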

p
    .apply("Read Messages", PubsubIO.readStrings().fromSubscription(subscription))
    .apply("Log Publish Time", ParDo.of(new DoFn<String, Void>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            LOG.info("Message: " + c.element());
            // The element timestamp is the Pub/Sub publish time (event time).
            LOG.info("Publish time: " + c.timestamp().toString());
            // Joda-Time Instant.now() gives the current processing time.
            LOG.info("Processing time: " + Instant.now().toString());
        }
    }));

I published a message ahead of time (so that event time and processing time differ noticeably). The output with the DirectRunner is:

Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Message: I published this message a little bit before
Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Publish time: 2019-03-27T09:57:07.005Z
Mar 27, 2019 11:03:08 AM com.dataflow.samples.LogPublishTime$1 processElement
INFO: Processing time: 2019-03-27T10:03:08.229Z

Minimal code here
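To put a number on the gap between the two times in the log above, here is a small standard-library Python sketch. It is only illustrative arithmetic on the two logged values, not part of the pipeline, and the helper name `parse_utc` is made up for this example.

```python
from datetime import datetime, timedelta, timezone


def parse_utc(ts: str) -> datetime:
    """Parse an ISO-8601 UTC string such as 2019-03-27T09:57:07.005Z."""
    # %f accepts the three-digit millisecond fraction; the trailing Z is literal.
    parsed = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S.%fZ")
    return parsed.replace(tzinfo=timezone.utc)


# Values copied from the DirectRunner log output above.
publish_time = parse_utc("2019-03-27T09:57:07.005Z")
processing_time = parse_utc("2019-03-27T10:03:08.229Z")

# Difference between processing time and event (publish) time.
lag = processing_time - publish_time
print(lag)  # 0:06:01.224000
```

So the message was processed roughly six minutes after it was published, which is the difference between event time and processing time that the example was set up to show.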


Python version

The timestamp can now be accessed through DoFn.TimestampParam in the process method (docs):

import datetime
import logging
import apache_beam as beam

class GetTimestampFn(beam.DoFn):
  """Logs the element timestamp"""
  def process(self, element, timestamp=beam.DoFn.TimestampParam):
    # The Beam timestamp converts to float seconds since the epoch.
    timestamp_utc = datetime.datetime.utcfromtimestamp(float(timestamp))
    logging.info(">>> Element timestamp: %s", timestamp_utc.strftime("%Y-%m-%d %H:%M:%S"))
    yield element

Note: the date parsing is possible thanks to this answer.

Output:

INFO:root:>>> Element timestamp: 2019-08-12 20:16:53

Full code
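One side note on the date conversion: `datetime.utcfromtimestamp` is deprecated as of Python 3.12. A minimal sketch of the timezone-aware alternative, using only the standard library, is shown below. It assumes the Beam timestamp has already been converted with `float(timestamp)` as in the DoFn above; the helper name `format_element_timestamp` is hypothetical.

```python
from datetime import datetime, timezone


def format_element_timestamp(seconds_since_epoch: float) -> str:
    """Format epoch seconds as a UTC date string for logging."""
    # Passing tz explicitly yields a timezone-aware datetime in UTC.
    ts_utc = datetime.fromtimestamp(seconds_since_epoch, tz=timezone.utc)
    return ts_utc.strftime("%Y-%m-%d %H:%M:%S")


# 1565641013 seconds since the epoch corresponds to the output shown above.
print(format_element_timestamp(1565641013.0))  # 2019-08-12 20:16:53
```

Inside the DoFn this would be called as `format_element_timestamp(float(timestamp))`.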