小编Dan*_*bos的帖子

IntelliJ中找不到reduceByKey方法

这是我正在为reduceByKey尝试的代码:

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext
import scala.math.random

import org.apache.spark._
import org.apache.spark.storage.StorageLevel

object MapReduce {

  def main(args: Array[String]) {

  val sc = new SparkContext("local[4]" , "")

  val file = sc.textFile("c:/data-files/myfile.txt")

  val counts = file.flatMap(line => line.split(" "))
    .map(word => (word, 1))
    .reduceByKey(_ + _)

  }

}
Run Code Online (Sandbox Code Playgroud)

给编译器错误:"无法解析符号reduceByKey"

当我将鼠标悬停在reduceByKey的实现上时,它提供了三种可能的实现,因此看起来它正在被发现?:

在此输入图像描述

scala intellij-idea apache-spark

8
推荐指数
2
解决办法
6423
查看次数

获取所有Apache Spark执行程序日志

我想以编程方式收集Spark应用程序驱动程序中的所有执行程序日志.(当出现故障时我想收集并存储所有相关日志.)有一个很好的方法吗?

一个想法是创建一个空的RDD,每个执行器有一个分区.然后,我以某种方式确保每个分区实际上在不同的执行器上处理(不知道如何)并mapPartitions执行从磁盘加载执行程序日志,然后将collect其提取到应用程序.

apache-spark

8
推荐指数
1
解决办法
1221
查看次数

亚马逊s3a使用Spark返回400 Bad Request

出于结帐目的,我尝试将Amazon S3存储桶设置为检查点文件.

val checkpointDir = "s3a://bucket-name/checkpoint.txt"
val sc = new SparkContext(conf)
sc.setLocalProperty("spark.default.parallelism", "30")
sc.hadoopConfiguration.set("fs.s3a.access.key", "xxxxx")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "xxxxx")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "bucket-name.s3-website.eu-central-1.amazonaws.com")
val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint(checkpointDir)
Run Code Online (Sandbox Code Playgroud)

但它会因此异常而停止

Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 9D8E8002H3BBDDC7, AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: Qme5E3KAr/KX0djiq9poGXPJkmr0vuXAduZujwGlvaAl+oc6vlUpq7LIh70IF3LNgoewjP+HnXA=
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:154)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at …
Run Code Online (Sandbox Code Playgroud)

amazon-s3 amazon-web-services hdfs apache-spark spark-streaming

8
推荐指数
2
解决办法
9770
查看次数

如何设置newAPIHadoopFile的分区数?

"old" SparkContext.hadoopFile接受一个minPartitions参数,这是分区数量的提示:

def hadoopFile[K, V](
  path: String,
  inputFormatClass: Class[_ <: InputFormat[K, V]],
  keyClass: Class[K],
  valueClass: Class[V],
  minPartitions: Int = defaultMinPartitions
  ): RDD[(K, V)]
Run Code Online (Sandbox Code Playgroud)

但是没有这样的论点SparkContext.newAPIHadoopFile:

def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
  path: String,
  fClass: Class[F],
  kClass: Class[K],
  vClass: Class[V],
  conf: Configuration = hadoopConfiguration): RDD[(K, V)]
Run Code Online (Sandbox Code Playgroud)

实际上mapred.InputFormat.getSplits需要一个提示参数,但mapreduce.InputFormat.getSplits需要一个JobContext.通过新API影响拆分数量的方法是什么?

我已经尝试设置mapreduce.input.fileinputformat.split.maxsizefs.s3n.block.sizeConfiguration对象,但他们没有影响.我正在尝试从中加载4.5 GB文件s3n,并将其加载到单个任务中.

https://issues.apache.org/jira/browse/HADOOP-5861是相关的,但它表明我应该已经看到多个拆分,因为默认块大小是64 MB.

hadoop apache-spark

7
推荐指数
1
解决办法
2508
查看次数

Spark Scala过滤DataFrame,其中值不在另一个DataFrame中

我有两个DataFrames:ab.这是他们的样子:

a
-------
v1 string
v2 string

roughly hundreds of millions rows


b
-------
v2 string

roughly tens of millions rows
Run Code Online (Sandbox Code Playgroud)

我想保留DataFrame av2不存在的行b("v2").

我知道我可以使用左连接和过滤器,其中右侧为null或SparkSQL具有"不在"构造.我打赌有更好的方法.

scala apache-spark

7
推荐指数
1
解决办法
4175
查看次数

使用GoDaddy API添加A记录

我正在尝试使用GoDaddy的API将A记录添加到域中,但我在浏览器的控制台中收到了422(不可处理的实体)响应错误.但是,当我在https://developer.godaddy.com/doc#!/_v1_domains/recordAdd/ArrayOfDNSRecord上使用GoDaddy的文档测试请求时,我在下面的主体中收到404响应错误:

错误回复正文:

{
  "code": "UNKNOWN_DOMAIN",
  "message": "The given domain is not registered, or does not have a zone file",
  "name": "_Class"
}
Run Code Online (Sandbox Code Playgroud)

我试图添加A记录的域肯定存在,所以我不知道为什么它会返回404错误.我使用GET请求检索属于域的所有A记录没有问题,但是当我尝试运行下面的PATCH请求时,我得到了错误.

GoDaddy的API有什么问题,或者我是如何构建我的请求的?

PATCH请求返回错误

$.ajax({
  type: 'PATCH',
  url: 'https://api.godaddy.com/v1/domains/{domain}/records',
  data: {
    'records': [{
      'type': 'A',
      'name': 'test',
      'data': '255.255.255.255'
    }]
  },
  headers: {
    'Authorization': 'sso-key {API_KEY}:{API_SECRET}'
  },
  success: function(body) {
    console.log(body);
  }
});
Run Code Online (Sandbox Code Playgroud)

GET请求工作正常

$.ajax({
  type: 'GET',
  url: 'https://api.godaddy.com/v1/domains/{domain}/records/A',
  headers: {
    'Authorization': 'sso-key {API_KEY}:{API_SECRET}'
  },
  success: function(body) {
    $.each(body, function(i, v) {
      $('body').append('<p>Name: ' + v.name + …
Run Code Online (Sandbox Code Playgroud)

dns ajax jquery godaddy-api

7
推荐指数
2
解决办法
5271
查看次数

Tensorflow:使用梯度下降优化输入

我有一个TensorFlow模型(卷积神经网络),我成功地使用梯度下降(GD)训练了一些输入数据.

现在,在第二步中,我想提供一个输入图像作为初始化,然后使用GD在固定网络参数上对此输入图像进行优化.损失函数将是另一个,但这是一个细节.

所以,我的主要问题是如何告诉梯度下降算法

  • 停止优化网络参数
  • 优化输入图像

第一个可能在优化器期间使用此 保持变量保持不变

你们有关于第二点的想法吗?

我想我可以使用TF渐变功能自行重新编码渐变下降算法,但我的直觉感觉告诉我应该有一种更简单的方法,这也让我可以从更复杂的GD变体(Adam等)中受益.

tensorflow

7
推荐指数
1
解决办法
1530
查看次数

带有返回impl trait的泛型函数的“借入的值寿命不足”

我从此Rust代码收到意外错误:

struct Container<'a> {
    x: &'a i32,
}

trait Reply {}
impl Reply for i32 {}

fn json<T>(_val: &T) -> impl Reply {
    3
}

fn f() -> impl Reply {
    let i = 123;
    let a = Container { x: &i };
    json(&a)
}
Run Code Online (Sandbox Code Playgroud)

操场

错误是:

error[E0597]: `i` does not live long enough
  --> src/lib.rs:14:28
   |
12 | fn f() -> impl Reply {
   |           ---------- opaque type requires that `i` is borrowed for `'static`
13 |     let i …
Run Code Online (Sandbox Code Playgroud)

generics rust borrow-checker

7
推荐指数
1
解决办法
98
查看次数

自定义链接前缀如何工作?(如蒸汽://)

我真的很好奇自定义链接前缀是如何工作的(我真的不知道它们叫什么),而且我在网上找不到任何关于它的信息。没有任何。

如果有人知道这是如何工作的,和/或可以为我指明教程的方向,那我会很惊讶。

编辑:我确实找到了关于在 iOS 中执行此操作的教程,但我需要在 PC/Windows 应用程序中使用它。

browser pc

6
推荐指数
1
解决办法
247
查看次数

访问Spark RDD时在闭包中使用局部变量

我有一个关于在访问Spark RDD时在闭包中使用局部变量的问题.我想解决的问题如下:

我有一个应该读入RDD的文本文件列表.但是,首先我需要向从单个文本文件创建的RDD添加其他信息.从文件名中提取此附加信息.然后,使用union()将RDD放入一个大的RDD中.

from pyspark import SparkConf, SparkContext
spark_conf = SparkConf().setAppName("SparkTest")
spark_context = SparkContext(conf=spark_conf)

list_of_filenames = ['file_from_Ernie.txt', 'file_from_Bert.txt']
rdd_list = []
for filename in list_of_filenames:
    tmp_rdd = spark_context.textFile(filename)
    # extract_file_info('file_from_Owner.txt') == 'Owner'
    file_owner = extract_file_info(filename)   
    tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner))
    rdd_list.append(tmp_rdd)
overall_content_rdd = spark_context.union(rdd_list)
# ...do something...
overall_content_rdd.collect()
# However, this does not work: 
# The result is that always Bert will be the owner, i.e., never Ernie.
Run Code Online (Sandbox Code Playgroud)

问题是循环中的map()函数没有引用"正确的"file_owner.相反,它将引用file_owner的最新值.在我的本地机器上,我设法通过为每个RDD调用cache()函数来解决问题:

# ..
tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner)) …
Run Code Online (Sandbox Code Playgroud)

closures apache-spark rdd pyspark

6
推荐指数
1
解决办法
4757
查看次数