Let’s talk about Microservices again!

To manage a Microservices system efficiently, teams usually require all the microservices to follow some common patterns. This helps standardize the monitoring process and makes it easier to add a new microservice. In the Microservices project that I’m currently working on (at Agency Revolution), we also had to implement a base class for each type of microservice. The base class contains the logic to write logs in the correct format, to handle some common errors correctly, to alert the developers if something goes wrong, and so on.

Basically, there are 2 types of Microservices in our system: Synchronous and Asynchronous. I will focus mostly on one type of Async worker in this post: the Message Queue workers. The base class was initially built in Node.js. After several years of development, we started to face many problems with the design. Now I’m going to show you how I identified the drawbacks and replaced it with a better version in C#.

Why C#? I may explain in another post. But right now, you can take a look at this post first.

How it all began

First, we started with an inheritance model, the approach most people think of when they start implementing a Worker base module. We defined a superclass that all the workers in the system can derive from. It contains the logic to pull messages from the corresponding queue and invoke the main handler function.

// This is Javascript code
// The base class
class WorkerBase {
  constructor(config) { this.queueName = config.queueName; }

  async start() {
    let message;
    do {
      message = await pullMessages(1); // pull 1 message at a time
      if (message != null) {
        // skip the handler once the queue is empty
        await this.processMessage(message);
      }
    } while (message != null);
  }

  // implement this in the derived class
  async processMessage(message) { throw new Error('Not implemented'); }
}

// Worker service 1
class Worker1 extends WorkerBase {
  constructor() { super({ queueName: 'Worker1' }); }

  async processMessage(message) {
    // implement worker1 logic here
  }
}

// Worker service 2
class Worker2 extends WorkerBase {
  constructor() { super({ queueName: 'Worker2' }); }

  async processMessage(message) {
    // implement worker2 logic here
  }
}

// to activate a worker
const worker = new Worker1();
await worker.start();

This is simple and does the job well.

And then we wanted to add…

As the product grew, we wanted to add more logic to our WorkerBase class. The first thing, of course, was some logging and monitoring code to support basic troubleshooting. We simply modified the start() function of the WorkerBase class and added those common things:

// logging some message information
logger.info(message.id);
logger.info(message.createdAt);

// track some metrics
await trackProcessingTime(this.queueName, message.id, elapsedTime);
await trackSuccessRate(this.queueName, message.id);
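
The snippet above uses an elapsedTime value without showing where it comes from. Here is a minimal sketch of one way to measure it, assuming a hypothetical timed helper (an illustrative name, not part of the original WorkerBase) wrapped around the handler call. The processMessage stub exists only to make the example runnable.

```javascript
// Hypothetical helper: runs an async function and reports how long it took.
// `timed` is an illustrative name, not part of the original WorkerBase.
async function timed(fn) {
  const startedAt = process.hrtime.bigint(); // monotonic clock, in nanoseconds
  const result = await fn();
  const elapsedNs = process.hrtime.bigint() - startedAt;
  return { result, elapsedTime: Number(elapsedNs / 1000000n) }; // whole milliseconds
}

// Example usage with a stubbed handler standing in for this.processMessage
async function demo() {
  const processMessage = async (message) => `handled ${message.id}`;
  const { result, elapsedTime } = await timed(() => processMessage({ id: 'm-1' }));
  return { result, elapsedTime };
}
```

Inside start(), the returned elapsedTime could then be passed to trackProcessingTime as in the lines above.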

After that, we also wanted to make use of the common Message Queue methods, such as acknowledging a message or marking it as failed. We decided to wrap most of the main logic of the start() function in a try/catch block:

try {
  await this.processMessage(message);

  // other logic mentioned before...

  await acknowledgeMessage(message);
} catch (e) {
  await markMessageAsFailed(message);
}

It started to get complicated when we introduced some custom error handling mechanisms. The above catch block was extended, which made it more error-prone: the error handler itself could now throw an error, too.

try {
  // other logic...
} catch (e) {
  if (e instanceof HttpError) {
    await trackMonitoringMetric(e);
  }

  if (e instanceof DatabaseError) {
    await trackDatabaseMetric(e);
  }

  // other logic...
}
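
Because every tracking call in the catch block is awaited, a failure inside the tracking code would replace the original error and skip the remaining steps. One way to contain that is sketched below. The safely helper and the handleFailure function are hypothetical, and the error classes and trackers are stubs standing in for the real ones.

```javascript
// Stand-ins for the real error classes and trackers, for illustration only
class HttpError extends Error {}
class DatabaseError extends Error {}
const calls = [];
const trackMonitoringMetric = async (e) => { throw new Error('metrics backend down'); };
const trackDatabaseMetric = async (e) => { calls.push(`db:${e.message}`); };
const markMessageAsFailed = async (message) => { calls.push(`failed:${message.id}`); };

// Hypothetical helper: run one error-handling step and record,
// rather than rethrow, any error raised by the step itself
async function safely(step) {
  try {
    await step();
  } catch (handlerError) {
    calls.push(`handler-error:${handlerError.message}`);
  }
}

async function handleFailure(message, e) {
  if (e instanceof HttpError) await safely(() => trackMonitoringMetric(e));
  if (e instanceof DatabaseError) await safely(() => trackDatabaseMetric(e));
  // Still reached even if one of the trackers above threw
  await safely(() => markMessageAsFailed(message));
}
```

With this shape, a broken metrics backend no longer prevents the message from being marked as failed, although the catch block keeps growing with every new error type.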

How about managing Scope? Often in a worker that processes messages from a Message Queue, you will want to separate the scope for each message so that one message doesn’t overwrite the data created by another one. Also, in each scope, you may want to share some context data. With the above design, each call to the processMessage function creates a child context, and we have no choice but to pass a context object downstream to all the callees that consume it.

const messages = await pullMessages(10); // assume now you want to process 10 messages at a time
const processInScope = async (message) => {
  const contextData = {
    processId,
    trackingDisabled,
    // other props
  };

  // pass this downstream
  await this.processMessage(message, contextData);

  // and maybe downstream to other functions
  await trackProcessingTime(this.queueName, message.id, elapsedTime, contextData);
};
await Promise.all(messages.map(processInScope));
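
To see why per-message data cannot simply live on the worker instance, here is a small runnable sketch. SharedStateWorker, ContextWorker, sleep and the delay field are all hypothetical names made up for this illustration. The first worker keeps the current message id on `this`, the second receives a context object as in the snippet above.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Anti-pattern: per-message data kept on the shared worker instance
class SharedStateWorker {
  async processMessage(message) {
    this.currentId = message.id; // overwritten by every concurrent call
    await sleep(message.delay);  // simulate I/O
    return this.currentId;       // may now belong to another message
  }
}

// Per-message context object, passed downstream explicitly
class ContextWorker {
  async processMessage(message, contextData) {
    await sleep(message.delay);
    return contextData.processId;
  }
}

async function demo() {
  const messages = [{ id: 'a', delay: 20 }, { id: 'b', delay: 5 }];

  const shared = new SharedStateWorker();
  const sharedResult = await Promise.all(messages.map((m) => shared.processMessage(m)));

  const scoped = new ContextWorker();
  const scopedResult = await Promise.all(
    messages.map((m) => scoped.processMessage(m, { processId: m.id }))
  );
  return { sharedResult, scopedResult };
}
```

Both calls on the shared worker start before either finishes, so message a ends up reading the id written by message b. The context object avoids that, at the cost of threading it through every function.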

A straightforward but not scalable solution

Let’s take a look at the WorkerBase class after we have added all the above requirements.

class WorkerBase {
  async start() {
    let messages;
    do {
      messages = await pullMessages(10); // pull 10 messages at a time

      const processInScope = async message => {
        try {
          // logging some message information
          logger.info(message.id);
          logger.info(message.createdAt);

          const contextData = { processId, trackingDisabled };
          await this.processMessage(message, contextData);

          // track some metrics
          await trackProcessingTime(this.queueName, message.id, elapsedTime, contextData);
          await trackSuccessRate(this.queueName, message.id, contextData);

          await acknowledgeMessage(message);
        } catch (e) {
          if (e instanceof HttpError) {
            await trackMonitoringMetric(e);
          }

          if (e instanceof DatabaseError) {
            await trackDatabaseMetric(e);
          }

          // the catch lives inside processInScope so that `message` is in scope
          // and only the failing message is marked as failed
          await markMessageAsFailed(message);
        }
      };
      await Promise.all(messages.map(processInScope));
    } while (messages.length !== 0);
  }

  // others...
}

As you can see, there are various issues here.

  • After nearly 10 years of development, with more and more features added to the system, we ended up with a WorkerBase class that is more than 2500 lines long. Everybody is scared of touching it.
  • It is hard to add a new feature to the WorkerBase class. We have to scan the main function (which is very long), find the correct place, apply our logic and pray that it doesn’t break the other features.
  • It is hard to test since all the logic is centralized in one place. The above class contains not only the primary logic but also the other cross-cutting concerns. Every time we want to test just one line of code, we have to run through all the above stuff.
  • It is hard to enable/disable one specific feature. All the logic is tightly coupled. In order to disable one feature, we have to update our code with a new if/else clause (which also makes the start function even longer).
  • Scope management is awful. We actually don’t have anything for this job besides the function scope. A worker instance is just a collection of functions, not a scope container. As a result, we have to pass all the context data to all the subsequent functions.

The most trivial solution is, of course, to split the start function into smaller functions to make it shorter. However, that doesn’t actually solve the problem; it just moves the complexity from one place to another.

A better way would be to build them as composable components.
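
As a rough illustration of what "composable" could mean here, the sketch below chains small components around the handler, in the style of a middleware pipeline. This is only an assumption about the general idea, not necessarily the design this series arrives at. The names compose, logging, errorHandling and events are hypothetical.

```javascript
// Hypothetical sketch: each cross-cutting concern is a small function
// (context, next) => Promise, and `compose` chains them around the handler.
function compose(components) {
  return function run(context, handler) {
    const dispatch = (i) =>
      i === components.length
        ? handler(context)
        : components[i](context, () => dispatch(i + 1));
    return dispatch(0);
  };
}

const events = []; // records what happened, in place of real logging and queue calls

const logging = async (context, next) => {
  events.push(`log:${context.message.id}`);
  await next();
};

const errorHandling = async (context, next) => {
  try {
    await next();
    events.push(`ack:${context.message.id}`);
  } catch (e) {
    events.push(`failed:${context.message.id}`);
  }
};

// Enabling or disabling a feature becomes adding or removing an array entry
const run = compose([logging, errorHandling]);

async function demo() {
  await run({ message: { id: 'm-1' } }, async () => { events.push('handled'); });
  await run({ message: { id: 'm-2' } }, async () => { throw new Error('boom'); });
  return events;
}
```

Each component can be tested on its own with a fake next function, and the per-message context object travels through the chain instead of being passed by hand to every helper.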

To be continued…

Part 2: Refactor a legacy Worker Base - Part 2 - Scope Management