Drools在Spark中用于流媒体文件

beg*_*ner 5 java hadoop drools apache-spark spark-streaming

我们能够成功地将drools与spark集成在一起当我们尝试应用来自Drools的规则时,我们能够为批处理文件做到这一点,它存在于HDFS中,但我们尝试使用drools进行流式文件,以便我们可以立即做出决定,但我们无法弄清楚如何去做.Below是我们正在努力实现的代码片段.
案例1 : .

    SparkConf conf = new SparkConf().setAppName("sample");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> javaRDD = sc.textFile("/user/root/spark/sample.dat");
    List<String> store = new ArrayList<String>();
    store = javaRDD.collect();
Run Code Online (Sandbox Code Playgroud)

案例2: 当我们使用流式上下文时

SparkConf sparkconf = new SparkConf().setAppName("sparkstreaming");
    JavaStreamingContext ssc = 
              new JavaStreamingContext(sparkconf, new Duration(1));

    JavaDStream<String> lines = ssc.socketTextStream("xx.xx.xx.xx", xxxx);
Run Code Online (Sandbox Code Playgroud)

在第一种情况下,我们可以在变量存储上应用我们的规则,但在第二种情况下,我们无法在线上应用规则dstream.

如果有人有一些想法,怎么做,将是一个很大的帮助.

小智 1

这是完成它的一种方法。

  1. 首先根据业务规则创建您的知识会话。

    //Create knowledge and session here
    KnowledgeBase kbase = KnowledgeBaseFactory.newKnowledgeBase();
    KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
    kbuilder.add( ResourceFactory.newFileResource( "rulefile.drl"),
            ResourceType.DRL );
    Collection<KnowledgePackage> pkgs = kbuilder.getKnowledgePackages();
    kbase.addKnowledgePackages( pkgs );
    final StatelessKnowledgeSession ksession = kbase.newStatelessKnowledgeSession();
    
    Run Code Online (Sandbox Code Playgroud)
  2. 使用 StreamingContext 创建 JavaDStream。

    SparkConf sparkconf = new SparkConf().setAppName("sparkstreaming");
    JavaStreamingContext ssc = 
              new JavaStreamingContext(sparkconf, new Duration(1));
    JavaDStream<String> lines = ssc.socketTextStream("xx.xx.xx.xx", xxxx);  
    
    Run Code Online (Sandbox Code Playgroud)
  3. 调用 DStream 的 foreachRDD 来创建事实并触发您的规则。

    lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
      @Override
      public Void call(JavaRDD<String> rdd) throws Exception {
         List<String> facts = rdd.collect();
         //Apply rules on facts here
         ksession.execute(facts);
         return null;
      }
    });
    
    Run Code Online (Sandbox Code Playgroud)