标签: pipeline

如何在Python中并行化生成器/迭代器管道?

假设我有一些Python代码,如下所示:

input = open("input.txt")
x = (process_line(line) for line in input)
y = (process_item(item) for item in x)
z = (generate_output_line(item) + "\n" for item in y)
output = open("output.txt", "w")
output.writelines(z)
Run Code Online (Sandbox Code Playgroud)

此代码从输入文件中读取每一行,通过多个函数运行它,并将输出写入输出文件.现在知道函数process_line,process_item并且generate_output_line永远不会相互干扰,让我们假设输入和输出文件位于不同的磁盘上,这样读写就不会相互干扰.

但Python可能不知道这些.我的理解是Python将读取一行,依次应用每个函数,并将结果写入输出,然后只有将第一行发送到输出才会读取第二行,这样第二行就不会进入管道直到第一个退出.我是否正确理解该程序将如何流动?如果这是它的工作方式,是否有任何简单的方法可以使多个行同时在管道中,以便程序并行读取,写入和处理每个步骤?

python parallel-processing iterator pipeline

7
推荐指数
1
解决办法
5126
查看次数

使用Gstreamer中的Pipeline播放音频和视频(Python)

有没有办法制作一个可以播放任何视频文件的管道(也包含音频)?我试过链接元素,如:

filesrc -> decodebin
Run Code Online (Sandbox Code Playgroud)

随着

queue -> audioconvert -> autoaudiosink
Run Code Online (Sandbox Code Playgroud)

queue -> autovideoconvert -> autovideosink
Run Code Online (Sandbox Code Playgroud)

这会导致两个问题:

  1. A queue不能链接到autovideoconvert.
  2. 我不知道如何使用"pad-added"事件实现pad ,特别是当管道同时支持音频和视频时.

我想知道如何在不需要的情况下做到这一点gst.parse_launch.另外,我希望pieline能够使用我抛出的任何格式(如playbin),但不能使用playbin,因为我需要链接其他元素(levelvolume).

或者,有没有办法将元素(如level)连接到播放箱?

c python pipeline gstreamer

7
推荐指数
1
解决办法
1万
查看次数

如何使python脚本在bash和python中都可以管道化

简介:我想在命令行上编写类似于bash脚本的python脚本,但是我还想在python中轻松地将它们连接在一起.我遇到麻烦的地方是制造后者的胶水.

所以,想象一下我写了两个剧本,script1.pyscript2.py我能管他们在一起就像这样:

echo input_string | ./script1.py -a -b | ./script2.py -c -d
Run Code Online (Sandbox Code Playgroud)

如何从另一个python文件中获取此行为? 这是我所知道的方式,但我不喜欢:

arg_string_1 = convert_to_args(param_1, param_2)
arg_string_2 = convert_to_args(param_3, param_4)
output_string = subprocess.check_output("echo " + input_string + " | ./script1.py " + arg_string_1 + " | ./script2.py " + arg_string_2)
Run Code Online (Sandbox Code Playgroud)

如果我不想利用多线程,我可以做这样的事情(?):

input1  = StringIO(input_string)
output1 = StringIO()
script1.main(param_1, param_2, input1, output1)
input2  = StringIO(output1.get_value())
output2 = StringIO()
script2.main(param_3, param_4, input2, output2)
Run Code Online (Sandbox Code Playgroud)

这是我尝试的方法,但我坚持写胶水.我很感激学习如何完成下面的方法,或建议更好的设计/方法!

我的方法:我写了script1.py,script2.py看起来像:

#!/usr/bin/python3

... # import sys and …
Run Code Online (Sandbox Code Playgroud)

python pipeline python-multithreading

7
推荐指数
1
解决办法
918
查看次数

如何将不同的输入装入sklearn管道?

我正在使用sklearn中的Pipeline对文本进行分类.

在这个例子中,Pipeline我有一个TfIDF矢量化器和一些用FeatureUnion包装的自定义特征和一个分类器作为Pipeline步骤,然后我拟合训练数据并进行预测:

from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import LinearSVC

X = ['I am a sentence', 'an example']
Y = [1, 2]
X_dev = ['another sentence']

# load custom features and FeatureUnion with Vectorizer
features = []
measure_features = MeasureFeatures() # this class includes my custom features
features.append(('measure_features', measure_features))

countVecWord = TfidfVectorizer(ngram_range=(1, 3), max_features= 4000)
features.append(('ngram', countVecWord))

all_features = FeatureUnion(features)

# classifier
LinearSVC1 = LinearSVC(tol=1e-4,  C = 0.10000000000000001)

pipeline = Pipeline(
    [('all', all_features ),
    ('clf', …
Run Code Online (Sandbox Code Playgroud)

python pipeline classification machine-learning scikit-learn

7
推荐指数
1
解决办法
3413
查看次数

从sklearn.pipeline.Pipeline获取变换器结果

我正在使用一个sklearn.pipeline.Pipeline对象来进行聚类.

pipe = sklearn.pipeline.Pipeline([('transformer1': transformer1),
                                  ('transformer2': transformer2),
                                  ('clusterer': clusterer)])
Run Code Online (Sandbox Code Playgroud)

然后我通过使用轮廓分数来评估结果.

sil = preprocessing.silhouette_score(X, y)
Run Code Online (Sandbox Code Playgroud)

我想知道如何X从管道获取或转换数据,因为它只返回clusterer.fit_predict(X).

我知道我可以通过将管道拆分为:

pipe = sklearn.pipeline.Pipeline([('transformer1': transformer1),
                                  ('transformer2': transformer2)])

X = pipe.fit_transform(data)
res = clusterer.fit_predict(X)
sil = preprocessing.silhouette_score(X, res)
Run Code Online (Sandbox Code Playgroud)

但我想在一个管道中完成所有工作.

pipeline python-2.7 scikit-learn

7
推荐指数
1
解决办法
1347
查看次数

Azure DevOps 管道构建 - 注册 COM DLL

背景:我的解决方案中很少有项目依赖于 COM 库。因此,必须在构建实际解决方案之前注册这些 COM DLL。

在 Azure DevOps - Pipeline - Build - Task 中,我添加了一个“命令行”代理作业,使用以下命令,

场景一:
C:\windows\system32\regsvr32.exe /s [DLLFilePath]\[DLLName].dll

场景二:
CD [DLLFilePath]
C:\windows\system32\regsvr32.exe /s [DLLName].dll

但是这两种方案在构建时都返回相同的错误,
[error]Cmd.exe 以代码“3”退出。

注意:
在调用 regsvr32 之前,使用单独的代理作业将 DLL 复制到上述位置。
[DLLFilePath]\[DLLName].dll 是构建代理中的本地路径,比如 c:\..\someLibrary.dll

pipeline azure azure-devops azure-pipelines

7
推荐指数
1
解决办法
676
查看次数

Sklearn Pipeline:将参数传递给自定义转换器?

我的sklearn管道中有一个自定义 Transformer,我想知道如何将参数传递给我的 Transformer :

在下面的代码中,您可以看到我在 Transformer 中使用了字典“权重”。我不希望在我的 Transformer 中定义这个字典,而是从管道传递它,这样我就可以在网格搜索中包含这个字典。是否可以将字典作为参数传递给我的 Transformer ?

# My custom Transformer
  class TextExtractor(BaseEstimator, TransformerMixin):
        """Concat the 'title', 'body' and 'code' from the results of 
        Stackoverflow query
        Keys are 'title', 'body' and 'code'.
        """
        def fit(self, x, y=None):
            return self

        def transform(self, x):
            # here is the parameter  I want to pass to my transformer
            weight ={'title' : 10, 'body': 1, 'code' : 1}
            x['text'] = weight['title']*x['Title'] +  
            weight['body']*x['Body'] +  
            weight['code']*x['Code']

            return x['text']

param_grid = …
Run Code Online (Sandbox Code Playgroud)

pipeline transformer-model scikit-learn

7
推荐指数
1
解决办法
2419
查看次数

Scikit-learn 管道类型错误:zip 参数 #2 必须支持迭代

我正在尝试为 sklearn 管道创建一个自定义转换器,它将提取特定文本的平均字长,然后对其应用标准缩放器以标准化数据集。我正在将一系列文本传递给管道。

class AverageWordLengthExtractor(BaseEstimator, TransformerMixin):

    def __init__(self):
        pass
    def average_word_length(self, text):
        return np.mean([len(word) for word in text.split( )])
    def fit(self, x, y=None):
        return self
    def transform(self, x , y=None):
        return pd.DataFrame(pd.Series(x).apply(self.average_word_length))
Run Code Online (Sandbox Code Playgroud)

然后我创建了一个这样的管道。

pipeline = Pipeline(['text_length', AverageWordLengthExtractor(), 
                         'scale', StandardScaler()])
Run Code Online (Sandbox Code Playgroud)

当我在这条管道上执行 fit_transform 时,我收到错误消息,

 File "custom_transformer.py", line 48, in <module>
    main()
  File "custom_transformer.py", line 43, in main
    'scale', StandardScaler()])
  File "/opt/conda/lib/python3.6/site-packages/sklearn/pipeline.py", line 114, in __init__
    self._validate_steps()
  File "/opt/conda/lib/python3.6/site-packages/sklearn/pipeline.py", line 146, in _validate_steps
    names, estimators = zip(*self.steps)
TypeError: zip argument #2 must support …
Run Code Online (Sandbox Code Playgroud)

python pipeline python-3.x scikit-learn

7
推荐指数
1
解决办法
2876
查看次数

使用 Imblearn 管道和 GridSearchCV 进行交叉验证

我正在尝试使用Pipeline来自的类imblearnGridSearchCV获得用于对不平衡数据集进行分类的最佳参数。由于每答案提到这里,我要离开了验证集的重采样,只有重新采样训练集,其中imblearnPipeline似乎是在做。但是,在实施已接受的解决方案时出现错误。请让我知道我做错了什么。下面是我的实现:

def imb_pipeline(clf, X, y, params):

    model = Pipeline([
        ('sampling', SMOTE()),
        ('classification', clf)
    ])

    score={'AUC':'roc_auc', 
           'RECALL':'recall',
           'PRECISION':'precision',
           'F1':'f1'}

    gcv = GridSearchCV(estimator=model, param_grid=params, cv=5, scoring=score, n_jobs=12, refit='F1',
                       return_train_score=True)
    gcv.fit(X, y)

    return gcv

for param, classifier in zip(params, classifiers):
    print("Working on {}...".format(classifier[0]))
    clf = imb_pipeline(classifier[1], X_scaled, y, param) 
    print("Best parameter for {} is {}".format(classifier[0], clf.best_params_))
    print("Best `F1` for {} is {}".format(classifier[0], clf.best_score_))
    print('-'*50)
    print('\n')
Run Code Online (Sandbox Code Playgroud)

参数:

[{'penalty': ('l1', 'l2'), 'C': (0.01, …
Run Code Online (Sandbox Code Playgroud)

pipeline python-3.x scikit-learn imblearn

7
推荐指数
1
解决办法
5362
查看次数

Haskell 的类型系统可以强制执行数据管道阶段的正确排序吗?

我使用质谱数据创建了许多数据处理管道,其中来自仪器的数据被清理、转换、缩放、检查和最终分析。我倾向于为此使用递归类型定义——这是一个非常简化的例子:

data Dataset = Initial { x::(Vector Double), y::(Vector Double) name::String}
             | Cleaned { x::(Vector Double), y::(Vector Double) name::String}
             | Transformed { x::(Vector Double), y::(Vector Double) name::String}
Run Code Online (Sandbox Code Playgroud)

那么一个典型的管道将只是一个以Dataset创建者开始的函数链,然后继续消耗类型的函数Dataset,并产生类型的东西Dataset

createDataset :: Vector Double -> Vector Double -> String -> Dataset
createDataset x y name = Initial x y name

removeOutliers :: Dataset -> Dataset
removeOutliers (Initial x y n) = let
                         (new_x, new_y) = outlierRemovalFunction x y
                         in Cleaned new_x new_y (n ++"_outliersRemoved")
               (Cleaned x …
Run Code Online (Sandbox Code Playgroud)

haskell type-systems pipeline type-level-computation

7
推荐指数
2
解决办法
225
查看次数