在Camel路由中,我是否应该考虑将我的业务逻辑放在离散托管的bean端点中,比如消息驱动的bean或Web服务,而不仅仅是在Camel处理器中实现它?
将Camel用作中介和编排,使用Processor作为过滤器,而不是作为业务逻辑的容器,似乎更清晰地分离了关注点.但是我现在并不认为需要一个EJB容器,而且我似乎需要一个来容纳MDB.
如此清洁的架构与更小的占地面积,更少的技术 - 有没有人有这方面的想法,观点或强烈的感受?
我正在开发一个项目,它是集成项目,我们正在使用Apache Camel和Apache Karaf.在项目中,我需要使用Jira REST Java客户端库.
所以我已经阅读了很多关于如何将非OSGI库包装到OSGI包中的各种文章和线程,但我真的不确定我是否做对了.
所以,我创建了一个POM文件,它依赖于所需的库.制作了一个包并试图将它部署到卡拉夫,当然,卡拉夫抱怨丢失包裹.
所以,我发现了相应的maven依赖,添加它,包进入<Import-Package>和依赖<Embed-Dependency>.
另一轮,部署,找到依赖,添加,再次,再次,直到Karaf与捆绑很好.
这是真的正确吗?在我看来,我觉得很疯狂,所以我想我不会因为usualy :)
最后,包裹变得稳定在我的工作计算机上,我快速检查并回家了,我继续但是,奇怪的是,在我的个人计算机上编译的相同的POM /包不起作用,再次抱怨丢失包裹,但是这次,这个软件包肯定是在POM文件中,并且肯定它是嵌入在包中的,我可以在那里看到它.
这次丢失的包是org.apache.commons.codec.
org.osgi.framework.BundleException: Unresolved constraint in bundle jiraclient.bundle [134]: Unable to resolve 134.0: missing requirement [134.0] osgi.wiring.package; (osgi.wiring.package=org.apache.commons.codec)
at org.apache.felix.framework.Felix.resolveBundleRevision(Felix.java:3826)[org.apache.felix.framework-4.0.3.jar:]
at org.apache.felix.framework.Felix.startBundle(Felix.java:1868)[org.apache.felix.framework-4.0.3.jar:]
at org.apache.felix.framework.BundleImpl.start(BundleImpl.java:944)[org.apache.felix.framework-4.0.3.jar:]
at org.apache.felix.fileinstall.internal.DirectoryWatcher.startBundle(DirectoryWatcher.java:1247)[6:org.apache.felix.fileinstall:3.2.6]
at org.apache.felix.fileinstall.internal.DirectoryWatcher.startBundles(DirectoryWatcher.java:1219)[6:org.apache.felix.fileinstall:3.2.6]
at org.apache.felix.fileinstall.internal.DirectoryWatcher.startAllBundles(DirectoryWatcher.java:1208)[6:org.apache.felix.fileinstall:3.2.6]
at org.apache.felix.fileinstall.internal.DirectoryWatcher.process(DirectoryWatcher.java:503)[6:org.apache.felix.fileinstall:3.2.6]
at org.apache.felix.fileinstall.internal.DirectoryWatcher.run(DirectoryWatcher.java:291)[6:org.apache.felix.fileinstall:3.2.6]
Run Code Online (Sandbox Code Playgroud)
所以,现在我完全糊涂了,出了什么问题:(
很高兴,伙计们,帮助我.谢谢!
POM文件很长,所以我想链接更好:http://pastebin.com/j5cmWveG
我有一条骆驼路线,就像永远被绞死一样,不确定它是Camel还是Ftp客户端问题.ftp组件的soTimeout设置为60000.感谢任何帮助.
组件版本:
camel-ftp:2.9.0
Commons Net:(2.2)
线程转储:
"Camel (some-ftp-route) thread #57 - ftp://user@ftphost/folder" Id=338 in RUNNABLE (running in native)
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:129)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:264)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:306)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:158)
- locked java.io.InputStreamReader@6a3f933a
at java.io.InputStreamReader.read(InputStreamReader.java:167)
at java.io.BufferedReader.fill(BufferedReader.java:136)
at java.io.BufferedReader.readLine(BufferedReader.java:299)
- locked java.io.InputStreamReader@6a3f933a
at java.io.BufferedReader.readLine(BufferedReader.java:362)
at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:295)
at org.apache.commons.net.ftp.FTP._connectAction_(FTP.java:365)
at org.apache.commons.net.ftp.FTPClient._connectAction_(FTPClient.java:630)
at org.apache.commons.net.SocketClient.connect(SocketClient.java:164)
at org.apache.commons.net.SocketClient.connect(SocketClient.java:184)
at org.apache.camel.component.file.remote.FtpOperations.connect(FtpOperations.java:91)
at org.apache.camel.component.file.remote.RemoteFileConsumer.connectIfNecessary(RemoteFileConsumer.java:144)
at org.apache.camel.component.file.remote.RemoteFileConsumer.recoverableConnectIfNecessary(RemoteFileConsumer.java:123)
at org.apache.camel.component.file.remote.RemoteFileConsumer.prePollCheck(RemoteFileConsumer.java:56)
at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:100)
at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:139)
at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:91)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
at …Run Code Online (Sandbox Code Playgroud) 因此,在阅读了一些文档并从这里获得了很多帮助后,我终于实现了一个动态选择端点的收件人列表(动态收件人列表):
在我的代码中,MainApp_A每10秒生成一次报告,我希望它能够同时将报告发送到所有服务器,而不是逐个发送.因此,我开发了以下路线.
MainApp_A
main.addRouteBuilder(new RouteBuilder(){
@Override
public void configure() throws Exception {
from("direct:start").multicast().parallelProcessing()
.beanRef("recipientListBean", "route").end()
.log("${body}");
}
});
Run Code Online (Sandbox Code Playgroud)
RecipientListBean
@RecipientList
public Set<String> route(String body) {
return servers; //returns a collection of several severs
}
Run Code Online (Sandbox Code Playgroud)
我还检查了多播模式和动态路由的文档:
现在我有几个问题,我很困惑.所以我有几个问题:
我在使用带有骆驼的Jackson JSON库时遇到了麻烦.例外是:
FailedToCreateRouteException: Failed to create route... because of Data format 'json-jackson' could not be created. Ensure the data format is valid and the associated Camel component is present on the classpath
Run Code Online (Sandbox Code Playgroud)
这是类路径通知的开始,杰克逊的libs在那里:
<<< camel-maven-plugin:2.9.0:run (default-cli) @ portlistener <<<
--- camel-maven-plugin:2.9.0:run (default-cli) @ portlistener ---
Classpath = [file:/C:/Users/andrew.b-ext/Workspace/portlistener/target/classes/, file:/C:/Users/andrew.b-ext/.m2/repository/org/codehaus/jackson/jackson-asl/1.9.13/jackson-core-asl-1.9.13.jar, file:/C:/Users/andrew.b-ext/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar, file:/C:/Users/andrew.b-ext/.m2/repository/org/codehaus/jackson/jackson-jaxrs/1.9.13/jackson-jaxrs-1.9.13.jar, ...snip...
Run Code Online (Sandbox Code Playgroud)
我的pom有杰克逊:
<properties>
<camel.version>2.12.2</camel.version>
<cxf.version>2.7.8</cxf.version>
<activemq.version>5.6.0</activemq.version>
</properties>
<dependencies>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
<version>1.9.13</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
这是我对Dataformat对象的定义:
<bean id="jsonDataformat" class="org.apache.camel.model.dataformat.JsonDataFormat">
<property name="unmarshalType" …Run Code Online (Sandbox Code Playgroud) 我有一个非常简单的Camel路由定义,它只包含一些OnException谓词来处理各自的异常和一些日志语句.
from("hazelcast:seda:someQueue")
.id("someQueueID")
.onException(CustomException.class)
.handled(true)
.log(LoggingLevel.WARN, "custom exception noticed")
.end()
.onException(IOException.class, FileNotFoundException.class)
.asyncDelayedRedelivery()
.redeliveryDelay(3*1000*60) // 3 Minutes
.maximumRedeliveries(3)
.log(LoggingLevel.WARN, "io exception noticed")
.end()
.onException(Exception.class)
.log(LoggingLevel.WARN, "general exception noticed")
.end()
.log("Starting route")
.bean(TestBean.class)
.log("Finished route");
Run Code Online (Sandbox Code Playgroud)
bean本身也很简单,它只是检查一个header参数并抛出一个适当的异常
public class TestBean
{
@Handler
public void checkData(@Headers final Map<String, Object> headers)
throws CustomException, IOException, Exception
{
Integer testVal = (Integer)headers.get("TestValue");
if (0 == testVal)
throw new CustomException("CustomException");
else if (1 == testVal)
throw new IOException("IOException");
else
throw new Exception("Exception");
}
}
Run Code Online (Sandbox Code Playgroud)
由于这个测试设置只是一个较大项目的一小部分,这样做可能听起来很愚蠢,但核心意图是在测试时修改redeliveryDelay,因为"强制"IOException不需要等待3分钟因此,为了加快单位测试,可以将重新传递延迟减少到10毫秒.
为了实现这一点,我的测试方法执行以下操作:
@ContextConfiguration(classes …Run Code Online (Sandbox Code Playgroud) 我有一个简单的骆驼路线我需要修改.路线看起来像这样:
from(source.uri)
.unmarshal()
.bean(TransformMessageBean.class, "SomeMethod")
.to(destination.uri)
Run Code Online (Sandbox Code Playgroud)
我想在解组之后添加另一个bean方法调用,它设置一个标题值而不会中断当前数据流.有谁知道这样做的方法?我在apache的文档中读到,在出站消息体中设置了bean的返回值.有没有办法将其更改为标题?
提前致谢!
有一个以上的例子,我从一个以openshift原点运行的pod中看到了这个状态.在这种情况下,这是cdi camel示例的快速入门.我能够在本地成功构建和运行它(非openshift)但是当我尝试在我的本地openshift(使用mvn -Pf8-local-deploy)上部署时,我得到了这个特定示例的输出(为了相关性而剪断): -
[vagrant@vagrant camel]$ oc get pods
NAME READY STATUS RESTARTS AGE
cdi-camel-z4czs 0/1 CrashLoopBackOff 4 2m
日志尾巴如下: -
Error occurred during initialization of VM
Error opening zip file or JAR manifest missing : agents/jolokia.jar
agent library failed to init: instrument
Run Code Online (Sandbox Code Playgroud)
有人可以帮我解决这个问题吗?
给出了一个非常简单的Camel捆绑包,用于生成camel-archetype-blueprint,我希望添加一个通过属性而不是在属性中配置的数据源blueprint.xml.
我尝试以各种方式配置PropertiesComponent并访问propertyMySQL数据源的值中的属性,但似乎没有一个工作.记录消息时,可以访问属性.
如何使用属性文件中的参数值配置数据源?
我特别需要这个,为多个bundle使用相同的数据源配置,并区分生产/测试环境.我考虑过在构建过程中使用Maven编写属性,具体取决于目标环境.有关如何解决此数据源问题的其他最佳实践吗?
<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd
http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint.xsd">
<bean id="properties" class="org.apache.camel.component.properties.PropertiesComponent">
<property name="location" value="classpath:config.properties" />
</bean>
<bean id="dataSourceMySQL" class="com.mysql.jdbc.jdbc2.optional.MysqlDataSource">
<property name="url" value="jdbc:mysql://127.0.0.1/test_database" />
<!-- This causes an error, as it tries to connect with
`${mysqlUser}`@`localhost` without any evaluation -->
<property name="user" value="${mysqlUser}" />
<property name="password" value="${mysqlPassword}" />
</bean>
<service interface="javax.sql.DataSource" ref="dataSourceMySQL">
<service-properties>
<entry key="osgi.jndi.service.name" value="jdbc/mysqlDatasource" />
</service-properties>
</service>
<bean id="sql" class="org.apache.camel.component.sql.SqlComponent">
<property name="dataSource" ref="dataSourceMySQL" />
</bean> …Run Code Online (Sandbox Code Playgroud) 我有一个用例:
我需要定期阅读和聚合来自kafka主题的消息,并发布到不同的主题.Localstorage不是一种选择.这就是我计划解决这个问题的方法,欢迎任何改进建议
要安排kafka消息的聚合和发布,请计划使用Aggregator EIP的completionInterval选项.这是代码.
@Autowired ObjectMapper objectMapper;
JacksonDataFormat jacksonDataFormat;
@PostConstruct
public void initialize(){
//objectMapper.setPropertyNamingStrategy(PropertyNamingStrategy.SNAKE_CASE);
jacksonDataFormat = new JacksonDataFormat(objectMapper,EventMessage.class);
}
Run Code Online (Sandbox Code Playgroud)
和路线:
public void configure() throws Exception {
from("kafka:localhost:9092?topic=item-events" +
"&groupId=aggregator-group-id&autoCommitIntervalMs=25000&autoOffsetReset=earliest&consumersCount=1")
.routeId("kafkapoller")
.unmarshal(jacksonDataFormat)
.aggregate(body().method("getItemId"), new EventAggregationStrategy()).completionInterval(20000)
.marshal().json(JsonLibrary.Jackson)
.to("kafka:localhost:9092?topic=item-events-aggregated&serializerClass=org.apache.kafka.common.serialization.ByteArraySerializer");
}
Run Code Online (Sandbox Code Playgroud) apache-camel ×10
java ×5
apache ×1
apache-kafka ×1
apache-karaf ×1
architecture ×1
blueprint ×1
bnd ×1
cdi ×1
datasource ×1
fabric8 ×1
ftp ×1
jackson ×1
json ×1
maven ×1
multicast ×1
osgi ×1
soa ×1
unit-testing ×1