小编Den*_*sLi的帖子

消费消息而不从 Kafka 10 消费者提交

我需要从某个主题读取消息,对它们进行批处理并将批处理推送到外部系统。如果批次因任何原因失败,我需要再次使用同一组消息并重复该过程。因此,对于每个批次,每个分区的起始和终止偏移量都存储在数据库中。为了实现这一目标,我通过将分区分配给读取器来为每个分区创建一个 Kafka 消费者,根据之前存储的偏移量,消费者寻找该位置并开始读取。我已经关闭了自动提交,并且不提交来自消费者的偏移量。对于每个批次,我为每个分区创建一个新的使用者,从存储的最后一个偏移量读取消息并将其发布到外部系统。您是否发现在不提交偏移量的情况下消费消息以及跨批次使用相同的消费者组但在任何时候每个分区不会有多个消费者的任何问题?

apache-kafka kafka-consumer-api

5
推荐指数
1
解决办法
7020
查看次数

在 Spark 上实现 Trie(或类似的数据结构)

我是一名实习生,我的任务是在 Spark 集群上实现电话号码的快速搜索算法,使用tries(前缀树),并在几个此类尝试中执行诸如内部联接之类的操作

我设法让它工作于大约 500 万个数字(2 次尝试,每次 250 万个数字),我的任务是将其扩展到 10-2000 万个。但如果我尝试超越我得到的Java.outofmemory错误

现在我的方法是这样的我的代码, - 从spark数据库创建电话号码的数据框, - 使用collect()将250万个数字加载到python列表中的内存(JVM的内存)中 - 将该列表转换为trie - 清除列表 - 在 trie 中搜索要搜索的 number_to_be_searched - 如果找到则返回 true - 否则加载接下来的 250 万个数字,然后重复步骤 3,依此类推

from collections import defaultdict


class Trie:
    # Implement a trie with insert, search.

    def __init__(self):
        self.root = defaultdict()

    def insert(self, word):
        current = self.root
        for letter in word:
            current = current.setdefault(letter, {})
        current.setdefault("_end")

    def search(self, word):
        current = self.root
        for letter in word: …
Run Code Online (Sandbox Code Playgroud)

python python-2.7 apache-spark pyspark

5
推荐指数
0
解决办法
737
查看次数

将大型数组变量(类型=对象)导出到CSV文件

我使用Python中APM的Gekko解决了优化问题。两个主要决策变量(DV)是大型数组。该问题已成功收敛,但是,我需要Excel工作表中这些表的结果才能进行进一步的工作。

示例变量名是's'。由于在其中创建的数组Gekko是GKVariable / Object变量类型,因此我不能简单地使用:

pd.DataFrame(s).to_csv(r'C:\Users\...\s.csv')

因为结果为数组的每个单元提供了模型中定义的每个变量的标签(即v1,v2等)

print 's'在内核中使用会显示优化结果中的数组编号,但格式会由于列数众多而不能保证每一行都是矩阵的新行。

是否存在另一种解决方案,仅复制DV的结果值,使其成为常规np.array变量而不是对象类型变量?对此开放任何想法。

python gekko

5
推荐指数
1
解决办法
46
查看次数

initdb后Airflow无法进入/admin页面

我想清除所有示例 dag,以便我运行命令airflow initdb. 但是,在那之后我无法进入管理页面,因为出了点问题:

  File "/data/software/miniconda3/lib/python3.7/site-packages/flask_admin/menu.py", line 126, in is_accessible
    return self._view.is_accessible()
  File "/data/software/miniconda3/lib/python3.7/site-packages/airflow/www/utils.py", line 93, in is_accessible
    (not current_user.is_anonymous and current_user.is_superuser())
  File "/data/software/miniconda3/lib/python3.7/site-packages/airflow/contrib/auth/backends/password_auth.py", line 114, in is_superuser
    return hasattr(self, 'user') and self.user.is_superuser()
AttributeError: 'NoneType' object has no attribute 'is_superuser'
Run Code Online (Sandbox Code Playgroud)

我尝试创建与以前相同的超级用户,但仍然出现相同的错误。

有什么想法如何解决吗?提前致谢。

python airflow

5
推荐指数
1
解决办法
1067
查看次数

如何使用 Anaconda 安装特定版本的 Python?

我想用 Python 3.6.5 版安装 Anaconda。如果我安装 Anaconda3-5.2.0,它会安装 Python 3.5.1。哪里可以下载带有 Python 3.6.5 的 Anaconda。大数据脚本仅适用于 Anaconda Python 3.6.5。

python anaconda

5
推荐指数
3
解决办法
1万
查看次数

当我运行react脚本“yarn start”时,我到index.html中的manifest.json的链接有效,但当我运行“python3 manage.py runserver”时则无效

当我运行“yarn start”时,我的index.html 文件中的manifest.json 链接工作正常,但是当我运行时,'python3 manage.py runserver'我在终端中得到的所有内容是:

Not Found: /manifest.json
"GET /manifest.json HTTP/1.1" 404 2234
Run Code Online (Sandbox Code Playgroud)

我的所有静态链接和导入也会发生这种情况。我对 Django 和 React 以及整个编程都很陌生,所以我认为我只是错过了一些简单的东西,但我无法弄清楚。

我一直在尝试使用,但即使我进行编辑以指向我的目录{% load static %},该链接也不起作用。我还尝试编辑and ,但我得到的只是终端中的语法错误。除此之外我一无所知。STATIC_URLsettings.pymanifest.jsonview.pyurls.py

前端/public/index.html

Not Found: /manifest.json
"GET /manifest.json HTTP/1.1" 404 2234
Run Code Online (Sandbox Code Playgroud)

前端/urls.py

<html>

    <head>

    <title>WebProject</title>


    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta charset="UTF-8">

    <link rel="manifest" href="manifest.json"/>

    </head>

    <body style="background-color: #FAF0E6; font-family: Verdana; font-size: 40px;">
    <div id="root"></div>

    </body>

</html>
Run Code Online (Sandbox Code Playgroud)

前端/views.py


from django.urls import path
from . import views
from django.conf.urls.static import static …
Run Code Online (Sandbox Code Playgroud)

html json python-3.x django-staticfiles reactjs

5
推荐指数
1
解决办法
4654
查看次数

Java8 maven raise 对过滤器的错误引用不明确

我正在运行一个 Spark 快速启动应用程序:

/* SimpleApp.java */
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "/data/software/spark-2.4.4-bin-without-hadoop/README.md"; // Should be some file on your system
    SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate();
    Dataset<String> logData = spark.read().textFile(logFile).cache();

    long numAs = logData.filter(s -> s.contains("a")).count();
    long numBs = logData.filter(s -> s.contains("b")).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);

    spark.stop();
  }
}
Run Code Online (Sandbox Code Playgroud)

正如官方文件所说,

# Package a JAR containing your application
$ mvn …
Run Code Online (Sandbox Code Playgroud)

java maven

5
推荐指数
1
解决办法
1342
查看次数

Pandas 如何在 agg 函数之后展平列?

说我有一个 df:

data=[('a', 1), ('a', 1),('b', 1),('a', 3),('b', 2),('c', 1),('a', 2),('b', 3),('a', 2)]
df=df=pd.DataFrame(data, columns=['project', 'duration'])

# Then I made an aggregation:
df_agg=df.groupby('project').agg({'duration': ['median', 'mean']}).reset_index()

Out[11]: 
  project duration     
            median mean
0       a        2  1.8
1       b        2  2.0
2       c        1  1.0


In [12]: df_agg.info()                                                                                                                                                    
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 3 columns):
(project, )           3 non-null object
(duration, median)    3 non-null int64
(duration, mean)      3 non-null float64
dtypes: float64(1), int64(1), object(1)
memory usage: 152.0+ …
Run Code Online (Sandbox Code Playgroud)

python pandas

5
推荐指数
1
解决办法
4014
查看次数

如何显示数据框,其中列连续显示两次

例如:

df_test=pd.DataFrame([('tokyo',123),('london',11),('sydney',22),('taiwan',33),('hongkong',23),('la', 32)], columns=['city','count'])

       city  count
0     tokyo    123
1    london     11
2    sydney     22
3    taiwan     33
4  hongkong     23
5        la     32
Run Code Online (Sandbox Code Playgroud)

我希望它看起来像这样,因为行比列多,使其易于阅读。

       city  count    city     count
0     tokyo    123    taiwan     33
1    london     11    hongkong   23
2    sydney     22    la         32

Run Code Online (Sandbox Code Playgroud)

python pandas

4
推荐指数
1
解决办法
53
查看次数

Cloudera Hue Web UI默认密码

我最近下载了Cloudera CDH 5.3,现在我需要访问HUE Web UI门户.当我提供属于Cloudera admin/admin的默认用户名和密码时,它无效.我现在无法登录HUE门户.有人可以帮忙吗?

hue cloudera-manager cloudera-cdh cloudera-quickstart-vm

3
推荐指数
1
解决办法
6275
查看次数