Posts by user BdE*_*eer

How to handle the small-file problem in Spark Structured Streaming?

I have a scenario in my project where I am reading Kafka topic messages using spark-sql-2.4.1. I am able to process the data using Structured Streaming. Once the data is received and processed, I need to save it into the respective Parquet files in the HDFS store.

I am able to store and read the Parquet files. I kept a trigger time of 15 seconds to 1 minute. These files are very small in size, hence resulting in …
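A common mitigation, sketched here as a plain Spark Java fragment (the `df` variable and all paths are placeholders, not from the question), is to lengthen the trigger interval and reduce the number of output partitions per micro-batch so each file is larger:

```java
// Sketch only: fewer, larger files per micro-batch.
// "df" and the paths below are hypothetical placeholders.
df.coalesce(1)                                    // one file per batch
  .writeStream()
  .format("parquet")
  .option("checkpointLocation", "/tmp/checkpoints/demo")
  .option("path", "/tmp/output/demo")
  .trigger(Trigger.ProcessingTime("10 minutes"))  // longer trigger => bigger files
  .start();
```

This only reduces the rate of small-file creation; a separate periodic batch job that compacts old files (read, repartition, rewrite) is the usual complement.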

apache-spark parquet spark-streaming apache-spark-sql

12
Votes
2
Answers
3695
Views

Execution failed for task ':compileJava'. > invalid source release: 1.7

I am using:

  1. Gradle 2.3
  2. javac -version = 1.7
  3. jre = 1.7
  4. The registry shows it points to 1.7.

But I am still getting the error below:

Execution failed for task ':compileJava'. > invalid source release: 1.7

Please let me know how to fix it.
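For reference, the usual fix for "invalid source release" is to make the Java plugin's compatibility settings match the JDK that Gradle itself runs under (`gradle -v` reports that JVM, which can differ from the `javac` on the PATH). A minimal build.gradle fragment, under that assumption:

```groovy
// build.gradle -- these must match the JDK that runs Gradle,
// not just the javac/jre found on the PATH
apply plugin: 'java'

sourceCompatibility = 1.7
targetCompatibility = 1.7
```

If Gradle is being launched by a different JVM (for example a JDK 1.6 set in Eclipse or in JAVA_HOME), pointing `org.gradle.java.home` in gradle.properties at the 1.7 JDK is the other common remedy.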

java gradle-eclipse build.gradle

7
Votes
4
Answers
40k
Views

How to set the jdbc partitionColumn type to Date in spark 2.4.1

I am trying to retrieve data from Oracle using spark-sql-2.4.1. I tried setting the JdbcOptions as below:

    .option("lowerBound", "31-MAR-02")
    .option("upperBound", "01-MAY-19")
    .option("partitionColumn", "data_date")
    .option("numPartitions", 240)

But it gives the error:

    java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff]
        at java.sql.Timestamp.valueOf(Timestamp.java:204)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.toInternalBoundValue(JDBCRelation.scala:179)

Then I tried as below:

    .option("lowerBound", "2002-03-31") // changed the date format
    .option("upperBound", "2019-05-02")
    .option("partitionColumn", "data_date")
    .option("numPartitions", 240)

Still no luck. So what is the correct way to pass a date as the lowerBound/upperBound? Is there a way to specify/set the data type of the option parameters?
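The stack trace points at java.sql.Timestamp.valueOf, which only accepts the JDBC escape format yyyy-[m]m-[d]d hh:mm:ss[.f...]; a small standalone check of the two bound formats (plain JDK classes, no Spark needed):

```java
import java.sql.Timestamp;

public class BoundFormatCheck {
    public static void main(String[] args) {
        // Spark parses timestamp bounds via Timestamp.valueOf, which requires
        // the JDBC escape format yyyy-[m]m-[d]d hh:mm:ss[.f...]
        Timestamp ok = Timestamp.valueOf("2002-03-31 00:00:00");
        System.out.println("parsed: " + ok);

        // The Oracle-style literal is rejected with IllegalArgumentException,
        // exactly as in the reported error
        try {
            Timestamp.valueOf("31-MAR-02");
            System.out.println("unexpectedly parsed");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```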

Part 2: Checked the options properly; they were being overwritten before the query was executed, so I corrected that. ... Now that error is resolved.

But with the following options:

.option("lowerBound", "2002-03-31 00:00:00")
.option("upperBound", "2019-05-01 23:59:59")
.option("timestampFormat", "yyyy-mm-dd hh:mm:ss")

Request parameters:

query ->  ( SELECT * FROM MODEL_VALS ) T

It raises another error:

java.sql.SQLException: ORA-12801: error signaled in parallel query server P022, instance nj0005

ORA-01861: literal does not …

apache-spark apache-spark-sql databricks

7
Votes
1
Answers
6088
Views

How to fix kafka.common.errors.TimeoutException: Expiring 1 record(s) xxx ms has passed since batch creation plus linger time

I am using kafka_2.11-2.1.1, and the producer uses Spring 2.1.0.RELEASE.

I am using Spring to send messages to a Kafka topic, and my producer generates a lot of TimeoutExceptions:

org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for COMPANY_INBOUND--19: 229 ms has passed since batch creation plus linger time

I am using the following Kafka producer settings:

acks: 1
retries: 1
batchSize: 100
lingerMs: 5
bufferMemory: 33554432
requestTimeoutMs: 60

I tried many combinations (especially batchSize and lingerMs), but nothing worked. Please help: what should the settings be for the above scenario?

Tried again with the following configuration... but no luck, same error:

acks = 1
    batch.size = 15
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms …
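For comparison, a hedged starting point (values are illustrative defaults, not guaranteed for this cluster): the posted `requestTimeoutMs: 60` is the most likely culprit, since `request.timeout.ms` is in milliseconds and 60 ms gives batches almost no time to be sent, and a `batch.size` of 100 bytes is roughly one record. The broker-shipped defaults are closer to:

```properties
# Illustrative producer settings; tune for your cluster.
acks=1
retries=1
# batch.size is in BYTES; 100 is ~one record. The default is 16384.
batch.size=16384
linger.ms=5
buffer.memory=33554432
# request.timeout.ms is in MILLISECONDS; 60 means 60 ms. The default is 30000.
request.timeout.ms=30000
```

With a sane timeout in place, batch.size and linger.ms only trade latency for throughput and are rarely the cause of expiring records.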

apache-kafka kafka-producer-api spring-kafka

7
Votes
1
Answers
10k
Views

How to fix this error: org.apache.spark.sql.catalyst.errors.package$TreeNodeException

I have two processes, each of which 1) connects to an Oracle DB and reads a specific table, 2) forms a DataFrame and processes it, 3) saves the df to Cassandra.

If I run both processes simultaneously, both try to read from Oracle, and while the second process is reading the data I get an error saying:

 ERROR ValsProcessor2: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange SinglePartition
+- *(1) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#290L])
   +- *(1) Scan JDBCRelation((SELECT * FROM BM_VALS WHERE ROWNUM <= 10) T) [numPartitions=2] [] PushedFilters: [], ReadSchema: struct<>
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.InputAdapter.inputRDDs(WholeStageCodegenExec.scala:371)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.inputRDDs(HashAggregateExec.scala:150)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:605)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:294)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2770)
at org.apache.spark.sql.Dataset$$anonfun$count$1.apply(Dataset.scala:2769)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3254)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at …

datastax-enterprise apache-spark cassandra-3.0 databricks

6
Votes
1
Answers
10k
Views

How to pass a date/timestamp as lowerBound/upperBound in spark-sql-2.4.1v with ojdbc14.jar?

I am reading data from Oracle using spark-sql-2.4.1v and ojdbc6.jar.

There is an Oracle table as follows:

create table schema1.modal_vals(
  FAMILY_ID          NUMBER NOT NULL,
  INSERTION_DATE     DATE NOT NULL,
  ITEM_VALUE         VARCHAR2(4000),
  YEAR               NUMBER,
  QUARTER            NUMBER,
  LAST_UPDATE_DATE   DATE
)

Loading sample data:

insert into modal_vals(FAMILY_ID,INSERTION_DATE,ITEM_VALUE,YEAR,QUARTER,LAST_UPDATE_DATE) values(2,'30-JUN-02','bbb-',2013,2,null);
insert into modal_vals(FAMILY_ID,INSERTION_DATE,ITEM_VALUE,YEAR,QUARTER,LAST_UPDATE_DATE) values(2,'30-JUN-13','b+',2013,2,null);
insert into modal_vals(FAMILY_ID,INSERTION_DATE,ITEM_VALUE,YEAR,QUARTER,LAST_UPDATE_DATE) values(2,'30-JUN-17','bbb-',2013,2,null);
insert into modal_vals(FAMILY_ID,INSERTION_DATE,ITEM_VALUE,YEAR,QUARTER,LAST_UPDATE_DATE) values(2,'30-JUN-13','bb',2013,2,null);
insert into modal_vals(FAMILY_ID,INSERTION_DATE,ITEM_VALUE,YEAR,QUARTER,LAST_UPDATE_DATE) values(2,'30-JUN-02','ccc-',2013,2,null);
insert into modal_vals(FAMILY_ID,INSERTION_DATE,ITEM_VALUE,YEAR,QUARTER,LAST_UPDATE_DATE) values(2,'30-JUN-13','aa-',2013,2,null);
insert into modal_vals(FAMILY_ID,INSERTION_DATE,ITEM_VALUE,YEAR,QUARTER,LAST_UPDATE_DATE) values(2,'30-OCT-13','a-',2013,2,null);
insert into modal_vals(FAMILY_ID,INSERTION_DATE,ITEM_VALUE,YEAR,QUARTER,LAST_UPDATE_DATE) values(2,'30-JUN-03','bbb-',2013,2,null);
insert into modal_vals(FAMILY_ID,INSERTION_DATE,ITEM_VALUE,YEAR,QUARTER,LAST_UPDATE_DATE) values(2,'30-JUN-13','b',2013,2,null);
insert into modal_vals(FAMILY_ID,INSERTION_DATE,ITEM_VALUE,YEAR,QUARTER,LAST_UPDATE_DATE) values(2,'30-FEB-03','aa+',2013,2,null);
insert into modal_vals(FAMILY_ID,INSERTION_DATE,ITEM_VALUE,YEAR,QUARTER,LAST_UPDATE_DATE) values(2,'30-JUN-13','aa+',2013,2,null);
insert into modal_vals(FAMILY_ID,INSERTION_DATE,ITEM_VALUE,YEAR,QUARTER,LAST_UPDATE_DATE) values(2,'30-JAN-19','aaa+',2013,2,null);
insert into modal_vals(FAMILY_ID,INSERTION_DATE,ITEM_VALUE,YEAR,QUARTER,LAST_UPDATE_DATE) values(2,'30-JUN-18','ccc-',2013,2,null);
insert into modal_vals(FAMILY_ID,INSERTION_DATE,ITEM_VALUE,YEAR,QUARTER,LAST_UPDATE_DATE) values(2,'01-MAY-19','bb-',2013,2,null);

Tried to load the data into spark-sql as below: …

oracle10g oracle11g oracle11gr2 apache-spark apache-spark-sql

5
Votes
1
Answers
5580
Views

How to fix the "BlockManagerMasterEndpoint - No more replicas available for rdd" issue?

I am using Spark 2.4.1 and Java 8 to copy data into cassandra-3.0.

My Spark job script is:

$SPARK_HOME/bin/spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --name MyDriver  \
    --jars "/local/jars/*.jar" \
    --files hdfs://files/application-cloud-dev.properties,hdfs://files/column_family_condition.yml \
    --class com.sp.MyDriver \
    --executor-cores 3 \
    --executor-memory 9g \
    --num-executors 5 \
    --driver-cores 2 \
    --driver-memory 4g \
    --driver-java-options -Dconfig.file=./application-cloud-dev.properties \
    --conf spark.executor.extraJavaOptions=-Dconfig.file=./application-cloud-dev.properties \
    --conf spark.driver.extraClassPath=. \
    --driver-class-path . \
     ca-datamigration-0.0.1.jar application-cloud-dev.properties

Though the job was successful, my log file is full of the following WARNs:

WARN  org.apache.spark.storage.BlockManagerMasterEndpoint - No more replicas available for rdd_558_5026 !
2019-09-20 00:02:37,882 [dispatcher-event-loop-1] WARN    org.apache.spark.storage.BlockManagerMasterEndpoint - No more replicas available for …

datastax-java-driver apache-spark apache-spark-sql cassandra-3.0

5
Votes
0
Answers
2864
Views

When to use decimal vs float/double in cassandra?

I am using Apache Cassandra version 3.x. I am a bit confused about when I should use the decimal type versus the float type.

Are there any specific use cases/differences for when one should use float and avoid decimal, or vice versa?

I have gone through some quick tutorials, but they do not cover this difference. Can anyone help me understand it?
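The trade-off mirrors Java itself: float/double are binary floating point (compact and fast, but approximate), while Cassandra's decimal behaves like BigDecimal (exact decimal arithmetic, the usual choice for money). A plain-Java illustration of the difference:

```java
import java.math.BigDecimal;

public class DecimalVsDouble {
    public static void main(String[] args) {
        // Binary floating point cannot represent 0.1 or 0.2 exactly,
        // so the sum is not exactly 0.3
        double d = 0.1 + 0.2;
        System.out.println(d == 0.3);  // false

        // Decimal arithmetic is exact for decimal fractions
        BigDecimal b = new BigDecimal("0.1").add(new BigDecimal("0.2"));
        System.out.println(b.compareTo(new BigDecimal("0.3")) == 0);  // true
    }
}
```

So: decimal for values where every cent matters (prices, balances); float/double for measurements and scientific data where a tiny rounding error is acceptable and speed/space matter.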

cassandra datastax

5
Votes
1
Answers
1484
Views

How to call a web service from a Spark job?

I want to call a web service to get some data in Spark Structured Streaming. Is it possible? How?
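One common pattern, sketched here (the dataset `ds`, the URL, and the response handling are all hypothetical): call the service inside mapPartitions so that any per-connection setup happens once per partition on the executors, not once per record or on the driver:

```java
// Sketch: enrich records by calling an HTTP service, one pass per partition.
// "ds" and the endpoint URL are placeholders.
Dataset<String> enriched = ds.mapPartitions(
    (MapPartitionsFunction<String, String>) rows -> {
        List<String> out = new ArrayList<>();
        while (rows.hasNext()) {
            String key = rows.next();
            // Plain java.net call; any HTTP client works, created on the executor
            URL url = new URL("https://example.com/lookup?key=" + key);
            try (BufferedReader in = new BufferedReader(
                    new InputStreamReader(url.openStream()))) {
                out.add(in.readLine());
            }
        }
        return out.iterator();
    },
    Encoders.STRING());
```

Keep in mind the call runs once per record per micro-batch, so caching, batching requests, and a timeout on the connection are usually needed in practice.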

apache-spark apache-spark-sql spark-structured-streaming

5
Votes
1
Answers
3107
Views

How to use the Spark typedLit function in Java

I am using Java 8 with Spark v2.4.1.

I am trying to add a Map column using the Spark function called typedLit, but I get a compile error. How can I do this in the Java API?

Below is the scenario:

    Map<Integer,Integer> lookup_map= new HashMap<>();
    lookup_map.put(1,11);
    lookup_map.put(2,21);
    lookup_map.put(3,31);
    lookup_map.put(4,41);
    lookup_map.put(5,51);
    
    JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
   
   Column typedMapCol = functions.typedLit(lookup_map, Map<Encoders.INT(), Encoders.INT()>);
    // this is not correct and gives a compile error at typedLit
    
    
        Dataset<Row> resultDs= dataDs
           .withColumn("map_col", typedMapCol)

How do I use functions.typedLit in Java 8?
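typedLit expects a Scala TypeTag, which is awkward to supply from Java. A common workaround (a sketch only, reusing the question's `lookup_map` and `dataDs`) is to skip typedLit and build the literal map column with functions.map from alternating key/value lit columns:

```java
// Sketch: build a literal map column without typedLit.
// functions.map(...) takes alternating key and value columns.
List<Column> kvCols = new ArrayList<>();
for (Map.Entry<Integer, Integer> e : lookup_map.entrySet()) {
    kvCols.add(functions.lit(e.getKey()));    // key column
    kvCols.add(functions.lit(e.getValue()));  // value column
}
Column mapCol = functions.map(kvCols.toArray(new Column[0]));

Dataset<Row> resultDs = dataDs.withColumn("map_col", mapCol);
```

This avoids the TypeTag entirely because every element is an ordinary literal column, and Spark infers the map type from the lit values.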

java-8 apache-spark apache-spark-sql

5
Votes
0
Answers
312
Views