我在消费者组中轮询来自 Kafka 的消息时遇到问题。\n我的消费者对象分配给给定的分区
\n\nself.ps = TopicPartition(topic, partition )\nRun Code Online (Sandbox Code Playgroud)\n\n之后消费者分配给该分区:
\n\nself.consumer.assign([self.ps])\nRun Code Online (Sandbox Code Playgroud)\n\n之后我就可以计算分区内的消息了
\n\nself.consumer.seek_to_beginning(self.ps)\npos = self.consumer.position(self.ps)\nRun Code Online (Sandbox Code Playgroud)\n\n和self.consumer.seek_to_end(self.ps)\n......
我的主题中有超过 30000 条消息。\n问题是我只收到一条消息。
\n\n消费者配置为:\n max_poll_records= 200\nAUTO_OFFSET_RESET是最早的
这是我的功能,我试图获取消息:
\n\n def poll_messages(self):\n\n\n data = []\n\n messages = self.consumer.poll(timeout_ms=6000)\n\n\n for partition, msgs in six.iteritems(messages):\n\n for msg in msgs:\n\n data.append(msg)\n\n return data\nRun Code Online (Sandbox Code Playgroud)\n\n即使我在开始轮询消息之前转到第一个可用偏移量\n我也只收到一条消息。
\n\nself.consumer.seek(self.ps, self.get_first_offset())\nRun Code Online (Sandbox Code Playgroud)\n\n我希望有人能解释我做错了什么。\n提前致谢。
\n\n最美好的祝愿\nJ\xc3\xb6rn
\n我想在oracle数据库版本11.2.0.4上的PL/SQL ETL进程内的时间戳列表中插入数据
如果我在Toad中运行"从seletcted行创建INSERT",我将获得以下SQL命令:
Insert into xxxx$1
(ID, ITEM, ITEMSIZE, QUALITY, MATERIAL,
COLOUR, IMAGEURL, CREATIONDATE, SAMPLEITEMNUMBER, ITEMNUMBER)
Values
(111, 339079775, '1', 'Microfaser PRIMABELLE®', 'TEXTILE',
'1 (=creme)', 'url', TO_TIMESTAMP('27.06.2016 15:49:35.000000','DD.MM.YYYY HH24:MI:SS.FF'), 'xxx', 'xxxx');
COMMIT;
Run Code Online (Sandbox Code Playgroud)
在PL/SQL中,我生成一个VARCHAR2/String,它看起来像是一样的:
v_sql :=
'INSERT INTO xxxx'
|| p_importpostfix
|| ' VALUES ('
|| seq_xxxx.NEXTVAL
|| ','
|| v_rec.item_id
|| ', '''
|| v_size
|| ''','''
|| v_quality
|| ''','''
|| v_material
|| ''','''
|| v_colour
|| ''','''
|| NULL
|| ''',to_timestamp('''|| to_char( sysdate, 'DD.MM.YYYY HH24:MI:SS')||''',''DD.MM.YYYY HH24:MI:SS.FF''),'''
|| v_rec.vid …Run Code Online (Sandbox Code Playgroud) apache-kafka ×1
consumer ×1
database ×1
kafka-python ×1
oracle ×1
oracle11g ×1
plsql ×1
python ×1
toad ×1