小编Til*_*ann的帖子

如何使用hadoop实现自联接/跨产品?

对项目对进行一些评估是一项常见任务:示例:重复数据删除,协同过滤,类似项目等.这基本上是具有相同数据源的自连接或跨产品.

hadoop mapreduce self-join

4
推荐指数
1
解决办法
3880
查看次数

ALS的OutOfBoundsException - Flink MLlib

我正在为电影做一个推荐系统,使用这里提供的MovieLens数据集:http: //grouplens.org/datasets/movielens/

为了计算这个推荐系统,我在scala中使用了Flink的ML库,特别是ALS算法(org.apache.flink.ml.recommendation.ALS).

我首先将电影的评级映射为a DataSet[(Int, Int, Double)]然后创建a trainingSet和a testSet(参见下面的代码).

我的问题是当我使用ALS.fit整个数据集的函数(所有评级)时没有错误,但是如果我只删除一个评级,则fit函数不再起作用,我不明白为什么.

你有什么想法?:)

使用的代码:

Rating.scala

case class Rating(userId: Int, movieId: Int, rating: Double)
Run Code Online (Sandbox Code Playgroud)

PreProcessing.scala

object PreProcessing {

def getRatings(env : ExecutionEnvironment, ratingsPath : String): DataSet[Rating] = {
      env.readCsvFile[(Int, Int, Double)](
      ratingsPath, ignoreFirstLine = true,
      includedFields = Array(0,1,2)).map{r => new Rating(r._1, r._2, r._3)}
}
Run Code Online (Sandbox Code Playgroud)

Processing.scala

object Processing {
  private val ratingsPath: String = "Path_to_ratings.csv"

  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val ratings: …
Run Code Online (Sandbox Code Playgroud)

scala recommendation-engine indexoutofboundsexception apache-flink flinkml

4
推荐指数
1
解决办法
296
查看次数

任务不可序列化Flink

我试图在flink中进行pagerank基本示例,稍加修改(仅在读取输入文件时,其他一切都是相同的)我得到错误,因为任务不可序列化,下面是输出错误的一部分

atorg.apache.flink.api.scala.ClosureCleaner $ .ensureSerializable(ClosureCleaner.scala:179)at org.apache.flink.api.scala.ClosureCleaner $ .clean(ClosureCleaner.scala:171)

以下是我的代码

object hpdb {

  def main(args: Array[String]) {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val maxIterations = 10000

    val DAMPENING_FACTOR: Double = 0.85

    val EPSILON: Double = 0.0001

    val outpath = "/home/vinoth/bigdata/assign10/pagerank.csv"

    val links = env.readCsvFile[Tuple2[Long,Long]]("/home/vinoth/bigdata/assign10/ppi.csv",
                fieldDelimiter = "\t", includedFields = Array(1,4)).as('sourceId,'targetId).toDataSet[Link]//source and target

    val pages = env.readCsvFile[Tuple1[Long]]("/home/vinoth/bigdata/assign10/ppi.csv",
      fieldDelimiter = "\t", includedFields = Array(1)).as('pageId).toDataSet[Id]//Pageid

    val noOfPages = pages.count()

    val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / noOfPages))

    val adjacencyLists = links
      // initialize …
Run Code Online (Sandbox Code Playgroud)

scala apache-flink

4
推荐指数
1
解决办法
1499
查看次数

运行使用maven构建的jar时,FlinkMLTools NoClassDef

我正在使用Apache Flink开发推荐系统.当我在IntelliJ中测试它时,实现正在运行,但我现在想要进入集群.我还构建了一个jar文件并在本地测试它,看看是否一切正常但我遇到了问题.

java.lang.NoClassDefFoundError:org/apache/flink/ml/common/FlinkMLTools $

我们可以看到,FlinkMLTools在运行jar时没有找到我的代码中使用的类.我使用Maven 3.3.3构建了这个jar,mvn clean install我正在使用Flink的0.9.0版本.

第一道

事实是我的全球项目包含其他项目(这个推荐人是子项目之一).这样,我必须mvn clean install在正确的项目的文件夹中启动,否则Maven总是构建一个其他项目的jar(我不明白为什么).所以我想知道是否有办法明确地说maven来构建一个全球项目的特定项目.实际上,也许路径FlinkMLTools包含在pom.xml全球项目文件中的链接中.

还有其他想法吗?

jar noclassdeffounderror maven apache-flink flinkml

4
推荐指数
1
解决办法
254
查看次数

如何增加Flink taskmanager.numberOfTaskSlots以在没有Flink服务器的情况下运行它(在IDE或胖子罐中)

我有一个关于在IDE中或作为胖子运行Flink流作业而不将其部署到Flink服务器的问题。

问题是,当我的工作中有多个任务槽时,无法在IDE中运行它。

public class StreamingJob {

public static void main(String[] args) throws Exception {
    // set up the streaming execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties kafkaProperties = new Properties();
    kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
    kafkaProperties.setProperty("group.id", "test");
    env.setParallelism(1);

    DataStream<String> kafkaSource = env
        .addSource(new FlinkKafkaConsumer010<>("flink-source", new SimpleStringSchema(), kafkaProperties))
        .name("Kafka-Source")
        .slotSharingGroup("Kafka-Source");

    kafkaSource.print().slotSharingGroup("Print");

    env.execute("Flink Streaming Java API Skeleton");
}
Run Code Online (Sandbox Code Playgroud)

}

我知道该作业需要2个插槽,并且Flink集群中可以有两个任务管理器,但是如何在IDE中本地运行它。

当前,我必须为所有本地操作员指定相同的slotSharingGroup名称,以拥有一个插槽。但这并不灵活。

您如何处理?

java apache-flink flink-streaming

4
推荐指数
1
解决办法
462
查看次数

从FlinkML多元线性回归中提取权重

我正在为Flink运行示例多元线性回归(0.10-SNAPSHOT).我无法弄清楚如何提取权重(例如斜率和截距,beta0-beta1,你想要称之为什么).我不是斯卡拉的超级经验,这可能是我的问题的一半.

感谢任何人给予的任何帮助.

object Job {
 def main(args: Array[String]) {
    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    val survival = env.readCsvFile[(String, String, String, String)]("/home/danger/IdeaProjects/quickstart/docs/haberman.data")

    val survivalLV = survival
      .map{tuple =>
      val list = tuple.productIterator.toList
      val numList = list.map(_.asInstanceOf[String].toDouble)
      LabeledVector(numList(3), DenseVector(numList.take(3).toArray))
    }

    val mlr = MultipleLinearRegression()
      .setStepsize(1.0)
      .setIterations(100)
      .setConvergenceThreshold(0.001)

    mlr.fit(survivalLV) 
    println(mlr.toString())     // This doesn't do anything productive...
    println(mlr.weightsOption)  // Neither does this.

  }
}
Run Code Online (Sandbox Code Playgroud)

scala machine-learning linear-regression apache-flink flinkml

3
推荐指数
1
解决办法
244
查看次数

在flatMap函数中Apache Flink Streaming类型不匹配

尝试在scala 2.10.4中使用0.10.0 flink版本的流API.在尝试编译第一个版本时:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time._

object Main {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)

    val words : DataStream[String] = text.flatMap[String](
      new Function[String,TraversableOnce[String]] { 
        def apply(line:String):TraversableOnce[String] = line.split(" ")
      })

    env.execute("Window Stream wordcount")
  }
}
Run Code Online (Sandbox Code Playgroud)

我收到编译时错误:

[error]  found   : String => TraversableOnce[String]
[error]  required: org.apache.flink.api.common.functions.FlatMapFunction[String,String]
[error]       new Function[String,TraversableOnce[String]] { def apply(line:String):TraversableOnce[String] = line.split(" ")})
[error]       ^
Run Code Online (Sandbox Code Playgroud)

在我已经包含在项目中的反编译版本的DataStream.class中,有接受这种类型的函数(最后一个):

public <R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> evidence$12, ClassTag<R> evidence$13) {
        if (flatMapper …
Run Code Online (Sandbox Code Playgroud)

scala apache-flink flink-streaming

3
推荐指数
1
解决办法
2011
查看次数

kafka - > flink - 性能问题

我正在看一些生成~30K消息/秒的kafka主题.我有一个flink拓扑设置来读取其中一个,聚合一点(5秒窗口)然后(最终)写入DB.

当我运行拓扑并删除除了读取 - >聚合步骤之外的所有内容时,我每分钟只能获得~30K消息.背压不会发生在任何地方.

我究竟做错了什么?


编辑:

  1. 我无法改变主题空间的任何内容.每个主题都有一个分区,其中有数百个分区.
  2. 每条消息都是一个压缩的thrift对象,平均为2-3Kb

看起来我只能得到~1.5 MB/s.不是接近提到的100MB/s.

当前的代码路径:

DataStream<byte[]> dataStream4 = env.addSource(new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties())).setParallelism(1);  
DataStream<Tuple4<Long, Long, Integer, String>> ds4 = dataStream4.rebalance().flatMap(new mapper2("data_4")).setParallelism(4);
Run Code Online (Sandbox Code Playgroud)
public class mapper2 implements FlatMapFunction<byte[], Tuple4<Long, Long, Integer, String>> {
    private String mapId;
    public mapper2(String mapId) {
        this.mapId = mapId;
    }

    @Override
    public void flatMap(byte[] bytes, Collector<Tuple4<Long, Long, Integer, String>> collector) throws Exception {
        TimeData timeData = (TimeData)ts_thriftDecoder.fromBytes(bytes);
        Tuple4 tuple4 = new Tuple4<Long, Long, Integer, String>();
        tuple4.f0 = timeData.getId();
        tuple4.f1 = …
Run Code Online (Sandbox Code Playgroud)

java apache-kafka apache-flink flink-streaming

2
推荐指数
1
解决办法
1639
查看次数

Apache Flink:文件 STDOUT 在 TaskExecutor 上不可用

我使用官方 flink 存储库中的以下 docker-compose.yml 启动了 flink。我只添加了到外部hadoop网络的连接。

version: "2.1"

networks:
  hadoop:
    external:
      name: flink_hadoop

services:
  jobmanager:
    image: flink:1.7.1-hadoop27-scala_2.11
    container_name: flink-jobmanager
    domainname: hadoop
    networks:
      - hadoop
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: flink:1.7.1-hadoop27-scala_2.11
    container_name: flink-taskmanager
    domainname: hadoop
    networks:
      - hadoop
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
Run Code Online (Sandbox Code Playgroud)

此后一切运行,我可以访问 WebUI。

然后我打包了以下工作。

import org.apache.flink.api.scala._
import org.slf4j.LoggerFactory
import stoff.schnaps.pojo.ActorMovie

object HdfsJob {
  private lazy val logger = LoggerFactory.getLogger(getClass)

  def …
Run Code Online (Sandbox Code Playgroud)

logging scala docker apache-flink

2
推荐指数
1
解决办法
1万
查看次数

Flink自定义触发器提供了意外的输出

我想创建一个Trigger第一次在20秒内触发,此后每5秒触发一次的触发。我已经习惯GlobalWindowsTrigger

val windowedStream = valueStream
                          .keyBy(0)
                          .window(GlobalWindows.create())
                          .trigger(TradeTrigger.of())
Run Code Online (Sandbox Code Playgroud)

这是中的代码TradeTrigger

@PublicEvolving
public class TradeTrigger<W extends Window> extends Trigger<Object, W> {

    private static final long serialVersionUID = 1L;

    static boolean flag=false;
    static long ctime = System.currentTimeMillis();

    private TradeTrigger() {
    }

    @Override
    public TriggerResult onElement(
            Object arg0,
            long arg1,
            W arg2,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
            throws Exception {
        // TODO Auto-generated method stub

        if(flag == false){
            if((System.currentTimeMillis()-ctime) >= 20000){
               flag = true;
               ctime = System.currentTimeMillis();
               return TriggerResult.FIRE;
            } …
Run Code Online (Sandbox Code Playgroud)

apache-flink flink-streaming

0
推荐指数
1
解决办法
970
查看次数