小编Kal*_*esh的帖子

在终止风暴拓扑之前如何调用特定方法

在终止风暴拓扑之前如何调用特定方法。

我已经在风暴中创建了一个拓扑,我想在拓扑被杀死之前调用特定的方法。

在Storm框架中是否有任何预定义的替代方法或任何可用的方法来做到这一点。

提前致谢:)

java topology kill apache-storm

6
推荐指数
1
解决办法
631
查看次数

带有 kerberos 的 Kafka Java Producer

在 kerberosed 环境中向 kafka 主题发送消息时出错。我们在 hdp 2.3 上有集群

我跟着这个http://henning.kropponline.de/2016/02/21/secure-kafka-java-producer-with-kerberos/

但是对于发送消息,我必须先明确地执行 kinit ,然后才能将消息发送到 kafka 主题。我试图通过 java 类来编织,但这也不起作用。PFB代码:

package com.ct.test.kafka;

import java.util.Date;
import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestProducer {

    public static void main(String[] args) {

        String principalName = "ctadmin";
        String keyTabPath = "/etc/security/keytabs/ctadmin.keytab";
        boolean authStatus = CTSecurityUtil.loginUserFromKeytab(principalName, keyTabPath);

        if (!authStatus) {
            System.out.println("Authntication fails, try something else  "  + authStatus);
        } else {
            System.out.println("Authntication successfull " + authStatus);
        }

        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        System.setProperty("java.security.auth.login.config", "/etc/kafka/2.3.4.0-3485/0/kafka_jaas.conf");
        System.setProperty("javax.security.auth.useSubjectCredsOnly", "false"); …
Run Code Online (Sandbox Code Playgroud)

java kerberos jaas apache-kafka hortonworks-data-platform

5
推荐指数
2
解决办法
3万
查看次数

我可以使用 Spark 作为服务吗

用例是我想将数据帧作为对象返回,以提供休息服务。Rest 服务没有 Spark 上下文控制。那么有什么方法可以执行 ANSI 查询,就像我在 registerAsTemptable 上执行的那样。
我将通过休息服务传递表名和查询。然后我应该返回一些作为对象的东西,我可以将其显示为视图中的表格。

如果有任何替代方法,也请提出。但我想使用 Spark 作为基础框架。

architecture rest scala dataframe apache-spark

5
推荐指数
1
解决办法
2504
查看次数

为 avro 架构的十进制逻辑类型和字节类型创建 json 表示

我正在尝试按照以下 avro 模式创建 JSON 字符串,用于十进制值。 https://avro.apache.org/docs/1.8.2/spec.html#Logical+Types

{
 "name": "score",
 "type": "bytes",
 "logicalType": "decimal",
 "precision": 10,
 "scale": 5
 }
Run Code Online (Sandbox Code Playgroud)

价值

"score":3.4,
Run Code Online (Sandbox Code Playgroud)

我得到了例外

Caused by: org.apache.avro.AvroTypeException: Expected bytes. Got VALUE_NUMBER_FLOAT.
Run Code Online (Sandbox Code Playgroud)

如果我给出“\u0000”而不是 3.4,那么它可以工作,但这是 0 的表示,我将如何获得 3.4 的表示?现在我正在创建硬编码的 JSON 字符串,但将来我必须将输出转换为十进制,我如何在 Scala 中做到这一点。

有没有办法将值转换为十进制逻辑格式?

scala event-bus avro avro-tools

5
推荐指数
1
解决办法
1649
查看次数

如何在spark数据帧,scala中将行转换为列

有什么方法可以将数据帧行转换为列。我有以下结构作为输入:

val inputDF = Seq(("pid1","enc1", "bat"),
                  ("pid1","enc2", ""),
                  ("pid1","enc3", ""),
                  ("pid3","enc1", "cat"),
                  ("pid3","enc2", "")
              ).toDF("MemberID", "EncounterID", "entry" )

inputDF.show:

+--------+-----------+-----+
|MemberID|EncounterID|entry|
+--------+-----------+-----+
|    pid1|       enc1|  bat|
|    pid1|       enc2|     |
|    pid1|       enc3|     |
|    pid3|       enc1|  cat|
|    pid3|       enc2|     |
+--------+-----------+-----+

expected result:

+--------+----------+----------+----------+-----+
|MemberID|Encounter1|Encounter2|Encounter3|entry|
+--------+----------+----------+----------+-----+
|    pid1|      enc1|      enc2|      enc3|  bat|
|    pid3|      enc1|      enc2|      null|  cat|
+--------+----------+----------+----------+-----+
Run Code Online (Sandbox Code Playgroud)

请建议是否有任何优化的直接 API 可用于将行转换为列。我的输入数据量非常大,所以像收集这样的操作,我将无法执行,因为它会占用驱动程序上的所有数据。我正在使用 Spark 2.x

transpose scala apache-spark apache-spark-sql

2
推荐指数
1
解决办法
9918
查看次数