小编Ale*_*rin的帖子

如何有效地使用SBT,Spark和"提供"依赖项?

我正在Scala中构建一个Apache Spark应用程序,我正在使用SBT来构建它.这是事情:

  1. 当我在IntelliJ IDEA下开发时,我希望Spark依赖项包含在类路径中(我正在使用主类启动常规应用程序)
  2. 当我打包应用程序(感谢sbt-assembly)插件时,我希望Spark依赖项包含在我的胖JAR中
  3. 当我运行单元测试时sbt test,我希望Spark依赖项包含在类路径中(与#1相同,但来自SBT)

为了匹配约束#2,我将Spark依赖关系声明为provided:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  ...
)
Run Code Online (Sandbox Code Playgroud)

然后,sbt-assembly的文档建议添加以下行以包含单元测试的依赖项(约束#3):

run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))
Run Code Online (Sandbox Code Playgroud)

这使得约束#1没有被完全填充,即我无法在IntelliJ IDEA中运行应用程序,因为没有拾取Spark依赖项.

使用Maven,我使用特定的配置文件来构建超级JAR.这样,我将Spark依赖关系声明为主要配置文件(IDE和单元测试)的常规依赖关系,同时将它们声明provided为胖JAR打包.请参阅https://github.com/aseigneurin/kafka-sandbox/blob/master/pom.xml

使用SBT实现这一目标的最佳方法是什么?

intellij-idea sbt sbt-assembly apache-spark

25
推荐指数
3
解决办法
1万
查看次数

YAML中的Ansible整型变量

我正在使用Ansible来部署webapp.我想通过检查给定页面返回具有给定键/值的JSON来等待应用程序运行.

我希望在失败之前尝试几次任务.因此我使用了until/ retries/ delaykeybwords 的组合.

问题是,我想要retries从变量中获取数量.如果我写:

  retries: {{apache_test_retries}}
Run Code Online (Sandbox Code Playgroud)

我陷入了通常的Yaml Gotcha(http://docs.ansible.com/YAMLSyntax.html#gotchas).

相反,如果我写:

  retries: "{{apache_test_retries}}"
Run Code Online (Sandbox Code Playgroud)

我被说是价值不是整数.

ValueError:int()的基数为10的无效文字:'{{apache_test_retries}}'

这是我的完整代码:

- name: Wait for the application to be running
  local_action:
    uri
    url=http://{{webapp_url}}/health
    timeout=60
  register: res
  sudo: false
  when: updated.changed and apache_test_url is defined
  until: res.status == 200 and res['json'] is defined and res['json']['status'] == 'UP'
  retries: "{{apache_test_retries}}"
  delay: 1
Run Code Online (Sandbox Code Playgroud)

关于如何解决这个问题的任何想法?谢谢.

yaml jinja2 ansible

16
推荐指数
2
解决办法
4万
查看次数

以低完整性级别打开命名管道

我正在开发一个由两个模块组成的应用程序.这些模块在以下环境中通过命名管道进行通信:

  • Windows 7家庭高级版x64
  • Visual Studio 2008
  • C#/ .Net 3.5

服务器以管理员权限运行(高完整性级别).客户端以低完整性级别运行.为了使客户端可以连接到服务器,我需要以低完整性级别创建管道.我设法只在服务器以中等完整性级别运行时才这样做.

我测试了以下设置:

  1. 服务器:高,客户端:低=>访问被拒绝
  2. server:high,client:medium => access refused
  3. server:high,client:high => OK
  4. server:medium,client:low => OK
  5. server:medium,client:medium => OK
  6. server:low,client:low => OK

设置#4显示创建的命名管道的完整性级别与进程的完整性级别不同,这很好.但是,我感兴趣的设置是第一个.

我有一个样本,可以很容易地测试.如果连接成功,客户端将写入"已连接",服务器将写入"已接收连接".如果连接失败,客户端将写入"失败",服务器将保持"等待"状态.

以下是我执行客户端程序的方法(对于服务器,只需将NamePipeClient替换为NamedPipeServer):

  • 中等诚信水平:
    • 打开命令提示符
    • icacls NamedPipeClient.exe/setintegritylevel Medium

    • NamedPipeClient.exe

  • 低完整性水平:
    • 打开命令提示符
    • icacls NamedPipeClient.exe/setintegritylevel低

    • NamedPipeClient.exe

  • 高完整性水平:
    • 在管理员模式下打开命令提示符
    • icacls NamedPipeClient.exe/setintegritylevel高

    • NamedPipeClient.exe

任何帮助将不胜感激!

服务器代码

Program.cs中

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Microsoft.Win32.SafeHandles;
using System.IO.Pipes;

namespace NamedPipeServer
{
    class Program
    {
        static void Main(string[] args)
        {
            SafePipeHandle handle = LowIntegrityPipeFactory.CreateLowIntegrityNamedPipe("NamedPipe/Test");
            NamedPipeServerStream pipeServer = …
Run Code Online (Sandbox Code Playgroud)

c#

14
推荐指数
2
解决办法
7368
查看次数

将 Spark 检查点写入 S3 太慢

我正在使用 Spark Streaming 1.5.2,并使用 Direct Stream 方法从 Kafka 0.8.2.2 中提取数据。

我已启用检查点,以便我的驱动程序可以重新启动并从中断处继续,而不会丢失未处理的数据。

检查点被写入 S3,因为我在 Amazon AWS 上而不是在 Hadoop 集群之上运行。

批处理间隔为 1 秒,因为我想要低延迟。

问题是,将单个检查点写入 S3 需要 1 到 20 秒。它们在内存中备份,最终应用程序失败。

2016-04-28 18:26:55,483 INFO  [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882407000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882407000', took 6071 bytes and 1724 ms
2016-04-28 18:26:58,812 INFO  [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882407000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882407000', took 6024 bytes and 3329 ms
2016-04-28 18:27:00,327 INFO  [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882408000 …
Run Code Online (Sandbox Code Playgroud)

amazon-s3 apache-spark

5
推荐指数
1
解决办法
2753
查看次数

使用Scala转换PySpark RDD

TL; DR - 我在PySpark应用程序中看起来像字符串的DStream.我想将它作为DStream[String]一个Scala库发送.但是,字符串不会被Py4j转换.

我正在开发一个PySpark应用程序,它使用Spark Streaming从Kafka中提取数据.我的消息是字符串,我想在Scala代码中调用一个方法,并传递一个DStream[String]实例.但是,我无法在Scala代码中接收正确的JVM字符串.在我看来,Python字符串不会转换为Java字符串,而是序列化.

我的问题是:如何从DStream对象中获取Java字符串?


这是我提出的最简单的Python代码:

from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))

from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])

ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)

ssc.start()
Run Code Online (Sandbox Code Playgroud)

我在PySpark中运行此代码,将其路径传递给我的JAR:

pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar
Run Code Online (Sandbox Code Playgroud)

在Scala方面,我有:

package com.seigneurin

import org.apache.spark.streaming.api.java.JavaDStream

object MyPythonHelper {
  def doSomething(jdstream: JavaDStream[String]) = {
    val dstream = jdstream.dstream
    dstream.foreachRDD(rdd => {
      rdd.foreach(println)
    })
  }
}
Run Code Online (Sandbox Code Playgroud)

现在,假设我将一些数据发送到Kafka:

echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN
Run Code Online (Sandbox Code Playgroud)

printlnScala代码中的语句打印出如下内容:

[B@758aa4d9 …
Run Code Online (Sandbox Code Playgroud)

apache-spark rdd pyspark

5
推荐指数
1
解决办法
1529
查看次数