小编hi-*_*zir的帖子

在 PySpark 中转置 RowMatrix

您好,我想知道如何在 PySpark 中转置 RowMatrix。

data = [(MLLibVectors.dense([1.0, 2.0]), ), (MLLibVectors.dense([3.0, 4.0]), )]

df=sqlContext.createDataFrame(data, ["features"])
features=df.select("features").rdd.map(lambda row: row[0])

mat=RowMatrix(features)
print mat.rows.first()
#[1.0,2.0]

mat=mat.Transpose()

print mat.rows.first()
#[1.0,3.0]
Run Code Online (Sandbox Code Playgroud)

有人用Python实现这个吗?我看过类似的帖子,但一切都在 Scala 中。谢谢。

python apache-spark pyspark

4
推荐指数
1
解决办法
2741
查看次数

Apache Spark 文本相似度

我正在尝试以下 Java 示例

Apache Spark 中的高效字符串匹配

这是我的代码

public class App {
    public static void main(String[] args) {
        System.out.println("Hello World!");

        System.setProperty("hadoop.home.dir", "D:\\del");

        List<MyRecord> firstRow = new ArrayList<MyRecord>();
        firstRow.add(new App().new MyRecord("1", "Love is blind"));

        List<MyRecord> secondRow = new ArrayList<MyRecord>();
        secondRow.add(new App().new MyRecord("1", "Luv is blind"));

        SparkSession spark = SparkSession.builder().appName("LSHExample").config("spark.master", "local")
                .getOrCreate();

        Dataset firstDataFrame = spark.createDataFrame(firstRow, MyRecord.class);
        Dataset secondDataFrame = spark.createDataFrame(secondRow, MyRecord.class);

        firstDataFrame.show(20, false);
        secondDataFrame.show(20, false);

        RegexTokenizer regexTokenizer = new RegexTokenizer().setInputCol("text").setOutputCol("words")
                .setPattern("\\W");
        NGram ngramTransformer = new NGram().setN(3).setInputCol("words").setOutputCol("ngrams");
        HashingTF hashingTF = new HashingTF().setInputCol("ngrams").setOutputCol("vectors");
        MinHashLSH …
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-ml apache-spark-mllib

4
推荐指数
1
解决办法
1922
查看次数

什么是在pyspark中列出不同数据帧列的正确方法?

我想在spark数据帧中总结不同的列.

from pyspark.sql import functions as F
cols = ["A.p1","B.p1"]
df = spark.createDataFrame([[1,2],[4,89],[12,60]],schema=cols)

# 1. Works
df = df.withColumn('sum1', sum([df[col] for col in ["`A.p1`","`B.p1`"]]))

#2. Doesnt work
df = df.withColumn('sum1', F.sum([df[col] for col in ["`A.p1`","`B.p1`"]]))

#3. Doesnt work
df = df.withColumn('sum1', sum(df.select(["`A.p1`","`B.p1`"])))
Run Code Online (Sandbox Code Playgroud)

为什么不接近#2..不工作?我在Spark 2.2上

python apache-spark apache-spark-sql pyspark pyspark-sql

4
推荐指数
1
解决办法
8822
查看次数

如何使用 cypher-shell 不在本地主机上连接到 Bolt

我真的很想从 Spark Slave 连接到 Bolt,但我想如果我能让它与 cypher-shell 一起工作,其余的就会到位。所以我可以这样做:

cypher-shell -a localhost
Run Code Online (Sandbox Code Playgroud)

连接良好

如果我在实际 ip 的盒子上执行此操作:

cypher-shell -a 172.20.0.71:7687
Run Code Online (Sandbox Code Playgroud)

连接被拒绝,我以为我所要做的就是在conf中做:

dbms.connector.bolt.address=0.0.0.0:7687
Run Code Online (Sandbox Code Playgroud)

这似乎不起作用,我还缺少其他东西吗?(或者至少上面的内容似乎不起作用,我确实重新启动了我的 neo4j 实例,因此它应该接受对 conf 的更改)。仅使用 cypher-shell 上的默认设置进行连接确实会给出以下结果:

Connected to Neo4j 3.3.0 at bolt://localhost:7687 as user neo4j.
Type :help for a list of available commands or :exit to exit the shell.
Note that Cypher queries must end with a semicolon.
Run Code Online (Sandbox Code Playgroud)

那么也许它没有完全重新启动并接受对 的更改neo4j.conf?或者更可能的是我错过了其他东西来让它按照我认为应该的方式工作。我可以在哪里从另一个 ip 或本地通过实际 ip 地址进行连接?

neo4j cypher

4
推荐指数
1
解决办法
1999
查看次数

如何在PySpark Dataframe列中将日期转换为月的第一天?

我有以下DataFrame:

+----------+
|      date|
+----------+
|2017-01-25|
|2017-01-21|
|2017-01-12|
+----------+
Run Code Online (Sandbox Code Playgroud)

以下是DataFrame上面创建的代码:

import pyspark.sql.functions as f
rdd = sc.parallelize([("2017/11/25",), ("2017/12/21",), ("2017/09/12",)])
df = sqlContext.createDataFrame(rdd, ["date"]).withColumn("date", f.to_date(f.col("date"), "yyyy/MM/dd"))
df.show()
Run Code Online (Sandbox Code Playgroud)

我想要一个新列,每行的第一个日期,只需在所有日期将日期替换为"01"

+----------++----------+
|      date| first_date|
+----------++----------+
|2017-11-25| 2017-11-01|
|2017-12-21| 2017-12-01|
|2017-09-12| 2017-09-01|
+----------+-----------+
Run Code Online (Sandbox Code Playgroud)

PySpark.sql.function中有一个last_day函数,但是没有first_day函数.

我尝试使用date_sub执行此操作但不起作用:我得到一个列而不是Iterable错误,因为date_sub的第二个参数不能是一个列而必须是一个整数.

f.date_sub(f.col('date'), f.dayofmonth(f.col('date')) - 1 )
Run Code Online (Sandbox Code Playgroud)

python apache-spark apache-spark-sql pyspark

4
推荐指数
1
解决办法
5354
查看次数

自动 - 递增 pyspark 数据框列值

我试图在数据框中生成一个附加列,并根据全局值自动递增值。但是,所有行都是使用相同的值生成的,并且该值不会递增。

这是代码

def autoIncrement():
    global rec
    if (rec == 0) : rec = 1 
    else : rec = rec + 1
    return int(rec)

rec=14
Run Code Online (Sandbox Code Playgroud)

UDF

autoIncrementUDF = udf(autoIncrement,  IntegerType())


df1 = hiveContext.sql("select id,name,location,state,datetime,zipcode from demo.target")

df1.withColumn("id2", autoIncrementUDF()).show()
Run Code Online (Sandbox Code Playgroud)

这是结果 df

+---+------+--------+----------+-------------------+-------+---+
| id|  name|location|     state|           datetime|zipcode|id2|
+---+------+--------+----------+-------------------+-------+---+
| 20|pankaj| Chennai| TamilNadu|2018-03-26 11:00:00|   NULL| 15|
| 10|geetha| Newyork|New Jersey|2018-03-27 10:00:00|   NULL| 15|
| 25| pawan| Chennai| TamilNadu|2018-03-27 11:25:00|   NULL| 15|
| 30|Manish| Gurgoan|   Gujarat|2018-03-27 11:00:00|   NULL| 15|
+---+------+--------+----------+-------------------+-------+---+
Run Code Online (Sandbox Code Playgroud)

但我期待下面的结果

+---+------+--------+----------+-------------------+-------+---+
| id| …
Run Code Online (Sandbox Code Playgroud)

python user-defined-functions apache-spark apache-spark-sql pyspark

4
推荐指数
1
解决办法
1万
查看次数

是否有任何性能问题迫使在 spark 中使用计数进行急切评估?

通常我Dataset.count在 3 个场景中看到整个代码库:

  1. 日志记录 log.info("this ds has ${dataset.count} rows")
  2. 分枝 if (dataset.count > 0) do x else do y
  3. 强制缓存 dataset.persist.count

它是否会通过强制查询优化器在任何这些场景中过早地急切来阻止查询优化器创建最有效的 dag?

apache-spark

4
推荐指数
1
解决办法
1092
查看次数

Spark - 将整行传递给 udf,然后在 udf 中获取列名

我正在将 Spark 与 Scala 一起使用,并希望将整行传递给 udf 并选择 udf 中的每个列名和列值。我怎样才能做到这一点?

我正在尝试以下 -

inputDataDF.withColumn("errorField", mapCategory(ruleForNullValidation) (col(_*)))

def mapCategory(categories: Map[String, Boolean]) = {
  udf((input:Row) =>  //write a recursive function to check if each row is in categories if yes check for null if null then false, repeat this for all columns and then combine results)   
})
Run Code Online (Sandbox Code Playgroud)

scala apache-spark

4
推荐指数
1
解决办法
6043
查看次数

How to standardize ONE column in Spark using StandardScaler?

I am trying to standardize (mean = 0, std = 1) one column ('age') in my data frame. Below is my code in Spark (Python):

from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

# Make my 'age' column an assembler type:
age_assembler = VectorAssembler(inputCols= ['age'], outputCol = "age_feature")

# Create a scaler that takes 'age_feature' as an input column:
scaler = StandardScaler(inputCol="age_feature", outputCol="age_scaled",
                        withStd=True, withMean=True)

# Creating a mini-pipeline for those 2 steps:
age_pipeline = Pipeline(stages=[age_assembler, …
Run Code Online (Sandbox Code Playgroud)

python scale apache-spark pyspark

3
推荐指数
1
解决办法
3248
查看次数

如何在pyspark管道中添加UDF?

我有以下代码,它基本上是在做特征工程管道:

token_q1=Tokenizer(inputCol='question1',outputCol='question1_tokens') 
token_q2=Tokenizer(inputCol='question2',outputCol='question2_tokens')  

remover_q1=StopWordsRemover(inputCol='question1_tokens',outputCol='question1_tokens_filtered')
remover_q2=StopWordsRemover(inputCol='question2_tokens',outputCol='question2_tokens_filtered')

q1w2model = Word2Vec(inputCol='question1_tokens_filtered',outputCol='q1_vectors')
q1w2model.setSeed(1)

q2w2model = Word2Vec(inputCol='question2_tokens_filtered',outputCol='q2_vectors')
q2w2model.setSeed(1)

pipeline=Pipeline(stages[token_q1,token_q2,remover_q1,remover_q2,q1w2model,q2w2model])
model=pipeline.fit(train)
result=model.transform(train)
result.show()
Run Code Online (Sandbox Code Playgroud)

我想将以下 UDF 添加到上述管道中:

charcount_q1 = F.udf(lambda row : sum([len(char) for char in row]),IntegerType())
Run Code Online (Sandbox Code Playgroud)

当我这样做时,我收到 Java 错误。有人可以指出我正确的方向吗?

但是,我使用以下基本有效的代码添加了此列:

charCountq1=train.withColumn("charcountq1", charcount_q1("question1"))
Run Code Online (Sandbox Code Playgroud)

但我想将它添加到管道中而不是这样做

apache-spark apache-spark-sql pyspark

3
推荐指数
1
解决办法
2081
查看次数