Nat*_*han 10 java apache-spark apache-spark-sql spark-dataframe databricks
我在Java应用程序中使用SparkSQL使用Databricks对CSV文件进行一些处理以进行解析.
我正在处理的数据来自不同的来源(远程URL,本地文件,谷歌云存储),我习惯将所有内容都变成一个InputStream,以便我可以解析和处理数据,而无需知道它来自何处.
我在Spark上看到的所有文档都从路径中读取文件,例如
SparkConf conf = new SparkConf().setAppName("spark-sandbox").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlc = new SQLContext(sc);
DataFrame df = sqlc.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.load("path/to/file.csv");
DataFrame dfGrouped = df.groupBy("varA","varB")
.avg("varC","varD");
dfGrouped.show();
Run Code Online (Sandbox Code Playgroud)
我想要做的是从InputStream中读取,或者甚至只读取已经在内存中的字符串.类似于以下内容:
InputStream stream = new URL(
"http://www.sample-videos.com/csv/Sample-Spreadsheet-100-rows.csv"
).openStream();
DataFrame dfRemote = sqlc.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.load(stream);
String someString = "imagine,some,csv,data,here";
DataFrame dfFromString = sqlc.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.read(someString);
Run Code Online (Sandbox Code Playgroud)
这里有什么简单的东西吗?
我已经阅读了一些关于Spark Streaming和自定义接收器的文档,但据我所知,这是为了打开一个连续提供数据的连接.Spark Streaming似乎将数据分解为块并对其进行一些处理,期望更多的数据进入无休止的流中.
我最好的猜测是,Spark作为Hadoop的后代,期望大量的数据可能存在于某个文件系统中.但是由于Spark无论如何都要在内存中进行处理,因此我认为SparkSQL能够解析内存中的数据.
任何帮助,将不胜感激.
您可以使用至少四种不同的方法来让您的生活更轻松:
使用输入流,写入本地文件(使用 SSD 速度更快),使用 Spark 读取。
使用适用于 S3、 Google Cloud Storage 的Hadoop 文件系统连接器,将一切都转化为文件操作。(这并不能解决从任意 URL 读取的问题,因为没有 HDFS 连接器。)
将不同的输入类型表示为不同的 URI,并创建一个实用程序函数来检查 URI 并触发适当的读取操作。
与 (3) 相同,但使用案例类而不是 URI,并根据输入类型简单地重载。
| 归档时间: |
|
| 查看次数: |
5672 次 |
| 最近记录: |