我正在研究 Spark 优化方法,并遇到了各种实现优化的方法。但有两个名字引起了我的注意。
他们说:
分区修剪:
分区修剪是一种性能优化,它限制 Spark 查询时读取的文件和分区的数量。对数据进行分区后,匹配某些分区过滤条件的查询允许 Spark 仅读取目录和文件的子集,从而提高性能。
谓词下推:
Spark 将尝试将数据过滤移至尽可能靠近源的位置,以避免将不必要的数据加载到内存中。Parquet 和 ORC 文件维护不同块 aof 数据中每列的各种统计信息(例如最小值和最大值)。读取这些文件的程序可以使用这些索引来确定是否需要读取某些块甚至整个文件。这允许程序在处理过程中潜在地跳过大部分数据。
通过阅读上述概念,它们似乎做了同样的事情,即应用满足查询中给出的谓词的读取语句(查询)。分区修剪和谓词下推是不同的概念还是我以错误的方式看待它们?
我正在尝试构建一个maven-scala项目.创建项目后,我创建了"clean compile"来构建项目.当我运行构建选项时,我看到以下异常消息:
"C:\Program Files\Java\jdk1.8.0_151\bin\java.exe" -Dmaven.multiModuleProjectDirectory=C:\ABCSparkWorkSpace\DataExpo "-Dmaven.home=C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2018.1.4\plugins\maven\lib\maven3" "-Dclassworlds.conf=C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2018.1.4\plugins\maven\lib\maven3\bin\m2.conf" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2018.1.4\lib\idea_rt.jar=54199:C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2018.1.4\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2018.1.4\plugins\maven\lib\maven3\boot\plexus-classworlds-2.5.2.jar" org.codehaus.classworlds.Launcher -Didea.version=2018.1.4 clean compile
[WARNING]
[WARNING] Some problems were encountered while building the effective model for com.dbloads.programs:DataExpo:jar:1.0-SNAPSHOT
[WARNING] 'build.plugins.plugin.version' for org.scala-tools:maven-scala-plugin is missing. @ line 51, column 15
[WARNING] 'build.plugins.plugin.version' for org.apache.maven.plugins:maven-eclipse-plugin is missing. @ line 69, column 15
[WARNING] It is highly recommended …Run Code Online (Sandbox Code Playgroud) 我是 Python 新手,尝试在我的代码中实现记录器。我创建了一个 python 文件:setup_logger.py
import logging
CLASS_NAME = ''
# Create and configure logger
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(filename='/Users/bobby/Desktop/lumberjack.log', level=logging.DEBUG, format=LOG_FORMAT)
logger = logging.getLogger()
Run Code Online (Sandbox Code Playgroud)
我在另一个 python 文件中使用此记录器配置:GetLogger.py 如下
from LoggerLevels.setup_logger import logger
if __name__ == '__main__':
logger.error('This is a basic log error message')
logger.info('This is a warning message')
Run Code Online (Sandbox Code Playgroud)
日志在文件中打印为:
2020-06-17 14:54:47,161 - root - ERROR - This is a basic log error message
2020-06-17 14:54:47,161 - root - INFO - This is a warning …Run Code Online (Sandbox Code Playgroud) 我正在尝试更改从 RDBMS 数据库读取的数据帧 I 中存在的列的数据类型。为此,我通过以下方式获取了数据框的架构:
val dataSchema = dataDF.schema
Run Code Online (Sandbox Code Playgroud)
为了查看数据框的架构,我使用了以下语句:
println(dataSchema.schema)
Output: StructType(StructField(je_header_id,LongType,true), StructField(je_line_num,LongType,true), StructField(last_update_date,TimestampType,true), StructField(last_updated_by,DecimalType(15,0),true), StructField(creation_date,TimestampType,true), StructField(created_by,DecimalType(15,0),true), StructField(created_by_name,StringType,true), StructField(entered_dr,DecimalType(38,30),true), StructField(entered_cr,DecimalType(38,30),true))
Run Code Online (Sandbox Code Playgroud)
我的要求是找到 DecimalType 并将其从上面的架构中更改为 DoubleType。我可以使用以下方式获取列名称和数据类型:dataSchema.dtype,但它为我提供了以下格式的数据类型((columnName1, column datatype),(columnName2, column datatype)....(columnNameN, column datatype))
我试图找到一种方法来解析 StructType 并徒劳地更改 dataSchema 中的架构。
谁能告诉我是否有一种方法可以解析 StructType ,以便我可以将数据类型更改为我的要求并采用以下格式
StructType(StructField(je_header_id,LongType,true), StructField(je_line_num,LongType,true), StructField(last_update_date,TimestampType,true), StructField(last_updated_by,DoubleType,true), StructField(creation_date,TimestampType,true), StructField(created_by,DoubleType,true), StructField(created_by_name,StringType,true), StructField(entered_dr,DoubleType,true), StructField(entered_cr,DoubleType,true))
Run Code Online (Sandbox Code Playgroud) 我正在尝试通过删除带有相邻字母B的字母A或删除带有相邻字母D的字母C来转换字符串。
例如,给定字符串“ CBACD”,应将其转换为
CBACD -> CCD -> C
Run Code Online (Sandbox Code Playgroud)
示例2:在给定字符串“ CABABD”的情况下,转换如下所示应不返回任何内容:
CABABD -> CABD -> CD ->
Run Code Online (Sandbox Code Playgroud)
示例3:“ ACBDACBD”,A和C没有相应的相邻字符,因此应返回整个字符串
"ACBDACBD" -> "ACBDACBD"
Run Code Online (Sandbox Code Playgroud)
我编写了以下代码来执行操作:
object RemoveCharsABCD {
val s = scala.io.StdIn
def adjacent(s: String): String = {
val charSet = ArrayBuffer("AB","BA","CD","DC")
var i = 0
var ret:String = ""
while(i < s.length-1) {
if(charSet.contains(s"${s.charAt(i)}${s.charAt(i+1)}")) {
s.slice(i+2, s.length)
i += 2
if(i == s.length-1) ret = s"$ret${s.charAt(i).toString}"
} else {
ret = s"$ret${s.charAt(i).toString}"
i += 1
if(i == s.length-1) ret = s"$ret${s.charAt(i).toString}"
} …Run Code Online (Sandbox Code Playgroud) 我正在尝试创建一个从邮递员接收参数的 API。api 的主体包含两个参数:
{
"db":"EUR",
"env":"test"
}
Run Code Online (Sandbox Code Playgroud)
我在代码中解析了这两个参数,如下所示:
parser = reqparse.RequestParser()
parser.add_argument('fab', type=str, required=True, help='Fab name must be provided.')
parser.add_argument('env', type=str, required=False, help='Env is an optional parameter.')
Run Code Online (Sandbox Code Playgroud)
最近我被要求在代码中添加一个令牌验证。令牌从Authorization-> Type(Bearer Token) -> Token value: eeb867bd2bcca05
但我不知道如何将邮递员的不记名令牌读入 Python 代码。谁能告诉我如何读取从 Postman 的不记名令牌传递到我的 Python 代码中的令牌值?任何帮助深表感谢。
I have a dataframe as below:
val data = Seq(("James", "Sales", 34, "Developer"), ("Michael", "Sales", 56, "Architect"), ("Robert", "Sales", 30, "Manager"), ("Maria", "Finance", 24, "Consultant"))
val df1 = data.toDF("name","dept","id", "role")
df1.printSchema()
root
|-- name: string (nullable = true)
|-- dept: string (nullable = true)
|-- id: integer (nullable = true)
|-- role: string (nullable = true)
Run Code Online (Sandbox Code Playgroud)
I have a hive table with same columns and exact schema:
val df2 = spark.sql("select * from db.table")
Run Code Online (Sandbox Code Playgroud)
From the incoming dataframe df1 I …
我正在尝试将数据框加载到Hive表中。
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql._
object SparkToHive {
def main(args: Array[String]) {
val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
val sparkSession = SparkSession.builder.master("local[2]").appName("Saving data into HiveTable using Spark")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("hive.metastore.warehouse.dir", "/user/hive/warehouse")
.config("spark.sql.warehouse.dir", warehouseLocation)
.getOrCreate()
**import sparkSession.implicits._**
val partfile = sparkSession.read.text("partfile").as[String]
val partdata = partfile.map(part => part.split(","))
case class Partclass(id:Int, name:String, salary:Int, dept:String, location:String)
val partRDD = partdata.map(line => PartClass(line(0).toInt, line(1), line(2).toInt, line(3), line(4)))
val partDF = partRDD.toDF()
partDF.write.mode(SaveMode.Append).insertInto("parttab")
}
}
Run Code Online (Sandbox Code Playgroud)
我还没有执行它,但是在此行出现以下错误:
import sparkSession.implicits._
could not find implicit …Run Code Online (Sandbox Code Playgroud) 我正在尝试使用spark-jdbc在postgres db上读取表。为此,我想出了以下代码:
object PartitionRetrieval {
var conf = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.default.parallelism", "20")
val log = LogManager.getLogger("Spark-JDBC Program")
Logger.getLogger("org").setLevel(Level.ERROR)
val conFile = "/home/myuser/ReconTest/inputdir/testconnection.properties"
val properties = new Properties()
properties.load(new FileInputStream(conFile))
val connectionUrl = properties.getProperty("gpDevUrl")
val devUserName = properties.getProperty("devUserName")
val devPassword = properties.getProperty("devPassword")
val driverClass = properties.getProperty("gpDriverClass")
val tableName = "base.ledgers"
try {
Class.forName(driverClass).newInstance()
} catch {
case cnf: ClassNotFoundException =>
log.error("Driver class: " + driverClass + " not found")
System.exit(1)
case e: Exception =>
log.error("Exception: " + e.printStackTrace())
System.exit(1)
}
def main(args: …Run Code Online (Sandbox Code Playgroud) 我正在尝试在普通的 Scala 而不是 Spark 中寻找爆炸函数或其等效函数。使用 Spark 中的爆炸功能,我能够将具有多个元素的一行平展为多行,如下所示。
scala> import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.functions.explode
scala> val test = spark.read.json(spark.sparkContext.parallelize(Seq("""{"a":1,"b":[2,3]}""")))
scala> test.schema
res1: org.apache.spark.sql.types.StructType = StructType(StructField(a,LongType,true), StructField(b,ArrayType(LongType,true),true))
scala> test.show
+---+------+
| a| b|
+---+------+
| 1|[2, 3]|
+---+------+
scala> val flat = test.withColumn("b",explode($"b"))
flat: org.apache.spark.sql.DataFrame = [a: bigint, b: bigint]
scala> flat.show
+---+---+
| a| b|
+---+---+
| 1| 2|
| 1| 3|
+---+---+
Run Code Online (Sandbox Code Playgroud)
在不使用 Spark 的情况下,普通 scala 中是否有爆炸等效函数?如果 Scala 中没有可用的爆炸功能,我是否可以实现它?