我需要将 RDBMS 表摄取到 Hive 中,并且在使用 regex_replace 模式将其插入 Hive 表之前,我必须清理其 String 列中的数据。在无法理解如何将它应用于我的 dataFrame 之后,我终于在 Scala 中遇到了一种foldLeft有助于满足要求的方法。
我了解 foldLeft 如何处理集合,例如:
List(1,3,9).foldLeft(100)((x,y) => x+y)
Run Code Online (Sandbox Code Playgroud)
foldLeft 接受参数:initialValue 和一个函数。它将函数的结果添加到累加器中。在上面的例子中,结果是:113。
但是当涉及到数据框时,我无法理解它是如何工作的。
val stringColumns = yearDF.schema.fields.filter(_.dataType == StringType).map(_.name)
val finalDF = stringColumns.foldLeft(yearDF){ (tempdf, colName) => tempdf.withColumn(colName, regexp_replace(col(colName), "\n", "")) }
Run Code Online (Sandbox Code Playgroud)
在上面的代码中,我从 dataFrame:yearDF中获取了 String 列,它保存在foldLeft. 我对 中使用的函数有以下疑问foldLeft:
withColumns在函数中使用并将结果添加到yearDF,为什么它不创建重复列时谁能解释一下,以便我可以更好地了解foldLeft。
我试图将数据从PostgreSQL表中的表移动到HDFS上的Hive表.为此,我想出了以下代码:
val conf = new SparkConf().setAppName("Spark-JDBC").set("spark.executor.heartbeatInterval","120s").set("spark.network.timeout","12000s").set("spark.sql.inMemoryColumnarStorage.compressed", "true").set("spark.sql.orc.filterPushdown","true").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").set("spark.kryoserializer.buffer.max","512m").set("spark.serializer", classOf[org.apache.spark.serializer.KryoSerializer].getName).set("spark.streaming.stopGracefullyOnShutdown","true").set("spark.yarn.driver.memoryOverhead","7168").set("spark.yarn.executor.memoryOverhead","7168").set("spark.sql.shuffle.partitions", "61").set("spark.default.parallelism", "60").set("spark.memory.storageFraction","0.5").set("spark.memory.fraction","0.6").set("spark.memory.offHeap.enabled","true").set("spark.memory.offHeap.size","16g").set("spark.dynamicAllocation.enabled", "false").set("spark.dynamicAllocation.enabled","true").set("spark.shuffle.service.enabled","true")
val spark = SparkSession.builder().config(conf).master("yarn").enableHiveSupport().config("hive.exec.dynamic.partition", "true").config("hive.exec.dynamic.partition.mode", "nonstrict").getOrCreate()
def prepareFinalDF(splitColumns:List[String], textList: ListBuffer[String], allColumns:String, dataMapper:Map[String, String], partition_columns:Array[String], spark:SparkSession): DataFrame = {
val colList = allColumns.split(",").toList
val (partCols, npartCols) = colList.partition(p => partition_columns.contains(p.takeWhile(x => x != ' ')))
val queryCols = npartCols.mkString(",") + ", 0 as " + flagCol + "," + partCols.reverse.mkString(",")
val execQuery = s"select ${allColumns}, 0 as ${flagCol} from schema.tablename where period_year='2017' and period_num='12'"
val yearDF = spark.read.format("jdbc").option("url", connectionUrl).option("dbtable", …Run Code Online (Sandbox Code Playgroud) 我正在尝试将字典:转换
data_dict = {'t1': '1', 't2': '2', 't3': '3'}为数据框:
key | value|
----------------
t1 1
t2 2
t3 3
Run Code Online (Sandbox Code Playgroud)
为此,我尝试了:
schema = StructType([StructField("key", StringType(), True), StructField("value", StringType(), True)])
ddf = spark.createDataFrame(data_dict, schema)
Run Code Online (Sandbox Code Playgroud)
但我收到以下错误:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/local/Cellar/apache-spark/2.4.5/libexec/python/pyspark/sql/session.py", line 748, in createDataFrame
rdd, schema = self._createFromLocal(map(prepare, data), schema)
File "/usr/local/Cellar/apache-spark/2.4.5/libexec/python/pyspark/sql/session.py", line 413, in _createFromLocal
data = list(data)
File "/usr/local/Cellar/apache-spark/2.4.5/libexec/python/pyspark/sql/session.py", line 730, in prepare
verify_func(obj)
File "/usr/local/Cellar/apache-spark/2.4.5/libexec/python/pyspark/sql/types.py", line 1389, in verify
verify_value(obj) …Run Code Online (Sandbox Code Playgroud) 我一直在寻找任何链接、文档或文章来帮助我了解我们什么时候应该选择数据集而不是数据框,反之亦然?
我在互联网上找到的都是标题,when to use a Dataset但是当打开时,它们只是指定了 Dataframe 和 Dataset 之间的差异。有很多链接只是列出了场景名称的差异。
stackoverflow 上只有一个问题具有正确的标题,但即使在该答案中,databricks 文档链接也不起作用。
我正在寻找一些信息,可以帮助我从根本上理解我们何时选择数据集,或者在什么情况下数据集优于数据帧,反之亦然。如果没有答案,即使是可以帮助我理解的链接或文档也是值得赞赏的。
我正在尝试从Hive表中删除重复记录.
我的Hive表:带有列的'dynpart':Id,Name,Technology
Id Name Technology
1 Abcd Hadoop
2 Efgh Java
3 Ijkl MainFrames
2 Efgh Java
Run Code Online (Sandbox Code Playgroud)
我们在选择查询中使用"Distinct"等选项,但select查询只是从表中检索数据.任何人都可以告诉如何使用删除查询从Hive表中删除重复的行.
确保不建议在Hive中删除/更新记录的标准.但我想学习如何做到这一点.
我是Bigdata的新手,目前正在学习Hive.作为SerDe的一部分,我理解Hive中的InputFormat和OutputFormat的概念.我也理解'Stored as'用于以特定格式存储文件,就像InputFormat一样.但我不明白使用'InputFormat,OutputFormat'和'Stored as'之间有什么重大区别.
任何帮助表示赞赏.
我的项目中的一个要求需要检查文件的创建日期,并确定它是否超过当天的2天.在Java中,有类似下面的代码可以让我们获取文件的创建日期和其他信息.
Path file = ...;
BasicFileAttributes attr = Files.readAttributes(file, BasicFileAttributes.class);
System.out.println("creationTime: " + attr.creationTime());
System.out.println("lastAccessTime: " + attr.lastAccessTime());
System.out.println("lastModifiedTime: " + attr.lastModifiedTime());
Run Code Online (Sandbox Code Playgroud)
但我不知道如何在Scala中编写相同的代码.任何人都可以告诉我如何在Scala中实现相同的功能.
我正在尝试调用一个 API,该 API 又会触发我们的 sqlserver 数据库中的存储过程。这就是我编码的方式。
class Api_Name(Resource):
def __init__(self):
pass
@classmethod
def get(self):
try:
engine = database_engine
connection = engine.connect()
sql = "DECLARE @return_value int EXEC @return_value = [dbname].[dbo].[proc_name])
return call_proc(sql, apiname, starttime, connection)
except Exception as e:
return {'message': 'Proc execution failed with error => {error}'.format(error=e)}, 400
pass
Run Code Online (Sandbox Code Playgroud)
call_proc是我从数据库返回 JSON 的方法。
def call_proc(sql: str, connection):
try:
json_data = []
rv = connection.execute(sql)
for result in rv:
json_data.append(dict(zip(result.keys(), result)))
return Response(json.dumps(json_data), status=200)
except Exception as e:
return {'message': …Run Code Online (Sandbox Code Playgroud) 我正在学习 Python-Flask,发现有两种方法可以在应用程序中创建端点。
1. app.routing(/endpoint)
2. api.add_resource(CLASSNAME, endpoint)
Run Code Online (Sandbox Code Playgroud)
使用app.routing()我们可以在方法上添加一个端点并调用它。使用时api.add_resource()我们需要注册类名和端点。
我已经看到方法名称的给出就像get() & put()您使用api.add_resource()
For ex 一样:
app = Flask(__name__)
api = Api(app)
vehicles = []
class VehicleData(Resource):
parser = reqparse.RequestParser()
parser.add_argument('vehicle', type=str, required=True, help='name cannot be empty')
parser.add_argument('type', type=str, required=True, help='vehicle type cannot be empty')
parser.add_argument('wheels', type=int, required=True, help='number of wheels cannot be empty')
parser.add_argument('suv', type=bool, required=False, help='SUV or not can be empty')
def get(self, name):
vehicle = next(filter(lambda x: x['name'] == name, vehicles), None) …Run Code Online (Sandbox Code Playgroud) 在读取如下的配置单元表后,我尝试将数据写入 Kafka 主题。
\nwrite_kafka_data.py:\nread_df = spark.sql("select * from db.table where some_column in (\'ASIA\', \'Europe\')")\nfinal_df = read_df.select(F.to_json(F.struct(F.col("*"))).alias("value"))\n\nfinal_df.write.format("kafka")\\\n .option("kafka.bootstrap.servers", kafka_broker)\\\n .option("kafka.batch.size", 51200)\\\n .option("retries", 3)\\\n .option("kafka.max.request.size", 500000)\\\n .option("kafka.max.block.ms", 120000)\\\n .option("kafka.metadata.max.age.ms", 120000)\\\n .option("kafka.request.timeout.ms", 120000)\\\n .option("kafka.linger.ms", 0)\\\n .option("kafka.delivery.timeout.ms", 130000)\\\n .option("acks", "1")\\\n .option("kafka.compression.type", "snappy")\\\n .option("kafka.security.protocol", "SASL_SSL")\\\n .option("kafka.sasl.jaas.config", oauth_config)\\\n .option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\\\n .option("kafka.sasl.mechanism", "OAUTHBEARER")\\\n .option("topic", \'topic_name\')\\\n .save()\nRun Code Online (Sandbox Code Playgroud)\n成功写入后(记录数为29000),我正在另一个文件中读取来自同一主题的数据,如下所示:\nread_kafka_data.py:
\n # SCHEMA\n schema = StructType([StructField("col1", StringType()),\n StructField("col2", IntegerType())\n ])\n\n # READ FROM TOPIC\n jass_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required" \\\n + " oauth.token.endpoint.uri=" + \'"\' + "uri" + \'"\' …Run Code Online (Sandbox Code Playgroud) apache-kafka apache-spark spark-streaming pyspark spark-structured-streaming
apache-spark ×4
hive ×3
pyspark ×3
python ×3
flask ×2
hadoop ×2
scala ×2
apache-kafka ×1
dataframe ×1
hive-serde ×1
hiveql ×1
jdbc ×1
json ×1
partitioning ×1
python-3.x ×1