I am using the Confluent Kafka C# client. How do I get the latest consumed offset for this topic?
I can run the following code in Eclipse, but when I run it on a Flink cluster I get the following error. Can anyone guide me on this?
My code:
public static void main(String[] args) throws Exception {
    /**
     * Getting the execution Environment
     */
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "11.71.10.13:9092");
    properties.setProperty("group.id", "Bitfinex");
    Map<String, String> config = new HashMap<>();
    config.put("cluster.name", "TRADE-ES");
    config.put("bulk.flush.max.actions", "1");
    config.put("node.name", "node-1");
    List<InetSocketAddress> transportAddresses = new ArrayList<>();
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("19.18.1.55"), 9300));
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("19.18.1.78"), 9300));
    /**
     * Adding the BitFinex-ETHBTC-Order source to the execution environment
     */
    DataStream<String> ethbtc_OrderStream = env.addSource(
            new FlinkKafkaConsumer010<String>("BitFinex-ETHBTC-Order", new SimpleStringSchema(), properties),
            "Kafka_BitFinex-ETHBTC-Order_Source").setParallelism(1);
    ethbtc_OrderStream.addSink(new BitfinexEthbtcOrderADLSink<String>()).name("BitfinexEthbtcOrderADLSink")
            .setParallelism(10);
    ethbtc_OrderStream
            .addSink(new …