Posts by Leo*_*oli

Apache Spark: adding a "CASE WHEN ... ELSE ..." calculated column to an existing DataFrame

I'm trying to add a "CASE WHEN ... ELSE ..." calculated column to an existing DataFrame using the Scala API. Starting DataFrame:

color
Red
Green
Blue

Desired DataFrame (SQL syntax: CASE WHEN color = 'Green' THEN 1 ELSE 0 END AS bool):

color bool
Red   0
Green 1
Blue  0

How can I implement this logic?

scala dataframe apache-spark apache-spark-sql

22
votes
2
answers
50k
views

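Inserting PubSub messages into BigQuery via Google Cloud Dataflow

I want to use Google Cloud Dataflow to insert PubSub message data from a topic into a BigQuery table. Everything works, but the BigQuery table contains unintelligible strings such as "߈ ". Here is my pipeline: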

p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/project-name/topics/topic-name"))
.apply(ParDo.named("Transformation").of(new StringToRowConverter()))
.apply(BigQueryIO.Write.named("Write into BigQuery").to("project-name:dataset-name.table")
     .withSchema(schema)
     .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED))

My simple StringToRowConverter function is:

class StringToRowConverter extends DoFn<String, TableRow> {
    private static final long serialVersionUID = 0;

    @Override
    public void processElement(ProcessContext c) {
        for (String word : c.element().split(",")) {
            if (!word.isEmpty()) {
                System.out.println(word);
                c.output(new TableRow().set("data", word));
            }
        }
    }
}

Here is the message I send via a POST request:

POST https://pubsub.googleapis.com/v1/projects/project-name/topics/topic-name:publish
{
  "messages": [
    {
      "attributes": {
        "key": "tablet, smartphone, desktop",
        "value": "eng"
      },
      "data": "34gf5ert"
    }
  ]
}

What am I missing? Thanks!
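
One possible cause, offered as a sketch rather than a confirmed diagnosis: the Pub/Sub REST API documents the "data" field of a published message as base64-encoded. If the raw text "34gf5ert" is sent in that field, it is treated as base64 and decoded into arbitrary bytes, which would match the unreadable output in BigQuery. The snippet below uses only java.util.Base64 to illustrate the effect. The class and method names (PubsubDataEncoding, encodeForPublish, decodeAsSubscriber) are hypothetical and are not part of the Dataflow or Pub/Sub SDKs.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class PubsubDataEncoding {

    // Hypothetical helper: base64-encode a UTF-8 payload before placing it in the "data" field.
    public static String encodeForPublish(String payload) {
        return Base64.getEncoder().encodeToString(payload.getBytes(StandardCharsets.UTF_8));
    }

    // Hypothetical helper: approximate what a reader of the topic sees,
    // assuming "data" is base64-decoded and the bytes are then read as UTF-8.
    public static String decodeAsSubscriber(String dataField) {
        byte[] raw = Base64.getDecoder().decode(dataField);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Sending the raw text: the decoded bytes begin with 0xDF 0x88, i.e. U+07C8.
        String garbled = decodeAsSubscriber("34gf5ert");
        System.out.println(garbled.startsWith("\u07C8")); // prints "true"

        // Encoding first: the payload survives the round trip.
        String encoded = encodeForPublish("34gf5ert");
        System.out.println(encoded);                      // prints "MzRnZjVlcnQ="
        System.out.println(decodeAsSubscriber(encoded));  // prints "34gf5ert"
    }
}
```

Under this assumption, putting "MzRnZjVlcnQ=" in the "data" field of the POST body instead of "34gf5ert" should deliver the intended text to the pipeline.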

google-bigquery google-cloud-pubsub google-cloud-dataflow

6
votes
1
answer
2477
views