How Redis does replication

- step 1: add a callback for the server cron job
1 | //location:server.c |
- step 2: call the replication cron once every 1000 milliseconds (i.e. once per second)
1 | //location:server.c |
- step 3: core replication cron job
//location:replication.c
//function: responsible for redis replication
/* Replication cron function, called 1 time per second. */
void replicationCron(void) {
    /* Only act while we are still waiting to connect to a MASTER. */
    if (server.repl_state != REPL_STATE_CONNECT) return;

    /* Start the connection; log only when the attempt was started. */
    if (connectWithMaster() != C_OK) return;
    serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
}
int connectWithMaster(void) {
int fd;
fd = anetTcpNonBlockBestEffortBindConnect(NULL,
server.masterhost,server.masterport,NET_FIRST_BIND_ADDR);
// fd of master socket
aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL);
}
/* File-event handler that connectWithMaster() registers on the master socket.
 *
 * NOTE(review): this is an annotated walk-through, not compilable C. The
 * bodies of slaveTryPartialResynchronization(), syncCommand(),
 * masterTryPartialResynchronization(), startBgsaveForReplication(),
 * rdbSaveToSlavesSockets() and rdbSaveBackground() are pasted inline in
 * braces right after their call sites to show the call chain, and most
 * error handling is elided.
 *
 * Flow shown below:
 *   1. PING/PONG, AUTH and REPLCONF (listening-port, ip-address, capa)
 *      exchanges, each a write followed by a read of the reply.
 *   2. A PSYNC attempt, with the master-side handling of it inlined.
 *   3. SYNC is written, a temp file is opened and readSyncBulkPayload is
 *      registered as the readable handler for the bulk transfer. */
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
/* Send a PING to check the master is able to reply without errors. */
sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PING",NULL);
/* Receive the PONG command. */
sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* AUTH with the master if required. */
sendSynchronousCommand(SYNC_CMD_WRITE,fd,"AUTH",server.masterauth,NULL);
/* Receive AUTH reply. */
sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* Set the slave port, so that Master's INFO command can list the
* slave listening port correctly. */
sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
"listening-port",port, NULL);
/* Receive REPLCONF listening-port reply. */
sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* Set the slave ip, so that Master's INFO command can list the
* slave IP address port correctly in case of port forwarding or NAT. */
sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
"ip-address",server.slave_announce_ip, NULL);
/* Receive REPLCONF ip-address reply. */
sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* Inform the master of our (slave) capabilities.
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
*
* The master will ignore capabilities it does not understand. */
sendSynchronousCommand(SYNC_CMD_WRITE,fd,"REPLCONF",
"capa","eof","capa","psync2",NULL);
/* Receive CAPA reply. */
sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* Try a partial resynchronization. If we don't have a cached master
* slaveTryPartialResynchronization() will at least try to use PSYNC
* to start a full resynchronization so that we get the master run id
* and the global offset, to try a partial resync at the next
* reconnection attempt. */
// The braces below hold the inlined body of this call (second argument 0).
slaveTryPartialResynchronization(fd,0) {
/* If we reached this point, we are able to perform a partial resync:
* issue the PSYNC command. */
reply = sendSynchronousCommand(SYNC_CMD_WRITE,fd,"PSYNC",psync_replid,psync_offset,NULL)
{
/* The PSYNC command written above is handled on the master node by
* syncCommand(), the SYNC and PSYNC command implementation. */
// The master-side code is inlined below for illustration.
void syncCommand(client *c) {
masterTryPartialResynchronization(c) {
// Reply with +CONTINUE; the master replid is included only when the
// slave advertised the PSYNC2 capability.
if (c->slave_capa & SLAVE_CAPA_PSYNC2) {
buflen = snprintf(buf,sizeof(buf),"+CONTINUE %s\r\n", server.replid);
} else {
buflen = snprintf(buf,sizeof(buf),"+CONTINUE\r\n");
}
// A short or failed write schedules the client to be freed.
if (write(c->fd,buf,buflen) != buflen) {
freeClientAsync(c);
}
// NOTE(review): the three lines below are the tail of a block comment
// whose opening marker was lost in this excerpt; they are kept verbatim.
* 1) Set client state to make it a slave.
* 2) Inform the client we can continue with +CONTINUE
* 3) Send the backlog data (from the offset to the end) to the slave. */
c->flags |= CLIENT_SLAVE;
c->replstate = SLAVE_STATE_ONLINE;
c->repl_ack_time = server.unixtime;
c->repl_put_online_on_ack = 0;
listAddNodeTail(server.slaves,c);
// Inlined body of startBgsaveForReplication follows in braces.
startBgsaveForReplication(c->slave_capa) {
// prototype:
// int startBgsaveForReplication(int mincapa)
int retval;
// socket_target is set when diskless sync is enabled and the slave
// advertised the EOF capability.
int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
if (rsiptr) {
if (socket_target)
// Diskless path: a forked child is used. Original note: the rio is
// marked with an end flag and sent to the slave.
rdbSaveToSlavesSockets(rsiptr){
if(fork()==0){
CHILD_INFO_TYPE_RDB
}
}
else
// 1. Disk path: save the current dataset to the RDB file in the background.
rdbSaveBackground(server.rdb_filename,rsiptr){
if((childpid=fork())==0){
// Child process (fork returned 0): rdbSave writes the databases to disk.
// This runs in the child, not in the main server process.
rdbSave(rdb_filename);
// Report CHILD_INFO_TYPE_RDB to the parent process (via a pipe, per the
// original note).
sendChildInfo(CHILD_INFO_TYPE_RDB)
}else{
// Parent process: remember the child pid and that it saves to disk.
server.rdb_child_pid = childpid;
server.rdb_child_type = RDB_CHILD_TYPE_DISK;
}
}
}
copyClientOutputBuffer(c,slave);
if(!socket_target) {
// 2. Write replid and offset to the slave.
replicationSetupSlaveForFullResync(c,slave->psync_initial_offset)
}
}
}
}
// Back on the slave side: read the reply to PSYNC.
reply = sendSynchronousCommand(SYNC_CMD_READ,fd,NULL);
/* We are going to full resync, discard the cached master structure. */
replicationDiscardCachedMaster();
}
psync_result = slaveTryPartialResynchronization(fd,1);
/* PSYNC failed or is not supported: we want our slaves to resync with us
* as well, if we have any sub-slaves. The master may transfer us an
* entirely different data set and we have no way to incrementally feed
* our slaves after that. */
disconnectSlaves(); /* Force our slaves to resync with us as well. */
freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */
/* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC
* and the server.master_replid and master_initial_offset are
* already populated. Original note: sync state, waited. */
// NOTE(review): the unbalanced parenthesis suggests the enclosing condition
// of this check was dropped in the excerpt.
syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1);
/* Prepare a suitable temp file for bulk transfer */
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
}
/* Setup the non blocking download of the bulk file. */
aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL);
server.repl_state = REPL_STATE_TRANSFER;
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
// As the slave, remember the fd of the temp RDB file opened above.
server.repl_transfer_fd = dfd;
server.repl_transfer_lastio = server.unixtime;
server.repl_transfer_tmpfile = zstrdup(tmpfile);
return;
}
/* Readable-event handler that syncWithMaster() registers for the bulk
 * transfer from the master.
 *
 * NOTE(review): annotated walk-through, not compilable C: conditions are
 * partly elided and the body of emptyDb() is pasted inline in braces right
 * after its call.
 *
 * Flow shown below: read the bulk length line, read a chunk from the master
 * socket and write it to the temp file, rename the temp file to
 * server.rdb_filename, empty the databases, load the RDB file, create the
 * master client and switch to REPL_STATE_CONNECTED. */
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
char buf[4096];
ssize_t nread, readlen, nwritten;
/* If repl_transfer_size == -1 we still have to read the bulk length
* from the master reply. */
// NOTE(review): the unbalanced parenthesis suggests the enclosing condition
// of this check was dropped in the excerpt.
syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
/* Read bulk data */
// With usemark a full buffer is requested; otherwise the request is capped
// at the number of bytes still missing from the announced transfer size.
// NOTE(review): usemark is not defined in this excerpt - presumably the
// EOF-marker transfer format; TODO confirm.
if (usemark) {
readlen = sizeof(buf);
} else {
left = server.repl_transfer_size - server.repl_transfer_read;
readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
}
// Read RDB data from the master socket.
nread = read(fd,buf,readlen);
if (nread <= 0) {
cancelReplicationHandshake();
}
// Write the chunk to the temp RDB file opened by syncWithMaster.
nwritten = write(server.repl_transfer_fd,buf,nread))
// Move the finished temp file over the configured RDB file name.
rename(server.repl_transfer_tmpfile,server.rdb_filename) ;
/* We need to stop any AOFRW fork before flushing and parsing
* RDB, otherwise we'll create a copy-on-write disaster. */
if(aof_is_enabled) stopAppendOnly();
// Original note: marks clients as dirty so that they will be removed.
// TODO confirm against signalFlushedDb.
signalFlushedDb(-1);
// Remove all data held by the current node (dbnum -1); the async flag is
// chosen from server.repl_slave_lazy_flush.
emptyDb(-1,server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,replicationEmptyDbCallback)
{
// prototype: long long emptyDb(int dbnum, int flags, void(callback)(void*))
// Inlined, simplified body for the dbnum == -1 case: every database is visited.
int async = (flags & EMPTYDB_ASYNC);
long long removed = 0;
int startdb = 0;
int enddb = server.dbnum-1;
for (int j = startdb; j <= enddb; j++) {
removed += dictSize(server.db[j].dict);
// Async: the flush is handed to emptyDbAsync (a background job, per the
// original note).
if (async) {
// Original note: a new dict is created and the old one is marked for cleanup.
emptyDbAsync(&server.db[j]);
} else {
// Blocking: empty both the key dict and the expires dict right here.
dictEmpty(server.db[j].dict,callback);
dictEmpty(server.db[j].expires,callback);
}
}
/* Before loading the DB into memory we need to delete the readable
* handler, otherwise it will get called recursively since
* rdbLoad() will call the event loop to process events from time to
* time for non blocking loading. */
aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
// Initialize the save-info structure that is passed to rdbLoad below.
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
// Load the data from the RDB file.
rdbLoad(server.rdb_filename,&rsi);
// Create the master client for this slave from the master socket and the
// replication stream db read from rsi.
// Original note: it also carries the replid of the master, server.replid.
replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
server.repl_state = REPL_STATE_CONNECTED;
// Original note: clears the old replid kept in server.replid2.
clearReplicationId2();
/* Let's create the replication backlog if needed. Slaves need to
* accumulate the backlog regardless of the fact they have sub-slaves
* or not, in order to behave correctly if they are promoted to
* masters after a failover. */
if (server.repl_backlog == NULL) createReplicationBacklog();
/* Restart the AOF subsystem now that we finished the sync. This
* will trigger an AOF rewrite, and when done will start appending
* to the new file. */
if (aof_is_enabled) restartAOF();
}
return;
}
/* On success the function returns the number of keys removed from the
 * database(s). Otherwise -1 is returned in the specific case the
 * DB number is out of range, and errno is set to EINVAL. */
/* Empty one database, or all of them when dbnum is -1.
 *
 * dbnum:    database index, or -1 for every database.
 * flags:    when EMPTYDB_ASYNC is set the per-database flush goes through
 *           emptyDbAsync() instead of dictEmpty().
 * callback: forwarded to dictEmpty() on the synchronous path.
 *
 * Returns the number of keys counted before the flush, or -1 with errno set
 * to EINVAL when dbnum is out of range. */
long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
    int lazy = (flags & EMPTYDB_ASYNC);
    long long total = 0;

    if (dbnum < -1 || dbnum >= server.dbnum) {
        errno = EINVAL;
        return -1;
    }

    /* -1 selects the whole range, anything else a single database. */
    int first = (dbnum == -1) ? 0 : dbnum;
    int last = (dbnum == -1) ? server.dbnum-1 : dbnum;

    for (int idx = first; idx <= last; idx++) {
        total += dictSize(server.db[idx].dict);
        if (lazy) {
            emptyDbAsync(&server.db[idx]);
            continue;
        }
        dictEmpty(server.db[idx].dict,callback);
        dictEmpty(server.db[idx].expires,callback);
    }

    if (server.cluster_enabled) {
        if (lazy)
            slotToKeyFlushAsync();
        else
            slotToKeyFlush();
    }

    if (dbnum == -1) flushSlaveKeysWithExpireList();
    return total;
}
step4 wait for child process
1 | // |