小编joh*_*ohn的帖子

如何在多线程环境中更好地使用ExecutorService?

我需要创建一个库,在其中我将有同步和异步方法.

  • executeSynchronous() - 等到我有结果,返回结果.
  • executeAsynchronous() - 立即返回Future,如果需要,可在其他事情完成后处理.

我的图书馆的核心逻辑

客户将使用我们的库,他们将通过传递DataKey构建器对象来调用它.然后我们将使用该DataKey对象构造一个URL,并通过执行它来对该URL进行HTTP客户端调用,然后在我们将响应作为JSON字符串返回之后,我们将通过创建DataResponse对象将该JSON字符串发送回我们的客户.有些客户会打电话executeSynchronous(),有些人可能会打电话executeAsynchronous(),这就是为什么我需要在我的库中单独提供两种方法.

接口:

public interface Client {

    // for synchronous
    public DataResponse executeSynchronous(DataKey key);

    // for asynchronous
    public Future<DataResponse> executeAsynchronous(DataKey key);
}
Run Code Online (Sandbox Code Playgroud)

然后我有我DataClient实现上面的Client接口:

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    // do I need to have all threads as non-daemon or I can have daemon thread for my use case?
    private ExecutorService executor = …
Run Code Online (Sandbox Code Playgroud)

java multithreading daemon callable executorservice

10
推荐指数
2
解决办法
1280
查看次数

如何确定setMaxTotal和setDefaultMaxPerRoute的最佳设置?

我有一个RestService在三个数据中心的45台不同的机器上运行(每个数据中心15个).我有一个客户端库,用于RestTemplate根据呼叫的来源调用这些机器.如果呼叫来自DC1,那么我的库将调用我在DC1中运行的休息服务,对其他人也是如此.

我的客户端库在三个数据中心的不同机器上运行(不在同一台45台机器上).

我使用RestTemplateHttpComponentsClientHttpRequestFactory如下图所示:

public class DataProcess {

    private RestTemplate restTemplate = new RestTemplate();
    private ExecutorService service = Executors.newFixedThreadPool(15);

    // singleton class so only one instance
    public DataProcess() {
        restTemplate.setRequestFactory(clientHttpRequestFactory());
    }

    public DataResponse getData(DataKey key) {
        // do some stuff here which will internally call our RestService
        // by using DataKey object and using RestTemplate which I am making below
    }   

    private ClientHttpRequestFactory clientHttpRequestFactory() {
        HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory();
        RequestConfig requestConfig = RequestConfig.custom().setConnectionRequestTimeout(1000).setConnectTimeout(1000)
                .setSocketTimeout(1000).setStaleConnectionCheckEnabled(false).build(); …
Run Code Online (Sandbox Code Playgroud)

java spring connection-pooling resttemplate apache-httpclient-4.x

10
推荐指数
1
解决办法
3144
查看次数

java.lang.ArrayIndexOutOfBoundsException:256,jeromq 0.3.6版本

我在多线程环境中使用Jeromq,如下所示.下面是我的代码,其中构造函数首先SocketManager连接到所有可用的套接字,然后我将它们放在方法中的liveSocketsByDatacentermap中connectToZMQSockets.之后,我在同一个构造函数中启动一个后台线程,该构造函数每30秒运行一次,它调用updateLiveSockets方法来ping所有那些已经存在于liveSocketsByDatacentermap中的套接字并更新liveSocketsByDatacenter映射,看看这些套接字是否存活.

并且getNextSocket()多个读取器线程同时调用方法以获取下一个可用的套接字,然后我们使用该套接字在其上发送数据.所以我的问题是我们在多线程环境中正确使用Jeromq吗?因为在我们尝试将数据发送到该实时套接字时,我们刚刚在生产环境中看到了这个堆栈跟踪的异常,所以我不确定它是否是一个错误或其他什么?

java.lang.ArrayIndexOutOfBoundsException: 256
at zmq.YQueue.push(YQueue.java:97)
at zmq.YPipe.write(YPipe.java:47)
at zmq.Pipe.write(Pipe.java:232)
at zmq.LB.send(LB.java:83)
at zmq.Push.xsend(Push.java:48)
at zmq.SocketBase.send(SocketBase.java:590)
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1271)
at org.zeromq.ZFrame.send(ZFrame.java:131)
at org.zeromq.ZFrame.sendAndKeep(ZFrame.java:146)
at org.zeromq.ZMsg.send(ZMsg.java:191)
at org.zeromq.ZMsg.send(ZMsg.java:163)
Run Code Online (Sandbox Code Playgroud)

以下是我的代码:

public class SocketManager {
    private static final Random random = new Random();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>();
    private final ZContext ctx = new ZContext();

    private static class Holder {
        private …
Run Code Online (Sandbox Code Playgroud)

java multithreading thread-safety zeromq jeromq

10
推荐指数
1
解决办法
560
查看次数

发送记录并等待其确认接收

我使用下面的类通过使用socket以同步方式或异步方式将数据发送到我们的消息队列,如下所示.

  • sendAsync - 它不间断地异步发送数据.发送后,(on LINE A)它会添加到retryHolder存储桶,这样如果没有收到确认,那么它将再次从构造函数中启动的后台线程重试.
  • send- 它在内部调用sendAsync方法,然后在特定的超时时间内休眠,如果没有收到确认,则从retryHolder桶中删除,以便我们不再重试.

所以上述两种方法之间的唯一区别是 - 对于异步,我需要不惜一切代价重试,但是对于同步,我不需要重试但看起来可能会重试,因为我们共享相同的重试桶缓存并重试线程运行每1秒钟.

ResponsePoller是一个类,它接收发送到我们的消息队列的数据的确认,然后调用removeFromretryHolder下面的方法删除地址,以便我们在收到确认后不重试.

public class SendToQueue {
  private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
  private final Cache<Long, byte[]> retryHolder =
      CacheBuilder
          .newBuilder()
          .maximumSize(1000000)
          .concurrencyLevel(100)
          .removalListener(
              RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();

  private static class Holder {
    private static final SendToQueue INSTANCE = new SendToQueue();
  }

  public static SendToQueue getInstance() {
    return Holder.INSTANCE;
  }

  private SendToQueue() {
    executorService.submit(new …
Run Code Online (Sandbox Code Playgroud)

java multithreading thread-safety race-condition guava

10
推荐指数
1
解决办法
788
查看次数

如何avro二进制编码我的json字符串到一个字节数组?

我有一个实际的JSON字符串,我需要将二进制编码avro到字节数组.在完成Apache Avro规范之后,我想出了以下代码.

我不确定这是否是正确的做法.任何人都可以看看我试图avro二进制编码我的JSON字符串的方式是否正确?我使用的是Apache Avro 1.7.7版本.

public class AvroTest {

    private static final String json = "{" + "\"name\":\"Frank\"," + "\"age\":47" + "}";
    private static final String schema = "{ \"type\":\"record\", \"namespace\":\"foo\", \"name\":\"Person\", \"fields\":[ { \"name\":\"name\", \"type\":\"string\" }, { \"name\":\"age\", \"type\":\"int\" } ] }";

    public static void main(String[] args) throws IOException {
        byte[] data = jsonToAvro(json, schema);

        String jsonString = avroToJson(data, schema);
        System.out.println(jsonString);
    }

    /**
     * Convert JSON to avro binary array.
     * 
     * @param json
     * @param schemaStr …
Run Code Online (Sandbox Code Playgroud)

java json bytearray avro

9
推荐指数
1
解决办法
7816
查看次数

如何排除旧版本的maven依赖项并使用它的新版本?

我正在使用Maven项目,我有两个项目,ProjectA并且ProjectB.我ProjectA是一个maven图书馆,其pom看起来像这样:

<?xml version="1.0" encoding="UTF-8"?>
<project
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
    xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">

    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>com.texture.partial</groupId>
        <artifactId>PartialPlatform</artifactId>
        <version>2.1.5-RELEASE</version>
    </parent>

    <groupId>com.texture.transform.golden</groupId>
    <artifactId>SampleClient</artifactId>
    <version>1.0.4</version>

    <dependencies>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.2.4</version>
        </dependency>
        <dependency>
            <groupId>com.texture.partial.core</groupId>
            <artifactId>PartialKernel</artifactId>
        </dependency>
        <dependency>
            <groupId>com.texture.webres</groupId>
            <artifactId>WebResPartial</artifactId>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>javax.servlet.jsp</groupId>
            <artifactId>jsp-api</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.texture.kernel</groupId>
            <artifactId>TextureServer</artifactId>
        </dependency>
        <dependency>
            <groupId>com.texture.kernel</groupId>
            <artifactId>Kernel</artifactId>
        </dependency>
        <dependency>
            <groupId>com.texture.v3jars.Houston</groupId>
            <artifactId>KernelDAL</artifactId>
        </dependency>
        <dependency>
            <groupId>com.texture.kernel</groupId>
            <artifactId>uKernel</artifactId>
        </dependency>
        <dependency>
            <groupId>com.texture.kernel</groupId>
            <artifactId>uKernelCore</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
        </dependency>
        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>javax.servlet-api</artifactId>
        </dependency> …
Run Code Online (Sandbox Code Playgroud)

java spring dependency-management maven

9
推荐指数
1
解决办法
1万
查看次数

如何避免制作长构造函数

我有一个客户端库,我在其中对我的休息服务进行http远程调用,然后我返回List<DataResponse>给调用我们的库的客户,我从REST服务获取响应以及任何错误,如果有任何包装DataResponse对象周围.

public class DataResponse {

    private final String response;
    private final boolean isLink;
    private final TypeOfId idType;
    private final long ctime;
    private final long lmd;
    private final String maskInfo;

    // below are for error stuff
    private final ErrorCode error;
    private final StatusCode status;

    // constructors and getters here

}
Run Code Online (Sandbox Code Playgroud)

这是我的ErrorCode枚举类:

public enum ErrorCode {

    // enum values

    private final int code;
    private final String status;
    private final String description;

    // constructors and getters

} …
Run Code Online (Sandbox Code Playgroud)

java enums constructor

9
推荐指数
1
解决办法
2936
查看次数

如何按文件大小保持滚动日志文件?

我正在开发一个项目,我正在将一堆东西记录在一个文件中,我想确保我的日志文件在达到文件的固定限制时立即滚动.我有一个下面的logback.xml文件,但看起来文件大小不起作用.我看到我的文件大小为793M,但我的限制是100M

<configuration>
    <appender name="FILE" class="ch.qos.logback.core.FileAppender">
        <file>process.log</file>
        <triggeringPolicy
            class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
            <maxFileSize>100MB</maxFileSize>
        </triggeringPolicy>
        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
            <fileNamePattern>process%i.log</fileNamePattern>
            <minIndex>1</minIndex>
            <maxIndex>9</maxIndex>
        </rollingPolicy>
        <encoder>
            <pattern>%date %level [%thread] %msg%n</pattern>
            <!-- this improves logging throughput -->
            <immediateFlush>true</immediateFlush>
        </encoder>
    </appender>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder 
            by default -->
        <encoder>
            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
            </pattern>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="FILE" />
        <appender-ref ref="STDOUT" />
    </root>
</configuration>
Run Code Online (Sandbox Code Playgroud)

我在这做什么错?此外,在采伐方面,我们应该遵循的最佳政策是什么?我们在一个文件中记录了一堆东西,我们不想用这个日志文件填满磁盘.

java log4j logback slf4j

9
推荐指数
1
解决办法
4406
查看次数

如果没有收到确认,如何设计一个发送记录并重试发送它们的系统?

我正在开发一个项目,我需要消耗大量的记录然后将这些记录发送到其他使用ZeroMQ的系统.

这是流程:

  • 将所有传入记录存储在来自多个线程的CHM中.记录将以非常高的速度发生.
  • 从每1分钟运行一次的后台线程,将这些记录从CHM发送到ZeroMQ服务器.
  • 将每条记录发送到ZeroMQ服务器后,也将它们添加到重试存储桶中,以便在特定时间过后可以重试,如果尚未收到此记录的确认.
  • 我们还有一个poller runnable线程,它接收来自ZeroMQ服务器的确认,告知已收到这些记录,所以一旦我收到确认,我就会从重试桶中删除该记录,这样它就不会被重试.
  • 即使一些记录被发送两次,也没关系,但最好尽量减少这种情况.

我不确定在我的下面的场景中最小化这个的最佳方法是什么.

下面是我的Processor类,其中一个.add()方法将由多个线程调用dataHolderByPartitionReference,以线程安全的方式填充CHM.然后,在Processor类的构造函数中,我启动后台线程,每30秒运行一次,通过调用SendToZeroMQ类将记录从同一个CHM推送到一组ZeroMQ服务器,如下所示:


Processor

public class Processor {
  private final ScheduledExecutorService executorService = Executors
      .newSingleThreadScheduledExecutor();
  private final AtomicReference<ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>> dataHolderByPartitionReference =
      new AtomicReference<>(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>());

  private static class Holder {
    private static final Processor INSTANCE = new Processor();
  }

  public static Processor getInstance() {
    return Holder.INSTANCE;
  }

  private Processor() {
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        validateAndSendAllPartitions(dataHolderByPartitionReference
            .getAndSet(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>()));
      } …
Run Code Online (Sandbox Code Playgroud)

java multithreading design-patterns zeromq data-structures

9
推荐指数
1
解决办法
385
查看次数

在单个后台线程定期修改它的同时读取Map

我有一个类,我liveSocketsByDatacenterupdateLiveSockets()方法中每30秒从一个后台线程填充一个映射,然后我有一个方法getNextSocket(),将由多个读取器线程调用以获得一个可用的实时套接字,它使用相同的映射来获取此信息.

public class SocketManager {
  private static final Random random = new Random();
  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
  private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter =
      new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
  private final ZContext ctx = new ZContext();

  // Lazy Loaded Singleton Pattern
  private static class Holder {
    private static final SocketManager instance = new SocketManager();
  }

  public static SocketManager getInstance() {
    return Holder.instance;
  }

  private SocketManager() {
    connectToZMQSockets();
    scheduler.scheduleAtFixedRate(new Runnable() {
      public void run() {
        updateLiveSockets(); …
Run Code Online (Sandbox Code Playgroud)

java multithreading hashmap thread-safety race-condition

9
推荐指数
1
解决办法
666
查看次数