我正在使用ActiveMQ中的系统,我真的不想丢失消息.我的问题是重试消息导致我的消费者阻止(而不是处理他们可以处理的消息).我想在几天内给失败的消息重试(例如,我的一个潜在目的地是我将通过SFTP访问的另一台服务器,可能会关闭),但我不希望消费者阻塞几天 - - 我希望它继续处理其他消息.
有没有办法让经纪人稍后重新发送消息?现在我正在考虑将消息从队列中取出并延迟启动,但我想知道是否有更简单的方法.我正在使用Apache Camel,所以使用它的解决方案也会很好.
在Camel路由中,我是否应该考虑将我的业务逻辑放在离散托管的bean端点中,比如消息驱动的bean或Web服务,而不仅仅是在Camel处理器中实现它?
将Camel用作中介和编排,使用Processor作为过滤器,而不是作为业务逻辑的容器,似乎更清晰地分离了关注点.但是我现在并不认为需要一个EJB容器,而且我似乎需要一个来容纳MDB.
如此清洁的架构与更小的占地面积,更少的技术 - 有没有人有这方面的想法,观点或强烈的感受?
我正在开发一个项目,它是集成项目,我们正在使用Apache Camel和Apache Karaf.在项目中,我需要使用Jira REST Java客户端库.
所以我已经阅读了很多关于如何将非OSGI库包装到OSGI包中的各种文章和线程,但我真的不确定我是否做对了.
所以,我创建了一个POM文件,它依赖于所需的库.制作了一个包并试图将它部署到卡拉夫,当然,卡拉夫抱怨丢失包裹.
所以,我发现了相应的maven依赖,添加它,包进入<Import-Package>和依赖<Embed-Dependency>.
另一轮,部署,找到依赖,添加,再次,再次,直到Karaf与捆绑很好.
这是真的正确吗?在我看来,我觉得很疯狂,所以我想我不会因为usualy :)
最后,包裹变得稳定在我的工作计算机上,我快速检查并回家了,我继续但是,奇怪的是,在我的个人计算机上编译的相同的POM /包不起作用,再次抱怨丢失包裹,但是这次,这个软件包肯定是在POM文件中,并且肯定它是嵌入在包中的,我可以在那里看到它.
这次丢失的包是org.apache.commons.codec.
org.osgi.framework.BundleException: Unresolved constraint in bundle jiraclient.bundle [134]: Unable to resolve 134.0: missing requirement [134.0] osgi.wiring.package; (osgi.wiring.package=org.apache.commons.codec)
at org.apache.felix.framework.Felix.resolveBundleRevision(Felix.java:3826)[org.apache.felix.framework-4.0.3.jar:]
at org.apache.felix.framework.Felix.startBundle(Felix.java:1868)[org.apache.felix.framework-4.0.3.jar:]
at org.apache.felix.framework.BundleImpl.start(BundleImpl.java:944)[org.apache.felix.framework-4.0.3.jar:]
at org.apache.felix.fileinstall.internal.DirectoryWatcher.startBundle(DirectoryWatcher.java:1247)[6:org.apache.felix.fileinstall:3.2.6]
at org.apache.felix.fileinstall.internal.DirectoryWatcher.startBundles(DirectoryWatcher.java:1219)[6:org.apache.felix.fileinstall:3.2.6]
at org.apache.felix.fileinstall.internal.DirectoryWatcher.startAllBundles(DirectoryWatcher.java:1208)[6:org.apache.felix.fileinstall:3.2.6]
at org.apache.felix.fileinstall.internal.DirectoryWatcher.process(DirectoryWatcher.java:503)[6:org.apache.felix.fileinstall:3.2.6]
at org.apache.felix.fileinstall.internal.DirectoryWatcher.run(DirectoryWatcher.java:291)[6:org.apache.felix.fileinstall:3.2.6]
Run Code Online (Sandbox Code Playgroud)
所以,现在我完全糊涂了,出了什么问题:(
很高兴,伙计们,帮助我.谢谢!
POM文件很长,所以我想链接更好:http://pastebin.com/j5cmWveG
我有一条骆驼路线,就像永远被绞死一样,不确定它是Camel还是Ftp客户端问题.ftp组件的soTimeout设置为60000.感谢任何帮助.
组件版本:
camel-ftp:2.9.0
Commons Net:(2.2)
线程转储:
"Camel (some-ftp-route) thread #57 - ftp://user@ftphost/folder" Id=338 in RUNNABLE (running in native)
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.read(SocketInputStream.java:129)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:264)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:306)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:158)
- locked java.io.InputStreamReader@6a3f933a
at java.io.InputStreamReader.read(InputStreamReader.java:167)
at java.io.BufferedReader.fill(BufferedReader.java:136)
at java.io.BufferedReader.readLine(BufferedReader.java:299)
- locked java.io.InputStreamReader@6a3f933a
at java.io.BufferedReader.readLine(BufferedReader.java:362)
at org.apache.commons.net.ftp.FTP.__getReply(FTP.java:295)
at org.apache.commons.net.ftp.FTP._connectAction_(FTP.java:365)
at org.apache.commons.net.ftp.FTPClient._connectAction_(FTPClient.java:630)
at org.apache.commons.net.SocketClient.connect(SocketClient.java:164)
at org.apache.commons.net.SocketClient.connect(SocketClient.java:184)
at org.apache.camel.component.file.remote.FtpOperations.connect(FtpOperations.java:91)
at org.apache.camel.component.file.remote.RemoteFileConsumer.connectIfNecessary(RemoteFileConsumer.java:144)
at org.apache.camel.component.file.remote.RemoteFileConsumer.recoverableConnectIfNecessary(RemoteFileConsumer.java:123)
at org.apache.camel.component.file.remote.RemoteFileConsumer.prePollCheck(RemoteFileConsumer.java:56)
at org.apache.camel.component.file.GenericFileConsumer.poll(GenericFileConsumer.java:100)
at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:139)
at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:91)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)
at …Run Code Online (Sandbox Code Playgroud) 因此,在阅读了一些文档并从这里获得了很多帮助后,我终于实现了一个动态选择端点的收件人列表(动态收件人列表):
在我的代码中,MainApp_A每10秒生成一次报告,我希望它能够同时将报告发送到所有服务器,而不是逐个发送.因此,我开发了以下路线.
MainApp_A
main.addRouteBuilder(new RouteBuilder(){
@Override
public void configure() throws Exception {
from("direct:start").multicast().parallelProcessing()
.beanRef("recipientListBean", "route").end()
.log("${body}");
}
});
Run Code Online (Sandbox Code Playgroud)
RecipientListBean
@RecipientList
public Set<String> route(String body) {
return servers; //returns a collection of several severs
}
Run Code Online (Sandbox Code Playgroud)
我还检查了多播模式和动态路由的文档:
现在我有几个问题,我很困惑.所以我有几个问题:
我在使用带有骆驼的Jackson JSON库时遇到了麻烦.例外是:
FailedToCreateRouteException: Failed to create route... because of Data format 'json-jackson' could not be created. Ensure the data format is valid and the associated Camel component is present on the classpath
Run Code Online (Sandbox Code Playgroud)
这是类路径通知的开始,杰克逊的libs在那里:
<<< camel-maven-plugin:2.9.0:run (default-cli) @ portlistener <<<
--- camel-maven-plugin:2.9.0:run (default-cli) @ portlistener ---
Classpath = [file:/C:/Users/andrew.b-ext/Workspace/portlistener/target/classes/, file:/C:/Users/andrew.b-ext/.m2/repository/org/codehaus/jackson/jackson-asl/1.9.13/jackson-core-asl-1.9.13.jar, file:/C:/Users/andrew.b-ext/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.9.13/jackson-mapper-asl-1.9.13.jar, file:/C:/Users/andrew.b-ext/.m2/repository/org/codehaus/jackson/jackson-jaxrs/1.9.13/jackson-jaxrs-1.9.13.jar, ...snip...
Run Code Online (Sandbox Code Playgroud)
我的pom有杰克逊:
<properties>
<camel.version>2.12.2</camel.version>
<cxf.version>2.7.8</cxf.version>
<activemq.version>5.6.0</activemq.version>
</properties>
<dependencies>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.9.13</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
<version>1.9.13</version>
</dependency>
Run Code Online (Sandbox Code Playgroud)
这是我对Dataformat对象的定义:
<bean id="jsonDataformat" class="org.apache.camel.model.dataformat.JsonDataFormat">
<property name="unmarshalType" …Run Code Online (Sandbox Code Playgroud) 我有一个非常简单的Camel路由定义,它只包含一些OnException谓词来处理各自的异常和一些日志语句.
from("hazelcast:seda:someQueue")
.id("someQueueID")
.onException(CustomException.class)
.handled(true)
.log(LoggingLevel.WARN, "custom exception noticed")
.end()
.onException(IOException.class, FileNotFoundException.class)
.asyncDelayedRedelivery()
.redeliveryDelay(3*1000*60) // 3 Minutes
.maximumRedeliveries(3)
.log(LoggingLevel.WARN, "io exception noticed")
.end()
.onException(Exception.class)
.log(LoggingLevel.WARN, "general exception noticed")
.end()
.log("Starting route")
.bean(TestBean.class)
.log("Finished route");
Run Code Online (Sandbox Code Playgroud)
bean本身也很简单,它只是检查一个header参数并抛出一个适当的异常
public class TestBean
{
@Handler
public void checkData(@Headers final Map<String, Object> headers)
throws CustomException, IOException, Exception
{
Integer testVal = (Integer)headers.get("TestValue");
if (0 == testVal)
throw new CustomException("CustomException");
else if (1 == testVal)
throw new IOException("IOException");
else
throw new Exception("Exception");
}
}
Run Code Online (Sandbox Code Playgroud)
由于这个测试设置只是一个较大项目的一小部分,这样做可能听起来很愚蠢,但核心意图是在测试时修改redeliveryDelay,因为"强制"IOException不需要等待3分钟因此,为了加快单位测试,可以将重新传递延迟减少到10毫秒.
为了实现这一点,我的测试方法执行以下操作:
@ContextConfiguration(classes …Run Code Online (Sandbox Code Playgroud) 我有一个简单的骆驼路线我需要修改.路线看起来像这样:
from(source.uri)
.unmarshal()
.bean(TransformMessageBean.class, "SomeMethod")
.to(destination.uri)
Run Code Online (Sandbox Code Playgroud)
我想在解组之后添加另一个bean方法调用,它设置一个标题值而不会中断当前数据流.有谁知道这样做的方法?我在apache的文档中读到,在出站消息体中设置了bean的返回值.有没有办法将其更改为标题?
提前致谢!
我可以从Chrome REST客户端成功访问Sharepoint 2013 AtomPub界面,以下URL为我提供了我想要的文件:
http://ourintranet:100/personal/myname/_vti_bin/cmis/rest/5612e38e-a324-4030-9fee-7d05cd9053a4?getContentStream&objectId=4-512
Run Code Online (Sandbox Code Playgroud)
但是,在Camel CMIS路由中使用相同的URL会得到HTTP 302(找不到文件)并将我转移到错误页面.
我试过的路线是:
from("cmis:http://ourintranet:100/personal/myname/_vti_bin/cmis/rest/5612e38e-a324-4030-9fee-7d05cd9053a4?getContentStream&objectId=4-512")
.to("file:c:/myFolder")
Run Code Online (Sandbox Code Playgroud)
运行Wireshark以查看发生了什么,似乎Camel CMIS未将查询字符串部分传递给服务器,并且可能将其视为CMIS组件的选项(根据组件的使用指南).
那么,使用Sharel的Camel CMIS组件的正确方法是什么?
我有两个插件,我想在事务控制下.如果其中一个失败,另一个也不会被执行/插入.这样在MySQL中正确执行时工作正常
START TRANSACTION;
INSERT INTO table (field) VALUES (value);
INSERT INTO table2 (field) VALUES (value);
COMMIT;
Run Code Online (Sandbox Code Playgroud)
现在使用骆驼我已经尝试过了
<to uri="sql:START TRANSACTION; INSERT INTO table (field) VALUES (value);INSERT INTO table2 (field) VALUES (value);COMMIT;"/>
Run Code Online (Sandbox Code Playgroud)
这会产生sql语法错误.我尝试的第二件事是
<to uri="sql:START TRANSACTION"/>
<to uri="sql:INSERT INTO table (field) VALUES (value)"/>
<to uri="sql:INSERT INTO table2 (field) VALUES (value)"/>
<to uri="sql:COMMIT"/>
Run Code Online (Sandbox Code Playgroud)
哪个工作但如果一个插入失败,另一个插入仍在执行和插入.
我也找到了这个 http://camel.apache.org/transactional-client.html,但我使用的是蓝图,这些例子似乎只适用于春天.因此,如果有人用骆驼蓝图做了一个很好的例子,这将是很好的.
任何人都可以帮我在骆驼做这个吗?
apache-camel ×10
java ×4
apache ×1
architecture ×1
bnd ×1
cmis ×1
ftp ×1
jackson ×1
json ×1
maven ×1
multicast ×1
mysql ×1
osgi ×1
sharepoint ×1
soa ×1
transactions ×1
unit-testing ×1