小编lon*_*tar的帖子

Windows 机器上的 HDFS MiniCluster 错误

我尝试使用以下代码模拟 hdfs,但总是收到此特定错误。

test("some test") {

  val testDataPath = new File(PathUtils.getTestDir(getClass()), "miniclusters")

  //Configuration conf;

  //MiniDFSCluster cluster;

  //testDataPath = new File(PathUtils.getTestDir(getClass()), miniclusters");

  System.clearProperty(MiniDFSCluster.PROP_TEST_BUILD_DATA)
  val confMini = new HdfsConfiguration()

  val testDataCluster1 = new File(testDataPath, "CLUSTER_1")
  println(testDataCluster1)

  val c1Path = testDataCluster1.getAbsolutePath()
  println(c1Path)
  confMini.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, c1Path)

  val cluster = new MiniDFSCluster.Builder(confMini).build()


  val fs = FileSystem.get(confMini);

  println(fs)

  assert(true)
}
Run Code Online (Sandbox Code Playgroud)

错误如下

An exception or error caused a run to abort: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z 
java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
    at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
Run Code Online (Sandbox Code Playgroud)

我不确定该错误以及错误的原因是什么。

testing hadoop scala hdfs apache-spark

5
推荐指数
0
解决办法
339
查看次数

在其他字段上使用窗口功能领带断路器,以获取最新记录

我有以下数据,其中数据按商店和月份ID进行分区,并按数量排序,以获取商店的主要供应商。

如果两个供应商之间的金额相等,则我需要一个平局决胜者,然后,如果捆绑的供应商中有一个是前几个月销售最多的供应商,则将该供应商设为该月的销售最多的供应商。

如果再次打成平手,则回头会增加。如果再打结,则1个月的延迟将不起作用。最坏的情况是上个月我们还会有更多重复项。

样本数据

val data = Seq((201801,      10941,            115,  80890.44900, 135799.66400),
               (201801,      10941,            3,  80890.44900, 135799.66400) ,
               (201712,      10941,            3, 517440.74500, 975893.79000),
               (201712,      10941,            115, 517440.74500, 975893.79000),
               (201711,      10941,            3 , 371501.92100, 574223.52300),
               (201710,      10941,            115, 552435.57800, 746912.06700),
               (201709,      10941,            115,1523492.60700,1871480.06800),
               (201708,      10941,            115,1027698.93600,1236544.50900),
               (201707,      10941,            33 ,1469219.86900,1622949.53000)
               ).toDF("MTH_ID", "store_id" ,"brand" ,"brndSales","TotalSales")
Run Code Online (Sandbox Code Playgroud)

码:

val window = Window.partitionBy("store_id","MTH_ID").orderBy("brndSales")
val res = data.withColumn("rank",rank over window)
Run Code Online (Sandbox Code Playgroud)

输出:

    +------+--------+-----+-----------+-----------+----+
 |MTH_ID|store_id|brand|  brndSales| TotalSales|rank|
+------+--------+-----+-----------+-----------+----+
|201801|   10941|  115|  80890.449| 135799.664|   1|
|201801|   10941|    3|  80890.449| …
Run Code Online (Sandbox Code Playgroud)

sql apache-spark apache-spark-sql pyspark pyspark-sql

5
推荐指数
1
解决办法
215
查看次数

Spark 读取 .7z 文件

我正在尝试使用 scala 或 java 读取 Spark .7z 文件。我没有找到任何合适的方法或功能。

对于 zip 文件,我能够读取,因为 ZipInputStream 类采用输入流,但对于 7Z 文件,SevenZFile 类不采用任何输入流。 https://commons.apache.org/proper/commons-compress/javadocs/api-1.16/org/apache/commons/compress/archivers/sevenz/SevenZFile.html

压缩文件代码

spark.sparkContext.binaryFiles("fileName").flatMap{case (name: String, content: PortableDataStream) =>
        val zis = new ZipInputStream(content.open)
        Stream.continually(zis.getNextEntry)
              .takeWhile(_ != null)
              .flatMap { _ =>
                  val br = new BufferedReader(new InputStreamReader(zis))
                  Stream.continually(br.readLine()).takeWhile(_ != null)
              }}
Run Code Online (Sandbox Code Playgroud)

我正在尝试类似的 7z 文件代码

spark.sparkContext.binaryFiles(""filename"").flatMap{case (name: String, content: PortableDataStream) =>
        val zis = new SevenZFile(content.open)
        Stream.continually(zis.getNextEntry)
              .takeWhile(_ != null)
              .flatMap { _ =>
                  val br = new BufferedReader(new InputStreamReader(zis))
                  Stream.continually(br.readLine()).takeWhile(_ != null)
              }}
Run Code Online (Sandbox Code Playgroud)

但 …

java scala 7zip hdfs apache-spark

5
推荐指数
1
解决办法
1530
查看次数

在Spark / Scala中写入HDFS以读取zip文件

我正在编写一个spark / scala程序,以读取ZIP文件,将其解压缩并将内容写入一组新文件。我可以将其用于写入本地文件系统,但想知道是否存在一种将输出文件写入分布式文件系统(例如HDFS)的方法。代码如下所示。

import java.util.zip.ZipInputStream
import org.apache.spark.input.PortableDataStream
import java.io._

var i =1
sc.binaryFiles("file:///d/tmp/zips/").flatMap((file:(String, PortableDataStream)) => {   
   val zipStream = new ZipInputStream(file._2.open)            
   val entry = zipStream.getNextEntry                            
   val iter = scala.io.Source.fromInputStream(zipStream).getLines          
   val fname = f"/d/tmp/myfile$i.txt" 

   i = i + 1

   val xx = iter.mkString
   val writer = new PrintWriter(new File(fname))
   writer.write(xx)
   writer.close()

   iter                                                       
}).collect()
Run Code Online (Sandbox Code Playgroud)

`

scala hdfs apache-spark

1
推荐指数
1
解决办法
2万
查看次数