标签: apache-flink

Flink Scala API"没有足够的参数"

我在使用Apache Flink Scala API时遇到了麻烦

例如,即使我从官方文档中获取示例,scala编译器也会给我带来大量的编译错误.

码:

object TestFlink {

  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()

    env.execute("Scala WordCount Example")
  }
}
Run Code Online (Sandbox Code Playgroud)

Scala IDE为该行输出以下内容 val text = env.fromElements

Multiple markers at this line
  - not enough arguments for method fromElements: (implicit evidence$14: scala.reflect.ClassTag[String], implicit evidence$15: 
   org.apache.flink.api.common.typeinfo.TypeInformation[String])org.apache.flink.api.scala.DataSet[String]. …
Run Code Online (Sandbox Code Playgroud)

scala-ide apache-flink

10
推荐指数
1
解决办法
2766
查看次数

Flink流媒体事件时间窗口排序

我遇到了一些麻烦,理解事件时间窗口周围的语义.以下程序生成一些带有时间戳的元组,这些时间戳用作事件时间并执行简单的窗口聚合.我希望输出与输入的顺序相同,但输出的排序方式不同.为什么输出与事件时间无关?

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

object WindowExample extends App {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.enableTimestamps()
    env.setParallelism(1)

    val start = 1449597577379L
    val tuples = (1 to 10).map(t => (start + t * 1000, t))

    env.fromCollection(tuples)
      .assignAscendingTimestamps(_._1)
      .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
      .sum(1)
      .print()

    env.execute()
}
Run Code Online (Sandbox Code Playgroud)

输入:

 (1449597578379,1)
 (1449597579379,2)
 (1449597580379,3)
 (1449597581379,4)
 (1449597582379,5)
 (1449597583379,6)
 (1449597584379,7)
 (1449597585379,8)
 (1449597586379,9)
 (1449597587379,10)
Run Code Online (Sandbox Code Playgroud)

结果:

[info] (1449597579379,2)
[info] (1449597581379,4)
[info] (1449597583379,6)
[info] (1449597585379,8)
[info] (1449597587379,10)
[info] (1449597578379,1)
[info] (1449597580379,3)
[info] (1449597582379,5)
[info] (1449597584379,7)
[info] (1449597586379,9)
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

10
推荐指数
1
解决办法
2377
查看次数

Apache Flink中的任务分发

考虑具有一些节点的Flink集群,其中每个节点具有多核处理器.如果我们根据内核数量和相等的内存份额配置插槽数量,Apache Flink如何在节点和空闲插槽之间分配任务?他们受到公平对待吗?
当我们根据节点上可用的核心数配置任务槽时,有没有办法使Flink能够平等地处理插槽?
     例如,假设我们平均分区数据并在分区上运行相同的任务.Flink使用来自某些节点的所有插槽,同时一些节点完全免费.具有较少CPU核心数的节点输出结果的速度比具有该过程中涉及的更多CPU核心数的节点快得多.除此之外,这个加速比率与每个节点中使用的核心数量不成比例.换句话说,如果在一个节点中占用一个核心而在另一个节点中占用两个核心,相当地将每个核心视为一个时隙,每个时隙应该在几乎相同的时间内在同一任务上输出结果,而不管哪个节点他们属于.但是,这不是这种情况.
     有了这个假设,我会说节点不会被平等对待.这又产生了与可用节点数量不成比例的结果.我们不能说增加插槽数量必然会降低时间成本.

我很感激Apache Flink社区的任何评论!

apache-flink

10
推荐指数
1
解决办法
830
查看次数

Flink群集配置问题 - 没有可用的插槽

我已经部署了具有并行配置的Flink集群,如下所示:

jobmanager.heap.mb: 2048
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 5
parallelism.default: 2
Run Code Online (Sandbox Code Playgroud)

但是,如果我尝试运行任何示例或jar甚至-p标志我收到以下错误:

org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. 
Task to schedule: < Attempt #1 (Source: Custom Source -> Sink: Unnamed (1/1)) @ (unassigned) - [SCHEDULED] > with groupID < 22f48c24254702e4d1674069e455c81a > in sharing group < SlotSharingGroup [22f48c24254702e4d1674069e455c81a] >. Resources available to scheduler: 
Number of instances=0, total number of slots=0, …
Run Code Online (Sandbox Code Playgroud)

apache-flink

10
推荐指数
1
解决办法
5111
查看次数

Apache光束计数器/度量标准在Flink WebUI中不可用

我正在使用Flink 1.4.1和Beam 2.3.0,并且想知道是否可以在Flink WebUI(或任何地方)中提供指标,如Dataflow WebUI中那样?

我用过像这样的柜台:

import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
...
Counter elementsRead = Metrics.counter(getClass(), "elements_read");
...
elementsRead.inc();
Run Code Online (Sandbox Code Playgroud)

但我无法"elements_read"在Flink WebUI中找到任何可用的计数(任务指标或累加器).我认为在BEAM-773之后这将是直截了当的.

java metrics apache-flink apache-beam

10
推荐指数
1
解决办法
757
查看次数

Apache Flink:java.lang.NoClassDefFoundError

我正在尝试遵循此示例,但是当我尝试编译它时,出现此错误:

Error: Unable to initialize main class com.amazonaws.services.kinesisanalytics.aws
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/functions/source/SourceFunction
Run Code Online (Sandbox Code Playgroud)

错误是由于此代码:

    private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
    }
Run Code Online (Sandbox Code Playgroud)

我想这是有问题的线路:

  return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
Run Code Online (Sandbox Code Playgroud)

这是我的 Maven 依赖项:

在此处输入图片说明

可能有什么问题?任何依赖项,版本?

注意:如果我评论有问题的行,程序运行没有问题。

POM文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>as</groupId>
    <artifactId>a</artifactId>
    <version>1</version>
    <packaging>jar</packaging>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <geotools.version>21-SNAPSHOT</geotools.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <flink.version>1.6.2</flink.version>
        <kda.version>1.0.1</kda.version>
    </properties>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target> …
Run Code Online (Sandbox Code Playgroud)

java intellij-idea noclassdeffounderror amazon-web-services apache-flink

10
推荐指数
2
解决办法
9234
查看次数

Apache Flink与Hadoop上的Mapreduce相比如何?

Apache Flink与Hadoop上的Mapreduce相比如何?它以什么方式更好,为什么?

hadoop mapreduce apache-flink

9
推荐指数
2
解决办法
2181
查看次数

Apache Flink中的并行度

我可以在Flink的程序中为不同的任务部分设置不同程度的并行度吗?例如,Flink如何解释以下示例代码?两个自定义从业者MyPartitioner1,MyPartitioner2,将输入数据分为两个4和2个分区.

partitionedData1 = inputData1
  .partitionCustom(new MyPartitioner1(), 1);
env.setParallelism(4);
DataSet<Tuple2<Integer, Integer>> output1 = partitionedData1
  .mapPartition(new calculateFun());

partitionedData2 = inputData2
  .partitionCustom(new MyPartitioner2(), 2);
env.setParallelism(2);
DataSet<Tuple2<Integer, Integer>> output2 = partitionedData2
  .mapPartition(new calculateFun());
Run Code Online (Sandbox Code Playgroud)

我为此代码收到以下错误:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
    at akka.dispatch.Mailbox.run(Mailbox.scala:221)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2 …
Run Code Online (Sandbox Code Playgroud)

apache-flink

9
推荐指数
1
解决办法
2235
查看次数

Apache Flink中的全局排序

数据集的sortPartition方法根据某些指定的字段在本地对数据集进行排序.如何在Flink中以高效的方式全局排序我的大型数据集?

apache-flink

9
推荐指数
1
解决办法
1743
查看次数

flink作业不是跨机器分布的

我在Apache flink中有一个小用例,即批处理系统.我需要处理一组文件.每个文件的处理必须由一台机器处理.我有以下代码.始终只占用一个任务槽,并且一个接一个地处理文件.我有6个节点(所以6个任务管理器),并在每个节点配置4个任务槽.所以,我希望一次处理24个文件.

class MyMapPartitionFunction extends RichMapPartitionFunction[java.io.File, Int] {
  override def mapPartition(
      myfiles: java.lang.Iterable[java.io.File],
      out:org.apache.flink.util.Collector[Int])
    : Unit  =  {
    var temp = myfiles.iterator()
    while(temp.hasNext()){
      val fp1 = getRuntimeContext.getDistributedCache.getFile("hadoopRun.sh")
      val file = new File(temp.next().toURI)
      Process(
        "/bin/bash ./run.sh  " + argumentsList(3)+ "/" + file.getName + " " + argumentsList(7) + "/" + file.getName + ".csv",
        new File(fp1.getAbsoluteFile.getParent))
        .lines
        .foreach{println}
      out.collect(1)
    }
  }
}
Run Code Online (Sandbox Code Playgroud)

我启动了flink as ./bin/start-cluster.sh命令,Web用户界面显示它有6个任务管理器,24个任务槽.

这些文件夹包含大约49个文件.当我在这个集合上创建mapPartition时,我希望跨越49个并行进程.但是,在我的基础设施中,它们都是一个接一个地处理的.这意味着只有一台机器(一个任务管理器)处理所有49个文件名.我想要的是,每个插槽配置2个任务,我希望同时处理24个文件.

任何指针肯定会有所帮助.我在flink-conf.yaml文件中有这些参数

jobmanager.heap.mb: 2048
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
parallelism.default: 24
Run Code Online (Sandbox Code Playgroud)

提前致谢.谁能让我知道我哪里出错了?

scala batch-processing apache-flink

9
推荐指数
1
解决办法
393
查看次数