import pandas as pd
import numpy as np
import preprocessors.preprocessing_utils as utils
from preprocessors.preprocessing_utils import vectorize

from preprocessors import FINAL_PROCESSED
from service import k_means, gbt, ss

def get_weekday(timestamp):
    """Return the ``WeekDay`` model feature for *timestamp*.

    NOTE(review): the body of this function is missing in this revision, which
    made the whole module a SyntaxError. Neither the type of *timestamp* nor the
    weekday encoding the model was trained with (0=Monday? 1=Sunday? ...) can be
    determined from here, so this raises instead of silently guessing a value
    that would be fed to the model. Restore the real implementation.

    Raises:
        NotImplementedError: always, until the implementation is restored.
    """
    raise NotImplementedError(
        "get_weekday: implementation missing; restore the WeekDay encoding used in training"
    )

def enrich_prediction_request(lat, lng, n, timestamp):
    """Build model features for the clusters near (lat, lng) and rank them by prediction.

    For every cluster returned by ``get_clusters(lat, lng, n)``, the matching rows of
    the preprocessed dataset are aggregated into one feature row (see
    ``_cluster_record``). The rows are scored with the ``gbt`` model and ranked.

    Args:
        lat, lng: requester coordinates, forwarded to ``get_clusters``.
        n: number of neighbouring clusters, forwarded to ``get_clusters``.
        timestamp: request time, converted with ``get_weekday``.

    Returns:
        A list of ``{"Latitude": ..., "Longitude": ..., "priority": ...}`` dicts,
        ordered by priority; priority 1 is the cluster with the LOWEST prediction.

    Raises:
        ValueError: from ``pd.concat`` when no cluster was returned.
    """
    clusters = get_clusters(lat, lng, n)
    # Reconstructed from a corrupted line: FINAL_PROCESSED (imported at module level
    # and otherwise unused) is presumably the path of the preprocessed CSV - confirm.
    # NOTE(review): the whole CSV is re-read on every request; consider caching it.
    preprocessed_data = ss.read.csv(FINAL_PROCESSED, inferSchema=True, header=True).toPandas()
    # The weekday does not depend on the cluster, so compute it once.
    weekday = get_weekday(timestamp)

    # The original built each record but never appended it, so pd.concat([]) always failed.
    to_predict = [_cluster_record(preprocessed_data, cluster, weekday) for cluster in clusters]
    to_predict_df = pd.concat(to_predict)
    df_spark = ss.createDataFrame(to_predict_df)
    vectorized = vectorize(df_spark.columns, df_spark)

    predictions = gbt.predict(vectorized).toPandas()[["Latitude", "Longitude", "prediction"]]
    # NOTE(review): an ascending sort gives priority 1 to the lowest prediction; if a
    # higher predicted value should rank first, use ascending=False - confirm intent.
    ranked = predictions.sort_values("prediction").reset_index(drop=True)
    ranked["priority"] = ranked.index + 1
    return ranked.drop(columns="prediction").to_dict(orient="records")


def _cluster_record(preprocessed_data, cluster, weekday):
    """Aggregate the preprocessed rows of one cluster into a single-row feature frame."""
    lat, lng = cluster[0], cluster[1]
    # Combine both masks with & instead of chaining two boolean indexers: chaining
    # applies a full-frame mask to an already-filtered frame, which pandas has to
    # reindex (and warns about).
    in_cluster = (preprocessed_data["Latitude"] == lat) & (preprocessed_data["Longitude"] == lng)
    rows = preprocessed_data[in_cluster]

    # Lag features are averaged over all rows of the cluster.
    features = {f"ft_{i}": rows[f"ft_{i}"].mean() for i in range(1, 6)}
    # NOTE(review): the original read these from .iloc[1] (the SECOND row), which
    # raises IndexError for single-row clusters. The first row is used here, on the
    # assumption that the Fourier features are the same for every row of a cluster -
    # confirm.
    first_row = rows.iloc[0]
    features.update({f"freq{i}": first_row[f"freq{i}"] for i in range(1, 6)})
    features.update({f"Amp{i}": first_row[f"Amp{i}"] for i in range(1, 6)})

    # create_record fixes the column order that the vectorizer/model rely on.
    return create_record(
        **features,
        Latitude=lat,
        Longitude=lng,
        WeekDay=weekday,
        WeightedAvg=rows["WeightedAvg"].mean(),
    )

def get_clusters(lat, lng, n):
    """Return the centres of the ``n`` k-means clusters nearest to (lat, lng).

    The cluster the point itself belongs to (as predicted by ``k_means``) is removed
    before the search, so the result holds only neighbouring clusters.

    Args:
        lat, lng: requester coordinates (anything accepted by ``float``).
        n: number of neighbouring clusters to return (anything accepted by ``int``).

    Returns:
        A list of at most ``n`` centres from ``k_means.get_centers()``, nearest first.
        Callers index each centre as ``[0]``/``[1]``, presumably latitude/longitude.
    """
    from sklearn.metrics import pairwise_distances_argmin_min

    requestor_location = np.array([float(lat), float(lng)]).reshape((1, -1))
    location_df = pd.DataFrame(data=requestor_location, columns=["lat", "lng"])
    location_spark = ss.createDataFrame(location_df)
    location_vec = vectorize(location_spark.columns, location_spark)

    # Work on a copy: popping from the object returned by get_centers() would
    # permanently lose a centre if that object is cached/shared between calls.
    centers = list(k_means.get_centers())
    current_cluster = k_means.predict(location_vec).collect()[0]["pickup_cluster"]
    # Drop the requester's own cluster so it is not reported as its own neighbour.
    # NOTE(review): the popped centre is not returned - confirm it should be excluded.
    centers.pop(current_cluster)

    # With a single point in Y, element [1] holds each centre's distance to the requester.
    distances = pairwise_distances_argmin_min(centers, requestor_location)[1].tolist()
    # Indices of the n closest centres; sorted() is stable, so ties keep centre order.
    nearest = sorted(range(len(distances)), key=distances.__getitem__)[:int(n)]

    # Loop body was missing in this revision; reconstructed per the review suggestion.
    return [centers[i] for i in nearest]

def create_record(ft_5, ft_4, ft_3, ft_2, ft_1, freq1, freq2, freq3, freq4, freq5, Amp1, Amp2, Amp3, Amp4, Amp5, Latitude, Longitude, WeekDay, WeightedAvg):
    """Pack one feature vector into a single-row DataFrame.

    The column names equal the parameter names. The columns appear in parameter
    order: ft_5..ft_1, freq1..freq5, Amp1..Amp5, Latitude, Longitude, WeekDay,
    WeightedAvg.
    """
    column_names = (
        "ft_5", "ft_4", "ft_3", "ft_2", "ft_1",
        "freq1", "freq2", "freq3", "freq4", "freq5",
        "Amp1", "Amp2", "Amp3", "Amp4", "Amp5",
        "Latitude", "Longitude", "WeekDay", "WeightedAvg",
    )
    column_values = (
        ft_5, ft_4, ft_3, ft_2, ft_1,
        freq1, freq2, freq3, freq4, freq5,
        Amp1, Amp2, Amp3, Amp4, Amp5,
        Latitude, Longitude, WeekDay, WeightedAvg,
    )
    single_row = dict(zip(column_names, column_values))
    return pd.DataFrame([single_row])

Share a link to this review

6.74% issue ratio

R1 Missing type hints

Type hints help humans and linters (like mypy) understand what to expect "in" and "out" of a function. Not only do they serve as documentation for others (and for you after some time, when the code has been wiped from your "brain cache"), they also let automated tools find type errors.

Other people may not know what to pass as the ft_5, ft_4, ft_3, ft_2 and ft_1 parameters: int? float? dict?

Suggested change:
def create_record(ft_5: Union[int, float], ...):
R6 Copy-paste

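Copy-pasted code leads to errors very often, because developers forget to change a value on one of the copied lines. Avoid it as much as possible. A good solution is usually to extract the parts that differ into variables. Here, for example, the ft_1…ft_5, freq1…freq5 and Amp1…Amp5 lookups differ only in the column number, so they can be built in a loop over range(1, 6).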

O3 Not using "inplace" in Pandas

Many pandas DataFrame methods have an inplace parameter that modifies the DataFrame itself instead of returning a new one, so you do not have to reassign the result to the original variable: df = df.drop(...) -> df.drop(..., inplace=True). Be aware, though, that for most methods (drop included) pandas still builds a new object internally, so inplace=True rarely saves memory or time. Plain reassignment is equally acceptable.

Suggested change:
res.drop("prediction", axis=1, inplace=True)
R4 Range-based iteration

Using range(len(...)) in a Python for loop is a code smell. Idiomatic Python iteration looks like: for element in collection. If you also need the element's index, use: for i, element in enumerate(collection).

Suggested change:
for i, centroid in enumerate(nearest_centroids):
    nearest_centroids_dict[i] = centroid
O5 Not using list comprehension

List comprehensions are usually faster than filling a list with .append() or .extend() in a loop. Whenever you use a loop to fill a list, chances are a list comprehension will do a better job. You can also use the unpacking operator (*) to build a list from several collections and single elements: all_items = [*items_group1, *items_group2, single_element]

Suggested change:
result = [
    centers[cluster_number] for cluster_number in additional_clusters
]
R2 Too many parameters

When a function has too many parameters, calls are hard to read, and it is easy to mix the arguments up by accident, calling fn(..., o, q, p) instead of fn(..., o, p, q). There are two solutions. One is to refactor the function to receive less data (maybe it is too big and can be split into smaller functions?). The other is to pass a single parameter: a data structure that bundles all the values, such as a NamedTuple, dataclass, or TypedDict.

Create new review request