I am having trouble with the Apache Flink Scala API.
For example, even when I take the examples straight from the official documentation, the Scala compiler gives me lots of compilation errors.
Code:
object TestFlink {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")

    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
      .map { (_, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()
    env.execute("Scala WordCount Example")
  }
}
For the line val text = env.fromElements, the Scala IDE reports the following:
Multiple markers at this line
- not enough arguments for method fromElements: (implicit evidence$14: scala.reflect.ClassTag[String], implicit evidence$15: org.apache.flink.api.common.typeinfo.TypeInformation[String])org.apache.flink.api.scala.DataSet[String]. …
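For what it's worth, this kind of "not enough arguments … implicit evidence" error usually means that the implicit TypeInformation values of Flink's Scala API are not in scope. A minimal sketch of the usual fix, assuming the code otherwise matches the documentation example, is to add the Scala API wildcard import:

// The wildcard import brings the implicit TypeInformation/ClassTag
// evidence into scope that fromElements asks for.
import org.apache.flink.api.scala._

object TestFlink {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // With the import above, the implicit parameters are resolved
    // and this line compiles.
    val text = env.fromElements(
      "Who's there?",
      "I think I hear them. Stand, ho! Who's there?")
    text.print()
  }
}

If the import is already present, checking that the project depends on flink-scala rather than only flink-java would be the next thing to look at.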
I am having some trouble understanding the semantics around event-time windows. The following program generates some tuples whose timestamps are used as event time, and performs a simple windowed aggregation. I expected the output to be in the same order as the input, but it is ordered differently. Why is the output not in event-time order?
import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
object WindowExample extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  env.getConfig.enableTimestamps()
  env.setParallelism(1)

  val start = 1449597577379L
  val tuples = (1 to 10).map(t => (start + t * 1000, t))

  env.fromCollection(tuples)
    .assignAscendingTimestamps(_._1)
    .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
    .sum(1)
    .print()

  env.execute()
}
Input:
(1449597578379,1)
(1449597579379,2)
(1449597580379,3)
(1449597581379,4)
(1449597582379,5)
(1449597583379,6)
(1449597584379,7)
(1449597585379,8)
(1449597586379,9)
(1449597587379,10)
Result:
[info] (1449597579379,2)
[info] (1449597581379,4)
[info] (1449597583379,6)
[info] (1449597585379,8)
[info] (1449597587379,10)
[info] (1449597578379,1)
[info] (1449597580379,3)
[info] (1449597582379,5)
[info] (1449597584379,7)
[info] (1449597586379,9)
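One detail worth noting: Flink's tumbling event-time windows are aligned to the epoch, not to the first element, so each of the ten tuples here lands in its own 1-second window, and every printed tuple is the sum over a single-element window. A small sketch of that assignment logic (windowStart is a hypothetical helper mirroring the epoch alignment, not a Flink API call):

// Epoch-aligned window start for a timestamp, assuming 1-second tumbling windows.
def windowStart(ts: Long, size: Long = 1000L): Long = ts - (ts % size)

val start = 1449597577379L
(1 to 10).foreach { t =>
  val ts = start + t * 1000
  // Each tuple maps to a distinct window, e.g. (1449597578379,1)
  // falls into [1449597578000, 1449597579000).
  println(s"($ts,$t) -> [${windowStart(ts)}, ${windowStart(ts) + 1000L})")
}

If that reading is right, the interleaved output order reflects the order in which these single-element windows happen to fire once the final watermark arrives, not their event time.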
Consider a Flink cluster with several nodes, where each node has a multi-core processor. If we configure the number of slots based on the number of cores, with an equal share of memory per slot, how does Apache Flink distribute tasks across the nodes and their idle slots? Are they treated fairly?
When we configure the task slots based on the number of cores available on each node, is there a way to make Flink treat the slots equally?
For example, suppose we partition the data evenly and run the same task on each partition. Flink uses all the slots of some nodes while other nodes stay completely idle. A node with fewer of its CPU cores involved in the process outputs results much faster than a node with more cores involved, and moreover this speedup is not proportional to the number of cores used on each node. In other words, if one core is occupied on one node and two cores on another, and each core is fairly treated as one slot, then every slot should produce results for the same task in roughly the same time, regardless of which node it belongs to. However, this is not the case.
Given this, I would say the nodes are not treated equally, which in turn produces results that do not scale with the number of available nodes. We cannot say that increasing the number of slots necessarily reduces the time cost.
I would appreciate any comments from the Apache Flink community!
I have deployed a Flink cluster with the following parallelism configuration:
jobmanager.heap.mb: 2048
taskmanager.heap.mb: 2048
taskmanager.numberOfTaskSlots: 5
parallelism.default: 2
However, if I try to run any example or jar, even with the -p flag, I get the following error:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.
Task to schedule: < Attempt #1 (Source: Custom Source -> Sink: Unnamed (1/1)) @ (unassigned) - [SCHEDULED] > with groupID < 22f48c24254702e4d1674069e455c81a > in sharing group < SlotSharingGroup [22f48c24254702e4d1674069e455c81a] >. Resources available to scheduler:
Number of instances=0, total number of slots=0, …

I am using Flink 1.4.1 and Beam 2.3.0, and I would like to know whether metrics can be made available in the Flink WebUI (or anywhere else), as they are in the Dataflow WebUI.
I used counters like this:
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
...
Counter elementsRead = Metrics.counter(getClass(), "elements_read");
...
elementsRead.inc();
But I cannot find any usable counts for "elements_read" anywhere in the Flink WebUI (neither in the task metrics nor in the accumulators). I thought this would be straightforward after BEAM-773.
I am trying to follow this example, but when I try to compile it, I get this error:
Error: Unable to initialize main class com.amazonaws.services.kinesisanalytics.aws
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/api/functions/source/SourceFunction
The error is caused by this code:
private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
    Properties inputProperties = new Properties();
    inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
    inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
}
I think this is the problematic line:
return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
These are my Maven dependencies.
What could be the problem? Some dependency or version?
Note: if I comment out the problematic line, the program runs without problems.
POM file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>as</groupId>
    <artifactId>a</artifactId>
    <version>1</version>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <geotools.version>21-SNAPSHOT</geotools.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <flink.version>1.6.2</flink.version>
        <kda.version>1.0.1</kda.version>
    </properties>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>${java.version}</source>
                    <target>${java.version}</target>
…
How does Apache Flink compare to MapReduce on Hadoop? In what ways is it better, and why?
Can I set different degrees of parallelism for different parts of a Flink program? For example, how does Flink interpret the following sample code? The two custom partitioners, MyPartitioner1 and MyPartitioner2, partition the input data into 4 and 2 partitions respectively.
partitionedData1 = inputData1
    .partitionCustom(new MyPartitioner1(), 1);
env.setParallelism(4);
DataSet<Tuple2<Integer, Integer>> output1 = partitionedData1
    .mapPartition(new calculateFun());

partitionedData2 = inputData2
    .partitionCustom(new MyPartitioner2(), 2);
env.setParallelism(2);
DataSet<Tuple2<Integer, Integer>> output2 = partitionedData2
    .mapPartition(new calculateFun());
I get the following error for this code:
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
at akka.dispatch.Mailbox.run(Mailbox.scala:221)
at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2 …
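As a hedged sketch (a Scala rendering, with inputData1/2, MyPartitioner1/2, and calculateFun kept as placeholders from the question): rather than switching env.setParallelism between statements, the DataSet API lets you pin the parallelism of each operator with setParallelism, and a custom partitioner may only return indices in [0, downstream parallelism), which is one plausible source of the ArrayIndexOutOfBoundsException: 2 above.

// Set parallelism on the operator itself instead of toggling
// the environment-wide default between statements.
val output1 = inputData1
  .partitionCustom(new MyPartitioner1(), 1)
  .mapPartition(new calculateFun())
  .setParallelism(4) // 4 parallel instances for this operator only

val output2 = inputData2
  .partitionCustom(new MyPartitioner2(), 2)
  .mapPartition(new calculateFun())
  .setParallelism(2) // partitioner indices must stay within [0, 2)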
The sortPartition method of DataSet sorts a data set locally on specified fields. How can I sort my large data set globally in Flink in an efficient way?
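A hedged sketch of one common approach (the data set below is a stand-in for the real input): range-partition on the sort key so that partition i only holds keys smaller than those of partition i+1, then sort each partition locally; together the partitions form a total order while the job still runs in parallel.

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val data: DataSet[(Int, String)] = env.fromElements((3, "c"), (1, "a"), (2, "b"))

// Range partitioning + local sort: globally ordered across partitions.
val globallySorted = data
  .partitionByRange(0)
  .sortPartition(0, Order.ASCENDING)

// Alternative: one fully sorted partition, at the cost of parallelism.
val singlePartition = data
  .sortPartition(0, Order.ASCENDING)
  .setParallelism(1)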
I have a small use case with Apache Flink as a batch processing system: I need to process a set of files, and each file must be handled by one machine. I have the following code, but only one task slot is ever occupied and the files are processed one after another. I have 6 nodes (so 6 task managers) and configured 4 task slots on each node, so I expect 24 files to be processed at a time.
class MyMapPartitionFunction extends RichMapPartitionFunction[java.io.File, Int] {
  override def mapPartition(
      myfiles: java.lang.Iterable[java.io.File],
      out: org.apache.flink.util.Collector[Int]): Unit = {
    var temp = myfiles.iterator()
    while (temp.hasNext()) {
      val fp1 = getRuntimeContext.getDistributedCache.getFile("hadoopRun.sh")
      val file = new File(temp.next().toURI)
      Process(
        "/bin/bash ./run.sh " + argumentsList(3) + "/" + file.getName + " " + argumentsList(7) + "/" + file.getName + ".csv",
        new File(fp1.getAbsoluteFile.getParent))
        .lines
        .foreach { println }
      out.collect(1)
    }
  }
}
I started Flink with the ./bin/start-cluster.sh command, and the web UI shows 6 task managers and 24 task slots.
The folder contains around 49 files. When I run mapPartition over this collection, I expected it to span 49 parallel processes. However, in my infrastructure they are all processed one after another, meaning a single machine (one task manager) processes all 49 file names. What I want, given the task slots configured on each node, is for 24 files to be processed at the same time.
Any pointers would certainly help. I have these parameters in the flink-conf.yaml file:
jobmanager.heap.mb: 2048
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
parallelism.default: 24
Thanks in advance. Can anyone tell me where I am going wrong?
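One thing worth checking, as a hedged sketch (fileList stands in for however the 49 files are collected): env.fromCollection creates a non-parallel source with parallelism 1, so explicitly rebalancing its output and setting the operator parallelism makes the intended distribution explicit. rebalance() spreads the elements round-robin across the parallel instances of the mapPartition, and setParallelism pins how many instances, and hence slots, it may occupy.

import org.apache.flink.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
// Placeholder path: however the 49 input files are actually discovered.
val fileList: Seq[java.io.File] = new java.io.File("/path/to/input").listFiles().toSeq

val files: DataSet[java.io.File] = env.fromCollection(fileList)

files
  .rebalance()                               // spread the files round-robin across instances
  .mapPartition(new MyMapPartitionFunction)  // the function from the question
  .setParallelism(24)                        // one parallel instance per available slot
  .print()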