Audit document timestamp issue
In the event of an issue with timestamps on audit documents, please follow the steps below.
Note before starting: Back up the server if possible; if a full server backup is not possible, back up the ElasticSearch volume data. It is recommended to perform this procedure during a day off, because some audit events may be lost while converting document timestamp formats. For ~100K documents the whole process takes about 1 hour (timing depends on the amount of data), so it is worth stopping the UI/proxy or redirecting users to an 'Under construction' page so they cannot log in and perform actions.
Prerequisite
Make sure Python 3 and the python-dateutil library are installed. If they are missing, install them with the following commands.
apt install python3-pip
pip3 install python-dateutil
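To confirm that the interpreter and the library are available before proceeding, a quick check along these lines can be used (assuming python3 is on the PATH):
python3 --version
python3 -c "import dateutil; print(dateutil.__version__)"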
Fixing the issue
- Start Elasticdump. In env.sh, set export ELASTICDUMP_ENABLED="true". Then, in 3.2/utilities/elasticdump/docker-compose.yaml, set INDEX: "auditlog" and make sure that ACTION: "export" is set. Run startup.sh to deploy Elasticdump. Wait until it completes (the exit status of the container must be 0) and check the logs to make sure no errors occurred during the export. An example of correct logs is given below.
Wed, 06 Nov 2024 11:10:55 GMT | sent 64 objects to destination file, wrote 64
Wed, 06 Nov 2024 11:10:55 GMT | got 0 objects from source elasticsearch (offset: 84364)
Wed, 06 Nov 2024 11:10:55 GMT | Total Writes: 84364
Wed, 06 Nov 2024 11:10:55 GMT | dump complete
Going to output what's in the target directory: /data/openiam/conf/elasticdump
total 172712
drwxr-xr-x 2 root root      4096 Nov  6 10:56 .
drwxr-xr-x 3 root root      4096 Nov  6 10:56 ..
-rw-r--r-- 1 root root 176843787 Nov  6 11:10 auditlog.json
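As noted above, the export container must finish with exit status 0. One way to confirm this, assuming the stack is named elasticdump as in the commands later in this guide, is:
docker stack ps elasticdump
docker ps -a --filter "name=elasticdump" --format "{{.Names}}: {{.Status}}"
The second command should show a status such as Exited (0) for the completed container.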
Here, 84364 audit documents were exported. You can check the current count in ElasticSearch by running the following command inside the container; example output is shown below.
curl -X GET "localhost:9200/auditlog/_count?pretty"elasticsearch@93414cede45b:~$ curl -X GET "localhost:9200/auditlog/_count?pretty"{"count" : 84364,"_shards" : {"total" : 1,"successful" : 1,"skipped" : 0,"failed" : 0}}
Make sure the counts match. Now all audit documents have been exported into a file.
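Elasticdump writes one JSON document per line, so the exported file can also be cross-checked against the count above (assuming the default volume path):
wc -l /var/lib/docker/volumes/openiam-elasticdump-storage_storage/_data/auditlog.json
The line count should equal the document count reported by ElasticSearch (84364 here).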
- Create the Python script file auditLogDateConversion.py; its content is given below at the end of this article. Copy the exported file from the openiam-elasticdump-storage_storage volume; if the default volume path is used, it can be copied with:
cp /var/lib/docker/volumes/openiam-elasticdump-storage_storage/_data/auditlog.json .
Then run the Python script to convert the dates from epoch to ISO 8601 format.
Wait until it finishes (no errors should appear), then open converted_file.json and make sure it has the correct number of lines; in this case there should be 84364 lines, matching the number of documents exported. The data in this file should now be imported back into ElasticSearch (example commands for running the script and staging the converted file are sketched below).
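A possible sequence for this step, assuming auditLogDateConversion.py and auditlog.json sit in the same working directory and the default volume path is used, is to run the script, verify the line count, and then stage the converted file where the import will read it:
python3 auditLogDateConversion.py
wc -l converted_file.json
cp converted_file.json /var/lib/docker/volumes/openiam-elasticdump-storage_storage/_data/auditlog.json
Overwriting auditlog.json in the volume with the converted data is an assumption here; adjust the target path if your Elasticdump volume is mounted elsewhere.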
- Log in to the ElasticSearch container and delete the auditlog index by running the following command.
curl -X DELETE "localhost:9200/auditlog/"
Log out and restart the ESB container; it will recreate the index. Once the ESB is up, you can import the data.
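To confirm that the index has been recreated and is empty before importing, checks such as the following can be run inside the ElasticSearch container:
curl -X GET "localhost:9200/_cat/indices/auditlog?v"
curl -X GET "localhost:9200/auditlog/_count?pretty"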
- In 3.2/utilities/elasticdump/docker-compose.yaml, change the action to import (ACTION: "import"), then remove the previous Elasticdump deployment from the stack by running the command below.
docker stack rm elasticdump
Run startup.sh to deploy Elasticdump again and check the Elasticdump logs; they should look as follows.
Wed, 06 Nov 2024 13:13:04 GMT | got 100 objects from source file (offset: 84200)
Wed, 06 Nov 2024 13:13:04 GMT | sent 100 objects to destination elasticsearch, wrote 100
Wed, 06 Nov 2024 13:13:04 GMT | got 64 objects from source file (offset: 84300)
Wed, 06 Nov 2024 13:13:04 GMT | sent 64 objects to destination elasticsearch, wrote 64
Wed, 06 Nov 2024 13:13:04 GMT | got 0 objects from source file (offset: 84364)
Wed, 06 Nov 2024 13:13:04 GMT | Total Writes: 84364
Wed, 06 Nov 2024 13:13:04 GMT | dump complete
Going to output what's in the target directory: /data/openiam/conf/elasticdump
total 179720
drwxr-xr-x 2 root root      4096 Nov  6 10:56 .
drwxr-xr-x 3 root root      4096 Nov  6 12:59 ..
-rw-r--r-- 1 root root 184021746 Nov  6 12:57 auditlog.json
In the end, you should see the same number of imported documents (84364 in this example). You can also log in to the ElasticSearch container and check the count again; it should match the number of imported documents.
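As an additional spot check that the conversion worked, you can fetch a single document and verify that its timestamp and lastIndexDateTime fields are now in ISO 8601 format:
curl -X GET "localhost:9200/auditlog/_search?size=1&pretty"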
The contents of the Python script auditLogDateConversion.py
import json
from datetime import datetime, timezone
from dateutil import parser  # Import the parser from dateutil to handle ISO 8601 strings

# Function to convert epoch to ISO 8601 datetime format with milliseconds and 'Z' suffix
def convert_to_datetime(value):
    # If the value is already in ISO 8601 format (check for 'Z' at the end for UTC)
    if isinstance(value, str) and value.endswith('Z'):
        try:
            # Use dateutil.parser.isoparse to handle ISO 8601 format for all Python versions
            return parser.isoparse(value).strftime('%Y-%m-%dT%H:%M:%S.') + f"{parser.isoparse(value).microsecond // 1000:03d}Z"
        except ValueError:
            pass  # If it doesn't match ISO format, proceed to try epoch conversion
    # If value is numeric or a string that can be cast to a number (epoch)
    try:
        # If the value is a valid integer (or can be converted to int), it's epoch
        value = int(value)
        # Convert the epoch to a datetime object
        dt = datetime.fromtimestamp(value / 1000, tz=timezone.utc)
        # Format it to include milliseconds and append 'Z' for UTC
        return dt.strftime('%Y-%m-%dT%H:%M:%S.') + f"{dt.microsecond // 1000:03d}Z"
    except (ValueError, TypeError):
        # If it's neither an epoch nor a valid ISO datetime, return None
        print(f"Invalid datetime value '{value}' encountered.")
        return None

# Recursive function to traverse nested structures and convert 'lastIndexDateTime' in epoch format
def convert_nested_data(data):
    if isinstance(data, dict):  # If it's a dictionary, check each key
        for key, value in data.items():
            if key == 'lastIndexDateTime' or key == 'timestamp':  # Check for the specific keys
                converted_value = convert_to_datetime(value)
                if converted_value:
                    data[key] = converted_value
            else:
                # Recursively call the function for nested objects
                convert_nested_data(value)
    elif isinstance(data, list):  # If it's a list, check each item
        for item in data:
            convert_nested_data(item)

# Define input and output file paths
input_file_path = "auditlog.json"  # Replace with the path to your input file
output_file_path = "converted_file.json"  # The path for the output file

# Open the input file and process each line
with open(input_file_path, 'r') as infile, open(output_file_path, 'w') as outfile:
    for line in infile:
        # Parse the JSON line
        record = json.loads(line)
        # Access '_source' data where the target fields are located
        source_data = record.get('_source', {})
        # Convert top-level fields 'timestamp' and 'lastIndexDateTime'
        if 'timestamp' in source_data:
            converted_timestamp = convert_to_datetime(source_data['timestamp'])
            if converted_timestamp:
                source_data['timestamp'] = converted_timestamp
            else:
                print(f"Error converting 'timestamp' for record ID: {record.get('_id')}")
        if 'lastIndexDateTime' in source_data:
            converted_last_index = convert_to_datetime(source_data['lastIndexDateTime'])
            if converted_last_index:
                source_data['lastIndexDateTime'] = converted_last_index
            else:
                print(f"Error converting 'lastIndexDateTime' for record ID: {record.get('_id')}")
        # Convert nested fields like 'attributes' or 'targetUsers'
        convert_nested_data(source_data)
        # Write the modified record to the output file as a single line of JSON
        outfile.write(json.dumps(record) + '\n')
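As a quick sanity check of the conversion logic, convert_to_datetime can be called with illustrative values (not taken from a real export) and should return ISO 8601 strings with milliseconds and a 'Z' suffix:

# Illustrative values only, for verifying the conversion behaviour
print(convert_to_datetime(1730891455000))           # epoch in ms -> 2024-11-06T11:10:55.000Z
print(convert_to_datetime("2024-11-06T11:10:55Z"))  # already ISO 8601 -> normalized to 2024-11-06T11:10:55.000Z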