我正在尝试运行提交带有 k8s 集群的 spark 应用程序的基本示例。
我使用 spark 文件夹中的脚本创建了我的 docker 镜像:
sudo ./bin/docker-image-tool.sh -mt spark-docker build
sudo docker image ls
REPOSITORY TAG IMAGE ID CREATED SIZE
spark-r spark-docker 793527583e00 17 minutes ago 740MB
spark-py spark-docker c984e15fe747 18 minutes ago 446MB
spark spark-docker 71950de529b3 18 minutes ago 355MB
openjdk 8-alpine 88d1c219f815 15 hours ago 105MB
hello-world latest fce289e99eb9 3 months ago 1.84kB
Run Code Online (Sandbox Code Playgroud)
然后尝试提交 SparkPi 示例(如官方文档中所示)。
./bin/spark-submit \
--master k8s://[MY_IP]:8443 \
--deploy-mode cluster \
--name spark-pi --class org.apache.spark.examples.SparkPi \
--driver-memory 1g …Run Code Online (Sandbox Code Playgroud) 我有一个 pandas DataFrame df,我想为其计算每批行的一些统计信息。
例如,假设我有一个batch_size = 200000.
对于每批batch_size行,我希望获得IDDataFrame 列的唯一值的数量。
我怎样才能做这样的事情呢?
这是我想要的一个例子:
print(df)
>>
+-------+
| ID|
+-------+
| 1|
| 1|
| 2|
| 2|
| 2|
| 3|
| 3|
| 3|
| 3|
+-------+
batch_size = 3
my_new_function(df,batch_size)
>>
For batch 1 (0 to 2) :
2 unique values
1 appears 2 times
2 appears 1 time
For batch 2 (3 to 5) :
2 unique values
2 appears 2 times …Run Code Online (Sandbox Code Playgroud) 我正在尝试在推理模式下运行Mask_RCNN 的 Keras 实现。它基本上是运行这段代码demo.ipynb
但是当我运行它时,我在模型创建时收到以下错误:
ValueError: Tried to convert 'shape' to a tensor and failed. Error: None values not supported.
Run Code Online (Sandbox Code Playgroud)
这是堆栈跟踪:
Traceback (most recent call last):
File "/snap/pycharm-community/128/helpers/pydev/pydevd.py", line 1758, in <module>
main()
File "/snap/pycharm-community/128/helpers/pydev/pydevd.py", line 1752, in main
globals = debugger.run(setup['file'], None, None, is_module)
File "/snap/pycharm-community/128/helpers/pydev/pydevd.py", line 1147, in run
pydev_imports.execfile(file, globals, locals) # execute the script
File "/snap/pycharm-community/128/helpers/pydev/_pydev_imps/_pydev_execfile.py", line 18, in execfile
exec(compile(contents+"\n", file, 'exec'), glob, loc)
File "[PATH/TO/Mask_RCNN/]/Own_code/Test.py", line 44, in <module>
model = …Run Code Online (Sandbox Code Playgroud) 我有一个包含几百万行的 Pandas DataFrame。我想根据条件从行中选择一个值C。
我有以下正在运行的代码:
all_matches= df.loc[C, "column_name"]
first_match = next(iter(all_matches), 'no match')
Run Code Online (Sandbox Code Playgroud)
问题是它的效率极低。我想知道如何做类似的事情df.loc[C, "column_name"],但停在第一场比赛。
我正在使用 Spark 2.0.0 的 SQL API。
我想知道当我必须对我的数据执行两个独立的操作时,什么是好的做法。这是一个基本示例:
val ds = sc.parallelize(List(
("2018-12-07T15:31:48Z", "AAA",3),
("2018-12-07T15:32:48Z", "AAA",25),
("2018-12-07T15:33:48Z", "AAA",20),
("2018-12-07T15:34:48Z", "AAA",10),
("2018-12-07T15:35:48Z", "AAA",15),
("2018-12-07T15:36:48Z", "AAA",16),
("2018-12-07T15:37:48Z", "AAA",8),
("2018-12-07T15:31:48Z", "BBB",15),
("2018-12-07T15:32:48Z", "BBB",0),
("2018-12-07T15:33:48Z", "BBB",0),
("2018-12-07T15:34:48Z", "BBB",1),
("2018-12-07T15:35:48Z", "BBB",8),
("2018-12-07T15:36:48Z", "BBB",7),
("2018-12-07T15:37:48Z", "BBB",6)
)).toDF("timestamp","tag","value")
val newDs = commonTransformation(ds).cache();
newDs.count() // force computation of the dataset
val dsAAA = newDs.filter($"tag"==="AAA")
val dsBBB = newDs.filter($"tag"==="BBB")
actionAAA(dsAAA)
actionBBB(dsBBB)
Run Code Online (Sandbox Code Playgroud)
使用以下功能:
def commonTransformation(ds:Dataset[Row]):Dataset[Row]={
ds // do multiple transformations on dataframe
}
def actionAAA(ds:Dataset[Row]){
Thread.sleep(5000) // Sleep to simulate an …Run Code Online (Sandbox Code Playgroud) 我有一个 Pandas 数据框,如下所示:
tags value
[tag1, tag2, tag3] 0
[tag2, tag3] 10
[tag1, tag3] 50
...
Run Code Online (Sandbox Code Playgroud)
在此数据框上,我想应用一个函数,对于每行的每个标签,将创建一个包含“标签”列和“相关标签”列的新行。这是我所期待的一个例子:
tag value related_tags
tag1 0 [tag2, tag3]
tag2 0 [tag1, tag3]
tag3 0 [tag1, tag2]
tag2 10 [tag3]
tag3 10 [tag2]
tag1 50 [tag3]
tag3 50 [tag1]
Run Code Online (Sandbox Code Playgroud)
我熟悉 Spark DataFrames 但不熟悉 Pandas,有没有一种简单的方法可以实现这一点?
我有两个 Pandas Dataframe df1,df2其中df2是 的一部分df1,我想创建一个 Dataframe ,其中包含不在 中的df3所有行。df1df2
这是一个例子:
print(df1)
>>
+---------+
| ID|
+---------+
| AAA|
| DDD|
| BBB|
| CCC|
| EEE|
| FFF|
+---------+
print(df2)
>>
+---------+
| ID|
+---------+
| AAA|
| EEE|
| FFF|
+---------+
print(df3)
>>
+---------+
| ID|
+---------+
| DDD|
| BBB|
| CCC|
+---------+
Run Code Online (Sandbox Code Playgroud)
笔记:
ID仅在列上完成。我尝试将Spark应用程序提交到Kubernetes集群(Minikube)。在客户端模式下运行我的spark提交时,一切正常。在3个容器中创建3个执行程序,并执行代码。这是我的提交命令:
[MY_PATH]/bin/spark-submit \
--master k8s://https://[API_SERVER_IP]:8443 \
--deploy-mode client \
--name [Name] \
--class [MyClass] \
--conf spark.kubernetes.container.image=spark:2.4.0 \
--conf spark.executor.instances=3 \
[PATH/TO/MY/JAR].jar
Run Code Online (Sandbox Code Playgroud)
现在,我将其修改为以集群模式运行:
[MY_PATH]/bin/spark-submit \
--master k8s://https://[API_SERVER_IP]:8443 \
--deploy-mode cluster \
--name [Name] \
--class [MyClass] \
--conf spark.kubernetes.container.image=spark:2.4.0 \
--conf spark.executor.instances=3 \
local://[PATH/TO/MY/JAR].jar
Run Code Online (Sandbox Code Playgroud)
这次,将创建一个驱动程序窗格以及一个驱动程序服务,然后该驱动程序窗格将失败。在Kubernetes上,我可以看到以下错误:
MountVolume.SetUp failed for volume "spark-conf-volume" : configmap "sparkpi-1555314081444-driver-conf-map" not found
Run Code Online (Sandbox Code Playgroud)
在pod日志中,我出现了错误:
Forbidden!Configured service account doesn't have access.
Service account may have been revoked.
pods "sparkpi-1555314081444-driver" is forbidden: User "system:serviceaccount:default:default" cannot get resource "pods" in API …Run Code Online (Sandbox Code Playgroud) 我想使用BERT模型对Tensorflow进行多标签分类。
要做到这一点,我想适应的例子run_classifier.py来自BERT GitHub的仓库,这是关于如何使用BERT做简单的分类,使用一个例子由谷歌研究给出预训练的权重。(例如使用BERT-Base, Cased)
我有X不同的标签,它们的值为0或1,所以我想在原始BERT模型中添加一个新的Dense层,X并使用sigmoid_cross_entropy_with_logits激活函数。
因此,从理论上讲,我认为我很好。
问题是我不知道如何附加一个新的输出层,并使用现有BertModel类仅使用我的数据集重新训练该新层。
这是我想必须从中进行修改的原始create_model()功能run_classifier.py。但是我对如何做却有些迷茫。
def create_model(bert_config, is_training, input_ids, input_mask, segment_ids,
labels, num_labels, use_one_hot_embeddings):
"""Creates a classification model."""
model = modeling.BertModel(
config=bert_config,
is_training=is_training,
input_ids=input_ids,
input_mask=input_mask,
token_type_ids=segment_ids,
use_one_hot_embeddings=use_one_hot_embeddings)
output_layer = model.get_pooled_output()
hidden_size = output_layer.shape[-1].value
output_weights = tf.get_variable(
"output_weights", [num_labels, hidden_size],
initializer=tf.truncated_normal_initializer(stddev=0.02))
output_bias = tf.get_variable(
"output_bias", [num_labels], initializer=tf.zeros_initializer())
with tf.variable_scope("loss"):
if is_training:
# I.e., 0.1 dropout
output_layer …Run Code Online (Sandbox Code Playgroud) 我有一个存储在 Pandas Dataframe 中的图像 URL 列表。我想下载所有这些图像并将它们存储在本地。
我用来执行此操作的代码是:
import os
import requests
def load(df, output_folder):
print("Ready to load "+str(len(df.index))+" images.")
for i,row in df.iterrows():
print("Image "+str(i))
save_image_from_url(row["image_url"], os.path.join(output_folder, row["image_name"]))
''' From a given URL, download the image and store it at the given path'''
def save_image_from_url(url, output_path):
image = requests.get(url)
with open(output_path, 'wb') as f:
f.write(image.content)
Run Code Online (Sandbox Code Playgroud)
问题是该过程非常慢(每个图像从 0.5 秒到 4 秒)。有没有办法做得更快?
python ×7
pandas ×4
apache-spark ×3
kubernetes ×2
tensorflow ×2
concurrency ×1
dataframe ×1
keras ×1
minikube ×1
performance ×1
scala ×1
web-scraping ×1