Impressive Architecture Designs in XXL-Job

Overview

Date: 2025-10-23 07:15 — Location: Guangdong

The design of `xxl-job`’s communication layer is a masterclass in combining Netty and multithreading. It demonstrates an ingenious use of asynchronous processing and dynamic proxies to achieve both clarity and high performance.

---


1. Introduction to the Communication Layer

`xxl-job` uses Netty HTTP as its default communication protocol.

The underlying RPC layer also offers Mina, Jetty, and Netty TCP transports, but the `xxl-job` code hardcodes Netty HTTP, so the choice is not configurable in practice.

---

2. Overall Communication Process

Taking the scenario of the scheduler notifying the executor to run a task as an example, here is the corresponding activity diagram:

[Figure: activity diagram of the scheduler notifying the executor to run a task]

---

3. Impressive Design

The design seamlessly applies Netty and multithreading principles. Key features include:

---

3.1 Dynamic Proxy Pattern for Transparent Communication

  • Interfaces:
      • ExecutorBiz: heartbeat, pause, trigger execution.
      • AdminBiz: callback, registration, deregistration.
  • These interfaces contain no direct communication code.
  • The `XxlRpcReferenceBean.getObject()` method generates proxies which handle remote communication transparently.
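
The idea can be illustrated with the JDK's own `java.lang.reflect.Proxy`. The following is a minimal sketch, not the `XxlRpcReferenceBean` implementation: `DemoBiz`, `DemoTransport`, and `DemoProxyFactory` are hypothetical names introduced here, and the transport is a plain callback standing in for serialization and the network send.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Hypothetical stand-in for a business interface such as ExecutorBiz.
interface DemoBiz {
    String run(String jobParam);
}

// Hypothetical transport; a real one would serialize the call and send it over the network.
interface DemoTransport {
    Object send(String methodName, Object[] args);
}

final class DemoProxyFactory {
    private DemoProxyFactory() {}

    @SuppressWarnings("unchecked")
    static <T> T create(Class<T> iface, DemoTransport transport) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (method.getDeclaringClass() == Object.class) {
                // toString/hashCode/equals are answered locally; for simplicity
                // they are delegated to the transport object.
                return method.invoke(transport, args);
            }
            // Every interface method becomes a remote call; the caller never sees this code.
            return transport.send(method.getName(), args);
        };
        return (T) Proxy.newProxyInstance(
                iface.getClassLoader(), new Class<?>[] {iface}, handler);
    }
}
```

A caller would obtain `DemoBiz biz = DemoProxyFactory.create(DemoBiz.class, transport)` and simply invoke `biz.run("job-1")`; the interface itself stays free of communication code, which is the property the bullets above describe.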

---

3.2 Fully Asynchronous Processing

On Receiving Messages:

  • Executor deserializes the request.
  • Task info is stored in a `LinkedBlockingQueue`.
  • An async thread executes tasks from this queue.

On Sending Results:

  • Execution results are placed in the callback thread's queue.
  • The callback thread returns them to the scheduler asynchronously.

Benefits:

  • Reduces Netty worker thread blocking.
  • Improves system throughput.
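
The receive path described above can be sketched as a small producer/consumer pair. This is an illustrative sketch under assumptions, not the executor's actual job thread: `DemoJobThread`, `push`, and `shutdown` are hypothetical names, and the task payload is reduced to a `String`.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical worker: the I/O thread only enqueues, this thread runs the jobs.
final class DemoJobThread extends Thread {
    private final LinkedBlockingQueue<String> triggerQueue = new LinkedBlockingQueue<>();
    private final Consumer<String> jobHandler;
    private volatile boolean stopped = false;

    DemoJobThread(Consumer<String> jobHandler) {
        super("demo-job-thread");
        this.jobHandler = jobHandler;
        setDaemon(true);
    }

    // Called from the network thread: returns immediately without running the job.
    boolean push(String triggerParam) {
        return triggerQueue.offer(triggerParam);
    }

    void shutdown() {
        stopped = true;
        interrupt();
    }

    @Override
    public void run() {
        while (!stopped) {
            try {
                // Poll with a timeout so the stop flag is re-checked periodically.
                String triggerParam = triggerQueue.poll(1, TimeUnit.SECONDS);
                if (triggerParam != null) {
                    jobHandler.accept(triggerParam);
                }
            } catch (InterruptedException e) {
                // Interrupted by shutdown(); the loop condition decides whether to exit.
            } catch (RuntimeException e) {
                // A failing job must not kill the worker; a real system would report it.
            }
        }
    }
}
```

Because `push` only performs a queue insert, a slow job delays other jobs in the same queue but never the thread that accepted the request.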

---

3.3 Asynchronous Wrapped as Synchronous API

Example from scheduler’s `XxlJobTrigger`:

// Example snippet (simplified)
public void triggerJob(...) {
    executorBiz.run(taskInfo); // Code appears synchronous
}

Result: Clear API usage despite asynchronous internals.

---

3.4 Synchronous Result Retrieval via Dynamic Proxy

public static ReturnT runExecutor(TriggerParam triggerParam, String address) {
    ReturnT runResult = null;
    try {
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        // Executes asynchronously but returns result synchronously
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }
    StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("address:").append(address);
    runResultSB.append("code:").append(runResult.getCode());
    runResultSB.append("msg:").append(runResult.getMsg());
    runResult.setMsg(runResultSB.toString());
    return runResult;
}

The corresponding dynamic proxy logic:

if (CallType.SYNC == callType) {
    XxlRpcFutureResponse futureResponse = new XxlRpcFutureResponse(invokerFactory, xxlRpcRequest, null);
    try {
        client.asyncSend(finalAddress, xxlRpcRequest);
        XxlRpcResponse xxlRpcResponse = futureResponse.get(timeout, TimeUnit.MILLISECONDS);
        if (xxlRpcResponse.getErrorMsg() != null) {
            throw new XxlRpcException(xxlRpcResponse.getErrorMsg());
        }
        return xxlRpcResponse.getResult();
    } catch (Exception e) {
        logger.info(">>>>>>>>>>> xxl-rpc, invoke error, address:{}, XxlRpcRequest{}", finalAddress, xxlRpcRequest);
        throw (e instanceof XxlRpcException) ? e : new XxlRpcException(e);
    } finally {
        futureResponse.removeInvokerFuture();
    }
}

---

3.5 Thread Blocking and Wake-Up Mechanism

Class: `XxlRpcFutureResponse`

  • Blocks the calling thread until the result is available.
  • Wakes the thread when the remote response arrives.

public void setResponse(XxlRpcResponse response) {
    this.response = response;
    synchronized (lock) {
        done = true;
        lock.notifyAll();
    }
}

@Override
public XxlRpcResponse get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
    if (!done) {
        synchronized (lock) {
            if (timeout < 0) {
                lock.wait();
            } else {
                long timeoutMillis = (TimeUnit.MILLISECONDS == unit) ? timeout : TimeUnit.MILLISECONDS.convert(timeout, unit);
                lock.wait(timeoutMillis);
            }
        }
    }
    if (!done) {
        throw new XxlRpcException("xxl-rpc, request timeout at:" + System.currentTimeMillis() + ", request:" + request);
    }
    return response;
}
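
One detail worth knowing when reusing this pattern: `Object.wait` is allowed to return without a notification (a spurious wakeup), so the usual Java idiom re-checks the condition in a loop against a deadline. The following is a hedged sketch of that more defensive variant, not the library's code; `DemoFutureResponse` is a hypothetical class and the response type is left generic.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical future: same block-and-wake idea, with a guarded wait loop.
final class DemoFutureResponse<T> {
    private final Object lock = new Object();
    private boolean done = false; // guarded by lock
    private T response;           // guarded by lock

    // Called by the thread that receives the remote response.
    void setResponse(T value) {
        synchronized (lock) {
            response = value;
            done = true;
            lock.notifyAll();
        }
    }

    // Called by the thread that issued the request.
    T get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        synchronized (lock) {
            while (!done) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    throw new TimeoutException("no response within " + timeout + " " + unit);
                }
                // remainingNanos > 0 here, so this never degenerates into wait(0, 0),
                // which would block without a time limit.
                lock.wait(remainingNanos / 1_000_000, (int) (remainingNanos % 1_000_000));
            }
            return response;
        }
    }
}
```

Both the flag and the value are only touched while holding `lock`, so the waiting thread cannot observe `done == true` with a stale `response`.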

---

3.6 Unique Request ID Mapping

How does the scheduler know which thread to wake up?

  • Each RPC request generates a UUID request ID.
  • The response handler uses this ID to find the correct `XxlRpcFutureResponse`.

public void notifyInvokerFuture(String requestId, final XxlRpcResponse xxlRpcResponse) {
    final XxlRpcFutureResponse futureResponse = futureResponsePool.get(requestId);
    if (futureResponse == null) return;
    if (futureResponse.getInvokeCallback() != null) {
        ...
    }
}
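
The correlation step can be shown in isolation. The sketch below swaps in the JDK's `CompletableFuture` for the custom future class to stay short; `DemoPendingRequests`, `register`, and `complete` are hypothetical names, not part of xxl-rpc.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry of in-flight requests, keyed by request ID.
final class DemoPendingRequests {
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Before sending: allocate a unique ID and park the caller's future under it.
    String register(CompletableFuture<String> future) {
        String requestId = UUID.randomUUID().toString();
        pending.put(requestId, future);
        return requestId;
    }

    // On response: look up the waiter by ID; returns false if nobody is waiting
    // (for example, the caller already timed out and removed its entry).
    boolean complete(String requestId, String result) {
        CompletableFuture<String> future = pending.remove(requestId);
        if (future == null) {
            return false;
        }
        future.complete(result);
        return true;
    }

    int size() {
        return pending.size();
    }
}
```

Since each response carries the ID of its request, responses may arrive in any order and each one still wakes exactly the caller that is waiting for it.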

---

3.7 Asynchronous Callback vs Synchronous Response

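The rest of `notifyInvokerFuture` chooses between two paths: if the caller registered an invoke callback, the result is handed to `executeResponseCallback`; otherwise the thread blocked in `get()` is woken up.

// Body of the `if (futureResponse.getInvokeCallback() != null) {` branch from 3.6
try {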
    executeResponseCallback(new Runnable() {
        @Override
        public void run() {
            if (xxlRpcResponse.getErrorMsg() != null) {
                futureResponse.getInvokeCallback().onFailure(new XxlRpcException(xxlRpcResponse.getErrorMsg()));
            } else {
                futureResponse.getInvokeCallback().onSuccess(xxlRpcResponse.getResult());
            }
        }
    });
} catch (Exception e) {
    logger.error(e.getMessage(), e);
}
} else {
    // Wake up waiting threads
    futureResponse.setResponse(xxlRpcResponse);
}
futureResponsePool.remove(requestId);
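
The two delivery paths can be reduced to a few lines. This is a simplified sketch under assumptions: `DemoResponseDispatcher` is a hypothetical class, a `Consumer` stands in for the invoke callback, and a `CompletableFuture` stands in for the blocked caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical dispatcher: callback-style callers and blocking callers share one entry point.
final class DemoResponseDispatcher {
    private final Executor callbackExecutor;

    DemoResponseDispatcher(Executor callbackExecutor) {
        this.callbackExecutor = callbackExecutor;
    }

    void dispatch(String result, Consumer<String> callbackOrNull, CompletableFuture<String> waiter) {
        if (callbackOrNull != null) {
            // Callback path: run user code on a separate executor so that a slow
            // callback cannot stall the thread that reads responses.
            callbackExecutor.execute(() -> callbackOrNull.accept(result));
        } else {
            // Synchronous path: hand the result to the caller blocked on the future.
            waiter.complete(result);
        }
    }
}
```

In production the executor would typically be a thread pool; passing `Runnable::run` makes the callback run inline, which is convenient for tests.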

---

Key Takeaways

  • Clear Separation of Concerns: Communication hidden behind proxies.
  • High Throughput: Async queues minimize Netty thread blocking.
  • Request-Response Mapping: UUID request IDs ensure correct wake-up.
  • Flexible Pattern: Supports both sync and async API styles.


