我们如何在 Spark 中并行化循环,以便处理不是顺序的而是并行的。举个例子 - 我的 csv 文件(称为“bill_item.csv”)中包含以下数据,该文件包含以下数据:
|-----------+------------|
| bill_id | item_id |
|-----------+------------|
| ABC | 1 |
| ABC | 2 |
| DEF | 1 |
| DEF | 2 |
| DEF | 3 |
| GHI | 1 |
|-----------+------------|
Run Code Online (Sandbox Code Playgroud)
我必须得到如下输出:
|-----------+-----------+--------------|
| item_1 | item_2 | Num_of_bills |
|-----------+-----------+--------------|
| 1 | 2 | 2 |
| 2 | 3 | 1 |
| 1 | 3 | 1 |
|-----------+-----------+--------------|
Run Code Online (Sandbox Code Playgroud)
我们看到项目 1 和 …
python bigdata apache-spark apache-spark-dataset apache-spark-2.0
我有以下 python 脚本,它从 twitter 中提取推文并将其发送到 kafka 主题。该脚本运行完美,但是当我尝试在 docker 容器中运行它时,它无法导入 kafka 库。它说“语法错误:语法无效”。
以下是python脚本(twitter_app.py)的内容:
import socket
import sys
import requests
import requests_oauthlib
import json
import kafka
from kafka import KafkaProducer
import time
from kafka import SimpleProducer
from kafka import KafkaClient
###################################################
# My own twitter access tokens
####################################################
ACCESS_TOKEN = '28778811-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
ACCESS_SECRET = 'HBGjTXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
CONSUMER_KEY = '#################################'
CONSUMER_SECRET = '############################################'
my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)
####################################################
# Kafka Producer
####################################################
twitter_topic="twitter_topic"
client = KafkaClient("10.142.0.2:9092")
producer = SimpleProducer(client)
#producer = kafka.KafkaProducer(bootstrap_servers='10.128.0.2:9092')
def get_tweets(): …Run Code Online (Sandbox Code Playgroud)