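Evaluating pairs of items is a common task, for example deduplication, collaborative filtering, or finding similar items. It is basically a self-join or cross product over the same data source.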
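An unordered pair only needs to be evaluated once. A full cross product of n items produces n² pairs, including the n self-pairs and every pair in both orders, which is more than twice the work actually needed. Below is a minimal plain-Java sketch of keeping only the n(n-1)/2 distinct pairs. It is not Flink's API, and the class name ItemPairs is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: enumerates each unordered pair of distinct positions exactly once,
// i.e. the upper triangle of the self cross product.
final class ItemPairs {
    private ItemPairs() {}

    static <T> List<List<T>> uniquePairs(List<T> items) {
        List<List<T>> pairs = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            // Starting j at i + 1 skips self-pairs (i, i) and mirrored pairs (j, i).
            for (int j = i + 1; j < items.size(); j++) {
                pairs.add(Arrays.asList(items.get(i), items.get(j)));
            }
        }
        return pairs;
    }
}
```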
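I am building a movie recommender system using the MovieLens dataset available here: http://grouplens.org/datasets/movielens/
To compute the recommendations, I use Flink's ML library in Scala, specifically the ALS algorithm (org.apache.flink.ml.recommendation.ALS).
I first map the movie ratings to a DataSet[(Int, Int, Double)] and then create a trainingSet and a testSet (see the code below).
My problem is that when I call ALS.fit on the whole dataset (all ratings) there is no error, but if I remove just one rating, fit no longer works, and I don't understand why.
Do you have any ideas? :)
Code used: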
Rating.scala
case class Rating(userId: Int, movieId: Int, rating: Double)
PreProcessing.scala
import org.apache.flink.api.scala._

object PreProcessing {
  // Reads userId, movieId and rating (columns 0-2) and skips the CSV header line
  def getRatings(env: ExecutionEnvironment, ratingsPath: String): DataSet[Rating] = {
    env.readCsvFile[(Int, Int, Double)](
      ratingsPath,
      ignoreFirstLine = true,
      includedFields = Array(0, 1, 2)
    ).map { r => Rating(r._1, r._2, r._3) }
  }
}
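The Scala reader above skips the header line and keeps only columns 0 to 2. The same parsing logic can be sketched in plain Java for readers less familiar with the Scala API. This sketch assumes unquoted, comma-separated lines laid out as userId,movieId,rating,...; RatingRow and RatingParser are hypothetical names, and the sample rows in the test are made up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java counterpart of the Rating case class.
final class RatingRow {
    final int userId;
    final int movieId;
    final double rating;

    RatingRow(int userId, int movieId, double rating) {
        this.userId = userId;
        this.movieId = movieId;
        this.rating = rating;
    }
}

final class RatingParser {
    private RatingParser() {}

    // Assumes unquoted, comma-separated lines: userId,movieId,rating[,more columns...]
    static List<RatingRow> parse(List<String> lines) {
        List<RatingRow> ratings = new ArrayList<>();
        // Start at 1 to skip the header, like ignoreFirstLine = true.
        for (int i = 1; i < lines.size(); i++) {
            String line = lines.get(i).trim();
            if (line.isEmpty()) {
                continue;
            }
            String[] fields = line.split(",");
            // Keep only columns 0, 1 and 2, like includedFields = Array(0, 1, 2).
            ratings.add(new RatingRow(
                    Integer.parseInt(fields[0].trim()),
                    Integer.parseInt(fields[1].trim()),
                    Double.parseDouble(fields[2].trim())));
        }
        return ratings;
    }
}
```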
Processing.scala
object Processing {
  private val ratingsPath: String = "Path_to_ratings.csv"
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val ratings: …
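I am trying to run the basic PageRank example in Flink with a small modification (only in how the input file is read; everything else is the same), and I get a "Task not serializable" error. Below is part of the error output: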
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:179)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:171)
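These frames come from Flink's Scala ClosureCleaner. Its ensureSerializable step runs the user function through Java serialization and reports "Task not serializable" if that fails. A frequent cause is a function that captures an object which is not Serializable. The plain-Java sketch below reproduces that kind of check with ObjectOutputStream. All classes in it are hypothetical stand-ins, not code from the question.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical helper: returns whether Java serialization accepts the object graph.
final class SerializabilityCheck {
    private SerializabilityCheck() {}

    static boolean isJavaSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (IOException e) { // NotSerializableException is a subclass of IOException
            return false;
        }
    }
}

// Hypothetical helper class that does NOT implement Serializable.
final class NonSerializableHelper {
    final String outPath;
    NonSerializableHelper(String outPath) { this.outPath = outPath; }
}

// Hypothetical function object that captures the helper, as a closure can capture an outer object.
final class CapturingFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    final NonSerializableHelper helper;
    CapturingFunction(NonSerializableHelper helper) { this.helper = helper; }
}

// Hypothetical function object that only holds serializable values.
final class SelfContainedFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    final double dampening;
    SelfContainedFunction(double dampening) { this.dampening = dampening; }
}
```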
Here is my code:
object hpdb {
  def main(args: Array[String]) {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val maxIterations = 10000
    val DAMPENING_FACTOR: Double = 0.85
    val EPSILON: Double = 0.0001
    val outpath = "/home/vinoth/bigdata/assign10/pagerank.csv"
    val links = env.readCsvFile[Tuple2[Long, Long]]("/home/vinoth/bigdata/assign10/ppi.csv",
      fieldDelimiter = "\t", includedFields = Array(1, 4)).as('sourceId, 'targetId).toDataSet[Link] // source and target
    val pages = env.readCsvFile[Tuple1[Long]]("/home/vinoth/bigdata/assign10/ppi.csv",
      fieldDelimiter = "\t", includedFields = Array(1)).as('pageId).toDataSet[Id] // page id
    val noOfPages = pages.count()
    val pagesWithRanks = pages.map(p => Page(p.pageId, 1.0 / noOfPages))
    val adjacencyLists = links
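      // initialize …
I am developing a recommender system with Apache Flink. The implementation works when I test it in IntelliJ, but now I want to move it to a cluster. I also built a jar file and tested it locally to check that everything works, but I ran into a problem: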
java.lang.NoClassDefFoundError: org/apache/flink/ml/common/FlinkMLTools$
As we can see, the class FlinkMLTools, which my code uses, is not found when running the jar. I built the jar with Maven 3.3.3 using mvn clean install, and I am using Flink version 0.9.0.
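A NoClassDefFoundError means that a class present at compile time cannot be found at run time. A quick check is whether the class loads from the classpath the packaged job actually runs with. The plain-Java sketch below does that check. The ClasspathProbe name is hypothetical. Calling isLoadable("org.apache.flink.ml.common.FlinkMLTools$") from code packaged in the job jar returns false when the Flink ML classes are neither inside the jar nor on the runtime classpath.

```java
// Hypothetical helper: checks whether a class can be loaded from the current classpath.
final class ClasspathProbe {
    private ClasspathProbe() {}

    static boolean isLoadable(String className) {
        try {
            // initialize = false: locate and load the class without running static initializers.
            Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }
}
```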
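First lead
The thing is that my global project contains other projects (this recommender is one of the sub-projects). Because of that, I have to run mvn clean install in the right project's folder; otherwise Maven always builds a jar of one of the other projects (and I don't understand why). So I wonder whether there is a way to tell Maven explicitly to build one specific project of the global project. Maybe the path to FlinkMLTools is actually included in a link in the global project's pom.xml file.
Any other ideas?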
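I have a question about running a Flink streaming job in the IDE, or as a fat jar, without deploying it to a Flink server.
The problem is that I cannot run it in the IDE when my job needs more than one task slot.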
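import java.util.Properties;
// SimpleStringSchema lives here since Flink 1.4; older releases use org.apache.flink.streaming.util.serialization
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

public class StreamingJob {
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties kafkaProperties = new Properties();
        kafkaProperties.setProperty("bootstrap.servers", "localhost:9092");
        kafkaProperties.setProperty("group.id", "test");
        env.setParallelism(1);
        DataStream<String> kafkaSource = env
            .addSource(new FlinkKafkaConsumer010<>("flink-source", new SimpleStringSchema(), kafkaProperties))
            .name("Kafka-Source")
            .slotSharingGroup("Kafka-Source");
        // a different slot sharing group: this sink cannot share a slot with the source
        kafkaSource.print().slotSharingGroup("Print");
        env.execute("Flink Streaming Java API Skeleton");
    }
}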
I know the job needs 2 slots and that a Flink cluster could have two task managers, but how can I run it locally in the IDE?
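The job needs 2 slots because subtasks in the same slot sharing group may share a slot, while operators in different groups may not. The requirement is therefore the sum, over all groups, of the highest parallelism in each group. With parallelism 1, the "Kafka-Source" and "Print" groups need 1 + 1 = 2 slots. The plain-Java sketch below shows only that arithmetic. It is not a Flink API, and the SlotEstimate and Op names are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: estimates required task slots from slot sharing groups.
final class SlotEstimate {
    private SlotEstimate() {}

    // Hypothetical description of one operator: its slot sharing group and parallelism.
    static final class Op {
        final String slotSharingGroup;
        final int parallelism;

        Op(String slotSharingGroup, int parallelism) {
            this.slotSharingGroup = slotSharingGroup;
            this.parallelism = parallelism;
        }
    }

    static int requiredSlots(List<Op> ops) {
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (Op op : ops) {
            // Within one group subtasks can share slots, so only the highest parallelism counts.
            maxPerGroup.merge(op.slotSharingGroup, op.parallelism, Math::max);
        }
        // Different groups cannot share slots, so their requirements add up.
        int total = 0;
        for (int n : maxPerGroup.values()) {
            total += n;
        }
        return total;
    }
}
```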
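Currently I have to give all operators the same slotSharingGroup name when running locally, so that everything fits into one slot. But this is not flexible.
How do you handle this?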
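I am running the sample multiple linear regression for Flink (0.10-SNAPSHOT). I can't figure out how to extract the weights (e.g. slope and intercept, beta0-beta1, whatever you want to call them). I'm not very experienced with Scala, which may be half of my problem.
Thanks for any help anyone can give.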
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.regression.MultipleLinearRegression

object Job {
  def main(args: Array[String]) {
    // set up the execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    val survival = env.readCsvFile[(String, String, String, String)]("/home/danger/IdeaProjects/quickstart/docs/haberman.data")
    val survivalLV = survival
      .map { tuple =>
        // the first three columns are the features, the fourth is the label
        val list = tuple.productIterator.toList
        val numList = list.map(_.asInstanceOf[String].toDouble)
        LabeledVector(numList(3), DenseVector(numList.take(3).toArray))
      }
    val mlr = MultipleLinearRegression()
      .setStepsize(1.0)
      .setIterations(100)
      .setConvergenceThreshold(0.001)
    mlr.fit(survivalLV)
    println(mlr.toString()) // This doesn't do anything productive...
    println(mlr.weightsOption) // Neither does this.
  }
}
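A fitted multiple linear regression model consists of a weight vector, with one coefficient per feature, plus an intercept. These are the beta1..betan and beta0 of the question, and a prediction is intercept + weights · x. The plain-Java batch gradient descent below only illustrates those two quantities. It is a sketch, not Flink ML's implementation, and the class name is hypothetical. Its step size is chosen for the toy data rather than taken from the question.

```java
// Hypothetical sketch: batch gradient descent for y ≈ weights · x + intercept.
final class LinearRegressionGD {
    final double[] weights;
    double intercept;

    LinearRegressionGD(int numFeatures) {
        weights = new double[numFeatures];
    }

    double predict(double[] x) {
        double y = intercept;
        for (int j = 0; j < weights.length; j++) {
            y += weights[j] * x[j];
        }
        return y;
    }

    // Minimises (1 / 2n) * sum of squared errors with a fixed number of full-batch steps.
    void fit(double[][] xs, double[] ys, double stepSize, int iterations) {
        int n = xs.length;
        for (int it = 0; it < iterations; it++) {
            double[] gradW = new double[weights.length];
            double gradB = 0.0;
            // Accumulate the gradient with the current parameters before updating any of them.
            for (int i = 0; i < n; i++) {
                double err = predict(xs[i]) - ys[i];
                for (int j = 0; j < weights.length; j++) {
                    gradW[j] += err * xs[i][j];
                }
                gradB += err;
            }
            for (int j = 0; j < weights.length; j++) {
                weights[j] -= stepSize * gradW[j] / n;
            }
            intercept -= stepSize * gradB / n;
        }
    }
}
```

On noiseless toy data such as y = 1 + 2·x1 − 3·x2 on a 3×3 grid, the weights approach 2 and −3 and the intercept approaches 1. In Flink ML itself, the fitted model is stored in weightsOption, which in the API of that era is an Option wrapping a DataSet. Printing it therefore shows the Option and DataSet objects rather than the numbers, and the values only materialise when that DataSet is executed, for example with collect().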
I am trying to use the streaming API of Flink 0.10.0 with Scala 2.10.4. When trying to compile a first version:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time._
object Main {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val text = env.socketTextStream("localhost", 9999)
    val words: DataStream[String] = text.flatMap[String](
      new Function[String, TraversableOnce[String]] {
        def apply(line: String): TraversableOnce[String] = line.split(" ")
      })
    env.execute("Window Stream wordcount")
  }
}
I get a compile-time error:
[error] found : String => TraversableOnce[String]
[error] required: org.apache.flink.api.common.functions.FlatMapFunction[String,String]
[error] new Function[String,TraversableOnce[String]] { def apply(line:String):TraversableOnce[String] = line.split(" ")})
[error] ^
In the decompiled version of DataStream.class, which I have included in the project, there are functions that accept this type (the last one):
public <R> DataStream<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> evidence$12, ClassTag<R> evidence$13) {
    if (flatMapper …
I am looking at some Kafka topics that generate ~30K messages/second. I have a Flink topology set up to read one of them, aggregate a bit (5-second window) and then (eventually) write to a DB.
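When I run the topology and strip out everything except the read -> aggregate steps, I only get ~30K messages per minute. There is no backpressure anywhere.
What am I doing wrong?
It looks like I can only get ~1.5 MB/s, nowhere near the 100 MB/s that was mentioned.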
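Putting the numbers from the question into the same unit shows the size of the gap. About 30K messages per minute is 500 messages per second, which is 60 times below the ~30K messages per second the topic produces. Suppose the ~1.5 MB/s figure comes from the same run and 1 MB is taken as 10^6 bytes. Under those assumptions, it implies an average of roughly 3 KB per message. The small plain-Java sketch below does the conversion (the class name is hypothetical).

```java
// Hypothetical helper for unit conversions on the rates quoted in the question.
final class ThroughputMath {
    private ThroughputMath() {}

    // Converts a per-minute rate into a per-second rate.
    static double perMinuteToPerSecond(double perMinute) {
        return perMinute / 60.0;
    }

    // Average message size implied by a byte rate and a message rate, both per second.
    static double bytesPerMessage(double bytesPerSecond, double messagesPerSecond) {
        return bytesPerSecond / messagesPerSecond;
    }
}
```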
Current code path:
DataStream<byte[]> dataStream4 = env.addSource(new FlinkKafkaConsumer081<>("data_4", new RawSchema(), parameterTool.getProperties())).setParallelism(1);
DataStream<Tuple4<Long, Long, Integer, String>> ds4 = dataStream4.rebalance().flatMap(new mapper2("data_4")).setParallelism(4);
public class mapper2 implements FlatMapFunction<byte[], Tuple4<Long, Long, Integer, String>> {
    private String mapId;
    public mapper2(String mapId) {
        this.mapId = mapId;
    }
    @Override
    public void flatMap(byte[] bytes, Collector<Tuple4<Long, Long, Integer, String>> collector) throws Exception {
        TimeData timeData = (TimeData) ts_thriftDecoder.fromBytes(bytes);
        Tuple4 tuple4 = new Tuple4<Long, Long, Integer, String>();
        tuple4.f0 = timeData.getId();
        tuple4.f1 = …
I started Flink with the following docker-compose.yml from the official Flink repository. The only change I made was to connect it to an external hadoop network.
version: "2.1"
networks:
  hadoop:
    external:
      name: flink_hadoop
services:
  jobmanager:
    image: flink:1.7.1-hadoop27-scala_2.11
    container_name: flink-jobmanager
    domainname: hadoop
    networks:
      - hadoop
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:1.7.1-hadoop27-scala_2.11
    container_name: flink-taskmanager
    domainname: hadoop
    networks:
      - hadoop
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
After that, everything runs and I can access the WebUI.
Then I packaged the following job.
import org.apache.flink.api.scala._
import org.slf4j.LoggerFactory
import stoff.schnaps.pojo.ActorMovie
object HdfsJob {
  private lazy val logger = LoggerFactory.getLogger(getClass)
  def …
I want to create a Trigger that fires for the first time after 20 seconds and every 5 seconds after that. I used GlobalWindows with a custom Trigger:
val windowedStream = valueStream
  .keyBy(0)
  .window(GlobalWindows.create())
  .trigger(TradeTrigger.of())
Here is the code of TradeTrigger:
@PublicEvolving
public class TradeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;
    static boolean flag = false;
    static long ctime = System.currentTimeMillis();
    private TradeTrigger() {
    }
    @Override
    public TriggerResult onElement(
            Object arg0,
            long arg1,
            W arg2,
            org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext arg3)
            throws Exception {
        // TODO Auto-generated method stub
        if (flag == false) {
            if ((System.currentTimeMillis() - ctime) >= 20000) {
                flag = true;
                ctime = System.currentTimeMillis();
                return TriggerResult.FIRE;
            } …
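Two properties of the trigger above are worth keeping in mind. First, flag and ctime are static, so every key, window and trigger instance in the same JVM shares them instead of tracking time per window. Second, time is only checked in onElement, so the trigger can only fire when an element arrives. Flink triggers usually schedule such callbacks by registering processing-time timers via TriggerContext.registerProcessingTimeTimer and returning FIRE from onProcessingTime. Apart from that wiring, the intended schedule (first fire 20 s after a start time, then every 5 s) reduces to the small plain-Java computation below. What counts as the start time, for example the first element of a key, is an assumption, and FireSchedule is a hypothetical name.

```java
// Hypothetical helper computing the intended firing schedule:
// first fire FIRST_DELAY_MS after the start, then every INTERVAL_MS.
final class FireSchedule {
    static final long FIRST_DELAY_MS = 20_000L;
    static final long INTERVAL_MS = 5_000L;

    private FireSchedule() {}

    // lastFireMs < 0 means "has not fired yet".
    static long nextFireTime(long startMs, long lastFireMs) {
        if (lastFireMs < 0) {
            return startMs + FIRST_DELAY_MS;
        }
        return lastFireMs + INTERVAL_MS;
    }
}
```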