小编tsa*_*512的帖子

用于行类型Spark数据集的编码器

我想在DataSet中为Row类型编写一个编码器,用于我正在进行的地图操作.基本上,我不明白如何编写编码器.

以下是地图操作的示例:

In the example below, instead of returning Dataset<String>, I would like to return Dataset<Row>

Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() {
            @Override
            public Iterator<String> call(Row row) throws Exception {

                ArrayList<String> obj = //some map operation
                return obj.iterator();
            }
        },Encoders.STRING());
Run Code Online (Sandbox Code Playgroud)

我明白,编码器需要编写如下代码:

    Encoder<Row> encoder = new Encoder<Row>() {
        @Override
        public StructType schema() {
            return join.schema();
            //return null;
        }

        @Override
        public ClassTag<Row> clsTag() {
            return null;
        }
    };
Run Code Online (Sandbox Code Playgroud)

但是,我不理解编码器中的clsTag(),我试图找到一个可以演示相似内容的运行示例(即行类型的编码器)

编辑 - 这不是所提问题的副本:尝试将数据帧行映射到更新行时编码器错误,因为答案谈到在Spark 2.x中使用Spark 1.x(我不是这样做),我也在寻找用于Row类的编码器而不是解决错误.最后,我一直在寻找Java解决方案,而不是Scala.

java apache-spark apache-spark-sql apache-spark-dataset apache-spark-encoders

24
推荐指数
2
解决办法
2万
查看次数

Maven预先下载所有依赖项

我需要将我们的maven build java项目发布到远程QA团队.为此,我想下载所有依赖项,并发送它们,以便它们不需要下载它们.

目前,所有依赖项都在pom.xml文件中定义,我们使用mvn install或mvn包来构建项目.一些项目成员使用超级罐,其他人使用jar +依赖项来执行.

预先打包相关jar文件的最简单方法是什么,这样就不会从互联网上下载,也不会过多地改变我们当前的构建过程?

java maven

16
推荐指数
1
解决办法
2万
查看次数

咖啡因缓存 - 指定条目的到期时间

我正在尝试进一步了解咖啡因缓存。我想知道是否有一种方法可以为缓存中填充的条目指定超时,但其余记录没有基于时间的到期时间。

本质上我希望有以下界面:

put(key, value, timeToExpiry)// 输入具有指定 timeToExpiry 的键和值

put(key, value)// 输入没有 timeToExpiry 的键值

我可以编写接口和管道,但我想了解如何配置咖啡因以满足上述两个要求。我也愿意拥有两个单独的咖啡因缓存实例。

java caffeine

9
推荐指数
1
解决办法
9230
查看次数

intelliJ -&gt; 有什么方法可以将您的搜索范围限定在一个函数内

在编码时,我经常需要搜索在大文件中常见的关键字,但我正在寻找函数中的一个实例。

默认搜索功能通常会从顶部开始为我提供所有结果。是否有任何方法/工作流程可以仅在特定方法/函数或代码块中进行搜索?

search intellij-idea

8
推荐指数
2
解决办法
1105
查看次数

HashMap作为Spark Streaming中的广播变量?

我有一些数据需要归类为火花流.分类键值在HashMap中的程序开头加载.因此,需要将每个输入数据包与这些密钥进行比较并相应地进行标记.

我意识到spark有一些变量叫做广播变量和准确器来分配对象.教程中的示例使用简单的变量等.

如何使用HashMap在所有spark worker上共享我的HashMap.或者,有更好的方法吗?

我正在用Java编写我的spark流应用程序.

java apache-spark spark-streaming

6
推荐指数
1
解决办法
4316
查看次数

netcat,毫秒间隔

我试图使用netcat每隔几毫秒读取一个文件中的一行,并将其发送到一个端口..

到目前为止,我从netcat文档中了解到它可以在发送的每一行之间插入一个时间间隔:

这是来自netcat帮助手册:

-i secs发送的线路的延迟间隔,扫描的端口

我尝试了以下操作,允许我在发送的每一行之间插入至少1秒的时间间隔.

nc -q 10 -i 1 -lk 9999 < file_input
Run Code Online (Sandbox Code Playgroud)

我想知道是否还有将此时间间隔缩短为毫秒.也许通过使用一些实用程序将文件的输入传递给netcat,允许在毫秒级的每个读取之间配置间隔?

linux shell pipe netcat

6
推荐指数
1
解决办法
5236
查看次数

如何在pygal中添加垂直的x轴线?

我想放入类似于matplotlib中提供的以下内容的axspan,以便在图形中有一条垂直线指出一个位置。这是使用pygal的折线图

plt.axvspan(fin_loc, fin_loc+1, color='red', alpha=0.5)

pygal

6
推荐指数
1
解决办法
107
查看次数

GRPC迁移:在Spring Boot中同时支持grpc和rest?

我在生产中有一个现有的微服务,它有一个安静的端点。对于新客户,以及从长期目标来看,我们愿意支持 grpc。我想了解在同一个 spring-boot 应用程序中支持 grpc 和 Restful 端点是否是一个好主意。这样做可能有什么潜在的缺点?

rest netty spring-boot grpc

6
推荐指数
1
解决办法
2378
查看次数

如何在芹菜任务中执行长时间运行的子进程?

我有以下代码,我在 celery 任务中使用子进程运行 shell 脚本。它不起作用,因为我没有收到错误或任何前进进度,或 celery 任务的任何输出:

以下是执行任务的代码:

def run_shell_command(command_line):
    command_line_args = shlex.split(command_line)

    logging.info('Subprocess: "' + command_line + '"')

    try:
        command_line_process = subprocess.Popen(
            command_line_args,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )

        for l in iter(command_line_process.stdout.readline,b''):
            print l.strip()

        command_line_process.communicate()
        command_line_process.wait()

    except (OSError, subprocess.CalledProcessError) as exception:
        logging.info('Exception occured: ' + str(exception))
        logging.info('Subprocess failed')
        return False
    else:
        # no exception was raised
        logging.info('Subprocess finished')

    return True
Run Code Online (Sandbox Code Playgroud)

它是从任务中调用的:

@app.task
def execute(jsonConfig, projectName, tagName, stage, description):

     command = 'python ' + runScript + ' -c ' + fileName …
Run Code Online (Sandbox Code Playgroud)

python shell subprocess celery

5
推荐指数
0
解决办法
3607
查看次数

在Python中使用pdb以编程方式添加断点

我发现在大型多层软件中,通过将以下内容放入代码中通常更容易调试 python 代码。

import pdb; pdb.set_trace()这在我放置语句的 LOC 处设置了一个断点,我可以在代码中以交互方式使用 pdb 继续并检查代码执行等。

我想知道是否可以为此类 python 调试添加多个断点,以便我可以c在交互式 python 调试器中执行并命中下一个断点?

即例如

<python code>
import pdb; pdb.set_trace();
... interactive debugging....
...press c here....
<more python code>
...
....
<breakpoint>// How can we insert such a breakpoint? 
Run Code Online (Sandbox Code Playgroud)

python pdb

3
推荐指数
1
解决办法
1606
查看次数

Jmeter如何每秒发送恒定数量的请求

JMeter 是否允许我每秒发送固定数量的请求,而不是仅在请求完成时才按顺序发送?即,即使请求 1 尚未响应,我可以在请求 1 之后发送请求 2 吗?

我正在使用 Apache JMeter 5.2.1,如果有人能给我有关这方面的指导,我将不胜感激。恒定吞吐量计时器似乎更关注吞吐量,但是我想简单地发送 X 请求/秒。

jmeter jmeter-plugins jmeter-5.0

3
推荐指数
1
解决办法
2985
查看次数

弹性搜索多个键值时完全匹配

我正在尝试在Elasticsearch中编写一个在多个字段上完全匹配的查询

我有以下查询,用于单个字段的完全匹配:

GET /index/data/_search
{
    "query": {
        "term": {
            "table":"abc"   
        }
    }
}
Run Code Online (Sandbox Code Playgroud)

此处的键是“表”,值是“ abc”。我想为精确匹配查询添加另一个名为“ chair”的键,其值为“ def”。

elasticsearch

2
推荐指数
1
解决办法
831
查看次数