gab*_*ssi 40
从kafka 0.11.0.0开始,您可以使用此答案中的脚本kafka-consumer-groups.sh
示例
kafka-consumer-groups.sh --bootstrap-server kafka-host:9092 --group my-group --reset-offsets --to-earliest --all-topics --execute
Run Code Online (Sandbox Code Playgroud)
KIP-122中列出的其他选项:添加重置消费者组偏移工具
.----------------------.-----------------------------------------------.----------------------------------------------------------------------------------------------------------------------------------------------.
| Scenario | Arguments | Example |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset to Datetime | --to-datetime YYYY-MM-DDTHH:mm:SS.sss±hh:mm | Reset to first offset since 01 January 2017, 00:00:00 hrs: --reset-offsets –group test.group --topic foo --to-datetime 2017-01-01T00:00:00Z |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset by Duration | --by-duration PnDTnHnMnS | Reset to first offset since one week ago (from current timestamp): --reset-offsets --group test.group --topic foo --by-duration P7D |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset to Earliest | --to-earliest | Reset to earliest offset available: --reset-offsets --group test.group --topic foo --to-earliest |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset to Latest | --to-latest | Reset to latest offset available: --reset-offsets --group test.group --topic foo --to-latest |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset to Offset | --to-offset | Reset to offset 1 in all partitions: --reset-offsets --group test.group --topic foo --to-offset 1 |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Shift Offset by 'n' | --shift-by n | Reset to current offset plus 5 positions: --reset-offsets --group test.group –topic foo --shift-by 5 |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset from File | --from-file PATH_TO_FILE | Reset using a file with reset plan: --reset-offsets --group test.group --from-file reset-plan.csv |
'----------------------'-----------------------------------------------'----------------------------------------------------------------------------------------------------------------------------------------------'
Run Code Online (Sandbox Code Playgroud)
您还可以定义要重置的分区,例如:
将主题foo分区0的偏移重置为1
--reset-offsets --group test.group --topic foo:0 --to-offset 1
将主题foo分区0,1,2的偏移重置为最早
--reset-offsets --group test.group --topic foo:0,1,2 --to-earliest
提醒:不要忘记--execute标志(请参阅KIP中的执行选项).如果没有此标志,脚本 将仅按范围打印出场景的结果,例如:
TOPIC PARTITION NEW-OFFSET NEW-LAG LOG-END-OFFSET CONSUMER-ID HOST CLIENT-ID
foo 0 90 10 100 - - -
Run Code Online (Sandbox Code Playgroud)
Vis*_*ohn 23
您可以在zookeeper shell的帮助下完成此操作.Kafka使用zookeeper跟踪消费者抵消.
转到kafka bin目录并调用zookeeper shell.(我的kafka版本是0.8.0)
./zookeeper-shell.sh localhost:2181
Run Code Online (Sandbox Code Playgroud)
现在使用zookeeper get命令
get /consumers/consumer_group_id/offsets/topic/0
Run Code Online (Sandbox Code Playgroud)
它显示了类似的东西
2043
cZxid = 0x4d
ctime = Wed Mar 18 03:56:32 EDT 2015
...
Run Code Online (Sandbox Code Playgroud)
这里2043是消耗的最大偏移量.使用zookeeper set命令将其设置为所需的值
set /consumers/consumer_group_id/offsets/topic/0 10000
Run Code Online (Sandbox Code Playgroud)
该路径的框架类似于/ consume/[consumer_group_id]/offsets/[topic]/[partition_id].
您必须使用适当的使用者组,主题和分区ID替换.
*另外,既然你提到它是kafka的新实例,我不确定消费者是否会创建连接和消费者组.
如果您需要更改偏移量。
kafka-consumer-groups --bootstrap-server {url} \
--topic {topic} \
--group {consumer} \
--reset-offsets --to-datetime 2020-11-11T00:00:00.000+0900 \
--execute
Run Code Online (Sandbox Code Playgroud)
通过 SimpleDateFormat 将 UTC 字符串解析为日期时出现无法解析的日期错误
从 kafka 0.9 开始,偏移量存储在主题中。要更改偏移量,请使用seek()方法:
public void seek(TopicPartition partition, long offset)
Run Code Online (Sandbox Code Playgroud)
覆盖消费者将在 next 上使用的获取偏移量
poll(timeout)。如果对同一分区多次调用此 API,则下一次 poll() 将使用最新的偏移量。请注意,如果在消费过程中任意使用此API来重置获取偏移量,则可能会丢失数据
| 归档时间: |
|
| 查看次数: |
28547 次 |
| 最近记录: |