I need to create a library in which I will have both synchronous and asynchronous methods.
executeSynchronous() - waits until the result is available and returns it. executeAsynchronous() - returns a Future right away, which can be processed later after other things are done, if needed.
The core logic of my library:
Customers will use our library and call it by passing a DataKey builder object. We will then construct a URL from that DataKey object, make an HTTP client call to that URL, and once the response comes back as a JSON string we will send that JSON string back to our customers by creating a DataResponse object. Some customers will call executeSynchronous() and some may call executeAsynchronous(), which is why I need to provide both methods separately in my library.
Interface:
public interface Client {
// for synchronous
public DataResponse executeSynchronous(DataKey key);
// for asynchronous
public Future<DataResponse> executeAsynchronous(DataKey key);
}
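For reference, a common way to provide both flavors is to implement the synchronous call directly and have the asynchronous one delegate to it through an internal ExecutorService. The following is only a minimal sketch under that assumption (the pool size and the callRestService helper are hypothetical), not the actual DataClient implementation, which follows below:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SimpleClient implements Client {
    // hypothetical pool size; tune for the real workload
    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    @Override
    public DataResponse executeSynchronous(DataKey key) {
        // build the URL from the DataKey, call the REST service, wrap the JSON in a DataResponse
        return callRestService(key); // hypothetical helper, not part of the original code
    }

    @Override
    public Future<DataResponse> executeAsynchronous(final DataKey key) {
        // run the same synchronous logic on a worker thread and hand back a Future
        return executor.submit(new Callable<DataResponse>() {
            @Override
            public DataResponse call() {
                return executeSynchronous(key);
            }
        });
    }

    private DataResponse callRestService(DataKey key) {
        // placeholder: the real HTTP call is out of scope for this sketch
        throw new UnsupportedOperationException("sketch only");
    }
}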
Then I have my DataClient which implements the above Client interface:
public class DataClient implements Client {
private RestTemplate restTemplate = new RestTemplate();
// do I need to have all threads as non-daemon or I can have daemon thread for my use case?
private ExecutorService executor = …

I have a RestService running on 45 different machines across three data centers (15 in each data center). I have a client library that uses RestTemplate to call those machines depending on where the call originates: if the call comes from DC1, my library calls the rest service running in DC1, and likewise for the other data centers.
My client library runs on different machines in the three data centers (not on the same 45 machines).
I am using RestTemplate with HttpComponentsClientHttpRequestFactory as shown below:
public class DataProcess {
private RestTemplate restTemplate = new RestTemplate();
private ExecutorService service = Executors.newFixedThreadPool(15);
// singleton class so only one instance
public DataProcess() {
restTemplate.setRequestFactory(clientHttpRequestFactory());
}
public DataResponse getData(DataKey key) {
// do some stuff here which will internally call our RestService
// by using DataKey object and using RestTemplate which I am making below
}
private ClientHttpRequestFactory clientHttpRequestFactory() {
HttpComponentsClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory();
RequestConfig requestConfig = RequestConfig.custom().setConnectionRequestTimeout(1000).setConnectTimeout(1000)
.setSocketTimeout(1000).setStaleConnectionCheckEnabled(false).build(); …

Tags: java, spring, connection-pooling, resttemplate, apache-httpclient-4.x
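Since the tags point at connection pooling, here is a minimal sketch of how a PoolingHttpClientConnectionManager can be wired into HttpComponentsClientHttpRequestFactory and RestTemplate; the pool sizes are hypothetical values, not numbers from the original setup:

import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.springframework.http.client.ClientHttpRequestFactory;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;

public class PooledRestTemplateFactory {

    public static RestTemplate create() {
        PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
        connectionManager.setMaxTotal(300);           // hypothetical total pool size
        connectionManager.setDefaultMaxPerRoute(100); // hypothetical per-route limit

        RequestConfig requestConfig = RequestConfig.custom()
                .setConnectionRequestTimeout(1000)
                .setConnectTimeout(1000)
                .setSocketTimeout(1000)
                .build();

        CloseableHttpClient httpClient = HttpClients.custom()
                .setConnectionManager(connectionManager)
                .setDefaultRequestConfig(requestConfig)
                .build();

        ClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(httpClient);
        return new RestTemplate(requestFactory);
    }
}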
I am using JeroMQ in a multithreaded environment as shown below. Below is my code, where the SocketManager constructor first connects to all available sockets and puts them into the liveSocketsByDatacenter map in the connectToZMQSockets method. After that, in the same constructor, I start a background thread that runs every 30 seconds and calls the updateLiveSockets method to ping all the sockets already present in the liveSocketsByDatacenter map and update that map to reflect whether those sockets are alive.
And the getNextSocket() method is called concurrently by multiple reader threads to get the next available socket, which we then use to send data. So my question is: are we using JeroMQ correctly in a multithreaded environment? We just saw an exception with the stack trace below in production while trying to send data on a live socket, so I am not sure whether it is a bug or something else.
java.lang.ArrayIndexOutOfBoundsException: 256
at zmq.YQueue.push(YQueue.java:97)
at zmq.YPipe.write(YPipe.java:47)
at zmq.Pipe.write(Pipe.java:232)
at zmq.LB.send(LB.java:83)
at zmq.Push.xsend(Push.java:48)
at zmq.SocketBase.send(SocketBase.java:590)
at org.zeromq.ZMQ$Socket.send(ZMQ.java:1271)
at org.zeromq.ZFrame.send(ZFrame.java:131)
at org.zeromq.ZFrame.sendAndKeep(ZFrame.java:146)
at org.zeromq.ZMsg.send(ZMsg.java:191)
at org.zeromq.ZMsg.send(ZMsg.java:163)
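For what it is worth, ZeroMQ sockets are not thread-safe, and a stack trace like the one above is commonly reported when two threads end up writing to the same socket at the same time; that is my reading of it, not a confirmed diagnosis. A minimal sketch of one way to serialize sends on a shared socket (the GuardedSocket wrapper is hypothetical and not part of the original code):

import org.zeromq.ZMQ;
import org.zeromq.ZMsg;

// hypothetical wrapper: only one thread at a time may touch the underlying socket
public class GuardedSocket {
    private final ZMQ.Socket socket;

    public GuardedSocket(ZMQ.Socket socket) {
        this.socket = socket;
    }

    public synchronized boolean send(ZMsg msg) {
        // serializing here avoids concurrent writes into the socket's internal queue
        return msg.send(socket);
    }
}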
Below is my code:
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final Map<Datacenters, List<SocketHolder>> liveSocketsByDatacenter = new ConcurrentHashMap<>();
private final ZContext ctx = new ZContext();
private static class Holder {
private …

I am using the class below to send data to our messaging queue over a socket, either synchronously or asynchronously, as shown below.
sendAsync - it sends data asynchronously without blocking. After sending, (on LINE A) it adds the record to the retryHolder bucket, so that if no acknowledgment is received it will be retried by the background thread started in the constructor. send - it internally calls the sendAsync method, then sleeps for a particular timeout, and if no acknowledgment is received it removes the entry from the retryHolder bucket so that we do not retry it again. So the only difference between the two methods is: for asynchronous I need to retry at all costs, but for synchronous I do not need to retry, yet it looks like it could still get retried, because we share the same retry bucket cache and the retry thread runs every second.
ResponsePoller is a class that receives the acknowledgment for the data sent to our messaging queue and then calls the method below to remove the address from the retryHolder, so that we do not retry once an acknowledgment has been received.
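As a rough sketch of the synchronous path described above (the sendAsync signature, the timeout handling, and the use of the address as the cache key are my assumptions; only retryHolder and ResponsePoller come from the description):

// hypothetical sketch: synchronous send built on top of the asynchronous path
public boolean send(long address, byte[] encodedRecords, long timeoutMillis) {
    boolean submitted = sendAsync(address, encodedRecords); // also registers the entry in retryHolder (LINE A)
    if (!submitted) {
        return false;
    }
    try {
        // wait for the acknowledgment window to elapse
        TimeUnit.MILLISECONDS.sleep(timeoutMillis);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
    }
    // if ResponsePoller already saw the ack, it has removed the entry;
    // otherwise drop it so the background thread does not keep retrying a synchronous send
    boolean acked = retryHolder.getIfPresent(address) == null;
    retryHolder.invalidate(address);
    return acked;
}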
public class SendToQueue {
private final ExecutorService cleanupExecutor = Executors.newFixedThreadPool(5);
private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(3);
private final Cache<Long, byte[]> retryHolder =
CacheBuilder
.newBuilder()
.maximumSize(1000000)
.concurrencyLevel(100)
.removalListener(
RemovalListeners.asynchronous(new LoggingRemovalListener(), cleanupExecutor)).build();
private static class Holder {
private static final SendToQueue INSTANCE = new SendToQueue();
}
public static SendToQueue getInstance() {
return Holder.INSTANCE;
}
private SendToQueue() {
executorService.submit(new …

I have an actual JSON string that I need to Avro binary encode into a byte array.
I am not sure whether this is the right way to do it. Can anyone take a look and check whether the way I am trying to Avro binary encode my JSON string is correct? I am using Apache Avro version 1.7.7.
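For reference, the usual way to do this with the Avro 1.7.x APIs is to read the JSON through a JsonDecoder into a GenericRecord and then write it back out with a BinaryEncoder. This is a minimal sketch of that approach, which I assume is roughly what the (truncated) jsonToAvro method below does:

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;

public class JsonToAvroSketch {

    // convert a JSON string to Avro binary using the given schema
    public static byte[] jsonToAvro(String json, String schemaStr) throws IOException {
        Schema schema = new Schema.Parser().parse(schemaStr);
        GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
        GenericRecord record = reader.read(null, DecoderFactory.get().jsonDecoder(schema, json));

        GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(record, encoder);
        encoder.flush();
        return out.toByteArray();
    }
}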
public class AvroTest {
private static final String json = "{" + "\"name\":\"Frank\"," + "\"age\":47" + "}";
private static final String schema = "{ \"type\":\"record\", \"namespace\":\"foo\", \"name\":\"Person\", \"fields\":[ { \"name\":\"name\", \"type\":\"string\" }, { \"name\":\"age\", \"type\":\"int\" } ] }";
public static void main(String[] args) throws IOException {
byte[] data = jsonToAvro(json, schema);
String jsonString = avroToJson(data, schema);
System.out.println(jsonString);
}
/**
* Convert JSON to avro binary array.
*
* @param json
* @param schemaStr …

I am working with Maven projects, and I have two projects, ProjectA and ProjectB. My ProjectA is a Maven library, and its pom looks like this:
<?xml version="1.0" encoding="UTF-8"?>
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.texture.partial</groupId>
<artifactId>PartialPlatform</artifactId>
<version>2.1.5-RELEASE</version>
</parent>
<groupId>com.texture.transform.golden</groupId>
<artifactId>SampleClient</artifactId>
<version>1.0.4</version>
<dependencies>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.2.4</version>
</dependency>
<dependency>
<groupId>com.texture.partial.core</groupId>
<artifactId>PartialKernel</artifactId>
</dependency>
<dependency>
<groupId>com.texture.webres</groupId>
<artifactId>WebResPartial</artifactId>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.servlet.jsp</groupId>
<artifactId>jsp-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.texture.kernel</groupId>
<artifactId>TextureServer</artifactId>
</dependency>
<dependency>
<groupId>com.texture.kernel</groupId>
<artifactId>Kernel</artifactId>
</dependency>
<dependency>
<groupId>com.texture.v3jars.Houston</groupId>
<artifactId>KernelDAL</artifactId>
</dependency>
<dependency>
<groupId>com.texture.kernel</groupId>
<artifactId>uKernel</artifactId>
</dependency>
<dependency>
<groupId>com.texture.kernel</groupId>
<artifactId>uKernelCore</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
</dependency> …

I have a client library in which I make a remote HTTP call to my rest service and then return a List<DataResponse> to the customer calling our library. The response I get back from the REST service, along with any errors if there are any, is wrapped in a DataResponse object.
public class DataResponse {
private final String response;
private final boolean isLink;
private final TypeOfId idType;
private final long ctime;
private final long lmd;
private final String maskInfo;
// below are for error stuff
private final ErrorCode error;
private final StatusCode status;
// constructors and getters here
}
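As a hypothetical usage example of the error fields (the getter names are assumed from the field names, since the class only notes "constructors and getters here"):

// hypothetical: callers can branch on the error field to tell failures from successes
for (DataResponse dataResponse : dataResponses) {
    if (dataResponse.getError() != null) {
        // inspect dataResponse.getError() / dataResponse.getStatus() and handle the failure
    } else {
        // use dataResponse.getResponse() as the successful JSON payload
    }
}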
Here is my ErrorCode enum class:
public enum ErrorCode {
// enum values
private final int code;
private final String status;
private final String description;
// constructors and getters
} …

I am working on a project in which I log a bunch of stuff to a file, and I want to make sure my log file rolls over as soon as it reaches a fixed size limit. I have the logback.xml file below, but it looks like the file size limit is not taking effect: I see my file at 793M even though my limit is 100M.
<configuration>
<appender name="FILE" class="ch.qos.logback.core.FileAppender">
<file>process.log</file>
<triggeringPolicy
class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
<maxFileSize>100MB</maxFileSize>
</triggeringPolicy>
<rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
<fileNamePattern>process%i.log</fileNamePattern>
<minIndex>1</minIndex>
<maxIndex>9</maxIndex>
</rollingPolicy>
<encoder>
<pattern>%date %level [%thread] %msg%n</pattern>
<!-- this improves logging throughput -->
<immediateFlush>true</immediateFlush>
</encoder>
</appender>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder
by default -->
<encoder>
<pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="FILE" />
<appender-ref ref="STDOUT" />
</root>
</configuration>
What am I doing wrong here? Also, what is the best logging policy to follow? We log a bunch of stuff into a single file, and we do not want to fill up the disk with this log file.
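One likely cause, as far as I can tell: the size-based triggering and fixed-window rolling policies only take effect on ch.qos.logback.core.rolling.RollingFileAppender, while the config above declares a plain ch.qos.logback.core.FileAppender, which ignores them. A sketch of the same appender with only the class swapped (keeping the original pattern and limits):

<appender name="FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
    <file>process.log</file>
    <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
        <fileNamePattern>process%i.log</fileNamePattern>
        <minIndex>1</minIndex>
        <maxIndex>9</maxIndex>
    </rollingPolicy>
    <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
        <maxFileSize>100MB</maxFileSize>
    </triggeringPolicy>
    <encoder>
        <pattern>%date %level [%thread] %msg%n</pattern>
        <immediateFlush>true</immediateFlush>
    </encoder>
</appender>

With maxIndex 9 and 100MB per file, this also caps the disk footprint of this log at roughly 1 GB.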
I am working on a project in which I need to consume a lot of records and then send those records to some other systems using ZeroMQ.
Here is the flow:
I am not sure what the best way is to minimize this in the scenario below.
Below is my Processor class, in which the add() method will be called by multiple threads to populate the dataHolderByPartitionReference CHM in a thread-safe way. Then, in the constructor of the Processor class, I start a background thread that runs every 30 seconds and pushes the records from that same CHM to a set of ZeroMQ servers by calling the SendToZeroMQ class, as shown below:
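For context, a minimal sketch of what the described add() might look like against the AtomicReference-held map (the partition parameter and the queue creation are my assumptions, not the original code):

// hypothetical sketch of add(): append a record to the current map snapshot in a thread-safe way
public void add(int partition, DataHolder dataHolder) {
    ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>> currentMap =
        dataHolderByPartitionReference.get();
    ConcurrentLinkedQueue<DataHolder> queue = currentMap.get(partition);
    if (queue == null) {
        queue = new ConcurrentLinkedQueue<>();
        ConcurrentLinkedQueue<DataHolder> existing = currentMap.putIfAbsent(partition, queue);
        if (existing != null) {
            queue = existing;
        }
    }
    queue.add(dataHolder);
}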
Processor
public class Processor {
private final ScheduledExecutorService executorService = Executors
.newSingleThreadScheduledExecutor();
private final AtomicReference<ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>> dataHolderByPartitionReference =
new AtomicReference<>(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>());
private static class Holder {
private static final Processor INSTANCE = new Processor();
}
public static Processor getInstance() {
return Holder.INSTANCE;
}
private Processor() {
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
validateAndSendAllPartitions(dataHolderByPartitionReference
.getAndSet(new ConcurrentHashMap<Integer, ConcurrentLinkedQueue<DataHolder>>()));
} …

I have a class in which I populate a liveSocketsByDatacenter map every 30 seconds from a background thread in the updateLiveSockets() method, and then I have a getNextSocket() method that will be called by multiple reader threads to get an available live socket, using that same map to get the information.
public class SocketManager {
private static final Random random = new Random();
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private final AtomicReference<Map<Datacenters, List<SocketHolder>>> liveSocketsByDatacenter =
new AtomicReference<>(Collections.unmodifiableMap(new HashMap<>()));
private final ZContext ctx = new ZContext();
// Lazy Loaded Singleton Pattern
private static class Holder {
private static final SocketManager instance = new SocketManager();
}
public static SocketManager getInstance() {
return Holder.instance;
}
private SocketManager() {
connectToZMQSockets();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
updateLiveSockets(); …
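To round out the picture, a minimal sketch of how a reader might pick a live socket from the current snapshot held in the AtomicReference (the java.util.Optional return and the random selection are my assumptions, not the original getNextSocket() body):

// hypothetical sketch of getNextSocket(): read-only access to the current immutable snapshot
public Optional<SocketHolder> getNextSocket(Datacenters dc) {
    Map<Datacenters, List<SocketHolder>> snapshot = liveSocketsByDatacenter.get();
    List<SocketHolder> liveSockets = snapshot.get(dc);
    if (liveSockets == null || liveSockets.isEmpty()) {
        return Optional.empty();
    }
    // pick any live socket; the snapshot itself is never mutated by readers
    return Optional.of(liveSockets.get(random.nextInt(liveSockets.size())));
}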