
Multiprocessing causes Python to crash and gives an error: "may have been in progress in another thread when fork() was called"

I'm new to Python and am trying to use the multiprocessing module to parallelize my for loop.

I have an array of image URLs stored in img_urls. I need to download each image and run some Google Vision analysis on it.

import time

if __name__ == '__main__':
    start_time = time.time()
    img_urls = [ALL_MY_Image_URLS]
    runAll(img_urls)
    print("--- %s seconds ---" % (time.time() - start_time))

Here is my runAll() method:

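import multiprocessing
import time

def runAll(img_urls):
    num_cores = multiprocessing.cpu_count()

    print("Image URLS {}".format(len(img_urls)))
    if len(img_urls) > 2:
        numberOfImages = 0
    else:
        numberOfImages = 1

    start_timeProcess = time.time()

    # Pool() starts one worker per CPU by default; the with-block shuts
    # the workers down once map() has returned all results.
    with multiprocessing.Pool() as pool:
        pool.map(annotate, img_urls)
    end_timeProcess = time.time()
    print('\n Time to complete ', end_timeProcess - start_timeProcess)

    # full_matching_pages is not defined in this excerpt. Changes a worker
    # makes to a module-level global are NOT visible in the parent process.
    print(full_matching_pages)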


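import os

import requests
# Assumed: google-cloud-vision < 2.0, where the `types` module lives here
from google.cloud.vision import types

def annotate(img_path):
    """Returns web annotations given the path to an image."""
    file = requests.get(img_path).content
    print("file is", file)
    print('Process Working under ', os.getpid())
    image = types.Image(content=file)
    web_detection …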
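Each `annotate` call spends most of its time waiting on the HTTP download and the Vision API. One alternative worth trying is therefore a thread pool, which never calls fork() at all. This is only a sketch and assumes the per-URL work is I/O-bound. `process_all` and `fake_annotate` are hypothetical names, and `fake_annotate` merely stands in for the real `annotate` so the example runs offline.

```python
from concurrent.futures import ThreadPoolExecutor


def process_all(urls, worker, max_workers=8):
    """Run worker(url) for every URL on a thread pool (hypothetical helper).

    All threads live in one process, so no fork() happens. This suits
    I/O-bound work such as HTTP downloads and remote API calls.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map yields results in the same order as `urls`
        return list(executor.map(worker, urls))


def fake_annotate(url):
    # Hypothetical stand-in for annotate(): no network access, it just
    # returns the last path component so the sketch runs offline.
    return url.rsplit("/", 1)[-1]


if __name__ == "__main__":
    urls = ["https://example.com/a.jpg", "https://example.com/b.jpg"]
    print(process_all(urls, fake_annotate))  # ['a.jpg', 'b.jpg']
```

Threads in CPython share the GIL, so this helps only while workers are blocked on I/O. CPU-heavy work would still need processes.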

python multithreading python-3.x

Recommendation score: 37 · Answers: 4 · Views: 10K

Airflow task running tweepy exits with return code -6

I have a simple Airflow DAG with only one task: stream_from_twitter_to_kafka.

Here is the code for the DAG:

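from datetime import datetime, timedelta

from airflow import DAG
# Airflow 1.10.x import path; in Airflow 2 the preferred path is airflow.operators.python
from airflow.operators.python_operator import PythonOperator

# read_stream_of_tweets is defined elsewhere (not shown)

default_args = {
    "owner": "me",
    "depends_on_past": False,
    "start_date": datetime(2020, 1, 20),
    "email": ["makalaaneesh18@mail.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=1),
}

NO_OF_TWEETS_TO_STREAM = 100

with DAG("stream_from_twitter",
         catchup=False,
         default_args=default_args,
         schedule_interval="@hourly") as dag:
    task1 = PythonOperator(task_id="stream_from_twitter_to_kafka",
                           python_callable=read_stream_of_tweets,
                           op_args=(NO_OF_TWEETS_TO_STREAM,))

task1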
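The -6 in the title follows the usual POSIX convention, which Python's subprocess module also uses. A negative return code -N means the process was terminated by signal N, and on Linux and macOS signal 6 is SIGABRT, so the task process aborted rather than exiting normally. The sketch below assumes Airflow reports the task process's return code in this same way. The helper name `describe_returncode` is hypothetical, and the `__main__` block triggers a real SIGABRT in a child interpreter with os.abort().

```python
import signal
import subprocess
import sys


def describe_returncode(returncode):
    """Turn a subprocess return code into a readable description.

    On POSIX, a child killed by signal N is reported as return code -N.
    """
    if returncode < 0:
        return "killed by " + signal.Signals(-returncode).name
    return "exited with status %d" % returncode


if __name__ == "__main__":
    # Start a child interpreter that aborts itself (raises SIGABRT).
    child = subprocess.run([sys.executable, "-c", "import os; os.abort()"])
    print(child.returncode, describe_returncode(child.returncode))
```

A SIGABRT usually comes from native code calling abort(), not from a Python exception, which is why no Python traceback accompanies such an exit.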

The code for read_stream_of_tweets uses tweepy to read the incoming stream of tweets and publish them to a Kafka topic:

# tweepy.StreamListener exists in tweepy 3.x (it was removed in tweepy 4.0)
import tweepy

# MyKafkaProducer is defined elsewhere (not shown)

# override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):
    def __init__(self, *args, **kwargs):
        self.num_tweets = kwargs.pop('num_tweets')
        self.current_num_tweets = 0
        super(MyStreamListener, self).__init__(*args, **kwargs)
        self.kafka_producer = MyKafkaProducer()

    def on_status(self, status):
        if self.current_num_tweets …
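The listener keeps a `num_tweets` limit and a `current_num_tweets` counter. In tweepy 3.x, returning False from `on_status` asks the stream to disconnect. The standalone sketch below shows one way such a counter can stop the stream after N tweets, with no tweepy or Kafka needed. It is not the asker's truncated implementation. `TweetCounter` and the `publish` callable (standing in for a Kafka producer's send) are hypothetical.

```python
class TweetCounter:
    """Hypothetical stand-in for MyStreamListener's counting logic."""

    def __init__(self, num_tweets, publish):
        self.num_tweets = num_tweets
        self.current_num_tweets = 0
        # Assumed: a callable that sends one status, e.g. a producer's send
        self.publish = publish

    def on_status(self, status):
        self.publish(status)
        self.current_num_tweets += 1
        # False once the limit is reached; tweepy 3.x treats a False
        # return from on_status as a request to disconnect.
        return self.current_num_tweets < self.num_tweets


if __name__ == "__main__":
    published = []
    counter = TweetCounter(num_tweets=3, publish=published.append)
    # Simulate a stream: keep feeding statuses until on_status returns False.
    for status in ["t1", "t2", "t3", "t4", "t5"]:
        if counter.on_status(status) is False:
            break
    print(published)  # ['t1', 't2', 't3']
```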

python tweepy airflow

Recommendation score: 5 · Answers: 1 · Views: 3436

Tag statistics

python ×2

airflow ×1

multithreading ×1

python-3.x ×1

tweepy ×1