How I'm Building an AI for Analytics Service

23 May 2024

In this article, I want to share my experience developing an AI service for Swetrix, a web analytics platform.

My aim was to develop a machine learning model that predicts future website traffic based on the data shown in the following screenshot.

Figure 1 - The project

The end goal is to give customers a clear picture of the traffic their website can expect in the future, so they can get better insights and improve their business planning in general.

2. Requirements and Architecture

During planning, we decided to go with a microservice architecture, using a RabbitMQ message broker for communication between the AI and API services.

Figure 2 - Architecture

First of all, we need to gather data with an hourly cron task into a separate database. We chose ClickHouse, since the original data from websites on Swetrix is already stored in it. Details about the format are covered in the next sections.

We chose RabbitMQ as the message broker for its simplicity, since we need to establish communication between the AI and API services. Let’s break everything down and look at the main logic.

Swetrix-API Service:

  • Gathers data statistics hourly via Cron Task and sends raw data to the AI service.
  • Inserts and receives pre-processed data from ClickHouse.

Swetrix-AI Service:

  • Processes the raw data and selected preferences (interval and subcategory) for forecasting.
  • Converts the forecast data into JSON format and sends it back to the API service via RabbitMQ.

The Swetrix-AI service uses the NestJS framework for the backend and Python scripts for data pre-processing and model predictions.
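As a rough illustration of the last step of the AI service, here is a minimal sketch of turning forecast values into a JSON message body. The field names (`projectID`, `interval`, `subcategory`, `forecast`) and the function `build_forecast_message` are assumptions made for this example, not the actual Swetrix message schema, and publishing the message over RabbitMQ is left out.

```python
import json


def build_forecast_message(project_id, interval, subcategory, forecast):
    """Serialize a forecast into a JSON string (hypothetical message shape)."""
    payload = {
        "projectID": project_id,
        "interval": interval,        # e.g. "hour"
        "subcategory": subcategory,  # e.g. "br"
        "forecast": forecast,        # list of predicted visitor counts
    }
    return json.dumps(payload)


message = build_forecast_message("demo-project", "hour", "br", [12, 15, 9])
decoded = json.loads(message)  # what the receiving service would parse
print(decoded["forecast"])
```

Keeping the payload as plain JSON means the NestJS side can parse it without knowing anything about the Python internals.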

3. Preprocessing

We gather the following data about projects into an analytics table.

Figure 3 - Raw Data in the DB

You have already seen the rendered version of this data in the first section of the article.

I was able to achieve this (almost acceptable) result with the following query:

  @Cron(CronExpression.EVERY_HOUR)
  async insertHourlyProjectData(): Promise<void> {
    const gatherProjectsData = `
      INSERT INTO analytics.hourly_projects_data
      (UniqueID, projectID, statisticsGathered, br_keys, br_vals, os_keys, os_vals, lc_keys, lc_vals, ref_keys, ref_vals, so_keys, so_vals, me_keys, me_vals, ca_keys, ca_vals, cc_keys, cc_vals, dv_keys, dv_vals, rg_keys, rg_vals, ct_keys, ct_vals)
      SELECT
        generateUUIDv4() as UniqueID,
        pid as projectID,
        toStartOfHour(now()) as statisticsGathered,
        groupArray(br) as br_keys,
        groupArray(br_count) as br_vals,
        groupArray(os) as os_keys,
        groupArray(os_count) as os_vals,
        ...
        groupArray(ct) as ct_keys,
        groupArray(ct_count) as ct_vals
      FROM (
        SELECT
          pid,
          br, count(*) as br_count,
          os, count(*) as os_count,
          ...
          ct, count(*) as ct_count
        FROM analytics.analytics
        GROUP BY pid, br, os, lc, ref, so, me, ca, cc, dv, rg, ct
      )
      GROUP BY pid;
    `
    try {
      await clickhouse.query(gatherProjectsData).toPromise()
    } catch (e) {
      console.error(
        `[CRON WORKER] Error whilst gathering hourly data for all projects: ${e}`,
      )
    }
  }

The function is scheduled to run every hour using a cron job. It gathers analytics data and inserts it into the ClickHouse table analytics.hourly_projects_data.

Output

Figure 4 - Processed data

Due to ClickHouse limitations I wasn’t able to get the data into the desired format. Therefore, I decided to use pandas to complete the preprocessing required for training the model.

Specifically I used Python to do the following:

3.1 Combine Keys & Values

Combine the keys and values related to one category into one JSON field, for instance combining the keys and values of operating systems into one object, like this:

os_keys = ["Windows", "MacOS", "MacOS", "MacOS", "Linux"]
os_values = [1, 2, 2, 1, 5]

Into:

os = {"Windows": 1, "MacOS": 5, "Linux": 5}
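The merge in the example above boils down to summing the values of repeated keys. A minimal sketch with plain lists (the name `combine_keys_and_values` is made up for this illustration and is not part of the project code):

```python
from collections import defaultdict


def combine_keys_and_values(keys, values):
    """Sum the values of repeated keys into a single dict."""
    combined = defaultdict(int)
    for key, value in zip(keys, values):
        combined[key] += value
    return dict(combined)


os_keys = ["Windows", "MacOS", "MacOS", "MacOS", "Linux"]
os_values = [1, 2, 2, 1, 5]
os_combined = combine_keys_and_values(os_keys, os_values)
print(os_combined)  # {'Windows': 1, 'MacOS': 5, 'Linux': 5}
```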

Attaching the code and output:

import ast
from collections import defaultdict

import pandas as pd


def format_data(keys_list, vals_list, threshold):
    """
    Convert string representations of lists into actual lists, then sum the
    counts for each key. `threshold` is accepted but not applied here; rare
    keys are grouped into 'other' in a later step (section 3.2).
    """
    counts = defaultdict(int)
    for keys_str, vals_str in zip(keys_list, vals_list):
        keys = ast.literal_eval(keys_str)
        vals = ast.literal_eval(vals_str)
        for key, val in zip(keys, vals):
            counts[key] += val

    return dict(counts)


def process_group(group):
    """
    Combine the *_keys and *_vals columns of a group of rows into a single
    dict per category (e.g. 'br', 'os').
    """
    result = {}
    for col in group.columns:
        if col.endswith('_keys'):
            prefix = col.split('_')[0]  # Extract prefix to identify the category (e.g., 'br' for browsers)
            threshold = other_thresholds.get(prefix, 1)  # Get the threshold for this category, default to 1
            vals_col = col.replace('_keys', '_vals')
            keys_list = group[col].tolist()
            vals_list = group[vals_col].tolist()
            result[col.replace('_keys', '')] = format_data(keys_list, vals_list, threshold)
    return pd.Series(result)

This format of the data won’t be used for the prediction itself. It is mostly for storing in the database and for debugging: to verify that there are no missing values and to double-check that the model produces an accurate result.

Output
Figure 5 - Stored Data Pandas Representation

3.2 Combine Keys & Values

To train an adequate model, I decided to define “other” groups for the various categories. This means that if, globally, the share of a group within a specific category is below a certain percentage, it is counted as part of the “other” group.

For instance, in the os category we have:

{"MacOS": 300, "Windows": 400, "Linux": 23, "TempleOS": 10}

Since both Linux and TempleOS are extremely rare in this case, they are combined into the “other” group, so the end result is:

{"MacOS": 300, "Windows": 400, "other": 33}

The “rarity” is determined differently for each category, based on the threshold assigned to it.

The thresholds can be configured based on the customer’s preferences and the desired data:

other_thresholds = {
    'br': 0.06,
    'os': 0.04,
    'cc': 0.02,
    'lc': 0.02,
    'ref': 0.02,
    'so': 0.03,
    'me': 0.03,
    'ca': 0.03,
    'dv': 0.02,
    'rg': 0.01,
    'ct': 0.01
}
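To make the idea concrete, here is a minimal sketch of applying such a threshold to a single dict of counts. Note the assumptions: the function name `group_rare_keys` is invented for this example, and it works on one dict at a time, whereas the article's approach computes the shares globally over the whole dataset.

```python
def group_rare_keys(counts, threshold):
    """Fold keys whose share of the total is below `threshold` into 'other'."""
    total = sum(counts.values())
    grouped = {}
    other = 0
    for key, count in counts.items():
        if count / total < threshold:
            other += count  # too rare: counted as part of 'other'
        else:
            grouped[key] = count
    if other:
        grouped["other"] = other
    return grouped


os_counts = {"MacOS": 300, "Windows": 400, "Linux": 23, "TempleOS": 10}
os_grouped = group_rare_keys(os_counts, 0.04)
print(os_grouped)  # {'MacOS': 300, 'Windows': 400, 'other': 33}
```

With a total of 733, Linux (about 3.1%) and TempleOS (about 1.4%) both fall below the 4% threshold for `os`, which reproduces the result from the example.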

Two functions were implemented to achieve this:

def get_groups_by_treshholds(df, column_name):
    """Return the keys of a category whose global share meets the category's threshold."""
    # EXCLUDED_COLUMNS and count_dict_values are helpers defined elsewhere (not shown here)
    if column_name in EXCLUDED_COLUMNS:
        return
    counter = count_dict_values(df[column_name])
    total = sum(counter.values())
    groups = []
    for key, value in counter.items():
        if value / total >= other_thresholds[column_name]:
            groups.append(key)
    return groups


def create_group_columns(df):
    """Build the feature column names, e.g. 'br_Chrome', plus one '<category>_other' per category."""
    column_values = []
    for key in other_thresholds.keys():
        groups = get_groups_by_treshholds(df, key)
        if not groups:
            continue
        for group in groups:
            column_values.append(f"{key}_{group}")
        column_values.append(f"{key}_other")
    return column_values

column_values = create_group_columns(df)
column_values

Output

['br_Chrome', 'br_Firefox', 'os_Mac OS', 'os_other',
'cc_UA', 'cc_GB', 'cc_other',
'dv_mobile', 'dv_desktop', 'dv_other']
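The helper count_dict_values used in get_groups_by_treshholds is not shown in the article. One plausible shape for it, offered purely as an assumption about what it does, is to sum the per-key counts across every dict in a column:

```python
from collections import Counter


def count_dict_values(dicts):
    """Sum per-key counts across an iterable of dicts (e.g. a pandas column of dicts)."""
    counter = Counter()
    for row in dicts:
        counter.update(row)  # Counter.update adds counts instead of replacing them
    return counter


rows = [{"Chrome": 5, "Firefox": 2}, {"Chrome": 3, "Safari": 1}]
totals = count_dict_values(rows)
print(totals["Chrome"], sum(totals.values()))  # 8 11
```

Because it only iterates over its argument, the same sketch would accept a plain list or a pandas Series of dicts.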

When working with machine learning models, it's crucial that the input data is in a format that the model can understand. Machine learning models typically require numerical values (integers, floats) rather than complex data structures like JSON.

Therefore, again, it is preferable to do a little more preprocessing of our data to fit this requirement.

I created a function, create_exploded_df, in which each feature is represented as a separate column and the rows contain the corresponding numerical values. (It is not ideal yet, but it was the best solution I was able to produce.)

def create_exploded_df(df):
    """
    Create a new data set with one numeric column per feature
    (br_Chrome, br_other, etc.), filled from the per-category dicts.
    """
    # .copy() avoids modifying a view of the original DataFrame
    new_df = df[['projectID', 'statisticsGathered']].copy()

    for group in column_values:
        new_df[group] = 0

    new_df_cols = new_df.columns

    for column in df.columns:
        if column in ['projectID', 'statisticsGathered'] or column in EXCLUDED_COLUMNS:
            continue
        for index, row in zip(df.index, df[column]):
            total = 0  # sum of the values that have no dedicated column
            for key, value in row.items():
                if (name := f"{column}_{key}") in new_df_cols:
                    new_df.loc[index, name] = value
                else:
                    total += value
            new_df.loc[index, f"{column}_other"] = total

    return new_df

new_df = create_exploded_df(df)
new_df.to_csv("2-weeks-exploded.csv")
new_df

Output

Figure 6 - Model Features

3.3 Fill in hours

Another problem with the format of the data was that if there was no traffic for a project in a specific hour, there would be no row at all instead of a blank row. This is inconvenient, since the model is designed to predict data for the upcoming time frame (e.g., the next hour), and it is not feasible to train the model to make predictions if there is no data available for the initial time frame.

Therefore, I wrote a script that finds missing hours and inserts blank rows wherever an hour is skipped.

Figure 7 - Filled In Hours
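The script itself is not included in the article. A minimal sketch of how such gap filling could be done with pandas is shown below. The function name `fill_missing_hours` and the toy data are assumptions, each project is assumed to have at most one row per hour, and only gaps between a project's first and last observation are filled.

```python
import pandas as pd


def fill_missing_hours(df, value_columns):
    """Insert a zero-filled row for every hour a project has no data for."""
    filled = []
    for project_id, group in df.groupby("projectID"):
        group = group.set_index("statisticsGathered").sort_index()
        # Every hour between the project's first and last observation
        full_range = pd.date_range(
            group.index.min(), group.index.max(), freq=pd.Timedelta(hours=1)
        )
        group = group[value_columns].reindex(full_range, fill_value=0)
        group["projectID"] = project_id
        filled.append(group.rename_axis("statisticsGathered").reset_index())
    return pd.concat(filled, ignore_index=True)


df = pd.DataFrame({
    "projectID": ["a", "a", "b"],
    "statisticsGathered": pd.to_datetime(
        ["2024-05-01 00:00", "2024-05-01 03:00", "2024-05-01 01:00"]
    ),
    "br_Chrome": [4, 7, 2],
})
filled_df = fill_missing_hours(df, ["br_Chrome"])
print(filled_df)
```

In this toy data, project "a" has no rows for 01:00 and 02:00, so two zero rows are inserted between its observations.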

3.4 Add and shift target columns

Regarding model training, the primary approach was to use the next hour's data as the target for the current hour's row. This allows the model to predict future traffic based on the current data.

def sort_df_and_assign_targets(df):
  df = df.copy()
  df = df.sort_values(by=['projectID', 'statisticsGathered'])

  for column_name in df.columns:
    if not column_name.endswith('target'):
      continue

    df[column_name] = df.groupby('projectID')[column_name].shift(-1)
  return df

new_df = sort_df_and_assign_targets(new_df)

Output

Figure 8 - Model Predictions
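To see what the shift does, here is a small sketch on made-up data (the column names `total` and `total_target` and all values are purely illustrative). Within each project, `shift(-1)` pulls the next row's value into the current row, so the last row of every project has no target and ends up as NaN.

```python
import pandas as pd

df = pd.DataFrame({
    "projectID": ["a", "a", "a", "b", "b"],
    "statisticsGathered": pd.to_datetime([
        "2024-05-01 00:00", "2024-05-01 01:00", "2024-05-01 02:00",
        "2024-05-01 00:00", "2024-05-01 01:00",
    ]),
    "total": [10, 12, 8, 3, 5],
})
df = df.sort_values(by=["projectID", "statisticsGathered"])
# Each row's target becomes the value observed one hour later in the same project
df["total_target"] = df.groupby("projectID")["total"].shift(-1)
print(df)
```

Grouping by projectID before shifting matters: without it, the last hour of one project would receive the first hour of the next project as its target.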

3.5 Split statisticsGathered into separate columns

The main reason for this step is that statisticsGathered was a datetime object, and the models I tried (see the subsequent sections) were not able to process it and identify the correct pattern.

That resulted in terrible MSE/RMSE metrics. So during development I decided to split it into separate features for day, month, and hour, which improved the results significantly.

def split_statistic_gathered(df):
  df['Month'] = df['statisticsGathered'].dt.month.astype(int)  # as int
  df['Day'] = df['statisticsGathered'].dt.day.astype(int)  # as int
  df['Hour'] = df['statisticsGathered'].dt.hour
  df = df.drop('statisticsGathered', axis = 1)
  return df

new_df = split_statistic_gathered(new_df)
new_df

Output
Figure 9 - Converted statisticsGathered

And that’s it! Let’s jump to the training itself! 🎉🎉🎉

4. Linear Regression

Well, I guess the actual prediction was the most challenging part of building this application.

The first thing I wanted to try was the LinearRegression model, so I implemented the following functions:

from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split


def create_model_for_target(train_df, target_series):
    # shuffle=False keeps the row order: train on the first 70%, test on the last 30%
    X_train, X_test, y_train, y_test = train_test_split(train_df, target_series, test_size=0.3, shuffle=False)
    reg = LinearRegression()
    reg.fit(X_train, y_train)
    y_pred = reg.predict(X_test)
    return {"y_test": y_test, "y_pred": y_pred}


def create_models_for_targets(df):
    models_data = dict()
    df = df.dropna()
    train_df = clear_df(df)  # clear_df is a helper defined elsewhere (not shown here)

    target_names = [column_name for column_name in df.columns if column_name.endswith("target")]
    for target_name in target_names:
        models_data[target_name] = create_model_for_target(train_df, df[target_name])

    return models_data

Explanation

For each target column, we split the data into training and test sets. We then train a LinearRegression model on the training data and make predictions on the test data.

To check how good the results are, I added a function that gathers the required metrics and produces the output:

import pandas as pd
from sklearn.metrics import mean_absolute_error, mean_squared_error


def evaluate_models(data):
    evaluation = []

    for target, results in data.items():
        y_test, y_pred = results['y_test'], results['y_pred']
        mse = mean_squared_error(y_test, y_pred)
        rmse = mse ** 0.5  # RMSE is the square root of MSE
        mae = mean_absolute_error(y_test, y_pred)
        mean_y = y_test.mean()
        median_y = y_test.median()
        evaluation.append({'target': target, 'mse': mse, 'rmse': rmse, 'mae': mae, 'mean_y': mean_y, 'median_y': median_y})

    return pd.DataFrame(evaluation)

Output

I wrote a script that generated the output and saved it to an Excel file, including the mse, rmse, mae and mean_y values.
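For readers less familiar with these metrics, here is a tiny worked example in plain Python. The numbers are made up for illustration and have nothing to do with the real results.

```python
import math

y_test = [3, 5, 2, 7]   # made-up actual values
y_pred = [2, 5, 4, 6]   # made-up predictions

errors = [actual - predicted for actual, predicted in zip(y_test, y_pred)]
mse = sum(e ** 2 for e in errors) / len(errors)   # mean squared error
rmse = math.sqrt(mse)                             # same unit as the target
mae = sum(abs(e) for e in errors) / len(errors)   # mean absolute error
print(mse, round(rmse, 3), mae)  # 1.5 1.225 1.0
```

RMSE and MAE are expressed in the same unit as the target (visitors per hour here), which is why comparing them with mean_y gives a feel for how large the error is relative to typical traffic.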

Figure 10 - Initial Results (Without Total)

As you can see, the metrics are not satisfactory: the predicted traffic data would be far from accurate and not suitable for my goal of traffic forecasting.

Therefore, I decided to predict the total number of visitors per hour, and created the following functions:

def add_target_column(df, by):
  totals_series = df.apply(lambda x: sum(x[[column for column in df.columns if column.startswith(by)]]), axis=1)
  df['total'] = totals_series
  df[f'total_{by}_target'] = totals_series
  return df

def shift_target_column(df, by):
  df = df.sort_values(by=['projectID', 'statisticsGathered'], ignore_index=True)
  df['total_target'] = df.groupby('projectID')[f'total_{by}_target'].shift(-1)
  return df

new_df = add_target_column(new_df, 'br')
new_df = shift_target_column(new_df, 'br')

new_df[['total_br_target']]

Output

Figure 11 - Total Target

The add_target_column function takes a specific category and calculates the total number of visitors based on it. This works because the total number of Device values would be the same as the total number of OS values.
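A quick way to see why any category can be used for the total is to sum the counts of two categories for the same hour. The sketch below uses made-up numbers and assumes that every recorded visit carries a value for each category; if some visits lacked a browser or OS value, the totals could differ.

```python
# One hypothetical hourly row, with counts already grouped per category
row = {
    "br": {"Chrome": 6, "Firefox": 3, "other": 1},
    "os": {"Windows": 5, "MacOS": 4, "other": 1},
}
totals = {category: sum(counts.values()) for category, counts in row.items()}
print(totals)  # {'br': 10, 'os': 10}
```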

With this approach, the model showed results 10x better than before.

5. Conclusion

In this case, the feature is almost acceptable and ready to use. Customers can now plan their budget allocation and server scaling based on the results of these predictions.

Figure 12 - Total Results

Predictions deviate from the actual values by around 2.45 visitors (since RMSE = √MSE), which should not have any critical negative impact for marketing needs.

As this article has grown quite extensive and the app remains under development, we will pause here. We will continue to refine this approach moving forward and I will keep you updated!

Thanks for reading and your attention! I look forward to hearing your feedback and thoughts in the comments section. I hope this information proves to be useful for your objectives!

And good luck!