小编bra*_*raj的帖子

使用 Between 子句连接两个 pyspark 数据帧以从一系列 Ip 中查找 ip 详细信息

我正在尝试为以下 sql 查询编写 pyspark 代码:

Create table table1 as
Select a.ip_address,a.ip_number,b.ip_start_int,b.ip_end_int,b.post_code_id,b.city,b.region_name,b.two_letter_country
from nk_ip_address_check a 
join 
ip_additional_pulse b
on a.ip_number between b.ip_start_int and b.ip_end_int
Run Code Online (Sandbox Code Playgroud)

上面的查询在两个表之间进行连接,并使用“ Between ”子句和“ on ”子句。我写了一个UDF,它的作用相同,但看起来很慢。有什么方法可以在 pyspark 代码中编写上述查询,这会给我更好的性能。

下面是我正在使用的代码

def ip_mapping(ip_int):
    ip_qry = "select country_code,region_code,city_code,postal_code from de_pulse_ip_pqt where ip_start_int < {} and ip_end_int > {}".format(ip_int,ip_int)
    result = spark.sql(ip_qry)
    country_code = result.rdd.map(lambda x: x['country_code']).first()
    return country_code

ip_mapped = udf(ip_mapping, IntegerType())  
df_final = df.withColumn("country_code", ip_mapped("ip_int"))
Run Code Online (Sandbox Code Playgroud)

这是非常低效的。此外,如果我有region_code,我必须通过更改函数ip_mapping的返回值来调用。

df_final = df.withColumn("region_code", ip_mapped("ip_int"))
Run Code Online (Sandbox Code Playgroud)

join apache-spark apache-spark-sql pyspark

5
推荐指数
1
解决办法
7268
查看次数

EMR 5.x | Spark on Yarn | 退出代码137和Java堆空间错误

Container exited with a non-zero exit code 137在纱线上运行火花时遇到了这个错误.经过一段时间后我尝试了几种技术但没有帮助.火花配置如下所示:

spark.driver.memory 10G
spark.driver.maxResultSize  2G
spark.memory.fraction   0.8
Run Code Online (Sandbox Code Playgroud)

我在客户端模式下使用yarn. spark-submit --packages com.databricks:spark-redshift_2.10:0.5.0 --jars RedshiftJDBC4-1.2.1.1001.jar elevatedailyjob.py > log5.out 2>&1 &

示例代码:

# Load the file (its a single file of 3.2GB)

my_df = spark.read.csv('s3://bucket-name/path/file_additional.txt.gz', schema=MySchema, sep=';', header=True)

# write the de_pulse_ip data into parquet format
my_df = my_df.select("ip_start","ip_end","country_code","region_code","city_code","ip_start_int","ip_end_int","postal_code").repartition(50)
my_df.write.parquet("s3://analyst-adhoc/elevate/tempData/de_pulse_ip1.parquet", mode = "overwrite")

# read my_df data intp dataframe from parquet files
my_df1 = spark.read.parquet("s3://bucket-name/path/my_df.parquet").repartition("ip_start_int","ip_end_int")

#join with another dataset 200 MB
my_df2 = my_df.join(my_df1, …
Run Code Online (Sandbox Code Playgroud)

hadoop-yarn apache-spark apache-spark-sql pyspark

1
推荐指数
1
解决办法
5734
查看次数