小编use*_*398的帖子

Python落后于Pyspark系列

我试图在pyspark中修改这个Python代码:

from statsmodels.tsa.tsatools import lagmat

def lag_func(data,lag):
    lag = lag
    X = lagmat(data["diff"], lag)
    lagged = data.copy()
    for c in range(1,lag+1):
        lagged["lag%d" % c] = X[:, c-1]
    return lagged

def diff_creation(data):
    data["diff"] = np.nan
    data.ix[1:, "diff"] = (data.iloc[1:, 1].as_matrix() - data.iloc[:len(data)-1, 1].as_matrix())
    return data
Run Code Online (Sandbox Code Playgroud)

结果是具有滞后列的数据帧.

我试过这样的事情:

class SerieMaker(Transformer):
    def __init__(self, inputCol='f_qty_recalc', outputCol='serie', dateCol='dt_ticket_sale', idCol= ['id_store', 'id_sku'], serieSize=30):
        self.inputCol = inputCol
        self.outputCol = outputCol
        self.dateCol = dateCol
        self.serieSize = serieSize
        self.idCol = idCol

    def _transform(self, df):
        window = Window.partitionBy(self.idCol).orderBy(self.dateCol)
        series = …
Run Code Online (Sandbox Code Playgroud)

python apache-spark pyspark

16
推荐指数
0
解决办法
532
查看次数

带有自定义架构的 Spark 读取镶木地板

我正在尝试使用带有自定义架构的镶木地板格式导入数据,但它返回:TypeError:option() 缺少 1 个必需的位置参数:“值”

   ProductCustomSchema = StructType([
        StructField("id_sku", IntegerType(), True),
        StructField("flag_piece", StringType(), True),
        StructField("flag_weight", StringType(), True),
        StructField("ds_sku", StringType(), True),
        StructField("qty_pack", FloatType(), True)])

def read_parquet_(path, schema) : 
    return spark.read.format("parquet")\
                             .option(schema)\
                             .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")\
                             .load(path)

product_nomenclature = 'C:/Users/alexa/Downloads/product_nomenc'
product_nom = read_parquet_(product_nomenclature, ProductCustomSchema)
Run Code Online (Sandbox Code Playgroud)

apache-spark apache-spark-sql pyspark

7
推荐指数
1
解决办法
2万
查看次数

标签 统计

apache-spark ×2

pyspark ×2

apache-spark-sql ×1

python ×1