我设法在Spring中使用JobStoreTX持久存储来配置和调度Quartz作业.我不使用Spring的Quartz作业,因为我需要在运行时动态调度它们,并且我发现Spring与Quartz集成的所有示例都是对Spring配置文件中的shcedules进行硬编码...无论如何,这里是如何我安排工作:
JobDetail emailJob = JobBuilder.newJob(EMailJob.class)
.withIdentity("someJobKey", "immediateEmailsGroup")
.storeDurably()
.build();
SimpleTrigger trigger = (SimpleTrigger) TriggerBuilder.newTrigger()
.withIdentity("someTriggerKey", "immediateEmailsGroup")
.startAt(fireTime)
.build();
// pass initialization parameters into the job
emailJob.getJobDataMap().put(NotificationConstants.MESSAGE_PARAMETERS_KEY, messageParameters);
emailJob.getJobDataMap().put(NotificationConstants.RECIPIENT_KEY, recipient);
if (!scheduler.checkExists(jobKey) && scheduler.getTrigger(triggerKey) != null) {
// schedule the job to run
Date scheduleTime1 = scheduler.scheduleJob(emailJob, trigger);
}
Run Code Online (Sandbox Code Playgroud)
EMailJob是一个简单的工作,它使用Spring的JavaMailSenderImpl类发送电子邮件.
public class EMailJob implements Job {
@Autowired
private JavaMailSenderImpl mailSenderImpl;
public EMailJob() {
}
public void execute(JobExecutionContext context)
throws JobExecutionException {
....
try {
mailSenderImpl.send(mimeMessage);
} catch (MessagingException e) {
.... …Run Code Online (Sandbox Code Playgroud) 我在其他地方找不到这个问题的明确答案,所以我会在这里试试:
是否有某种方式(编程或其他方式)以按照它们加载的精确顺序获取由Application Classloader加载的JAR /类列表?应用程序类加载器我的意思是在应用程序服务器(WLS,WAS,JBoss ......)中加载EAR应用程序的类加载器,但显然,它适用于任何类加载器.
因此,为了概括,我想知道的是由指定的类加载器加载的JAR的列表和顺序.不是单个类,通过调用classloader.getPackages()很容易找到,但是这个类加载器加载的JAR文件列表.
我正试图找出目前的高级消费者正在努力解决的问题.我使用Kafka 0.8.2.1,在Kafka的server.properties中没有设置"offset.storage" - 我认为这意味着偏移量存储在Kafka中.(I也证实没有偏移量存储在动物园管理员通过检查ZK壳此路径:/consumers/consumer_group_name/offsets/topic_name/partition_number)
我试着听听这个__consumer_offsets话题,看看哪个消费者保存了什么价值的抵消,但它没有用......
我尝试了以下方法:
为控制台使用者创建了一个配置文件如下:
=> more kafka_offset_consumer.config
exclude.internal.topics=false
Run Code Online (Sandbox Code Playgroud)
并尝试了两个版本的控制台消费者脚本:
#1:
bin/kafka-console-consumer.sh --consumer.config kafka_offset_consumer.config --topic __consumer_offsets --zookeeper localhost:2181
#2
./bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 0 --broker-list localhost:9092 --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" --consumer.config kafka_offset_consumer.config
Run Code Online (Sandbox Code Playgroud)
两者都没有奏效 - 它只是坐在那里但不打印任何东西,即使消费者正在积极消费/节省抵消.
我错过了一些其他配置/属性吗?
谢谢!
码头
我想澄清Quartz1.6中scheduler.getCurrentlyExecutingJobs()方法的细节.我有一份工作,在任何特定时刻都应该只运行一个实例.它可以被触发从UI"立即运行",但如果已经为此作业运行的作业实例 - 什么都不应该发生.
这就是我如何检查是否有一个让我感兴趣的工作:
List<JobExecutionContext> currentJobs = scheduler.getCurrentlyExecutingJobs();
for (JobExecutionContext jobCtx: currentJobs){
jobName = jobCtx.getJobDetail().getName();
groupName = jobCtx.getJobDetail().getGroup();
if (jobName.equalsIgnoreCase("job_I_am_looking_for_name") &&
groupName.equalsIgnoreCase("job_group_I_am_looking_for_name")) {
//found it!
logger.warn("the job is already running - do nothing");
}
}
Run Code Online (Sandbox Code Playgroud)
然后,为了测试这个,我有一个单元测试,试图一个接一个地安排这个工作的两个实例.我希望在尝试安排第二份工作时看到警告,相反,我得到了这个例外:
org.quartz.ObjectAlreadyExistsException: Unable to store Job with name:
'job_I_am_looking_for_name' and group: 'job_group_I_am_looking_for_name',
because one already exists with this identification.
Run Code Online (Sandbox Code Playgroud)
当我在调试模式下运行此单元测试时,此行中断:
列出currentJobs = scheduler.getCurrentlyExecutingJobs();
我看到列表是空的 - 因此调度程序不会将此作业视为正在运行,但它仍然无法再次安排它 - 这告诉我当时工作确实正在运行...
我用这种调度方法错过了一些更好的观点吗?
谢谢!
码头
有一个类似的问题问java-thread-dump-waiting-on-object-monitor-line-not-follow-by-waiting-on,但是没有具体的答案,所以我会问我的问题,希望得到更多信息...
在下面的线程转储中,我看到线程处于"WAITING(在对象监视器上)"状态 - 但是没有"等待"的行指示它正在等待的内容.如何解释此线程堆栈并找出此线程正在等待的原因(以及什么资源)?
"eventTaskExecutor-50" prio=10 tid=0x0000000004117000 nid=0xd8dd in Object.wait() [0x00007f8f457ad000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:503)
at com.tibco.tibjms.TibjmsxLink.sendRequest(TibjmsxLink.java:359)
- locked <0x00007f98cbebe5d8> (a com.tibco.tibjms.TibjmsxResponse)
at com.tibco.tibjms.TibjmsxSessionImp._confirmTransacted(TibjmsxSessionImp.java:2934)
at com.tibco.tibjms.TibjmsxSessionImp._confirm(TibjmsxSessionImp.java:3333)
- locked <0x00007f90101399b8> (a java.lang.Object)
at com.tibco.tibjms.TibjmsxSessionImp._commit(TibjmsxSessionImp.java:2666)
at com.tibco.tibjms.TibjmsxSessionImp.commit(TibjmsxSessionImp.java:4516)
at org.springframework.jms.support.JmsUtils.commitIfNecessary(JmsUtils.java:217)
at org.springframework.jms.listener.AbstractMessageListenerContainer.commitIfNecessary(AbstractMessageListenerContainer.java:577)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:482)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1102)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:996)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
Locked ownable synchronizers:
- <0x00007f901011ca88> (a java.util.concurrent.ThreadPoolExecutor$Worker)
Run Code Online (Sandbox Code Playgroud)
此线程是配置为接受来自Tibco总线的消息的侦听器线程之一.
谢谢!
码头
我正在尝试使用最低级别的Consumer Java API来手动管理偏移,使用最新的kafka_2.10-0.8.2.1.要验证我从Kafka提交/读取的偏移量是否正确,我使用kafka.tools.ConsumerOffsetChecker工具.
以下是我的主题/使用者组的输出示例:
./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group Topic Pid Offset logSize Lag Owner
elastic_search_group my_log_topic 0 5 29 24 none
Run Code Online (Sandbox Code Playgroud)
以下是我对结果的解释:
Offset = 5 - >这是我'elastic_search_group'消费者的当前偏移量
logSize = 29 - >这是最新偏移量 - 将来到此主题/分区的下一条消息的偏移量
滞后= 24 - > 29-5 - 我的'elastic_search_group'消费者尚未处理多少消息
Pid - 分区ID
Q1:这是对的吗?
现在,我想从我的Java消费者那里获得相同的信息.在这里,我发现我必须使用两种不同的API:
kafka.javaapi.OffsetRequest获得最早和最新的抵消,但kafka.javaapi.OffsetFetchRequest获取当前偏移量.
要获得最早(或最新)的偏移我做:
TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, …Run Code Online (Sandbox Code Playgroud) 我注意到,如果我使用StreamEx lib并使用自定义ForkJoinPool并行输出我的流 - 如下所示 - 后续操作会在该池中的并行线程中运行.但是,如果我添加map()操作并并行生成的流 - 只使用池中的一个线程.
下面是演示此问题的最小工作示例的完整代码(没有所有导入).executeAsParallelFromList()和executeAsParallelAfterMap()方法之间的唯一区别是在.parallel()之前添加.map(...)调用.
import one.util.streamex.StreamEx;
public class ParallelExample {
private static final Logger logger = LoggerFactory.getLogger(ParallelExample.class);
private static ForkJoinPool s3ThreadPool = new ForkJoinPool(3);
public static List<String> getTestList(){
int listSize = 10;
List<String> testList = new ArrayList<>();
for (int i=0; i<listSize; i++)
testList.add("item_" + i);
return testList;
}
public static void executeAsParallelFromList(){
logger.info("executeAsParallelFromList():");
List<String> testList = getTestList();
StreamEx<String> streamOfItems = StreamEx
.of(testList)
.parallel(s3ThreadPool);
logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
streamOfItems.forEach(item -> handleItem(item));
}
public static void executeAsParallelAfterMap(){
logger.info("executeAsParallelAfterMap():"); …Run Code Online (Sandbox Code Playgroud) 我正在使用Kafka 0.8.2.1 SimpleConsumer.有人可以澄清SimpleConsumer和FetchRequestBuilder的一些配置参数的含义吗?没有阅读KAfka的源代码,我当时找不到任何文档.(我尝试将此问题发布到kafka用户组 - 但没有运气):
- Q1:在SimpleConsumer构造函数的签名中,我看到了Int'soTimeout'参数 - 这个超时的含义是什么?这是连接到Kafka经纪人的超时吗?从任何[或特定??]请求到Kafka的响应超时(如FetchRequest)?别的什么?
kafka.javaapi.consumer.SimpleConsumer
(val host: String,
val port: Int,
val soTimeout: Int,
val bufferSize: Int,
val clientId: String)
Run Code Online (Sandbox Code Playgroud)
- Q2:同样,SimpleConsumer构造函数采用Int'mufferSize'参数.它是什么意思?这是在发出fetchRequest时SimpleConsumer会读取多少字节?或者它是每次从Kafka获取时读取的最大字节数 - 如果有更多数据可用,则会发生多次提取?
- 通过FetchRequestBuilder构建FetchRequest时(见下文),我还需要指定' fetchSize ':
FetchRequest req= newFetchRequestBuilder ()
.clientId(kafkaGroupId)
.addFetch(topic, partition, offset, fetchSizeInBytes)
.build();
Run Code Online (Sandbox Code Playgroud)
查看FetchRequestBuilder的源代码,我认为(我不是Scala pro)这些调用转换为下面的方法调用 - 并且传递给FetchRequest的最终参数称为' minBytes ',暗示这不是确切的提取大小,可能吗?.它是否意味着它甚至不会取任何东西,除非至少有'minBytes'数据可用?
class FetchRequestBuilder():
def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int)
def build() = {
val fetchRequest= FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
correlationId: …Run Code Online (Sandbox Code Playgroud) 我在Eclipse的Spring Tool Suite版本(3.2.0)中有一个Gradle项目(使用Eclipse的Gradle插件导入和生成).它大部分时间都可以工作,但有时,依赖项在项目中的"Gradle Dependencies"和"Web App Libraries"之间变得不同步.这就是我的意思:
我定义了一个编译依赖项如下:
compile(group: 'com.mygroup', name: 'myClient', version: '0.2.1')
Run Code Online (Sandbox Code Playgroud)
然后,在我执行"Gradle - > Refresh All"后,我看到依赖库lib"myClient-0.2.1.jar"位于"Gradle Dependencies"列表中,并位于"web App Libraries"下的列表中.
现在,过了一段时间,我决定使用这个lib的更新版本:
compile(group: 'com.mygroup', name: 'myClient', version: '0.2.2')
Run Code Online (Sandbox Code Playgroud)
我再次做"Gradle - > Refresh All",我也尝试刷新项目,重新构建它,打开/关闭项目和Eclipse,但我看到的是:
"Gradle Dependencies"下的依赖关系确实已更新,并列为"myClient-0.2.2.jar".但是,"Web App Libraries"下的依赖关系拒绝更新并保持为"myClient-0.2.1.jar".显然,这对我的应用程序造成了严重破坏,因为现在我的代码没有引用正确的新类,并在Eclipse中显示所有红色.
一个解决办法的作品有时是核弹攻击整个项目,并从头开始重新构建它,但这是不是想我做的更激进的:)它有时不工作.看起来我无法明确控制进入"Web App库"的内容 - 所以即使我看到项目 - >属性 - > Java构建路径 - >库 - > Web应用程序库中列出了错误的库 - 我不能改变它.
任何洞察这一点将非常感谢,谢谢!码头
我正在观察一个奇怪的行为,该行为使用包含年份值的CronTrigger在Quartz中安排作业。
这是我创建触发器并使用它计划作业的方式:
CronTrigger trigger = cronJobTriggerFactory.getObject();
trigger.setName(triggerName);
trigger.setGroup(triggerGroupName);
trigger.setCronExpression(cronSchedule);
trigger.setVolatility(false);
JobDetail job = schedulableJobFactory.getObject();
job.setName(jobName);
job.setGroup(jobGroupName);
job.setVolatility(false);
job.setDurability(true);
Date scheduleTime1 = scheduler.scheduleJob(job, trigger);
logger.info(job.getKey() + " will run at: " + scheduleTime1);
Run Code Online (Sandbox Code Playgroud)
然后在我的单元测试中,确定“现在”日期,向其添加5分钟,计算该日期/时间的cron表达式,并使用此计划将我的主班计划称为作业。这是显示通过了哪个cron表达式的单元测试的输出:
NotificationSchedulerTest - Today is: 9 May 2012 05:32 PM
NotificationSchedulerTest - 5 min later is: 9 May 2012 05:37 PM
NotificationSchedulerTest - cron schedule is: 0 37 17 * 4 ? 2012
Run Code Online (Sandbox Code Playgroud)
但是,当尝试使用此cron表达式计划作业时,出现以下错误:
org.quartz.SchedulerException: Based on configured schedule, the given trigger will never fire.
Run Code Online (Sandbox Code Playgroud)
如您所见,该日期是相对于我运行测试的日期/时间而言的将来...因此,尝试安排作业在过去的某个时间运行应该不是问题。
现在,下一个奇怪的事情是:请注意,我确实在cron表达式中指定了年份值:“ 0 …
apache-kafka ×3
java ×3
classloader ×1
cron ×1
crontrigger ×1
eclipse ×1
gradle ×1
inject ×1
jar ×1
java-8 ×1
java-stream ×1
jvm ×1
spring ×1
streamex ×1
thread-dump ×1