我正在尝试将一个数据帧写入到HDFS位置的spark中,我希望如果我添加partitionBy符号Spark将创建分区(类似于以Parquet格式编写)文件夹的形式
partition_column_name=partition_value
Run Code Online (Sandbox Code Playgroud)
(即partition_date=2016-05-03).为此,我运行了以下命令:
(df.write
.partitionBy('partition_date')
.mode('overwrite')
.format("com.databricks.spark.csv")
.save('/tmp/af_organic'))
Run Code Online (Sandbox Code Playgroud)
但是没有创建分区文件夹,知道我为了火花DF自动创建那些文件夹我应该做些什么?
谢谢,
我试图从mysql读取数据并将其写回s3中具有特定分区的镶木地板文件,如下所示:
df=sqlContext.read.format('jdbc')\
.options(driver='com.mysql.jdbc.Driver',url="""jdbc:mysql://<host>:3306/<>db?user=<usr>&password=<pass>""",
dbtable='tbl',
numPartitions=4 )\
.load()
df2=df.withColumn('updated_date',to_date(df.updated_at))
df2.write.parquet(path='s3n://parquet_location',mode='append',partitionBy=['updated_date'])
Run Code Online (Sandbox Code Playgroud)
我的问题是它只打开一个与mysql的连接(而不是4),并且它不会写入parquert,直到它从mysql中获取所有数据,因为我在mysql中的表很大(100M行),这个过程在OutOfMemory上失败了.
有没有办法配置Spark来打开多个与mysql的连接并将部分数据写入镶木地板?
我正在使用presto主要与蜂巢连接器连接到蜂巢Metastore.
我的所有表都是指向存储在S3中的数据的外部表.
我的主要问题是没有办法(至少我知道)在Presto中进行分区发现,所以在我开始在presto中查询表之前我需要切换到hive并运行 msck repair table mytable
在Presto有更合理的方式吗?
我正在尝试使用 pureConfig 和 configFactory 进行我的 Spark 应用程序配置。这是我的代码:
import pureconfig.{loadConfigOrThrow}
object Source{
def apply(keyName: String, configArguments: Config): Source = {
keyName.toLowerCase match {
case "mysql" =>
val properties = loadConfigOrThrow[DBConnectionProperties](configArguments)
new MysqlSource(None, properties)
case "files" =>
val properties = loadConfigOrThrow[FilesSourceProperties](configArguments)
new Files(properties)
case _ => throw new NoSuchElementException(s"Unknown Source ${keyName.toLowerCase}")
}
}
}
import Source
val config = ConfigFactory.parseString(result.mkString("\n"))
val source = Source("mysql",config.getConfig("source.mysql"))
Run Code Online (Sandbox Code Playgroud)
当我从 IDE (intelliJ) 或直接从 java (即 java jar...) 运行它时,它工作正常。
但是当我使用 Spark-submit 运行它时,它失败并出现以下错误:
Exception in thread "main" java.lang.NoSuchMethodError: shapeless.Witness$.mkWitness(Ljava/lang/Object;)Lshapeless/Witness; …Run Code Online (Sandbox Code Playgroud) 跟随这个问题:如何在 presto 中交叉连接取消嵌套 json 数组
我尝试运行提供的示例,但在执行此操作时出现错误
SQL命令:
select x.n
from
unnest(cast(json_extract('{"payload":[{"type":"b","value":"9"},
{"type":"a","value":"8"}]}','$.payload') as array<varchar>)) as x(n)
Run Code Online (Sandbox Code Playgroud)
我得到的错误:
Value cannot be cast to array<varchar>
java.lang.RuntimeException: java.lang.NullPointerException: string is null
当 kafka-streams 应用程序正在运行并且 Kafka 突然关闭时,应用程序进入“等待”模式,向其发送警告日志的消费者和生产者线程无法连接,当 Kafka 回来时,一切都应该(理论上)进行恢复正常。我正在尝试获取有关这种情况的警报,但无法找到捕获该情况并发送日志/指标的位置。我尝试了以下方法:
streams.setUncaughtExceptionHandler但这仅发生在异常情况下,但这里不是这种情况ProductionExceptionHandler并更改default.production.exception.handler我的类的属性,以扩展此接口。再说一次,因为setUncaughtExceptionHandler这里没有抛出异常,所以什么也没有发生。我知道 Kafka 有自己的指标,我可以监听这些指标并发现 Broker 是否出现故障。但在某些情况下,Kafka 代理很好,而我的 kafka-streams 应用程序无法连接(即错误的身份验证配置或 vpn/vpc 问题)
我该怎么做才能发现这些问题并记录/报告它们?
更新
如果 kafka 不可用,请查看消费者/生产者日志:
2020-08-24 21:41:32,055 [my-kafka-streams-app-23a462fe-28c6-415a-a08a-b11d3ffffc2e-StreamThread-1] WARN o.apache.kafka.clients.NetworkClient - [] [Consumer clientId=my-kafka-streams-app-23a462fe-28c6-415a-a08a-b11d3ffffc2e-StreamThread-1-consumer, groupId=my-kafka-streams-app] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2020-08-24 21:41:32,186 [kafka-admin-client-thread | my-kafka-streams-app-23a462fe-28c6-415a-a08a-b11d3ffffc2e-admin] WARN o.apache.kafka.clients.NetworkClient - [] [AdminClient clientId=my-kafka-streams-app-23a462fe-28c6-415a-a08a-b11d3ffffc2e-admin] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
2020-08-24 21:41:32,250 [kafka-producer-network-thread | my-kafka-streams-app-23a462fe-28c6-415a-a08a-b11d3ffffc2e-StreamThread-1-producer] WARN …Run Code Online (Sandbox Code Playgroud) 我们大量使用 presto JSON 功能,而我们缺少的是在 JSON 无效时能够为空,这样如果 JSON 格式有问题,使用 JSON 函数的 SQL 语句不会中断。
起初,我还以为它可以用的某种组合来实现JSON_PARSE和NULLIF,但不能设法拉本的..有办法做到让这种验证?
谢谢
我有一个 CSV 文件,如下所示:
"a","b","c","{""x"":""xx"",""y"":""yy""}"
Run Code Online (Sandbox Code Playgroud)
当我使用 java CSV reader ( au.com.bytecode.opencsv.CSVParser) 时,它会在我指示时设法解析字符串defaultEscapeChar = '\u0000'
当我尝试使用 spark 2.2 CSV 阅读器阅读它时,它失败了,无法将其拆分为 4 列。这是我尝试过的:
val df = spark.read.format("csv")
.option("quoteMode","ALL")
.option("quote", "\u0000")
.load("s3://...")
Run Code Online (Sandbox Code Playgroud)
我也尝试过,option("escape", "\u0000")
但没有运气。
我需要选择哪些 CSV 选项才能正确解析此文件?
apache-spark ×4
presto ×3
csv ×2
hadoop ×2
hive ×2
json ×2
scala ×2
amazon-s3 ×1
apache-kafka ×1
mysql ×1
partitioning ×1
pureconfig ×1
pyspark ×1
shapeless ×1