Apache Doris Source Code Analysis 1 - From Process Startup to Receiving and Processing SQL

-- yakun
-- 2022-02-08 19:56:56
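-- Doris source based on branch-0.15, commit: a1d1bd8965c3b97f2c4eaf4fd8405efd3b35368a

Doris consists of the FE and the BE. The FE is mainly responsible for user interaction, and its entry point is the start_fe.sh script. The script initializes environment variables, configures the Java home and the log dir, saves the pid, and so on. Let's go straight to the last few lines of the script: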

# Location: bin/start_fe.sh

if [ ${RUN_DAEMON} -eq 1 ]; then
    nohup $LIMIT $JAVA $final_java_opt org.apache.doris.PaloFe ${HELPER} "$@" >> $LOG_DIR/fe.out 2>&1 </dev/null &
else
    $LIMIT $JAVA $final_java_opt org.apache.doris.PaloFe ${HELPER} "$@" >> $LOG_DIR/fe.out 2>&1 </dev/null
fi

echo $! > $pidfile

The condition above decides whether to launch with nohup (in the background) depending on whether the FE should run as a daemon. What matters here is the main class, org.apache.doris.PaloFe, which is where the FE process begins. Let's step into its main() function:

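// Location: fe/fe-core/src/main/java/org/apache/doris/PaloFe.java

public static void main(String[] args) {
    start(DORIS_HOME_DIR, PID_DIR, args);
}

main() is simple: it passes its home dir, pid dir, and the remaining arguments to its own start() function. start() performs a series of initialization steps, including:

  • Checking whether the arguments just passed in are valid
  • Trying to lock its own pid file
  • Loading the configuration from home-dir/conf/fe.conf
  • Loading the LDAP configuration from home-dir/conf/ldap.conf
  • Checking whether the Java version is compatible
  • Initializing the log4j configuration
  • Checking whether any of the ports is already in use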
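
To make the last step in this list concrete, here is a minimal sketch of a port check. It is not Doris's code: PortCheckSketch and isPortFree are hypothetical names, and 9030 is only an example value. The idea is simply to try binding a listening socket and treat a bind failure as "port in use".

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Hypothetical helper for illustration only; not taken from the Doris source.
final class PortCheckSketch {

    /** Returns true if a listening socket can currently be bound to the port. */
    static boolean isPortFree(int port) {
        try (ServerSocket socket = new ServerSocket()) {
            socket.bind(new InetSocketAddress(port));
            return true;
        } catch (IOException e) {
            // Binding failed, most likely because another socket already holds the port.
            return false;
        }
    }

    public static void main(String[] args) {
        int queryPort = 9030; // example value, assumed for this sketch
        System.out.println("port " + queryPort + " free: " + isPortFree(queryPort));
    }
}
```

A check like this is inherently racy (the port can be taken between the check and the real bind), so it is best read as an early, friendly error message rather than a guarantee.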

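After that, it moves on to four important tasks:

  1. Initialize and start the Catalog service
  2. Initialize and start the RPC service, feServer
  3. Initialize and start the HTTP service, httpServer2
  4. Initialize and start the MySQL server service, qeService

Today's main thread is how the first SQL statement gets received, so we focus on qeService. Its startup code is as follows: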

// Location: fe/fe-core/src/main/java/org/apache/doris/PaloFe.java

QeService qeService = new QeService(Config.query_port, Config.mysql_service_nio_enabled, ExecuteEnv.getInstance().getScheduler());

qeService.start();

Let's first look at the QeService constructor:

// Location: fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java

public QeService(int port, boolean nioEnabled, ConnectScheduler scheduler) {
    // Set up help module
    try {
        HelpModule.getInstance().setUpModule();
    } catch (Exception e) {
        LOG.error("Help module failed, because:", e);
    }
    this.port = port;
    if (nioEnabled) {
        mysqlServer = new NMysqlServer(port, scheduler); // asynchronous NIO mode
    } else {
        mysqlServer = new MysqlServer(port, scheduler);
    }
}

In the snippet above, the help module is initialized first. Then, depending on whether asynchronous NIO mode is enabled (it is by default), a different mysqlServer implementation is instantiated. We follow the NIO path here and look at the NMysqlServer constructor:

// Location: fe/fe-core/src/main/java/org/apache/doris/mysql/nio/NMysqlServer.java

public NMysqlServer(int port, ConnectScheduler connectScheduler) {
    this.port = port;
    this.xnioWorker = Xnio.getInstance().createWorkerBuilder()
            .setWorkerName("doris-mysql-nio")
            .setWorkerIoThreads(Config.mysql_service_io_threads_num)
            .setExternalExecutorService(taskService).build();
    // connectScheduler only used for idle check.
    this.acceptListener = new AcceptListener(connectScheduler); // create the callback for new network connections
}

The code above also creates acceptListener, the callback for new network connections. Its constructor is simple, so we skip it; its handleEvent() callback is analyzed a little later. That completes the QeService constructor.

Next, let's analyze qeService.start().

// Location: fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java

public void start() throws IOException {
    if (!mysqlServer.start()) { // simple logic: just call mysqlServer.start()
        LOG.error("mysql server start failed");
        System.exit(-1);
    }
    LOG.info("QE service start.");
}

Now let's look at mysqlServer.start():

// Location: fe/fe-core/src/main/java/org/apache/doris/mysql/nio/NMysqlServer.java

// start MySQL protocol service
// return true if success, otherwise false
@Override
public boolean start() {
    try {
        server = xnioWorker.createStreamConnectionServer(new InetSocketAddress(port),
                acceptListener, OptionMap.create(Options.TCP_NODELAY, true, Options.BACKLOG, Config.mysql_nio_backlog_num));
        server.resumeAccepts();
        running = true;
        LOG.info("Open mysql server success on {}", port);
        return true;
    } catch (IOException e) {
        LOG.warn("Open MySQL network service failed.", e);
        return false;
    }
}

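This starts an asynchronous network server that listens on the configured port with acceptListener registered as the accept callback; server.resumeAccepts() then begins accepting incoming connections.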
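
For readers who have not used XNIO, the "bind, register an accept callback, start accepting" shape can be sketched with the JDK alone. Note that this swaps in java.nio's AsynchronousServerSocketChannel for XNIO, so it is only an analogy and not what Doris does; AcceptLoopSketch and its methods are hypothetical names.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.function.Consumer;

// JDK-only analogy of an accept callback; Doris itself uses XNIO here.
final class AcceptLoopSketch implements AutoCloseable {
    private final AsynchronousServerSocketChannel server;

    AcceptLoopSketch(int port, Consumer<AsynchronousSocketChannel> onAccept) throws IOException {
        // Bind the listening socket (port 0 lets the OS pick a free port).
        server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(port));
        acceptNext(onAccept);
    }

    /** Arms one asynchronous accept; the handler re-arms it after each connection. */
    private void acceptNext(Consumer<AsynchronousSocketChannel> onAccept) {
        server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
            @Override
            public void completed(AsynchronousSocketChannel connection, Void attachment) {
                acceptNext(onAccept);        // keep accepting further connections
                onAccept.accept(connection); // hand the new connection to the callback
            }

            @Override
            public void failed(Throwable cause, Void attachment) {
                // The server was closed or accept failed; stop accepting.
            }
        });
    }

    int port() throws IOException {
        return ((InetSocketAddress) server.getLocalAddress()).getPort();
    }

    @Override
    public void close() throws IOException {
        server.close();
    }

    public static void main(String[] args) throws IOException {
        try (AcceptLoopSketch server =
                     new AcceptLoopSketch(0, conn -> System.out.println("accepted a connection"))) {
            System.out.println("listening on port " + server.port());
        }
    }
}
```

In both designs the thread that calls start() returns immediately, and all per-connection work happens later inside the callback.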

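That is all of the synchronous logic at startup: these steps bring up a service listening on the MySQL port. What comes next is what happens when a user connects to this service with a MySQL client.

Suppose a user connects to the service. When the incoming connection arrives, the callback acceptListener.handleEvent() is invoked. Here is its logic: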

// Location: fe/fe-core/src/main/java/org/apache/doris/mysql/nio/AcceptListener.java

@Override
public void handleEvent(AcceptingChannel<StreamConnection> channel) {
    try {
        StreamConnection connection = channel.accept();
        if (connection == null) {
            return;
        }
        LOG.info("Connection established. remote={}", connection.getPeerAddress());
        // connection has been established, so need to call context.cleanup()
        // if exception happens.
        NConnectContext context = new NConnectContext(connection); // create an NConnectContext to represent this connection
        context.setCatalog(Catalog.getCurrentCatalog());
        connectScheduler.submit(context); // in NIO mode submit() does no processing here and returns right away

        channel.getWorker().execute(() -> {
            try {
                // Set thread local info
                context.setThreadLocalInfo();
                context.setConnectScheduler(connectScheduler);
                // authenticate check failed.
                if (!MysqlProto.negotiate(context)) {
                    throw new AfterConnectedException("mysql negotiate failed");
                }
                if (connectScheduler.registerConnection(context)) { // mainly checks whether the global or per-user connection limit is exceeded
                    MysqlProto.sendResponsePacket(context);
                    connection.setCloseListener(streamConnection -> connectScheduler.unregisterConnection(context));
                } else {
                    context.getState().setError("Reach limit of connections");
                    MysqlProto.sendResponsePacket(context);
                    throw new AfterConnectedException("Reach limit of connections");
                }
                context.setStartTime();
                ConnectProcessor processor = new ConnectProcessor(context); // create a connection processor
                context.startAcceptQuery(processor); // key point: start accepting queries here
            } catch (AfterConnectedException e) {
                // do not need to print log for this kind of exception.
                // just clean up the context;
                context.cleanup();
            } catch (Throwable e) {
                // should be unexpected exception, so print warn log
                if (context.getCurrentUserIdentity() != null) {
                    LOG.warn("connect processor exception because ", e);
                } else if (e instanceof Error) {
                    LOG.error("connect processor exception because ", e);
                } else {
                    // for unauthrorized access such lvs probe request, may cause exception, just log it in debug level
                    LOG.debug("connect processor exception because ", e);
                }
                context.cleanup();
            } finally {
                ConnectContext.remove();
            }
        });
    } catch (IOException e) {
        LOG.warn("Connection accept failed.", e);
    }
}
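
The registerConnection() / unregisterConnection() pair in handleEvent() enforces connection limits. As a rough illustration of that idea only (the real ConnectScheduler is not shown in this article, so the class name, fields, and limits below are all assumptions), a limiter with a global cap and a per-user cap could look like this:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a global + per-user connection limit; not Doris's ConnectScheduler.
final class ConnectionLimiterSketch {
    private final int maxTotal;
    private final int maxPerUser;
    private final Map<String, Integer> perUser = new HashMap<>();
    private int total;

    ConnectionLimiterSketch(int maxTotal, int maxPerUser) {
        this.maxTotal = maxTotal;
        this.maxPerUser = maxPerUser;
    }

    /** Returns true and counts the connection if neither limit would be exceeded. */
    synchronized boolean register(String user) {
        int userCount = perUser.getOrDefault(user, 0);
        if (total >= maxTotal || userCount >= maxPerUser) {
            return false;
        }
        total++;
        perUser.put(user, userCount + 1);
        return true;
    }

    /** Releases one connection previously registered for the user. */
    synchronized void unregister(String user) {
        Integer userCount = perUser.get(user);
        if (userCount == null) {
            return; // nothing registered for this user
        }
        total--;
        if (userCount == 1) {
            perUser.remove(user);
        } else {
            perUser.put(user, userCount - 1);
        }
    }

    public static void main(String[] args) {
        ConnectionLimiterSketch limiter = new ConnectionLimiterSketch(2, 1);
        System.out.println(limiter.register("alice")); // first connection for alice
        System.out.println(limiter.register("alice")); // rejected by the per-user limit
    }
}
```

Tying unregister() to the connection's close listener, as handleEvent() does, is what keeps the counters from leaking when a client disconnects.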

Following the key point marked in handleEvent(), let's analyze what context.startAcceptQuery(processor) does.

// Location: fe/fe-core/src/main/java/org/apache/doris/mysql/nio/NConnectContext.java

public void startAcceptQuery(ConnectProcessor connectProcessor) {
    mysqlChannel.startAcceptQuery(this, connectProcessor);
}

Let's keep following into mysqlChannel.startAcceptQuery().

// Location: fe/fe-core/src/main/java/org/apache/doris/mysql/nio/NMysqlChannel.java

public void startAcceptQuery(NConnectContext nConnectContext, ConnectProcessor connectProcessor) {
    conn.getSourceChannel().setReadListener(new ReadListener(nConnectContext, connectProcessor));
    conn.getSourceChannel().resumeReads();
}

The important part of the code above is that it installs a ReadListener callback instance, which is invoked whenever this connection receives a MySQL command. Let's analyze its callback function, handleEvent().

// Location: fe/fe-core/src/main/java/org/apache/doris/mysql/nio/ReadListener.java

@Override
public void handleEvent(ConduitStreamSourceChannel channel) {
    // suspend must be call sync in current thread (the IO-Thread notify the read event),
    // otherwise multi handler(task thread) would be waked up by once query.
    XnioIoThread.requireCurrentThread();
    ctx.suspendAcceptQuery();
    // start async query handle in task thread.
    channel.getWorker().execute(() -> {
        ctx.setThreadLocalInfo();
        try {
            connectProcessor.processOnce(); // key point: call processOnce() once
            if (!ctx.isKilled()) {
                ctx.resumeAcceptQuery();
            } else {
                ctx.stopAcceptQuery();
                ctx.cleanup();
            }
        } catch (Exception e) {
            LOG.warn("Exception happened in one session(" + ctx + ").", e);
            ctx.setKilled();
            ctx.cleanup();
        } finally {
            ConnectContext.remove();
        }
    });
}
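
The comment at the top of ReadListener.handleEvent() explains why reads are suspended before the work is handed to a task thread: otherwise one query could wake up several handlers. The pattern can be sketched in isolation as follows. This is a simplified illustration with hypothetical names (SuspendResumeSketch, onReadable); unlike the real code it always resumes afterwards and has no "killed" handling.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of "suspend reads, process on a task thread, resume reads".
final class SuspendResumeSketch {
    private final AtomicBoolean readsEnabled = new AtomicBoolean(true);
    private final Executor taskExecutor;
    private final Runnable processOnce;

    SuspendResumeSketch(Executor taskExecutor, Runnable processOnce) {
        this.taskExecutor = taskExecutor;
        this.processOnce = processOnce;
    }

    /**
     * Called when the connection has readable data.
     * Returns false if a request is already being handled, so at most one
     * task per connection is in flight at any time.
     */
    boolean onReadable() {
        // Suspend synchronously, before handing off, so a second event cannot slip in.
        if (!readsEnabled.compareAndSet(true, false)) {
            return false;
        }
        taskExecutor.execute(() -> {
            try {
                processOnce.run();
            } finally {
                readsEnabled.set(true); // resume accepting the next request
            }
        });
        return true;
    }

    public static void main(String[] args) {
        SuspendResumeSketch listener = new SuspendResumeSketch(
                Runnable::run, () -> System.out.println("processing one request"));
        System.out.println("dispatched: " + listener.onReadable());
    }
}
```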

Next, let's follow the key point marked in ReadListener.handleEvent(): connectProcessor.processOnce();

// Location: fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java

// handle one MySQL request: receive, process, respond
public void processOnce() throws IOException {
    // set status of query to OK.
    ctx.getState().reset();
    executor = null;

    // reset sequence id of MySQL protocol
    final MysqlChannel channel = ctx.getMysqlChannel();
    channel.setSequenceId(0);
    // read packet from channel
    try {
        packetBuf = channel.fetchOnePacket();
        if (packetBuf == null) {
            LOG.warn("Null packet received from network. remote: {}", channel.getRemoteHostPortString());
            throw new IOException("Error happened when receiving packet.");
        }
    } catch (AsynchronousCloseException e) {
        // when this happened, timeout checker close this channel
        // killed flag in ctx has been already set, just return
        return;
    }

    // dispatch
    dispatch(); // key point: dispatch
    // finalize
    finalizeCommand();

    ctx.setCommand(MysqlCommand.COM_SLEEP);
}
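
As background for fetchOnePacket(): in the MySQL client/server protocol every packet starts with a 4-byte header, a 3-byte little-endian payload length followed by a 1-byte sequence id, which is why processOnce() resets the sequence id before reading. The sketch below only illustrates that framing; MysqlPacketSketch is a hypothetical class and is not how Doris's MysqlChannel is written.

```java
// Hypothetical sketch of MySQL packet framing (4-byte header + payload).
final class MysqlPacketSketch {
    static final int HEADER_LENGTH = 4;
    static final int MAX_PAYLOAD_LENGTH = 0xFFFFFF; // largest payload a single packet can carry

    /** Decodes the 3-byte little-endian payload length at the start of a packet. */
    static int payloadLength(byte[] header) {
        return (header[0] & 0xFF) | ((header[1] & 0xFF) << 8) | ((header[2] & 0xFF) << 16);
    }

    /** Decodes the 1-byte sequence id that follows the length. */
    static int sequenceId(byte[] header) {
        return header[3] & 0xFF;
    }

    /** Prepends the header to a payload, producing one complete packet. */
    static byte[] frame(int sequenceId, byte[] payload) {
        if (payload.length > MAX_PAYLOAD_LENGTH) {
            throw new IllegalArgumentException("payload must be split across several packets");
        }
        byte[] packet = new byte[HEADER_LENGTH + payload.length];
        packet[0] = (byte) payload.length;
        packet[1] = (byte) (payload.length >>> 8);
        packet[2] = (byte) (payload.length >>> 16);
        packet[3] = (byte) sequenceId;
        System.arraycopy(payload, 0, packet, HEADER_LENGTH, payload.length);
        return packet;
    }

    public static void main(String[] args) {
        // 0x03 is COM_QUERY, followed by the statement text.
        byte[] payload = {3, 's', 'e', 'l', 'e', 'c', 't', ' ', '1'};
        byte[] packet = frame(0, payload);
        System.out.println("payload length: " + payloadLength(packet)
                + ", sequence id: " + sequenceId(packet));
    }
}
```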

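processOnce() first obtains the connection's MysqlChannel from the context (it does not create a new one) and resets the protocol sequence id, then reads one packet. Once the packet has been received, it calls dispatch() to handle it. Let's follow dispatch().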

// Location: fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java

private void dispatch() throws IOException {
    int code = packetBuf.get();
    MysqlCommand command = MysqlCommand.fromCode(code);
    if (command == null) {
        ErrorReport.report(ErrorCode.ERR_UNKNOWN_COM_ERROR);
        ctx.getState().setError("Unknown command(" + command + ")");
        LOG.warn("Unknown command(" + command + ")");
        return;
    }
    ctx.setCommand(command);
    ctx.setStartTime();

    switch (command) {
        case COM_INIT_DB:
            handleInitDb();
            break;
        case COM_QUIT:
            handleQuit();
            break;
        case COM_QUERY:
            handleQuery();
            break;
        case COM_FIELD_LIST:
            handleFieldList();
            break;
        case COM_PING:
            handlePing();
            break;
        default:
            ctx.getState().setError("Unsupported command(" + command + ")");
            LOG.warn("Unsupported command(" + command + ")");
            break;
    }
}

dispatch() first reads the command code (the first byte of the packet) and then handles the packet according to the command type. The command types handled are:

  • INIT_DB
  • QUIT
  • QUERY
  • FIELD_LIST
  • PING
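
To show what "read a command code, then branch" means on the wire, here is a small self-contained sketch. The numeric codes are the ones defined by the MySQL protocol for these five commands; the class, the enum, and the returned handler names are hypothetical stand-ins for Doris's MysqlCommand and handleXxx() methods.

```java
// Hypothetical sketch of command-byte dispatch; not Doris's MysqlCommand/ConnectProcessor.
final class CommandDispatchSketch {

    /** The five commands handled in dispatch(), with their MySQL protocol codes. */
    enum Command {
        COM_QUIT(1), COM_INIT_DB(2), COM_QUERY(3), COM_FIELD_LIST(4), COM_PING(14);

        private final int code;

        Command(int code) {
            this.code = code;
        }

        /** Returns the matching command, or null if the code is not one of the five. */
        static Command fromCode(int code) {
            for (Command command : values()) {
                if (command.code == code) {
                    return command;
                }
            }
            return null;
        }
    }

    /** Returns the name of the handler that would process this packet payload. */
    static String dispatch(byte[] payload) {
        Command command = payload.length == 0 ? null : Command.fromCode(payload[0] & 0xFF);
        if (command == null) {
            return "unknown";
        }
        switch (command) {
            case COM_INIT_DB:
                return "handleInitDb";
            case COM_QUIT:
                return "handleQuit";
            case COM_QUERY:
                return "handleQuery";
            case COM_FIELD_LIST:
                return "handleFieldList";
            default:
                return "handlePing";
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new byte[]{3, 's', 'e', 'l', 'e', 'c', 't', ' ', '1'}));
    }
}
```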

Today's focus is the first query statement from the user, so let's continue into handleQuery(), which handles the QUERY command.

// Location: fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java

// process COM_QUERY statement,
// only throws an exception when there is a problem interacting with the requesting client
private void handleQuery() {
    MetricRepo.COUNTER_REQUEST_ALL.increase(1L);
    // convert statement to Java string
    String originStmt = null;
    try {
        byte[] bytes = packetBuf.array();
        int ending = packetBuf.limit() - 1;
        while (ending >= 1 && bytes[ending] == '\0') {
            ending--;
        }
        originStmt = new String(bytes, 1, ending, "UTF-8");
        // at this point we have the raw string of the user's query: originStmt
    } catch (UnsupportedEncodingException e) {
        // impossible
        LOG.error("UTF8 is not supported in this environment.");
        ctx.getState().setError("Unsupported character set(UTF-8)");
        return;
    }

    String sqlHash = DigestUtils.md5Hex(originStmt);
    ctx.setSqlHash(sqlHash);
    try {
        Catalog.getCurrentCatalog().getSqlBlockRuleMgr().matchSql(originStmt, sqlHash, ctx.getQualifiedUser());
    } catch (AnalysisException e) {
        LOG.warn(e.getMessage());
        ctx.getState().setError(e.getMessage());
        return;
    }
    ctx.getAuditEventBuilder().reset();
    ctx.getAuditEventBuilder()
            .setTimestamp(System.currentTimeMillis())
            .setClientIp(ctx.getMysqlChannel().getRemoteHostPortString())
            .setUser(ctx.getQualifiedUser())
            .setDb(ctx.getDatabase())
            .setSqlHash(ctx.getSqlHash());
    // the logic above is mostly audit bookkeeping: who sent which statement, and when

    // execute this query.
    StatementBase parsedStmt = null;
    List<Pair<StatementBase, Data.PQueryStatistics>> auditInfoList = Lists.newArrayList();
    boolean alreadyAddedToAuditInfoList = false;
    try {
        List<StatementBase> stmts = analyze(originStmt); // key point: lexing and parsing happen here; returns the parsed statements
        for (int i = 0; i < stmts.size(); ++i) { // execute each statement
            alreadyAddedToAuditInfoList = false;
            ctx.getState().reset();
            if (i > 0) {
                ctx.resetReturnRows();
            }
            parsedStmt = stmts.get(i);
            parsedStmt.setOrigStmt(new OriginStatement(originStmt, i));
            parsedStmt.setUserInfo(ctx.getCurrentUserIdentity());
            executor = new StmtExecutor(ctx, parsedStmt);
            ctx.setExecutor(executor);
            executor.execute(); // execute this statement

            if (i != stmts.size() - 1) {
                ctx.getState().serverStatus |= MysqlServerStatusFlag.SERVER_MORE_RESULTS_EXISTS;
                finalizeCommand();
            }
            auditInfoList.add(new Pair<>(executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog()));
            alreadyAddedToAuditInfoList = true;
        }
    } catch (IOException e) {
        // Client failed.
        LOG.warn("Process one query failed because IOException: ", e);
        ctx.getState().setError("Doris process failed");
    } catch (UserException e) {
        LOG.warn("Process one query failed because.", e);
        ctx.getState().setError(e.getMessage());
        // set is as ANALYSIS_ERR so that it won't be treated as a query failure.
        ctx.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
    } catch (Throwable e) {
        // Catch all throwable.
        // If reach here, maybe palo bug.
        LOG.warn("Process one query failed because unknown reason: ", e);
        ctx.getState().setError(e.getClass().getSimpleName() + ", msg: " + e.getMessage());
        if (parsedStmt instanceof KillStmt) {
            // ignore kill stmt execute err(not monitor it)
            ctx.getState().setErrType(QueryState.ErrType.ANALYSIS_ERR);
        }
    }

    // that means execute some statement failed
    if (!alreadyAddedToAuditInfoList && executor != null) {
        auditInfoList.add(new Pair<>(executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog()));
    }

    // audit after exec
    if (!auditInfoList.isEmpty()) {
        for (Pair<StatementBase, Data.PQueryStatistics> audit : auditInfoList) {
            auditAfterExec(originStmt.replace("\n", " "), audit.first, audit.second);
        }
    } else {
        // auditInfoList can be empty if we encounter analysis error.
        auditAfterExec(originStmt.replace("\n", " "), null, null);
    }
}
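
The first few lines of handleQuery() are easy to try out in isolation: skip the command byte, trim trailing NUL bytes, decode the rest as UTF-8, and hash the text. The sketch below mirrors that logic under a few assumptions: QueryTextSketch is a hypothetical class, it takes the payload as a plain byte array instead of a ByteBuffer, and it uses the JDK's MessageDigest in place of DigestUtils.md5Hex.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch of how a COM_QUERY payload becomes a statement string and a hash.
final class QueryTextSketch {

    /** Drops the leading command byte and trailing NUL bytes, then decodes as UTF-8. */
    static String extractStatement(byte[] payload) {
        if (payload.length == 0) {
            return "";
        }
        int ending = payload.length - 1;
        while (ending >= 1 && payload[ending] == '\0') {
            ending--;
        }
        // The text occupies indices 1..ending, so its length is exactly `ending`.
        return new String(payload, 1, ending, StandardCharsets.UTF_8);
    }

    /** Lower-case hexadecimal MD5 of the UTF-8 bytes of the text. */
    static String md5Hex(String text) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(text.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b & 0xFF));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide MD5.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = {3, 's', 'e', 'l', 'e', 'c', 't', ' ', '1', 0};
        String statement = extractStatement(payload);
        System.out.println(statement + " -> " + md5Hex(statement));
    }
}
```

Because the hash is computed over the raw statement text, two statements that differ only in whitespace or letter case produce different hashes.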

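That completes the walkthrough of the full path in Doris from process startup to receiving and processing a SQL statement.

In later articles, we'll dig into the core source code of lexical analysis, syntax analysis, semantic analysis, and the other processing steps.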