这是我正在为reduceByKey尝试的代码:
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.SparkContext
import scala.math.random
import org.apache.spark._
import org.apache.spark.storage.StorageLevel
object MapReduce {
def main(args: Array[String]) {
val sc = new SparkContext("local[4]" , "")
val file = sc.textFile("c:/data-files/myfile.txt")
val counts = file.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
}
}
Run Code Online (Sandbox Code Playgroud)
给编译器错误:"无法解析符号reduceByKey"
当我将鼠标悬停在reduceByKey的实现上时,它提供了三种可能的实现,因此看起来它正在被发现?:

我想以编程方式收集Spark应用程序驱动程序中的所有执行程序日志.(当出现故障时我想收集并存储所有相关日志.)有一个很好的方法吗?
一个想法是创建一个空的RDD,每个执行器有一个分区.然后,我以某种方式确保每个分区实际上在不同的执行器上处理(不知道如何)并mapPartitions执行从磁盘加载执行程序日志,然后将collect其提取到应用程序.
出于结帐目的,我尝试将Amazon S3存储桶设置为检查点文件.
val checkpointDir = "s3a://bucket-name/checkpoint.txt"
val sc = new SparkContext(conf)
sc.setLocalProperty("spark.default.parallelism", "30")
sc.hadoopConfiguration.set("fs.s3a.access.key", "xxxxx")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "xxxxx")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "bucket-name.s3-website.eu-central-1.amazonaws.com")
val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint(checkpointDir)
Run Code Online (Sandbox Code Playgroud)
但它会因此异常而停止
Exception in thread "main" com.amazonaws.services.s3.model.AmazonS3Exception: Status Code: 400, AWS Service: Amazon S3, AWS Request ID: 9D8E8002H3BBDDC7, AWS Error Code: null, AWS Error Message: Bad Request, S3 Extended Request ID: Qme5E3KAr/KX0djiq9poGXPJkmr0vuXAduZujwGlvaAl+oc6vlUpq7LIh70IF3LNgoewjP+HnXA=
at com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:798)
at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:421)
at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:232)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3528)
at com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
at com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:154)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at …Run Code Online (Sandbox Code Playgroud) amazon-s3 amazon-web-services hdfs apache-spark spark-streaming
"old" SparkContext.hadoopFile接受一个minPartitions参数,这是分区数量的提示:
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions
): RDD[(K, V)]
Run Code Online (Sandbox Code Playgroud)
但是没有这样的论点SparkContext.newAPIHadoopFile:
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
path: String,
fClass: Class[F],
kClass: Class[K],
vClass: Class[V],
conf: Configuration = hadoopConfiguration): RDD[(K, V)]
Run Code Online (Sandbox Code Playgroud)
实际上mapred.InputFormat.getSplits需要一个提示参数,但mapreduce.InputFormat.getSplits需要一个JobContext.通过新API影响拆分数量的方法是什么?
我已经尝试设置mapreduce.input.fileinputformat.split.maxsize与fs.s3n.block.size上Configuration对象,但他们没有影响.我正在尝试从中加载4.5 GB文件s3n,并将其加载到单个任务中.
https://issues.apache.org/jira/browse/HADOOP-5861是相关的,但它表明我应该已经看到多个拆分,因为默认块大小是64 MB.
我有两个DataFrames:a和b.这是他们的样子:
a
-------
v1 string
v2 string
roughly hundreds of millions rows
b
-------
v2 string
roughly tens of millions rows
Run Code Online (Sandbox Code Playgroud)
我想保留DataFrame a中v2不存在的行b("v2").
我知道我可以使用左连接和过滤器,其中右侧为null或SparkSQL具有"不在"构造.我打赌有更好的方法.
我正在尝试使用GoDaddy的API将A记录添加到域中,但我在浏览器的控制台中收到了422(不可处理的实体)响应错误.但是,当我在https://developer.godaddy.com/doc#!/_v1_domains/recordAdd/ArrayOfDNSRecord上使用GoDaddy的文档测试请求时,我在下面的主体中收到404响应错误:
错误回复正文:
{
"code": "UNKNOWN_DOMAIN",
"message": "The given domain is not registered, or does not have a zone file",
"name": "_Class"
}
Run Code Online (Sandbox Code Playgroud)
我试图添加A记录的域肯定存在,所以我不知道为什么它会返回404错误.我使用GET请求检索属于域的所有A记录没有问题,但是当我尝试运行下面的PATCH请求时,我得到了错误.
GoDaddy的API有什么问题,或者我是如何构建我的请求的?
PATCH请求返回错误
$.ajax({
type: 'PATCH',
url: 'https://api.godaddy.com/v1/domains/{domain}/records',
data: {
'records': [{
'type': 'A',
'name': 'test',
'data': '255.255.255.255'
}]
},
headers: {
'Authorization': 'sso-key {API_KEY}:{API_SECRET}'
},
success: function(body) {
console.log(body);
}
});
Run Code Online (Sandbox Code Playgroud)
GET请求工作正常
$.ajax({
type: 'GET',
url: 'https://api.godaddy.com/v1/domains/{domain}/records/A',
headers: {
'Authorization': 'sso-key {API_KEY}:{API_SECRET}'
},
success: function(body) {
$.each(body, function(i, v) {
$('body').append('<p>Name: ' + v.name + …Run Code Online (Sandbox Code Playgroud) 我有一个TensorFlow模型(卷积神经网络),我成功地使用梯度下降(GD)训练了一些输入数据.
现在,在第二步中,我想提供一个输入图像作为初始化,然后使用GD在固定网络参数上对此输入图像进行优化.损失函数将是另一个,但这是一个细节.
所以,我的主要问题是如何告诉梯度下降算法
你们有关于第二点的想法吗?
我想我可以使用TF渐变功能自行重新编码渐变下降算法,但我的直觉感觉告诉我应该有一种更简单的方法,这也让我可以从更复杂的GD变体(Adam等)中受益.
我从此Rust代码收到意外错误:
struct Container<'a> {
x: &'a i32,
}
trait Reply {}
impl Reply for i32 {}
fn json<T>(_val: &T) -> impl Reply {
3
}
fn f() -> impl Reply {
let i = 123;
let a = Container { x: &i };
json(&a)
}
Run Code Online (Sandbox Code Playgroud)
错误是:
error[E0597]: `i` does not live long enough
--> src/lib.rs:14:28
|
12 | fn f() -> impl Reply {
| ---------- opaque type requires that `i` is borrowed for `'static`
13 | let i …Run Code Online (Sandbox Code Playgroud) 我真的很好奇自定义链接前缀是如何工作的(我真的不知道它们叫什么),而且我在网上找不到任何关于它的信息。没有任何。
如果有人知道这是如何工作的,和/或可以为我指明教程的方向,那我会很惊讶。
编辑:我确实找到了关于在 iOS 中执行此操作的教程,但我需要在 PC/Windows 应用程序中使用它。
我有一个关于在访问Spark RDD时在闭包中使用局部变量的问题.我想解决的问题如下:
我有一个应该读入RDD的文本文件列表.但是,首先我需要向从单个文本文件创建的RDD添加其他信息.从文件名中提取此附加信息.然后,使用union()将RDD放入一个大的RDD中.
from pyspark import SparkConf, SparkContext
spark_conf = SparkConf().setAppName("SparkTest")
spark_context = SparkContext(conf=spark_conf)
list_of_filenames = ['file_from_Ernie.txt', 'file_from_Bert.txt']
rdd_list = []
for filename in list_of_filenames:
tmp_rdd = spark_context.textFile(filename)
# extract_file_info('file_from_Owner.txt') == 'Owner'
file_owner = extract_file_info(filename)
tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner))
rdd_list.append(tmp_rdd)
overall_content_rdd = spark_context.union(rdd_list)
# ...do something...
overall_content_rdd.collect()
# However, this does not work:
# The result is that always Bert will be the owner, i.e., never Ernie.
Run Code Online (Sandbox Code Playgroud)
问题是循环中的map()函数没有引用"正确的"file_owner.相反,它将引用file_owner的最新值.在我的本地机器上,我设法通过为每个RDD调用cache()函数来解决问题:
# ..
tmp_rdd = tmp_rdd.map(lambda x : (x, file_owner)) …Run Code Online (Sandbox Code Playgroud)