For each row in the input table, I need to generate multiple rows by splitting the date range at each month boundary (see the example output below).
There is a straightforward way to do the conversion by iterating row by row, but it is very slow on a large dataframe.
Can anyone suggest a vectorized approach, e.g. using apply(), map(), etc., to achieve this?
The output table is a new table.
Input:
ID, START_DATE, END_DATE
1, 2010-12-08, 2011-03-01
2, 2010-12-10, 2011-01-12
3, 2010-12-16, 2011-03-07
Output:
ID, START_DATE, END_DATE, NUMBER_DAYS, ACTION_DATE
1, 2010-12-08, 2010-12-31, 23, 201012
1, 2010-12-08, 2011-01-31, 54, 201101
1, 2010-12-08, 2011-02-28, 82, 201102
1, 2010-12-08, 2011-03-01, 83, 201103
2, 2010-12-10, 2010-12-31, 21, 201012
2, 2010-12-10, 2011-01-12, 33, 201101
3, 2010-12-16, 2010-12-31, 15, 201012
3, 2010-12-16, 2011-01-31, 46, 201101
3, 2010-12-16, 2011-02-28, 74, 201102
3, 2010-12-16, 2011-03-07, 81, 201103
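One possible sketch of a vectorized approach (assumptions: pandas >= 1.1 for `DataFrame.explode(ignore_index=...)`; column names as in the tables above). The idea is to build, per row, the list of month-end checkpoints plus the final END_DATE, then let `explode` and vectorized column arithmetic do the expansion instead of constructing the output row by row:

```python
import pandas as pd

# Input table from the question.
df = pd.DataFrame({
    "ID": [1, 2, 3],
    "START_DATE": pd.to_datetime(["2010-12-08", "2010-12-10", "2010-12-16"]),
    "END_DATE": pd.to_datetime(["2011-03-01", "2011-01-12", "2011-03-07"]),
})

def month_ends(row):
    # Month-end checkpoints between START_DATE and END_DATE,
    # always finishing with END_DATE itself.
    ends = list(pd.date_range(row["START_DATE"], row["END_DATE"],
                              freq=pd.offsets.MonthEnd()))
    if not ends or ends[-1] != row["END_DATE"]:
        ends.append(row["END_DATE"])
    return ends

out = df.copy()
out["END_DATE"] = df.apply(month_ends, axis=1)   # one list of dates per row
out = out.explode("END_DATE", ignore_index=True) # one output row per date
out["END_DATE"] = pd.to_datetime(out["END_DATE"])
out["NUMBER_DAYS"] = (out["END_DATE"] - out["START_DATE"]).dt.days
out["ACTION_DATE"] = out["END_DATE"].dt.strftime("%Y%m").astype(int)
print(out)
```

The per-row `apply` still runs Python code, but the expensive part (materializing the expanded rows and computing NUMBER_DAYS/ACTION_DATE) is vectorized, which is usually where the row-by-row version loses its time.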
I get this error when I try to write a Spark dataframe to a Postgres DB. I am using a local cluster, and the code is as follows:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import os
os.environ["SPARK_CLASSPATH"] = '/usr/share/java/postgresql-jdbc4.jar'
conf = SparkConf() \
.setMaster('local[2]') \
.setAppName("test")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
df = sc.parallelize([("a", "b", "c", "d")]).toDF()
url_connect = "jdbc:postgresql://localhost:5432"
table = "table_test"
mode = "overwrite"
properties = {"user":"postgres", "password":"12345678"}
df.write.option('driver', 'org.postgresql.Driver').jdbc(
url_connect, table, mode, properties)
The error log is as follows:
Py4JJavaError: An error occurred while calling o119.jdbc.
: java.lang.NullPointerException
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:308)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at …
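The NullPointerException alone does not pinpoint the cause, but two common culprits with this setup are a JDBC URL that names no database and a driver class that is not passed to the connection. A hedged sketch of a corrected connection setup follows; `testdb` is a hypothetical database name, not something from the original question, and `SPARK_CLASSPATH` is deprecated in favor of submitting the jar via `--jars` or `spark.jars`:

```python
# Sketch only: "testdb" is a hypothetical database name.
db_host, db_port, db_name = "localhost", 5432, "testdb"
url_connect = "jdbc:postgresql://{}:{}/{}".format(db_host, db_port, db_name)

properties = {
    "user": "postgres",
    "password": "12345678",
    "driver": "org.postgresql.Driver",  # name the driver class explicitly
}
# df.write.jdbc(url_connect, "table_test", mode="overwrite", properties=properties)
print(url_connect)
```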
The Spark cluster is set up as follows:
conf['SparkConfiguration'] = SparkConf() \
.setMaster('yarn-client') \
.setAppName("test") \
.set("spark.executor.memory", "20g") \
.set("spark.driver.maxResultSize", "20g") \
.set("spark.executor.instances", "20")\
.set("spark.executor.cores", "3") \
.set("spark.memory.fraction", "0.2") \
.set("user", "test_user") \
.set("spark.executor.extraClassPath", "/usr/share/java/postgresql-jdbc3.jar")
When I try to write the dataframe to the Postgres DB using the following code:
from pyspark.sql import DataFrameWriter
my_writer = DataFrameWriter(df)
url_connect = "jdbc:postgresql://198.123.43.24:1234"
table = "test_result"
mode = "overwrite"
properties = {"user":"postgres", "password":"password"}
my_writer.jdbc(url_connect, table, mode, properties)
I get the following error:
Py4JJavaError: An error occurred while calling o1120.jdbc.
:java.sql.SQLException: No suitable driver
at java.sql.DriverManager.getDriver(DriverManager.java:278)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:50)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$2.apply(JdbcUtils.scala:50)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.createConnectionFactory(JdbcUtils.scala:49)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:278)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native …
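"No suitable driver" usually means the JDBC driver class is not visible where the connection is first opened. `DataFrameWriter.jdbc` resolves the driver on the driver JVM, so setting only `spark.executor.extraClassPath` is not enough. A configuration sketch (not verified against this exact cluster; the jar path is assumed valid on every node) that mirrors the jar on the driver side and names the driver class explicitly:

```python
# Configuration sketch: put the JDBC jar on both driver and executor classpaths.
extra_conf = {
    "spark.driver.extraClassPath": "/usr/share/java/postgresql-jdbc3.jar",
    "spark.executor.extraClassPath": "/usr/share/java/postgresql-jdbc3.jar",
}
properties = {
    "user": "postgres",
    "password": "password",
    "driver": "org.postgresql.Driver",  # resolved on the driver JVM first
}
# conf = SparkConf().setMaster('yarn-client').setAppName("test")
# for key, value in extra_conf.items():
#     conf = conf.set(key, value)
# my_writer.jdbc(url_connect, table, mode, properties)
print(sorted(extra_conf))
```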
I have a pandas dataframe in which some fields contain Chinese characters. I use the following code:
df = pd.read_csv('original.csv', encoding='utf-8')
df.to_csv('saved.csv')
Then I open saved.csv with Excel or a text editor, and all the Chinese characters turn into garbage characters. However, I am able to load the saved file and display the Chinese correctly, as shown below:
df = pd.read_csv('saved.csv')
df.head() # Chinese characters are properly displayed.
Does anyone know how to fix this?
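The symptoms point at encoding detection: `to_csv` writes plain UTF-8 with no byte-order mark, which pandas reads back fine, but Excel often falls back to the system's legacy code page and mangles the text. A likely fix, assuming an Excel-oriented workflow, is the `utf-8-sig` codec, which prepends a BOM:

```python
import pandas as pd

# Minimal reproduction with hypothetical data; "utf-8-sig" writes a UTF-8
# byte-order mark so that Excel detects the encoding correctly.
df = pd.DataFrame({"name": ["张三", "李四"]})
df.to_csv("saved.csv", index=False, encoding="utf-8-sig")

# The file now starts with the UTF-8 BOM (EF BB BF).
with open("saved.csv", "rb") as f:
    print(f.read(3) == b"\xef\xbb\xbf")  # prints True
```

`pd.read_csv` understands the BOM transparently, so the round trip through pandas keeps working as before.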
I would like to know how to do the following conversion from SQL to MongoDB.
Suppose the table has the following structure:
table
=====
id   contribution   time
1    300            Jan 2, 1990
2    1000           March 3, 1991
I want a ranked list of ids, ordered by number of contributions in descending order.
This is what I did with SQL:
select id, count(*) c from table group by id order by c desc;
How can I convert this more complex SQL, with its count(), order by, and group by, into MongoDB?
Many thanks!
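The GROUP BY / COUNT / ORDER BY trio maps onto MongoDB's aggregation framework as `$group`, `$sum`, and `$sort` stages. A sketch of the pipeline follows, assuming the rows live in a collection named `table`; the pymongo call is shown commented out since it needs a running server, and `mydb` is a hypothetical database name:

```python
# Equivalent of: SELECT id, COUNT(*) c FROM table GROUP BY id ORDER BY c DESC
pipeline = [
    {"$group": {"_id": "$id", "c": {"$sum": 1}}},  # GROUP BY id, COUNT(*) AS c
    {"$sort": {"c": -1}},                          # ORDER BY c DESC
]

# With pymongo (hypothetical connection):
# from pymongo import MongoClient
# db = MongoClient()["mydb"]
# ranked = list(db["table"].aggregate(pipeline))
print(pipeline)
```

Each document in the result then carries the grouped id in `_id` and the count in `c`, mirroring the two columns the SQL query selects.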
dataframe ×3
apache-spark ×2
pandas ×2
postgresql ×2
python ×2
jdbc ×1
mongodb ×1
pyspark ×1
sql ×1