我正在设法让所有人都与当地主人和两名远程工作人员合作.现在,我想连接到具有相同远程工作者的远程主服务器.我尝试了在Internet上使用/ etc/hosts和其他推荐的不同设置组合,但没有工作.
Main类是:
public static void main(String[] args) {
ScalaInterface sInterface = new ScalaInterface(CHUNK_SIZE,
"awsAccessKeyId",
"awsSecretAccessKey");
SparkConf conf = new SparkConf().setAppName("POC_JAVA_AND_SPARK")
.setMaster("spark://spark-master:7077");
org.apache.spark.SparkContext sc = new org.apache.spark.SparkContext(
conf);
sInterface.enableS3Connection(sc);
org.apache.spark.rdd.RDD<Tuple2<Path, Text>> fileAndLine = (RDD<Tuple2<Path, Text>>) sInterface.getMappedRDD(sc, "s3n://somebucket/");
org.apache.spark.rdd.RDD<String> pInfo = (RDD<String>) sInterface.mapPartitionsWithIndex(fileAndLine);
JavaRDD<String> pInfoJ = pInfo.toJavaRDD();
List<String> result = pInfoJ.collect();
String miscInfo = sInterface.getMiscInfo(sc, pInfo);
System.out.println(miscInfo);
}
Run Code Online (Sandbox Code Playgroud)
它失败了:
List<String> result = pInfoJ.collect();
Run Code Online (Sandbox Code Playgroud)
我得到的错误是:
1354 [sparkDriver-akka.actor.default-dispatcher-3] ERROR akka.remote.transport.netty.NettyTransport - failed to bind to spark-master/192.168.0.191:0, shutting down Netty transport
1354 [main] WARN …Run Code Online (Sandbox Code Playgroud) 请注意,我必须使用sc.textFile,但我会接受任何其他答案.
我想要做的是简单地将正在处理的文件名添加到RDD ....有些事情如下:
var rdd = sc.textFile("s3n://bucket/*.csv").map(line => filename +","+ line)
非常感激!
EDIT2:EDIT1的解决方案是使用Hadoop 2.4或更高版本.但是,我没有通过使用从属设备等来测试它.但是,一些提到的解决方案仅适用于小型数据集.如果要使用大数据,则必须使用HadoopRDD
编辑:我尝试了以下,它没有工作:
:cp symjar/aws-java-sdk-1.9.29.jar
:cp symjar/aws-java-sdk-flow-build-tools-1.9.29.jar
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.{S3ObjectSummary, ObjectListing, GetObjectRequest}
import com.amazonaws.auth._
def awsAccessKeyId = "AKEY"
def awsSecretAccessKey = "SKEY"
val hadoopConf = sc.hadoopConfiguration;
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId)
hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey)
var rdd = sc.wholeTextFiles("s3n://bucket/dir/*.csv").map { case (filename, content) => filename }
rdd.count
Run Code Online (Sandbox Code Playgroud)
注意:它连接到S3,这不是问题(因为我已经多次测试过).
我得到的错误是:
INFO input.FileInputFormat: Total input paths to process : 4
java.io.FileNotFoundException: File does not exist: /RTLM-918/simple/t1-100.csv
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:517)
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat$OneFileInfo.<init>(CombineFileInputFormat.java:489)
at org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.getMoreSplits(CombineFileInputFormat.java:280) …Run Code Online (Sandbox Code Playgroud) 我试图想到最好的方法,但是,我无法想到一种方法,不包括将所有文件中的标题读入数组,然后从这些标题中过滤RDD.
有更简单的方法吗?
注意:我正在读取S3存储桶中的所有csv文件,并且所有这些文件都有不同的标头.
始终是驱动程序(作为运行主节点的程序)必须位于主节点上的情况吗?
例如,如果我使用一个主服务器和两个工作服务器设置ec2,那么必须从主EC2实例执行具有主服务器的代码吗?
如果答案为否,那么设置驱动程序在ec2主节点之外的系统的最佳方法是什么(比方说,Driver是从我的计算机运行的,而Master和Workers在EC2上)?我是否总是必须使用spark-submit,或者我可以从Eclipse或IntelliJ IDEA等IDE中执行此操作吗?
如果答案是肯定的,那么了解更多信息的最佳参考是什么(因为我需要提供某种证据)?
谢谢你的回答,参考将非常感谢!
apache-spark ×4
amazon-s3 ×2
rdd ×2
amazon-ec2 ×1
binding ×1
csv ×1
filenames ×1
header ×1
mapping ×1
master-slave ×1