小编A.S*_*iei的帖子

Apache Flink中的任务分发

考虑具有一些节点的Flink集群,其中每个节点具有多核处理器.如果我们根据内核数量和相等的内存份额配置插槽数量,Apache Flink如何在节点和空闲插槽之间分配任务?他们受到公平对待吗?
当我们根据节点上可用的核心数配置任务槽时,有没有办法使Flink能够平等地处理插槽?
     例如,假设我们平均分区数据并在分区上运行相同的任务.Flink使用来自某些节点的所有插槽,同时一些节点完全免费.具有较少CPU核心数的节点输出结果的速度比具有该过程中涉及的更多CPU核心数的节点快得多.除此之外,这个加速比率与每个节点中使用的核心数量不成比例.换句话说,如果在一个节点中占用一个核心而在另一个节点中占用两个核心,相当地将每个核心视为一个时隙,每个时隙应该在几乎相同的时间内在同一任务上输出结果,而不管哪个节点他们属于.但是,这不是这种情况.
     有了这个假设,我会说节点不会被平等对待.这又产生了与可用节点数量不成比例的结果.我们不能说增加插槽数量必然会降低时间成本.

我很感激Apache Flink社区的任何评论!

apache-flink

10
推荐指数
1
解决办法
830
查看次数

Apache Flink中的并行度

我可以在Flink的程序中为不同的任务部分设置不同程度的并行度吗?例如,Flink如何解释以下示例代码?两个自定义从业者MyPartitioner1,MyPartitioner2,将输入数据分为两个4和2个分区.

partitionedData1 = inputData1
  .partitionCustom(new MyPartitioner1(), 1);
env.setParallelism(4);
DataSet<Tuple2<Integer, Integer>> output1 = partitionedData1
  .mapPartition(new calculateFun());

partitionedData2 = inputData2
  .partitionCustom(new MyPartitioner2(), 2);
env.setParallelism(2);
DataSet<Tuple2<Integer, Integer>> output2 = partitionedData2
  .mapPartition(new calculateFun());
Run Code Online (Sandbox Code Playgroud)

我为此代码收到以下错误:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2 …
Run Code Online (Sandbox Code Playgroud)

apache-flink

9
推荐指数
1
解决办法
2235
查看次数

Apache Flink中的全局排序

数据集的sortPartition方法根据某些指定的字段在本地对数据集进行排序.如何在Flink中以高效的方式全局排序我的大型数据集?

apache-flink

9
推荐指数
1
解决办法
1743
查看次数

在Apache flink的节点之间共享数据集的最佳方法是什么?

我正在使用Apache Flink处理数据流,并且需要在所有处理输入数据的节点之间共享索引。节点经常更新索引。

我想知道,从效率角度来看,通过广播变量共享数据集是一种好习惯吗?

是否在每次更新后在所有节点中更新广播变量?

Apache Flink是否仅针对最近的更改智能地增量更新广播变量?

apache-flink flink-streaming

5
推荐指数
1
解决办法
401
查看次数

java中的treeSet序列化

我有一个类,它扩展了一个实现serializable本身的超类.我的类包含一个treeSet字段和一个连接的比较器.我想使类可序列化,因为我将在集群上运行它.我得到一个java.io.NotSerializableException错误,而访问treeSet场.有谁知道我应该如何解决它?

public static class bounderyRecordsFilter implements FilterFunction  {
    public ArrayList<String> sortingkeyStart;
    public ArrayList<String> sortingkeyEnd;
    public  TreeSet<boundery> intervals ;
    public int pass;

       public   Comparator<boundery> Interval_order =   new Comparator<boundery>() {
            public int compare(boundery e1, boundery e2) {
                int comp_res=0;
                     comp_res= e1.getStartInterval() .compareToIgnoreCase(e2.getStartInterval());
                return comp_res;
            }
        };  

    public bounderyRecordsFilter(ArrayList<String> sortingkeyStart,ArrayList<String> sortingkeyEnd, int pass){
        super();
        intervals = new TreeSet<boundery>(Interval_order);
        for (int i=0 ; i< 4 ; i++)
        {
            boundery tempInterval = new boundery();
            ...             
            intervals.add(tempInterval) ;
        }
         this.sorkingkeyStart …
Run Code Online (Sandbox Code Playgroud)

java serialization

2
推荐指数
1
解决办法
1259
查看次数

从Apache Flink中的HDFS地址流式传输文件

在我的Flink代码中,我正在流式传输一个位于HDFS文件夹的文件,我收到错误"(没有这样的文件或目录)",但我确定文件名和地址是正确的,因为我在批处理中使用了相同的方法和每件事都顺利进行.有谁知道可能是什么问题?这是我的代码:

DataStream<FebrlObject> myStream = 
env.addSource(new MyObjectGenerator("hdfs://../Data/Dataset1.csv"));
Run Code Online (Sandbox Code Playgroud)

及其相关课程:

public class MyObjectGenerator implements SourceFunction<MyObject> {

    private String dataFilePath;
    private float servingSpeedFactor;
    private Integer rowNo ; 
    private transient BufferedReader reader;
    private transient InputStream inputStream;

    public MyObjectGenerator(String dataFilePath) {
        this(dataFilePath, 1.0f);
    }

    public MyObjectGenerator(String dataFilePath, float servingSpeedFactor) {
        this.dataFilePath = dataFilePath;
        this.servingSpeedFactor = servingSpeedFactor;
        rowNo = 0 ;
    }

    @Override
    public void run(SourceContext<MyObject> sourceContext) throws Exception {
        long servingStartTime = Calendar.getInstance().getTimeInMillis();
        inputStream = new DataInputStream(new FileInputStream(dataFilePath));
        reader = new BufferedReader(new InputStreamReader(inputStream));
        String line;
        long …
Run Code Online (Sandbox Code Playgroud)

apache-flink

2
推荐指数
1
解决办法
808
查看次数

Apache Flink:在预定义上下文中运行时无法实例化 RemoteEnvironment

有谁知道,在远程 Flink 集群上运行程序时出现以下错误的根源是什么?

我该如何解决?

 org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:512)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
    at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
    at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:675)
    at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
    at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:977)
    at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1027)
Caused by: org.apache.flink.api.common.InvalidProgramException: The RemoteEnvironment cannot be instantiated when running in a pre-defined context (such as Command Line Client, Scala Shell, or TestEnvironment)
    at org.apache.flink.api.java.RemoteEnvironment.<init>(RemoteEnvironment.java:118)
    at org.apache.flink.api.java.RemoteEnvironment.<init>(RemoteEnvironment.java:78)
    at org.apache.flink.api.java.ExecutionEnvironment.createRemoteEnvironment(ExecutionEnvironment.java:1155)
    at org.apache.flink.test.myProj.main(myProj.java:133)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
    ... 6 more
Run Code Online (Sandbox Code Playgroud)

apache-flink

1
推荐指数
1
解决办法
2731
查看次数