小编Kam*_*dan的帖子

在 Apache Spark 中高效运行“for”循环,以便并行执行

我们如何在 Spark 中并行化循环,以便处理不是顺序的而是并行的。举个例子 - 我的 csv 文件(称为“bill_item.csv”)中包含以下数据,该文件包含以下数据:

    |-----------+------------|
    | bill_id   | item_id    |
    |-----------+------------|
    | ABC       | 1          |
    | ABC       | 2          |
    | DEF       | 1          |
    | DEF       | 2          |
    | DEF       | 3          |
    | GHI       | 1          |
    |-----------+------------|
Run Code Online (Sandbox Code Playgroud)

我必须得到如下输出:

    |-----------+-----------+--------------|
    | item_1    | item_2    | Num_of_bills |
    |-----------+-----------+--------------|
    | 1         | 2         | 2            |
    | 2         | 3         | 1            |
    | 1         | 3         | 1            |
    |-----------+-----------+--------------|
Run Code Online (Sandbox Code Playgroud)

我们看到项目 1 和 …

python bigdata apache-spark apache-spark-dataset apache-spark-2.0

9
推荐指数
1
解决办法
2万
查看次数

在 docker 中运行时,python 脚本无法导入 kafka 库

我有以下 python 脚本,它从 twitter 中提取推文并将其发送到 kafka 主题。该脚本运行完美,但是当我尝试在 docker 容器中运行它时,它无法导入 kafka 库。它说“语法错误:语法无效”。

以下是python脚本(twitter_app.py)的内容:

import socket
import sys
import requests
import requests_oauthlib
import json
import kafka
from kafka import KafkaProducer
import time
from kafka import SimpleProducer
from kafka import KafkaClient

###################################################
# My own twitter access tokens
####################################################
ACCESS_TOKEN = '28778811-XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
ACCESS_SECRET = 'HBGjTXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX'
CONSUMER_KEY = '#################################'
CONSUMER_SECRET = '############################################'

my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)

####################################################
# Kafka Producer
####################################################
twitter_topic="twitter_topic"
client = KafkaClient("10.142.0.2:9092")
producer = SimpleProducer(client)
#producer = kafka.KafkaProducer(bootstrap_servers='10.128.0.2:9092')

def get_tweets(): …
Run Code Online (Sandbox Code Playgroud)

python apache-kafka docker

4
推荐指数
1
解决办法
2228
查看次数