有时,在处理流的步骤之间对流中的每个元素进行"某事"(例如打印)将是方便的,例如用于调试.
一个简单的例子可能看起来像这样,遗憾的是这不会forEach消耗流:
List<String> list = new ArrayList<>();
list.add("one");
list.add("two");
list.add("three");
list.add("four");
List<String> filteredList =
list.stream()
.filter(s -> s.startsWith("t"))
.forEach(System.out::println)
.collect(Collectors.toList());
Run Code Online (Sandbox Code Playgroud)
怎么能实现这一目标?
我经常需要在spark 2.1中对数据帧执行自定义聚合,并使用以下两种方法:
我通常更喜欢第一个选项,因为它比UDAF实现更容易实现和更易读.但我认为第一个选项通常较慢,因为在网络周围发送了更多数据(没有部分聚合),但我的经验表明UDAF通常很慢.这是为什么?
具体示例:计算直方图:
数据在蜂巢表中(1E6随机双值)
val df = spark.table("testtable")
def roundToMultiple(d:Double,multiple:Double) = Math.round(d/multiple)*multiple
Run Code Online (Sandbox Code Playgroud)
UDF方法:
val udf_histo = udf((xs:Seq[Double]) => xs.groupBy(x => roundToMultiple(x,0.25)).mapValues(_.size))
df.groupBy().agg(collect_list($"x").as("xs")).select(udf_histo($"xs")).show(false)
+--------------------------------------------------------------------------------+
|UDF(xs) |
+--------------------------------------------------------------------------------+
|Map(0.0 -> 125122, 1.0 -> 124772, 0.75 -> 250819, 0.5 -> 248696, 0.25 -> 250591)|
+--------------------------------------------------------------------------------+
Run Code Online (Sandbox Code Playgroud)
UDAF-方法
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable
class HistoUDAF(binWidth:Double) extends UserDefinedAggregateFunction {
override def inputSchema: StructType =
StructType(
StructField("value", DoubleType) :: Nil
)
override def bufferSchema: StructType =
new StructType() …Run Code Online (Sandbox Code Playgroud) aggregate-functions user-defined-functions dataframe apache-spark
我正在使用Eclipse Luna和Tomcat 8,并且对配置文件的不同位置感到困惑.
我在我的本地机器上安装了Tomcat,并在Eclipse中配置了Tomcat,如下所示:

所以涉及4个目录
a)catalina home:C:\ Program Files\AppServers\Tomcat8
b)catalina base:C:\ Users\xxx\projectname\Tomcat
c)我的Eclipse工作区中的配置路径:/ servers/Tomcat v8.0 Server at localhost-config
d)部署路径:wtpwebapps,它是a)的子目录
在a,b,c中我都有相同的配置文件集(catalina.properties,context.xml,server.xml ...).这些文件的层次结构是什么,我在哪里为我在开发期间从Eclipse中启动的特定应用程序配置服务器?
编辑:与此相关:如果我在服务器选项卡中右键单击服务器并选择"清除Tomcat工作目录"或只是"清理",会发生什么?
如何将这一行转换为数据帧?
val oneRowDF = myDF.first // gives Array[Row]
Run Code Online (Sandbox Code Playgroud)
谢谢
Spark 2.2引入了基于成本的优化(CBO,https://databricks.com/blog/2017/08/31/cost-based-optimizer-in-apache-spark-2-2.html ),它利用表统计(按计算ANALYZE TABLE COMPUTE STATISTICS....)
我的问题是:预先计算的统计数据在Spark 2.2(在我的情况下是2.1)操作(外部蜂巢)表之前是否也有用?统计数据会影响优化器吗?如果是,我还可以在Impala而不是Hive中计算统计数据吗?
更新:
到目前为止我发现的唯一提示是https://issues.apache.org/jira/browse/SPARK-15365
显然,统计数据用于决定是否进行广播加入
我join在 Spark 2.1 中有一个问题。join尽管表非常大(1400 万行),但Spark(错误地?)选择了广播哈希。然后作业崩溃,因为没有足够的内存,Spark 以某种方式尝试将广播片段持久化到磁盘,然后导致超时。
所以,我知道有一个查询提示可以强制进行广播连接 ( org.apache.spark.sql.functions.broadcast),但是还有一种方法可以强制使用另一种连接算法吗?
我通过设置解决了我的问题spark.sql.autoBroadcastJoinThreshold=0,但我更喜欢另一种更精细的解决方案,即不全局禁用广播连接。
我正在尝试将org.apache.pdfbox.pdmodel.PDDocument类和icafe库(https://github.com/dragon66/icafe/)所代表的PDF转换为具有第4组压缩和300 dpi 的多重tiff .示例代码适用于我288 dpi,但奇怪的是不是300 dpi,导出的tiff仍然只是白色.有谁知道这里的问题是什么?
我在示例中使用的示例pdf位于:http://www.bergophil.ch/a.pdf
import java.awt.image.BufferedImage;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.pdmodel.PDPage;
import cafe.image.ImageColorType;
import cafe.image.ImageParam;
import cafe.image.options.TIFFOptions;
import cafe.image.tiff.TIFFTweaker;
import cafe.image.tiff.TiffFieldEnum.Compression;
import cafe.io.FileCacheRandomAccessOutputStream;
import cafe.io.RandomAccessOutputStream;
public class Pdf2TiffConverter {
public static void main(String[] args) {
String pdf = "a.pdf";
PDDocument pddoc = null;
try {
pddoc = PDDocument.load(pdf);
} catch (IOException e) {
}
try {
savePdfAsTiff(pddoc);
} catch (IOException e) {
}
}
private static void savePdfAsTiff(PDDocument pdf) …Run Code Online (Sandbox Code Playgroud) 我在Java(Maven)Web项目中有两种测试:使用嵌入式Tomcat 7服务器进行"正常"单元测试和集成测试,使用Selenium进行Jenkins上的自动GUI测试.所有测试都使用JUnit进行注释@Test,正常测试以"Test.java"结束,而集成测试以"IntegrationTest.java"结束.所有测试类都位于src/test/java中
我通常使用构建我的项目mvn clean verify,而我的相关部分pom.xml启动tomcat服务器并相应地拆分测试类别如下所示:
<!-- For front-end testing -->
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<version>2.2</version>
<configuration>
<uriEncoding>UTF-8</uriEncoding>
<additionalConfigFilesDir>${basedir}/conf</additionalConfigFilesDir>
<contextFile>${basedir}/src/test/resources/context.xml</contextFile>
</configuration>
<executions>
<execution>
<id>start-tomcat</id>
<phase>pre-integration-test</phase>
<goals>
<goal>run-war-only</goal>
</goals>
<configuration>
<fork>true</fork>
<port>9090</port>
</configuration>
</execution>
<execution>
<id>stop-tomcat</id>
<phase>post-integration-test</phase>
<goals>
<goal>shutdown</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.16</version>
<configuration>
<excludes>
<exclude>**/*IntegrationTest*</exclude>
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.16</version>
<configuration>
<includes>
<include>**/*IntegrationTest*</include>
</includes>
</configuration>
<executions>
<execution>
<goals>
<goal>integration-test</goal>
<goal>verify</goal>
</goals>
</execution>
</executions>
</plugin>
Run Code Online (Sandbox Code Playgroud)
这个过程工作正常,除非我想在eclipse中运行我的测试,我通常右键单击我的项目 - >运行为 - > JUnit测试.通过选择此选项,可以运行所有测试(包括集成测试).在这种情况下,集成测试失败,因为Tomcat没有运行(它只在Maven的 …
我想在DataFrame中复制一行,我该怎么做?
例如,我有一个由1行组成的DataFrame,我想创建一个具有100个相同行的DataFrame.我提出了以下解决方案:
var data:DataFrame=singleRowDF
for(i<-1 to 100-1) {
data = data.unionAll(singleRowDF)
}
Run Code Online (Sandbox Code Playgroud)
但这引入了许多转换,似乎我的后续行动变得非常缓慢.还有另一种方法吗?
我一直认为 Spark 不允许定义用户定义的窗口函数。我刚刚从这里测试了“几何平均值”UDAF 示例(https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html)作为窗口函数,它似乎工作得很好,例如:
val geomMean = new GeometricMean
(1 to 10).map(i=>
(i,i.toDouble)
)
.toDF("i","x")
.withColumn("geom_mean",geomMean($"x").over(Window.orderBy($"i").rowsBetween(-1,1)))
.show()
+---+----+------------------+
| i| x| geom_mean|
+---+----+------------------+
| 1| 1.0|1.4142135623730951|
| 2| 2.0|1.8171205928321397|
| 3| 3.0|2.8844991406148166|
| 4| 4.0|3.9148676411688634|
| 5| 5.0| 4.93242414866094|
| 6| 6.0| 5.943921952763129|
| 7| 7.0| 6.952053289772898|
| 8| 8.0| 7.958114415792783|
| 9| 9.0| 8.962809493114328|
| 10|10.0| 9.486832980505138|
+---+----+------------------+
Run Code Online (Sandbox Code Playgroud)
我从未见过 Spark 文档谈论使用 UDAF 作为窗口函数。这是允许的吗?即结果是否正确?顺便说一下我正在使用spark 2.1
编辑:
让我困惑的是,在标准聚合中(即后跟 a groupBy),数据总是添加到缓冲区中,即它们总是会增长,从不收缩。使用窗口函数(特别是与 结合使用rowsBetween()),数据还需要从缓冲区中删除,因为“旧”元素在沿着排序定义的行移动时会从窗口中删除。我认为窗口函数可以沿着状态的顺序移动。所以我认为必须有类似“删除”方法的东西要实现