小编shi*_*455的帖子

CompletableFuture,Future和RxJava的Observable之间的区别

我想知道的区别 CompletableFuture,FutureObservable RxJava.

我所知道的都是异步但是

Future.get() 阻止线程

CompletableFuture 给出了回调方法

RxJava Observable--- CompletableFuture与其他好处相似(不确定)

例如:如果客户端需要进行多次服务调用,那么当我们使用Futures(Java)时Future.get()将按顺序执行...想知道它在RxJava中的表现如何...

文档http://reactivex.io/intro.html

很难使用Futures来优化组合条件异步执行流程(或者不可能,因为每个请求的延迟在运行时会有所不同).当然,这可以完成,但它很快变得复杂(因此容易出错)或者过早地阻塞Future.get(),这消除了异步执行的好处.

真的很想知道如何RxJava解决这个问题.我发现从文档中很难理解.

java multithreading asynchronous java-8 rx-java

173
推荐指数
4
解决办法
5万
查看次数

WebApiConfig.cs和RouteConfig.cs之间的区别

Visual Studio 2012中MVC Web API项目的文件夹WebApiConfig.cs和文件夹之间有什么区别?RouteConfig.csApp_Start

asp.net-mvc asp.net-mvc-4 asp.net-web-api

42
推荐指数
2
解决办法
4万
查看次数

多个Kafka消费者可以从分区读取相同的消息

我们计划编写一个Kafka使用者(java),它读取Kafka队列以执行消息中的操作.

由于消费者独立运营,消息一次只能由一个消费者处理吗?否则,所有消费者处理相同的消息,因为他们在分区中具有自己的偏移量.

请帮我理解.

apache-kafka kafka-consumer-api

28
推荐指数
3
解决办法
3万
查看次数

使用cloudformation将Lambda函数启用到S3存储桶

我们正在使用CloudFormation模板创建一个S3存储桶.每当将文件添加到S3存储桶时,我想关联(将事件添加到S3存储桶)Lambda函数.

如何通过CloudFormation模板实现.CloudFormation中需要使用哪些属性.

amazon-s3 amazon-web-services aws-cloudformation

25
推荐指数
2
解决办法
2万
查看次数

使用Cloudformation在S3存储桶中创建文件夹

我可以使用cloudformation创建一个S3存储桶,但是想在S3存储桶中创建一个文件夹

<mybucket>--><myfolder>
Run Code Online (Sandbox Code Playgroud)

请让我知道用于在存储桶中创建文件夹的模板......两者都应该在同一时间创建...

我正在使用AWS lambda,如下所示

stackname = 'myStack'
client = boto3.client('cloudformation')
response = client.create_stack(
    StackName= (stackname),
    TemplateURL= 'https://s3.amazonaws.com/<myS3bucket>/<myfolder>/nestedstack.json',
    Parameters=<params>
)
Run Code Online (Sandbox Code Playgroud)

amazon-s3 aws-cloudformation

22
推荐指数
2
解决办法
2万
查看次数

从Python 2.7上的现有lambda函数调用AWS lambda函数

我试图从现有的lambda函数调用另一个lambda函数,如下所示(python 2.7)

from __future__ import print_function
import boto3
import json

lambda_client = boto3.client('lambda')

def lambda_handler(event, context):

    invoke_response = lambda_client.invoke(FunctionName="teststack",
                                           InvocationType='RequestResponse'
                                           )
    print(invoke_response)

    return str(invoke_response)
Run Code Online (Sandbox Code Playgroud)

我得到的是以下的响应而不是实际的结果.当我运行teststack lambda时,它运行正常,但得到低于响应而不是teststackLambda函数返回的"test" .

{u'Payload': <botocore.response.StreamingBody object at ****>, 'ResponseMetadata': {'HTTPStatusCode': 200, 'RequestId': '******', 'HTTPHeaders': {'x-amzn-requestid': '******', 'content-length': '155', 'x-amzn-remapped-content-length': '0', 'connection': 'keep-alive', 'date': 'Sun, 17 Jul 2016 21:02:01 GMT', 'content-type': 'application/json'}}, u'StatusCode': 200}
Run Code Online (Sandbox Code Playgroud)

python python-2.7 aws-lambda

21
推荐指数
1
解决办法
2万
查看次数

df.cache() 存储在哪里

我想了解以下代码存储在哪个节点(驱动程序或工作程序/执行程序)中

df.cache() //df is a large dataframe (200GB)
Run Code Online (Sandbox Code Playgroud)

并且具有更好的性能:使用 sqlcachetablecache(). 我的理解是,其中一个是懒惰的,另一个是渴望的。

apache-spark apache-spark-sql

10
推荐指数
1
解决办法
2万
查看次数

Spark 2.3提交Kubernetes错误

当我尝试在k8集群上运行spark-submit时,获得以下错误

错误1:这看起来像是一个警告,它不会中断在执行程序窗格内运行的应用程序,但会继续收到此警告

2018-03-09 11:15:21 WARN  WatchConnectionManager:192 - Exec Failure
java.io.EOFException
       at okio.RealBufferedSource.require(RealBufferedSource.java:60)
       at okio.RealBufferedSource.readByte(RealBufferedSource.java:73)
       at okhttp3.internal.ws.WebSocketReader.readHeader(WebSocketReader.java:113)
       at okhttp3.internal.ws.WebSocketReader.processNextFrame(WebSocketReader.java:97)
       at okhttp3.internal.ws.RealWebSocket.loopReader(RealWebSocket.java:262)
       at okhttp3.internal.ws.RealWebSocket$2.onResponse(RealWebSocket.java:201)
       at okhttp3.RealCall$AsyncCall.execute(RealCall.java:141)
       at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
       at java.lang.Thread.run(Thread.java:748)
Run Code Online (Sandbox Code Playgroud)

错误2:这是间歇性错误,导致执行程序窗格无法运行

org.apache.spark.SparkException: External scheduler cannot be instantiated
    at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2747)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:492)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2486)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:930)
    at org.apache.spark.sql.SparkSession$Builder$$anonfun$7.apply(SparkSession.scala:921)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:921)
    at com.capitalone.quantum.spark.core.QuantumSession$.initialize(QuantumSession.scala:62)
    at com.capitalone.quantum.spark.core.QuantumSession$.getSparkSession(QuantumSession.scala:80)
    at com.capitalone.quantum.workflow.WorkflowApp$.getSession(WorkflowApp.scala:116)
    at com.capitalone.quantum.workflow.WorkflowApp$.main(WorkflowApp.scala:90)
    at com.capitalone.quantum.workflow.WorkflowApp.main(WorkflowApp.scala)
Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]  for kind: [Pod]  with name: [myapp-ef79db3d9f4831bf85bda14145fdf113-driver-driver]  in namespace: [default]  failed. …
Run Code Online (Sandbox Code Playgroud)

apache-spark kubernetes

10
推荐指数
1
解决办法
1611
查看次数

当 docker 容器 pod 出现错误或 CarshLoopBackOff kubernetes 时发出警报

我在 AWS 上设置了 kubernetes 集群,我尝试使用 cAdvisor + Prometheus + Alert manager 来监控多个 pod。如果容器 / pod 出现故障或卡在 Error 或 CarshLoopBackOff 状态或除了运行之外的任何其他状态,我想要做的是启动电子邮件警报(带有服务/容器名称)。

docker kubernetes prometheus prometheus-alertmanager

10
推荐指数
1
解决办法
1万
查看次数

优雅地停止结构化流查询

我正在使用 Spark 2.1 并尝试优雅地停止流式查询。

StreamingQuery.stop()一个优雅的停止,因为我在文档中没有看到有关此方法的任何详细信息:

void stop() 如果该查询正在运行,则停止执行该查询。此方法会阻塞,直到执行执行的线程停止。自:2.0.0

而在过去的流媒体世界 (DStreams) 中,有一个选项可以停止流的执行,并可以选择确保所有接收到的数据都已被处理:

def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit 停止流的执行,可选择确保所有接收到的数据都已处理。

stopSparkContext 如果为 true,则停止关联的 SparkContext。无论此 StreamingContext 是否已启动,底层 SparkContext 都将停止。

stopGracefully 如果为 true,则通过等待所有接收到的数据的处理完成来优雅地停止

所以问题是如何优雅地停止结构化流查询?

apache-spark spark-structured-streaming

9
推荐指数
4
解决办法
5438
查看次数