pyspark 按位与与与号运算符

wrs*_*der 1 apache-spark pyspark

我正在尝试向数据框中添加一列,指示何时在嵌套数组中找到两个不同的值

 expr1 = array_contains(df.child_list, "value1")
 expr2 = array_contains(df.child_list, "value2")
Run Code Online (Sandbox Code Playgroud)

我让它与 & 号运算符一起工作

 df.select(...).withColumn("boolTest", expr1 & expr2)
Run Code Online (Sandbox Code Playgroud)

然后我试图用这样bitwiseAND的想法来代替它,我想要一个这些表达式的列表,并且动态地结合在一起。

这失败并出现错误

 df.select(...).withColumn("boolTest", expr1.bitwiseAND(expr2))

 cannot resolve ..... due to data type mismatch: '(array_contains(c1.`child_list`, 'value1') & 
array_contains(c1.`child_list`, 'value2'))' requires integral type, 
not boolean;;
Run Code Online (Sandbox Code Playgroud)

有什么区别,我做错了什么?

Rya*_*ier 10

& 和 | 运算符在 pyspark 中的 BooleanType 列上工作,作为逻辑 AND 和 OR 操作。换句话说,他们将 True/False 作为输入并输出 True/False。

bitwiseAND 函数对两个数值进行逐位与运算。所以他们可以取两个整数并输出它们的按位与。

以下是每个示例:

from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = StructType([   
  StructField("b1", BooleanType()), 
  StructField("b2", BooleanType()),
  StructField("int1", IntegerType()), 
  StructField("int2", IntegerType())
])
data = [
  (True, True, 0x01, 0x01), 
  (True, False, 0xFF, 0xA), 
  (False, False, 0x01, 0x00)
]

df = sqlContext.createDataFrame(sc.parallelize(data), schema)


df2 = df.withColumn("logical", df.b1 & df.b2) \
        .withColumn("bitwise", df.int1.bitwiseAND(df.int2))

df2.printSchema()
df2.show()

+-----+-----+----+----+-------+-------+
|   b1|   b2|int1|int2|logical|bitwise|
+-----+-----+----+----+-------+-------+
| true| true|   1|   1|   true|      1|
| true|false| 255|  10|  false|     10|
|false|false|   1|   0|  false|      0|
+-----+-----+----+----+-------+-------+


>>> df2.printSchema()
root
 |-- b1: boolean (nullable = true)
 |-- b2: boolean (nullable = true)
 |-- int1: integer (nullable = true)
 |-- int2: integer (nullable = true)
 |-- logical: boolean (nullable = true)
 |-- bitwise: integer (nullable = true)
Run Code Online (Sandbox Code Playgroud)

如果你想动态地将一个列列表组合在一起,你可以这样做:

columns = [col("b1"), col("b2")]
df.withColumn("result", reduce(lambda a, b: a & b, columns))
Run Code Online (Sandbox Code Playgroud)