I'm new to Python and I'm trying to use the multiprocessing module for my for loop.
I have an array of image URLs stored in img_urls that I need to download and then run through Google Vision.
if __name__ == '__main__':
    img_urls = [ALL_MY_Image_URLS]
    runAll(img_urls)
    print("--- %s seconds ---" % (time.time() - start_time))
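For context, the general pattern this is building toward can be sketched as follows. This is a minimal, self-contained sketch, not the poster's actual code: `process_image` is a hypothetical stand-in for the real download-plus-Vision worker, and the URLs are dummies.

```python
import multiprocessing
import time

def process_image(url):
    # Placeholder for the real work (download the image, call Google Vision).
    # Here it just returns the URL length so the sketch is runnable.
    return len(url)

def run_all(img_urls):
    start = time.time()
    # Size the pool to the machine and close it when the map is done,
    # so worker processes do not linger afterwards.
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        results = pool.map(process_image, img_urls)
    print('Time to complete', time.time() - start)
    return results

if __name__ == '__main__':
    print(run_all(["http://example.com/a.jpg", "http://example.com/b.jpg"]))
```

`pool.map` blocks until every URL has been processed and returns the results in input order, which is usually what you want for a simple fan-out like this.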
Here is my runAll() method:
def runAll(img_urls):
    num_cores = multiprocessing.cpu_count()
    print("Image URLS {}".format(len(img_urls)))
    if len(img_urls) > 2:
        numberOfImages = 0
    else:
        numberOfImages = 1
    start_timeProcess = time.time()
    pool = multiprocessing.Pool()
    pool.map(annotate, img_urls)
    pool.close()
    pool.join()
    end_timeProcess = time.time()
    print('\n Time to complete ', end_timeProcess - start_timeProcess)
    print(full_matching_pages)
def annotate(img_path):
    """Returns web annotations given the path to an image."""
    file = requests.get(img_path).content
    print("file is", file)
    print('Process Working under ', os.getpid())
    image = types.Image(content=file)
    web_detection …

I have a simple Airflow DAG that has just one task - stream_from_twitter_to_kafka.
Here is the code for the DAG:
default_args = {
    "owner": "me",
    "depends_on_past": False,
    "start_date": datetime(2020, 1, 20),
    "email": ["makalaaneesh18@mail.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 0,
    "retry_delay": timedelta(minutes=1),
}

NO_OF_TWEETS_TO_STREAM = 100
with DAG("stream_from_twitter",
         catchup=False,
         default_args=default_args,
         schedule_interval="@hourly") as dag:
    task1 = PythonOperator(task_id="stream_from_twitter_to_kafka",
                           python_callable=read_stream_of_tweets,
                           op_args=(NO_OF_TWEETS_TO_STREAM,))
    task1
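A side note on the bare `task1` line at the end of the DAG: with a single task there are no dependencies to declare, so that line is effectively a no-op. With two or more tasks you would wire the ordering explicitly with the bitshift operators. A minimal sketch of that pattern, using Airflow 1.10-era import paths and hypothetical task ids (this is not the poster's DAG):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

with DAG("example_dependencies",
         start_date=datetime(2020, 1, 20),
         catchup=False,
         schedule_interval="@hourly") as dag:
    produce = PythonOperator(task_id="produce",
                             python_callable=lambda: None)
    consume = BashOperator(task_id="consume",
                           bash_command="echo done")

    # produce must finish before consume starts.
    produce >> consume
```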
read_stream_of_tweets uses tweepy to read an input stream of tweets and publish them to a Kafka topic:
# override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):
    def __init__(self, *args, **kwargs):
        self.num_tweets = kwargs.pop('num_tweets')
        self.current_num_tweets = 0
        super(MyStreamListener, self).__init__(*args, **kwargs)
        self.kafka_producer = MyKafkaProducer()

    def on_status(self, status):
        if self.current_num_tweets …
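The truncated on_status body presumably counts statuses and stops once num_tweets have been published; in tweepy 3.x, returning False from on_status is what disconnects the stream. A framework-free sketch of just that counting logic, so it can run without tweepy or Kafka (`BoundedStreamHandler` and `publish` are stand-ins, not the poster's code):

```python
class BoundedStreamHandler:
    """Stop after num_tweets statuses, publishing each one seen before the cap.

    In tweepy 3.x the same effect is achieved by returning False from
    StreamListener.on_status, which tells the stream to disconnect.
    """

    def __init__(self, num_tweets, publish):
        self.num_tweets = num_tweets
        self.current_num_tweets = 0
        self.publish = publish  # stand-in for e.g. a Kafka producer's send

    def on_status(self, status):
        if self.current_num_tweets >= self.num_tweets:
            return False  # cap reached: signal the stream to stop
        self.publish(status)
        self.current_num_tweets += 1
        return True  # keep streaming
```

Usage: construct with the cap and a publish callable, then feed statuses until on_status returns False.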