小编j9d*_*9dy的帖子

通过Web应用程序启动Spark应用程序的最佳实践?

我想通过Web应用程序向用户公开我的Spark应用程序.

基本上,用户可以决定他想要运行哪个动作并输入一些需要传递给spark应用程序的变量.例如:用户输入几个字段,然后点击一个按钮,用于执行以下操作的"运行sparkApp1与放慢参数MIN_X,MAX_X,MIN_Y,MAX_Y".

应该使用用户给出的参数启动spark应用程序.完成后,可能需要Web应用程序来检索结果(来自hdfs或mongodb)并将其显示给用户.处理时,Web应用程序应显示Spark应用程序的状态.

我的问题:

  • Web应用程序如何启动Spark应用程序?它可能能够从引擎盖下的命令行启动它,但可能有更好的方法来执行此操作.
  • Web应用程序如何访问Spark应用程序的当前状态?从Spark WebUI的REST API获取状态的方式是什么?

我正在运行带有YARN/Mesos(尚不确定)和MongoDB的Spark 1.6.1集群.

apache-spark

27
推荐指数
2
解决办法
5296
查看次数

Helm在本地导出YAML文件(只使用模板引擎,不要发送到Kubernetes)

我想将已经模板化的 Helm Charts 导出为YAML文件.我目前无法在我的Kubernetes集群上使用Tiller,但仍想使用Helm Charts.基本上,我希望Helm将使用Helm模板化的值导出发送到Kubernetes API的YAML.之后,我将YAML文件上传到我的Kubernetes集群.

我试图跑,.\helm.exe install --debug --dry-run incubator\kafka但我得到错误Error: Unauthorized.

请注意,我在Windows上运行Helm(版本为helm-v2.9.1-windows-amd64).

kubernetes kubernetes-helm

10
推荐指数
3
解决办法
1万
查看次数

MongoDB为聚合查询获取executionStats

我正在寻找一种方法来检索executionStats聚合.

使用find()时,我可以使用explain轻松检索它们.示例输出:

  "executionStats": {
    "nReturned": 332505,
    "executionTimeMillis": 1349,
    "totalKeysExamined": 332505,
    "totalDocsExamined": 332505,
    ...
Run Code Online (Sandbox Code Playgroud)

但是当使用启用了解释的聚合时,它将不会返回上面显示的统计数据.

有关,但没有给出可行的解决方案.因为这可能在此期间发生了变化,我打开了这个问题.

如果不测量客户端的统计数据,有什么方法可以做到这一点吗?

mongodb mongodb-java mongodb-query

9
推荐指数
3
解决办法
3715
查看次数

为什么有人会在Tez上运行Spark/Flink?

在Saha等人的Tez论文中,显示了Hadoop 2与Tez的以下模块化架构:

Hadoop 2与Tez

为什么有人会在Tez上运行Spark/Flink?

有什么好处?更好地利用YARN?

hadoop apache-spark apache-tez apache-flink tez

8
推荐指数
1
解决办法
422
查看次数

最终的 Kotlin 类不能被模拟,因为方法“应该返回验证器”

我正在尝试为我的Javalin.io Web 应用程序编写单元测试。有一些对 Mockito 的引用用于模拟Context 对象,这是 Javalins 为用户提供对传入 Web 请求的访问权限的方式。我试图模拟该类的.header(String)方法,Context因为被测单元正在读取“授权”标头并对其执行 JWT 检查。

我的 pom 包含最新版本的 Mockito,它应该能够模拟最终类

<dependency>
  <groupId>org.mockito</groupId>
  <artifactId>mockito-core</artifactId>
  <version>3.2.0</version>
  <scope>test</scope>
</dependency>
Run Code Online (Sandbox Code Playgroud)

我通过创建resources/mockito-extensions/org.mockito.plugins.MockMaker包含内容的文件,启用了 Mockito 文档中所述的内联模拟生成器mock-maker-inline

现在我有一个愚蠢的测试,它Context模拟一个对象,并且每当调用上下文对象的 header() 方法时都应该返回“hello123”。以下代码是真实单元测试的一部分,但足以在运行测试时引发异常:

  @Test
  void stupidTest1() {
    Context context = mock(Context.class);

    String test1 = "hello123";
    when(context.header("Authorization")).thenReturn(test1);
  }
Run Code Online (Sandbox Code Playgroud)

也试过这个:

  @Test
  void stupidTest1() {
    Context context = mock(Context.class);

    String test1 = "hello123";
    given(context.header("Authorization")).willReturn(test1);
  }
Run Code Online (Sandbox Code Playgroud)

执行此测试mvn …

java unit-testing mockito

8
推荐指数
1
解决办法
429
查看次数

MongoDB Spark Connector-聚合速度很慢

我在Spark应用程序和Mongos控制台上运行相同的聚合管道。在控制台上,眨眼间即可获取数据,只需第二次使用“ it”即可检索所有预期数据。但是,根据Spark WebUI,Spark应用程序将花费近两分钟的时间。

在此处输入图片说明

如您所见,正在启动242个任务来获取结果。我不确定为什么在MongoDB聚合仅返回40个文档的同时启动如此大量的任务。看起来开销很高。

我在Mongos控制台上运行的查询:

db.data.aggregate([
   {
      $match:{
         signals:{
            $elemMatch:{
               signal:"SomeSignal",
               value:{
                  $gt:0,
                  $lte:100
               }
            }
         }
      }
   },
   {
      $group:{
         _id:"$root_document",
         firstTimestamp:{
            $min:"$ts"
         },
         lastTimestamp:{
            $max:"$ts"
         },
         count:{
            $sum:1
         }
      }
   }
])
Run Code Online (Sandbox Code Playgroud)

Spark应用程序代码

    JavaMongoRDD<Document> rdd = MongoSpark.load(sc);

    JavaMongoRDD<Document> aggregatedRdd = rdd.withPipeline(Arrays.asList(
            Document.parse(
                    "{ $match: { signals: { $elemMatch: { signal: \"SomeSignal\", value: { $gt: 0, $lte: 100 } } } } }"),
            Document.parse(
                    "{ $group : { _id : \"$root_document\", firstTimestamp: { $min: \"$ts\"}, lastTimestamp: { $max: \"$ts\"} …
Run Code Online (Sandbox Code Playgroud)

mongodb mongodb-java mongodb-query apache-spark

5
推荐指数
1
解决办法
1702
查看次数

MongoDB罗盘-代理设置

我想在地图上显示我的地理位置,但是Compass需要访问互联网才能加载地图。由于此外部连接,我需要更改程序的代理设置。

我在文档中找不到关于此的任何信息。我也尝试使用Beta而不是Stable版本,但是两者似乎都没有在GUI中提供更改的代理设置。

有什么方法可以将代理设置添加到MongoDB Compass?

mongodb mongodb-query mongodb-compass

5
推荐指数
1
解决办法
1566
查看次数

在 Kubernetes 上运行 Apache Hive(没有 YARN)

是否可以在 Kubernetes 上运行 Apache Hive(没有在 Kubernetes 上运行 YARN)?

我在网络上找不到任何合理的信息——在 Kubernetes 上运行 Hive 是一件非常罕见的事情吗?

hive hadoop-yarn kubernetes

5
推荐指数
1
解决办法
4455
查看次数

将日志文件从边缘节点摄取到 Hadoop

我正在寻找一种将整个日志文件从边缘节点流式传输到 Hadoop 的方法。总结一下用例:

  • 我们的应用程序可以生成每个文件从几 MB 到数百 MB 的日志文件。
  • 我们不想流式传输所有发生的日志事件。
  • 在日志文件完全写入后将其全部推送是我们正在寻找的(完全写入 = 例如移动到另一个文件夹中......这对我们来说不是问题)。
  • 这应该由边缘节点上的某种轻量级代理直接处理到 HDFS,或者 - 如有必要 - 一个中间“接收器”,之后会将数据推送到 HDFS。
  • 集中式管道管理(= 以集中方式配置所有边缘节点)会很棒

我想出了以下评价:

  • Elastic 的 Logstash 和 FileBeats
    • 边缘节点的集中管道管理可用,例如所有边缘节点的集中配置(需要许可证)
    • 配置很简单,Logstash 存在 WebHDFS 输出接收器(使用 FileBeats 需要一个带有 FileBeats + Logstash 的中间解决方案,输出到 WebHDFS)
    • 这两种工具都被证明在生产级环境中是稳定的
    • 这两种工具都用于跟踪日志并在这些单个事件发生时对其进行流式处理,而不是摄取完整的文件
  • 带有 MiNiFi 的 Apache NiFi
    • 收集日志并将整个文件发送到具有大量边缘节点的另一个位置的用例,这些节点都运行相同的“作业”,这看起来很适合 NiFi 和 MiNiFi
    • 在边缘节点上运行的 MiNiFi 是轻量级的(而 Logstash 不是那么轻量级)
    • 日志可以从 MiNiFi 代理流式传输到 NiFi 集群,然后摄取到 HDFS
    • NiFi UI 中的集中式管道管理
    • 写入 HDFS sink 是开箱即用的
    • 社区看起来很活跃,开发由 Hortonworks (?)
    • 我们过去在 NiFi 方面有很好的经验
  • 阿帕奇水槽
    • 写入 HDFS sink 是开箱即用的
    • 看起来 …

hadoop bigdata flume logstash apache-nifi

5
推荐指数
1
解决办法
731
查看次数

与MongoDB相比,使用Java Driver的Cassandra Bulk-Write性能非常糟糕

我为MongoDB和Cassandra构建了一个导入器.基本上导入器的所有操作都是相同的,除了最后一部分形成数据以匹配所需的cassandra表模式和想要的mongodb文档结构.与MongoDB相比,Cassandra的写入性能非常差,我认为我做错了.

基本上,我的抽象导入器类加载数据,读出所有数据并将其传递给扩展的MongoDBImporter或CassandraImporter类以将数据发送到数据库.一次针对一个数据库 - 同时没有"双重"插入C*和MongoDB.导入器在相同数量的节点上运行在同一台机器上(6).


问题:

MongoDB导入在57分钟后完成.我摄取了10.000.000个文档,我希望Cassandra的行数相同.我的Cassandra导入器现在运行2.5小时,并且只插入了5.000.000行.我将等待进口商完成并在此处编辑实际完成时间.


我如何用Cassandra导入:

我准备两个语句一旦摄取数据之前.这两个语句都是UPDATE查询,因为有时我必须将数据附加到现有列表.在开始导入之前,我的表格已完全清除.准备好的陈述一次又一次地被使用.

PreparedStatement statementA = session.prepare(queryA);
PreparedStatement statementB = session.prepare(queryB);
Run Code Online (Sandbox Code Playgroud)

对于每一行,我创建一个BoundStatement并将该语句传递给我的"自定义"批处理方法:

    BoundStatement bs = new BoundStatement(preparedStatement); //either statementA or B
    bs = bs.bind();

    //add data... with several bs.setXXX(..) calls

    cassandraConnection.executeBatch(bs);
Run Code Online (Sandbox Code Playgroud)

使用MongoDB,我可以一次插入1000个文档(这是最大的)没有问题.对于Cassandra来说,进口商com.datastax.driver.core.exceptions.InvalidQueryException: Batch too large在某些时候仅仅因为我的10个陈述而崩溃了.我正在使用此代码来构建批次.顺便说一句,我以1000,500,300,200,100,50,20批量开始,但显然它们也不起作用.然后我将其设置为10并再次抛出异常.现在我已经没有想法为什么它会破裂.

private static final int MAX_BATCH_SIZE = 10;

private Session session;
private BatchStatement currentBatch;

...

@Override
public ResultSet executeBatch(Statement statement) {
    if (session == null) {
        throw new IllegalStateException(CONNECTION_STATE_EXCEPTION);
    }

    if (currentBatch == …
Run Code Online (Sandbox Code Playgroud)

java cassandra datastax-java-driver

2
推荐指数
2
解决办法
1736
查看次数