con*_*xyz 12 python case-class apache-spark pyspark
您将如何在PySpark中使用和/或实现等效的案例类?
hi-*_*zir 16
如前所述由亚历克斯·霍尔命名的产品类型的一个真正的等价物,是一个namedtuple.
与其他答案中Row建议的不同,它具有许多有用的属性:
具有良好的形状,可以可靠地用于结构模式匹配:
>>> from collections import namedtuple
>>>
>>> FooBar = namedtuple("FooBar", ["foo", "bar"])
>>> foobar = FooBar(42, -42)
>>> foo, bar = foobar
>>> foo
42
>>> bar
-42
相反Rows ,与关键字参数一起使用时不可靠:
>>> from pyspark.sql import Row
>>>
>>> foobar = Row(foo=42, bar=-42)
>>> foo, bar = foobar
>>> foo
-42
>>> bar
42
虽然如果用位置参数定义:
>>> FooBar = Row("foo", "bar")
>>> foobar = FooBar(42, -42)
>>> foo, bar = foobar
>>> foo
42
>>> bar
-42
订单保留.
定义合适的类型
>>> from functools import singledispatch
>>> 
>>> FooBar = namedtuple("FooBar", ["foo", "bar"])
>>> type(FooBar)
<class 'type'>
>>> isinstance(FooBar(42, -42), FooBar)
True
并且可以在需要类型处理时使用,特别是单个:
>>> Circle = namedtuple("Circle", ["x", "y", "r"])
>>> Rectangle = namedtuple("Rectangle", ["x1", "y1", "x2", "y2"])
>>>
>>> @singledispatch
... def area(x):
...     raise NotImplementedError
... 
... 
>>> @area.register(Rectangle)
... def _(x):
...     return abs(x.x1 - x.x2) * abs(x.y1 - x.y2)
... 
... 
>>> @area.register(Circle)
... def _(x):
...     return math.pi * x.r ** 2
... 
... 
>>>
>>> area(Rectangle(0, 0, 4, 4))
16
>>> >>> area(Circle(0, 0, 4))
50.26548245743669
和多次发送:
>>> from multipledispatch import dispatch
>>> from numbers import Rational
>>>
>>> @dispatch(Rectangle, Rational)
... def scale(x, y):
...     return Rectangle(x.x1, x.y1, x.x2 * y, x.y2 * y)
... 
... 
>>> @dispatch(Circle, Rational)
... def scale(x, y):
...     return Circle(x.x, x.y, x.r * y)
...
...
>>> scale(Rectangle(0, 0, 4, 4), 2)
Rectangle(x1=0, y1=0, x2=8, y2=8)
>>> scale(Circle(0, 0, 11), 2)
Circle(x=0, y=0, r=22)
并且与第一个属性相结合,可以用于各种模式匹配场景.namedtuples还支持标准继承和类型提示.
Rows 别:
>>> FooBar = Row("foo", "bar")
>>> type(FooBar)
<class 'pyspark.sql.types.Row'>
>>> isinstance(FooBar(42, -42), FooBar)  # Expected failure
Traceback (most recent call last):
...
TypeError: isinstance() arg 2 must be a type or tuple of types
>>> BarFoo = Row("bar", "foo")
>>> isinstance(FooBar(42, -42), type(BarFoo))
True
>>> isinstance(BarFoo(42, -42), type(FooBar))
True
提供高度优化的表示.与Row对象不同,元组不会__dict__对每个实例使用和携带字段名称.因此,初始化的速度可以快一些:
>>> FooBar = namedtuple("FooBar", ["foo", "bar"])
>>> %timeit FooBar(42, -42)
587 ns ± 5.28 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
与不同的Row构造者相比:
>>> %timeit Row(foo=42, bar=-42)
3.91 µs ± 7.67 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
>>> FooBar = Row("foo", "bar")
>>> %timeit FooBar(42, -42)
2 µs ± 25.4 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
并且具有更高的内存效率(使用大规模数据时非常重要的属性):
>>> import sys
>>> FooBar = namedtuple("FooBar", ["foo", "bar"])
>>> sys.getsizeof(FooBar(42, -42))
64
相比之下 Row
>>> sys.getsizeof(Row(foo=42, bar=-42))
72
最后,属性访问速度提高了一个数量级namedtuple:
>>> FooBar = namedtuple("FooBar", ["foo", "bar"])
>>> foobar = FooBar(42, -42)
>>> %timeit foobar.foo
102 ns ± 1.33 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)
与Row对象上的等效操作相比:
>>> foobar = Row(foo=42, bar=-42)
>>> %timeit foobar.foo
2.58 µs ± 26.9 ns per loop (mean ± std. dev. of 7 runs, 100000 loops each)
最后但并非最不重要的namedtuples是Spark SQL中正确支持
>>> Record = namedtuple("Record", ["id", "name", "value"])
>>> spark.createDataFrame([Record(1, "foo", 42)])
DataFrame[id: bigint, name: string, value: bigint]
摘要:
应该清楚的Row是,实际产品类型的替代品非常差,除非Spark API强制执行,否则应该避免使用.
还应该清楚的pyspark.sql.Row是,当你考虑它时,它不是一个案例类的替代品,它直接等同于org.apache.spark.sql.Row- 与实际产品相当远的类型,并且表现得像Seq[Any](取决于子类,添加了名称) ).Python和Scala实现都是作为外部代码和内部Spark SQL表示之间有用的,尽管很尴尬的接口而引入的.
另见:
这将是一种耻辱,更何况真棒MacroPy通过开发黎蒿揖及其端口(MacroPy3阿尔贝托贝尔蒂):
>>> import macropy.console
0=[]=====> MacroPy Enabled <=====[]=0
>>> from macropy.case_classes import macros, case
>>> @case
... class FooBar(foo, bar): pass
... 
>>> foobar = FooBar(42, -42)
>>> foo, bar = foobar
>>> foo
42
>>> bar
-42
它具有丰富的其他功能,包括但不限于高级模式匹配和简洁的lambda表达式语法.
Python dataclasses(Python 3.7+).
如果您转到sql-programming-guide中使用反射推断架构部分,您将看到case class被定义为
案例类定义表的模式。使用反射读取案例类的参数名称并成为列的名称。案例类还可以嵌套或包含复杂类型,例如序列或数组。
举例如下
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
在同一部分中,如果您切换到python即pyspark,您将看到Row被使用并定义为
行是通过将键/值对列表作为 kwargs 传递给 Row 类来构造的。该列表的键定义了表的列名,并且通过查看第一行来推断类型。
举例如下
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
schemaPeople = sqlContext.createDataFrame(people)
所以解释的结论是可以像在pysparkRow中一样使用case class
| 归档时间: | 
 | 
| 查看次数: | 4619 次 | 
| 最近记录: |