我需要测试我的电报机器人。为此,我需要创建客户端用户来询问我的机器人。我找到了可以做到这一点的电视马拉松图书馆。首先,我编写了一个代码示例以确保授权和连接正常工作并向自己发送测试消息(省略导入):
api_id = int(os.getenv("TELEGRAM_APP_ID"))
api_hash = os.getenv("TELEGRAM_APP_HASH")
session_str = os.getenv("TELETHON_SESSION")
async def main():
client = TelegramClient(
StringSession(session_str), api_id, api_hash,
sequential_updates=True
)
await client.connect()
async with client.conversation("@someuser") as conv:
await conv.send_message('Hey, what is your name?')
if __name__ == "__main__":
asyncio.run(main())
Run Code Online (Sandbox Code Playgroud)
@someuser(我)成功收到消息。好的,现在我根据上面的代码使用固定装置创建一个测试:
api_id = int(os.getenv("TELEGRAM_APP_ID"))
api_hash = os.getenv("TELEGRAM_APP_HASH")
session_str = os.getenv("TELETHON_SESSION")
@pytest.fixture(scope="session")
async def client():
client = TelegramClient(
StringSession(session_str), api_id, api_hash,
sequential_updates=True
)
await client.connect()
yield client
await client.disconnect()
@pytest.mark.asyncio
async def test_start(client: TelegramClient):
async with client.conversation("@someuser") as conv:
await …Run Code Online (Sandbox Code Playgroud) 我正在尝试运行Telethon 文档提供的第一个代码片段。但是,在出现多个问题(此处和此处)之后,我最终得到了这个修改后的版本:
import os
import sys
from telethon.sync import TelegramClient, events
# import nest_asyncio
# nest_asyncio.apply()
session_name = "<session_name>"
api_id = <api_id>
api_hash = "<api_hash>"
os.chdir(sys.path[0])
if f"{session_name}.session" in os.listdir():
os.remove(f"{session_name}.session")
async with TelegramClient(session_name, api_id, api_hash) as client:
client.send_message('me', 'Hello, myself!')
print(client.download_profile_photo('me'))
@client.on(events.NewMessage(pattern='(?i).*Hello'))
async def handler(event):
await event.reply('Hey!')
client.run_until_disconnected()
Run Code Online (Sandbox Code Playgroud)
但是现在我收到这些警告:
usr/local/lib/python3.7/site-packages/ipykernel_launcher.py:23:RuntimeWarning:协程“MessageMethods.send_message”从未等待 RuntimeWarning:启用tracemalloc以获取对象分配回溯 /usr/local/lib/python3.7/site-packages/ipykernel_launcher.py:24:RuntimeWarning:从未等待协程“DownloadMethods.download_profile_photo” RuntimeWarning:启用tracemalloc以获取对象分配回溯 /usr/local/lib/python3.7/site-packages/ipykernel_launcher.py:30:RuntimeWarning:从未等待协程“UpdateMethods._run_until_disconnected” RuntimeWarning:启用tracemalloc以获取对象分配回溯
在 Jupyter 上运行代码时。现在我的问题是:
@行首的符号是什么@client.on...意思?那条线应该做什么?从这一行开始我就不明白代码了。如果您能帮助我理解它,我将不胜感激。我想制作一个脚本来显示我加入的频道,然后将所有内容保留在以下示例中:
from telethon.tl.functions.channels import LeaveChannelRequest
await client(LeaveChannelRequest(input_channel))
Run Code Online (Sandbox Code Playgroud) 我想在后台运行一个函数。所以我在我的代码中使用线程。
但是返回错误ValueError: signal only works in main thread并且不知道两件事:
views.py
def callback(update):
print('I received', update)
def message_poll_start():
try:
client = TelegramClient('phone', api_id, api_hash,
update_workers=1, spawn_read_thread=False)
client.connect()
client.add_update_handler(callback)
client.idle()
except TypeNotFoundError:
pass
def message_poll_start_thread(request):
t = threading.Thread(target=message_poll_start, args=(), kwargs={})
t.setDaemon(True)
t.start()
return HttpResponse("message polling started")
Run Code Online (Sandbox Code Playgroud)
urls.py
urlpatterns = [
path('message_poll_start', messagemanager_views.message_poll_start_thread, name="message_poll_start"),
]
Run Code Online (Sandbox Code Playgroud)
trace
[12/Jan/2018 11:24:38] "GET /messages/message_poll_start HTTP/1.1" 200 23
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib/python3.5/threading.py", line 914, in _bootstrap_inner
self.run() …Run Code Online (Sandbox Code Playgroud) 如果这是一个愚蠢的问题,我深表歉意。
我第一次尝试 telethon,它无法与我的 telegram API 同步。
但是当我尝试连接以启动或连接客户端时收到此消息:
最后,OperationalError: database is locked当我尝试使用手机登录时出现错误。
错误消息完整:
--------------------------------------------------------------------
OperationalError Traceback (most recent
call last)
<ipython-input-13-880bc0e4ea12> in <module>()
1 from telethon import TelegramClient, sync
----> 2 client = TelegramClient('session_name', api_id, api_hash)
3
4 client.connect()
5 if not client.is_user_authorized():
/anaconda3/lib/python3.7/site-
packages/telethon/client/telegrambaseclient.py in __init__(self,
session, api_id, api_hash, connection, use_ipv6, proxy, timeout,
request_retries, connection_retries, retry_delay, auto_reconnect,
sequential_updates, flood_sleep_threshold, device_model,
system_version, app_version, lang_code, system_lang_code, loop,
base_logger)
221 DEFAULT_DC_ID,
222 DEFAULT_IPV6_IP if self._use_ipv6 else
DEFAULT_IPV4_IP,
--> …Run Code Online (Sandbox Code Playgroud) 我使用telethon使用 python 脚本向电报发送消息。
我在电视马拉松中没有找到任何东西来搜索我喜欢用来在电报应用程序上搜索的群组和频道。请看图片。我如何使用电视马拉松获得这样的列表?
您好,不知道如何解决这个问题,因此我们将非常感谢任何帮助。我订阅了私人频道。该频道没有用户名,我也没有邀请链接(管理员刚刚添加了我)。由于我在工作中使用此频道,为了加快处理速度,我想使用 Telethon 处理在该频道上发布的消息。
该方案的核心是:
@events.register(events.NewMessage(chats = my_private_channel))
async def handler(event):
#do things
Run Code Online (Sandbox Code Playgroud)
问题是我无法过滤发送到该特定通道 ID 的消息。我收到错误:
ValueError: Cannot find any entity corresponding to "0123456789"
Run Code Online (Sandbox Code Playgroud)
我尝试了不同的技术来获取我的频道 ID,但错误始终相同。尤其:
但是当我将 ID 放入参数chats时,我总是收到上面报告的错误。
预先感谢,祝你有美好的一天
如何在Quart中的另一个异步方法内调用在主线程中获取事件循环的异步方法?
t.py
from telethon import TelegramClient, functions, types
client2 = TelegramClient(sn, api_id, api_hash).start()
async def create_contact():
return await client2(functions.contacts.ImportContactsRequest([
types.InputPhoneContact(0, '8', 'first_name', 'last_name')
]))
Run Code Online (Sandbox Code Playgroud)
应用程序
from quart import Quart, websocket,render_template,request
import t2
app = Quart(__name__)
@app.route('/wa2tg')
def wa2tg():
return render_template('wa2tg.html',nm=request.args.get('nm',''))
@app.websocket('/wa2tg2')
async def wa2tg2():
while True:
data = await websocket.receive()
await t2.create_contact()
# Thread(target=tele.client2.run_until_disconnected).start()
app.run(debug=1)
Run Code Online (Sandbox Code Playgroud)
错误:
Running on http://127.0.0.1:5000 (CTRL + C to quit)
[2019-06-21 16:31:42,035] 127.0.0.1:51696 GET /wa2tg 1.1 200 553 12995
[2019-06-21 16:31:42,486] 127.0.0.1:51698 GET /wa2tg2 1.1 101 …Run Code Online (Sandbox Code Playgroud) 我收到错误ValueError:找不到 . 请阅读https://docs.telethon.dev/en/latest/concepts/entities.html以了解更多详细信息。. 此错误来自 client.send_message(bot_id, 'reset')。
def run_test(self,name,bot_id):
self.start = time.time()
api_id = 10*****
api_hash = 'c1*********************************'
with TelegramClient('anon', api_id, api_hash) as client:
#breakpoint()
results = {'results':[], 'start': time.asctime(time.localtime(self.start)), 'conversations': len(self.conversations['tests'])}
Button(self, text='Cancel', width=20, font=("Lucinda Sans", 12),command=self.cancel(client)).pack()
statusintro = 'Bot Instance ' + name +': '
testtitle = ""
self.show_output(statusintro + testtitle)
if self.conversationReset is False:
client.send_message(bot_id, self.conversations['tests'][self.test_counter]['questions'][self.question_counter])
else:
print (bot_id)
client.send_message(bot_id, 'reset')
self.conversationReset = False
Run Code Online (Sandbox Code Playgroud) 我有client.get_messages(dialog.entity)但它只返回没有“已读/未读标记”的消息...那么,我怎样才能只获取未读的新消息呢?有人知道吗?
telethon ×10
python ×8
telegram ×5
python-3.x ×3
channel ×1
django ×1
ipython ×1
pytest ×1
python-2.7 ×1
quart ×1