Migrating Tempo Worklogs to WorklogPRO

The Tempo Timesheets app stores some parts of each worklog in its own database. These parts are:

  • Worklog author

  • Worklog description

  • Worklog attributes

When you migrate from Tempo to another timesheet app, or when you view the worklogs through Jira’s own UI or REST API, all of this information is missing. Attributes are Tempo-specific, so it is expected that they are not visible in other apps or in the Jira UI. Losing the worklog author and worklog description, however, is not something most people expect. The worklog author is always displayed as “Tempo Timesheets”, and the work description is always displayed as “time-tracking”. In short, you lose the actual values of the worklog author and worklog description. This misrepresentation can cause problems in reporting, accountability, and overall data management within Jira. As discussed in the Atlassian Developer Community thread titled "Unable to get user worklogs using Jira API when Tempo is installed", other developers have faced similar challenges when trying to retrieve accurate user worklogs through the Jira API while Tempo is installed.

Here is what the Jira worklogs look like after migrating from Tempo. As you can see, all of the worklog authors are “Tempo Timesheets” and all of the work descriptions are “time-tracking”.

tempo-timesheet.png

 

To help our customers migrate from Tempo to WorklogPRO, we have prepared a Python script that connects to the Tempo API and migrates this information into a format WorklogPRO can display. It does this by storing the Tempo-specific information in Jira’s worklog properties.

An example of the WorklogPRO property value on a Jira worklog after running the script:

{
  "version": 2,
  "authorAccountId": "1234567abcdefg",
  "comment": "This is comment from Tempo UI",
  "attributes": {
    "ea34023-232ac-222ef": "Attribute value",
    "a2a3-40-232ac-222ee": "Another attribute value"
  }
}

You may wonder why the script adds this information to a worklog property instead of updating the author on the worklog itself. Unfortunately, the Jira REST API doesn’t allow modifying the worklog author. Another option would be to delete the original worklog and recreate it with the correct information. Since this is a destructive operation, we didn’t want to do this.
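To make the storage approach concrete, here is a minimal sketch of how a property value like the one above could be assembled from a Tempo worklog record, and where it would be stored. The helper names build_property_value and build_property_url are hypothetical and are not part of the script; the field names and the worklogpro property key follow what the script uses, and the choice to store an empty string for a missing description is an assumption made for this sketch.

```python
def build_property_url(base_url, issue_id_or_key, worklog_id, property_key="worklogpro"):
    """Return the Jira REST URL of a worklog property (hypothetical helper)."""
    return (
        f"{base_url.rstrip('/')}/rest/api/3/issue/{issue_id_or_key}"
        f"/worklog/{worklog_id}/properties/{property_key}"
    )


def build_property_value(tempo_worklog, attribute_values=None):
    """Assemble the property value from a Tempo worklog dict (hypothetical helper)."""
    value = {
        "version": 2,
        "authorAccountId": tempo_worklog["author"]["accountId"],
        # Assumption for this sketch: a missing description becomes an empty string.
        "comment": tempo_worklog.get("description") or "",
    }
    if attribute_values:
        # Keys are WorklogPRO attribute IDs, values are the attribute values.
        value["attributes"] = dict(attribute_values)
    return value


sample_worklog = {
    "author": {"accountId": "1234567abcdefg"},
    "description": "This is comment from Tempo UI",
}
sample_value = build_property_value(
    sample_worklog, {"ea34023-232ac-222ef": "Attribute value"}
)
sample_url = build_property_url(
    "https://your-jira-domain.atlassian.net", "10001", 20002
)
```

Because the original worklog is never deleted or rewritten, removing the property later would restore the worklog to its pre-migration state.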

Key Points About the Script

  • Utilizes threads for faster worklog updates, crucial for handling large data volumes.

  • Manages concurrency to respect Jira's API rate limits, adjusting request timing to prevent failures.

  • Updates Jira worklog properties with accurate author data from Tempo, ensuring data integrity post-migration.

  • Provides real-time progress updates and color-coded messages for easy monitoring during migration.
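The timing behaviour described in the points above can be sketched in isolation. The constants below mirror the values in the script's configuration; the function names retry_delay_seconds and request_interval_seconds are hypothetical and exist only for this illustration.

```python
import random

RETRY_DELAY_MILLIS = 5000        # base delay before a retry
MAX_RETRY_DELAY_MILLIS = 30000   # upper bound for any single delay
JITTER_RANGE = (0.7, 1.3)        # random multiplier applied to the base delay


def retry_delay_seconds(rng=random):
    """Return a jittered retry delay in seconds, capped at the maximum."""
    jitter = rng.uniform(*JITTER_RANGE)
    delay_millis = RETRY_DELAY_MILLIS + RETRY_DELAY_MILLIS * jitter
    return min(delay_millis, MAX_RETRY_DELAY_MILLIS) / 1000.0


def request_interval_seconds(requests_per_second):
    """Return the pause between two requests for a target request rate."""
    return 1.0 / requests_per_second


# A seeded generator makes the example reproducible.
example_delay = retry_delay_seconds(random.Random(0))
example_interval = request_interval_seconds(9)
```

With these values every retry waits between 8.5 and 11.5 seconds, and a target of 9 requests per second spaces requests roughly 0.11 seconds apart. The jitter keeps parallel workers from retrying at the same instant.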

How to Use the Script

The script requires you to define some environment variables before running it. These are needed to connect to the Tempo API, the Jira API, and the WorklogPRO API.

  • Set the environment variables listed below.

  • Set the jira_base_url variable in the script to your actual site, for example https://your-jira-domain.atlassian.net.

  • Then, run the following command in your terminal: pip install requests==2.31.0 && pip install tqdm==4.66.1

  • Create the corresponding “Attribute” and “Attribute value” definitions in WorklogPRO. The names must match the Tempo attributes exactly. For example, if you have a “Work type” attribute in Tempo, you need to create a “Work type” attribute in WorklogPRO. Likewise, if “Work type” has an attribute value “Testing” in Tempo, you should create the same attribute value “Testing” in the “Work type” attribute of WorklogPRO.
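The reason the names must match exactly is that the script derives a Tempo attribute key from each WorklogPRO attribute name by removing spaces and wrapping the result in underscores. The sketch below reproduces that conversion and adds a hypothetical helper, find_unmatched_tempo_keys, that lists Tempo keys with no matching WorklogPRO attribute. The sample keys and names are made up for illustration.

```python
def convert_to_tempo_id(wp_attribute_name):
    """Derive the Tempo attribute key the same way the script does."""
    return f"_{wp_attribute_name.replace(' ', '')}_"


def find_unmatched_tempo_keys(tempo_keys, wp_attribute_names):
    """Return Tempo attribute keys that no WorklogPRO attribute maps to."""
    known_keys = {convert_to_tempo_id(name) for name in wp_attribute_names}
    return sorted(set(tempo_keys) - known_keys)


# Illustrative data only: two keys seen in Tempo, one attribute in WorklogPRO.
unmatched = find_unmatched_tempo_keys(
    ["_Worktype_", "_Account_"],
    ["Work type"],
)
```

Here "Work type" maps to "_Worktype_", so only "_Account_" is reported as unmatched. Attribute values are compared by name as well, so a value such as “Testing” must be spelled identically in both apps.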

After these steps, you can run the script with this command: python <name_of_the_script>.py
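Before starting a long migration, it can help to confirm that every environment variable the script reads is set. The following is a minimal sketch, not part of the script: missing_env_vars is a hypothetical helper, and the variable list is taken from the os.getenv calls in the script, which also read WP_GRAPH_QL_URL.

```python
import os

# Names read by the script through os.getenv.
REQUIRED_ENV_VARS = [
    "TEMPO_API_TOKEN",
    "JIRA_API_TOKEN",
    "JIRA_EMAIL",
    "WP_API_TOKEN",
    "WP_GRAPH_QL_URL",
]


def missing_env_vars(environ=None):
    """Return the required variable names that are unset or empty."""
    if environ is None:
        environ = os.environ
    return [name for name in REQUIRED_ENV_VARS if not environ.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars()
    if missing:
        raise SystemExit(f"Missing environment variables: {', '.join(missing)}")
    print("All required environment variables are set.")
```

Passing a plain dictionary instead of os.environ makes the check easy to try out without touching your shell configuration.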

The Script

  • TEMPO_API_TOKEN: Required to connect to the Tempo REST API. The script uses this token to pull the real author, description, and attribute values of worklogs from Tempo. To get an API token for Tempo, follow this path in your Jira: Tempo > Settings > OAuth 2.0 Applications

  • JIRA_API_TOKEN: Required to connect to the Jira REST API. The script uses this token to update worklog properties, which is where the Tempo-specific information is stored on the worklog itself. Go to this link and create an API token: https://id.atlassian.com/manage-profile/security/api-tokens

  • JIRA_EMAIL: Required to connect to the Jira REST API.

  • WP_API_TOKEN: Required to connect to the WorklogPRO GraphQL API. The script uses this token to fetch the attribute definitions from WorklogPRO.

import os
import time
import random
import asyncio
import requests
from requests.auth import HTTPBasicAuth
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
from typing import TypedDict, Callable, List, Dict, Any, Optional, Tuple


# Types
class AttributeTypeOption(TypedDict):
    id: str
    name: str


class AttributeType(TypedDict):
    id: str
    name: str
    options: List[AttributeTypeOption]


class Issue(TypedDict):
    self: str
    id: int


class Author(TypedDict):
    self: str
    accountId: str


class AttributeValues(TypedDict):
    self: str
    values: List[dict]


class TempoWorklog(TypedDict):
    self: str
    tempoWorklogId: int
    issue: Issue
    timeSpentSeconds: int
    billableSeconds: int
    startDate: str
    startTime: str
    description: str
    createdAt: str
    updatedAt: str
    author: Author
    attributes: AttributeValues


# Configuration
TEMPO_API_TOKEN = os.getenv('TEMPO_API_TOKEN')
JIRA_API_TOKEN = os.getenv('JIRA_API_TOKEN')
JIRA_EMAIL = os.getenv('JIRA_EMAIL')
WP_API_TOKEN = os.getenv('WP_API_TOKEN')
WP_GRAPH_QL_URL = os.getenv('WP_GRAPH_QL_URL')

# Replace with your actual Jira site
jira_base_url = 'https://your-jira-domain.atlassian.net'
tempo_base_url = 'https://api.tempo.io/4'
tempo_worklog_limit_per_request = 20
# Change this to the last printed batch offset to resume an interrupted run
initial_tempo_worklog_offset = 0

# Rate limiting configuration
max_retries = 4
retry_delay_millis = 5000
max_retry_delay_millis = 30000
jitter_multiplier_range = [0.7, 1.3]


def random_in_range(range_vals):
    return random.uniform(range_vals[0], range_vals[1])


def print_with_color(text, color_code):
    print(f"\033[{color_code}m{text}\033[0m")


def handle_rate_limiting(response):
    # Sleep until the rate limit window resets when no requests are left
    if response.status_code == 429 or 'X-RateLimit-Remaining' in response.headers:
        rate_limit_remaining = int(response.headers.get('X-RateLimit-Remaining', '1'))
        rate_limit_reset_time = int(response.headers.get('X-RateLimit-Reset', '0'))
        if rate_limit_remaining == 0:
            sleep_time = max(rate_limit_reset_time - time.time(), 0) + random_in_range(jitter_multiplier_range)
            print_with_color(f"Rate limit reached. Sleeping for {round(sleep_time, 2)} seconds...", "33")
            time.sleep(sleep_time)


def send_request_with_rate_limit_handling(
        request_func: Callable[..., requests.Response],
        retry_count: int = 0,
        **request_kwargs: Any
) -> requests.Response:
    response = request_func(**request_kwargs)
    if response.ok:
        return response
    if response.status_code == 429:
        handle_rate_limiting(response)
        if retry_count < max_retries:
            # Base delay plus a jittered share of it, capped at the maximum delay
            retry_delay = retry_delay_millis + (retry_delay_millis * random_in_range(jitter_multiplier_range))
            retry_delay = min(retry_delay, max_retry_delay_millis)
            print_with_color(
                f"Retrying in {retry_delay / 1000.0} seconds... (Attempt {retry_count + 1}/{max_retries})", "33")
            time.sleep(retry_delay / 1000.0)
            return send_request_with_rate_limit_handling(
                request_func,
                retry_count=retry_count + 1,
                **request_kwargs
            )
        print_with_color("Max retries reached. Failed to complete the request.", "31")
    else:
        print_with_color(
            f"Failed to complete the request. Status code: {response.status_code}, Response: {response.text}", "31")
    return response


def get_jira_worklog_ids_from_tempo(worklogs_ids: List[int]):
    url = f'{tempo_base_url}/worklogs/tempo-to-jira?limit={tempo_worklog_limit_per_request}'
    headers = {'Authorization': f'Bearer {TEMPO_API_TOKEN}'}
    response = send_request_with_rate_limit_handling(
        request_func=requests.post,
        url=url,
        headers=headers,
        json={"tempoWorklogIds": worklogs_ids}
    )
    handle_rate_limiting(response)
    response.raise_for_status()
    return response.json()["results"]


def get_tempo_worklog_attributes(url):
    headers = {'Authorization': f'Bearer {TEMPO_API_TOKEN}'}
    response = requests.get(url, headers=headers)
    attributes = response.json()["results"]
    return attributes


def get_tempo_worklogs(tempo_worklog_offset):
    url = f'{tempo_base_url}/worklogs?limit={tempo_worklog_limit_per_request}&offset={tempo_worklog_offset}'
    headers = {'Authorization': f'Bearer {TEMPO_API_TOKEN}'}
    response = requests.get(url, headers=headers)
    handle_rate_limiting(response)
    response.raise_for_status()
    worklogs = response.json()["results"]
    for worklog in worklogs:
        # Flatten the attributes object into a plain list (or None when empty)
        tempo_attributes = worklog['attributes']['values']
        if tempo_attributes:
            worklog['attributes'] = tempo_attributes
        else:
            worklog['attributes'] = None
    return worklogs


def get_tempo_data(tempo_worklog_offset) -> Tuple[List[TempoWorklog], Dict[int, int]]:
    worklogs = get_tempo_worklogs(tempo_worklog_offset)
    worklog_ids = [worklog['tempoWorklogId'] for worklog in worklogs]
    jira_tempo_worklog_id_mappings = {item['tempoWorklogId']: item['jiraWorklogId'] for item in
                                      get_jira_worklog_ids_from_tempo(worklog_ids)}
    return worklogs, jira_tempo_worklog_id_mappings


def update_jira_worklog_request(issue_id_or_key: str, worklog_id: int, payload: dict):
    url = f'{jira_base_url}/rest/api/3/issue/{issue_id_or_key}/worklog/{worklog_id}/properties/worklogpro'
    response = send_request_with_rate_limit_handling(
        request_func=requests.put,
        url=url,
        auth=HTTPBasicAuth(JIRA_EMAIL, JIRA_API_TOKEN),
        json=payload
    )
    if response.status_code == 404:
        # We only cover the 404 case here to log and continue
        return False
    response.raise_for_status()
    return True


async def async_run_update_jira_worklog(executor, issue_id_or_key: str, worklog_id: int, payload: dict,
                                        update_pbar: Optional[Callable] = None):
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(
        executor,
        update_jira_worklog_request,
        issue_id_or_key,
        worklog_id,
        payload
    )
    if not result:
        print_with_color(f"Worklog with ID {worklog_id} not found for issue {issue_id_or_key}", "31")
    if update_pbar:
        update_pbar()


def process_worklog_attributes(worklog, payload, tempo_id_to_wp_attribute_map):
    payload['attributes'] = {}
    for attr in worklog["attributes"]:
        tempo_attribute_id = attr['key']
        tempo_attribute_value = attr['value']
        wp_attribute_type = tempo_id_to_wp_attribute_map.get(tempo_attribute_id)
        if wp_attribute_type:
            wp_options = wp_attribute_type.get('options')
            if wp_options:
                # Attribute with predefined options: store the matching option ID
                wp_attribute_option = next(
                    (option for option in wp_options if option['name'] == tempo_attribute_value), None)
                if wp_attribute_option:
                    payload['attributes'][wp_attribute_type['id']] = wp_attribute_option['id']
                else:
                    print_with_color(
                        f"Attribute value {tempo_attribute_value} not found in WP for attribute {tempo_attribute_id}",
                        "31")
            else:
                # Free-form attribute: store the Tempo value as is
                payload['attributes'][wp_attribute_type['id']] = tempo_attribute_value
        else:
            print_with_color(f"Attribute with ID {tempo_attribute_id} not found in WP", "31")


async def update_jira_worklogs_in_batch(worklogs: list, requests_per_second: int,
                                        jira_tempo_worklog_id_mappings: dict,
                                        tempo_id_to_wp_attribute_map: Dict[str, AttributeType]):
    # Define the time interval to wait between requests to match requests_per_second rate
    interval = 1.0 / requests_per_second

    # Create an executor for running the requests concurrently
    with ThreadPoolExecutor(max_workers=requests_per_second) as executor:
        with tqdm(total=len(worklogs), desc="Processing Worklogs", unit="worklog", ncols=80,
                  bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}]", leave=True) as pbar:
            tasks = []
            for worklog in worklogs:
                payload = {
                    "version": 2,
                    "authorAccountId": worklog['author']['accountId'],
                    "comment": worklog['description'],
                }
                if worklog['attributes']:
                    process_worklog_attributes(worklog, payload, tempo_id_to_wp_attribute_map)

                issue_id_or_key = worklog['issue']['id']
                jira_worklog_id = jira_tempo_worklog_id_mappings[worklog['tempoWorklogId']]

                # Schedule the request now so that the sleep below actually spaces the requests out
                task = asyncio.create_task(async_run_update_jira_worklog(
                    executor,
                    issue_id_or_key,
                    jira_worklog_id,
                    payload,
                    lambda: pbar.update(1)
                ))
                tasks.append(task)

                # Wait to control the requests per second rate
                await asyncio.sleep(interval)

            # Wait for all tasks to complete
            await asyncio.gather(*tasks)
    print_with_color("\nBatch completed.", "32")


def process_tempo_worklogs_batch(tempo_worklog_offset: int, tempo_id_to_wp_attribute_map: Dict[str, AttributeType]):
    requests_per_second = 9
    worklogs, jira_tempo_worklog_id_mappings = get_tempo_data(tempo_worklog_offset)
    if worklogs:
        asyncio.run(update_jira_worklogs_in_batch(worklogs, requests_per_second,
                                                  jira_tempo_worklog_id_mappings=jira_tempo_worklog_id_mappings,
                                                  tempo_id_to_wp_attribute_map=tempo_id_to_wp_attribute_map))
        return len(worklogs)
    return 0


def get_wp_attributes():
    url = f'{WP_GRAPH_QL_URL}'
    headers = {'Authorization': f'Bearer {WP_API_TOKEN}'}
    body = {
        "query": """
            query {
                getAttributes {
                    ok,
                    attributes {
                        id,
                        name,
                        options {
                            id,
                            name
                        }
                    }
                }
            }
        """
    }
    response = requests.post(url, headers=headers, json=body)
    if response.ok:
        attributes = response.json()["data"]["getAttributes"]["attributes"]
        tempo_id_to_wp_attribute_map: Dict[str, AttributeType] = {}
        for attribute in attributes:
            wp_attribute_type_name = attribute["name"]
            tempo_attribute_type_id = convert_to_tempo_id(wp_attribute_type_name)
            tempo_id_to_wp_attribute_map[tempo_attribute_type_id] = attribute
        return tempo_id_to_wp_attribute_map
    else:
        print_with_color(
            f"Failed to complete the request. Status code: {response.status_code}, Response: {response.text}",
            "31"
        )
        raise Exception(
            f"Failed to complete the request. Status code: {response.status_code}, Response: {response.text}")


def convert_to_tempo_id(wp_attribute_type_name):
    # "Work type" becomes "_Worktype_"
    tempo_attribute_type_id = wp_attribute_type_name.replace(" ", "")
    tempo_attribute_type_id = f"_{tempo_attribute_type_id}_"
    return tempo_attribute_type_id


# Main Processing Loop
def main():
    tempo_id_to_wp_attribute_map = get_wp_attributes()
    print("tempo_id_to_wp_attribute_map", tempo_id_to_wp_attribute_map)
    offset = initial_tempo_worklog_offset
    while True:
        print_with_color(f"\nUpdating the next batch of worklogs... Batch offset: {offset}", "33")
        start_time = time.time()
        worklog_count = process_tempo_worklogs_batch(offset, tempo_id_to_wp_attribute_map)
        elapsed_time = round(time.time() - start_time, 3)
        print_with_color(f"Batch processed in {elapsed_time} seconds", "32")
        if worklog_count < tempo_worklog_limit_per_request:
            print_with_color("\nNo more worklogs to process. Update completed.", "32")
            break
        offset += tempo_worklog_limit_per_request


if __name__ == "__main__":
    print_with_color("\n========================================", "34")
    print_with_color("Starting Jira worklog update process...", "34")
    print_with_color("========================================", "34")
    main()

For each batch, the script prints the batch offset number in your terminal. If the script fails, set the initial_tempo_worklog_offset variable to the latest batch offset and run the script again to continue the migration.
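If you prefer not to edit the script on every resume, the starting offset could be read from an optional value instead. This is only a sketch of one way to do it: parse_start_offset is a hypothetical helper that is not part of the script, and rounding the offset down to a batch boundary is a choice made here so that no worklog is skipped. It relies on re-processing a few worklogs being harmless.

```python
BATCH_SIZE = 20  # matches tempo_worklog_limit_per_request in the script


def parse_start_offset(raw_value, batch_size=BATCH_SIZE):
    """Turn an optional text value into a starting offset on a batch boundary."""
    if raw_value is None or raw_value.strip() == "":
        return 0
    offset = int(raw_value)
    if offset < 0:
        raise ValueError("offset must not be negative")
    # Round down so the resumed run starts at the beginning of a batch.
    return offset - (offset % batch_size)


resume_from_start = parse_start_offset(None)
resume_from_batch = parse_start_offset("140")
resume_from_mid_batch = parse_start_offset("150")
```

The result could then be assigned to initial_tempo_worklog_offset, for example from a command-line argument or an environment variable of your choosing.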

Running the script multiple times for the same worklogs is usually safe, as long as you don’t modify those worklogs with the WorklogPRO app in between. On a second run, the script overwrites the same WorklogPRO property.
