服务间消息流转

发送消息

数据结构message_queue *q是次级消息队列,每个服务都有与之绑定的唯一一个次级消息队列。当我们在lua层,调用skynet.send(...), 或者调用skynet.call(...)函数发送消息,其实调用的都是c.send(...),实际上调用的是c接口:

//lua-skynet.c
lsend(lua_State *L) {
	return send_message(L, 0, 2);
}
static int send_message(lua_State *L, int source, int idx_type) {...}
int skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz){...}

//把消息push到次级消息中,如果次级消息不在全局消息中,则把次级消息push到全局消息中
int skynet_context_push(uint32_t handle, struct skynet_message *message)
{
    struct skynet_context * ctx = skynet_handle_grab(handle);
	if (ctx == NULL) {
		return -1;
	}
	skynet_mq_push(ctx->queue, message);
	skynet_context_release(ctx);

	return 0;
}
  • 发送消息所做的事情就是就是把消息push到与本地服务绑定的次级消息队列中。

消费消息

消费消息主要在threa_work线程中:

//skynet.start.c
static void * thread_worker(void *p) {
    ...
    //声明一个空的消息队列
	struct message_queue * q = NULL;
	while (!m->quit) {
	    //从全局消息取出一个次级消息队列,并进行处理,处理过程都在skynet_context_message_dispatch
		q = skynet_context_message_dispatch(sm, q, weight);
		if (q == NULL) {
		    ...
		}
	}
	return NULL;
}
  • 从代码层可以看到,消费消息就是把消息从全局消息队列中取出一个消息队列。然后进行消息处理. 工作线程就是处理这个全局消息的。
  • 从发送消息push到消息队列,再到工作线程从消息队列中取出消息进行处理,这整个工作流程就很明了。

继续深入看skynet_context_message_dispatch函数做了哪些事情:

//skynet.server.c
struct message_queue * skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
    ...
    //从次级消息队列中获取该消息队列属于哪个服务
	uint32_t handle = skynet_mq_handle(q);

    //获取服务的ctx,这样就能获取服务实例指针mod了。
	struct skynet_context * ctx = skynet_handle_grab(handle);
	...

	int i,n=1;
	struct skynet_message msg;

	for (i=0;i<n;i++) {
	    //循环处理
	    ...
		//判断是否负载
		int overload = skynet_mq_overload(q);
		if (overload) {
			skynet_error(ctx, "May overload, message queue length = %d", overload);
		}

        //触发监视器
		skynet_monitor_trigger(sm, msg.source , handle);

		if (ctx->cb == NULL) {
			skynet_free(msg.data);
		} else {
		    //消息处理
			dispatch_message(ctx, &msg);
		}
		//解除监视器
		skynet_monitor_trigger(sm, 0,0);
	}

	assert(q == ctx->queue);
	//再继续从全局消息中取消息
	struct message_queue *nq = skynet_globalmq_pop();
	if (nq) {
		// If global mq is not empty , push q back, and return next queue (nq)
		// Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
		skynet_globalmq_push(q);
		q = nq;
	} 
	//释放当前服务的ctx,实际操作是引用计数-1
	skynet_context_release(ctx);

	return q;
}

//消息处理函数
static void dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
	assert(ctx->init);
	CHECKCALLING_BEGIN(ctx)
	pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
	int type = msg->sz >> MESSAGE_TYPE_SHIFT;
	size_t sz = msg->sz & MESSAGE_TYPE_MASK;
	//日志
	FILE *f = (FILE *)ATOM_LOAD(&ctx->logfile);
	if (f) {
		skynet_log_output(f, msg->source, type, msg->session, msg->data, sz);
	}
	++ctx->message_count;
	int reserve_msg;
	if (ctx->profile) {
	    ...
	} else {
	    //回调函数
		reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
	}
	if (!reserve_msg) {
		skynet_free(msg->data);
	}
	CHECKCALLING_END(ctx)
}

消息处理过程

  • 从全局消息中取出次级消息队列
  • 根据次级消息队列找到对应的服务上下文
  • 遍历出次级消息队列的消息,并进行处理,调用dispatch_message函数
  • 然后dispatch_message调用回调函数,这个回调函数是在skynet.lua中,服务启动的时候调用c.callback设置的,于是我们就看到最后服务消息的处理逻辑是在服务启动的时候设置的skynet.dispatch_message(...)中。

总结

看到整个消息流转,其实不难看出skynet的架构,

  1. 每个服务对应一个次级消息队列,然后每个服务所发送的消息全塞给这个消息队列。
  2. thread_work线程从全局消息队列中取出一个次级消息队列,然后就可以处理对应服务和对应的消息了。处理消息的逻辑在skynet.dispatch_message(...).

下图是skynet作为服务器,收到socket消息,并进行处理的整个过程,这个与上面内部的消息流转其实原理一样。都是把消息放到消息队列中,然后work线程从全局消息队列中取消息进行处理。

--完--