我正在用一个json体调用aws lambda.所以json的字段与POJO中的字段名称不同.所以我做的是在字段上添加@JsonProperty告诉jackson json中的名字是什么.但由于某种原因,它似乎无法识别它们并且所有字段都为空.如果我传递一个与POJO具有相同字段名称的json,它正在工作.这是我的班级:
public class Event implements Identifiable {
@JsonProperty("distinct_id")
private String distinctId;
@JsonProperty("user_id")
private Integer userId;
@JsonDeserialize(using = LocalDateTimeDeserializer.class)
@JsonSerialize(using = LocalDateTimeSerializer.class)
private LocalDateTime eventDateTime;
//Here are the getters and setters
}
Run Code Online (Sandbox Code Playgroud)
如果我通过
{"distinct_id":"123", "user_id":123, "dt":"2017-01-04T08:45:04+00:00"}
Run Code Online (Sandbox Code Playgroud)
所有字段都为null,并且使用distinctId,userId,eventDateTime,它正在序列化,但它也无法识别我的自定义序列化器/反序列化器,但这实际上是同样的问题.
我的结论是,由于某些原因,aws jackson没有使用注释,但它没有意义.
大家.我有一个HTTP API用于在RabbitMQ代理中发布消息,我需要实现请求 - 响应模式,以便从服务器接收响应.所以我就像是客户端和服务器之间的桥梁.我使用特定的路由密钥将消息推送到代理,并且有消息用于消息,它将按摩作为响应发布回来,我的API必须消耗每个请求的响应.所以图表是这样的:

所以,我做的是以下各项为我创建一个临时responseQueue每一个HTTP会话(这势必会默认交换与路由的关键是队列的名称),在那之后我设置了消息的ReplyTo头是名响应队列(我将等待响应),并将模板replyQueue设置为该队列.这是我的代码:
public void sendMessage(AbstractEvent objectToSend, final String routingKey) {
final Queue responseQueue = rabbitAdmin.declareQueue();
byte[] messageAsBytes = null;
try {
messageAsBytes = new ObjectMapper().writeValueAsBytes(objectToSend);
} catch (JsonProcessingException e) {
e.printStackTrace();
}
MessageProperties properties = new MessageProperties();
properties.setHeader("ContentType", MessageBodyFormat.JSON);
properties.setReplyTo(responseQueue.getName());
requestTemplate.setReplyQueue(responseQueue);
Message message = new Message(messageAsBytes, properties);
Message receivedMessage = (Message)requestTemplate.convertSendAndReceive(routingKey, message);
}
Run Code Online (Sandbox Code Playgroud)
那么,什么是问题:被发送的消息,之后,它是由消费者消费及其响应正确发送到正确的队列中,但由于某种原因,没有在convertSendAndReceived方法和设置超时我receivedMessage后收回一片空白.于是,我就做几件事情 - 我开始检查春季代码(顺便说这是一个真正的噩梦做到这一点),看到是我不声明响应队列为我创建了一个时间,和ReplyTo头设置为队列的名称(与我的相同).结果是相同的 - receivedMessage仍为null.之后我决定使用另一个使用默认交换的模板,因为responseQueue绑定到该交换:
requestTemplate.send(routingKey, message);
Message receivedMessage = receivingTemplate.receive(responseQueue.getName());
Run Code Online (Sandbox Code Playgroud)
结果是相同的 - responseMessage仍为null.amqp和兔子的版本分别为1.2.1和1.2.0.所以我相信我会错过一些东西,但我不知道它是什么,所以如果有人可以帮助我,我将非常感激.
我使用了与aws lambda(java)集成的aws api网关,但是我发现这种方法存在一些严重的问题。删除服务器并开箱即用地扩展应用程序的概念确实不错,但这是我面临的问题。我的lambda正在做2件简单的事情-验证从客户端收到的有效负载,然后将其发送到kinesis流以从另一个lambda进行进一步处理(您会问为什么我不直接发送到该流,而仅对所有lambda使用假设我们要分离逻辑并具有抽象层,并且还能够告诉客户端他正在发送无效数据。)
在执行lambda时,我集成了弹簧DI。到目前为止,一切都很好。我开始进行性能测试。我模拟了50个并发用户,每个用户发出4个请求,每个请求之间间隔5秒。所以发生了什么-在lambda的冷启动中,我初始化了spring的应用程序上下文,但似乎在未启动lambda时有这么多同时请求正在做一些奇怪的事情。这是上下文初始化时间的屏幕截图。
从屏幕截图中我们可以看到,初始化上下文的时间有很大的不同。我对发生的事情的假设是,当接收到如此多的请求并且没有“活动的” lambda时,它将为每个请求初始化一个lambda容器,并同时“阻止”其中的一些请求(时间很长的请求)。 18s),直到其他已经准备就绪。因此,也许它具有可以同时启动的容器的一些内部限制。问题是,如果您没有平均分配的流量,那么这将不时发生,并且某些请求将超时。我们不希望这种情况发生。
因此,接下来的事情是在没有spring容器的情况下进行一些测试,因为我的想法是“好的,初始化很繁重,让我们简单地对旧的Java对象进行初始化”。不幸的是,发生了同样的事情(也许只是减少了某些请求的3s容器初始化)。这是测试数据的更详细的屏幕截图:
因此,我记录了整个lambda执行时间(从构造到结束),kinesis客户端初始化以及将数据实际发送到流的过程,因为这些是lambda中最繁重的操作。我们仍然有18s左右的大时光,但有趣的是,时间以某种方式成比例。因此,如果整个lambda花费18s,则大约7-8s是客户端初始化,而6-7是用于将数据发送到流,而lambda中的其他操作还剩下4-5秒,目前仅是验证。另一方面,如果我们花一小段时间(这意味着它重用了已经启动的lambda),即820毫秒,则kinesis客户端初始化需要100毫秒,数据发送需要340毫秒,验证需要400毫秒。因此,这又使我再次想到这样的想法:由于某些限制,它在内部使一些睡眠。下一个屏幕截图显示了在启动lamda之后下一轮请求发生的情况:
所以我们没有那么大的时间,是的,我们在某些请求中仍然有一些相对较大的增量(对我来说这也很奇怪),但是情况看起来要好得多。
因此,我正在寻找一个真正了解实际情况的人进行澄清,因为对于使用云的严肃应用程序来说,这不是好行为,因为它存在“无限”的可能性。
另一个问题与区域内帐户中所有lambda中的lambda-200并发调用的另一个限制有关。对我来说,这对于流量很大的大型应用程序也是一个很大的限制。因此,就目前而言(我不知道未来),我的业务案例或多或少引起了人们的注意,忘记了请求。我开始考虑更改逻辑,以使网关将数据直接发送到流,而另一个lambda负责验证和进一步处理。是的,我正在失去当前的抽象(目前不需要),但是我在提高应用程序的可用性很多次。你怎么看?
java amazon-web-services amazon-kinesis aws-lambda aws-api-gateway
我LongAccumulator在地图操作中使用a 作为共享计数器。但是似乎我没有正确使用它,因为工作节点上计数器的状态没有更新。这是我的计数器类的样子:
public class Counter implements Serializable {
private LongAccumulator counter;
public Long increment() {
log.info("Incrementing counter with id: " + counter.id() + " on thread: " + Thread.currentThread().getName());
counter.add(1);
Long value = counter.value();
log.info("Counter's value with id: " + counter.id() + " is: " + value + " on thread: " + Thread.currentThread().getName());
return value;
}
public Counter(JavaSparkContext javaSparkContext) {
counter = javaSparkContext.sc().longAccumulator();
}
}
Run Code Online (Sandbox Code Playgroud)
据我所了解的文档,当应用程序在多个工作程序节点中运行时,这应该可以正常工作:
累加器是仅通过关联和交换操作“添加”的变量,因此可以有效地并行支持。它们可用于实现计数器(如MapReduce中的计数器)或总和。Spark本身支持数字类型的累加器,程序员可以添加对新类型的支持。
但是,这是当计数器在2个不同的工作线程上递增且看起来状态未在节点之间共享时的结果:
INFO计数器:在线程上ID为866的递增计数器:Executor任务启动worker-6 INFO计数器:在线程上ID为866的计数器值为:1在线程上:Executor任务启动worker-6
INFO计数器:ID为866的递增计数器:线程:执行程序任务启动worker-0 INFO计数器:ID为866的计数器的值是:1在线程上:执行程序任务启动worker-0
我是否理解累加器概念错误,或者必须使用任何设置来启动任务?
我的 Spark 应用程序在内存使用方面遇到了大问题。因此,我在集群模式下的 aws emr 集群上运行 Spark(这意味着它使用纱线作为集群管理器)。我无法提供应用程序的任何代码,因为它很大,我不知道问题出在哪里,但基本上我所做的是从 aws s3 加载几种类型的 json 对象(数据按小时存储在不同的文件夹中,即 2017/05/31/00 是 Spark 应用程序应在 2017 年 5 月 31 日 00:00 处理的数据,我每小时运行该应用程序),将它们反序列化为 Java pojo 对象,制作一些转换并缓存它们,因为我不想在每个操作上重新加载它们。一些对象被转换为java映射,并且在所有逻辑完成之后,java映射被转换回RDD并保存回s3中。
问题一:
具有以下配置:
-conf Spark.yarn.executor.memoryOverhead=3000 --driver-内存 8G --executor-内存 8G --deploy-mode 集群
我转换为 RDD 并保存它的 Map 的大小随着执行的每一小时而增加。但在完全随机的某些执行中,保存的输出丢失了数据。例如,如果成功执行后 json 对象的计数为 1000,而下一次执行时可能会变为 900,而不是增加。如果您在同一小时再次运行它,那么绝对有可能没问题(我在我的应用程序中看不到错误,因为这种情况很少发生并且是随机的)。所以这让我认为这是一个内存问题,也许内存中的某些对象被垃圾收集或类似的东西。
问题2:
当我使用以下配置运行程序时:
-conf Spark.yarn.executor.memoryOverhead=3000 --driver-内存 8G --executor-内存1G --deploy-mode 集群
在一次执行中几个小时(在应用程序中每小时执行一次 while 循环)然后一切都很好。但是,如果我在第三个小时将执行程序内存更改为超过 1G,则会抛出一个非常奇怪的异常:
17/05/31 09:38:52 INFO S3NativeFileSystem: Encountered an exception while reading 'data/appletv/user_mapping/part-00000.gz', will retry by attempting to reopen stream.
java.io.EOFException: …Run Code Online (Sandbox Code Playgroud) 我做了一个简单的WindowsFormsApplication,但我遇到了一些困难.在表单中我有10个TextBoxes和一个按钮.该程序的目的是在单击按钮时在每个框中生成不同的数字.这是代码的一部分:
private void button1_Click(object sender, EventArgs e)
{
int p = 0;
int[] array = GeneratingArray();
foreach (Control c in tableLayoutPanel1.Controls)
{
if (c.GetType().ToString() == "System.Windows.Forms.TextBox")
{
c.Text = array[p].ToString();
p++;
}
}
}
public int GeneratingInt()
{
int random;
Contract.Ensures(Contract.Result<int>() > -11, "Array out of Bounds(-10 ; 10)");
Contract.Ensures(Contract.Result<int>() < 11, "Array out of Bounds(-10 ; 10)");
Random gnr = new Random();
random = gnr.Next(20);
return random;
}
public int[] GeneratingArray()
{
int[] array = new int[10];
int random; …Run Code Online (Sandbox Code Playgroud) 我试图在playframework视图中迭代,但现在没有成功.我有以下结构:
@if(list != null) {
for(a <- 0 to list.size()/5)
{
// some html, where I want to get the value of a
for(b <- a*5 to a*5+5) // here I want to use the a value again
{
some html
}
}
Run Code Online (Sandbox Code Playgroud)
所以我的问题是如何获取循环的当前索引,以便我能够使用它.
java ×4
apache-spark ×2
aws-lambda ×2
c# ×1
jackson ×1
rabbitmq ×1
scala ×1
spring ×1
spring-amqp ×1
winforms ×1