我试图在pyspark中修改这个Python代码:
from statsmodels.tsa.tsatools import lagmat
def lag_func(data, lag):
    """Append lagged copies of the 'diff' column to *data*.

    Parameters
    ----------
    data : pandas.DataFrame
        Must contain a 'diff' column (as produced by ``diff_creation``).
    lag : int
        Number of lag columns to create ('lag1' .. 'lag<lag>').

    Returns
    -------
    pandas.DataFrame
        A copy of *data* with extra columns 'lag1'..'lagN'; column 'lagK'
        holds the 'diff' series shifted down by K rows (lagmat's default
        'forward' trim pads the top rows with zeros).
    """
    # lagmat returns an (n, lag) array where column k-1 is the series lagged by k.
    lag_matrix = lagmat(data["diff"], lag)
    lagged = data.copy()
    for k in range(1, lag + 1):
        lagged["lag%d" % k] = lag_matrix[:, k - 1]
    return lagged
def diff_creation(data):
    """Add a 'diff' column holding the first difference of the second column.

    Equivalent to the original ``.ix`` / ``.as_matrix()`` version (both of
    which were removed from pandas): row 0 of 'diff' is NaN, and row i is
    ``data.iloc[i, 1] - data.iloc[i-1, 1]``.

    Parameters
    ----------
    data : pandas.DataFrame
        Mutated in place; a 'diff' column is appended.

    Returns
    -------
    pandas.DataFrame
        The same object, for chaining.
    """
    # Series.diff() yields NaN for the first row and x[i] - x[i-1] after,
    # matching the original manual subtraction of the shifted matrix.
    data["diff"] = data.iloc[:, 1].diff()
    return data
Run Code Online (Sandbox Code Playground)
结果是具有滞后列的数据帧.
我试过这样的事情:
# Custom pyspark Transformer meant to collect a per-(store, sku) value series
# ordered by sale date.
# NOTE(review): this snippet was pasted without indentation and its
# _transform body is truncated below — restore 4-space indents and the
# missing code before running.
class SerieMaker(Transformer):
# NOTE(review): idCol's default is a mutable list, shared across all
# instances that rely on the default — prefer a tuple or a None sentinel.
def __init__(self, inputCol='f_qty_recalc', outputCol='serie', dateCol='dt_ticket_sale', idCol= ['id_store', 'id_sku'], serieSize=30):
self.inputCol = inputCol
self.outputCol = outputCol
self.dateCol = dateCol
self.serieSize = serieSize
self.idCol = idCol
def _transform(self, df):
# Rows are grouped by the id columns and ordered by date within each group.
window = Window.partitionBy(self.idCol).orderBy(self.dateCol)
series = …（代码在此被截断）Run Code Online (Sandbox Code Playground)

我正在尝试使用带有自定义架构的镶木地板格式导入数据，但它返回：TypeError: option() missing 1 required positional argument: 'value'
# Explicit parquet schema for the product nomenclature table.
# Every column is nullable (third StructField argument is True).
ProductCustomSchema = StructType([
    StructField(col_name, col_type, True)
    for col_name, col_type in [
        ("id_sku", IntegerType()),
        ("flag_piece", StringType()),
        ("flag_weight", StringType()),
        ("ds_sku", StringType()),
        ("qty_pack", FloatType()),
    ]
])
def read_parquet_(path, schema):
    """Read a parquet dataset with an explicit schema.

    Fixes the reported "TypeError: option() missing 1 required positional
    argument: 'value'": DataFrameReader.option() takes a (key, value) pair,
    so ``.option(schema)`` is invalid — an explicit schema is applied with
    ``DataFrameReader.schema()`` instead.

    Parameters
    ----------
    path : str
        Filesystem path of the parquet data.
    schema : pyspark.sql.types.StructType
        Schema to apply instead of letting Spark infer one.

    Returns
    -------
    pyspark.sql.DataFrame
    """
    return spark.read.format("parquet")\
        .schema(schema)\
        .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")\
        .load(path)
# Local path of the parquet product-nomenclature dataset.
product_nomenclature = 'C:/Users/alexa/Downloads/product_nomenc'
# Load it with the explicit schema defined above.
product_nom = read_parquet_(product_nomenclature, ProductCustomSchema)
Run Code Online (Sandbox Code Playground)