小编Mar*_*ina的帖子

在Spring中将bean引用注入Quartz作业?

我设法在Spring中使用JobStoreTX持久存储来配置和调度Quartz作业.我不使用Spring的Quartz作业,因为我需要在运行时动态调度它们,并且我发现Spring与Quartz集成的所有示例都是对Spring配置文件中的shcedules进行硬编码...无论如何,这里是如何我安排工作:

JobDetail emailJob = JobBuilder.newJob(EMailJob.class)
.withIdentity("someJobKey", "immediateEmailsGroup")
.storeDurably()
.build();

SimpleTrigger trigger = (SimpleTrigger) TriggerBuilder.newTrigger() 
.withIdentity("someTriggerKey", "immediateEmailsGroup")
.startAt(fireTime)
.build();

// pass initialization parameters into the job
emailJob.getJobDataMap().put(NotificationConstants.MESSAGE_PARAMETERS_KEY,       messageParameters);
emailJob.getJobDataMap().put(NotificationConstants.RECIPIENT_KEY, recipient);

if (!scheduler.checkExists(jobKey) && scheduler.getTrigger(triggerKey) != null)     {                                       
// schedule the job to run
Date scheduleTime1 = scheduler.scheduleJob(emailJob, trigger);
}
Run Code Online (Sandbox Code Playgroud)

EMailJob是一个简单的工作,它使用Spring的JavaMailSenderImpl类发送电子邮件.

public class EMailJob implements Job {
@Autowired
private JavaMailSenderImpl mailSenderImpl;

    public EMailJob() {
    }
    public void execute(JobExecutionContext context)
       throws JobExecutionException {
   ....
    try {
        mailSenderImpl.send(mimeMessage);
    } catch (MessagingException e) {
        .... …
Run Code Online (Sandbox Code Playgroud)

spring inject quartz-scheduler

91
推荐指数
6
解决办法
9万
查看次数

如何找到类加载器加载的罐子和顺序?

我在其他地方找不到这个问题的明确答案,所以我会在这里试试:

是否有某种方式(编程或其他方式)以按照它们加载的精确顺序获取由Application Classloader加载的JAR /类列表?应用程序类加载器我的意思是在应用程序服务器(WLS,WAS,JBoss ......)中加载EAR应用程序的类加载器,但显然,它适用于任何类加载器.

因此,为了概括,我想知道的是由指定的类加载器加载的JAR的列表和顺序.不是单个类,通过调用classloader.getPackages()很容易找到,但是这个类加载器加载的JAR文件列表.

java jar classloader

43
推荐指数
2
解决办法
5万
查看次数

Kafka如何阅读__consumer_offsets主题

我正试图找出目前的高级消费者正在努力解决的问题.我使用Kafka 0.8.2.1,在Kafka的server.properties中没有设置"offset.storage" - 我认为这意味着偏移量存储在Kafka中.(I也证实没有偏移量存储在动物园管理员通过检查ZK壳此路径:/consumers/consumer_group_name/offsets/topic_name/partition_number)

我试着听听这个__consumer_offsets话题,看看哪个消费者保存了什么价值的抵消,但它没有用......

我尝试了以下方法:

为控制台使用者创建了一个配置文件如下:

=> more kafka_offset_consumer.config 

 exclude.internal.topics=false
Run Code Online (Sandbox Code Playgroud)

并尝试了两个版本的控制台消费者脚本:

#1:
bin/kafka-console-consumer.sh --consumer.config kafka_offset_consumer.config --topic __consumer_offsets --zookeeper localhost:2181

#2
./bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 0 --broker-list localhost:9092 --formatter "kafka.server.OffsetManager\$OffsetsMessageFormatter" --consumer.config kafka_offset_consumer.config
Run Code Online (Sandbox Code Playgroud)

两者都没有奏效 - 它只是坐在那里但不打印任何东西,即使消费者正在积极消费/节省抵消.

我错过了一些其他配置/属性吗?

谢谢!

码头

apache-kafka kafka-consumer-api

23
推荐指数
2
解决办法
1万
查看次数

查找作业是否在Quartz1.6中运行

我想澄清Quartz1.6中scheduler.getCurrentlyExecutingJobs()方法的细节.我有一份工作,在任何特定时刻都应该只运行一个实例.它可以被触发从UI"立即运行",但如果已经为此作业运行的作业实例 - 什么都不应该发生.

这就是我如何检查是否有一个让我感兴趣的工作:

    List<JobExecutionContext> currentJobs = scheduler.getCurrentlyExecutingJobs();
    for (JobExecutionContext jobCtx: currentJobs){
    jobName = jobCtx.getJobDetail().getName();
    groupName = jobCtx.getJobDetail().getGroup();
    if (jobName.equalsIgnoreCase("job_I_am_looking_for_name") &&
        groupName.equalsIgnoreCase("job_group_I_am_looking_for_name")) {
        //found it!
        logger.warn("the job is already running - do nothing");
    }               
}
Run Code Online (Sandbox Code Playgroud)

然后,为了测试这个,我有一个单元测试,试图一个接一个地安排这个工作的两个实例.我希望在尝试安排第二份工作时看到警告,相反,我得到了这个例外:

org.quartz.ObjectAlreadyExistsException: Unable to store Job with name:
 'job_I_am_looking_for_name' and group: 'job_group_I_am_looking_for_name', 
 because one already exists with this identification.
Run Code Online (Sandbox Code Playgroud)

当我在调试模式下运行此单元测试时,此行中断:

列出currentJobs = scheduler.getCurrentlyExecutingJobs();

我看到列表是空的 - 因此调度程序不会将此作业视为正在运行,但它仍然无法再次安排它 - 这告诉我当时工作确实正在运行...

我用这种调度方法错过了一些更好的观点吗?

谢谢!

码头

quartz-scheduler

12
推荐指数
1
解决办法
2万
查看次数

Java线程转储:WAITING(在对象监视器上) - 它在等待什么?

有一个类似的问题问java-thread-dump-waiting-on-object-monitor-line-not-follow-by-waiting-on,但是没有具体的答案,所以我会问我的问题,希望得到更多信息...

在下面的线程转储中,我看到线程处于"WAITING(在对象监视器上)"状态 - 但是没有"等待"的行指示它正在等待的内容.如何解释此线程堆栈并找出此线程正在等待的原因(以及什么资源)?

"eventTaskExecutor-50" prio=10 tid=0x0000000004117000 nid=0xd8dd in Object.wait() [0x00007f8f457ad000]
java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:503)
at com.tibco.tibjms.TibjmsxLink.sendRequest(TibjmsxLink.java:359)
- locked <0x00007f98cbebe5d8> (a com.tibco.tibjms.TibjmsxResponse)
at com.tibco.tibjms.TibjmsxSessionImp._confirmTransacted(TibjmsxSessionImp.java:2934)
at com.tibco.tibjms.TibjmsxSessionImp._confirm(TibjmsxSessionImp.java:3333)
- locked <0x00007f90101399b8> (a java.lang.Object)
at com.tibco.tibjms.TibjmsxSessionImp._commit(TibjmsxSessionImp.java:2666)
at com.tibco.tibjms.TibjmsxSessionImp.commit(TibjmsxSessionImp.java:4516)
at org.springframework.jms.support.JmsUtils.commitIfNecessary(JmsUtils.java:217)
at org.springframework.jms.listener.AbstractMessageListenerContainer.commitIfNecessary(AbstractMessageListenerContainer.java:577)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:482)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1102)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:996)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)

Locked ownable synchronizers:
- <0x00007f901011ca88> (a java.util.concurrent.ThreadPoolExecutor$Worker)
Run Code Online (Sandbox Code Playgroud)

此线程是配置为接受来自Tibco总线的消息的侦听器线程之一.

谢谢!

码头

multithreading jvm thread-dump

10
推荐指数
1
解决办法
2万
查看次数

Kafka Java API偏移操作澄清

我正在尝试使用最低级别的Consumer Java API来手动管理偏移,使用最新的kafka_2.10-0.8.2.1.要验证我从Kafka提交/读取的偏移量是否正确,我使用kafka.tools.ConsumerOffsetChecker工具.

以下是我的主题/使用者组的输出示例:

./bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --group   elastic_search_group --zookeeper localhost:2181 --topic my_log_topic
Group           Topic                          Pid Offset          logSize         Lag             Owner
elastic_search_group my_log_topic              0   5               29              24              none
Run Code Online (Sandbox Code Playgroud)

  以下是我对结果的解释:

Offset = 5 - >这是我'elastic_search_group'消费者的当前偏移量

logSize = 29 - >这是最新偏移量 - 将来到此主题/分区的下一条消息的偏移量

滞后= 24 - > 29-5 - 我的'elastic_search_group'消费者尚未处理多少消息

Pid - 分区ID

Q1:这是对的吗?

现在,我想从我的Java消费者那里获得相同的信息.在这里,我发现我必须使用两种不同的API:

kafka.javaapi.OffsetRequest获得最早和最新的抵消,但kafka.javaapi.OffsetFetchRequest获取当前偏移量.

要获得最早(或最新)的偏移我做:

TopicAndPartition topicAndPartition = new TopicAndPartition(myTopic, myPartition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest.EarliestTime(), 1));
// OR for Latest: requestInfo.put(topicAndPartition, …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api

10
推荐指数
1
解决办法
5918
查看次数

在.map()之后,StreamEx.parallel().forEach()不会并行运行

我注意到,如果我使用StreamEx lib并使用自定义ForkJoinPool并行输出我的流 - 如下所示 - 后续操作会在该池中的并行线程中运行.但是,如果我添加map()操作并并行生成的流 - 只使用池中的一个线程.

下面是演示此问题的最小工作示例的完整代码(没有所有导入).executeAsParallelFromList()和executeAsParallelAfterMap()方法之间的唯一区别是在.parallel()之前添加.map(...)调用.

import one.util.streamex.StreamEx;

public class ParallelExample {

private static final Logger logger = LoggerFactory.getLogger(ParallelExample.class);
private static ForkJoinPool s3ThreadPool = new ForkJoinPool(3);

public static List<String> getTestList(){
    int listSize = 10;
    List<String> testList = new ArrayList<>();
    for (int i=0; i<listSize; i++)
        testList.add("item_" + i);
    return testList;
}

public static void executeAsParallelFromList(){
    logger.info("executeAsParallelFromList():");
    List<String> testList = getTestList();
    StreamEx<String> streamOfItems = StreamEx
            .of(testList)
            .parallel(s3ThreadPool);
    logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
    streamOfItems.forEach(item -> handleItem(item));
}

public static void executeAsParallelAfterMap(){
    logger.info("executeAsParallelAfterMap():"); …
Run Code Online (Sandbox Code Playgroud)

java java-8 java-stream streamex

7
推荐指数
2
解决办法
418
查看次数

kafka:'soTimeout','bufferSize'和'minBytes'对SimpleConsumer意味着什么?

我正在使用Kafka 0.8.2.1 SimpleConsumer.有人可以澄清SimpleConsumer和FetchRequestBuilder的一些配置参数的含义吗?没有阅读KAfka的源代码,我当时找不到任何文档.(我尝试将此问题发布到kafka用户组 - 但没有运气):

- Q1:在SimpleConsumer构造函数的签名中,我看到了Int'soTimeout'参数 - 这个超时的含义是什么?这是连接到Kafka经纪人的超时吗?从任何[或特定??]请求到Kafka的响应超时(如FetchRequest)?别的什么?

kafka.javaapi.consumer.SimpleConsumer
    (val host: String,
     val port: Int,
     val soTimeout: Int,
     val bufferSize: Int,
     val clientId: String)
Run Code Online (Sandbox Code Playgroud)

- Q2:同样,SimpleConsumer构造函数采用Int'mufferSize'参数.它是什么意思?这是在发出fetchRequest时SimpleConsumer会读取多少字节?或者它是每次从Kafka获取时读取的最大字节数 - 如果有更多数据可用,则会发生多次提取?

- 通过FetchRequestBuilder构建FetchRequest时(见下文),我还需要指定' fetchSize ':

FetchRequest req= newFetchRequestBuilder ()
  .clientId(kafkaGroupId)
  .addFetch(topic, partition, offset, fetchSizeInBytes)
  .build();
Run Code Online (Sandbox Code Playgroud)

查看FetchRequestBuilder的源代码,我认为(我不是Scala pro)这些调用转换为下面的方法调用 - 并且传递给FetchRequest的最终参数称为' minBytes ',暗示这不是确切的提取大小,可能吗?.它是否意味着它甚至不会取任何东西,除非至少有'minBytes'数据可用?

class FetchRequestBuilder():
    def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int)

    def build() = {
      val fetchRequest= FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)

FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
    correlationId: …
Run Code Online (Sandbox Code Playgroud)

apache-kafka kafka-consumer-api

5
推荐指数
1
解决办法
1333
查看次数

Web Gradle依赖项未在Web App Libraries中更新

我在Eclipse的Spring Tool Suite版本(3.2.0)中有一个Gradle项目(使用Eclipse的Gradle插件导入和生成).它大部分时间都可以工作,但有时,依赖项在项目中的"Gradle Dependencies"和"Web App Libraries"之间变得不同步.这就是我的意思:

我定义了一个编译依赖项如下:

compile(group: 'com.mygroup', name: 'myClient', version: '0.2.1')
Run Code Online (Sandbox Code Playgroud)

然后,在我执行"Gradle - > Refresh All"后,我看到依赖库lib"myClient-0.2.1.jar"位于"Gradle Dependencies"列表中,并位于"web App Libraries"下的列表中.

现在,过了一段时间,我决定使用这个lib的更新版本:

compile(group: 'com.mygroup', name: 'myClient', version: '0.2.2')
Run Code Online (Sandbox Code Playgroud)

我再次做"Gradle - > Refresh All",我也尝试刷新项目,重新构建它,打开/关闭项目和Eclipse,但我看到的是:

"Gradle Dependencies"下的依赖关系确实已更新,并列为"myClient-0.2.2.jar".但是,"Web App Libraries"下的依赖关系拒绝更新并保持为"myClient-0.2.1.jar".显然,这对我的应用程序造成了严重破坏,因为现在我的代码没有引用正确的新类,并在Eclipse中显示所有红色.

一个解决办法的作品有时是核弹攻击整个项目,并从头开始重新构建它,但这是不是想我做的更激进的:)它有时不工作.看起来我无法明确控制进入"Web App库"的内容 - 所以即使我看到项目 - >属性 - > Java构建路径 - >库 - > Web应用程序库中列出了错误的库 - 我不能改变它.

任何洞察这一点将非常感谢,谢谢!码头

eclipse gradle spring-tool-suite

4
推荐指数
1
解决办法
6129
查看次数

将cronTrigger与包含年份值的表达式一起使用时出错

我正在观察一个奇怪的行为,该行为使用包含年份值的CronTrigger在Quartz中安排作业。

这是我创建触发器并使用它计划作业的方式:

CronTrigger trigger =  cronJobTriggerFactory.getObject();
trigger.setName(triggerName);
trigger.setGroup(triggerGroupName);
trigger.setCronExpression(cronSchedule);
trigger.setVolatility(false);

JobDetail job = schedulableJobFactory.getObject();
job.setName(jobName);
job.setGroup(jobGroupName);
job.setVolatility(false);
job.setDurability(true);
Date scheduleTime1 = scheduler.scheduleJob(job, trigger);
logger.info(job.getKey() + " will run at: " + scheduleTime1);
Run Code Online (Sandbox Code Playgroud)

然后在我的单元测试中,确定“现在”日期,向其添加5分钟,计算该日期/时间的cron表达式,并使用此计划将我的主班计划称为作业。这是显示通过了哪个cron表达式的单元测试的输出:

NotificationSchedulerTest - Today is: 9 May 2012 05:32 PM
NotificationSchedulerTest - 5 min later is: 9 May 2012 05:37 PM
NotificationSchedulerTest - cron schedule is: 0 37 17 * 4 ? 2012
Run Code Online (Sandbox Code Playgroud)

但是,当尝试使用此cron表达式计划作业时,出现以下错误:

org.quartz.SchedulerException: Based on configured schedule, the given trigger will never fire.
Run Code Online (Sandbox Code Playgroud)

如您所见,该日期是相对于我运行测试的日期/时间而言的将来...因此,尝试安排作业在过去的某个时间运行应该不是问题。

现在,下一个奇怪的事情是:请注意,我确实在cron表达式中指定了年份值:“ 0 …

java cron quartz-scheduler crontrigger

1
推荐指数
1
解决办法
4944
查看次数