标签: telethon

在固定装置中包含代码后,使用 asyncio 进行 pytest 时出现 AttributeError

我需要测试我的电报机器人。为此,我需要创建客户端用户来询问我的机器人。我找到了可以做到这一点的电视马拉松图书馆。首先,我编写了一个代码示例以确保授权和连接正常工作并向自己发送测试消息(省略导入):

api_id = int(os.getenv("TELEGRAM_APP_ID"))
api_hash = os.getenv("TELEGRAM_APP_HASH")
session_str = os.getenv("TELETHON_SESSION")

async def main():
    client = TelegramClient(
        StringSession(session_str), api_id, api_hash,
        sequential_updates=True
    )
    await client.connect()
    async with client.conversation("@someuser") as conv:
        await conv.send_message('Hey, what is your name?')


if __name__ == "__main__":
    asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)

@someuser(我)成功收到消息。好的,现在我根据上面的代码使用固定装置创建一个测试:

api_id = int(os.getenv("TELEGRAM_APP_ID"))
api_hash = os.getenv("TELEGRAM_APP_HASH")
session_str = os.getenv("TELETHON_SESSION")

@pytest.fixture(scope="session")
async def client():
    client = TelegramClient(
        StringSession(session_str), api_id, api_hash,
        sequential_updates=True
    )
    await client.connect()
    yield client
    await client.disconnect()


@pytest.mark.asyncio
async def test_start(client: TelegramClient):
    async with client.conversation("@someuser") as conv:
        await …
Run Code Online (Sandbox Code Playgroud)

python pytest python-asyncio telethon

16
推荐指数
2
解决办法
1万
查看次数

Telethon 导致“RuntimeWarning:协程‘MessageMethods.send_message’从未等待过”

我正在尝试运行Telethon 文档提供的第一个代码片段。但是,在出现多个问题(此处此处)之后,我最终得到了这个修改后的版本:

import os
import sys
from telethon.sync import TelegramClient, events

# import nest_asyncio
# nest_asyncio.apply()

session_name = "<session_name>"
api_id = <api_id>
api_hash = "<api_hash>"


os.chdir(sys.path[0])

if f"{session_name}.session" in os.listdir():
    os.remove(f"{session_name}.session")

async with TelegramClient(session_name, api_id, api_hash) as client:
   client.send_message('me', 'Hello, myself!')
   print(client.download_profile_photo('me'))

   @client.on(events.NewMessage(pattern='(?i).*Hello'))
   async def handler(event):
      await event.reply('Hey!')

   client.run_until_disconnected()
Run Code Online (Sandbox Code Playgroud)

但是现在我收到这些警告:

usr/local/lib/python3.7/site-packages/ipykernel_launcher.py:23:RuntimeWarning:协程“MessageMethods.send_message”从未等待
RuntimeWarning:启用tracemalloc以获取对象分配回溯
/usr/local/lib/python3.7/site-packages/ipykernel_launcher.py:24:RuntimeWarning:从未等待协程“DownloadMethods.download_profile_photo”
RuntimeWarning:启用tracemalloc以获取对象分配回溯
/usr/local/lib/python3.7/site-packages/ipykernel_launcher.py:30:RuntimeWarning:从未等待协程“UpdateMethods._run_until_disconnected”
RuntimeWarning:启用tracemalloc以获取对象分配回溯

在 Jupyter 上运行代码时。现在我的问题是:

  • 这些警告消息意味着什么以及我应该如何解决它们?
  • 如果正常工作,这段代码的预期结果是什么?我应该通过 Telegram 或其他方式收到消息吗?因为除了登录码之外我没有收到任何消息。
  • @行首的符号是什么@client.on...意思?那条线应该做什么?从这一行开始我就不明白代码了。如果您能帮助我理解它,我将不胜感激。

python ipython python-asyncio telegram telethon

9
推荐指数
1
解决办法
3万
查看次数

如何获取我参加电视马拉松的频道列表?

我想制作一个脚本来显示我加入的频道,然后将所有内容保留在以下示例中:

from telethon.tl.functions.channels import LeaveChannelRequest
await client(LeaveChannelRequest(input_channel))
Run Code Online (Sandbox Code Playgroud)

python python-2.7 python-3.x telegram telethon

9
推荐指数
1
解决办法
9980
查看次数

如何在线程中使用 Telethon

我想在后台运行一个函数。所以我在我的代码中使用线程。

但是返回错误ValueError: signal only works in main thread并且不知道两件事:

  1. 什么是主线程
  2. 如何解决这个问题呢 :)

views.py

def callback(update):
    print('I received', update)

def message_poll_start():
    try:
        client = TelegramClient('phone', api_id, api_hash,
            update_workers=1, spawn_read_thread=False)
        client.connect()
        client.add_update_handler(callback)
        client.idle()
    except TypeNotFoundError:
        pass

def message_poll_start_thread(request):
    t = threading.Thread(target=message_poll_start, args=(), kwargs={})
    t.setDaemon(True)
    t.start()
    return HttpResponse("message polling started")
Run Code Online (Sandbox Code Playgroud)

urls.py

urlpatterns = [
    path('message_poll_start', messagemanager_views.message_poll_start_thread, name="message_poll_start"),
]
Run Code Online (Sandbox Code Playgroud)

trace

[12/Jan/2018 11:24:38] "GET /messages/message_poll_start HTTP/1.1" 200 23
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
    self.run() …
Run Code Online (Sandbox Code Playgroud)

python django telethon

8
推荐指数
1
解决办法
2394
查看次数

Telethon:操作错误:数据库已锁定

如果这是一个愚蠢的问题,我深表歉意。

我第一次尝试 telethon,它无法与我的 telegram API 同步。

当我输入以下代码时,我得到一个 IP 地址: 在此输入图像描述

但是当我尝试连接以启动或连接客户端时收到此消息:

在此输入图像描述

最后,OperationalError: database is locked当我尝试使用手机登录时出现错误。

在此输入图像描述

错误消息完整:

-------------------------------------------------------------------- 

OperationalError Traceback (most recent 
 call last)
<ipython-input-13-880bc0e4ea12> in <module>()
  1 from telethon import TelegramClient, sync
 ----> 2 client = TelegramClient('session_name', api_id, api_hash)
  3 
  4 client.connect()
  5 if not client.is_user_authorized():

 /anaconda3/lib/python3.7/site- 
 packages/telethon/client/telegrambaseclient.py in __init__(self, 
 session, api_id, api_hash, connection, use_ipv6, proxy, timeout, 
 request_retries, connection_retries, retry_delay, auto_reconnect, 
 sequential_updates, flood_sleep_threshold, device_model, 
 system_version, app_version, lang_code, system_lang_code, loop, 
 base_logger)
 221                 DEFAULT_DC_ID,
 222                 DEFAULT_IPV6_IP if self._use_ipv6 else 
 DEFAULT_IPV4_IP,
 --> …
Run Code Online (Sandbox Code Playgroud)

python-3.x telegram telethon

7
推荐指数
2
解决办法
2万
查看次数

如何使用 telethon 在 telegram 中搜索群组和频道?

我使用telethon使用 python 脚本向电报发送消息。
我在电视马拉松中没有找到任何东西来搜索我喜欢用来在电报应用程序上搜索的群组和频道。请看图片。我如何使用电视马拉松获得这样的列表?

在此输入图像描述

python telegram telethon

7
推荐指数
1
解决办法
1万
查看次数

如何通过 telethon 获取 telegram 私人频道 id

您好,不知道如何解决这个问题,因此我们将非常感谢任何帮助。我订阅了私人频道。该频道没有用户名,我也没有邀请链接(管理员刚刚添加了我)。由于我在工作中使用此频道,为了加快处理速度,我想使用 Telethon 处理在该频道上发布的消息。

该方案的核心是:

@events.register(events.NewMessage(chats = my_private_channel))
async def handler(event):
    
        #do things
Run Code Online (Sandbox Code Playgroud)

问题是我无法过滤发送到该特定通道 ID 的消息。我收到错误:

ValueError: Cannot find any entity corresponding to "0123456789"
Run Code Online (Sandbox Code Playgroud)

我尝试了不同的技术来获取我的频道 ID,但错误始终相同。尤其:

  1. 该频道是私人频道,因此没有用户名(“@blablabla”)
  2. 我没有邀请链接
  3. 我尝试处理所有传入消息,直到管理员在频道上发送消息,打印发件人信息并从“ID”键获取值
  4. 我尝试使用telegram web并从url中获取ID(还在前面添加-100)

但是当我将 ID 放入参数chats时,我总是收到上面报告的错误。

预先感谢,祝你有美好的一天

python channel telegram telethon

7
推荐指数
2
解决办法
1万
查看次数

RuntimeError: Task got Future &lt;Future pending&gt; 附加到不同的循环

如何在Quart中的另一个异步方法内调用在主线程中获取事件循环的异步方法?

t.py

from telethon import TelegramClient, functions, types

client2 = TelegramClient(sn, api_id, api_hash).start()

async def create_contact():
    return await client2(functions.contacts.ImportContactsRequest([
        types.InputPhoneContact(0, '8', 'first_name', 'last_name')
    ]))
Run Code Online (Sandbox Code Playgroud)

应用程序

from quart import Quart, websocket,render_template,request
import t2
app = Quart(__name__)

@app.route('/wa2tg')
def wa2tg():
    return render_template('wa2tg.html',nm=request.args.get('nm',''))

@app.websocket('/wa2tg2')
async def wa2tg2():
    while True:
        data = await websocket.receive()
        await t2.create_contact()

# Thread(target=tele.client2.run_until_disconnected).start()
app.run(debug=1)        
Run Code Online (Sandbox Code Playgroud)

错误:

Running on http://127.0.0.1:5000 (CTRL + C to quit)
[2019-06-21 16:31:42,035] 127.0.0.1:51696 GET /wa2tg 1.1 200 553 12995
[2019-06-21 16:31:42,486] 127.0.0.1:51698 GET /wa2tg2 1.1 101 …
Run Code Online (Sandbox Code Playgroud)

python multithreading python-asyncio telethon quart

6
推荐指数
1
解决办法
8764
查看次数

在以下位置找不到 &lt;telethon.tl.types.PeerUser 对象的输入实体

我收到错误ValueError:找不到 . 请阅读https://docs.telethon.dev/en/latest/concepts/entities.html以了解更多详细信息。. 此错误来自 client.send_message(bot_id, 'reset')。

def run_test(self,name,bot_id):
        self.start = time.time()
        api_id = 10*****
        api_hash = 'c1*********************************'

        with TelegramClient('anon', api_id, api_hash) as client:
            #breakpoint()
            results = {'results':[], 'start': time.asctime(time.localtime(self.start)), 'conversations': len(self.conversations['tests'])}

            Button(self, text='Cancel', width=20, font=("Lucinda Sans", 12),command=self.cancel(client)).pack()

            statusintro = 'Bot Instance ' + name +': '
            testtitle = ""

            self.show_output(statusintro + testtitle)

            if self.conversationReset is False:
                client.send_message(bot_id, self.conversations['tests'][self.test_counter]['questions'][self.question_counter])
            else:
                print (bot_id)                
                client.send_message(bot_id, 'reset')
                self.conversationReset = False
Run Code Online (Sandbox Code Playgroud)

python-3.x telethon

6
推荐指数
1
解决办法
1606
查看次数

如何使用电视马拉松仅获取聊天中的未读消息?

我有client.get_messages(dialog.entity)但它只返回没有“已读/未读标记”的消息...那么,我怎样才能只获取未读的新消息呢?有人知道吗?

python telethon

6
推荐指数
2
解决办法
3078
查看次数