Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
{
  "version":2,
  "authorAccountId": "1234567abcdefg",
  "comment": "This is comment from Tempo UI",
  "attributes": {
    "ea34023-232ac-222ef": "Attribute value",
    "a2a3-40-232ac-222ee": "Nother attribute value"
  }
}

You may wonder why the app stores this information in a worklog property instead of updating the author on the worklog itself. Unfortunately, the Jira REST API doesn’t allow modifying a worklog’s author. Another option would be to delete the original worklog and recreate it with the new information, but since this is a destructive operation, we decided against it.

Key Points About the Script

...

  • Set the environment variables listed in the table below.

  • Replace the https://your-jira-domain.atlassian.net string with your actual site. Then, run the following command in your terminal: pip install requests==2.31.0 && pip install tqdm==4.66.1

  • Then, run the following command in your terminal: pip install requests==2.31.0 && pip install tqdm==4.66.1

  • You need to create the corresponding “Attribute” and “Attribute value” definitions in WorklogPRO. The names must match the Tempo attributes exactly. For example, if you have a “Work type” attribute in Tempo, you need to create a “Work type” attribute in WorklogPRO. Likewise, if “Testing” is an attribute value of the “Work type” attribute in Tempo, you should create the same “Testing” value in the “Work type” attribute of WorklogPRO.

After these steps, you can run the script with the following command: python <name_of_the_script>.py

...

Code Block
languagepy
import os
import time
import random
import asyncio
import requests
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
from typing import TypedDict, Callable, List, Dict, Any, Tuple

class AttributeTypeOption(TypedDict):
    """One selectable option of a WorklogPRO attribute.

    Declared as a ``TypedDict`` (like the other types in this file) because
    the script only ever handles options as plain dicts decoded from the
    GraphQL response, e.g. ``option['id']`` and ``option['name']``.
    """
    # Identifier written into the worklog property when the option matches.
    id: str
    # Display name; compared against the Tempo attribute value.
    name: str

class AttributeType(TypedDict):
    """A WorklogPRO attribute definition as returned by the ``getAttributes`` query.

    Declared as a ``TypedDict`` because the script treats attribute
    definitions as plain dicts (``attribute['id']``, ``attribute.get('options')``).
    """
    # Identifier used as the key inside the worklog property's ``attributes`` map.
    id: str
    # Display name; converted to a Tempo attribute key to build the lookup map.
    name: str
    # Predefined options. A falsy value (empty or null) makes the script copy
    # the Tempo value through unchanged instead of mapping it to an option id.
    options: List["AttributeTypeOption"]

# Types
class Issue(TypedDict):
    """Issue reference carried in a Tempo worklog's ``issue`` field."""
    # NOTE(review): presumably the REST URL of the issue resource — confirm against the Tempo API.
    self: str
    # Issue identifier; used as the issue path segment when writing the Jira worklog property.
    id: int


class Author(TypedDict):
    """Author reference carried in a Tempo worklog's ``author`` field."""
    # NOTE(review): presumably the REST URL of the user resource — confirm against the Tempo API.
    self: str
    # Copied into the ``authorAccountId`` field of the worklog property payload.
    accountId: str


class AttributeValues(TypedDict):
    """Attribute container found under a Tempo worklog's ``attributes`` field."""
    # NOTE(review): presumably the REST URL of the attribute collection — confirm.
    self: str
    # Attribute entries; the script reads each entry's ``key`` and ``value``.
    values: List[dict]


class TempoWorklog(TypedDict):
    """A worklog entry as read from the ``results`` list of Tempo's ``/worklogs`` endpoint."""
    self: str
    # Tempo-side identifier; used to look up the matching Jira worklog id.
    tempoWorklogId: int
    # Issue the worklog belongs to; only ``issue['id']`` is used by the script.
    issue: Issue
    timeSpentSeconds: int
    billableSeconds: int
    startDate: str
    startTime: str
    # Copied into the ``comment`` field of the worklog property payload.
    description: str
    createdAt: str
    updatedAt: str
    # Only ``author['accountId']`` is used by the script.
    author: Author
    # NOTE: get_tempo_worklogs() replaces this with the inner ``values`` list,
    # or with None when that list is empty, before the worklog is processed.
    attributes: AttributeValues


# Configuration
# Credentials and endpoints are read from the environment; each is None when unset.
TEMPO_API_TOKEN = os.getenv('TEMPO_API_TOKEN')
JIRA_API_TOKEN = os.getenv('JIRA_API_TOKEN')
JIRA_EMAIL = os.getenv('JIRA_EMAIL')
WP_API_TOKEN = os.getenv('WP_API_TOKEN')
WP_GRAPH_QL_URL = os.getenv('WP_GRAPH_QL_URL')
# Placeholder: replace with the base URL of your own Jira Cloud site.
jira_base_url = 'https://your-jira-domain.atlassian.net'
tempo_base_url = 'https://api.tempo.io/4'
# Page size for Tempo requests; main() also advances the offset by this amount.
tempo_worklog_limit_per_request = 20
# Offset of the first batch; set it to the last printed batch offset to resume.
initial_tempo_worklog_offset = 0

# Rate limiting configuration
# Maximum number of retries after an HTTP 429 response.
max_retries = 4
# Base delay between retries, in milliseconds (a jittered share is added on top).
retry_delay_millis = 5000
# Upper bound for a single retry delay, in milliseconds.
max_retry_delay_millis = 30000
# [low, high] bounds of the random jitter factor.
jitter_multiplier_range = [0.7, 1.3]


def random_in_range(range_vals):
    """Return a random float drawn uniformly between the first two items of *range_vals*."""
    lower = range_vals[0]
    upper = range_vals[1]
    return random.uniform(lower, upper)


def print_with_color(text, color_code):
    """Print *text* wrapped in the ANSI escape sequence for *color_code*, then reset."""
    color_on = f"\033[{color_code}m"
    color_off = "\033[0m"
    print(f"{color_on}{text}{color_off}")


def _seconds_until_rate_limit_reset(raw_reset):
    """Return the number of seconds until the advertised rate-limit reset.

    Accepts a Unix timestamp in seconds or an ISO 8601 date-time string.
    Returns 0.0 when the value is missing, unparseable or already in the past.
    """
    # Local import: ``datetime`` is not among the module-level imports.
    from datetime import datetime, timezone

    if raw_reset is None:
        return 0.0
    text = str(raw_reset).strip()
    try:
        reset_epoch = int(text)
    except ValueError:
        try:
            # Older ``fromisoformat`` implementations reject a trailing "Z".
            parsed = datetime.fromisoformat(text.replace('Z', '+00:00'))
        except ValueError:
            return 0.0
        if parsed.tzinfo is None:
            # No offset given: treat the timestamp as UTC.
            parsed = parsed.replace(tzinfo=timezone.utc)
        reset_epoch = parsed.timestamp()
    return max(reset_epoch - time.time(), 0.0)


def handle_rate_limiting(response):
    """Sleep until the rate limit resets when the response says the quota is used up.

    The headers are inspected when the status is 429 or when
    ``X-RateLimit-Remaining`` is present. If the remaining quota is 0, the
    function sleeps until ``X-RateLimit-Reset`` plus a small random jitter.

    Header values are parsed defensively: a non-integer
    ``X-RateLimit-Remaining`` is treated as "quota not exhausted", and
    ``X-RateLimit-Reset`` may be an epoch value or an ISO 8601 timestamp
    (Jira Cloud's rate-limiting docs describe the latter form). Previously a
    non-integer value raised ``ValueError`` and aborted the run.

    Args:
        response: Object exposing ``status_code`` and a ``headers`` mapping.
    """
    headers = response.headers
    if response.status_code != 429 and 'X-RateLimit-Remaining' not in headers:
        return

    try:
        rate_limit_remaining = int(headers.get('X-RateLimit-Remaining', '1'))
    except (TypeError, ValueError):
        # Unreadable counter: behave as if the header were absent.
        rate_limit_remaining = 1

    if rate_limit_remaining != 0:
        return

    wait_for_reset = _seconds_until_rate_limit_reset(headers.get('X-RateLimit-Reset', '0'))
    # The jitter keeps concurrent workers from waking up at the same instant.
    sleep_time = wait_for_reset + random_in_range(jitter_multiplier_range)
    print_with_color(f"Rate limit reached. Sleeping for {round(sleep_time, 2)} seconds...", "33")
    time.sleep(sleep_time)


def send_request_with_rate_limit_handling(
        request_func: Callable[..., requests.Response],
        retry_count: int = 0,
        **request_kwargs: Dict[str, Any]
) -> requests.Response:
    """Call ``request_func(**request_kwargs)`` and retry while the server answers 429.

    A successful response is returned immediately. Any other non-429 failure
    is logged and returned without retrying. On a 429 the rate-limit headers
    are honoured first, then the call is repeated after a jittered delay until
    ``max_retries`` attempts have been used.

    Args:
        request_func: Callable performing the HTTP request (e.g. ``requests.put``).
        retry_count: Number of retries already consumed; callers normally omit it.
        **request_kwargs: Keyword arguments forwarded unchanged to *request_func*.

    Returns:
        The last response received, successful or not. The caller decides
        whether to raise on it.
    """
    attempt = retry_count
    while True:
        response = request_func(**request_kwargs)
        if response.ok:
            return response

        if response.status_code != 429:
            print_with_color(
                f"Failed to complete the request. Status code: {response.status_code}, Response: {response.text}", "31")
            return response

        # 429: wait for the advertised reset (if any) before deciding to retry.
        handle_rate_limiting(response)
        if attempt >= max_retries:
            print_with_color(f"Max retries reached. Failed to complete the request.", "31")
            return response

        # Base delay plus a jittered share of it, capped at the configured maximum.
        jitter = random_in_range(jitter_multiplier_range)
        delay_millis = min(retry_delay_millis + (retry_delay_millis * jitter), max_retry_delay_millis)
        print_with_color(
            f"Retrying in {delay_millis / 1000.0} seconds... (Attempt {attempt + 1}/{max_retries})", "33")
        time.sleep(delay_millis / 1000.0)
        attempt += 1


def get_jira_worklog_ids_from_tempo(worklogs_ids: List[str]):
    """Resolve Tempo worklog ids to their Jira worklog ids.

    Posts the ids to Tempo's ``/worklogs/tempo-to-jira`` endpoint through the
    rate-limit-aware request helper.

    Args:
        worklogs_ids: Tempo worklog ids to translate.

    Returns:
        The ``results`` list of the response. The caller reads
        ``tempoWorklogId`` and ``jiraWorklogId`` from each item. An empty
        input yields an empty list without contacting Tempo.

    Raises:
        requests.HTTPError: If Tempo still answers with an error status after
            the retry handling.
    """
    if not worklogs_ids:
        # Nothing to map (e.g. the page after the last full batch is empty):
        # skip the round-trip instead of posting an empty id list.
        return []

    url = f'{tempo_base_url}/worklogs/tempo-to-jira?limit={tempo_worklog_limit_per_request}'
    headers = {
        'Authorization': f'Bearer {TEMPO_API_TOKEN}'
    }
    response = send_request_with_rate_limit_handling(
        request_func=requests.post,
        url=url,
        headers=headers,
        json={"tempoWorklogIds": worklogs_ids}
    )
    # Also pause here if this successful response reports an exhausted quota.
    handle_rate_limiting(response)
    response.raise_for_status()
    return response.json()["results"]


def get_tempo_worklog_attributes(url):
    """Fetch *url* from Tempo with the API token and return its ``results`` list.

    Args:
        url: Full URL of the Tempo endpoint to query.

    Returns:
        The value stored under ``results`` in the JSON response.

    Raises:
        requests.HTTPError: If Tempo answers with an error status.
    """
    headers = {
        'Authorization': f'Bearer {TEMPO_API_TOKEN}'
    }
    response = requests.get(url, headers=headers)
    # Fail with the HTTP error (as the other Tempo helpers do) rather than
    # trying to read "results" out of an error body.
    response.raise_for_status()
    return response.json()["results"]

def get_tempo_worklogs(tempo_worklog_offset):
    """Fetch one page of Tempo worklogs starting at *tempo_worklog_offset*.

    The request goes through the rate-limit-aware helper, so an HTTP 429 is
    retried instead of aborting the whole migration.

    Each returned worklog has its ``attributes`` field flattened: it is
    replaced by the inner ``values`` list, or by ``None`` when that list is
    empty.

    Args:
        tempo_worklog_offset: Zero-based offset of the first worklog to fetch.

    Returns:
        The ``results`` list of the response (at most
        ``tempo_worklog_limit_per_request`` items requested).

    Raises:
        requests.HTTPError: If Tempo still answers with an error status after
            the retry handling.
    """
    url = f'{tempo_base_url}/worklogs?limit={tempo_worklog_limit_per_request}&offset={tempo_worklog_offset}'
    headers = {
        'Authorization': f'Bearer {TEMPO_API_TOKEN}'
    }
    response = send_request_with_rate_limit_handling(
        request_func=requests.get,
        url=url,
        headers=headers
    )
    # Also pause here if this successful response reports an exhausted quota.
    handle_rate_limiting(response)
    response.raise_for_status()
    worklogs = response.json()["results"]
    for worklog in worklogs:
        # Keep only the attribute entries; an empty list becomes None.
        worklog['attributes'] = worklog['attributes']['values'] or None

    return worklogs


def get_tempo_data(tempo_worklog_offset) -> Tuple[List[TempoWorklog], Dict[int, int]]:
    """Load one page of Tempo worklogs together with their Jira worklog ids.

    Args:
        tempo_worklog_offset: Offset of the first worklog of the page.

    Returns:
        A tuple ``(worklogs, mapping)`` where *mapping* maps each
        ``tempoWorklogId`` to its ``jiraWorklogId``.
    """
    page = get_tempo_worklogs(tempo_worklog_offset)
    tempo_ids = [entry['tempoWorklogId'] for entry in page]

    jira_id_by_tempo_id = {}
    for pair in get_jira_worklog_ids_from_tempo(tempo_ids):
        tempo_id = pair['tempoWorklogId']
        jira_id_by_tempo_id[tempo_id] = pair['jiraWorklogId']

    return page, jira_id_by_tempo_id


def update_jira_worklog_request(issue_id_or_key: str, worklog_id: int, payload: dict):
    """Write *payload* into the ``worklogpro`` property of a Jira worklog.

    Args:
        issue_id_or_key: Issue the worklog belongs to.
        worklog_id: Jira id of the worklog whose property is set.
        payload: JSON-serialisable property value.

    Returns:
        ``True`` when the property was stored, ``False`` when Jira answered
        404 so the caller can log the miss and carry on.

    Raises:
        requests.HTTPError: For any other error status.
    """
    endpoint = (
        f'{jira_base_url}/rest/api/3/issue/{issue_id_or_key}'
        f'/worklog/{worklog_id}/properties/worklogpro'
    )
    credentials = requests.auth.HTTPBasicAuth(JIRA_EMAIL, JIRA_API_TOKEN)
    response = send_request_with_rate_limit_handling(
        request_func=requests.put,
        url=endpoint,
        auth=credentials,
        json=payload
    )
    # 404 is the only failure that is tolerated; everything else raises.
    if response.status_code != 404:
        response.raise_for_status()
        return True
    return False


async def async_run_update_jira_worklog(executor, issue_id_or_key: str, worklog_id: int, payload: dict,
                                        update_pbar: Callable = None):
    """Run one blocking Jira worklog update on *executor* and report progress.

    Args:
        executor: Executor on which the blocking HTTP call is run.
        issue_id_or_key: Issue the worklog belongs to.
        worklog_id: Jira id of the worklog to update.
        payload: Property value to store.
        update_pbar: Optional zero-argument callback invoked once the update
            has finished. It may be omitted.

    Raises:
        Whatever ``update_jira_worklog_request`` raises. In that case the
        progress callback is not invoked.
    """
    # Inside a coroutine the running loop is the one to use.
    loop = asyncio.get_running_loop()
    found = await loop.run_in_executor(
        executor,
        update_jira_worklog_request,
        issue_id_or_key,
        worklog_id,
        payload
    )
    if not found:
        print_with_color(f"Worklog with ID {worklog_id} not found for issue {issue_id_or_key}", "31")
    # The callback defaults to None, so only call it when one was supplied.
    if update_pbar is not None:
        update_pbar()

def process_worklog_attributes(worklog, payload, tempo_id_to_wp_attribute_map):
    """Translate a worklog's Tempo attributes into WorklogPRO ids on *payload*.

    ``payload['attributes']`` is reset to a fresh dict and filled with one
    entry per Tempo attribute that can be mapped:

    * attribute with options -> ``{wp attribute id: matching option id}``
    * attribute without options -> ``{wp attribute id: raw Tempo value}``

    Attributes or option values unknown to WorklogPRO are reported in red and
    skipped.

    Args:
        worklog: Worklog whose ``attributes`` is an iterable of dicts with
            ``key`` and ``value``.
        payload: Property payload, mutated in place.
        tempo_id_to_wp_attribute_map: WorklogPRO attribute definitions keyed
            by Tempo attribute key.
    """
    mapped = {}
    payload['attributes'] = mapped

    for entry in worklog["attributes"]:
        tempo_key = entry['key']
        tempo_value = entry['value']

        wp_type = tempo_id_to_wp_attribute_map.get(tempo_key)
        if not wp_type:
            print_with_color(f"Attribute with ID {tempo_key} not found in WP", "31")
            continue

        options = wp_type.get('options')
        if not options:
            # No predefined options: the value is carried over as-is.
            mapped[wp_type['id']] = tempo_value
            continue

        # First option whose name equals the Tempo value, if any.
        chosen = None
        for candidate in options:
            if candidate['name'] == tempo_value:
                chosen = candidate
                break

        if not chosen:
            print_with_color(
                f"Attribute value {tempo_value} not found in WP for attribute {tempo_key}",
                "31")
            continue

        mapped[wp_type['id']] = chosen['id']


async def update_jira_worklogs_in_batch(worklogs: list, requests_per_second: int, jira_tempo_worklog_id_mappings: dict,
                                        tempo_id_to_wp_attribute_map: Dict[str, AttributeType]):
    """Write the WorklogPRO property for every worklog of a batch, paced over time.

    One update is started per ``1 / requests_per_second`` seconds. Each update
    is scheduled as a task straight away so that the pause between iterations
    actually spaces the requests out. A bare coroutine object would not start
    running until it is awaited at the end.

    Args:
        worklogs: Tempo worklogs, with ``attributes`` already flattened to a
            list or ``None``.
        requests_per_second: Target start rate, also used as the size of the
            worker thread pool.
        jira_tempo_worklog_id_mappings: Maps ``tempoWorklogId`` to the Jira
            worklog id.
        tempo_id_to_wp_attribute_map: WorklogPRO attribute definitions keyed
            by Tempo attribute key.

    Raises:
        KeyError: If a worklog has no entry in *jira_tempo_worklog_id_mappings*.
        Any exception raised by an individual update, re-raised by ``gather``.
    """
    # Time to wait between request starts to match the requested rate.
    interval = 1.0 / requests_per_second

    # The context manager shuts the worker threads down when the batch ends.
    with ThreadPoolExecutor(max_workers=requests_per_second) as executor:
        with tqdm(total=len(worklogs), desc="Processing Worklogs", unit="worklog", ncols=80,
                  bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}]", leave=True) as pbar:
            tasks = []
            for worklog in worklogs:
                payload = {
                    "version": 2,
                    "authorAccountId": worklog['author']['accountId'],
                    "comment": worklog['description'],
                }
                if worklog['attributes']:
                    process_worklog_attributes(worklog, payload, tempo_id_to_wp_attribute_map)

                issue_id_or_key = worklog['issue']['id']
                jira_worklog_id = jira_tempo_worklog_id_mappings[worklog['tempoWorklogId']]

                # create_task schedules the update now; it starts running
                # during the sleep below.
                task = asyncio.create_task(async_run_update_jira_worklog(
                    executor, issue_id_or_key, jira_worklog_id, payload, lambda: pbar.update(1)
                ))
                tasks.append(task)

                # Wait to control the requests-per-second rate.
                await asyncio.sleep(interval)

            # Wait for all in-flight updates to complete.
            await asyncio.gather(*tasks)

    print_with_color(f"\nBatch completed.", "32")


def process_tempo_worklogs_batch(tempo_worklog_offset: int, tempo_id_to_wp_attribute_map: Dict[str, AttributeType]):
    """Migrate the page of Tempo worklogs that starts at *tempo_worklog_offset*.

    Args:
        tempo_worklog_offset: Offset of the first worklog of the page.
        tempo_id_to_wp_attribute_map: WorklogPRO attribute definitions keyed
            by Tempo attribute key.

    Returns:
        The number of worklogs found on the page (0 when the page is empty).
    """
    requests_per_second = 9
    worklogs, jira_ids_by_tempo_id = get_tempo_data(tempo_worklog_offset)

    # Empty page: nothing to update.
    if not worklogs:
        return 0

    batch_job = update_jira_worklogs_in_batch(
        worklogs,
        requests_per_second,
        jira_tempo_worklog_id_mappings=jira_ids_by_tempo_id,
        tempo_id_to_wp_attribute_map=tempo_id_to_wp_attribute_map,
    )
    asyncio.run(batch_job)
    return len(worklogs)


# Main Processing Loop
def get_wp_attributes():
    """Load the WorklogPRO attribute definitions and key them by Tempo attribute key.

    Runs the ``getAttributes`` GraphQL query against ``WP_GRAPH_QL_URL`` and
    derives the Tempo key for each attribute from its name via
    ``convert_to_tempo_id``.

    Returns:
        Dict mapping the derived Tempo attribute key to the attribute
        definition (``id``, ``name``, ``options``).

    Raises:
        Exception: If the HTTP request fails, or if the response carries no
            attribute list (for example a GraphQL-level error delivered with
            a success status).
    """
    url = f'{WP_GRAPH_QL_URL}'
    headers = {
        'Authorization': f'Bearer {WP_API_TOKEN}'
    }
    graphql_request = {
        "query": """
            query {
                getAttributes {
                    ok,
                    attributes {
                        id,
                        name,
                        options {
                            id,
                            name
                        }
                    }
                }
            }
        """
    }
    response = requests.post(url, headers=headers, json=graphql_request)
    if not response.ok:
        message = f"Failed to complete the request. Status code: {response.status_code}, Response: {response.text}"
        print_with_color(message, "31")
        raise Exception(message)

    # Tolerate missing or null levels so the failure is reported clearly below
    # rather than as a TypeError/KeyError deep inside the lookup.
    body = response.json()
    result = (body.get("data") or {}).get("getAttributes") or {}
    attributes = result.get("attributes")
    if attributes is None:
        message = f"WorklogPRO returned no attributes. Response: {response.text}"
        print_with_color(message, "31")
        raise Exception(message)

    return {convert_to_tempo_id(attribute["name"]): attribute for attribute in attributes}


def convert_to_tempo_id(wp_attribute_type_name):
    """Derive the Tempo attribute key for a WorklogPRO attribute name.

    All space characters are removed and the result is wrapped in
    underscores, e.g. ``"Work type"`` -> ``"_Worktype_"``.
    """
    compact_name = wp_attribute_type_name.replace(" ", "")
    return "_" + compact_name + "_"


def main():
    """Migrate all Tempo worklogs page by page, starting at the configured offset.

    Pages are processed until one comes back with fewer worklogs than the page
    size. The offset of each batch is printed before the batch runs.
    """
    tempo_id_to_wp_attribute_map = get_wp_attributes()
    print("tempo_id_to_wp_attribute_map", tempo_id_to_wp_attribute_map)

    offset = initial_tempo_worklog_offset
    more_pages = True
    while more_pages:
        print_with_color(f"\nUpdating the next batch of worklogs... Batch offset: {offset}", "33")
        started_at = time.time()
        processed = process_tempo_worklogs_batch(offset, tempo_id_to_wp_attribute_map)
        elapsed_time = round(time.time() - started_at, 3)
        print_with_color(f"Batch processed in {elapsed_time} seconds", "32")

        # A short page means the end of the data has been reached.
        more_pages = processed >= tempo_worklog_limit_per_request
        if more_pages:
            offset += tempo_worklog_limit_per_request

    print_with_color("\nNo more worklogs to process. Update completed.", "32")


if __name__ == "__main__":
    # Print a blue start banner, then run the migration.
    banner_rule = "========================================"
    print_with_color("\n" + banner_rule, "34")
    print_with_color("Starting Jira worklog update process...", "34")
    print_with_color(banner_rule, "34")
    main()
Info

After each batch, you'll see the batch offset number in your terminal. If the script fails, you can set the initial_tempo_worklog_offset variable to the latest batch offset and run the script again to continue the migration.

Running the script multiple times for the same worklogs is usually safe, as long as you don’t modify the same worklog using the WorklogPRO app in between. On the second run, the script will overwrite the same WorklogPRO property.