I'm trying to simulate HDFS with the following code, but I keep getting a particular error.
test("some test") {
  val testDataPath = new File(PathUtils.getTestDir(getClass()), "miniclusters")
  System.clearProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA)
  val confMini = new HdfsConfiguration()
  val testDataCluster1 = new File(testDataPath, "CLUSTER_1")
  println(testDataCluster1)
  val c1Path = testDataCluster1.getAbsolutePath()
  println(c1Path)
  confMini.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, c1Path)
  val cluster = new MiniDFSCluster.Builder(confMini).build()
  val fs = FileSystem.get(confMini)
  println(fs)
  assert(true)
}
The error is as follows:
An exception or error caused a run to abort: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
I'm not sure what this error means or what is causing it.
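Not stated in the question, but this particular `UnsatisfiedLinkError` on `NativeIO$Windows.access0` usually means the Hadoop native Windows binaries (winutils.exe and hadoop.dll) are not available to the JVM. If that is the case here, a minimal sketch of the usual workaround (the `C:\hadoop` path is an assumption for illustration, not from the question):

```scala
// Sketch of the common Windows workaround, assuming winutils.exe and
// hadoop.dll for the matching Hadoop version have been placed under
// C:\hadoop\bin (illustrative path). This must run before the
// MiniDFSCluster is built.
System.setProperty("hadoop.home.dir", "C:\\hadoop")
// hadoop.dll must also be loadable by the JVM, e.g. by adding C:\hadoop\bin
// to PATH or starting the JVM with -Djava.library.path=C:\hadoop\bin.
```

The binaries must match the Hadoop version on the classpath, otherwise the same `UnsatisfiedLinkError` can persist.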
I have the following data, partitioned by store and month ID and ordered by amount, to find the top vendor for each store.
If two vendors tie on amount, I need a tie-breaker: if one of the tied vendors was the top seller in the previous month, make that vendor the top seller for the current month.
If there is still a tie, look back one more month; if a one-month lag doesn't resolve it, keep looking further back. In the worst case the earlier months will also contain duplicates.
Sample data:
val data = Seq(
  (201801, 10941, 115,   80890.44900,  135799.66400),
  (201801, 10941,   3,   80890.44900,  135799.66400),
  (201712, 10941,   3,  517440.74500,  975893.79000),
  (201712, 10941, 115,  517440.74500,  975893.79000),
  (201711, 10941,   3,  371501.92100,  574223.52300),
  (201710, 10941, 115,  552435.57800,  746912.06700),
  (201709, 10941, 115, 1523492.60700, 1871480.06800),
  (201708, 10941, 115, 1027698.93600, 1236544.50900),
  (201707, 10941,  33, 1469219.86900, 1622949.53000)
).toDF("MTH_ID", "store_id", "brand", "brndSales", "TotalSales")
Code:
val window = Window.partitionBy("store_id","MTH_ID").orderBy("brndSales")
val res = data.withColumn("rank",rank over window)
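For the tie-break rule described above, here is a sketch of one possible approach (assuming a SparkSession with implicits imported and the `data` DataFrame from the sample; it handles only a one-month look-back — repeating the look-back for deeper ties would need to be iterated, which this sketch does not do). Note that the question's window orders `brndSales` ascending, which ranks the *smallest* seller first; for the top vendor the order should be descending:

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Rank within each store/month by sales, highest first.
val w = Window.partitionBy("store_id", "MTH_ID").orderBy(col("brndSales").desc)
val ranked = data.withColumn("rank", rank().over(w))

// Shift each month's ranks forward one month, so that joining on MTH_ID
// attaches last month's rank to the current month's rows. MTH_ID is a
// YYYYMM integer, so go through a real date for the month arithmetic.
val prevRanks = ranked
  .withColumn("MTH_ID", expr(
    "cast(date_format(add_months(to_date(concat(cast(MTH_ID as string), '01'), 'yyyyMMdd'), 1), 'yyyyMM') as int)"))
  .select(col("MTH_ID"), col("store_id"), col("brand"), col("rank").as("prev_rank"))

// Re-rank, breaking ties with last month's rank; a brand unseen last month
// gets a null prev_rank and therefore loses the tie.
val w2 = Window.partitionBy("store_id", "MTH_ID")
  .orderBy(col("brndSales").desc, col("prev_rank").asc_nulls_last)
val result = ranked
  .join(prevRanks, Seq("MTH_ID", "store_id", "brand"), "left")
  .withColumn("rank", rank().over(w2))
```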
Output:
+------+--------+-----+-----------+-----------+----+
|MTH_ID|store_id|brand| brndSales| TotalSales|rank|
+------+--------+-----+-----------+-----------+----+
|201801| 10941| 115| 80890.449| 135799.664| 1|
|201801|   10941|    3|  80890.449| …

I'm trying to read .7z files in Spark with Scala or Java. I haven't found any suitable method or function for it.
For zip files I am able to read them, because the ZipInputStream class takes an input stream, but for 7z files the SevenZFile class does not take any input stream. https://commons.apache.org/proper/commons-compress/javadocs/api-1.16/org/apache/commons/compress/archivers/sevenz/SevenZFile.html
Code for zip files:
spark.sparkContext.binaryFiles("fileName").flatMap {
  case (name: String, content: PortableDataStream) =>
    val zis = new ZipInputStream(content.open)
    Stream.continually(zis.getNextEntry)
      .takeWhile(_ != null)
      .flatMap { _ =>
        val br = new BufferedReader(new InputStreamReader(zis))
        Stream.continually(br.readLine()).takeWhile(_ != null)
      }
}
I'm trying similar code for 7z files:
spark.sparkContext.binaryFiles("filename").flatMap {
  case (name: String, content: PortableDataStream) =>
    val zis = new SevenZFile(content.open)
    Stream.continually(zis.getNextEntry)
      .takeWhile(_ != null)
      .flatMap { _ =>
        val br = new BufferedReader(new InputStreamReader(zis))
        Stream.continually(br.readLine()).takeWhile(_ != null)
      }
}
But …
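One possible workaround, since SevenZFile needs random access rather than a plain stream: buffer each archive and wrap it in a `SeekableInMemoryByteChannel` (this assumes commons-compress 1.13+, which added a `SevenZFile(SeekableByteChannel)` constructor; it also assumes each archive fits in executor memory, since `PortableDataStream.toArray()` reads it whole):

```scala
import java.nio.charset.StandardCharsets
import org.apache.commons.compress.archivers.sevenz.SevenZFile
import org.apache.commons.compress.utils.SeekableInMemoryByteChannel
import org.apache.spark.input.PortableDataStream

spark.sparkContext.binaryFiles("fileName").flatMap {
  case (name: String, content: PortableDataStream) =>
    // Buffer the whole archive in memory and give SevenZFile the seekable
    // channel it requires.
    val sevenZ = new SevenZFile(new SeekableInMemoryByteChannel(content.toArray()))
    Iterator.continually(sevenZ.getNextEntry)
      .takeWhile(_ != null)
      .filterNot(_.isDirectory)
      .flatMap { entry =>
        // Read each entry in one go; adequate for small text entries,
        // not for entries larger than Int.MaxValue bytes.
        val buf = new Array[Byte](entry.getSize.toInt)
        sevenZ.read(buf)
        new String(buf, StandardCharsets.UTF_8).split("\n").iterator
      }
}
```

Alternatively, the archive could be copied to a local temp file on the executor and opened with the `SevenZFile(File)` constructor, which avoids holding the whole archive on the heap.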
I'm writing a Spark/Scala program that reads ZIP files, unzips them, and writes the contents to a set of new files. I can get this to work for the local file system, but I'd like to know whether there is a way to write the output files to a distributed file system such as HDFS. The code is shown below.
import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream
import java.io._
var i = 1
sc.binaryFiles("file:///d/tmp/zips/").flatMap((file: (String, PortableDataStream)) => {
  val zipStream = new ZipInputStream(file._2.open)
  val entry = zipStream.getNextEntry
  val iter = scala.io.Source.fromInputStream(zipStream).getLines
  val fname = f"/d/tmp/myfile$i.txt"
  i = i + 1
  val xx = iter.mkString
  val writer = new PrintWriter(new File(fname))
  writer.write(xx)
  writer.close()
  iter
}).collect()
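To write to HDFS instead of the local file system, one option is the Hadoop FileSystem API on the executors. A sketch under these assumptions: the cluster's Hadoop configuration is on the executor classpath, and the output directory `/tmp/unzipped` is illustrative. It also names output files after the zip entry rather than the shared `var i` counter, which would not increment correctly across executors:

```scala
import java.util.zip.ZipInputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.input.PortableDataStream

sc.binaryFiles("file:///d/tmp/zips/").foreach {
  case (zipName: String, content: PortableDataStream) =>
    val zipStream = new ZipInputStream(content.open)
    // A Configuration created on the executor picks up the cluster
    // settings (fs.defaultFS etc.) from the classpath.
    val fs = FileSystem.get(new Configuration())
    Iterator.continually(zipStream.getNextEntry)
      .takeWhile(_ != null)
      .filterNot(_.isDirectory)
      .foreach { entry =>
        // One HDFS file per zip entry, named after the entry.
        val out = fs.create(new Path(s"/tmp/unzipped/${entry.getName}"))
        val buf = new Array[Byte](8192)
        Iterator.continually(zipStream.read(buf))
          .takeWhile(_ != -1)
          .foreach(n => out.write(buf, 0, n))
        out.close()
      }
    zipStream.close()
}
```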
Tags: apache-spark ×4 · hdfs ×3 · scala ×3 · 7zip ×1 · hadoop ×1 · java ×1 · pyspark ×1 · pyspark-sql ×1 · sql ×1 · testing ×1