小编mba*_*ker的帖子

为什么我在MySQL中遇到死锁

我在MySQL表中遇到死锁.只涉及一个表,我可以一致地重现它.它只发生在我运行代码的多个线程时.

这是表格:

CREATE TABLE `users_roles` (
  `role_id` bigint(20) NOT NULL,
  `user_id` bigint(20) NOT NULL,
  `created` datetime NOT NULL,
  PRIMARY KEY (`user_id`,`role_id`),
  KEY `created` (`created`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Run Code Online (Sandbox Code Playgroud)

然后,我在每个线程中运行这两个查询,每个线程对user_id具有不同的值.

BEGIN;
DELETE FROM `users_roles` WHERE user_id = X;
INSERT INTO `users_roles` VALUES (7, X, NOW()); -- DEADLOCK ON THIS QUERY
COMMIT;
Run Code Online (Sandbox Code Playgroud)

应该注意,当调用DELETE语句时,user_id X 在数据库中永远不存在.运行这些查询的代码位用于创建新用户.但是,该功能允许我修改用户所在的帐户,并因此从旧用户的团队中删除现有角色.

因此,当足够的这些查询并行运行时,我开始遇到死锁.InnoDB状态的死锁部分在每次死锁后显示此信息.

------------------------
LATEST DETECTED DEADLOCK
------------------------
2014-05-09 16:02:20 7fbc99e5f700

*** (1) TRANSACTION:
TRANSACTION 6241424274, ACTIVE 0 sec inserting
mysql tables in use 1, …
Run Code Online (Sandbox Code Playgroud)

mysql sql deadlock innodb database-deadlocks

16
推荐指数
1
解决办法
5624
查看次数

Kafka&Flink在重启时重复发送消息

首先,当我重新运行Flink消费者时,这与Kafka再次消费最新消息非常类似,但它不一样.这个问题的答案似乎并没有解决我的问题.如果我错过了答案,那么请重新解释答案,因为我显然错过了一些东西.

但问题完全相同--Flink(kafka连接器)重新运行它在关闭之前看到的最后3-9条消息.

我的版本

Flink 1.1.2
Kafka 0.9.0.1
Scala 2.11.7
Java 1.8.0_91
Run Code Online (Sandbox Code Playgroud)

我的守则

import java.util.Properties
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.connectors.kafka._
import org.apache.flink.streaming.util.serialization._
import org.apache.flink.runtime.state.filesystem._

object Runner {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(500)
    env.setStateBackend(new FsStateBackend("file:///tmp/checkpoints"))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092");
    properties.setProperty("group.id", "testing");

    val kafkaConsumer = new FlinkKafkaConsumer09[String]("testing-in", new SimpleStringSchema(), properties)
    val kafkaProducer = new FlinkKafkaProducer09[String]("localhost:9092", "testing-out", new SimpleStringSchema())
    env.addSource(kafkaConsumer)
      .addSink(kafkaProducer)

    env.execute()
  }
}
Run Code Online (Sandbox Code Playgroud)

我的SBT依赖

libraryDependencies ++= …
Run Code Online (Sandbox Code Playgroud)

duplicates apache-kafka apache-flink flink-streaming

5
推荐指数
1
解决办法
1351
查看次数