我正在使用YARN在集群中运行Spark Streaming任务.集群中的每个节点都运行多个spark worker.在流式传输开始之前,我想在群集中所有节点上的所有工作程序上执行"设置"功能.
流式传输任务将传入的邮件分类为垃圾邮件或非垃圾邮件,但在此之前,它需要将最新的预先训练的模型从HDFS下载到本地磁盘,如此伪代码示例:
def fetch_models():
if hadoop.version > local.version:
hadoop.download()
Run Code Online (Sandbox Code Playgroud)
我在SO上看过以下示例:
sc.parallelize().map(fetch_models)
Run Code Online (Sandbox Code Playgroud)
但是在Spark 1.6中parallelize()需要使用一些数据,比如我现在正在做的这种糟糕的解决方法:
sc.parallelize(range(1, 1000)).map(fetch_models)
Run Code Online (Sandbox Code Playgroud)
为了确保该函数在所有工作程序上运行,我将范围设置为1000.我还不确切地知道在运行时集群中有多少个工作程序.
我已经阅读了编程文档并无情地搜索了,但我似乎无法找到任何方法实际上只向所有工作人员分发任何没有任何数据的东西.
完成此初始化阶段后,流式传输任务与往常一样,对来自Kafka的传入数据进行操作.
我使用模型的方法是运行类似这样的函数:
spark_partitions = config.get(ConfigKeys.SPARK_PARTITIONS)
stream.union(*create_kafka_streams())\
.repartition(spark_partitions)\
.foreachRDD(lambda rdd: rdd.foreachPartition(lambda partition: spam.on_partition(config, partition)))
Run Code Online (Sandbox Code Playgroud)
从理论上讲,我可以检查模型中的模型是否是最新的on_partition,尽管在每个批次上执行此操作会非常浪费.我想在Spark开始从Kafka检索批次之前这样做,因为从HDFS下载可能需要几分钟......
更新:
要明确:这不是关于如何分发文件或如何加载它们的问题,而是关于如何在不对任何数据进行操作的情况下对所有工作程序运行任意方法.
澄清当前实际加载模型的含义:
def on_partition(config, partition):
if not MyClassifier.is_loaded():
MyClassifier.load_models(config)
handle_partition(config, partition)
Run Code Online (Sandbox Code Playgroud)
虽然MyClassifier是这样的:
class MyClassifier:
clf = None
@staticmethod
def is_loaded():
return MyClassifier.clf is not None
@staticmethod
def load_models(config):
MyClassifier.clf = load_from_file(config)
Run Code Online (Sandbox Code Playgroud)
静态方法,因为PySpark似乎无法使用非静态方法序列化类(类的状态与另一个worker的关系无关).在这里,我们只需要调用load_models()一次,并在将来的所有批次中调用MyClassifier.clf.对于每个批次来说,这是不应该做的事情,这是一次性的事情.与使用fetch_models()从HDFS下载文件相同.
有没有办法确定或计算是否以及如何使用rowspans对HTML表进行规范化?或者,如果有一个JavaScript库可以做到这一点.
例如,这个表:
+-----------+---------+
| Apple | Red |
| Apple | Green |
| Apple | Yellow |
| Sun | Yellow |
| Sun | Hot |
| Charizard | Hot |
| Charizard | Pokémon |
+-----------+---------+
Run Code Online (Sandbox Code Playgroud)
会变成这样:
+-----------+---------+
| Apple | Red |
| | Green |
| |---------|
|-----------| Yellow |
| Sun |-------- |
|-----------| Hot |
| |---------|
| Charizard | Pokémon |
+-----------+---------+
Run Code Online (Sandbox Code Playgroud)
看看这个小提琴,看看我的意思:http://jsfiddle.net/scorch/LZKkQ/
其中一些组合很容易手动弄清楚,但有些可能非常复杂.我想尽可能地减少表格,并确保没有其他组合可以进一步减少它.即,最好只有表中的唯一值.
编辑:别介意小提琴中的额外栏目.似乎Firefox在最右边的列上有一些rowpan的问题,所以我不得不为它添加另一个以获得所需的效果.
编辑2:
下面提到的DataTables插件 …
我前几天升级到ubuntu 18.04,从那以后我再也不能再到我的机器上了.auth.log告诉我这个:
Jan 15 08:41:15 pc207 sshd[5358]: Accepted publickey for oscar from 10.60.0.15 port 42004 ssh2: RSA SHA256:59dtkmxMKMJG22+SQEoo7D55JSr+xlFjRyLMclLY210
Jan 15 08:41:15 pc207 sshd[5358]: debug1: monitor_child_preauth: oscar has been authenticated by privileged process
Jan 15 08:41:16 pc207 sshd[5358]: debug1: monitor_read_log: child log fd closed
Jan 15 08:41:16 pc207 sshd[5358]: fatal: privsep_preauth: preauth child terminated by signal 31
Run Code Online (Sandbox Code Playgroud)
虽然ssh -vvv localhost告诉我这个:
debug1: Authentication succeeded (publickey).
Authenticated to localhost ([127.0.0.1]:22).
debug1: channel 0: new [client-session]
debug3: ssh_session2_open: channel_new: 0
debug2: channel …Run Code Online (Sandbox Code Playgroud) 我用这种方式使用Metro 1.2 生成了一个带有本地wsdl的webservice客户端:
./wsimport.sh -extension -verbose -wsdllocation service.wsdl -s src -d target service.wsdl -Xendorsed
Run Code Online (Sandbox Code Playgroud)
wsdl使用SOAP 1.2和wsHttpBinding.它应该连接到使用NTLM身份验证方法的WCF服务器.
我创建了一个Authenticator处理NTLM身份验证:
public class NtlmAuthenticator extends Authenticator
{
private String username = "";
private String password = "";
public NtlmAuthenticator(String username, String password) {
this.username = username;
this.password = password;
}
@Override
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication(username, password.toCharArray());
}
}
Run Code Online (Sandbox Code Playgroud)
我在调用每个webservice方法之前设置的:
@WebEndpoint(name = "WSHttpBinding_ICustomerService")
public ICustomerService getWSHttpBindingICustomerService() {
ICustomerService service =
super.getPort(new …Run Code Online (Sandbox Code Playgroud)