我有多个要并行执行的作业,它们使用分区将每日数据附加到同一路径中.
例如
dataFrame.write().
partitionBy("eventDate", "category")
.mode(Append)
.parquet("s3://bucket/save/path");
Run Code Online (Sandbox Code Playgroud)
工作1 - category ="billing_events"工作2 - category ="click_events"
这两个作业都将在执行之前截断s3存储桶中存在的任何现有分区,然后将生成的镶木地板文件保存到各自的分区.
即
作业1 - > s3:// bucket/save/path/eventDate = 20160101/channel = billing_events
job 2 - > s3:// bucket/save/path/eventDate = 20160101/channel = click_events
我面临的问题是在作业执行期间由spark创建的临时文件.它将处理文件保存到基本路径
S3://桶/保存/路/ _temporary/...
因此两个作业最终共享相同的临时文件夹并导致冲突,我注意到这可能导致一个作业删除临时文件,而另一个作业失败,来自s3的404表示预期的临时文件不存在.
有没有人遇到过这个问题,并提出了在同一个基本路径中并行执行作业的策略?
我现在使用spark 1.6.0
我正在玩Symfony2和Im abit不确定Symfony2如何在View组件中处理Polymorphic集合.看来我可以创建一个包含AbstractChildren集合的实体,但不知道如何在Form Type类中使用它.
例如,我有以下实体关系.
/**
* @ORM\Entity
*/
class Order
{
/**
* @ORM\OneToMany(targetEntity="AbstractOrderItem", mappedBy="order", cascade={"all"}, orphanRemoval=true)
*
* @var AbstractOrderItem $items;
*/
$orderItems;
...
}
/**
* Base class for order items to be added to an Order
*
* @ORM\Entity
* @ORM\InheritanceType("JOINED")
* @ORM\DiscriminatorColumn(name="discr", type="string")
* @ORM\DiscriminatorMap({
* "ProductOrderItem" = "ProductOrderItem",
* "SubscriptionOrderItem " = "SubscriptionOrderItem "
* })
*/
class AbstractOrderItem
{
$id;
...
}
/**
* @ORM\Entity
*/
class ProductOrderItem extends AbstractOrderItem
{
$productName;
}
/** …Run Code Online (Sandbox Code Playgroud) 我试图了解CDI和EJB以及实体边界控制(ECB)模式.我对ECB模式的理解是边界是事务边界的起点和终点.除此之外,CDI不像EJB那样提供事务支持.
因此,如果我想成功实现ECB模式,那么以下是正确的;
谢谢
我们正在使用apache zeppelin来分析我们的数据集.我们有一些疑问,我们想运行有大量来自他们回来,并想运行飞艇查询,但结果保存(显示被限制在1000)的结果.是否有一种简单的方法可以让zeppelin将查询的所有结果保存到s3存储桶中?
我正在设计一项新服务,使"客户"能够为他们执行的特定搜索注册并支付每次使用费.将使用RESTFul和SOAP接口公开此服务.通常,Web服务将与客户的网站集成,然后暴露给"公共",任何人都可以使用客户的网站,并利用我的Web服务功能(客户将支付但完全控制调节)请求所以他们不会收取太多费用).
我想设计优化集成的服务,使其尽可能简单.Web服务API将发生变化,因此创建内部代理以在某些情况下向公众公开Web服务对客户来说太过贬低.因此,我认为这个问题是创建一个平衡身份验证,安全性和集成的Web服务.
理想
这似乎需要一些3方式身份验证过程才能工作,即验证特定客户端(在公共场合),Web服务(客户)和Web服务.
有没有人实现类似的东西,他们是如何解决这种情况的?
我也理解在可以做什么和违反跨域安全性之间存在平衡,因此整个Web服务可能会被另一个仅返回JSONP数据的GET接口暴露.
/**附录**/
我已经发现了一个可以完成我正在照顾的Web服务.但是,我不完全了解实施细节.所以也许有人也可以详细说明我的想法.
我发现的Web服务似乎在服务端托管了Javascript.然后,客户将其网站与服务端集成,方法是将Javascript包含在脚本标记中,但提供密钥即可
不知何故,如果我将脚本添加到我的网站,它不起作用.所以,在某个地方,令牌必须注册到特定的客户域,而'client-lib.js'实际上是一个servlet或类似的东西,它可以某种方式检测到来自'public'的用户实际上来自'客户'域名.
我的想法是对的吗?是否有某种http标头可以这种方式使用?这样安全吗?
干杯
过去几天,我一直在评估 Airflow 作为我们 ETL 工作流程的可能替代工具,并发现在 Airflow 中重命名 DAG 时出现了一些有趣的行为。
如果我在名为hello_world.py的文件中有一个 dag
dag = DAG('hello_world', description='Simple DAG',
schedule_interval='0 12 * * *',
start_date=datetime(2017, 11, 1), catchup=True)
Run Code Online (Sandbox Code Playgroud)
这个 dag 已经在 11 月执行了 10 天,然后我决定我只想将 dag 的名称更改为“yet_another_hello_world”,例如在同一个文件hello_world.py 中
dag = DAG('yet_another_hello_world', description='Simple DAG',
schedule_interval='0 12 * * *',
start_date=datetime(2017, 11, 1), catchup=True)
Run Code Online (Sandbox Code Playgroud)
我只是对作业进行重命名,而不是更改业务逻辑等。当它部署到 Airflow 中时,它会被自动选取并注册为新作业,因此现在在 DAG 视图中可以看到 2 个作业
由于 DAG 定义中的catchup=True,调度程序会自动看到此更改并注册一个新作业Yet_another_hello_world,然后它会继续回填从 11 月 1 日起丢失的执行。它还继续保持现有的hello_world作业完好无损。
最终,我希望这是对现有作业的重命名,而不是保留旧的 hello_world 作业。有没有办法向气流表明这是一个简单的重命名?
我正在尝试测试聚合,并想断言固定装置之外的事件,甚至可能使用 Hamcrest 进行评估?
使用时间戳的示例
fixture.given()
.when(new UserCreateCommand("1","test@bob.com"))
.expectEvents(new UserCreatedEvent("1","test@bob.com");
Run Code Online (Sandbox Code Playgroud)
该夹具允许我轻松地测试相等性,例如该命令准确地生成此事件,如果我想说引入事件创建时间的时间戳,那么它就不那么容易了
fixture.given()
.when(new UserCreateCommand("1","test@bob.com"))
.expectEvents(new UserCreatedEvent("1","test@bob.com", LocalDateTime.now());
Run Code Online (Sandbox Code Playgroud)
这种期望永远不会起作用,因为 LocalDateTime.now() 永远不会精确等于聚合中生成的时间戳。
我可以简单地将时间戳包含在命令有效负载中,但更喜欢在聚合内部进行处理,以确保以一致的方式生成此时间戳。
有没有办法从夹具中检索事件以独立于夹具进行断言,例如
UserCreatedEvent uce = fixture.given()
.when(new UserCreateCommand("1","test@bob.com"))
.extractEvent(UserCreatedEvent.class)
Run Code Online (Sandbox Code Playgroud)
这将允许我使用其他断言库,例如 hamcrest:
例如
assertThat(uce.getCreatedAt(), is(greaterThanOrEqualto(LocalDateTime.now().minusSeconds(1);
Run Code Online (Sandbox Code Playgroud) 我遇到了关于关闭hibernate会话的问题,问题的细节是:UI(使用JSF2.1实现)在多次执行某些操作后因空请求而挂起.问题的原因是hibernate会话在执行任何数据库操作后都没有关闭
所以我的问题为什么这个悬挂发生了?为什么一个空的请求?我认为当这样的事情发生时,异常会像hibernate API抛出IllegalStateException告诉你"你打开了很多会话"对吗?
我已经设置了一个收集事件数据的Spark 1.3.1应用程序.其中一个属性是名为'occurrence'的时间戳.我打算将事件数据划分为文件存储区中的镶木地板文件,并根据文档(https://spark.apache.org/docs/1.3.1/sql-programming-guide.html#partition-discovery)表示基于时间的值不仅仅支持string和int,所以我将日期分为Year,Month,Day值并按如下方式分区
events
|---occurredAtYear=2015
| |---occurredAtMonth=07
| | |---occurredAtDay=16
| | | |---<parquet-files>
...
Run Code Online (Sandbox Code Playgroud)
然后我从根路径/事件加载镶木地板文件
sqlContext.parquetFile('/var/tmp/events')
Run Code Online (Sandbox Code Playgroud)
文件说:
'Spark SQL将自动从路径中提取分区信息'
但是我的查询
SELECT * FROM events where occurredAtYear=2015
Run Code Online (Sandbox Code Playgroud)
悲惨地说火花无法解决'happenAtYear'
我可以看到事件的所有其他方面的模式,并可以对这些属性进行查询,但printSchema根本没有列出schemaAtYear/Month/Day?我错过了让分区正常工作的原因.
干杯
我正在查看 AxonIQ 框架,并设法启动并运行了一个测试应用程序。但是我有一个关于在使用在读取模型中具有持久性的存储时应如何处理 EventHandlers 的问题。
从我(可能天真)的理解来看。我的 Projection 类中的 @EventHandler 注释方法在第一次启动时从一开始就被调用。这种机制似乎假设投影利用某种在应用程序启动期间从头开始重新创建的易失性存储(例如,像 h2 这样的内存中 sql)。
但是,如果存储在 Elastic Search 之类的东西中是持久的,我希望 @EventHandler 从它的最后一个持久事件而不是从开始事件恢复。
有没有办法以这种方式控制@EventHandler 的行为?
apache-spark ×2
axon ×2
java ×2
airflow ×1
cdi ×1
doctrine ×1
ecb-pattern ×1
ejb ×1
hibernate ×1
java-ee ×1
javascript ×1
jsf ×1
parquet ×1
polymorphism ×1
rest ×1
symfony ×1
web-services ×1