Skip to content

Commit

Permalink
♻️move some functions
Browse files Browse the repository at this point in the history
  • Loading branch information
PitButtchereit committed Jan 16, 2024
1 parent 45823d5 commit d656143
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 19 deletions.
55 changes: 55 additions & 0 deletions tracex/extraction/logic/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,22 @@
from openai import OpenAI


import functools
import warnings


def deprecated(func):
"""This is a decorator which can be used to mark functions as deprecated.
It will result in a warning being emitted when the function is used."""
@functools.wraps(func)
def new_func(*args, **kwargs):
warnings.warn(f"Call to deprecated function {func.__name__}.",
category=DeprecationWarning,
stacklevel=2)
return func(*args, **kwargs)
return new_func


def pause_between_queries():
"""Pauses between queries."""
time.sleep(5)
Expand Down Expand Up @@ -63,6 +79,45 @@ def create_xes(csv_file, name, key):
return str(output_path / output_name)


def get_all_xes_output_path(
is_test=False,
is_extracted=False,
xes_name="all_traces",
activity_key="event_type",
):
"""Create the xes file for all journeys."""
if not (is_test or is_extracted):
append_csv()
all_traces_xes_path = create_xes(
CSV_ALL_TRACES, name=xes_name, key=activity_key
)
else:
all_traces_xes_path = (
str(output_path / xes_name) + "_" + activity_key + ".xes"
)
return all_traces_xes_path


def append_csv():
"""Appends the current trace to the CSV containing all traces."""
trace_count = 0
with open(CSV_ALL_TRACES, "r") as f:
rows = f.readlines()[1:]
if len(rows) >= 2:
trace_count = max(int(row.split(",")[0]) for row in rows if row)
with open(CSV_OUTPUT, "r") as f:
previous_content = f.readlines()
content = []
for row in previous_content:
if row != "\n":
content.append(row)
content = content[1:]
with open(CSV_ALL_TRACES, "a") as f:
for row in content:
row = row.replace(row[0], str(int(row[0]) + trace_count), 1)
f.writelines(row)


@dataclass
class ExtractionConfiguration:
patient_journey: str
Expand Down
4 changes: 2 additions & 2 deletions tracex/extraction/prototype/output_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def get_output():
if decision:
append_csv()


# This will not be called anymore
def get_output_without_user_io():
"""Prints the output to the console or shows the filename."""
dataframe = pd.read_csv(u.CSV_OUTPUT, sep=",")
Expand All @@ -53,7 +53,7 @@ def get_output_without_user_io():
print(dataframe)
append_csv()


# Not used anymore, moved to utils
def append_csv():
"""Appends the current trace to the CSV containing all traces."""
trace_count = 0
Expand Down
1 change: 1 addition & 0 deletions tracex/extraction/prototype/pipeline_without_user_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import utils as u


# This will not be called anymore
def run_pipeline():
"""Runs the pipeline without user interaction."""
input_text = ii.create_patient_journey()
Expand Down
25 changes: 8 additions & 17 deletions tracex/extraction/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,23 @@
from io import StringIO, BytesIO

import pm4py
import pandas
import pandas as pd

from django.urls import reverse_lazy
from django.views import generic
from django.core.cache import cache
from django.shortcuts import redirect

from .forms import JourneyForm, GenerationForm, ResultForm
from .prototype import (
input_handling,
input_inquiry,
create_xes,
utils,
output_handling,
)
from .prototype import (input_handling,input_inquiry,create_xes,utils,output_handling,)
from .logic.orchestrator import Orchestrator
from .logic import utils as u

# necessary due to Windows error. see information for your os here:
# https://stackoverflow.com/questions/35064304/runtimeerror-make-sure-the-graphviz-executables-are-on-your-systems-path-aft
os.environ["PATH"] += os.pathsep + "C:/Program Files/Graphviz/bin/"

# set IS_TEST = False if you want to run the pipeline
IS_TEST = False
IS_TEST = False # set to False if you want to run the pipeline


def redirect_to_selection(request):
Expand Down Expand Up @@ -139,7 +132,6 @@ def get_context_data(self, **kwargs):
# read single journey into dataframe
single_trace_df = pm4py.read_xes(
self.get_xes_output_path(
journey,
is_test=IS_TEST,
is_extracted=is_extracted,
activity_key=orchestrator.configuration.activity_key,
Expand All @@ -152,7 +144,7 @@ def get_context_data(self, **kwargs):

# prepare all journey xes
all_traces_df = pm4py.read_xes(
self.get_all_xes_output_path(is_test=IS_TEST, is_extracted=is_extracted)
u.get_all_xes_output_path(is_test=IS_TEST, is_extracted=is_extracted)
)
all_traces_df = all_traces_df.groupby(
"case:concept:name", group_keys=False, sort=False
Expand All @@ -175,7 +167,6 @@ def get_context_data(self, **kwargs):

is_extracted = True
cache.set("is_extracted", is_extracted)
print(orchestrator.configuration)
return context

def form_valid(self, form):
Expand All @@ -190,7 +181,6 @@ def form_valid(self, form):

def get_xes_output_path(
self,
journey,
is_test=False,
is_extracted=False,
xes_name="single_trace",
Expand All @@ -209,6 +199,7 @@ def get_xes_output_path(
)
return output_path_xes

# Not used anymore, moved to utils
def get_all_xes_output_path(
self,
is_test=False,
Expand All @@ -218,7 +209,7 @@ def get_all_xes_output_path(
):
"""Create the xes file for all journeys."""
if not (is_test or is_extracted):
output_handling.append_csv()
u.append_csv()
all_traces_xes_path = create_xes.create_xes(
utils.CSV_ALL_TRACES, name=xes_name, key=activity_key
)
Expand All @@ -231,7 +222,7 @@ def get_all_xes_output_path(
def create_html_from_xes(self, df):
"""Create html table from xes file."""
xes_html_buffer = StringIO()
pandas.DataFrame.to_html(df, buf=xes_html_buffer)
pd.DataFrame.to_html(df, buf=xes_html_buffer)
return xes_html_buffer

def create_dfg_png_from_df(self, df):
Expand Down Expand Up @@ -276,7 +267,7 @@ def filter_dataframe(self, df, filter_dict):
filter_conditions = [
df[column].isin(values) for column, values in filter_dict.items()
]
combined_condition = pandas.Series(True, index=df.index)
combined_condition = pd.Series(True, index=df.index)

for condition in filter_conditions:
combined_condition &= condition
Expand Down

0 comments on commit d656143

Please sign in to comment.