小编Rap*_*oth的帖子

Java流"forEach"但不消耗流

有时,在处理流的步骤之间对流中的每个元素进行"某事"(例如打印)将是方便的,例如用于调试.

一个简单的例子可能看起来像这样,遗憾的是这不会forEach消耗流:

List<String> list = new ArrayList<>();
list.add("one");
list.add("two");
list.add("three");
list.add("four");

List<String> filteredList = 
        list.stream()
        .filter(s -> s.startsWith("t"))
        .forEach(System.out::println)
        .collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)

怎么能实现这一目标?

java lambda java-8 java-stream

8
推荐指数
1
解决办法
2056
查看次数

Spark自定义聚合:collect_list + UDF vs UDAF

我经常需要在spark 2.1中对数据帧执行自定义聚合,并使用以下两种方法:

  • 使用groupby/collect_list获取单行中的所有值,然后应用UDF来聚合值
  • 编写自定义UDAF(用户定义的聚合函数)

我通常更喜欢第一个选项,因为它比UDAF实现更容易实现和更易读.但我认为第一个选项通常较慢,因为在网络周围发送了更多数据(没有部分聚合),但我的经验表明UDAF通常很慢.这是为什么?

具体示例:计算直方图:

数据在蜂巢表中(1E6随机双值)

val df = spark.table("testtable")

def roundToMultiple(d:Double,multiple:Double) = Math.round(d/multiple)*multiple
Run Code Online (Sandbox Code Playgroud)

UDF方法:

val udf_histo = udf((xs:Seq[Double]) => xs.groupBy(x => roundToMultiple(x,0.25)).mapValues(_.size))

df.groupBy().agg(collect_list($"x").as("xs")).select(udf_histo($"xs")).show(false)

+--------------------------------------------------------------------------------+
|UDF(xs)                                                                         |
+--------------------------------------------------------------------------------+
|Map(0.0 -> 125122, 1.0 -> 124772, 0.75 -> 250819, 0.5 -> 248696, 0.25 -> 250591)|
+--------------------------------------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)

UDAF-方法

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

import scala.collection.mutable

class HistoUDAF(binWidth:Double) extends UserDefinedAggregateFunction {

  override def inputSchema: StructType =
    StructType(
      StructField("value", DoubleType) :: Nil
    )

  override def bufferSchema: StructType =
    new StructType() …
Run Code Online (Sandbox Code Playgroud)

aggregate-functions user-defined-functions dataframe apache-spark

8
推荐指数
1
解决办法
1120
查看次数

Eclipse中的Tomcat配置

我正在使用Eclipse Luna和Tomcat 8,并且对配置文件的不同位置感到困惑.

我在我的本地机器上安装了Tomcat,并在Eclipse中配置了Tomcat,如下所示:

在此输入图像描述

所以涉及4个目录

  • a)catalina home:C:\ Program Files\AppServers\Tomcat8

  • b)catalina base:C:\ Users\xxx\projectname\Tomcat

  • c)我的Eclipse工作区中的配置路径:/ servers/Tomcat v8.0 Server at localhost-config

  • d)部署路径:wtpwebapps,它是a)的子目录

在a,b,c中我都有相同的配置文件集(catalina.properties,context.xml,server.xml ...).这些文件的层次结构是什么,我在哪里为我在开发期间从Eclipse中启动的特定应用程序配置服务器?

编辑:与此相关:如果我在服务器选项卡中右键单击服务器并选择"清除Tomcat工作目录"或只是"清理",会发生什么?

eclipse tomcat config

7
推荐指数
1
解决办法
2万
查看次数

如何将数组[行]转换为DataFrame

如何将这一行转换为数据帧?

val oneRowDF = myDF.first // gives Array[Row]
Run Code Online (Sandbox Code Playgroud)

谢谢

scala dataframe apache-spark

7
推荐指数
4
解决办法
2万
查看次数

在Spark 2.2之前是否有任何使用的表统计信息?

Spark 2.2引入了基于成本的优化(CBO,https://databricks.com/blog/2017/08/31/cost-based-optimizer-in-apache-spark-2-2.html ),它利用表统计(按计算ANALYZE TABLE COMPUTE STATISTICS....)

我的问题是:预先计算的统计数据在Spark 2.2(在我的情况下是2.1)操作(外部蜂巢)表之前是否也有用?统计数据会影响优化器吗?如果是,我还可以在Impala而不是Hive中计算统计数据吗?

更新:

到目前为止我发现的唯一提示是https://issues.apache.org/jira/browse/SPARK-15365

显然,统计数据用于决定是否进行广播加入

hive apache-spark

7
推荐指数
1
解决办法
468
查看次数

如何提示排序合并连接或混洗哈希连接(并跳过广播哈希连接)?

join在 Spark 2.1 中有一个问题。join尽管表非常大(1400 万行),但Spark(错误地?)选择了广播哈希。然后作业崩溃,因为没有足够的内存,Spark 以某种方式尝试将广播片段持久化到磁盘,然后导致超时。

所以,我知道有一个查询提示可以强制进行广播连接 ( org.apache.spark.sql.functions.broadcast),但是还有一种方法可以强制使用另一种连接算法吗?

我通过设置解决了我的问题spark.sql.autoBroadcastJoinThreshold=0,但我更喜欢另一种更精细的解决方案,即不全局禁用广播连接。

scala apache-spark apache-spark-sql

7
推荐指数
1
解决办法
5861
查看次数

将PDF转换为多页tiff(第4组)

我正在尝试将org.apache.pdfbox.pdmodel.PDDocument类和icafe库(https://github.com/dragon66/icafe/)所代表的PDF转换为具有第4组压缩和300 dpi 的多重tiff .示例代码适用于我288 dpi,但奇怪的是不是300 dpi,导出的tiff仍然只是白色.有谁知道这里的问题是什么?

我在示例中使用的示例pdf位于:http://www.bergophil.ch/a.pdf

import java.awt.image.BufferedImage;
import java.io.FileOutputStream;
import java.io.IOException;

import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.pdmodel.PDPage;

import cafe.image.ImageColorType;
import cafe.image.ImageParam;
import cafe.image.options.TIFFOptions;
import cafe.image.tiff.TIFFTweaker;
import cafe.image.tiff.TiffFieldEnum.Compression;
import cafe.io.FileCacheRandomAccessOutputStream;
import cafe.io.RandomAccessOutputStream;

public class Pdf2TiffConverter {
    public static void main(String[] args) {
        String pdf = "a.pdf";
        PDDocument pddoc = null;
        try {
            pddoc = PDDocument.load(pdf);
        } catch (IOException e) {
        }

        try {
            savePdfAsTiff(pddoc);
        } catch (IOException e) {
        }
    }

    private static void savePdfAsTiff(PDDocument pdf) …
Run Code Online (Sandbox Code Playgroud)

java pdf tiff pdfbox icafe

6
推荐指数
1
解决办法
1万
查看次数

在Eclipse IDE中使用JUnit 4排除集成测试

我在Java(Maven)Web项目中有两种测试:使用嵌入式Tomcat 7服务器进行"正常"单元测试和集成测试,使用Selenium进行Jenkins上的自动GUI测试.所有测试都使用JUnit进行注释@Test,正常测试以"Test.java"结束,而集成测试以"IntegrationTest.java"结束.所有测试类都位于src/test/java中

我通常使用构建我的项目mvn clean verify,而我的相关部分pom.xml启动tomcat服务器并相应地拆分测试类别如下所示:

    <!-- For front-end testing -->
        <plugin>
            <groupId>org.apache.tomcat.maven</groupId>
            <artifactId>tomcat7-maven-plugin</artifactId>
            <version>2.2</version>
            <configuration>
                <uriEncoding>UTF-8</uriEncoding>
                <additionalConfigFilesDir>${basedir}/conf</additionalConfigFilesDir>
                <contextFile>${basedir}/src/test/resources/context.xml</contextFile>
            </configuration>
            <executions>
                <execution>
                    <id>start-tomcat</id>
                    <phase>pre-integration-test</phase>
                    <goals>
                        <goal>run-war-only</goal>
                    </goals>
                    <configuration>
                        <fork>true</fork>
                        <port>9090</port>
                    </configuration>

                </execution>
                <execution>
                    <id>stop-tomcat</id>
                    <phase>post-integration-test</phase>
                    <goals>
                        <goal>shutdown</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.16</version>
            <configuration>
                <excludes>
                    <exclude>**/*IntegrationTest*</exclude>
                </excludes>
            </configuration>
        </plugin>

        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-failsafe-plugin</artifactId>
            <version>2.16</version>
            <configuration>
                <includes>
                    <include>**/*IntegrationTest*</include>
                </includes>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>integration-test</goal>
                        <goal>verify</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
Run Code Online (Sandbox Code Playgroud)

这个过程工作正常,除非我想在eclipse中运行我的测试,我通常右键单击我的项目 - >运行为 - > JUnit测试.通过选择此选项,可以运行所有测试(包括集成测试).在这种情况下,集成测试失败,因为Tomcat没有运行(它只在Maven的 …

eclipse junit maven

6
推荐指数
1
解决办法
1173
查看次数

复制Spark Row N次

我想在DataFrame中复制一行,我该怎么做?

例如,我有一个由1行组成的DataFrame,我想创建一个具有100个相同行的DataFrame.我提出了以下解决方案:

  var data:DataFrame=singleRowDF

   for(i<-1 to 100-1) {
       data = data.unionAll(singleRowDF)
   }
Run Code Online (Sandbox Code Playgroud)

但这引入了许多转换,似乎我的后续行动变得非常缓慢.还有另一种方法吗?

scala apache-spark

6
推荐指数
1
解决办法
5195
查看次数

每个 Spark UDAF 都可以与 Window 一起使用吗?

我一直认为 Spark 不允许定义用户定义的窗口函数。我刚刚从这里测试了“几何平均值”UDAF 示例(https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html)作为窗口函数,它似乎工作得很好,例如:

val geomMean = new GeometricMean

(1 to 10).map(i=>
  (i,i.toDouble)
)
.toDF("i","x")
.withColumn("geom_mean",geomMean($"x").over(Window.orderBy($"i").rowsBetween(-1,1)))
.show()

+---+----+------------------+
|  i|   x|         geom_mean|
+---+----+------------------+
|  1| 1.0|1.4142135623730951|
|  2| 2.0|1.8171205928321397|
|  3| 3.0|2.8844991406148166|
|  4| 4.0|3.9148676411688634|
|  5| 5.0|  4.93242414866094|
|  6| 6.0| 5.943921952763129|
|  7| 7.0| 6.952053289772898|
|  8| 8.0| 7.958114415792783|
|  9| 9.0| 8.962809493114328|
| 10|10.0| 9.486832980505138|
+---+----+------------------+
Run Code Online (Sandbox Code Playgroud)

我从未见过 Spark 文档谈论使用 UDAF 作为窗口函数。这是允许的吗?即结果是否正确?顺便说一下我正在使用spark 2.1

编辑:

让我困惑的是,在标准聚合中(即后跟 a groupBy),数据总是添加到缓冲区中,即它们总是会增长,从不收缩。使用窗口函数(特别是与 结合使用rowsBetween()),数据还需要从缓冲区中删除,因为“旧”元素在沿着排序定义的行移动时会从窗口中删除。我认为窗口函数可以沿着状态的顺序移动。所以我认为必须有类似“删除”方法的东西要实现

scala user-defined-aggregate dataframe apache-spark

6
推荐指数
1
解决办法
1852
查看次数