In this article I want to share my experience with developing an AI service for a web analytics platform, called Swetrix.
My aim was to develop a machine learning model that predicts future website traffic based on the data shown in the following screenshot.
The end goal is to give customers a clear view of the traffic their website is likely to receive in the future, so that they can get better insights and improve business planning in general.
2. Requirements and Architecture
During planning we decided to use a microservice architecture, with a RabbitMQ message broker for communication between the AI and API services.
First of all, we need to gather data with an hourly cron task into a separate database. We chose ClickHouse, since the original data from websites on Swetrix is already stored in it. Details about the format are covered in the following sections.
RabbitMQ was chosen as the message broker for its simplicity, since all we need is a communication channel between the AI and API services. Let’s break everything down and look at the main logic.
Swetrix-API Service:
- Gathers data statistics hourly via Cron Task and sends raw data to the AI service.
- Inserts and receives pre-processed data from ClickHouse.
Swetrix-AI Service:
- Processes the raw data and selected preferences (interval and subcategory) for forecasting.
- Converts the forecast data into JSON format and sends it back to the API service via RabbitMQ.
The Swetrix-AI service uses the NestJS framework for the backend and Python scripts for data pre-processing and model predictions.
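As a rough illustration of the last step in the list above, here is a minimal sketch of how a forecast could be serialized to JSON before it is handed to the broker. The function name `build_forecast_message` and the field names (`pid`, `interval`, `subcategory`, `forecast`) are assumptions made for this example, not the actual Swetrix message schema, and the publishing call itself is deliberately left out.

```python
import json
from datetime import datetime, timedelta


def build_forecast_message(pid, interval, subcategory, start, values):
    """Serialize hourly forecast values into a JSON string.

    All field names are hypothetical and chosen for illustration only.
    """
    points = [
        {"hour": (start + timedelta(hours=i)).isoformat(), "visitors": v}
        for i, v in enumerate(values)
    ]
    return json.dumps(
        {"pid": pid, "interval": interval, "subcategory": subcategory, "forecast": points}
    )


# Made-up values, only to show the shape of the message.
message = build_forecast_message(
    "demo-project", "hour", "br", datetime(2024, 1, 1, 0), [12, 9, 15]
)
print(message)
```

The API service would then only need to parse the JSON string and store the points, regardless of how the Python side produced them.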
3. Preprocessing
We gather the following data about projects into an analytics table. You have already seen the rendered version of this data in the first section of the article.
I was able to achieve this (almost acceptable) result with the following query:
@Cron(CronExpression.EVERY_HOUR)
async insertHourlyProjectData(): Promise<void> {
  const gatherProjectsData = `
    INSERT INTO analytics.hourly_projects_data
    (UniqueID, projectID, statisticsGathered, br_keys, br_vals, os_keys, os_vals, lc_keys, lc_vals, ref_keys, ref_vals, so_keys, so_vals, me_keys, me_vals, ca_keys, ca_vals, cc_keys, cc_vals, dv_keys, dv_vals, rg_keys, rg_vals, ct_keys, ct_vals)
    SELECT
      generateUUIDv4() as UniqueID,
      pid as projectID,
      toStartOfHour(now()) as statisticsGathered,
      groupArray(br) as br_keys,
      groupArray(br_count) as br_vals,
      groupArray(os) as os_keys,
      groupArray(os_count) as os_vals,
      ...
      groupArray(ct) as ct_keys,
      groupArray(ct_count) as ct_vals
    FROM (
      SELECT
        pid,
        br, count(*) as br_count,
        os, count(*) as os_count,
        ...
        ct, count(*) as ct_count
      FROM analytics.analytics
      GROUP BY pid, br, os, lc, ref, so, me, ca, cc, dv, rg, ct
    )
    GROUP BY pid;
  `

  try {
    await clickhouse.query(gatherProjectsData).toPromise()
  } catch (e) {
    console.error(
      `[CRON WORKER] Error whilst gathering hourly data for all projects: ${e}`,
    )
  }
}
The function is scheduled to run every hour as a cron job. It gathers analytics data and inserts it into the ClickHouse table analytics.hourly_projects_data.
Output
Due to ClickHouse limitations I wasn’t able to achieve the desired data format in the query itself. Therefore I decided to use pandas to complete the preprocessing required for training the model.
Specifically I used Python to do the following:
3.1 Combine Keys & Values
Combine the keys and values that belong to one category into a single JSON field, for instance merging the keys and values of operating systems into one object:
os_keys = ["Windows", "MacOS", "MacOS", "MacOS", "Linux"]
os_values = [1, 2, 2, 1, 5]
Into:
os = {"Windows": 1, "MacOS": 5, "Linux": 5}
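The same merge can be reproduced in a few lines of plain Python, which is handy for checking the expected result by hand. This is only a sketch of the idea on the toy lists above; the helper name `combine_keys_and_values` is made up for the example.

```python
from collections import Counter


def combine_keys_and_values(keys, values):
    """Sum the values that belong to the same key."""
    totals = Counter()
    for key, value in zip(keys, values):
        totals[key] += value
    return dict(totals)


os_keys = ["Windows", "MacOS", "MacOS", "MacOS", "Linux"]
os_values = [1, 2, 2, 1, 5]
print(combine_keys_and_values(os_keys, os_values))
# {'Windows': 1, 'MacOS': 5, 'Linux': 5}
```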
Attaching the code and output:
import ast
from collections import defaultdict

import pandas as pd


def format_data(keys_list, vals_list, threshold):
    """
    Convert string representations of lists into actual lists,
    then sum up the counts for each key.

    `threshold` is passed in by process_group but is not applied here;
    rare keys are folded into 'other' later, in section 3.2.
    """
    counts = defaultdict(int)
    for keys_str, vals_str in zip(keys_list, vals_list):
        keys = ast.literal_eval(keys_str)
        vals = ast.literal_eval(vals_str)
        for key, val in zip(keys, vals):
            counts[key] += val
    return dict(counts)


def process_group(group):
    """
    Combine the *_keys and *_vals columns of a group into one dict per category.
    """
    result = {}
    for col in group.columns:
        if col.endswith('_keys'):
            prefix = col.split('_')[0]  # Extract prefix to identify the category (e.g., 'br' for browsers)
            threshold = other_thresholds.get(prefix, 1)  # Get the threshold for this category, default to 1
            vals_col = col.replace('_keys', '_vals')
            keys_list = group[col].tolist()
            vals_list = group[vals_col].tolist()
            result[col.replace('_keys', '')] = format_data(keys_list, vals_list, threshold)
    return pd.Series(result)
This format won’t be used for the prediction itself. It is mainly for storing the data in the database and for debugging: verifying that there are no missing values and double-checking that the model produces accurate results.
Output
3.2 Define “other” groups
To train an adequate model I decided to define “other” groups for the various categories. If, globally, the share of a value within a category is below a certain percentage, it is counted as part of “other”.
For instance, in the os category we have:
{"MacOS": 300, "Windows": 400, "Linux": 23, "TempleOS": 10}
Since both Linux and TempleOS are extremely rare in this case, they are combined into the other group, and the end result is:
{"MacOS": 300, "Windows": 400, "other": 33}
“Rarity” is determined per category, based on the threshold assigned to that category.
The thresholds can be configured according to the customer’s preferences and the data they want to see:
other_thresholds = {
    'br': 0.06,
    'os': 0.04,
    'cc': 0.02,
    'lc': 0.02,
    'ref': 0.02,
    'so': 0.03,
    'me': 0.03,
    'ca': 0.03,
    'dv': 0.02,
    'rg': 0.01,
    'ct': 0.01
}
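To make the thresholding rule concrete, here is a minimal sketch that applies a single threshold to one dictionary of counts. Note the simplification: the article computes shares globally across all rows, while this example works on a single dictionary. The function name `group_rare_values` is hypothetical.

```python
def group_rare_values(counts, threshold):
    """Fold keys whose share of the total is below `threshold` into 'other'."""
    total = sum(counts.values())
    grouped = {}
    other = 0
    for key, value in counts.items():
        if total and value / total >= threshold:
            grouped[key] = value
        else:
            other += value
    if other:
        grouped["other"] = other
    return grouped


os_counts = {"MacOS": 300, "Windows": 400, "Linux": 23, "TempleOS": 10}
print(group_rare_values(os_counts, 0.04))
# {'MacOS': 300, 'Windows': 400, 'other': 33}
```

With the 4% threshold for os, Linux (23 of 733, about 3.1%) and TempleOS (10 of 733, about 1.4%) both fall below the cut-off, which reproduces the result from the example above.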
Two functions were implemented to achieve this:
def get_groups_by_thresholds(df, column_name):
    """Return the keys of a category whose global share reaches its threshold."""
    # EXCLUDED_COLUMNS and count_dict_values are helpers not shown in this article.
    if column_name in EXCLUDED_COLUMNS:
        return []
    counter = count_dict_values(df[column_name])
    total = sum(counter.values())
    groups = []
    for key, value in counter.items():
        if value / total >= other_thresholds[column_name]:
            groups.append(key)
    return groups


def create_group_columns(df):
    """Build the feature column names: one per frequent key plus '<category>_other'."""
    column_values = []
    for key in other_thresholds.keys():
        groups = get_groups_by_thresholds(df, key)
        if not groups:
            continue
        for group in groups:
            column_values.append(f"{key}_{group}")
        column_values.append(f"{key}_other")
    return column_values


column_values = create_group_columns(df)
column_values
Output
['br_Chrome', 'br_Firefox', 'os_Mac OS', 'os_other',
'cc_UA', 'cc_GB', 'cc_other',
'dv_mobile', 'dv_desktop', 'dv_other']
When working with machine learning models, it's crucial that the input data is in a format that the model can understand. Machine learning models typically require numerical values (integers, floats) rather than complex data structures like JSON.
Therefore, once again, a little more preprocessing is needed to make our data fit this requirement.
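To show the target shape, here is a small sketch that flattens the nested dictionaries of a single row into flat numeric columns. It is a simplified, stdlib-only illustration with made-up numbers; the function name `explode_row` and the sample column list are assumptions for this example.

```python
def explode_row(row, known_columns):
    """Flatten {'category': {'key': count}} into {'category_key': count}.

    Keys without a dedicated column are added to '<category>_other'.
    """
    flat = {name: 0 for name in known_columns}
    for category, counts in row.items():
        for key, value in counts.items():
            column = f"{category}_{key}"
            if column not in flat:
                column = f"{category}_other"
            flat[column] = flat.get(column, 0) + value
    return flat


columns = ["br_Chrome", "br_Firefox", "br_other", "os_Mac OS", "os_other"]
row = {"br": {"Chrome": 7, "Firefox": 2, "Opera": 1}, "os": {"Mac OS": 6, "Linux": 4}}
print(explode_row(row, columns))
# {'br_Chrome': 7, 'br_Firefox': 2, 'br_other': 1, 'os_Mac OS': 6, 'os_other': 4}
```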
I created a function, create_exploded_df, which produces a data set where each feature is represented as a separate column and the rows contain the corresponding numerical values. (It is not ideal yet, but it was the best solution I was able to produce.)
def create_exploded_df(df):
    """
    Create a new data set with one numeric column per feature
    (br_Chrome, br_other, etc.) and fill it from the dict columns of `df`.
    """
    # Copy so that we do not write into a view of the original frame
    new_df = df[['projectID', 'statisticsGathered']].copy()
    for group in column_values:
        new_df[group] = 0
    new_df_cols = new_df.columns

    for column in df.columns:
        if column in ['projectID', 'statisticsGathered'] or column in EXCLUDED_COLUMNS:
            continue
        for index, row in df[column].items():
            total = 0  # sum of all values that have no dedicated column
            for key, value in row.items():
                if (a := f"{column}_{key}") in new_df_cols:
                    new_df.at[index, a] = value
                else:
                    total += value
            if f"{column}_other" in new_df_cols:
                new_df.at[index, f"{column}_other"] = total
    return new_df


new_df = create_exploded_df(df)
new_df.to_csv("2-weeks-exploded.csv")
new_df
Output
3.3 Fill in hours
Another problem with the data format is that if a project had no traffic in a specific hour, there would be no row at all instead of a blank row. This is inconvenient because the model is designed to predict data for the upcoming time frame (e.g., the next hour), and it cannot be trained to do so if rows for some hours are missing.
Therefore I wrote a script that finds missing hours and inserts blank rows wherever an hour is skipped.
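The original script is not shown here, so the following is only a sketch of one way to do this with pandas, assuming one row per project and hour and the column names used earlier (`projectID`, `statisticsGathered`). The function name `fill_missing_hours` and the sample data are made up for the example.

```python
import pandas as pd


def fill_missing_hours(df, value_columns):
    """Insert zero-filled rows for every hour a project has no data for."""
    filled = []
    for project_id, group in df.groupby("projectID"):
        group = group.set_index("statisticsGathered").sort_index()
        # Full hourly range between the project's first and last observation
        full_range = pd.date_range(
            group.index.min(), group.index.max(), freq=pd.Timedelta(hours=1)
        )
        group = group.reindex(full_range)
        group["projectID"] = project_id
        group[value_columns] = group[value_columns].fillna(0).astype(int)
        filled.append(group.rename_axis("statisticsGathered").reset_index())
    return pd.concat(filled, ignore_index=True)


sample = pd.DataFrame({
    "projectID": ["a", "a", "b"],
    "statisticsGathered": pd.to_datetime(
        ["2024-01-01 00:00", "2024-01-01 03:00", "2024-01-01 05:00"]
    ),
    "br_Chrome": [5, 2, 1],
})
result = fill_missing_hours(sample, ["br_Chrome"])
print(result)
```

In this toy data, project "a" has rows at 00:00 and 03:00, so the sketch adds zero rows for 01:00 and 02:00. Hours before a project's first row or after its last row are not filled.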
3.4 Add and shift target columns
For model training, the primary approach was to use the next hour’s values as the target for the current row. This allows the model to predict future traffic based on the current data.
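A toy example shows what a per-project shift(-1) does: each row’s target becomes the value from the following hour of the same project, and the last row of each project is left without a target. The column names `hour`, `visitors` and `visitors_target` and the numbers are made up for this illustration.

```python
import pandas as pd

toy = pd.DataFrame({
    "projectID": ["a", "a", "a", "b", "b"],
    "hour": [0, 1, 2, 0, 1],
    "visitors": [10, 12, 8, 3, 4],
})
# The target of each row is the next hour's value within the same project
toy["visitors_target"] = toy.groupby("projectID")["visitors"].shift(-1)
print(toy)
```

The rows with a missing target (the last hour of each project) are the ones that a later dropna() call removes before training.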
def sort_df_and_assign_targets(df):
    df = df.copy()
    df = df.sort_values(by=['projectID', 'statisticsGathered'])
    for column_name in df.columns:
        if not column_name.endswith('target'):
            continue
        # shift(-1) moves the next hour's value of the same project into the current row
        df[column_name] = df.groupby('projectID')[column_name].shift(-1)
    return df


new_df = sort_df_and_assign_targets(new_df)
Output
3.5 Split statisticsGathered into separate columns
The main reason for this approach is that statisticsGathered was a datetime object, which the models I tried (see the subsequent sections) could not process directly or extract the correct pattern from.
That resulted in terrible MSE/RMSE metrics. So during development the decision was made to create separate features for day, month, and hour, which improved the results significantly.
def split_statistic_gathered(df):
    df['Month'] = df['statisticsGathered'].dt.month.astype(int)  # as int
    df['Day'] = df['statisticsGathered'].dt.day.astype(int)  # as int
    df['Hour'] = df['statisticsGathered'].dt.hour
    df = df.drop('statisticsGathered', axis=1)
    return df


new_df = split_statistic_gathered(new_df)
new_df
Output
And that’s it! Let’s jump to the training itself! 🎉🎉🎉
4. Linear Regression
The actual prediction was probably the most challenging part of building this application.
The first thing I wanted to try was the LinearRegression model.
I implemented the following functions:
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split


def create_model_for_target(train_df, target_series):
    # shuffle=False keeps the row order: the first 70% of rows train the model, the last 30% test it
    X_train, X_test, y_train, y_test = train_test_split(train_df, target_series, test_size=0.3, shuffle=False)
    reg = LinearRegression()
    reg.fit(X_train, y_train)
    y_pred = reg.predict(X_test)
    return {"y_test": y_test, "y_pred": y_pred}


def create_models_for_targets(df):
    models_data = dict()
    df = df.dropna()
    train_df = clear_df(df)  # helper not shown in this article
    target_columns = [column_name for column_name in df.columns if column_name.endswith("target")]
    for target_name in target_columns:
        models_data[target_name] = create_model_for_target(train_df, df[target_name])
    return models_data
Explanation
For each target column, we split the data into training and test sets. We then train a LinearRegression model on the training data and make predictions on the test data.
To evaluate the results I added a function that gathers the required metrics and produces the output:
from sklearn.metrics import mean_absolute_error, mean_squared_error


def evaluate_models(data):
    evaluation = []
    for target, results in data.items():
        y_test, y_pred = results['y_test'], results['y_pred']
        mse = mean_squared_error(y_test, y_pred)
        rmse = mse ** 0.5  # RMSE is the square root of MSE
        mae = mean_absolute_error(y_test, y_pred)
        mean_y = y_test.mean()
        median_y = y_test.median()
        evaluation.append({'target': target, 'mse': mse, 'rmse': rmse, 'mae': mae, 'mean_y': mean_y, 'median_y': median_y})
    return pd.DataFrame(evaluation)
Output
I wrote a script that generated the output and saved it to an Excel file, including the mse, rmse, mae and mean_y values.
As you can see, the metrics are not satisfactory: the predicted traffic would be far from accurate and not suitable for my goal of traffic forecasting.
Therefore, I decided to predict the total number of visitors per hour instead, and created the following functions:
def add_target_column(df, by):
    # Sum all columns of the chosen category (e.g. every column starting with 'br') per row
    category_columns = [column for column in df.columns if column.startswith(by)]
    totals_series = df[category_columns].sum(axis=1)
    df['total'] = totals_series
    df[f'total_{by}_target'] = totals_series
    return df


def shift_target_column(df, by):
    df = df.sort_values(by=['projectID', 'statisticsGathered'], ignore_index=True)
    # The target of each row is the next hour's total within the same project
    df['total_target'] = df.groupby('projectID')[f'total_{by}_target'].shift(-1)
    return df


new_df = add_target_column(new_df, 'br')
new_df = shift_target_column(new_df, 'br')
new_df[['total_br_target']]
Output
These functions take a specific category and calculate the total number of visitors from it. This works because the totals are the same across categories: the sum of the Device values equals the sum of the OS values.
With this approach, the model showed 10x better results than before.
5. Conclusion
For this use case, the feature is almost acceptable and ready to use. Customers can now plan their budget allocation and server scaling based on these predictions.
Predictions deviate from the actual values by around 2.45 visitors (the RMSE, which is √MSE), which should not have any critical negative impact for marketing needs.
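For readers who want to check how these metrics relate to each other, here is a small stdlib-only sketch that computes MSE, RMSE and MAE by hand. The numbers are made up for illustration and are not results from the Swetrix model; the function name `regression_metrics` is hypothetical.

```python
import math


def regression_metrics(y_true, y_pred):
    """Return MSE, RMSE and MAE for two equally long sequences."""
    errors = [t - p for t, p in zip(y_true, y_pred)]
    mse = sum(e * e for e in errors) / len(errors)
    mae = sum(abs(e) for e in errors) / len(errors)
    return {"mse": mse, "rmse": math.sqrt(mse), "mae": mae}


# Made-up numbers, not results from the actual model
actual = [10, 12, 8, 15]
predicted = [12, 11, 11, 13]
metrics = regression_metrics(actual, predicted)
print(metrics)
```

Because RMSE is expressed in the same unit as the target (visitors per hour), it is the easiest of the three to compare against the mean traffic of a project.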
As this article has grown quite extensive and the app remains under development, we will pause here. We will continue to refine this approach moving forward and I will keep you updated!
Thanks for reading and your attention! I look forward to hearing your feedback and thoughts in the comments section. I hope this information proves to be useful for your objectives!
And good luck!