小编Dhi*_*esh的帖子

如何在 Confluence kafka C# 库中获取 Kafka 主题的最新偏移量?

我正在使用 Confluence kafka C# 客户端。如何获取此主题中消耗的最新偏移量?

c# apache-kafka kafka-consumer-api confluent-platform

7
推荐指数
2
解决办法
1万
查看次数

客户端未连接到任何Elasticsearch节点!在flink中以独立群集模式运行时

我能够在我的eclipse中运行以下代码,但是当我在flink集群中运行它时,我得到以下错误.谁能指导我这个?

我的代码:

public static void main(String[] args) throws Exception {

    /**
     * Getting the execution Environment
     */
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", "11.71.10.13:9092");
    properties.setProperty("group.id", "Bitfinex");
    Map<String, String> config = new HashMap<>();
    config.put("cluster.name", "TRADE-ES");
    config.put("bulk.flush.max.actions", "1");
    config.put("node.name", "node-1");

    List<InetSocketAddress> transportAddresses = new ArrayList<>();
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("19.18.1.55"), 9300));
    transportAddresses.add(new InetSocketAddress(InetAddress.getByName("19.18.1.78"), 9300));

    /**
     * Adding the BitFinex-ETHBTC-Order source to the execution environment
     */
    DataStream<String> ethbtc_OrderStream = env.addSource(
            new FlinkKafkaConsumer010<String>("BitFinex-ETHBTC-Order", new SimpleStringSchema(), properties),
            "Kafka_BitFinex-ETHBTC-Order_Source").setParallelism(1);

    ethbtc_OrderStream.addSink(new BitfinexEthbtcOrderADLSink<String>()).name("BitfinexEthbtcOrderADLSink")
            .setParallelism(10);

    ethbtc_OrderStream
            .addSink(new …
Run Code Online (Sandbox Code Playgroud)

elasticsearch apache-flink flink-streaming

5
推荐指数
0
解决办法
374
查看次数