小编Sou*_*dra的帖子

在 Spark 中以结构化流模式获取 Offset 的消息正在重置

Spark (v2.4) 程序功能:

  • Kafka在spark中以结构化流模式从队列中读取JSON数据
  • 在控制台上按原样打印读取的数据

问题得到:
- 得到Resetting offset for partition nifi-log-batch-0 to offset 2826180.

源代码:

package io.xyz.streaming

import org.apache.spark.sql.avro._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.functions._

object readKafkaJson {
    private val topic = "nifi-log-batch"
    private val kafkaUrl = "http://<hostname>:9092"
    private val chk = "/home/xyz/tmp/checkpoint"
    private val outputFileLocation = "/home/xyz/abc/data"
    private val sparkSchema = StructType(Array(
                StructField("timestamp", StringType),
                StructField("level", StringType),
                StructField("thread", StringType),
                StructField("class", StringType),
                StructField("message", StringType),
                StructField("updatedOn", StringType),
                StructField("stackTrace", StringType)))


    def main(args: Array[String]): Unit = { …
Run Code Online (Sandbox Code Playgroud)

scala apache-kafka apache-spark spark-streaming

7
推荐指数
1
解决办法
2523
查看次数

在LangChain中,如何将详细输出保存到变量中?

我尝试执行 langchain 代理。我想将 verbose 的输出保存到变量中,但我可以从 agent.run 访问的只是最终答案。

如何将详细输出保存到变量以便以后使用?

我的代码:

import json
from langchain.agents import load_tools
from langchain.agents import initialize_agent
from langchain.agents import AgentType
from langchain.llms import OpenAI
from langchain.agents import Tool
from langchain.utilities import PythonREPL

llm = OpenAI(temperature=0.1)

## Define Tools
python_repl = PythonREPL()

tools = load_tools(["python_repl", "llm-math"], llm=llm)

agent = initialize_agent(tools, llm, agent=AgentType.ZERO_SHOT_REACT_DESCRIPTION, verbose=True)

response = agent.run("What is 3^2. Use calculator to solve.")
Run Code Online (Sandbox Code Playgroud)

我尝试访问代理的响应,但这只是最终答案,而不是详细输出。

打印响应仅给出 9。但我想要详细的过程,例如:

> Entering new AgentExecutor chain...
 I need to use the calculator to solve …
Run Code Online (Sandbox Code Playgroud)

agent openai-api langchain large-language-model

6
推荐指数
1
解决办法
8199
查看次数

删除Python中的HTML块

我想知道 Python 中是否有库或某种方法可以从 HTML 文档中提取元素。例如:

我有这个文件:

<html>
      <head>
          ...
      </head>
      <body>
          <div>
           ...
          </div>
      </body>
</html>
Run Code Online (Sandbox Code Playgroud)

我想<div></div>从文档中删除标签块以及块内容,然后它会像这样:

<html>
    <head>
     ...
    </head>
    <body>
    </body>
</html>
Run Code Online (Sandbox Code Playgroud)

html python parsing

3
推荐指数
1
解决办法
4154
查看次数

查找字符串中的元音数量

我有一串字母作为输入.
输入:

my_str = 'soumendra_in_stackoverflow'
Run Code Online (Sandbox Code Playgroud)

我想要输出如下.所有元音都应与字典中的相应计数一起打印.
需要输出:

{'a': 2, 'e': 2, 'i': 1, 'o': 3, 'u': 1}
Run Code Online (Sandbox Code Playgroud)

为此我写了以下程序:

ans_dict = {}
for letter in my_str:
    if letter in ['a', 'e', 'i', 'o', 'u']:
        ans_dict[letter] = ans_dict.get(letter, 0) + 1
print(ans_dict)
Run Code Online (Sandbox Code Playgroud)

有用.但是,如何在一行中编写相同的逻辑(可能使用字典理解)而不使用collections.Counter
我试过这个,但它失败了.

{x: + 1 for x in a if x in ['a', 'e', 'i', 'o', 'u'] }
Run Code Online (Sandbox Code Playgroud)

python dictionary-comprehension

3
推荐指数
1
解决办法
64
查看次数

用 Python 异步写入 CSV 文件

我正在编写一个具有以下功能的 CSV 文件:

import csv
import os
import aiofiles


async def write_extract_file(output_filename: str, csv_list: list):
    """
    Write the extracted content into the file
    """
    try:
        async with aiofiles.open(output_filename, "w+") as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=columns.keys())
            writer.writeheader()
            writer.writerows(csv_list)
    except FileNotFoundError:
        print("Output file not present", output_filename)
        print("Current dir: ", os.getcwd())
        raise FileNotFoundError
Run Code Online (Sandbox Code Playgroud)

但是,由于没有 await 允许 overwriterows方法,因此没有将行写入 CSV 文件。
如何解决这个问题?有没有可用的解决方法?
谢谢你。
整个代码可以在这里找到。

python csv async-await python-asyncio fastapi

3
推荐指数
2
解决办法
2047
查看次数

为什么next()总是显示相同的值?

我在练习这个yield陈述.我在Python 2.7中编写了以下函数:

>>> def mygen():
...     i = 0
...     j = 3
...     for k in range(i, j):
...         yield k
...
>>> mygen().next()
0
>>> mygen().next()
0
>>> mygen().next()
0
Run Code Online (Sandbox Code Playgroud)

每当我打电话mygen().next()总是显示输出0,而不是0,1,2StopIteration.有人可以解释一下吗?

python yield generator python-2.7

2
推荐指数
1
解决办法
92
查看次数