小编Met*_*ata的帖子

Scala 中的 foldLeft 如何在 DataFrame 上工作?

我需要将 RDBMS 表摄取到 Hive 中,并且在使用 regex_replace 模式将其插入 Hive 表之前,我必须清理其 String 列中的数据。在无法理解如何将它应用于我的 dataFrame 之后,我终于在 Scala 中遇到了一种foldLeft有助于满足要求的方法。

我了解 foldLeft 如何处理集合,例如:

List(1,3,9).foldLeft(100)((x,y) => x+y)
Run Code Online (Sandbox Code Playgroud)

foldLeft 接受参数:initialValue 和一个函数。它将函数的结果添加到累加器中。在上面的例子中,结果是:113。

但是当涉及到数据框时,我无法理解它是如何工作的。

val stringColumns = yearDF.schema.fields.filter(_.dataType == StringType).map(_.name)
val finalDF = stringColumns.foldLeft(yearDF){ (tempdf, colName) => tempdf.withColumn(colName, regexp_replace(col(colName), "\n", "")) }
Run Code Online (Sandbox Code Playgroud)

在上面的代码中,我从 dataFrame:yearDF中获取了 String 列,它保存在foldLeft. 我对 中使用的函数有以下疑问foldLeft

  1. tempDF 持有什么价值?如果它与 yearDF 相同,它如何映射到 yearDF ?
  2. 如果withColumns在函数中使用并将结果添加到yearDF,为什么它不创建重复列时

谁能解释一下,以便我可以更好地了解foldLeft。

scala

9
推荐指数
1
解决办法
9540
查看次数

从JDBC源迁移数据时如何优化分区?

我试图将数据从PostgreSQL表中的表移动到HDFS上的Hive表.为此,我想出了以下代码:

  val conf  = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.sql.orc.filterPushdown","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max","512m").set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.yarn.driver.memoryOverhead","7168").set("spark.yarn.executor.memoryOverhead","7168").set("spark.sql.shuffle.partitions", "61").set("spark.default.parallelism", "60").set("spark.memory.storageFraction","0.5").set("spark.memory.fraction","0.6").set("spark.memory.offHeap.enabled","true").set("spark.memory.offHeap.size","16g").set("spark.dynamicAllocation.enabled", "false").set("spark.dynamicAllocation.enabled","true").set("spark.shuffle.service.enabled","true")
  val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
  def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {
        val colList                = allColumns.split(",").toList
        val (partCols, npartCols)  = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
        val queryCols              = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
        val execQuery              = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
        val yearDF                 = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", …
Run Code Online (Sandbox Code Playgroud)

hive partitioning jdbc apache-spark apache-spark-sql

9
推荐指数
1
解决办法
2399
查看次数

如何在 PySpark 中将字典转换为数据帧?

我正在尝试将字典:转换 data_dict = {'t1': '1', 't2': '2', 't3': '3'}为数据框:

key   |   value|
----------------
t1          1
t2          2
t3          3
Run Code Online (Sandbox Code Playgroud)

为此,我尝试了:

schema = StructType([StructField("key", StringType(), True), StructField("value", StringType(), True)])
ddf = spark.createDataFrame(data_dict, schema)
Run Code Online (Sandbox Code Playgroud)

但我收到以下错误:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/Cellar/apache-spark/2.4.5/libexec/python/pyspark/sql/session.py", line 748, in createDataFrame
    rdd, schema = self._createFromLocal(map(prepare, data), schema)
  File "/usr/local/Cellar/apache-spark/2.4.5/libexec/python/pyspark/sql/session.py", line 413, in _createFromLocal
    data = list(data)
  File "/usr/local/Cellar/apache-spark/2.4.5/libexec/python/pyspark/sql/session.py", line 730, in prepare
    verify_func(obj)
  File "/usr/local/Cellar/apache-spark/2.4.5/libexec/python/pyspark/sql/types.py", line 1389, in verify
    verify_value(obj) …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

9
推荐指数
1
解决办法
9546
查看次数

在 Apache Spark 中,什么情况下数据集优先于数据帧,反之亦然?

我一直在寻找任何链接、文档或文章来帮助我了解我们什么时候应该选择数据集而不是数据框,反之亦然?

我在互联网上找到的都是标题,when to use a Dataset但是当打开时,它们只是指定了 Dataframe 和 Dataset 之间的差异。有很多链接只是列出了场景名称的差异。

stackoverflow 上只有一个问题具有正确的标题,但即使在该答案中,databricks 文档链接也不起作用。

我正在寻找一些信息,可以帮助我从根本上理解我们何时选择数据集,或者在什么情况下数据集优于数据帧,反之亦然。如果没有答案,即使是可以帮助我理解的链接或文档也是值得赞赏的。

dataframe apache-spark pyspark apache-spark-dataset

8
推荐指数
1
解决办法
1317
查看次数

如何从Hive表中删除重复记录?

我正在尝试从Hive表中删除重复记录.

我的Hive表:带有列的'dynpart':Id,Name,Technology

Id  Name  Technology
1   Abcd  Hadoop
2   Efgh  Java
3   Ijkl  MainFrames
2   Efgh  Java
Run Code Online (Sandbox Code Playgroud)

我们在选择查询中使用"Distinct"等选项,但select查询只是从表中检索数据.任何人都可以告诉如何使用删除查询从Hive表中删除重复的行.

确保不建议在Hive中删除/更新记录的标准.但我想学习如何做到这一点.

hadoop hive

7
推荐指数
2
解决办法
2万
查看次数

在Hive中'InputFormat,OutputFormat'和'Stored as'之间有什么区别?

我是Bigdata的新手,目前正在学习Hive.作为SerDe的一部分,我理解Hive中的InputFormat和OutputFormat的概念.我也理解'Stored as'用于以特定格式存储文件,就像InputFormat一样.但我不明白使用'InputFormat,OutputFormat'和'Stored as'之间有什么重大区别.

任何帮助表示赞赏.

hadoop hive hiveql hive-serde

6
推荐指数
1
解决办法
5985
查看次数

如何使用Scala获取文件的创建日期

我的项目中的一个要求需要检查文件的创建日期,并确定它是否超过当天的2天.在Java中,有类似下面的代码可以让我们获取文件的创建日期和其他信息.

Path file = ...;
BasicFileAttributes attr = Files.readAttributes(file, BasicFileAttributes.class);

System.out.println("creationTime: " + attr.creationTime());
System.out.println("lastAccessTime: " + attr.lastAccessTime());
System.out.println("lastModifiedTime: " + attr.lastModifiedTime());
Run Code Online (Sandbox Code Playgroud)

但我不知道如何在Scala中编写相同的代码.任何人都可以告诉我如何在Scala中实现相同的功能.

scala

6
推荐指数
1
解决办法
1311
查看次数

如何加快从 Python-Flask 应用程序返回 20MB Json 文件的速度?

我正在尝试调用一个 API,该 API 又会触发我们的 sqlserver 数据库中的存储过程。这就是我编码的方式。

class Api_Name(Resource):

    def __init__(self):
        pass

    @classmethod
    def get(self):
        try:
            engine = database_engine
            connection = engine.connect()
            sql = "DECLARE @return_value int EXEC @return_value = [dbname].[dbo].[proc_name])
            return call_proc(sql, apiname, starttime, connection)
        except Exception as e:
            return {'message': 'Proc execution failed with error => {error}'.format(error=e)}, 400
        pass
Run Code Online (Sandbox Code Playgroud)

call_proc是我从数据库返回 JSON 的方法。

def call_proc(sql: str, connection):
    try:
        json_data = []
        rv = connection.execute(sql)
        for result in rv:
            json_data.append(dict(zip(result.keys(), result)))
        return Response(json.dumps(json_data), status=200)
    except Exception as e:
        return {'message': …
Run Code Online (Sandbox Code Playgroud)

python json flask python-3.x flask-sqlalchemy

6
推荐指数
1
解决办法
4471
查看次数

在 Python Flask 应用程序中使用 app.route() 和 api.add_resource() 配置端点有什么区别?

我正在学习 Python-Flask,发现有两种方法可以在应用程序中创建端点。

1. app.routing(/endpoint)
2. api.add_resource(CLASSNAME, endpoint)
Run Code Online (Sandbox Code Playgroud)

使用app.routing()我们可以在方法上添加一个端点并调用它。使用时api.add_resource()我们需要注册类名和端点。

我已经看到方法名称的给出就像get() & put()您使用api.add_resource() For ex 一样:

app = Flask(__name__)
api = Api(app)
vehicles = []

class VehicleData(Resource):    
    parser = reqparse.RequestParser()
    parser.add_argument('vehicle', type=str, required=True, help='name cannot be empty')
    parser.add_argument('type', type=str, required=True, help='vehicle type cannot be empty')
    parser.add_argument('wheels', type=int, required=True, help='number of wheels cannot be empty')
    parser.add_argument('suv', type=bool, required=False, help='SUV or not can be empty')

    def get(self, name):
        vehicle = next(filter(lambda x: x['name'] == name, vehicles), None) …
Run Code Online (Sandbox Code Playgroud)

python flask flask-restful

5
推荐指数
1
解决办法
3752
查看次数

在使用 Kafka 进行 Spark 流式传输的情况下,spark.streaming.kafka.maxRatePerPartition 与 Spark.streaming.backPressure.enabled 有何关系?

在读取如下的配置单元表后,我尝试将数据写入 Kafka 主题。

\n
write_kafka_data.py:\nread_df = spark.sql("select * from db.table where some_column in (\'ASIA\', \'Europe\')")\nfinal_df = read_df.select(F.to_json(F.struct(F.col("*"))).alias("value"))\n\nfinal_df.write.format("kafka")\\\n        .option("kafka.bootstrap.servers", kafka_broker)\\\n        .option("kafka.batch.size", 51200)\\\n        .option("retries", 3)\\\n        .option("kafka.max.request.size", 500000)\\\n        .option("kafka.max.block.ms", 120000)\\\n        .option("kafka.metadata.max.age.ms", 120000)\\\n        .option("kafka.request.timeout.ms", 120000)\\\n        .option("kafka.linger.ms", 0)\\\n        .option("kafka.delivery.timeout.ms", 130000)\\\n        .option("acks", "1")\\\n        .option("kafka.compression.type", "snappy")\\\n        .option("kafka.security.protocol", "SASL_SSL")\\\n        .option("kafka.sasl.jaas.config", oauth_config)\\\n        .option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\\\n        .option("kafka.sasl.mechanism", "OAUTHBEARER")\\\n        .option("topic", \'topic_name\')\\\n        .save()\n
Run Code Online (Sandbox Code Playgroud)\n

成功写入后(记录数为29000),我正在另一个文件中读取来自同一主题的数据,如下所示:\nread_kafka_data.py:

\n
    # SCHEMA\n    schema = StructType([StructField("col1", StringType()),\n            StructField("col2", IntegerType())\n    ])\n\n    # READ FROM TOPIC\n    jass_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required" \\\n                          + " oauth.token.endpoint.uri=" + \'"\' + "uri" + \'"\' …
Run Code Online (Sandbox Code Playgroud)

apache-kafka apache-spark spark-streaming pyspark spark-structured-streaming

5
推荐指数
1
解决办法
1896
查看次数