我正在Scala中构建一个Apache Spark应用程序,我正在使用SBT来构建它.这是事情:
sbt test,我希望Spark依赖项包含在类路径中(与#1相同,但来自SBT)为了匹配约束#2,我将Spark依赖关系声明为provided:
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
...
)
Run Code Online (Sandbox Code Playgroud)
然后,sbt-assembly的文档建议添加以下行以包含单元测试的依赖项(约束#3):
run in Compile <<= Defaults.runTask(fullClasspath in Compile, mainClass in (Compile, run), runner in (Compile, run))
Run Code Online (Sandbox Code Playgroud)
这使得约束#1没有被完全填充,即我无法在IntelliJ IDEA中运行应用程序,因为没有拾取Spark依赖项.
使用Maven,我使用特定的配置文件来构建超级JAR.这样,我将Spark依赖关系声明为主要配置文件(IDE和单元测试)的常规依赖关系,同时将它们声明provided为胖JAR打包.请参阅https://github.com/aseigneurin/kafka-sandbox/blob/master/pom.xml
使用SBT实现这一目标的最佳方法是什么?
我正在使用Ansible来部署webapp.我想通过检查给定页面返回具有给定键/值的JSON来等待应用程序运行.
我希望在失败之前尝试几次任务.因此我使用了until/ retries/ delaykeybwords 的组合.
问题是,我想要retries从变量中获取数量.如果我写:
retries: {{apache_test_retries}}
Run Code Online (Sandbox Code Playgroud)
我陷入了通常的Yaml Gotcha(http://docs.ansible.com/YAMLSyntax.html#gotchas).
相反,如果我写:
retries: "{{apache_test_retries}}"
Run Code Online (Sandbox Code Playgroud)
我被说是价值不是整数.
ValueError:int()的基数为10的无效文字:'{{apache_test_retries}}'
这是我的完整代码:
- name: Wait for the application to be running
local_action:
uri
url=http://{{webapp_url}}/health
timeout=60
register: res
sudo: false
when: updated.changed and apache_test_url is defined
until: res.status == 200 and res['json'] is defined and res['json']['status'] == 'UP'
retries: "{{apache_test_retries}}"
delay: 1
Run Code Online (Sandbox Code Playgroud)
关于如何解决这个问题的任何想法?谢谢.
我正在开发一个由两个模块组成的应用程序.这些模块在以下环境中通过命名管道进行通信:
服务器以管理员权限运行(高完整性级别).客户端以低完整性级别运行.为了使客户端可以连接到服务器,我需要以低完整性级别创建管道.我设法只在服务器以中等完整性级别运行时才这样做.
我测试了以下设置:
设置#4显示创建的命名管道的完整性级别与进程的完整性级别不同,这很好.但是,我感兴趣的设置是第一个.
我有一个样本,可以很容易地测试.如果连接成功,客户端将写入"已连接",服务器将写入"已接收连接".如果连接失败,客户端将写入"失败",服务器将保持"等待"状态.
以下是我执行客户端程序的方法(对于服务器,只需将NamePipeClient替换为NamedPipeServer):
icacls NamedPipeClient.exe/setintegritylevel Medium
NamedPipeClient.exe
icacls NamedPipeClient.exe/setintegritylevel低
NamedPipeClient.exe
icacls NamedPipeClient.exe/setintegritylevel高
NamedPipeClient.exe
任何帮助将不胜感激!
Program.cs中
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using Microsoft.Win32.SafeHandles;
using System.IO.Pipes;
namespace NamedPipeServer
{
class Program
{
static void Main(string[] args)
{
SafePipeHandle handle = LowIntegrityPipeFactory.CreateLowIntegrityNamedPipe("NamedPipe/Test");
NamedPipeServerStream pipeServer = …Run Code Online (Sandbox Code Playgroud) 我正在使用 Spark Streaming 1.5.2,并使用 Direct Stream 方法从 Kafka 0.8.2.2 中提取数据。
我已启用检查点,以便我的驱动程序可以重新启动并从中断处继续,而不会丢失未处理的数据。
检查点被写入 S3,因为我在 Amazon AWS 上而不是在 Hadoop 集群之上运行。
批处理间隔为 1 秒,因为我想要低延迟。
问题是,将单个检查点写入 S3 需要 1 到 20 秒。它们在内存中备份,最终应用程序失败。
2016-04-28 18:26:55,483 INFO [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882407000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882407000', took 6071 bytes and 1724 ms
2016-04-28 18:26:58,812 INFO [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882407000 ms saved to file 's3a://.../checkpoints/cxp-filter/checkpoint-1461882407000', took 6024 bytes and 3329 ms
2016-04-28 18:27:00,327 INFO [org.apache.spark.streaming.CheckpointWriter] [pool-16-thread-1] - Checkpoint for time 1461882408000 …Run Code Online (Sandbox Code Playgroud) TL; DR - 我在PySpark应用程序中看起来像字符串的DStream.我想将它作为DStream[String]一个Scala库发送.但是,字符串不会被Py4j转换.
我正在开发一个PySpark应用程序,它使用Spark Streaming从Kafka中提取数据.我的消息是字符串,我想在Scala代码中调用一个方法,并传递一个DStream[String]实例.但是,我无法在Scala代码中接收正确的JVM字符串.在我看来,Python字符串不会转换为Java字符串,而是序列化.
我的问题是:如何从DStream对象中获取Java字符串?
这是我提出的最简单的Python代码:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))
from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])
ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)
ssc.start()
Run Code Online (Sandbox Code Playgroud)
我在PySpark中运行此代码,将其路径传递给我的JAR:
pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar
Run Code Online (Sandbox Code Playgroud)
在Scala方面,我有:
package com.seigneurin
import org.apache.spark.streaming.api.java.JavaDStream
object MyPythonHelper {
def doSomething(jdstream: JavaDStream[String]) = {
val dstream = jdstream.dstream
dstream.foreachRDD(rdd => {
rdd.foreach(println)
})
}
}
Run Code Online (Sandbox Code Playgroud)
现在,假设我将一些数据发送到Kafka:
echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN
Run Code Online (Sandbox Code Playgroud)
printlnScala代码中的语句打印出如下内容:
[B@758aa4d9 …Run Code Online (Sandbox Code Playgroud) apache-spark ×3
amazon-s3 ×1
ansible ×1
c# ×1
jinja2 ×1
pyspark ×1
rdd ×1
sbt ×1
sbt-assembly ×1
yaml ×1