小编Lio*_*ber的帖子

将Spark数据帧写为带分区的CSV

我正在尝试将一个数据帧写入到HDFS位置的spark中,我希望如果我添加partitionBy符号Spark将创建分区(类似于以Parquet格式编写)文件夹的形式

partition_column_name=partition_value
Run Code Online (Sandbox Code Playgroud)

(即partition_date=2016-05-03).为此,我运行了以下命令:

(df.write
    .partitionBy('partition_date')
    .mode('overwrite')
    .format("com.databricks.spark.csv")
    .save('/tmp/af_organic'))
Run Code Online (Sandbox Code Playgroud)

但是没有创建分区文件夹,知道我为了火花DF自动创建那些文件夹我应该做些什么?

谢谢,

csv partitioning apache-spark apache-spark-sql

10
推荐指数
1
解决办法
1万
查看次数

同时从mysql中读取数据

我试图从mysql读取数据并将其写回s3中具有特定分区的镶木地板文件,如下所示:

df=sqlContext.read.format('jdbc')\
   .options(driver='com.mysql.jdbc.Driver',url="""jdbc:mysql://<host>:3306/<>db?user=<usr>&password=<pass>""",
         dbtable='tbl',
         numPartitions=4 )\
   .load()


df2=df.withColumn('updated_date',to_date(df.updated_at))
df2.write.parquet(path='s3n://parquet_location',mode='append',partitionBy=['updated_date'])
Run Code Online (Sandbox Code Playgroud)

我的问题是它只打开一个与mysql的连接(而不是4),并且它不会写入parquert,直到它从mysql中获取所有数据,因为我在mysql中的表很大(100M行),这个过程在OutOfMemory上失败了.

有没有办法配置Spark来打开多个与mysql的连接并将部分数据写入镶木地板?

mysql apache-spark apache-spark-sql pyspark

9
推荐指数
2
解决办法
1万
查看次数

Presto和hive分区发现

我正在使用presto主要与蜂巢连接器连接到蜂巢Metastore.

我的所有表都是指向存储在S3中的数据的外部表.

我的主要问题是没有办法(至少我知道)在Presto中进行分区发现,所以在我开始在presto中查询表之前我需要切换到hive并运行 msck repair table mytable

在Presto有更合理的方式吗?

hadoop hive amazon-s3 presto

6
推荐指数
2
解决办法
2231
查看次数

Spark 无法与 pureconfig 一起使用

我正在尝试使用 pureConfig 和 configFactory 进行我的 Spark 应用程序配置。这是我的代码:

import pureconfig.{loadConfigOrThrow}
object Source{
  def apply(keyName: String, configArguments: Config): Source = {
    keyName.toLowerCase match {
      case "mysql" =>
          val properties = loadConfigOrThrow[DBConnectionProperties](configArguments)
          new MysqlSource(None, properties)
      case "files" =>
        val properties = loadConfigOrThrow[FilesSourceProperties](configArguments)
        new Files(properties)
      case _ => throw new NoSuchElementException(s"Unknown Source ${keyName.toLowerCase}")
    }

  }
}

import Source
val config = ConfigFactory.parseString(result.mkString("\n"))
    val source = Source("mysql",config.getConfig("source.mysql"))
Run Code Online (Sandbox Code Playgroud)

当我从 IDE (intelliJ) 或直接从 java (即 java jar...) 运行它时,它工作正常。

但是当我使用 Spark-submit 运行它时,它失败并出现以下错误:

Exception in thread "main" java.lang.NoSuchMethodError: shapeless.Witness$.mkWitness(Ljava/lang/Object;)Lshapeless/Witness; …
Run Code Online (Sandbox Code Playgroud)

scala shapeless apache-spark pureconfig

6
推荐指数
1
解决办法
3264
查看次数

Presto unnest json

跟随这个问题:如何在 presto 中交叉连接取消嵌套 json 数组

我尝试运行提供的示例,但在执行此操作时出现错误

SQL命令:

select x.n
from
unnest(cast(json_extract('{"payload":[{"type":"b","value":"9"}, 
{"type":"a","value":"8"}]}','$.payload') as array<varchar>)) as  x(n)
Run Code Online (Sandbox Code Playgroud)

我得到的错误:

Value cannot be cast to array<varchar> java.lang.RuntimeException: java.lang.NullPointerException: string is null

json hadoop hive presto

5
推荐指数
1
解决办法
2万
查看次数

kafka-streams 关于 kafka 连接失败的警报

当 kafka-streams 应用程序正在运行并且 Kafka 突然关闭时,应用程序进入“等待”模式,向其发送警告日志的消费者和生产者线程无法连接,当 Kafka 回来时,一切都应该(理论上)进行恢复正常。我正在尝试获取有关这种情况的警报,但无法找到捕获该情况并发送日志/指标的位置。我尝试了以下方法:

  1. streams.setUncaughtExceptionHandler但这仅发生在异常情况下,但这里不是这种情况
  2. 扩展ProductionExceptionHandler并更改default.production.exception.handler我的类的属性,以扩展此接口。再说一次,因为setUncaughtExceptionHandler这里没有抛出异常,所以什么也没有发生。

我知道 Kafka 有自己的指标,我可以监听这些指标并发现 Broker 是否出现故障。但在某些情况下,Kafka 代理很好,而我的 kafka-streams 应用程序无法连接(即错误的身份验证配置或 vpn/vpc 问题)

我该怎么做才能发现这些问题并记录/报告它们?

更新

如果 kafka 不可用,请查看消费者/生产者日志:

2020-08-24 21:41:32,055 [my-kafka-streams-app-23a462fe-28c6-415a-a08a-b11d3ffffc2e-StreamThread-1] WARN  o.apache.kafka.clients.NetworkClient - [] [Consumer clientId=my-kafka-streams-app-23a462fe-28c6-415a-a08a-b11d3ffffc2e-StreamThread-1-consumer, groupId=my-kafka-streams-app] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2020-08-24 21:41:32,186 [kafka-admin-client-thread | my-kafka-streams-app-23a462fe-28c6-415a-a08a-b11d3ffffc2e-admin] WARN  o.apache.kafka.clients.NetworkClient - [] [AdminClient clientId=my-kafka-streams-app-23a462fe-28c6-415a-a08a-b11d3ffffc2e-admin] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2020-08-24 21:41:32,250 [kafka-producer-network-thread | my-kafka-streams-app-23a462fe-28c6-415a-a08a-b11d3ffffc2e-StreamThread-1-producer] WARN …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api apache-kafka-streams

5
推荐指数
1
解决办法
1811
查看次数

在 prestoDB 中验证 json 格式

我们大量使用 presto JSON 功能,而我们缺少的是在 JSON 无效时能够为空,这样如果 JSON 格式有问题,使用 JSON 函数的 SQL 语句不会中断。

起初,我还以为它可以用的某种组合来实现JSON_PARSENULLIF,但不能设法拉本的..有办法做到让这种验证?

谢谢

json presto

3
推荐指数
1
解决办法
600
查看次数

spark 2.0 用 json 读取 csv

我有一个 CSV 文件,如下所示:

"a","b","c","{""x"":""xx"",""y"":""yy""}"
Run Code Online (Sandbox Code Playgroud)

当我使用 java CSV reader ( au.com.bytecode.opencsv.CSVParser) 时,它会在我指示时设法解析字符串defaultEscapeChar = '\u0000'

当我尝试使用 spark 2.2 CSV 阅读器阅读它时,它失败了,无法将其拆分为 4 列。这是我尝试过的:

val df = spark.read.format("csv")
              .option("quoteMode","ALL")
              .option("quote", "\u0000")
              .load("s3://...")
Run Code Online (Sandbox Code Playgroud)

我也尝试过,option("escape", "\u0000") 但没有运气。

我需要选择哪些 CSV 选项才能正确解析此文件?

csv scala apache-spark

3
推荐指数
1
解决办法
4022
查看次数