小编Met*_*ata的帖子

如何获取HDFS上存在的文件的创建日期?

我正在开发一个Scala-Spark程序,该程序需要从HDFS上的目录中获取最新创建的文件(每天都会在目录中创建一个文件),并读取其中的数据以进行进一步处理。从我的研究中,我发现以下代码可以使用Scala将文件提早2天:

import java.nio.file._
import java.time._
import java.time.temporal.ChronoUnit.DAYS
val isMoreThan2DaysOld = Files.getLastModifiedTime(path).toInstant.plus(2, DAYS) isBefore Instant.now
Run Code Online (Sandbox Code Playgroud)

但是此代码不适用于HDFS上存在的文件。任何人都可以让我知道如何使用Scala从HDFS上存在的目录中实施最新创建的文件。

hadoop scala

1
推荐指数
1
解决办法
3725
查看次数

有没有办法在 Kafka Streaming 的“foreachBatch”函数中传递附加/额外参数?

我正在尝试将流数据帧与配置单元表连接起来,并将生成的数据帧插入到另一个 Kafka 主题中。下面是我实现的代码,它按照要求工作。

def write_stream_batches(kafka_df: DataFrame,table_config):
    table_config = state_config
    kafka_df.writeStream \
    .format('kafka') \
    .foreachBatch(join_kafka_streams_denorm) \
    .option('checkpointLocation', table_config['checkpoint_location']) \
    .start() \
    .awaitTermination()

def join_kafka_streams_denorm(kafka_df, batch_id):
    try:
        table_config = state_config
        kafka_config = kafkaconfig

        filters = ata_filter(kafka_df=kafka_df)
        main_df = spark.sql(f'select * from db.table where {filters}')

        joined_df = join_remove_duplicate_col(kafka_df=kafka_df, denorm=main_df, table_config=table_config)
        push_to_kafka(joined_df, kafka_config, table_config, 'state')
    except Exception as error:
        print(f'Join failed with the exception: {error}')
        traceback.print_exc()
        print('Stopping the application')
        sys.exit(1)
Run Code Online (Sandbox Code Playgroud)

该方法write_stream_batches正在从 kafka 接收流数据帧。我正在将此主题数据合并到配置单元表中,并且我的表配置在从 config.py 文件导入的字典中,下面是该行。

table_config = state_config
Run Code Online (Sandbox Code Playgroud)

这里的问题是给出检查点配置,我在 write_stream_batches 中导入 state_config …

apache-kafka apache-spark spark-streaming

1
推荐指数
1
解决办法
1229
查看次数

如何在Spark中向数据集添加模式?

我正在尝试将文件加载到spark中.如果我将正常的textFile加载到Spark中,如下所示:

val partFile = spark.read.textFile("hdfs://quickstart:8020/user/cloudera/partfile")
Run Code Online (Sandbox Code Playgroud)

结果是:

partFile: org.apache.spark.sql.Dataset[String] = [value: string]
Run Code Online (Sandbox Code Playgroud)

我可以在输出中看到一个数据集.但是如果我加载一个Json文件:

val pfile = spark.read.json("hdfs://quickstart:8020/user/cloudera/pjson")
Run Code Online (Sandbox Code Playgroud)

结果是具有现成模式的数据框:

pfile: org.apache.spark.sql.DataFrame = [address: struct<city: string, state: string>, age: bigint ... 1 more field]
Run Code Online (Sandbox Code Playgroud)

Json/parquet/orc文件有架构.所以我可以理解这是Spark版本的一个特性:2x,这使得事情变得更容易,因为我们在这种情况下直接获得DataFrame,而对于普通的textFile,你得到的数据集中没有任何架构是有意义的.我想知道的是如何将模式添加到数据集中,该数据集是将textFile加载到spark中的结果.对于RDD,有一个case类/ StructType选项来添加模式并将其转换为DataFrame.谁能让我知道我该怎么做?

apache-spark

0
推荐指数
1
解决办法
9383
查看次数

How to retrieve common keys from two hashMaps and put the result into a ArrayList

I am trying to retrieve common 'keys' from two hashMaps and put the result in a ArrayList. To do that, I have come up with following logic.

    List<String> commonList = new ArrayList<String>();
    for(String key : mapA.keySet()) {
        if(mapB.get(key) !=null ) {
            if(mapA.get(key).equals(mapB.get(key))) {
                commonList.add(key);
            }
        }
    }
Run Code Online (Sandbox Code Playgroud)

This is the data I added in the maps: mapA & mapB:

mapA.put("abc_ap_bank_accounts,XYZ","6372,43272.6648842593");
mapA.put("abc_ap_bank_accounts_secured_attributes,XYZ","6372,43272.6648829051");
mapA.put("abc_ap_checks,XYZ","56207,43272.676245");
mapA.put("abc_ap_holds,XYZ","9523,43272.6735710995");
mapA.put("abc_ap_holds_dh,XYZ","14,43272.6735710995");
mapA.put("abc_ap_invoice_distributions,XYZ","1573699,43272.6735710995");
mapA.put("abc_ap_invoice_distributions_dh,XYZ","9,43272.6735710995");
mapA.put("abc_ap_invoices,XYZ","141096,43272.6735710995");
mapA.put("abc_ap_invoices_dh,XYZ","47,43272.6735710995");
mapA.put("abc_ap_payment_history,XYZ","454441,43272.6763922106");
mapA.put("abc_ap_payment_methods,XYZ","41,43193.0547537269");
mapA.put("abc_ap_payment_schedules,XYZ","141099,43272.6735710995");
mapA.put("abc_ap_payment_schedules_dh,XYZ","47,43272.6735710995");
mapA.put("abc_ap_terms,XYZ","73,43193.0547620718");
mapA.put("abc_ar_cash_receipts,XYZ","198815,43272.6634247337");
mapA.put("abc_ar_collectors,XYZ","8,43192.4939946643");
mapA.put("abc_ar_customer_contacts,XYZ","4978,43272.6613442824");
mapA.put("abc_ar_customer_site_uses,XYZ","71313,43272.6617516204");
mapA.put("abc_ar_customer_sites,XYZ","38740,43272.6617516204");
mapA.put("abc_ar_customers,XYZ","12521,43272.6617516204"); …
Run Code Online (Sandbox Code Playgroud)

java for-loop list arraylist hashmap

0
推荐指数
1
解决办法
80
查看次数

如何在不使用 SQL 插入的情况下向 Scala 中的 DataFrame 添加/追加新行?

我有一个按以下方式创建的 DataFrame。

val someDF = Seq((8, "bat"),(64, "mouse"),(-27, "horse")).toDF("number", "word")
someDF.printSchema
root
 |-- number: integer (nullable = false)
 |-- word: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)

使用 SQL API,可以通过创建临时表并运行插入查询向其中插入一行。有什么方法可以使用 DataFrame API 的方法追加/添加新行?

scala apache-spark apache-spark-sql

0
推荐指数
1
解决办法
1062
查看次数

如果整个文件存在于单个节点上,spark 将如何加载巨大的 csv 文件?

如果我在单个 HDFS 节点上有一个 50GB 的巨大 CSV 文件,并且我正在尝试使用 spark.read 读取该文件,如下所示:

file_df = spark.read.format('csv').option('header', 'true').option('inferSchema', 'true').load('/hdfspath/customer.csv')
Run Code Online (Sandbox Code Playgroud)

我正在使用以下 spark-submit 提交 spark 作业:

spark-submit --master yarn --deploy-mode cluster --num-executors 4 --executor-memory 3G --executor-cores 5 --driver-memory 1G load_csv.py
Run Code Online (Sandbox Code Playgroud)

我知道在有动作之前,spark 不会将任何数据加载到内存中。但是当一个动作被触发时会发生什么,首先要做的是将文件读入内存以启动转换。spark是如何根据我提到的内核和执行器以小部分读取50GB文件的?

例如:我提到了 4 个 executor 和每个 executor 的 3GB 内存。在阅读时,将 spark 将主 customer.csv 文件转换为每个执行程序的 3GB 块并加载以下文件:

对于第一个 12GB:

Executor 1: 3GB
Executor 2: 3GB
Executor 3: 3GB
Executor 4: 3GB
Run Code Online (Sandbox Code Playgroud)

依此类推,直到整个文件完成处理?

或者它会根据 HDFS 块大小拆分文件并逐块读取例如:128MB 并尝试在每个 3GB 执行器中尽可能多地装入块?

如果文件完全存在于单个集群上(在我的情况下是这样),如何触发处理文件?

我理解它的解释有点广泛和繁琐,但任何帮助将不胜感激。

apache-spark

0
推荐指数
1
解决办法
599
查看次数

如何在scala的for循环中应用continue并产生多个值?

我正在尝试将 java 程序的模块转换为 Scala。到目前为止,我已经能够在我转换的每个模块中应用 Scala 的函数式编程范式及其语法。但是我遇到了一种可以进行一些验证、使用continue并最终yield输出的方法。下面是Java中的代码:

public boolean checkColumn(String server, String database, String schema, String table, String column) {
        boolean bServer, bDatabase, bSchema, bTable, bColumn, bRet = false;
        for (int i = 0; i < columns.length; i++) {
            if ((server == null) || (server.length() == 0)) {
                bServer = true;
            } else {
                bServer = columns[i][0].equalsIgnoreCase(server);
            }
            if (!bServer) continue;

            if ((database == null) || (database.length() == 0)) {
                bDatabase = true;
            } else {
                bDatabase …
Run Code Online (Sandbox Code Playgroud)

java scala short-circuiting

0
推荐指数
1
解决办法
128
查看次数

如何从scala中的for循环返回产生的值?

在我的项目中,我们正在从 Java 迁移到 Scala。我必须使用 Java 中的一个函数,该函数在continuefor 循环中使用,并根据 for 循环内的 if 条件返回一个值,如下所示。

private TSourceToken getBeforeToken(TSourceToken token) {
  TSourceTokenList tokens = token.container;
  int index = token.posinlist;     
  for ( int i = index - 1; i >= 0; i-- ) {
    TSourceToken currentToken = tokens.get( i );
    if ( currentToken.toString( ).trim( ).length( ) == 0 ) {
      continue;
    }
    else {
      return currentToken;
    }
  }
  return token;
}
Run Code Online (Sandbox Code Playgroud)

为了将其转换为 Scala,我使用了 Yield 选项来过滤掉满足 if 表达式的 else 条件的值,如下所示。

def getBeforeToken(token: TSourceToken): TSourceToken = …
Run Code Online (Sandbox Code Playgroud)

scala

0
推荐指数
1
解决办法
122
查看次数