Audit document timestamp issue

In the event of an issue with timestamps on audit documents, please follow the steps below.

Note before starting: Back up the server if possible; if a full server backup is not feasible, back up at least the ElasticSearch data volume. It is recommended to perform this procedure outside of business hours, because some audit events may be lost while the document timestamp formats are being converted. For ~100K documents the whole process takes roughly 1 hour (timing depends on the amount of data), so it is worth stopping the UI/proxy or redirecting users to an 'Under construction' page so they cannot log in and perform actions.
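If only the ElasticSearch data volume is being backed up, one possible approach is sketched below. The volume name is an assumption and should be replaced with whatever docker volume ls reports for your deployment.

# Find the ElasticSearch data volume name (assumption: it contains "elastic")
docker volume ls | grep -i elastic
# Archive the volume contents to the current directory (replace the volume name accordingly)
docker run --rm -v <elasticsearch_volume_name>:/data -v "$(pwd)":/backup alpine \
  tar czf /backup/elasticsearch-data-backup.tar.gz -C /data .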

Prerequisites

Make sure Python 3 is installed, along with pip and the python-dateutil package. If they are missing, install them with the following commands.

apt install python3-pip
pip3 install python-dateutil
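To confirm the prerequisites are in place before proceeding, a quick check such as the following should print the Python and dateutil versions.

python3 --version
python3 -c "import dateutil; print(dateutil.__version__)"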

Fixing the issue

  1. Start Elasticdump. In env.sh, set export ELASTICDUMP_ENABLED="true", and in 3.2/utilities/elasticdump/docker-compose.yaml set INDEX: "auditlog". Make sure that ACTION: "export" is set, then run startup.sh to deploy Elasticdump. Wait until it completes (the exit status of the container must be 0) and check the logs to make sure no errors occurred during the export. An example of correct logs is given below.
Wed, 06 Nov 2024 11:10:55 GMT | sent 64 objects to destination file, wrote 64
Wed, 06 Nov 2024 11:10:55 GMT | got 0 objects from source elasticsearch (offset: 84364)
Wed, 06 Nov 2024 11:10:55 GMT | Total Writes: 84364
Wed, 06 Nov 2024 11:10:55 GMT | dump complete
Going to output what's in the target directory: /data/openiam/conf/elasticdump
total 172712
drwxr-xr-x 2 root root 4096 Nov 6 10:56 .
drwxr-xr-x 3 root root 4096 Nov 6 10:56 ..
-rw-r--r-- 1 root root 176843787 Nov 6 11:10 auditlog.json

Here, 84364 audit documents were exported. You can check the current count in ElasticSearch by running the following command inside the container; the expected output is shown below.

elasticsearch@93414cede45b:~$ curl -X GET "localhost:9200/auditlog/_count?pretty"
{
  "count" : 84364,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}

Make sure that the counts match.

At this point, all audit documents have been exported to a file.

  2. Create the Python script file auditLogDateConversion.py; its content is given below in the section 'The contents of the Python script'. Copy the exported file from the openiam-elasticdump-storage_storage volume; if the default volume path is used, it is located at /var/lib/docker/volumes/openiam-elasticdump-storage_storage/_data/auditlog.json. Run the Python script to convert the dates from epoch to ISO 8601 format (example commands are shown below).
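For reference, the copy-and-run commands for this step might look roughly like the following; the working directory is an assumption, and the volume path is the default mentioned above, so adjust both to your deployment.

# Copy the exported dump next to the conversion script (default volume path assumed)
cp /var/lib/docker/volumes/openiam-elasticdump-storage_storage/_data/auditlog.json .
# Run the conversion; it reads auditlog.json and writes converted_file.json
python3 auditLogDateConversion.py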

Wait until the script finishes; no errors should appear. Once done, open converted_file.json and make sure it has the correct number of lines (see the check below). In this case there should be 84364 lines, matching the number of exported documents. Now the data in this file can be imported back into ElasticSearch.
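A quick way to verify the counts without opening the files is to compare the line counts directly, assuming both files are in the current directory.

wc -l auditlog.json converted_file.json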

  3. Log in to the ElasticSearch container and delete the auditlog index by running the following command.
curl -X DELETE "localhost:9200/auditlog/"

Log out and restart the ESB container; it will recreate the index. Once the ESB is up, you can import the data (a quick verification is shown below).
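To confirm the index has been recreated and is empty before importing, a check like the following, run inside the ElasticSearch container, should show the auditlog index with a document count of 0.

curl -X GET "localhost:9200/_cat/indices/auditlog?v"
curl -X GET "localhost:9200/auditlog/_count?pretty"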

  4. In 3.2/utilities/elasticdump/docker-compose.yaml change the action to import (ACTION: "import") and remove Elasticdump from the stack by running the command below.
docker stack rm elasticdump

Run startup.sh to deploy Elasticdump again and check the Elasticdump logs; they should look as follows.

Wed, 06 Nov 2024 13:13:04 GMT | got 100 objects from source file (offset: 84200)
Wed, 06 Nov 2024 13:13:04 GMT | sent 100 objects to destination elasticsearch, wrote 100
Wed, 06 Nov 2024 13:13:04 GMT | got 64 objects from source file (offset: 84300)
Wed, 06 Nov 2024 13:13:04 GMT | sent 64 objects to destination elasticsearch, wrote 64
Wed, 06 Nov 2024 13:13:04 GMT | got 0 objects from source file (offset: 84364)
Wed, 06 Nov 2024 13:13:04 GMT | Total Writes: 84364
Wed, 06 Nov 2024 13:13:04 GMT | dump complete
Going to output what's in the target directory: /data/openiam/conf/elasticdump
total 179720
drwxr-xr-x 2 root root 4096 Nov 6 10:56 .
drwxr-xr-x 3 root root 4096 Nov 6 12:59 ..
-rw-r--r-- 1 root root 184021746 Nov 6 12:57 auditlog.json

In the end, you should see the same number of imported documents.
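If you prefer to follow the import progress live rather than checking afterwards, something along these lines may work; the exact service name is an assumption and depends on how the stack names the Elasticdump service.

# List services to find the Elasticdump service name (assumption: it contains "elasticdump")
docker service ls | grep -i elasticdump
# Follow its logs (replace the placeholder with the name reported above)
docker service logs -f <elasticdump_service_name>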

You can also log in to the ElasticSearch container and check the count again; it should match the number of imported documents. A quick verification is sketched below.
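For example, re-running the count and pulling back a single document lets you confirm both the total and the new ISO 8601 timestamp format. These are illustrative commands to be run inside the ElasticSearch container.

curl -X GET "localhost:9200/auditlog/_count?pretty"
# Fetch one document and inspect the 'timestamp' / 'lastIndexDateTime' fields
curl -X GET "localhost:9200/auditlog/_search?size=1&pretty"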

The contents of the Python script

auditLogDateConversion.py

import json
from datetime import datetime, timezone
from dateutil import parser  # Import the parser from dateutil to handle ISO 8601 strings

# Function to convert epoch to ISO 8601 datetime format with milliseconds and 'Z' suffix
def convert_to_datetime(value):
    # If the value is already in ISO 8601 format (check for 'Z' at the end for UTC)
    if isinstance(value, str) and value.endswith('Z'):
        try:
            # Use dateutil.parser.isoparse to handle ISO 8601 format for all Python versions
            dt = parser.isoparse(value)
            return dt.strftime('%Y-%m-%dT%H:%M:%S.') + f"{dt.microsecond // 1000:03d}Z"
        except ValueError:
            pass  # If it doesn't match ISO format, proceed to try epoch conversion
    # If value is numeric or a string that can be cast to a number (epoch)
    try:
        # If the value is a valid integer (or can be converted to int), it's an epoch value
        value = int(value)
        # Convert the epoch (milliseconds) to a datetime object
        dt = datetime.fromtimestamp(value / 1000, tz=timezone.utc)
        # Format it to include milliseconds and append 'Z' for UTC
        return dt.strftime('%Y-%m-%dT%H:%M:%S.') + f"{dt.microsecond // 1000:03d}Z"
    except (ValueError, TypeError):
        # If it's neither an epoch nor a valid ISO datetime, return None
        print(f"Invalid datetime value '{value}' encountered.")
        return None

# Recursive function to traverse nested structures and convert 'lastIndexDateTime'/'timestamp' fields
def convert_nested_data(data):
    if isinstance(data, dict):  # If it's a dictionary, check each key
        for key, value in data.items():
            if key == 'lastIndexDateTime' or key == 'timestamp':  # Check for the specific keys
                converted_value = convert_to_datetime(value)
                if converted_value:
                    data[key] = converted_value
            else:
                # Recursively call the function for nested objects
                convert_nested_data(value)
    elif isinstance(data, list):  # If it's a list, check each item
        for item in data:
            convert_nested_data(item)

# Define input and output file paths
input_file_path = "auditlog.json"  # Replace with the path to your input file
output_file_path = "converted_file.json"  # The path for the output file

# Open the input file and process each line
with open(input_file_path, 'r') as infile, open(output_file_path, 'w') as outfile:
    for line in infile:
        # Parse the JSON line
        record = json.loads(line)
        # Access '_source' data where the target fields are located
        source_data = record.get('_source', {})
        # Convert top-level fields 'timestamp' and 'lastIndexDateTime'
        if 'timestamp' in source_data:
            converted_timestamp = convert_to_datetime(source_data['timestamp'])
            if converted_timestamp:
                source_data['timestamp'] = converted_timestamp
            else:
                print(f"Error converting 'timestamp' for record ID: {record.get('_id')}")
        if 'lastIndexDateTime' in source_data:
            converted_last_index = convert_to_datetime(source_data['lastIndexDateTime'])
            if converted_last_index:
                source_data['lastIndexDateTime'] = converted_last_index
            else:
                print(f"Error converting 'lastIndexDateTime' for record ID: {record.get('_id')}")
        # Convert nested fields like 'attributes' or 'targetUsers'
        convert_nested_data(source_data)
        # Write the modified record to the output file as a single line of JSON
        outfile.write(json.dumps(record) + '\n')