不同系统之间通过网络对接

不同系统之间通过网络对接

September 13, 2023
Go, Vscode
Mark

不同系统之间通过网络对接

数据库事件 #

  1. 可以用个事件表来做,在事务执行过程中添加事件(确保事务完成时事件也存在)。

  2. 在事务提交之后,先尝试做一次事件,如果成功了就把事件状态置为成功;如果失败了也没关系,另外开定时器来扫表进行重试执行。 – 此时不影响正常业务执行

  3. 在事件处理事务里的网络请求里加入超时控制,确保事件不会执行太久,导致接口过慢。

  4. 网络请求支持幂等,防止事件处理事务请求成功了,但是事务挂了导致状态未变更,这种情况下会重复请求多次。

skip locked实现 #

-- 条件字段必须有索引(status, [name, status]),排序字段必须是主键(id),此时刚好是所要锁定的行

start transaction;
-- select * from w_event we where status in (1) order by create_time asc limit 1 for update skip locked; -- 引入create_time作为排序字段时,会将符合条件的行都锁住,`limit 1`不起作用
select * from w_event we where status in (1) order by id asc limit 1 for update skip locked; -- 使用主键字段作排序时,`limit 1`则起作用
select * from w_event we where name = '测试' and status in (1) order by id asc limit 1 for update skip locked; -- 如果有多个字段作为条件,需要建立组合索引
SELECT object_name, index_name, lock_type, lock_mode, lock_data FROM performance_schema.data_locks; -- 查看上锁情况
select * from w_event we where status in (2) order by id asc limit 1 for update skip locked;
select * from w_event we where status in (3) order by id asc limit 1 for update skip locked;
commit;

可打开两个线程来验证上述事务执行,可以看到,当满足条件的记录有两条或以上时,当事务1查到记录1后,事务1未提交时,事务2不会拿到记录1,而是会拿到记录2. 也就实现了有锁则获取下一批数据的效果。

消息队列 #

选择Kafka等消息队列,在事务执行完成后,发送消息到消息队列,然后消费端处理该消息。

如果消息发送时失败了呢?

监听binlog #

通过监听binlog–此时事务必定是已完成了,将消息推送到消息队列。如果推送失败,binlog offset不变,下次依然会继续推送该消息,从而确保推送消息到队列会做到。

怎么知道这次该从哪个binlog offset开始读起呢?

relay log #

The source’s binary log is written to a local relay log on the replica before it is processed. The replica also records information about the current position with the source’s binary log and the local relay log. See Section 17.2.4, “Relay Log and Replication Metadata Repositories”.

replication-implementation

The replica’s connection metadata repository contains information that the replication I/O (receiver) thread needs to connect to the replication source server and retrieve transactions from the source’s binary log. The metadata in this repository includes the connection configuration, the replication user account details, the SSL settings for the connection, and the file name and position where the replication receiver thread is currently reading from the source’s binary log.

The replica’s applier metadata repository contains information that the replication SQL (applier) thread needs to read and apply transactions from the replica’s relay log. The metadata in this repository includes the file name and position up to which the replication applier thread has executed the transactions in the relay log, and the equivalent position in the source’s binary log. It also includes metadata for the process of applying transactions, such as the number of worker threads and the PRIVILEGE_CHECKS_USER account for the channel.

The connection metadata repository is written to the slave_master_info table in the mysql system schema, and the applier metadata repository is written to the slave_relay_log_info table in the mysql system schema. A warning message is issued if mysqld is unable to initialize the tables for the replication metadata repositories, but the replica is allowed to continue starting. This situation is most likely to occur when upgrading from a version of MySQL that does not support the use of tables for the repositories to one in which they are supported.

Replication Metadata Repositories

-- connection metadata repository, 记录已读到的binlog文件和位置信息
select Number_of_lines , Master_log_name , Master_log_pos  from mysql.slave_master_info;
-- applier metadata repository, 记录已处理的relaylog文件和位置信息
select Number_of_lines , Relay_log_name , Relay_log_pos  from mysql.slave_relay_log_info;

先写队列,事务发生在消费消息时 #

接口请求来到时,均是先把数据写入消息队列,然后在消费端进行事务处理,如果写入消息队列失败则直接返回错误,让用户稍后重试。

基于消息队列的持久特性,确保消息被消费一次。

#

binlog读取 #

-- 选项解析:
-- IN 'log_name'   指定要查询的binlog文件名(不指定就是第一个binlog文件)
-- FROM pos        指定从哪个pos起始点开始查起(不指定就是从整个文件首个pos点开始算)
-- LIMIT [offset,] 偏移量(不指定就是0)
-- row_count       查询总条数(不指定就是所有行)
show binlog events [IN 'log_name'] [FROM pos] [LIMIT [offset,] row_count];