小编Tom*_*ous的帖子

没有触发气流'one_success'任务

我正在使用LocalExecutor在4 CPU机器上运行Airflow

我已经定义了一个上游任务是一次成功

create_spark_cluster_task = BashOperator(
    task_id='create_spark_cluster',
    trigger_rule='one_success',
    bash_command= ...,
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

...

download_bag_data_task >> create_spark_cluster_task
download_google_places_data_task >> create_spark_cluster_task
download_facebook_places_details_data_task >> create_spark_cluster_task
download_facebook_places_details_data_task_2 >> create_spark_cluster_task
download_facebook_places_details_data_task_3 >> create_spark_cluster_task
download_factual_data_task >> create_spark_cluster_task
download_dataoutlet_data_task >> create_spark_cluster_task
Run Code Online (Sandbox Code Playgroud)

但即使有些人明确标记为成功,任务也不会触发

"下载任务"确实并行运行,因此不会出现问题

图形

检查任务显示:

依赖性:未知

原因:满足所有依赖项但任务实例未运行.在大多数情况下,这只是意味着任务可能很快就会被安排,除非: - 调度程序已关闭或负载过重 - 此任务实例已经运行并且手动更改了状态(例如在UI中清除)

我看过负载,确实很高:

负载平均值:2.45,3.55,3.71 CPU为50-60%

但其他任务已经完成,所以应该有资源可以自由启动另一项任务,对吧?

airflow apache-airflow airflow-scheduler

8
推荐指数
0
解决办法
784
查看次数

为PySpark捆绑Python3包会导致导入丢失

我正在尝试运行依赖于某些python3库的PySpark作业.我知道我可以在Spark Cluster上安装这些库,但由于我正在重新使用群集进行多个作业,我想将所有依赖项捆绑在一起并通过--py-files指令将它们传递给每个作业.

为此,我使用:

pip3 install -r requirements.txt --target ./build/dependencies
cd ./build/dependencies
zip -qrm . ../dependencies.zip
Run Code Online (Sandbox Code Playgroud)

这有效地压缩了在根级别使用的所需包中的所有代码.

在此输入图像描述

在我,main.py我可以导入依赖项

if os.path.exists('dependencies.zip'):
    sys.path.insert(0, 'dependencies.zip')
Run Code Online (Sandbox Code Playgroud)

并将.zip添加到我的Spark上下文中

sc.addPyFile('dependencies.zip')
Run Code Online (Sandbox Code Playgroud)

到现在为止还挺好.

但出于某种原因,这将在Spark Cluster上转移到某种依赖地狱

比如跑步

spark-submit --py-files dependencies.zip main.py

main.py(或类)我想要使用熊猫的地方.它将触发此错误的代码:

Traceback(最近一次调用最后一次):

在job_module = importlib.import_module('spark.jobs.%s'%args.job_name)中输入文件"/Users/tomlous/Development/Python/enrichers/build/main.py",第53行...

文件"",第978行,在_gcd_import中

文件"",第961行,在_find_and_load中

文件"",第950行,在_find_and_load_unlocked中

文件"",第646行,在_load_unlocked中

文件"",第616行,在_load_backward_compatible中

文件"dependencies.zip/spark/jobs/classify_existence.py",第9行,in

文件"dependencies.zip/enrich/existence.py",第3行,in

文件"dependencies.zip/pandas/ 初始化 py"为19行,在

ImportError:缺少必需的依赖项['numpy']

看着熊猫,__init__.py 我看到了类似的东西__import__(numpy)

所以我认为numpy没有加载.

但是,如果我改变我的代码以显式调用numpy函数,它实际上发现numpy,但不是它的一些dependecies

import numpy as np
a = np.array([1, 2, 3])
Run Code Online (Sandbox Code Playgroud)

代码返回

Traceback(最近一次调用最后一次):

文件"dependencies.zip/numpy/core/ 初始化的.py",第16行,在

ImportError:无法导入名称'multiarray'

所以我的问题是:

我应该如何将python3库与我的spark作业捆绑在一起,以便我不必在Spark集群上安装所有可能的库?

python numpy python-3.x apache-spark pyspark

7
推荐指数
1
解决办法
1032
查看次数

从 Scala 源代码 (http4s) 生成 Swagger/OpenAPI 规范

所以我不是 swagger 专家,但所有使用 swagger 的系统都要求您拥有 JSON 或 YAML 中的 swagger 规范来定义 API 的所有端点(等)。

我的问题是:有没有办法根据实际的源代码生成这些规范文件?我在问,因为当您开始添加属性或返回略有不同的结果时,似乎很难保持端点代码和文档同步。

所以当我有这个代码时(使用 http4s 和 RhoService):

object HelloWorld {
  val service = new RhoService {
    GET / "hello" / 'name |>> { name: String =>
      Ok(Json.obj("message" -> Json.fromString(s"Hello, ${name}")))
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

如果它可以产生(以某种方式:)

/hello/{name}:
    get:
      tags:
      - "HelloWorld"
      summary: ""
      description: ""
      operationId: "getHellobyName"
      produces:         
      - "application/json"
      parameters:
      - name: "name"
        in: "path"
        description: ""
        required: true
        type: "string"
      responses:
        200:
          description: "successful operation"
          schema:
            $ref: "#/definitions/Hello"           
      security: …
Run Code Online (Sandbox Code Playgroud)

swagger swagger-2.0 openapi http4s

6
推荐指数
2
解决办法
2536
查看次数

奇怪的性能问题 Spark LSH MinHash approxSimilarityJoin

我正在使用 Apache Spark ML LSH 的 approxSimilarityJoin 方法加入 2 个数据集,但我看到了一些奇怪的行为。

在(内部)连接之后,数据集有点偏斜,但是每次完成一个或多个任务都需要花费过多的时间。

sparkui-1

正如您所看到的,每个任务的中位数是 6 毫秒(我在较小的源数据集上运行它进行测试),但 1 个任务需要 10 分钟。它几乎不使用任何 CPU 周期,它实际上连接了数据,但是速度太慢了。下一个最慢的任务在 14 秒内运行,有 4 倍多的记录并且实际上溢出到磁盘。

如果你看 sparkuisql

连接本身是根据 minhash 规范和 udf 计算匹配对之间的 jaccard 距离的 pos & hashValue (minhash) 上的两个数据集之间的内部连接。

分解哈希表:

modelDataset.select(
      struct(col("*")).as(inputName), posexplode(col($(outputCol))).as(explodeCols))
Run Code Online (Sandbox Code Playgroud)

杰卡德距离函数:

modelDataset.select(
      struct(col("*")).as(inputName), posexplode(col($(outputCol))).as(explodeCols))
Run Code Online (Sandbox Code Playgroud)

连接处理过的数据集:

 override protected[ml] def keyDistance(x: Vector, y: Vector): Double = {
    val xSet = x.toSparse.indices.toSet
    val ySet = y.toSparse.indices.toSet
    val intersectionSize = xSet.intersect(ySet).size.toDouble
    val unionSize = xSet.size + ySet.size - intersectionSize
    assert(unionSize > 0, "The …
Run Code Online (Sandbox Code Playgroud)

duplicates minhash apache-spark apache-spark-mllib lsh

6
推荐指数
1
解决办法
1001
查看次数

在 Google App Engine (GAE) 中使用 zip_read 解压缩上传的文件

据我所知,用户上传的文件必须通过 CloudStorageTools::createUploadUrl() 将文件上传到 PHP GAE 环境,从而在 gs 存储桶中生成一个文件 (gs://[name]/[id])。

文件上传很有用,但解压缩上传的文件会带来一些问题。我尝试了 3 种方法,但似乎没有一种对我有用:

  1. 支持 PHP zip 函数 ( http://www.php.net/manual/en/ref.zip.php ),但在 gs 存储桶路径上使用 zip_open 不起作用(检查 fopen 命令导致工作文件指针:资源 id #120)

  2. ZipArchive ( http://nl1.php.net/manual/en/book.zip.php )。不幸的是,GAE 还不支持 ZipArchive 库(还?)。必须编译。

  3. PclZip ( http://www.phpconcept.net/pclzip ) 给了我一个有效的资源句柄(支持 zlib),但我遇到了这个问题(fseek 经常为 0):https : //code.google.com/p /googleappengine/issues/detail?id=10881

有人知道如何将大型 zip 文件上传到 GAE (PHP)、解压缩并使用它们吗?我几乎要要求用户自己解压缩 zip,分别上传解压缩的文件并绕过整个解压缩过程。

php google-app-engine file-upload unzip

5
推荐指数
1
解决办法
825
查看次数

具有特征的 Spark 2.0 数据集编码器

我正在构建一个数据集,其中每个记录都映射到一个案例类(例如带有原始类型的CustomDataEntry)。

val dataset = spark.read (...) .as[CustomDataEntry]
Run Code Online (Sandbox Code Playgroud)

到现在为止还挺好

现在我正在编写一个带有CustomDataEntry 的数据集的转换器,进行一些计算并添加一些新列,例如。找到经纬度并计算geohash

我的CustomDataEntry现在有一个属性/列(geohash),它存在于案例类中,但存在于数据集中。这再次工作正常,但似乎不太好type safe(如果编码器甚至可以这样做)。

我可以将它添加为案例类中的 Option 字段,但这看起来很混乱,而不是可组合的。更好的方法似乎是我应该在CustomDataEntry上混合一些特征

例如

trait Geo{
    val geohash:String
}
Run Code Online (Sandbox Code Playgroud)

然后将数据集返回为

dataset.as[CustomDataEntry with Geo]
Run Code Online (Sandbox Code Playgroud)

这行不通

错误:(21, 10) 无法找到存储在数据集中的类型的编码器。导入 spark.implicits 支持原始类型(Int、String 等)和产品类型(case 类)。未来版本中将添加对序列化其他类型的支持。.as[带地理信息的自定义数据条目]

答案似乎很明显(不支持,未来版本),但也许我忽略了一些东西?

scala dataset apache-spark

5
推荐指数
1
解决办法
1454
查看次数

使用 Spark / Scala 在镶木地板文件中存储和读取自定义元数据

我知道镶木地板文件存储元数据,但是否可以使用 Scala(最好)和 Spark将自定义元数据添加到镶木地板文件中?

这个想法是我在 Hadoop 存储中存储了许多类似的结构化镶木地板文件,但每个都有一个唯一命名的源(字符串字段,也作为镶木地板文件中的列出现),但是,我想访问这些信息而不创建实际读取镶木地板的开销,甚至可能从镶木地板中删除这个多余的列。

我真的不想将此信息放在文件名中,所以我现在最好的选择是读取每个镶木地板的第一行并将源列用作字符串字段。

它有效,但我只是想知道是否有更好的方法。

scala apache-spark parquet

5
推荐指数
0
解决办法
1726
查看次数

gcloud 使用本地密钥和项目限制计算 ssh

我们有一个用户可以通过 SSH 连接到谷歌云平台上的虚拟机。

他的密钥被添加到虚拟机,他可以使用 SSH

gcloud compute ssh name-of-vm

但是,以这种方式连接将始终让 gcloud 尝试更新项目范围的元数据

更新项目 ssh 元数据...失败

失败,因为他只有访问和管理此 VM 的权限

然而很烦人的是,每次他必须以这种方式连接时,他都必须等待 GCP 尝试更新元数据,这是不允许的,然后检查机器上的 sshkeys。

  1. 命令中是否有一个标志可以跳过检查/更新项目范围的 ssh 密钥?

是的,我们可以在实例上“阻止项目范围内的 ssh 密钥”,但这意味着其他项目管理员无法再登录。

我还尝试尽量减少对该用户的访问。

  1. 但是,理想情况下,如果允许他通过 SSH 连接到机器、启动和停止实例并将数据存储到存储桶中,他应该拥有什么权利?

ssh google-cloud-platform gcloud google-cloud-iam

5
推荐指数
1
解决办法
2433
查看次数

Spark 从 Kafka 批量读取并使用 Kafka 跟踪偏移量

我知道使用 Kafka 自己的偏移跟踪而不是其他方法(如检查点)对于流作业来说是有问题的。

然而,我只想每天运行一个 Spark 批处理作业,读取从最后一个偏移量到最新偏移量的所有消息,并用它做一些 ETL。

理论上我想像这样读取这些数据:

val dataframe = spark.read
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:6001")
      .option("subscribe", "topic-in")
      .option("includeHeaders", "true")
      .option("kafka.group.id", s"consumer-group-for-this-job")
      .load()
Run Code Online (Sandbox Code Playgroud)

并让 Spark 根据以下信息将偏移量提交回 Kafkagroup.id

不幸的是,Spark 从未将这些提交回来,因此我发挥了创意,在 etl 作业的末尾添加了以下代码,用于手动更新 Kafka 中消费者的偏移量:

val offsets: Map[TopicPartition, OffsetAndMetadata] = dataFrame
      .select('topic, 'partition, 'offset)
      .groupBy("topic", "partition")
      .agg(max('offset))
      .as[(String, Int, Long)]
      .collect()
      .map {
        case (topic, partition, maxOffset) => new TopicPartition(topic, partition) -> new OffsetAndMetadata(maxOffset)
      }
      .toMap

val props = new Properties()
    props.put("group.id", "consumer-group-for-this-job")
    props.put("bootstrap.servers", "localhost:6001")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("enable.auto.commit", "false")
    val …
Run Code Online (Sandbox Code Playgroud)

scala apache-kafka apache-spark spark-structured-streaming spark-kafka-integration

5
推荐指数
1
解决办法
7940
查看次数

气流失败的松弛消息

如何配置Airflow,以便DAG中的任何故障都会(立即)导致松弛消息?

此时我通过创建slack_failed_task来管理它:

slack_failed_task =  SlackAPIPostOperator(
    task_id='slack_failed',
    channel="#datalabs",
    trigger_rule='one_failed',
    token="...",
    text = ':red_circle: DAG Failed',
    icon_url = 'http://airbnb.io/img/projects/airflow3.png',
    dag=dag)
Run Code Online (Sandbox Code Playgroud)

并在DAG中的每个其他任务的上游设置此任务(one_failed):

slack_failed_task << download_task_a
slack_failed_task << download_task_b
slack_failed_task << process_task_c
slack_failed_task << process_task_d
slack_failed_task << other_task_e
Run Code Online (Sandbox Code Playgroud)

它有效,但它容易出错,因为忘记添加任务会跳过松弛通知,看起来很多工作.

是否有可能扩展email_on_failureDAG中的财产?

奖金;-)包括将失败任务的名称传递给消息的方法.

slack airflow apache-airflow

2
推荐指数
2
解决办法
6396
查看次数