Table of Contents
- 1. Redis-replicator
- 2. Install
- 3. Simple usage
- 4. Advanced topics
- 5. Other topics
- 6. Contributors
- 7. Commercial consulting
- 8. References
- 9. Acknowledgements
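Redis Replicator is a tool for parsing RDB and AOF files. It fully implements the Redis replication protocol and supports the three sync commands SYNC, PSYNC, and PSYNC2. It also supports remote RDB file backup and data synchronization. In this document, "command" refers specifically to Redis write commands (for example set, hmset) and excludes read commands (for example get, hmget). Supported Redis versions range from 2.6 to 8.4.x.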
QQ discussion group: 479688557
Minimum JDK version to compile: jdk9+
Minimum JDK version to run: jdk8+
maven-3.3.1+
redis 2.6 - 8.4
<dependency>
<groupId>com.moilioncircle</groupId>
<artifactId>redis-replicator</artifactId>
<version>3.11.0</version>
</dependency>
step 1: install jdk-11.0.x
step 2: git clone https://github.com/leonchen83/redis-replicator.git
step 3: $cd ./redis-replicator
step 4: $mvn clean install package -DskipTests
| Redis Version | redis-replicator Version |
|---|---|
| [2.6, 8.4.x] | [3.11.0, ] |
| [2.6, 8.2.x] | [3.10.0,3.10.0] |
| [2.6, 8.0.x] | [3.9.0, 3.9.0] |
| [2.6, 7.2.x] | [3.8.0, 3.8.1] |
| [2.6, 7.0.x] | [3.6.4, 3.7.0] |
| [2.6, 7.0.x-RC2] | [3.6.2, 3.6.3] |
| [2.6, 7.0.0-RC1] | [3.6.0, 3.6.1] |
| [2.6, 6.2.x] | [3.5.2, 3.5.5] |
| [2.6, 6.2.0-RC1] | [3.5.0, 3.5.1] |
| [2.6, 6.0.x] | [3.4.0, 3.4.4] |
| [2.6, 5.0.x] | [2.6.1, 3.3.3] |
| [2.6, 4.0.x] | [2.3.0, 2.5.0] |
| [2.6, 4.0-RC3] | [2.1.0, 2.2.0] |
| [2.6, 3.2.x] | [1.0.18] (not supported) |
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
replicator.addEventListener(new EventListener() {
@Override
public void onEvent(Replicator replicator, Event event) {
if (event instanceof KeyStringValueString) {
KeyStringValueString kv = (KeyStringValueString) event;
System.out.println(new String(kv.getKey()));
System.out.println(new String(kv.getValue()));
} else {
// handle other event types here
}
}
});
replicator.open();
We can use DumpRdbVisitor to convert an rdb file into the redis DUMP format.
Replicator r = new RedisReplicator("redis:///path/to/dump.rdb");
r.setRdbVisitor(new DumpRdbVisitor(r));
r.addEventListener(new EventListener() {
@Override
public void onEvent(Replicator replicator, Event event) {
if (!(event instanceof DumpKeyValuePair)) return;
DumpKeyValuePair dkv = (DumpKeyValuePair) event;
byte[] serialized = dkv.getValue();
// we can use redis RESTORE command to migrate this serialized value to another redis.
}
});
r.open();
We can use SkipRdbVisitor to check that an rdb file is valid.
Replicator r = new RedisReplicator("redis:///path/to/dump.rdb");
r.setRdbVisitor(new SkipRdbVisitor(r));
r.open();
By default, redis-replicator uses the PSYNC command to pose as a slave and receive commands, as shown below:
Replicator r = new RedisReplicator("redis://127.0.0.1:6379");
r.addEventListener(new EventListener() {
@Override
public void onEvent(Replicator replicator, Event event) {
System.out.println(event);
}
});
r.open();
However, some cloud services prohibit PSYNC, so we can use the SCAN command instead of PSYNC to scan the whole database, as shown below:
Replicator r = new RedisReplicator("redis://127.0.0.1:6379?enableScan=yes&scanStep=256");
r.addEventListener(new EventListener() {
@Override
public void onEvent(Replicator replicator, Event event) {
System.out.println(event);
}
});
r.open();
See examples
@CommandSpec(command = "APPEND")
public static class YourAppendCommand extends AbstractCommand {
private final String key;
private final String value;
public YourAppendCommand(String key, String value) {
this.key = key;
this.value = value;
}
public String getKey() {
return key;
}
public String getValue() {
return value;
}
}
public class YourAppendParser implements CommandParser<YourAppendCommand> {
@Override
public YourAppendCommand parse(Object[] command) {
return new YourAppendCommand(new String((byte[]) command[1], UTF_8), new String((byte[]) command[2], UTF_8));
}
}
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
replicator.addCommandParser(CommandName.name("APPEND"), new YourAppendParser());
replicator.addEventListener(new EventListener() {
@Override
public void onEvent(Replicator replicator, Event event) {
if(event instanceof YourAppendCommand){
YourAppendCommand appendCommand = (YourAppendCommand)event;
// your code goes here
}
}
});
See CommandExtensionExample.java
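To make the input of a CommandParser more concrete, here is a minimal standalone sketch of how a raw command array could be decoded. It uses no redis-replicator types. RawCommandDemo and arg are hypothetical names, and the array layout (index 0 holds the command name, every element is a byte[]) is an assumption inferred from the parser above, which reads the key from index 1 and the value from index 2.

```java
import java.nio.charset.StandardCharsets;

// Standalone illustration only: no redis-replicator classes are involved.
final class RawCommandDemo {

    // Decodes one element of the raw command array as UTF-8 text.
    static String arg(Object[] command, int index) {
        return new String((byte[]) command[index], StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Assumed shape of the array for: APPEND mykey hello
        Object[] command = {
            "APPEND".getBytes(StandardCharsets.UTF_8),
            "mykey".getBytes(StandardCharsets.UTF_8),
            "hello".getBytes(StandardCharsets.UTF_8)
        };
        System.out.println("name=" + arg(command, 0));
        System.out.println("key=" + arg(command, 1));
        System.out.println("value=" + arg(command, 2));
    }
}
```

Decoding with an explicit charset avoids depending on the platform default, which matters when keys or values contain non-ASCII text.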
$cd /path/to/redis-4.0-rc2/src/modules
$make
loadmodule /path/to/redis-4.0-rc2/src/modules/hellotype.so
public class HelloTypeModuleParser implements ModuleParser<HelloTypeModule> {
@Override
public HelloTypeModule parse(RedisInputStream in, int version) throws IOException {
DefaultRdbModuleParser parser = new DefaultRdbModuleParser(in);
int elements = parser.loadUnsigned(version).intValue();
long[] ary = new long[elements];
int i = 0;
while (elements-- > 0) {
ary[i++] = parser.loadSigned(version);
}
return new HelloTypeModule(ary);
}
}
public class HelloTypeModule implements Module {
private final long[] value;
public HelloTypeModule(long[] value) {
this.value = value;
}
public long[] getValue() {
return value;
}
}
public class HelloTypeParser implements CommandParser<HelloTypeCommand> {
@Override
public HelloTypeCommand parse(Object[] command) {
String key = new String((byte[])command[1],Constants.UTF_8);
long value = Long.parseLong(new String((byte[])command[2],Constants.UTF_8));
return new HelloTypeCommand(key, value);
}
}
@CommandSpec(command = "hellotype.insert")
public class HelloTypeCommand extends AbstractCommand {
private final String key;
private final long value;
public long getValue() {
return value;
}
public String getKey() {
return key;
}
public HelloTypeCommand(String key, long value) {
this.key = key;
this.value = value;
}
}
public static void main(String[] args) throws IOException {
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
replicator.addCommandParser(CommandName.name("hellotype.insert"), new HelloTypeParser());
replicator.addModuleParser("hellotype", 0, new HelloTypeModuleParser());
replicator.addEventListener(new EventListener() {
@Override
public void onEvent(Replicator replicator, Event event) {
if (event instanceof KeyStringValueModule) {
System.out.println(event);
}
if (event instanceof HelloTypeCommand) {
System.out.println(event);
}
}
});
replicator.open();
}
See ModuleExtensionExample.java
Redis 5.0+ added a new data structure, STREAM. Redis-replicator parses STREAM with the following code:
Replicator r = new RedisReplicator("redis://127.0.0.1:6379");
r.addEventListener(new EventListener() {
@Override
public void onEvent(Replicator replicator, Event event) {
if (event instanceof KeyStringValueStream) {
KeyStringValueStream kv = (KeyStringValueStream)event;
// key
byte[] key = kv.getKey();
// stream
Stream stream = kv.getValueAsStream();
// last stream id
stream.getLastId();
// entries
NavigableMap<Stream.ID, Stream.Entry> entries = stream.getEntries();
// optional : group
for (Stream.Group group : stream.getGroups()) {
// group PEL(pending entries list)
NavigableMap<Stream.ID, Stream.Nack> gpel = group.getPendingEntries();
// consumer
for (Stream.Consumer consumer : group.getConsumers()) {
// consumer PEL(pending entries list)
NavigableMap<Stream.ID, Stream.Nack> cpel = consumer.getPendingEntries();
}
}
}
}
});
r.open();
- Write a class that extends the RdbVisitor abstract class.
- Register your own RdbVisitor through the setRdbVisitor method of Replicator.
Before redis-replicator-2.4.0, we constructed a RedisReplicator as follows:
Replicator replicator = new RedisReplicator("127.0.0.1", 6379, Configuration.defaultSetting());
Replicator replicator = new RedisReplicator(new File("/path/to/dump.rdb"), FileType.RDB, Configuration.defaultSetting());
Replicator replicator = new RedisReplicator(new File("/path/to/appendonly.aof"), FileType.AOF, Configuration.defaultSetting());
Replicator replicator = new RedisReplicator(new File("/path/to/appendonly.aof"), FileType.MIXED, Configuration.defaultSetting());
Since redis-replicator-2.4.0, we have introduced a new concept, the Redis URI, which simplifies constructing a RedisReplicator and provides a consistent API.
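To show which parts a Redis URI carries, here is a minimal sketch that takes two URIs of the kind used in this README apart with the JDK's java.net.URI. It does not describe how redis-replicator itself parses URIs. RedisUriDemo and queryParams are hypothetical names, and leaving query values undecoded is a simplification of this sketch.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Standalone illustration only: this is not the library's own URI parser.
final class RedisUriDemo {

    // Splits "a=1&b=2" into an ordered map; values are kept as-is (no percent-decoding).
    static Map<String, String> queryParams(URI uri) {
        Map<String, String> params = new LinkedHashMap<>();
        String query = uri.getRawQuery();
        if (query == null || query.isEmpty()) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq < 0) {
                params.put(pair, "");
            } else {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    public static void main(String[] args) {
        // A server URI: scheme, host, and port identify the redis-server, the query carries settings.
        URI server = URI.create("redis://127.0.0.1:6379?authPassword=foobared&readTimeout=10000&ssl=yes");
        System.out.println(server.getScheme() + " " + server.getHost() + " " + server.getPort());
        System.out.println(queryParams(server));

        // A file URI: the authority is empty, so the host is null and the path names the file.
        URI file = URI.create("redis:///path/to/dump.rdb?rateLimit=1000000");
        System.out.println(file.getHost() + " " + file.getPath());
        System.out.println(queryParams(file));
    }
}
```

The same string format therefore covers both a remote server and a local rdb or aof file, which is what makes a single-argument constructor possible.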
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
Replicator replicator = new RedisReplicator("redis:///path/to/dump.rdb");
Replicator replicator = new RedisReplicator("redis:///path/to/appendonly.aof");
// configuration examples
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379?authPassword=foobared&readTimeout=10000&ssl=yes");
Replicator replicator = new RedisReplicator("redis:///path/to/dump.rdb?rateLimit=1000000");
Replicator replicator = new RedisReplicator("rediss://user:pass@127.0.0.1:6379?rateLimit=1000000");
| Command | Command | Command | Command | Command | Command |
|---|---|---|---|---|---|
| PING | APPEND | SET | SETEX | MSET | DEL |
| SADD | HMSET | HSET | LSET | EXPIRE | EXPIREAT |
| GETSET | HSETNX | MSETNX | PSETEX | SETNX | SETRANGE |
| HDEL | UNLINK | SREM | LPOP | LPUSH | LPUSHX |
| LREM | RPOP | RPUSH | RPUSHX | ZREM | ZINTERSTORE |
| INCR | DECR | INCRBY | PERSIST | SELECT | FLUSHALL |
| FLUSHDB | HINCRBY | ZINCRBY | MOVE | SMOVE | BRPOPLPUSH |
| PFCOUNT | PFMERGE | SDIFFSTORE | RENAMENX | PEXPIREAT | SINTERSTORE |
| ZADD | BITFIELD | SUNIONSTORE | RESTORE | LINSERT | ZREMRANGEBYLEX |
| GEOADD | PEXPIRE | ZUNIONSTORE | EVAL | SCRIPT | ZREMRANGEBYRANK |
| PUBLISH | BITOP | SETBIT | SWAPDB | PFADD | ZREMRANGEBYSCORE |
| RENAME | MULTI | EXEC | LTRIM | RPOPLPUSH | SORT |
| EVALSHA | ZPOPMAX | ZPOPMIN | XACK | XADD | XCLAIM |
| XDEL | XGROUP | XTRIM | XSETID | COPY | LMOVE |
| BLMOVE | ZDIFFSTORE | GEOSEARCHSTORE | FUNCTION | SPUBLISH | HPERSIST |
| HSETEX | HPEXPIREAT | XACKDEL | XDELEX | MSETEX | |
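When events are consumed too slowly and the backlog of pending events exceeds the redis backlog limit, redis actively disconnects the slave, and Redis-replicator performs a full sync when it reconnects. To avoid this, set the parameter client-output-buffer-limit slave 0 0 0.
For related settings, see redis.conf.
client-output-buffer-limit slave 0 0 0
Warning: this setting may cause redis-server to run out of memory.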
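To make the meaning of the three zeros explicit, here is a minimal sketch that parses the directive in the redis.conf format client-output-buffer-limit <class> <hard limit> <soft limit> <soft seconds>. According to redis.conf, setting the hard or soft limit to zero disables it, which is why the setting above removes the disconnect and also why it can exhaust memory. OutputBufferLimit and its methods are hypothetical names, and only plain byte counts and the kb/mb/gb suffixes are handled.

```java
// Standalone illustration only: a tiny parser for one redis.conf directive.
final class OutputBufferLimit {
    final String clientClass;
    final long hardLimitBytes;
    final long softLimitBytes;
    final long softSeconds;

    OutputBufferLimit(String clientClass, long hardLimitBytes, long softLimitBytes, long softSeconds) {
        this.clientClass = clientClass;
        this.hardLimitBytes = hardLimitBytes;
        this.softLimitBytes = softLimitBytes;
        this.softSeconds = softSeconds;
    }

    // Parses: client-output-buffer-limit <class> <hard limit> <soft limit> <soft seconds>
    static OutputBufferLimit parse(String directive) {
        String[] parts = directive.trim().split("\\s+");
        if (parts.length != 5 || !parts[0].equals("client-output-buffer-limit")) {
            throw new IllegalArgumentException("unexpected directive: " + directive);
        }
        return new OutputBufferLimit(parts[1], toBytes(parts[2]), toBytes(parts[3]), Long.parseLong(parts[4]));
    }

    // Supports plain byte counts plus the kb/mb/gb suffixes (powers of 1024).
    static long toBytes(String value) {
        String v = value.toLowerCase();
        long unit = 1L;
        if (v.endsWith("kb")) {
            unit = 1024L;
        } else if (v.endsWith("mb")) {
            unit = 1024L * 1024;
        } else if (v.endsWith("gb")) {
            unit = 1024L * 1024 * 1024;
        }
        String digits = unit == 1L ? v : v.substring(0, v.length() - 2);
        return Long.parseLong(digits) * unit;
    }

    // With both limits at zero the server never drops this client class because of buffer size.
    boolean isUnlimited() {
        return hardLimitBytes == 0 && softLimitBytes == 0;
    }

    public static void main(String[] args) {
        OutputBufferLimit relaxed = parse("client-output-buffer-limit slave 0 0 0");
        System.out.println(relaxed.clientClass + " unlimited=" + relaxed.isUnlimited());

        // For comparison: the values in the redis.conf shipped with Redis.
        OutputBufferLimit stock = parse("client-output-buffer-limit slave 256mb 64mb 60");
        System.out.println(stock.hardLimitBytes + " bytes hard limit, unlimited=" + stock.isUnlimited());
    }
}
```

A middle ground between the stock values and 0 0 0 is to raise the limits to a size the server can afford, which keeps a bound on memory while tolerating a slower consumer.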
- Set the log level to debug.
- If your project uses log4j2, add the following Logger to the configuration file:
<Logger name="com.moilioncircle" level="debug">
<AppenderRef ref="YourAppender"/>
</Logger>
Configuration.defaultSetting().setVerbose(true);
// redis uri
"redis://127.0.0.1?verbose=yes"
System.setProperty("javax.net.ssl.keyStore", "/path/to/keystore");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "your_type");
System.setProperty("javax.net.ssl.trustStore", "/path/to/truststore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "your_type");
Configuration.defaultSetting().setSsl(true);
// optional settings
Configuration.defaultSetting().setSslSocketFactory(sslSocketFactory);
Configuration.defaultSetting().setSslParameters(sslParameters);
Configuration.defaultSetting().setHostnameVerifier(hostnameVerifier);
// redis uri
"redis://127.0.0.1:6379?ssl=yes"
"rediss://127.0.0.1:6379"
Configuration.defaultSetting().setAuthUser("default");
Configuration.defaultSetting().setAuthPassword("foobared");
// redis uri
"redis://127.0.0.1:6379?authPassword=foobared&authUser=default"
"redis://default:foobared@127.0.0.1:6379"
- Adjust the following settings in redis-server:
repl-backlog-size
repl-backlog-ttl
repl-ping-slave-period
repl-ping-slave-period MUST be less than Configuration.getReadTimeout(); the default Configuration.getReadTimeout() is 60 seconds.
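The constraint above can be checked before a replicator is started. The following is a minimal sketch, assuming the read timeout is expressed in milliseconds (as the readTimeout=10000 URI example suggests) and repl-ping-slave-period in seconds (as in redis.conf). PingPeriodCheck and isSafe are hypothetical names. The presumed reason for the rule is that on an idle connection the master's periodic ping is the only traffic, so a ping period at or above the read timeout would let reads time out.

```java
// Standalone illustration only: compares a redis-server setting with a client-side timeout.
final class PingPeriodCheck {

    // True when the master pings strictly more often than the replicator's read timeout.
    static boolean isSafe(int replPingSlavePeriodSeconds, long readTimeoutMillis) {
        return replPingSlavePeriodSeconds * 1000L < readTimeoutMillis;
    }

    public static void main(String[] args) {
        long readTimeoutMillis = 60_000L; // the 60-second default stated above

        System.out.println("period 10s:  " + isSafe(10, readTimeoutMillis));
        System.out.println("period 60s:  " + isSafe(60, readTimeoutMillis));
        System.out.println("period 120s: " + isSafe(120, readTimeoutMillis));
    }
}
```

The comparison is strict because the text requires the period to be less than the timeout, so equal values are rejected as well.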
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
final long start = System.currentTimeMillis();
final AtomicInteger acc = new AtomicInteger(0);
replicator.addEventListener(new EventListener() {
@Override
public void onEvent(Replicator replicator, Event event) {
if(event instanceof PreRdbSyncEvent) {
System.out.println("pre rdb sync");
} else if(event instanceof PostRdbSyncEvent) {
long end = System.currentTimeMillis();
System.out.println("time elapsed:" + (end - start));
System.out.println("rdb event count:" + acc.get());
} else {
acc.incrementAndGet();
}
}
});
replicator.open();
Building on 4.3. Write your own rdb parser, this tool has a built-in iterative rdb parser for handling huge key-value pairs.
For detailed examples, see:
[1] HugeKVFileExample.java
[2] HugeKVSocketExample.java
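The idea behind iterative parsing can be sketched without the library: instead of materializing a huge value in memory, consume its elements through an Iterator and hand them on in bounded batches. This is not redis-replicator's implementation; BatchDemo and forEachBatch are hypothetical names, and the batch size of 4 is arbitrary.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Standalone illustration only: bounded-memory consumption of a lazily produced sequence.
final class BatchDemo {

    // Feeds elements to the sink in lists of at most batchSize and returns the number of batches.
    static <T> int forEachBatch(Iterator<T> source, int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        int batches = 0;
        List<T> batch = new ArrayList<>(batchSize);
        while (source.hasNext()) {
            batch.add(source.next());
            if (batch.size() == batchSize) {
                sink.accept(batch);
                batches++;
                batch = new ArrayList<>(batchSize); // start a fresh batch, the old one can be collected
            }
        }
        if (!batch.isEmpty()) {
            sink.accept(batch); // flush the final, possibly shorter batch
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        // Stand-in for a huge list value: ten elements produced one at a time.
        Iterator<Integer> huge = new Iterator<Integer>() {
            private int next = 0;
            @Override public boolean hasNext() { return next < 10; }
            @Override public Integer next() { return next++; }
        };
        int batches = forEachBatch(huge, 4, batch -> System.out.println(batch));
        System.out.println("batches=" + batches);
    }
}
```

At any moment only one batch is held, so peak memory depends on the batch size rather than on the size of the value.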
$cd /path/to/redis
$./utils/gen-test-certs.sh
$cd tests/tls
$openssl pkcs12 -export -CAfile ca.crt -in redis.crt -inkey redis.key -out redis.p12
$cd /path/to/redis
$./src/redis-server --tls-port 6379 --port 0 --tls-cert-file ./tests/tls/redis.crt \
--tls-key-file ./tests/tls/redis.key --tls-ca-cert-file ./tests/tls/ca.crt \
--tls-replication yes --bind 0.0.0.0 --protected-mode no
System.setProperty("javax.net.ssl.keyStore", "/path/to/redis/tests/tls/redis.p12");
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "pkcs12");
System.setProperty("javax.net.ssl.trustStore", "/path/to/redis/tests/tls/redis.p12");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "pkcs12");
Replicator replicator = new RedisReplicator("rediss://127.0.0.1:6379");
If you do not want to use System.setProperty, you can use the following approach instead:
RedisSslContextFactory factory = new RedisSslContextFactory();
factory.setKeyStorePath("/path/to/redis/tests/tls/redis.p12");
factory.setKeyStoreType("pkcs12");
factory.setKeyStorePassword("password");
factory.setTrustStorePath("/path/to/redis/tests/tls/redis.p12");
factory.setTrustStoreType("pkcs12");
factory.setTrustStorePassword("password");
SslConfiguration ssl = SslConfiguration.defaultSetting().setSslContextFactory(factory);
Replicator replicator = new RedisReplicator("rediss://127.0.0.1:6379", ssl);
Replicator replicator = new RedisReplicator("redis://user:pass@127.0.0.1:6379");
Redis 7.0 added support for functions. Function structures are stored in the rdb file, so we can parse functions as follows:
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
replicator.addEventListener(new EventListener() {
@Override
public void onEvent(Replicator replicator, Event event) {
if (event instanceof Function) {
Function function = (Function) event;
function.getCode();
// your code goes here
}
}
});
replicator.open();
We can also parse a function into its serialized form, and then use the FUNCTION RESTORE command to migrate the serialized data to the target redis.
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
replicator.setRdbVisitor(new DumpRdbVisitor(replicator));
replicator.addEventListener(new EventListener() {
@Override
public void onEvent(Replicator replicator, Event event) {
if (event instanceof DumpFunction) {
DumpFunction function = (DumpFunction) event;
byte[] serialized = function.getSerialized();
// your code goes here
// you can use FUNCTION RESTORE to restore above serialized data to target redis
}
}
});
replicator.open();
Redis 7.4 added support for TTL hashes. TTL hash structures are stored in the rdb file, so we can parse a TTL hash as follows:
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
replicator.addEventListener(new EventListener() {
@Override
public void onEvent(Replicator replicator, Event event) {
if (event instanceof KeyStringValueTTLHash) {
KeyStringValueTTLHash skv = (KeyStringValueTTLHash) event;
// key
byte[] key = skv.getKey();
// ttl hash
Map<byte[], TTLValue> ttlHash = skv.getValue();
for (Map.Entry<byte[], TTLValue> entry : ttlHash.entrySet()) {
System.out.println("field:" + Strings.toString(entry.getKey()));
System.out.println("value:" + Strings.toString(entry.getValue().getValue()));
System.out.println("field ttl:" + entry.getValue().getExpires());
}
}
}
});
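replicator.open();
redis-replicator offers the following commercial consulting services:
- On-site consulting: 50,000 CNY per day
- On-site training: 50,000 CNY per day
You can contact Baoyi Chen (陈宝仪) directly by email at chen.bao.yi@gmail.com.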
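On January 27, 2023, my mother Ning Wenjun (宁文君, 1953-2023) passed away. She was a kind, strict, and helpful old lady. Her pension was modest, yet every year she donated clothes and money to impoverished mountain areas. She was my greatest motivation for writing this tool. Whenever I told her that another company had started using it, she was as happy as I was and encouraged me to keep maintaining it, and she always encouraged me to take part in technical sharing events. Although I have not achieved much, she was always proud of me. Many years from now the name Ning Wenjun may be forgotten, but I hope GitHub will once again run a program to back up data in the Arctic, so that this name will be preserved for a thousand years. May she rest in peace.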
YourKit is kindly supporting this open source project with its full-featured Java Profiler.
YourKit, LLC is the creator of innovative and intelligent tools for profiling
Java and .NET applications. Take a look at YourKit's leading software products:
YourKit Java Profiler and
YourKit .NET Profiler.
IntelliJ IDEA is a Java integrated development environment (IDE) for developing computer software.
It is developed by JetBrains (formerly known as IntelliJ), and is available as an Apache 2 Licensed community edition,
and in a proprietary commercial edition. Both can be used for commercial development.
Redisson is a Redis-based In-Memory Data Grid for Java that offers distributed objects and services (BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) backed by a Redis server. Redisson provides a more convenient and easier way to work with Redis. Redisson objects provide a separation of concerns, which allows you to keep the focus on data modeling and application logic.

