我正在尝试使用spark streaming和RabbitMq编写一个简单的"Hello World"应用程序,其中Apache Spark Streaming将通过RabbitMqReceiver从RabbitMq读取消息并在控制台中打印它.但有些我怎么也无法将从Rabbit Mq读取的字符串打印到控制台中.火花流代码正在打印以下信息: -
Value Received BlockRDD[1] at ReceiverInputDStream at RabbitMQInputDStream.scala:33
Value Received BlockRDD[2] at ReceiverInputDStream at RabbitMQInputDStream.scala:33
Run Code Online (Sandbox Code Playgroud)
消息通过以下简单代码发送到rabbitmq: -
package helloWorld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Send {
private final static String QUEUE_NAME = "hello1";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World! is a code. Hi …Run Code Online (Sandbox Code Playgroud) 假设我有一个如下结构: -
//easyjson:json
type JSONData struct {
Data []string
}
Run Code Online (Sandbox Code Playgroud)
我想要将下面的json解组为JSONDatastruct
{"Data" : ["One", "Two", "Three"]}
Run Code Online (Sandbox Code Playgroud)
有人能告诉我如何使用easyjson在Golang中解组json吗?我找不到他们的任何例子README
我正在建立一个Redis 3.2.6集群.在本博客中,提到更改"调整内核网络堆栈"的以下参数
vm.swappiness=0 # turn off swapping
net.ipv4.tcp_sack=1 # enable selective acknowledgements
net.ipv4.tcp_timestamps=1 # needed for selective acknowledgements
net.ipv4.tcp_window_scaling=1 # scale the network window
net.ipv4.tcp_congestion_control=cubic # better congestion algorythm
net.ipv4.tcp_syncookies=1 # enable syn cookied
net.ipv4.tcp_tw_recycle=1 # recycle sockets quickly
net.ipv4.tcp_max_syn_backlog=NUMBER # backlog setting
net.core.somaxconn=NUMBER # up the number of connections per port
net.core.rmem_max=NUMBER # up the receive buffer size
net.core.wmem_max=NUMBER # up the buffer size for all connections
Run Code Online (Sandbox Code Playgroud)
有人可以解释一下上面的参数如何影响redis的行为?
除了上面提到的那些之外,我还需要查看一些其他参数来准备用于redis安装的操作系统吗?
环境
产生 reassignment.json
/home/ubuntu/deploy/kafka/bin/kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --generate --topics-to-move-json-file /home/ubuntu/deploy/kafka/topics_to_move.json --broker-list '<broker-list>' |tail -1 > /home/ubuntu/deploy/kafka/reassignment.json
Run Code Online (Sandbox Code Playgroud)
执行重新分配
/home/ubuntu/deploy/kafka/bin/kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --execute --reassignment-json-file /home/ubuntu/deploy/kafka/reassignment.json
Run Code Online (Sandbox Code Playgroud)
我在kafka 1.1.0中修改了如下所示的主题
/home/ubuntu/deploy/kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --topic Topic3 --config min.insync.replicas=2
Run Code Online (Sandbox Code Playgroud)
但每当我试图验证下面的重新分配
/home/ubuntu/deploy/kafka/bin/kafka-reassign-partitions.sh --zookeeper 127.0.0.1:2181 --reassignment-json-file /home/ubuntu/deploy/kafka/reassignment.json --verify
Run Code Online (Sandbox Code Playgroud)
它显示以下异常
Partitions reassignment failed due to Size of replicas list Vector(3, 0, 2) is different from size of log dirs list Vector(any) for partition Topic3-7
kafka.common.AdminCommandFailedException: Size of replicas list Vector(3, 0, 2) is different from size of log dirs list Vector(any) …Run Code Online (Sandbox Code Playgroud) 我正在使用 logstash-logback-encoder 以 json 格式打印日志。
我的logback.xml样子如下:-
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<includeContext>false</includeContext>
<fieldNames>
<timestamp>timestamp</timestamp>
<version>[ignore]</version>
<levelValue>[ignore]</levelValue>
</fieldNames>
</encoder>
</appender>
<appender name="stash"
class="ch.qos.logback.core.rolling.RollingFileAppender">
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>/tmp/SolrUpdater.%d{yyyy-MM-dd}.log
</fileNamePattern>
<maxHistory>7</maxHistory>
</rollingPolicy>
<encoder class="net.logstash.logback.encoder.LogstashEncoder">
<includeContext>false</includeContext>
<fieldNames>
<timestamp>timestamp</timestamp>
<version>[ignore]</version>
<levelValue>[ignore]</levelValue>
</fieldNames>
</encoder>
</appender>
<root level="error">
<appender-ref ref="CONSOLE" />
<appender-ref ref="stash" />
</root>
</configuration>
Run Code Online (Sandbox Code Playgroud)
有人可以让我知道如何level以小写形式显示所有内容error,而不是默认ERROR和WARNaswarning吗?
更新- 我CustomLogLevelJsonProvider按照 Zakhar 的建议创建了一个:-
package com.jabong.discovery.importer.solrUpdater.log;
public class CustomLogLevelJsonProvider extends LogLevelJsonProvider { …Run Code Online (Sandbox Code Playgroud) 我已经添加mongodb.so了两个/etc/php5/apache2/php.ini,/etc/php5/cli/php.ini如下所示
extension=/usr/lib/php5/20100525/mongodb.so
Run Code Online (Sandbox Code Playgroud)
但我仍然收到以下错误Class 'MongoDate' not found。
PHP 版本 - PHP 5.4.45
以下是来自 php cli 的关于 mongodb 扩展的信息
php -i | grep -i mongodb
mongodb
MongoDB support => enabled
MongoDB extension version => 1.2.5
MongoDB extension stability => stable
mongodb.debug => no value => no value
Run Code Online (Sandbox Code Playgroud)
下面是phpinfo()在 apache2 下运行的for php的输出
我正在尝试使用PowerMockito和Mockito,如下所示:
@RunWith(PowerMockRunner.class)
@PrepareForTest({AmqpMessagePublisher.class})
public class OrderItemManagerImplTest {
@Spy
@InjectMocks
private OrderItemManager orderItemManager = new OrderItemManagerImpl();
@Mock
private OrderReleaseManager srReleaseManager;
@Spy
private srOrderTransformer srOrderTransformer;
private OrderItemEntry getOrderItemEntry() {
OrderItemEntry orderItemEntry = new OrderItemEntry();
orderItemEntry.setProcessingStartDate("2017-03-01");
return orderItemEntry;
}
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
PowerMockito.mockStatic(AmqpMessagePublisher.class);
PowerMockito.doNothing().when(AmqpMessagePublisher.class);
}
@Test
public void testCreateOrder() throws Exception {
OrderItemEntry expected = getOrderItemEntry();
String expectedItemId = expected.getItems().get(0).getItemId();
// the below line is throwing the exception
Mockito.when(srReleaseManager.getReleaseByStoreReleaseId(expectedItemId)).thenReturn(null);
}
}
Run Code Online (Sandbox Code Playgroud)
在尝试测试以上代码时,将引发异常
org.mockito.exceptions.misusing.UnfinishedStubbingException:
Unfinished stubbing detected here:
-> …Run Code Online (Sandbox Code Playgroud) 我正在从Kafka 0.10迁移到最新的1.0。我没有在Kafka 0.10中为这些字段设置任何值。有一个人可以让我知道什么是推荐设置一个3节点中间件群和在单个节点代理群集的下方内部主题设置server.properties-
offsets.topic.replication.factortransaction.state.log.replication.factortransaction.state.log.min.isr如果在单节点kafka代理中升级到1.0后未更改此值,它将采用默认值3。那么在这种情况下会发生什么呢?
我正在尝试查看 jvm 支持的分析事件列表。正如文档中提到的,我使用了list如下所示的命令 -
root@vrni-platform:/home/ubuntu/async-profiler-2.0-linux-x64# ./profiler.sh list 10208
Basic events:
cpu
alloc
lock
wall
itimer
Java method calls:
ClassName.methodName
Perf events:
page-faults
context-switches
cycles
instructions
cache-references
cache-misses
branches
branch-misses
bus-cycles
L1-dcache-load-misses
LLC-load-misses
dTLB-load-misses
mem:breakpoint
trace:tracepoint
Run Code Online (Sandbox Code Playgroud)
我在上面的输出中没有看到这个答案中提到的事件。但是,如果我按照该答案中提到的方式执行上述事件,它似乎就可以工作。
root@vrni-platform:/home/ubuntu/async-profiler-2.0-linux-x64# ./profiler.sh -e malloc -d 30 -f /tmp/flamegraph.html 10208
Profiling for 30 seconds
Done
Run Code Online (Sandbox Code Playgroud)
有人可以告诉我如何查看 aysnc-profiler 支持的特定 jvm 的所有事件列表吗?如果list是正确的参数,profiler.sh那么为什么 malloc 等显示在所有事件列表下?
环境
ubuntu@vrni-platform:~/async-profiler-2.0-linux-x64# ./profiler.sh --version
Async-profiler 2.0 built on Mar 14 2021
Copyright 2016-2021 …Run Code Online (Sandbox Code Playgroud) 我有一个简单的Camel应用程序捆绑包,该捆绑包将在Apache Service Mix 6.1下的Karaf 3.0.5中部署。配置文件放置在etc/目录中(假设它名为wf.cfg)。我想在我的应用程序包中具有动态配置更改功能。这样,无论何时更改了内容wf.cfg,都可以立即打包。为此,我在我的
blueprint.xml
<cm:property-placeholder persistent-id="wf"
update-strategy="reload">
<cm:default-properties>
<cm:property name="env" value="local" />
</cm:default-properties>
</cm:property-placeholder>
<!-- a bean that uses a blueprint property placeholder -->
<bean id="configBean" class="com.jabong.orchestratorservice.basecomponent.config.ConfigBean">
<property name="env" value="${env}" />
</bean>
Run Code Online (Sandbox Code Playgroud)
我现在面临的问题是如果将update-strategy设置为reload。然后,它似乎正在重新加载整个bean。
有人可以让我知道是否可以只重新加载configBean整个捆绑包吗?如果我能做到这一点,那么可能是我可以对configBean应用程序包可以使用的配置变量进行静态引用?
完整blueprint.xml的放在这里。
apache-camel apache-karaf aries blueprint-osgi apache-servicemix
java ×3
apache-kafka ×2
apache-camel ×1
apache-karaf ×1
apache-spark ×1
aries ×1
debian ×1
go ×1
json ×1
jvm ×1
linux ×1
log4j ×1
logback ×1
mocking ×1
mockito ×1
php ×1
php-mongodb ×1
powermockito ×1
rabbitmq ×1
redis ×1