小编kal*_*thy的帖子

如何在 DataFrame 中跨组使用 QuantileDiscretizer?

我有一个包含以下列的数据框。

scala> show_times.printSchema
root
 |-- account: string (nullable = true)
 |-- channel: string (nullable = true)
 |-- show_name: string (nullable = true)
 |-- total_time_watched: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)

这是有关客户观看特定节目的次数的数据。我应该根据观看的总时间对每个节目的客户进行分类。

该数据集共有 1.33 亿行,其中 192 个不同的show_names.

对于每个单独的节目,我应该将客户分为 3 类(1、2、3)。

我使用 Spark MLlib 的QuantileDiscretizer

目前,我循环播放每个节目并按QuantileDiscretizer顺序运行,如下面的代码所示。

在此输入图像描述

我最终想要的是以下示例输入以获得示例输出。

输入示例:

account,channel,show_name,total_time_watched
acct1,ESPN,show1,200
acct2,ESPN,show1,250
acct3,ESPN,show1,800
acct4,ESPN,show1,850
acct5,ESPN,show1,1300
acct6,ESPN,show1,1320
acct1,ESPN,show2,200
acct2,ESPN,show2,250
acct3,ESPN,show2,800
acct4,ESPN,show2,850
acct5,ESPN,show2,1300
acct6,ESPN,show2,1320
Run Code Online (Sandbox Code Playgroud)

示例输出:

account,channel,show_name,total_time_watched,Time_watched_bin
acct1,ESPN,show1,200,1
acct2,ESPN,show1,250,1
acct3,ESPN,show1,800,2
acct4,ESPN,show1,850,2
acct5,ESPN,show1,1300,3
acct6,ESPN,show1,1320,3
acct1,ESPN,show2,200,1
acct2,ESPN,show2,250,1
acct3,ESPN,show2,800,2
acct4,ESPN,show2,850,2
acct5,ESPN,show2,1300,3
acct6,ESPN,show2,1320,3
Run Code Online (Sandbox Code Playgroud)

是否有一种更有效和分布式的方法来使用groupBy类似的操作来完成此操作,而不是循环遍历每个操作show_name …

scala apache-spark apache-spark-sql apache-spark-mllib

8
推荐指数
1
解决办法
1984
查看次数

火花流的调度延迟突然增加作业不改变其他参数

我有一个火花流工作在生产中运行1秒批次.我使用CDH 5.5 Spark 1.5.我们使用Kafka Create Directstream.我们启用了背压.我们不想起诉动态分配所以用固定数量的执行者执行的工作.

下面的图像我可以看到这些是从13.50的调度延迟突然增加但在同一时间我没有在处理时间看到任何dealy.

  1. 在处理时间相同的情况下,增加调度时间的可能原因是什么.
  2. 群集中的其他作业负载是否会影响当前的流式传输作业.根据我的理解,不应该是这种情况,因为流式传输的执行程序是预先分配的并且已经在运行

有什么想法吗?

在此输入图像描述

apache-spark spark-streaming

5
推荐指数
1
解决办法
1512
查看次数

配置Nginx根据端口号将代理请求反向到后端服务器

我有四个 Web 应用程序在一个 ec2 实例中运行,主机名为“ip-10-176-225-83.us-west-2.compute.internal”,端口为 8888、8088、8042 和 8890。所有这些 Web 应用程序都在 HTTP 上。

我们的安全团队不允许打开从本地到 AWS 的 http 端口。建议在同一 VPC 子目录中设置反向代理,该代理接受 HTTPS 请求并使用 HTTP 将其转发到后端 Web 服务器。

我在同一子网中创建了一个主机名为“ip-10-176-225-84.us-west-2.compute.internal”的新实例并安装了 Niginx 服务器。

我如何配置 Nginx 使其执行以下操作

https://ip-10-176-225-84.us-west-2.compute.internal:8080 调用http://ip-10-176-225-83.us-west-2.compute.internal: 8080并回复

其他端口也一样

reverse-proxy nginx

5
推荐指数
1
解决办法
8372
查看次数

在写入HDFS或S3时Spark是否锁定文件

我有一个S3位置与在它上面创建的配置单元表中的以下的目录结构:

s3://<Mybucket>/<Table Name>/<day Partition>
Run Code Online (Sandbox Code Playgroud)

比方说,我有一个将数据写入上述表格的位置横跨使用下面的代码行多个分区的星火计划:

Df.write.partitionBy("orderdate").parquet("s3://<Mybucket>/<Table Name>/")
Run Code Online (Sandbox Code Playgroud)

如果另一个程序,如“蜂巢SQL查询”或“AWS雅典娜查询”开始在阅读的同时,从表中的数据:

他们是否考虑写入临时文件?

在写入S3位置时,spark会锁定数据文件吗?

我们如何使用Spark作为ETL工具来处理此类并发情况?

apache-spark apache-spark-sql

5
推荐指数
1
解决办法
279
查看次数

Spark 程序的 S3 减速异常

我在 EMR 集群中运行了简单的 spark 程序,试图将 60 GB 的 CSV 文件转换为镶木地板。当我提交工作时,我得到以下异常。

391, ip-172-31-36-116.us-west-2.compute.internal, executor 96): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)Caused by: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Slow Down (Service: Amazon S3; Status Code: 503; Error Code: 503 Slow Down; Request ID: D13A3F4D7DD970FA; S3 Extended Request ID: gj3cPalkkOwtaf9XN/P+sb3jX0CNHu/QF9WTabkgP2ISuXcXdbvYO1Irg0O54OCvKlLz8WoR8E4=), S3 Extended Request ID: gj3cPalkkOwtaf9XN/P+sb3jX0CNHu/QF9WTabkgP2ISuXcXdbvYO1Irg0O54OCvKlLz8WoR8E4=
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1639)
Run Code Online (Sandbox Code Playgroud)

amazon-s3 apache-spark

5
推荐指数
1
解决办法
4900
查看次数

使用AWS API Gateway的两种SSL

我们可以通过AWS API Gateway使用双向SSL功能吗?我们想在实时流应用程序中使用API​​ Gateway作为运动学的代理。

以下是我的要求

客户端向apigateway发出请求,并且apigateway需要将数据放入运动流。

验证客户端的唯一方法是使用两种SSL。我们的客户不支持其他选择。

目前,本地F5负载平衡器为我们完成了这项工作,并且在F5后面运行着许多雄猫,将数据放入运动机中。

我可以使用API​​ Gateway实现相同的功能吗?看起来甚至aws ELB似乎也不支持此选项。

我看了下面的链接,但这是在服务器上对API网关进行身份验证,而不是对客户端进行apigateway身份验证。

https://docs.aws.amazon.com/apigateway/latest/developerguide/getting-started-client-side-ssl-authentication.html

问候卡利扬

aws-api-gateway api-gateway

4
推荐指数
1
解决办法
2408
查看次数