I'm hitting a deadlock on a MySQL table. Only one table is involved, and I can reproduce it consistently. It only happens when I run the code from multiple threads.
Here is the table:
CREATE TABLE `users_roles` (
`role_id` bigint(20) NOT NULL,
`user_id` bigint(20) NOT NULL,
`created` datetime NOT NULL,
PRIMARY KEY (`user_id`,`role_id`),
KEY `created` (`created`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
I then run these two queries in each thread, with a different value of user_id per thread:
BEGIN;
DELETE FROM `users_roles` WHERE user_id = X;
INSERT INTO `users_roles` VALUES (7, X, NOW()); -- DEADLOCK ON THIS QUERY
COMMIT;
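For reference, the multi-threaded driver looks roughly like the following sketch (hypothetical code, not my actual application: it assumes a local MySQL at `localhost:3306` with a `test` database and the MySQL JDBC driver on the classpath; role id 7 is taken from the INSERT above):

```scala
import java.sql.DriverManager

object DeadlockRepro {
  // The two statements each thread runs; role_id 7 matches the INSERT above.
  def statementsFor(userId: Long, roleId: Long = 7L): Seq[String] = Seq(
    s"DELETE FROM `users_roles` WHERE user_id = $userId",
    s"INSERT INTO `users_roles` VALUES ($roleId, $userId, NOW())"
  )

  // One transaction per thread; the deadlock surfaces on the INSERT.
  def runTransaction(userId: Long): Unit = {
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "")
    try {
      conn.setAutoCommit(false) // BEGIN
      val st = conn.createStatement()
      statementsFor(userId).foreach(st.executeUpdate)
      conn.commit()
    } finally conn.close()
  }

  def main(args: Array[String]): Unit = {
    // A distinct user_id per thread, as described above.
    val threads = (1L to 8L).map { id =>
      new Thread(new Runnable { def run(): Unit = runTransaction(id) })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}
```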
It should be noted that user_id X never exists in the database at the time the DELETE statement is issued. The code running these queries is used to create new users; however, the same function also lets me change which account a user belongs to, which is why it first removes any existing roles the user had on their old team.
So when enough of these queries run in parallel, I start hitting deadlocks. The deadlock section of the InnoDB status output shows the following after each deadlock.
------------------------
LATEST DETECTED DEADLOCK
------------------------
2014-05-09 16:02:20 7fbc99e5f700
*** (1) TRANSACTION:
TRANSACTION 6241424274, ACTIVE 0 sec inserting
mysql tables in use 1, …
First, this looks a lot like "Kafka consuming the latest message again when I re-run the Flink consumer", but it is not the same problem. The answers to that question do not appear to solve mine. If I have missed the answer there, then please rephrase it, because I am clearly missing something.
The symptom is exactly the same, though: Flink (the Kafka connector) re-runs the last 3-9 messages it saw before it was shut down.
Flink 1.1.2
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91
import java.util.Properties
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._
object Runner {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(500)
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "testing")

    val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
    val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())

    env.addSource(kafkaConsumer)
      .addSink(kafkaProducer)

    env.execute()
  }
}
libraryDependencies ++= …
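My working assumption (unconfirmed, which is part of what I'm asking) is that the checkpoints record the Kafka offsets, and on restart the consumer resumes from the last completed checkpoint, so every record processed between that checkpoint and the shutdown is read again. A toy sketch of that assumption (the names here are mine, not Flink API):

```scala
object ReplayWindow {
  // If the last completed checkpoint recorded offset `lastCheckpointed`,
  // and the job had processed up to `lastProcessed` when it was shut down,
  // then everything in between would be read again on restart.
  def replayedOffsets(lastCheckpointed: Long, lastProcessed: Long): Seq[Long] =
    (lastCheckpointed + 1) to lastProcessed
}
```

With a 500 ms checkpoint interval, a handful of records would typically fall inside that window, which would be consistent with the 3-9 duplicated messages I see.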