标签: apache-camel

Camel处理器中的业务逻辑与服务端点

在Camel路由中,我是否应该考虑将我的业务逻辑放在离散托管的bean端点中,比如消息驱动的bean或Web服务,而不仅仅是在Camel处理器中实现它?

将Camel用作中介和编排,使用Processor作为过滤器,而不是作为业务逻辑的容器,似乎更清晰地分离了关注点.但是我现在并不认为需要一个EJB容器,而且我似乎需要一个来容纳MDB.

如此清洁的架构与更小的占地面积,更少的技术 - 有没有人有这方面的想法,观点或强烈的感受?

architecture soa apache-camel

7
推荐指数
1
解决办法
2826
查看次数

如何OSGIfy图书馆

我正在开发一个项目,它是集成项目,我们正在使用Apache Camel和Apache Karaf.在项目中,我需要使用Jira REST Java客户端库.

所以我已经阅读了很多关于如何将非OSGI库包装到OSGI包中的各种文章和线程,但我真的不确定我是否做对了.

所以,我创建了一个POM文件,它依赖于所需的库.制作了一个包并试图将它部署到卡拉夫,当然,卡拉夫抱怨丢失包裹.

所以,我发现了相应的maven依赖,添加它,包进入<Import-Package>和依赖<Embed-Dependency>.

另一轮,部署,找到依赖,添加,再次,再次,直到Karaf与捆绑很好.

这是真的正确吗?在我看来,我觉得很疯狂,所以我想我不会因为usualy :)

最后,包裹变得稳定在我的工作计算机上,我快速检查并回家了,我继续但是,奇怪的是,在我的个人计算机上编译的相同的POM /包不起作用,再次抱怨丢失包裹,但是这次,这个软件包肯定是在POM文件中,并且肯定它是嵌入在包中的,我可以在那里看到它.

这次丢失的包是org.apache.commons.codec.

org.osgi.framework.BundleException: Unresolved constraint in bundle jiraclient.bundle [134]: Unable to     resolve 134.0: missing requirement [134.0] osgi.wiring.package; (osgi.wiring.package=org.apache.commons.codec)
    at org.apache.felix.framework.Felix.resolveBundleRevision(Felix.java:3826)[org.apache.felix.framework-4.0.3.jar:]
    at org.apache.felix.framework.Felix.startBundle(Felix.java:1868)[org.apache.felix.framework-4.0.3.jar:]
    at org.apache.felix.framework.BundleImpl.start(BundleImpl.java:944)[org.apache.felix.framework-4.0.3.jar:]
    at org.apache.felix.fileinstall.internal.DirectoryWatcher.startBundle(DirectoryWatcher.java:1247)[6:org.apache.felix.fileinstall:3.2.6]
    at org.apache.felix.fileinstall.internal.DirectoryWatcher.startBundles(DirectoryWatcher.java:1219)[6:org.apache.felix.fileinstall:3.2.6]
    at org.apache.felix.fileinstall.internal.DirectoryWatcher.startAllBundles(DirectoryWatcher.java:1208)[6:org.apache.felix.fileinstall:3.2.6]
    at org.apache.felix.fileinstall.internal.DirectoryWatcher.process(DirectoryWatcher.java:503)[6:org.apache.felix.fileinstall:3.2.6]
    at org.apache.felix.fileinstall.internal.DirectoryWatcher.run(DirectoryWatcher.java:291)[6:org.apache.felix.fileinstall:3.2.6]
Run Code Online (Sandbox Code Playgroud)

所以,现在我完全糊涂了,出了什么问题:(

很高兴,伙计们,帮助我.谢谢!

POM文件很长,所以我想链接更好:http://pastebin.com/j5cmWveG

osgi apache-camel maven bnd

7
推荐指数
1
解决办法
2945
查看次数

骆驼ftp似乎永远挂起

我有一条骆驼路线,就像永远被绞死一样,不确定它是Camel还是Ftp客户端问题.ftp组件的soTimeout设置为60000.感谢任何帮助.

组件版本:
camel-ftp:2.9.0
Commons Net:(2.2)

线程转储:

"Camel (some-ftp-route) thread #57 - ftp://user@ftphost/folder" Id=338 in RUNNABLE (running in native)
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:129)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:264)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:306)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:158)
  - locked java.io.InputStreamReader@6a3f933a
at java.io.InputStreamReader.read(InputStreamReader.java:167)
at java.io.BufferedReader.fill(BufferedReader.java:136)
at java.io.BufferedReader.readLine(BufferedReader.java:299)
  - locked java.io.InputStreamReader@6a3f933a
at java.io.BufferedReader.readLine(BufferedReader.java:362)
at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:295)
at org.apache.commons.net.ftp.FTP._connectAction_(FTP.java:365)
at org.apache.commons.net.ftp.FTPClient._connectAction_(FTPClient.java:630)
at org.apache.commons.net.SocketClient.connect(SocketClient.java:164)
at org.apache.commons.net.SocketClient.connect(SocketClient.java:184)
at org.apache.camel.component.file.remote.FtpOperations.connect(FtpOperations.java:91)
at org.apache.camel.component.file.remote.RemoteFileConsumer.connectIfNecessary(RemoteFileConsumer.java:144)
at org.apache.camel.component.file.remote.RemoteFileConsumer.recoverableConnectIfNecessary(RemoteFileConsumer.java:123)
at org.apache.camel.component.file.remote.RemoteFileConsumer.prePollCheck(RemoteFileConsumer.java:56)
at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:100)
at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:139)
at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:91)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
at …
Run Code Online (Sandbox Code Playgroud)

java ftp apache-camel apache-commons-net

7
推荐指数
1
解决办法
787
查看次数

apache camel multicast和recipent-list模式有什么区别?

因此,在阅读了一些文档并从这里获得了很多帮助后,我终于实现了一个动态选择端点的收件人列表(动态收件人列表):

在我的代码中,MainApp_A每10秒生成一次报告,我希望它能够同时将报告发送到所有服务器,而不是逐个发送.因此,我开发了以下路线.

MainApp_A

main.addRouteBuilder(new RouteBuilder(){
    @Override
        public void configure() throws Exception {
            from("direct:start").multicast().parallelProcessing()
                .beanRef("recipientListBean", "route").end()
            .log("${body}");
        }
});
Run Code Online (Sandbox Code Playgroud)

RecipientListBean

@RecipientList
public Set<String> route(String body) {
        return servers; //returns a collection of several severs
}
Run Code Online (Sandbox Code Playgroud)

我还检查了多播模式和动态路由的文档:

现在我有几个问题,我很困惑.所以我有几个问题:

  1. 收件人动态列表是多播模式与动态路由模式的交汇点吗?
  2. 单独的multicast()调用是纯粹顺序的吗?使用multicast()和recipientList()有什么区别?
  3. 要使multicast()和recipientList()成为concorrent,我必须使用parallelProcessing().在我的情况下,哪个更有效?

java apache multicast apache-camel

7
推荐指数
1
解决办法
7157
查看次数

使用Jackson JSON库与骆驼的例外

我在使用带有骆驼的Jackson JSON库时遇到了麻烦.例外是:

FailedToCreateRouteException: Failed to create route... because of Data format 'json-jackson' could not be created. Ensure the data format is valid and the associated Camel component is present on the classpath
Run Code Online (Sandbox Code Playgroud)

这是类路径通知的开始,杰克逊的libs在那里:

 <<< camel-maven-plugin:2.9.0:run (default-cli) @ portlistener <<<

--- camel-maven-plugin:2.9.0:run (default-cli) @ portlistener ---
Classpath = [file:/C:/Users/andrew.b-ext/Workspace/portlistener/target/classes/, file:/C:/Users/andrew.b-ext/.m2/repository/org/codehaus/jackson/jackson-asl/1.9.13/jackson-core-asl-1.9.13.jar, file:/C:/Users/andrew.b-ext/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar, file:/C:/Users/andrew.b-ext/.m2/repository/org/codehaus/jackson/jackson-jaxrs/1.9.13/jackson-jaxrs-1.9.13.jar, ...snip...
Run Code Online (Sandbox Code Playgroud)

我的pom有杰克逊:

 <properties>
    <camel.version>2.12.2</camel.version>
    <cxf.version>2.7.8</cxf.version>
    <activemq.version>5.6.0</activemq.version>
</properties>

<dependencies>
  <dependency>
     <groupId>org.codehaus.jackson</groupId>
     <artifactId>jackson-core-asl</artifactId>
     <version>1.9.13</version>
  </dependency>
  <dependency>
     <groupId>org.codehaus.jackson</groupId>
     <artifactId>jackson-mapper-asl</artifactId>
     <version>1.9.13</version>
  </dependency>
  <dependency>
     <groupId>org.codehaus.jackson</groupId>
     <artifactId>jackson-jaxrs</artifactId>
     <version>1.9.13</version>
  </dependency>
Run Code Online (Sandbox Code Playgroud)

这是我对Dataformat对象的定义:

<bean id="jsonDataformat" class="org.apache.camel.model.dataformat.JsonDataFormat">
    <property name="unmarshalType" …
Run Code Online (Sandbox Code Playgroud)

json apache-camel jackson

7
推荐指数
1
解决办法
1万
查看次数

使用带有OnException定义的adviceWith进行Camel路由测试

我有一个非常简单的Camel路由定义,它只包含一些OnException谓词来处理各自的异常和一些日志语句.

from("hazelcast:seda:someQueue")
    .id("someQueueID")
    .onException(CustomException.class)
        .handled(true)
        .log(LoggingLevel.WARN, "custom exception noticed")
    .end()
    .onException(IOException.class, FileNotFoundException.class)
        .asyncDelayedRedelivery()
        .redeliveryDelay(3*1000*60) // 3 Minutes
        .maximumRedeliveries(3)
        .log(LoggingLevel.WARN, "io exception noticed")
    .end()
    .onException(Exception.class)
        .log(LoggingLevel.WARN, "general exception noticed")
    .end()

    .log("Starting route")
    .bean(TestBean.class)
    .log("Finished route");
Run Code Online (Sandbox Code Playgroud)

bean本身也很简单,它只是检查一个header参数并抛出一个适当的异常

public class TestBean
{
    @Handler
    public void checkData(@Headers final Map<String, Object> headers)
            throws CustomException, IOException, Exception
    {
        Integer testVal = (Integer)headers.get("TestValue");
        if (0 == testVal)
            throw new CustomException("CustomException");
        else if (1 == testVal)
            throw new IOException("IOException");
        else
            throw new Exception("Exception");
    }
}
Run Code Online (Sandbox Code Playgroud)

由于这个测试设置只是一个较大项目的一小部分,这样做可能听起来很愚蠢,但核心意图是在测试时修改redeliveryDelay,因为"强制"IOException不需要等待3分钟因此,为了加快单位测试,可以将重新传递延迟减少到10毫秒.

为了实现这一点,我的测试方法执行以下操作:

@ContextConfiguration(classes …
Run Code Online (Sandbox Code Playgroud)

java unit-testing apache-camel

7
推荐指数
1
解决办法
7637
查看次数

有没有办法从bean设置消息头?

我有一个简单的骆驼路线我需要修改.路线看起来像这样:

from(source.uri)
    .unmarshal()
    .bean(TransformMessageBean.class, "SomeMethod")
    .to(destination.uri)
Run Code Online (Sandbox Code Playgroud)

我想在解组之后添加另一个bean方法调用,它设置一个标题值而不会中断当前数据流.有谁知道这样做的方法?我在apache的文档中读到,在出站消息体中设置了bean的返回值.有没有办法将其更改为标题?

提前致谢!

apache-camel

7
推荐指数
2
解决办法
8420
查看次数

openshift pods的CrashLoopBackOff状态是什么?

有一个以上的例子,我从一个以openshift原点运行的pod中看到了这个状态.在这种情况下,这是cdi camel示例的快速入门.我能够在本地成功构建和运行它(非openshift)但是当我尝试在我的本地openshift(使用mvn -Pf8-local-deploy)上部署时,我得到了这个特定示例的输出(为了相关性而剪断): -

[vagrant@vagrant camel]$ oc get pods NAME READY STATUS RESTARTS AGE cdi-camel-z4czs 0/1 CrashLoopBackOff 4 2m

日志尾巴如下: -

  Error occurred during initialization of VM
  Error opening zip file or JAR manifest missing : agents/jolokia.jar
  agent library failed to init: instrument
Run Code Online (Sandbox Code Playgroud)

有人可以帮我解决这个问题吗?

apache-camel cdi fabric8 openshift-origin

7
推荐指数
2
解决办法
9148
查看次数

通过Camel Blueprint中的属性配置SQL数据源(在Karaf中)

给出了一个非常简单的Camel捆绑包,用于生成camel-archetype-blueprint,我希望添加一个通过属性而不是在属性中配置的数据源blueprint.xml.

我尝试以各种方式配置PropertiesComponent并访问propertyMySQL数据源的值中的属性,但似乎没有一个工作.记录消息时,可以访问属性.

如何使用属性文件中的参数值配置数据源?

我特别需要这个,为多个bundle使用相同的数据源配置,并区分生产/测试环境.我考虑过在构建过程中使用Maven编写属性,具体取决于目标环境.有关如何解决此数据源问题的其他最佳实践吗?

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd
       http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint.xsd">

    <bean id="properties" class="org.apache.camel.component.properties.PropertiesComponent">
        <property name="location" value="classpath:config.properties" />
    </bean>

    <bean id="dataSourceMySQL" class="com.mysql.jdbc.jdbc2.optional.MysqlDataSource">
        <property name="url" value="jdbc:mysql://127.0.0.1/test_database" />
        <!-- This causes an error, as it tries to connect with
             `${mysqlUser}`@`localhost` without any evaluation -->
        <property name="user" value="${mysqlUser}" />
        <property name="password" value="${mysqlPassword}" />
    </bean>

    <service interface="javax.sql.DataSource" ref="dataSourceMySQL">
        <service-properties>
            <entry key="osgi.jndi.service.name" value="jdbc/mysqlDatasource" />
        </service-properties>
    </service>
    <bean id="sql" class="org.apache.camel.component.sql.SqlComponent">
        <property name="dataSource" ref="dataSourceMySQL" />
    </bean> …
Run Code Online (Sandbox Code Playgroud)

java datasource apache-camel blueprint apache-karaf

7
推荐指数
1
解决办法
4604
查看次数

Apache Camel Kafka - 聚合kafka消息并定期发布到不同的主题

我有一个用例:

我需要定期阅读和聚合来自kafka主题的消息,并发布到不同的主题.Localstorage不是一种选择.这就是我计划解决这个问题的方法,欢迎任何改进建议

要安排kafka消息的聚合和发布,请计划使用Aggregator EIP的completionInterval选项.这是代码.

  @Autowired ObjectMapper objectMapper;
  JacksonDataFormat jacksonDataFormat;

  @PostConstruct
  public void initialize(){
    //objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
    jacksonDataFormat = new JacksonDataFormat(objectMapper,EventMessage.class);
  }
Run Code Online (Sandbox Code Playgroud)

和路线:

public void configure() throws Exception {
    from("kafka:localhost:9092?topic=item-events" +
            "&groupId=aggregator-group-id&autoCommitIntervalMs=25000&autoOffsetReset=earliest&consumersCount=1")
            .routeId("kafkapoller")
            .unmarshal(jacksonDataFormat)
            .aggregate(body().method("getItemId"), new EventAggregationStrategy()).completionInterval(20000)
            .marshal().json(JsonLibrary.Jackson)
            .to("kafka:localhost:9092?topic=item-events-aggregated&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer");
  }
Run Code Online (Sandbox Code Playgroud)

java apache-camel apache-kafka

7
推荐指数
1
解决办法
721
查看次数