Say I have a Map<? extends Object, List<String>>.
I can easily get the map's values and iterate over them to build a single List<String>:
for (List<String> list : someMap.values()) {
someList.addAll(list);
}
Is there a way to flatten it in one go?
List<String> someList = someMap.values().flatten();
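For reference, a minimal sketch of doing this in one expression with Java 8 streams (the JDK's Collection has no flatten() method, so a flatMap over the value lists plays that role):

import java.util.List;
import java.util.stream.Collectors;

// Flatten every List<String> held as a map value into one List<String>.
List<String> someList = someMap.values().stream()
        .flatMap(List::stream)              // stream the elements of each value list
        .collect(Collectors.toList());      // collect them into a single list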
After some googling, my problem can be described as follows:
CREATE TABLE security (
id SERIAL PRIMARY KEY,
vendor VARCHAR(20),
external_id VARCHAR(20),
extinct BOOLEAN DEFAULT FALSE
);
CREATE UNIQUE INDEX unique_vendor ON security(vendor, extinct) where vendor is not null;
CREATE UNIQUE INDEX unique_external_id ON security(external_id, extinct) where external_id is not null;
Trying to insert values:
insert into security (vendor, external_id, extinct)
values('Legion', 'LGNONE', false)
ON CONFLICT(vendor, external_id, extinct) DO UPDATE
SET vendor = 'Legion', external_id = 'LGNONE', extinct = false;
The result is:
[42P10] ERROR: there is no unique or exclusion constraint matching the …
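A note on why this fails: ON CONFLICT can only infer a single unique index, and the conflict target (the columns plus, for partial indexes, a matching WHERE predicate) has to correspond exactly to that index; the three-column target above matches neither of the two partial indexes. A minimal sketch targeting the unique_vendor index instead (assuming that updating external_id on a vendor conflict is the intended behaviour):

insert into security (vendor, external_id, extinct)
values ('Legion', 'LGNONE', false)
on conflict (vendor, extinct) where vendor is not null
do update set external_id = excluded.external_id;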
I have a spark-submit script as follows:

spark-submit \
  --name daily_job \
  --class com.test.Bootstrapper \
  --files /home/user/*.csv \
  --conf spark.executor.memory=2g \
  --conf spark.executor.cores=2 \
  --master spark://172.17.0.4:7077 \
  --deploy-mode client \
  --packages com.typesafe:config:1.3.1 \
  file:///home/user/workspace/spark-test/target/spark-test-0.1-SNAPSHOT.jar
Cluster setup: the master and two worker nodes run in separate containers.
After the job starts, I can see the csv files being placed into:
Workers:
/usr/local/spark-2.0.2-bin-hadoop2.7/work/app-20170116160937-0036/0/test.csv
Driver:
/tmp/spark-f65b2466-e419-49bd-8da7-9f2b94cbf870/userFiles-abb14b33-58b1-47d6-935e-6c2943e3d55c/test.csv
The question is: how do I read this file correctly? Currently I am doing the following:
private var initial: DataFrame = spark.sqlContext.read
.option("mode", "DROPMALFORMED")
.option("delimiter", conf.delimiter)
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv("file:///*.csv")
This results in a FileNotFoundException.
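One commonly suggested starting point (a sketch only, not guaranteed for this exact cluster layout): resolve the node-local copy that --files distributed via SparkFiles.get and hand that path to the reader. test.csv is the file name visible in the work directories above; spark and conf are the same objects as in the snippet.

import org.apache.spark.SparkFiles

// SparkFiles.get resolves the local path of a file shipped with --files
// (the work / userFiles directory shown above). Note that each node resolves
// its own copy, so the path printed on the driver is not the executors' path.
val csvPath = "file://" + SparkFiles.get("test.csv")

val initial = spark.sqlContext.read
  .option("mode", "DROPMALFORMED")
  .option("delimiter", conf.delimiter)
  .option("dateFormat", conf.dateFormat)
  .schema(conf.schema)
  .csv(csvPath)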
Apache Spark has an input_file_name function, which I use to add a new column to a Dataset containing the name of the file currently being processed.
The problem is that I would like to customize this function to return only the file name, omitting its full path on S3.
For now, I am replacing the path in a second step, using a map function:
val initialDs = spark.sqlContext.read
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path).withColumn("input_file_name", input_file_name)
...
...
def fromFile(fileName: String): String = {
val baseName: String = FilenameUtils.getBaseName(fileName)
val tmpFileName: String = baseName.substring(0, baseName.length - 8) //here is magic conversion ;)
this.valueOf(tmpFileName)
}
But I would like to use something like:
val initialDs = spark.sqlContext.read
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path).withColumn("input_file_name", **customized_input_file_name_function**)
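One way to do the trimming directly in the column expression (a sketch using Spark's built-in regexp_extract; the enum-style fromFile conversion above would still have to be applied separately, for example through a UDF):

import org.apache.spark.sql.functions.{input_file_name, regexp_extract}

// Keep only the last path segment (the base file name) of the full path
// returned by input_file_name.
val initialDs = spark.sqlContext.read
  .option("dateFormat", conf.dateFormat)
  .schema(conf.schema)
  .csv(conf.path)
  .withColumn("input_file_name", regexp_extract(input_file_name(), "[^/]+$", 0))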