我正在开发一个Scala-Spark程序,该程序需要从HDFS上的目录中获取最新创建的文件(每天都会在目录中创建一个文件),并读取其中的数据以进行进一步处理。从我的研究中,我发现以下代码可以使用Scala将文件提早2天:
import java.nio.file._
import java.time._
import java.time.temporal.ChronoUnit.DAYS
val isMoreThan2DaysOld = Files.getLastModifiedTime(path).toInstant.plus(2, DAYS) isBefore Instant.now
Run Code Online (Sandbox Code Playgroud)
但是此代码不适用于HDFS上存在的文件。任何人都可以让我知道如何使用Scala从HDFS上存在的目录中实施最新创建的文件。
我正在尝试将流数据帧与配置单元表连接起来,并将生成的数据帧插入到另一个 Kafka 主题中。下面是我实现的代码,它按照要求工作。
def write_stream_batches(kafka_df: DataFrame,table_config):
table_config = state_config
kafka_df.writeStream \
.format('kafka') \
.foreachBatch(join_kafka_streams_denorm) \
.option('checkpointLocation', table_config['checkpoint_location']) \
.start() \
.awaitTermination()
def join_kafka_streams_denorm(kafka_df, batch_id):
try:
table_config = state_config
kafka_config = kafkaconfig
filters = ata_filter(kafka_df=kafka_df)
main_df = spark.sql(f'select * from db.table where {filters}')
joined_df = join_remove_duplicate_col(kafka_df=kafka_df, denorm=main_df, table_config=table_config)
push_to_kafka(joined_df, kafka_config, table_config, 'state')
except Exception as error:
print(f'Join failed with the exception: {error}')
traceback.print_exc()
print('Stopping the application')
sys.exit(1)
Run Code Online (Sandbox Code Playgroud)
该方法write_stream_batches正在从 kafka 接收流数据帧。我正在将此主题数据合并到配置单元表中,并且我的表配置在从 config.py 文件导入的字典中,下面是该行。
table_config = state_config
Run Code Online (Sandbox Code Playgroud)
这里的问题是给出检查点配置,我在 write_stream_batches 中导入 state_config …
我正在尝试将文件加载到spark中.如果我将正常的textFile加载到Spark中,如下所示:
val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile")
Run Code Online (Sandbox Code Playgroud)
结果是:
partFile: org.apache.spark.sql.Dataset[String] = [value: string]
Run Code Online (Sandbox Code Playgroud)
我可以在输出中看到一个数据集.但是如果我加载一个Json文件:
val pfile = spark.read.json("hdfs://quickstart:8020/user/cloudera/pjson")
Run Code Online (Sandbox Code Playgroud)
结果是具有现成模式的数据框:
pfile: org.apache.spark.sql.DataFrame = [address: struct<city: string, state: string>, age: bigint ... 1 more field]
Run Code Online (Sandbox Code Playgroud)
Json/parquet/orc文件有架构.所以我可以理解这是Spark版本的一个特性:2x,这使得事情变得更容易,因为我们在这种情况下直接获得DataFrame,而对于普通的textFile,你得到的数据集中没有任何架构是有意义的.我想知道的是如何将模式添加到数据集中,该数据集是将textFile加载到spark中的结果.对于RDD,有一个case类/ StructType选项来添加模式并将其转换为DataFrame.谁能让我知道我该怎么做?
I am trying to retrieve common 'keys' from two hashMaps and put the result in a ArrayList. To do that, I have come up with following logic.
List<String> commonList = new ArrayList<String>();
for(String key : mapA.keySet()) {
if(mapB.get(key) !=null ) {
if(mapA.get(key).equals(mapB.get(key))) {
commonList.add(key);
}
}
}
Run Code Online (Sandbox Code Playgroud)
This is the data I added in the maps: mapA & mapB:
mapA.put("abc_ap_bank_accounts,XYZ","6372,43272.6648842593");
mapA.put("abc_ap_bank_accounts_secured_attributes,XYZ","6372,43272.6648829051");
mapA.put("abc_ap_checks,XYZ","56207,43272.676245");
mapA.put("abc_ap_holds,XYZ","9523,43272.6735710995");
mapA.put("abc_ap_holds_dh,XYZ","14,43272.6735710995");
mapA.put("abc_ap_invoice_distributions,XYZ","1573699,43272.6735710995");
mapA.put("abc_ap_invoice_distributions_dh,XYZ","9,43272.6735710995");
mapA.put("abc_ap_invoices,XYZ","141096,43272.6735710995");
mapA.put("abc_ap_invoices_dh,XYZ","47,43272.6735710995");
mapA.put("abc_ap_payment_history,XYZ","454441,43272.6763922106");
mapA.put("abc_ap_payment_methods,XYZ","41,43193.0547537269");
mapA.put("abc_ap_payment_schedules,XYZ","141099,43272.6735710995");
mapA.put("abc_ap_payment_schedules_dh,XYZ","47,43272.6735710995");
mapA.put("abc_ap_terms,XYZ","73,43193.0547620718");
mapA.put("abc_ar_cash_receipts,XYZ","198815,43272.6634247337");
mapA.put("abc_ar_collectors,XYZ","8,43192.4939946643");
mapA.put("abc_ar_customer_contacts,XYZ","4978,43272.6613442824");
mapA.put("abc_ar_customer_site_uses,XYZ","71313,43272.6617516204");
mapA.put("abc_ar_customer_sites,XYZ","38740,43272.6617516204");
mapA.put("abc_ar_customers,XYZ","12521,43272.6617516204"); …Run Code Online (Sandbox Code Playgroud) 我有一个按以下方式创建的 DataFrame。
val someDF = Seq((8, "bat"),(64, "mouse"),(-27, "horse")).toDF("number", "word")
someDF.printSchema
root
|-- number: integer (nullable = false)
|-- word: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
使用 SQL API,可以通过创建临时表并运行插入查询向其中插入一行。有什么方法可以使用 DataFrame API 的方法追加/添加新行?
如果我在单个 HDFS 节点上有一个 50GB 的巨大 CSV 文件,并且我正在尝试使用 spark.read 读取该文件,如下所示:
file_df = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load('/hdfspath/customer.csv')
Run Code Online (Sandbox Code Playgroud)
我正在使用以下 spark-submit 提交 spark 作业:
spark-submit --master yarn --deploy-mode cluster --num-executors 4 --executor-memory 3G --executor-cores 5 --driver-memory 1G load_csv.py
Run Code Online (Sandbox Code Playgroud)
我知道在有动作之前,spark 不会将任何数据加载到内存中。但是当一个动作被触发时会发生什么,首先要做的是将文件读入内存以启动转换。spark是如何根据我提到的内核和执行器以小部分读取50GB文件的?
例如:我提到了 4 个 executor 和每个 executor 的 3GB 内存。在阅读时,将 spark 将主 customer.csv 文件转换为每个执行程序的 3GB 块并加载以下文件:
对于第一个 12GB:
Executor 1: 3GB
Executor 2: 3GB
Executor 3: 3GB
Executor 4: 3GB
Run Code Online (Sandbox Code Playgroud)
依此类推,直到整个文件完成处理?
或者它会根据 HDFS 块大小拆分文件并逐块读取例如:128MB 并尝试在每个 3GB 执行器中尽可能多地装入块?
如果文件完全存在于单个集群上(在我的情况下是这样),如何触发处理文件?
我理解它的解释有点广泛和繁琐,但任何帮助将不胜感激。
我正在尝试将 java 程序的模块转换为 Scala。到目前为止,我已经能够在我转换的每个模块中应用 Scala 的函数式编程范式及其语法。但是我遇到了一种可以进行一些验证、使用continue并最终yield输出的方法。下面是Java中的代码:
public boolean checkColumn(String server, String database, String schema, String table, String column) {
boolean bServer, bDatabase, bSchema, bTable, bColumn, bRet = false;
for (int i = 0; i < columns.length; i++) {
if ((server == null) || (server.length() == 0)) {
bServer = true;
} else {
bServer = columns[i][0].equalsIgnoreCase(server);
}
if (!bServer) continue;
if ((database == null) || (database.length() == 0)) {
bDatabase = true;
} else {
bDatabase …Run Code Online (Sandbox Code Playgroud) 在我的项目中,我们正在从 Java 迁移到 Scala。我必须使用 Java 中的一个函数,该函数在continuefor 循环中使用,并根据 for 循环内的 if 条件返回一个值,如下所示。
private TSourceToken getBeforeToken(TSourceToken token) {
TSourceTokenList tokens = token.container;
int index = token.posinlist;
for ( int i = index - 1; i >= 0; i-- ) {
TSourceToken currentToken = tokens.get( i );
if ( currentToken.toString( ).trim( ).length( ) == 0 ) {
continue;
}
else {
return currentToken;
}
}
return token;
}
Run Code Online (Sandbox Code Playgroud)
为了将其转换为 Scala,我使用了 Yield 选项来过滤掉满足 if 表达式的 else 条件的值,如下所示。
def getBeforeToken(token: TSourceToken): TSourceToken = …Run Code Online (Sandbox Code Playgroud) apache-spark ×4
scala ×4
java ×2
apache-kafka ×1
arraylist ×1
for-loop ×1
hadoop ×1
hashmap ×1
list ×1