小编cin*_*ata的帖子

展平集合

说我有一个 Map<? extends Object, List<String>>

我可以很容易地获得地图的值,并迭代它以生成单个List<String>.

   for (List<String> list : someMap.values()) {
        someList.addAll(list);
    }
Run Code Online (Sandbox Code Playgroud)

有没有办法一次性压扁它?

  List<String> someList = SomeMap.values().flatten();
Run Code Online (Sandbox Code Playgroud)

java collections

58
推荐指数
6
解决办法
7万
查看次数

PostgreSQL部分索引和UPSERT

经过谷歌搜索后,我的问题描述如下:

CREATE TABLE security (
  id          SERIAL PRIMARY KEY,
  vendor      VARCHAR(20),
  external_id VARCHAR(20),
  extinct     BOOLEAN DEFAULT FALSE
);

CREATE UNIQUE INDEX unique_vendor ON security(vendor, extinct) where vendor is not null;
CREATE UNIQUE INDEX unique_external_id ON security(external_id, extinct) where external_id is not null;
Run Code Online (Sandbox Code Playgroud)

试图插入值:

insert into security (vendor, external_id, extinct) 
  values('Legion', 'LGNONE', false)
  ON CONFLICT(vendor, external_id, extinct) DO UPDATE
  SET vendor = 'Legion', external_id = 'LGNONE', extinct = false;
Run Code Online (Sandbox Code Playgroud)

结果是:

[42P10] ERROR: there is no unique or exclusion constraint matching the …
Run Code Online (Sandbox Code Playgroud)

sql postgresql unique upsert

7
推荐指数
1
解决办法
1596
查看次数

Apache Spark Spark-submit 从 --files 参数读取文件

我有 Spark 提交脚本如下:

spark-submit \
  --name daily_job\
  --class com.test.Bootstrapper \
  --files /home/user/*.csv\
  --conf spark.executor.memory=2g\
  --conf spark.executor.cores=2\
  --master spark://172.17.0.4:7077\
  --deploy-mode client \
  --packages com.typesafe:config:1.3.1\
  file:///home/user/workspace/spark-test/target/spark-test-0.1-SNAPSHOT.jar
Run Code Online (Sandbox Code Playgroud)

集群配置 - 主节点和 2 个工作节点位于不同的容器中。

作业开始后,我可以看到 csv 文件被放入:

工人:

/usr/local/spark-2.0.2-bin-hadoop2.7/work/app-20170116160937-0036/0/test.csv
Run Code Online (Sandbox Code Playgroud)

司机:

/tmp/spark-f65b2466-e419-49bd-8da7-9f2b94cbf870/userFiles-abb14b33-58b1-47d6-935e-6c2943e3d55c/test.csv
Run Code Online (Sandbox Code Playgroud)

问题是——如何正确读取这个文件?目前我正在做如下:

private var initial: DataFrame = spark.sqlContext.read
    .option("mode", "DROPMALFORMED")
    .option("delimiter", conf.delimiter)
    .option("dateFormat", conf.dateFormat)
    .schema(conf.schema)
    .csv("file:///*.csv")
Run Code Online (Sandbox Code Playgroud)

这会导致 FileNotFoundException。

apache-spark

6
推荐指数
1
解决办法
1万
查看次数

UDF仅从Spark SQL中的路径中提取文件名

Apache Spark中有input_file_name函数,我用它来向Dataset添加新列,其中包含当前正在处理的文件名.

问题是我想以某种方式自定义此函数以仅返回文件名,在s3上省略它的完整路径.

现在,我正在使用map函数替换第二步中的路径:

val initialDs = spark.sqlContext.read
.option("dateFormat", conf.dateFormat)
.schema(conf.schema)
.csv(conf.path).withColumn("input_file_name", input_file_name)
...
...
def fromFile(fileName: String): String = {
  val baseName: String = FilenameUtils.getBaseName(fileName)
  val tmpFileName: String = baseName.substring(0, baseName.length - 8) //here is magic conversion ;)
  this.valueOf(tmpFileName)
}
Run Code Online (Sandbox Code Playgroud)

但我想用类似的东西

val initialDs = spark.sqlContext.read
    .option("dateFormat", conf.dateFormat)
    .schema(conf.schema)
    .csv(conf.path).withColumn("input_file_name", **customized_input_file_name_function**)
Run Code Online (Sandbox Code Playgroud)

java scala apache-spark apache-spark-sql spark-dataframe

5
推荐指数
2
解决办法
3337
查看次数