我正在开发一个Scala(2.11)/ Spark(1.6.1)流式项目,mapWithState()用于跟踪以前批次中看到的数据.
状态分布在多个节点上的20个分区中,使用StateSpec.function(trackStateFunc _).numPartitions(20).在这种状态下,我们只有几个键(~100)映射到Sets最多约160,000个条目,这些条目在整个应用程序中增长.整个状态最多3GB,可以由群集中的每个节点处理.在每个批次中,一些数据被添加到一个状态,但直到过程结束时才被删除,即约15分钟.
在遵循应用程序UI时,与其他批次相比,每10个批次的处理时间非常长.看图像:
黄色字段代表高处理时间.
更详细的作业视图显示,在这些批次中发生在某一点,恰好是"跳过"所有20个分区.或者这就是UI所说的.
我的理解skipped是每个状态分区是一个可能的任务,没有被执行,因为它不需要重新计算.但是,我不明白为什么skips每个工作的数量变化以及为什么最后的工作需要如此多的处理.无论状态大小如何,都会出现更高的处理时间,它只会影响持续时间.
这是mapWithState()功能中的错误还是这个预期的行为?底层数据结构是否需要某种重新洗牌,Set状态是否需要复制数据?或者它更可能是我的应用程序中的缺陷?
我正在开发一个Scala(2.11)/ Spark(1.6.1)流式项目,mapWithState()用于跟踪以前批次中看到的数据.
状态分为20个分区,用StateSpec.function(trackStateFunc _).numPartitions(20).创建.我曾希望在整个集群中分发状态,但似乎每个节点都保持完整状态,并且执行总是只执行一个节点.
Locality Level Summary: Node local: 50在每个批次的UI中显示,完整批次是随机读取.之后,我写信给Kafka,分区再次传遍整个集群.我似乎无法找出为什么mapWithState()需要在单个节点上运行.如果它被一个节点而不是整个集群限制,这是否会破坏分区状态的概念?难道不能按密钥分配状态吗?
我正在研究一个小瓶子应用程序,我想在其中返回包含变音符号的字符串(通常是德语特殊字符,例如'ß').作为JSONEncoderflask中的默认值ensure_ascii=True,这将始终转换我的字符串
Hauptstraße 213
Run Code Online (Sandbox Code Playgroud)
对此:
Hauptstra\u00dfe 213
Run Code Online (Sandbox Code Playgroud)
我的第一个方法是创建一个非常基本的习惯 JSONEncoder
class NonAsciiJSONEncoder(json.JSONEncoder):
def __init__(self, **kwargs):
super(NonAsciiJSONEncoder, self).__init__(kwargs)
self.ensure_ascii = False
Run Code Online (Sandbox Code Playgroud)
如果我使用它,通过设置
app.json_encoder = NonAsciiJSONEncoder
Run Code Online (Sandbox Code Playgroud)
我return jsonify(...)将实际返回包含'ß'的字符串.
但是,正如ensure_ascii默认编码器的属性一样,我认为通过将其设置为更改它可能会更好False
app.json_encoder.ensure_ascii = False
Run Code Online (Sandbox Code Playgroud)
这实际上将属性设置为False,我在返回之前检查了它.但不知何故,字符串仍然返回没有'ß'但是\u00df.
这怎么可能,正如我在自定义编码器中所做的那样,将此属性设置为False?
这件事已经困扰了我很长时间了。我通过序列化一些 Python Dict 对象生成了以下泡菜文件(名为 rawFile.raw):
rawFile.raw 的内容(为了易读性而被截断):
(dp0
S'request_body # 1'
p1
S''
p2
sS'port # 1'
p3
I80
sS'query_params # 1'
p4
ccopy_reg
_reconstructor
p5
(cnetlib.odict
ODict
p6
c__builtin__
object
p7
Ntp8
Rp9
(dp10
S'lst'
p11
(lp12
(S'layoutId'
p13
S'-1123196643'
p14
tp15
asbsS'headers # 1'
p16
g5
(cnetlib.odict
ODictCaseless
p17
g7
Ntp18
Rp19
(dp20
g11
(lp21
(lp22
S'sn'
p23
aS'2.VI7D9DF640615B4948854C88C5E769B94C.SIE5FB3A28D0DA4F27A3D2C03B8FAAFFAE.VS144257070601359422212.1442570840'
p24
aa(lp25
S'Browser-Name'
p26
aS'Mobile Safari'
p27
aa(lp28
S'Accept-Encoding'
p29
aS'gzip'
p30
aa(lp31
S'secureToken'
p32
aS'5nANXZrwYBrl9sNykA+qlpLsjHXlnF97tQLHnPgcjwZm9u0t8XAHtO4XTjKODcIb0ee4LlFchmUiptWZEPDUng=='
p33
aa(lp34
S'User-Agent' …Run Code Online (Sandbox Code Playgroud) 我m在python中有一个列表,我想删除m中不是所有先前元素的相对素数的所有元素.所以,如果m=[2,3,4]我想要输出[2,3].
我尝试迭代值m,但它不起作用,因为m更改的大小,然后索引值超出范围.
我在调用 API 时遇到一些问题。我需要发送包含两件事的发布数据:一个 ID 和一个 int 数组。我尝试了很多方法,但都导致错误或根本没有以正确的方式发送数据。我找到的所有答案都没有处理我想发送两种不同数据类型的事实。
在另一边我有:
Call<DefaultResponseList> someMethod(@Field("parm1") String parm1, @Field("parm2[]") ArrayList<Integer> parm2);
Run Code Online (Sandbox Code Playgroud)
我试过:
List<int> ints1 = new List<int>();
ids.Add(1);
ids.Add(2);
ids.Add(3);
var values = new List<KeyValuePair<string, object>>
{
new KeyValuePair<string, object>("parm1", "lkdjfowejfd123"),
new KeyValuePair<string, object>("parm2", ints1)
};
var httpClient = new HttpClient(new HttpClientHandler());
HttpResponseMessage response = await httpClient.PostAsync(someUrl, new FormUrlEncodedContent(values));
response.EnsureSuccessStatusCode();
string data = await response.Content.ReadAsStringAsync();
Run Code Online (Sandbox Code Playgroud) python ×3
apache-spark ×2
scala ×2
c# ×1
encoding ×1
flask ×1
httpclient ×1
json ×1
mysql ×1
mysql-python ×1
pickle ×1
post ×1
python-2.7 ×1