I want to filter rows of a source RDD:
val source = sql("SELECT * from sample.source").rdd.map(_.mkString(","))
val destination = sql("select * from sample.destination").rdd.map(_.mkString(","))
val source_primary_key = source.map(rec => (rec.split(",")(0)))
val destination_primary_key = destination.map(rec => (rec.split(",")(0)))
val src = source_primary_key.subtract(destination_primary_key) // subtract, not subtractByKey: these are plain RDD[String]s, not pair RDDs
I want to use an IN clause in the filter condition to keep only the values from source that exist in src, like below (EDITED):
val source = spark.read.csv(inputPath + "/source").rdd.map(_.mkString(","))
val destination = spark.read.csv(inputPath + "/destination").rdd.map(_.mkString(","))
val source_primary_key = source.map(rec => (rec.split(",")(0)))
val destination_primary_key = destination.map(rec => (rec.split(",")(0)))
val extra_in_source = source_primary_key.subtract(destination_primary_key) // keys in source missing from destination; the original filter over rec._1 does not compile
The equivalent SQL code is:
SELECT * FROM SOURCE WHERE ID IN (select ID from src)
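For reference, a key-based join gives the same IN-clause semantics on plain RDDs. A minimal PySpark sketch (the Scala RDD API is analogous), assuming source and src are the RDDs built above and the first comma-separated field is the key:

# pair each source row with its key and join against the src keys:
# only rows whose key exists in src survive the inner join
keyed_source = source.map(lambda rec: (rec.split(",")[0], rec))
keyed_src = src.map(lambda k: (k, None))
filtered = keyed_source.join(keyed_src).values().map(lambda v: v[0])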
Thanks.

I have a string containing datetimes, and I am trying to split the string on each occurrence of a datetime:
data="2018-03-14 06:08:18, he went on \n2018-03-15 06:08:18, lets play"
What I am doing:
out=re.split('^(2[0-3]|[01]?[0-9]):([0-5]?[0-9]):([0-5]?[0-9])$',data)
What I get:
["2018-03-14 06:08:18, he went on 2018-03-15 06:08:18, lets play"]
What I want:
["2018-03-14 06:08:18, he went on","2018-03-15 06:08:18, lets play"]

I have a scenario where I compare source and destination tables that live on two separate remote Hive servers. Can we use two SparkSessions, something like this:
val spark = SparkSession.builder().master("local")
.appName("spark remote")
.config("javax.jdo.option.ConnectionURL", "jdbc:mysql://192.168.175.160:3306/metastore?useSSL=false")
.config("javax.jdo.option.ConnectionUserName", "hiveroot")
.config("javax.jdo.option.ConnectionPassword", "hivepassword")
.config("hive.exec.scratchdir", "/tmp/hive/${user.name}")
.config("hive.metastore.uris", "thrift://192.168.175.160:9083")
.enableHiveSupport()
.getOrCreate()
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
val sparkdestination = SparkSession.builder()
.config("javax.jdo.option.ConnectionURL", "jdbc:mysql://192.168.175.42:3306/metastore?useSSL=false")
.config("javax.jdo.option.ConnectionUserName", "hiveroot")
.config("javax.jdo.option.ConnectionPassword", "hivepassword")
.config("hive.exec.scratchdir", "/tmp/hive/${user.name}")
.config("hive.metastore.uris", "thrift://192.168.175.42:9083")
.enableHiveSupport()
.getOrCreate()
I tried SparkSession.clearActiveSession() and SparkSession.clearDefaultSession(), but it did not help and throws the following error:
Hive: Failed to access metastore. This class should not accessed in runtime.
org.apache.hadoop.hive.ql.metadata.HiveException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
Is there any other way to access the two Hive tables, using two SparkSessions or two SparkContexts?
Thanks.

I have a df like this:
States Counts
AK one
AK two
AK one
LO one
LO three
LO three
I am trying to get, for each state, the count value that occurs most often.
My code:
df.groupby('States')['Counts'].value_counts().first() gives:
TypeError: first() missing 1 required positional argument: 'offset'
Expected output:
States Counts
AK one
LO three
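Since value_counts() counts each group's values, the most frequent value per group is the idxmax of those counts. A minimal sketch:

out = (df.groupby('States')['Counts']
         .agg(lambda s: s.value_counts().idxmax())
         .reset_index())
#   States Counts
# 0     AK    one
# 1     LO  three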

I have a list of endpoints like below:
endpoints ["/endpoint1", "/endpoint2", "/endpoint3"]
I want to create dynamic endpoints in my application and generate Swagger API documentation for all of them. How can I do that?
@app.route(<endpoint>):
def process():
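A minimal sketch, assuming FastAPI (per the question's tags), which generates the Swagger UI at /docs automatically; the make_handler factory and the response payload are hypothetical:

from fastapi import FastAPI

app = FastAPI()
endpoints = ["/endpoint1", "/endpoint2", "/endpoint3"]

def make_handler(path):
    # a closure, so each route knows which endpoint it serves
    async def process():
        return {"endpoint": path}
    return process

for ep in endpoints:
    # every route registered this way appears in the auto-generated Swagger docs
    app.add_api_route(ep, make_handler(ep), methods=["GET"])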

I have a df:
0 1 2 A
-0.740485792 -0.299824912 0.169113705 1
1.120120949 -0.62580736 0.013757667 2
-0.685112999 0.439492717 -0.484524907 3
I am trying to get the names of the columns in which all values are greater than 0.
I tried (df > 0).all():
Out[47]:
0 False
1 False
2 False
A True
dtype: bool
How do I get only the column names that are True? My expected output is "A". Thanks in advance.
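Boolean indexing of df.columns with that mask does it; a minimal sketch:

cols = df.columns[(df > 0).all()]
# Index(['A'], dtype='object'); cols.tolist() gives ['A']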

Question 2, about sort_index():
df2 = pd.DataFrame({"A":[3,2,1]}, index=[2,1,0])
Out[395]:
A
2 3
1 2
0 1
df2.sort_index(axis=1)
A
2 3
1 2
0 1
The expected output is:
A
0 3
1 2
2 1
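Note that sort_index(axis=1) sorts the columns (there is only 'A', so nothing changes), while rows are sorted with the default axis=0. Also, df2.sort_index() keeps each label's own value (A becomes 1, 2, 3), so if the goal really is the output shown above (original row order, fresh 0..n index), reset_index is the tool. A minimal sketch:

df2.sort_index()            # rows ordered by index label: A is 1, 2, 3
df2.reset_index(drop=True)  # row order kept, index relabelled 0..2: A is 3, 2, 1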

I have a df:
A B
one six
two seven
three level
five one
and a dictionary:
my_dict={1:"one,two",2:"three,four"}
I want to replace the values in df.A with the corresponding my_dict keys().
My desired output is:
A B
1 six
1 seven
2 level
five one
I tried df.A.replace(my_dict, regex=True), but it did not work.
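replace() matches whole cell values against the mapping's keys, so my_dict has to be inverted to map each name to its key first. A minimal sketch:

inverted = {name: key for key, names in my_dict.items() for name in names.split(',')}
# {'one': 1, 'two': 1, 'three': 2, 'four': 2}
df['A'] = df['A'].replace(inverted)  # unmatched values such as 'five' stay as-is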

I have a df like this:
df = pd.DataFrame({
"Name" : ["A","B","C","D","E","F","G"],
"part number" : ["1","3","2","1","5","1","2"],
"detail1" : ["A","C","B","B","E","E","E"],
"detail2" : ["one","three","two","two","five","five","five"]
})
df
Name part number detail1 detail2
A 1 A one
B 3 C three
C 2 B two
D 1 B two
E 5 E five
F 1 E five
G 2 E five
I want to group by part number and fill detail1 and detail2 with each group's first-row values (a sketch follows the expected output below).
My expected output:
Name part number detail1 detail2
A 1 A one
B 3 C three
C 2 B two
D 1 A one
E 5 E five
F …
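A grouped transform can broadcast each group's first-row values to the whole group. A minimal sketch:

cols = ['detail1', 'detail2']
df[cols] = df.groupby('part number')[cols].transform('first')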

I am getting the error message below when I try to run this spark-submit command:
spark-submit --class "retail.DataValidator" --master local --executor-memory 2g --total-executor-cores 2 sample-spark-180417_2.11-1.0.jar /home/hduser/Downloads/inputfiles/ /home/hduser/output/
Error message:
Exception in thread "main" java.lang.NoClassDefFoundError: com/typesafe/config/ConfigFactory
at retail.DataValidator$.main(DataValidator.scala:12)
at retail.DataValidator.main(DataValidator.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: com.typesafe.config.ConfigFactory
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 11 more
build.sbt file:
name := "sample-spark-180417"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.1.0"
libraryDependencies += …

In my application I compare two different Datasets (i.e. the source table from Hive and the destination from an RDBMS) for duplicates and mismatches. It works fine with smaller datasets, but when I try to compare more than 1 GB of data (the source alone) it hangs and throws a TIMEOUT ERROR. I tried .config("spark.network.timeout", "600s"), but even after increasing the network timeout it throws java.lang.OutOfMemoryError: GC overhead limit exceeded.
val spark = SparkSession.builder().master("local")
.appName("spark remote")
.config("javax.jdo.option.ConnectionURL", "jdbc:mysql://192.168.175.160:3306/metastore?useSSL=false")
.config("javax.jdo.option.ConnectionUserName", "hiveroot")
.config("javax.jdo.option.ConnectionPassword", "hivepassword")
.config("hive.exec.scratchdir", "/tmp/hive/${user.name}")
.config("hive.metastore.uris", "thrift://192.168.175.160:9083")
.enableHiveSupport() …

I have a dataframe like this:
Roll No date status Name
1 1/1/2020 on A
2 1/1/2020 on A
3 1/1/2020 on B
I am trying to create a dictionary whose keys are the names and whose values are lists of roll numbers, and I cannot figure out how to handle the duplicate names.
My expected output is:
{
"A" :[1,2],
"B" :[3]
}
I can get this output by iterating over the dataframe, but I am looking for a pandorable way. Thanks.
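groupby plus apply(list) builds exactly that mapping; a minimal sketch:

out = df.groupby('Name')['Roll No'].apply(list).to_dict()
# {'A': [1, 2], 'B': [3]}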

I have a parquet file that has a minus sign in one of its column names (i.e. student-name). I am trying to use a case class as the encoder while reading the parquet file, like below:
case class Student (student-name : String, student_age : String)
object abcd {
val student_details = spark.read.parquet("/path/to/parquet-file").as[Student] // Scala string literals need double quotes
}
The problem is that the minus sign in the case class throws an error. I tried enclosing it in backticks (`), but that did not help. Please find the errors below.
Error without backticks:
case class Student (student-name : String, student_age : String)
:expected
Wrong top statement declaration
Error with backticks:
case class Student (`student-name` : String, student_age : String)
cannot resolve '`student$minusname`' given input columns:[student-name,student_age]
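One common workaround is to rename the offending column before binding it to the case class. A minimal PySpark sketch (the Scala Dataset API has the same withColumnRenamed method, after which .as[Student] with a student_name field should bind):

df = (spark.read.parquet("/path/to/parquet-file")
           .withColumnRenamed("student-name", "student_name"))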
Any help would be appreciated!
Thanks.

I have a list like this:
[
[
("a", 1)
] ,
[
("b", 2)
],
[
("c", 3),
("d", 4)
],
[
("e", 5),
("f", 6),
("g", 7)
]
]
I am trying to get all possible combinations from this list data, taking one element from each inner list.
My expected output should look like this:
[
[
("a", 1),
("b", 2),
("c", 3),
("e", 5)
],
[
("a", 1),
("b", 2),
("c", 3),
("f", 6)
],
[
("a", 1),
("b", 2),
("c", 3),
("g", 7)
],
[
("a", 1),
("b", 2),
("d", 4),
("e", 5)
],
[
("a", 1),
("b", 2),
("d", 4),
("f", 6)
],
[ …
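itertools.product takes one element from each inner list and yields every combination; a minimal sketch:

from itertools import product

data = [
    [("a", 1)],
    [("b", 2)],
    [("c", 3), ("d", 4)],
    [("e", 5), ("f", 6), ("g", 7)],
]
combos = [list(c) for c in product(*data)]  # 1 * 1 * 2 * 3 = 6 combinations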