I have this DataFrame in PySpark:

+---------+------+--------+----------+-----------+------------+------------------+
|indirizzo|radius|traffico|utmeasting|utmnorthing|cum_traffico|           lat_lng|
+---------+------+--------+----------+-----------+------------+------------------+
|    PLUTO|  5616|      22|    461680|    5064867|          99|[45.736298, 8.507]|
|    PIPPO|  1014|      61|    422787|    4915355|          96|[44.387363, 8.030]|
|    GATTO|  1014|      23|    346001|    4972736|          99|[44.891384, 7.049]|
|   DISNEY|  1014|      72|    373467|    5022016|          84|[45.34023, 7.3849]|
|    LEONE|  1014|      28|    407852|    5079131|          94|[45.859577, 7.812]|
|     HULK|  5616|      20|    379192|    4915722|          88|[44.38471, 7.4833]|
+---------+------+--------+----------+-----------+------------+------------------+

And this function:

def distance_haversine(lat1, lon1, lat2, lon2):
    # stuffs
    return distance

For each row of this DataFrame I want to take the values in the 'lat_lng' column, pass them as the first two arguments of the 'distance_haversine' function, and fill its remaining two arguments with the values of the same column from every other row; then move to the next row and repeat the procedure. Is it possible to perform this kind of operation without using collect()? Thanks in advance.
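
To make the intent concrete, in plain Python this is roughly the pairing I have in mind (just an illustration with a couple of the rows above, assuming distance_haversine is actually implemented; doing it this way is exactly what I want to avoid in Spark, since it would mean pulling everything back to the driver):

rows = [
    ("PLUTO", (45.736298, 8.507)),
    ("PIPPO", (44.387363, 8.030)),
    ("GATTO", (44.891384, 7.049)),
]

for name_a, (lat1, lon1) in rows:
    for name_b, (lat2, lon2) in rows:
        if name_a == name_b:
            continue  # skip comparing a row with itself
        d = distance_haversine(lat1, lon1, lat2, lon2)
        print(name_a, name_b, d)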

CHIRAQA, Aug 25, 2020 at 16:36

2 answers

Best answer

Looking at your distance_haversine signature, it does not match what you describe, so I am offering two different ways of doing it. Assuming df is your DataFrame.

Just so you know, both methods can be resource-intensive.


Method 1: Cartesian product

from pyspark.sql import functions as F, Window as W, types as T

@F.udf(T.FloatType())
def distance_haversine(lat1, lon1, lat2, lon2):
    """I created a simple distance function but you can replace it with whatever you need"""
    from math import sqrt
    distance = sqrt((lat1 - lat2) ** 2 + (lon1 + lon2) ** 2)
    return distance

cross_df = df.alias("A").crossJoin(df.alias("B")).where("A.indirizzo <> B.indirizzo")

cross_df.withColumn(
    "distance",
    distance_haversine(
        F.col("A.lat_lng").getItem(0),
        F.col("A.lat_lng").getItem(1),
        F.col("B.lat_lng").getItem(0),
        F.col("B.lat_lng").getItem(1),
    ),
).where("A.indirizzo > B.indirizzo").show()

+---------+------------------+---------+------------------+---------+
|indirizzo|           lat_lng|indirizzo|           lat_lng| distance|
+---------+------------------+---------+------------------+---------+
|    PLUTO|[45.736298, 8.507]|    PIPPO| [44.387363, 8.03]|16.591925|
|    PLUTO|[45.736298, 8.507]|    GATTO|[44.891384, 7.049]|15.578929|
|    PIPPO| [44.387363, 8.03]|    GATTO|[44.891384, 7.049]|15.087421|
|    PLUTO|[45.736298, 8.507]|   DISNEY|[45.34023, 7.3849]|15.896834|
|    PLUTO|[45.736298, 8.507]|    LEONE|[45.859577, 7.812]|16.319466|
|    PLUTO|[45.736298, 8.507]|     HULK|[44.38471, 7.4833]| 16.04732|
|    PIPPO| [44.387363, 8.03]|   DISNEY|[45.34023, 7.3849]|15.444323|
|    PIPPO| [44.387363, 8.03]|    LEONE|[45.859577, 7.812]| 15.91026|
|    PIPPO| [44.387363, 8.03]|     HULK|[44.38471, 7.4833]|  15.5133|
|    GATTO|[44.891384, 7.049]|   DISNEY|[45.34023, 7.3849]|14.440877|
|    LEONE|[45.859577, 7.812]|    GATTO|[44.891384, 7.049]|14.892506|
|     HULK|[44.38471, 7.4833]|    GATTO|[44.891384, 7.049]| 14.54113|
|    LEONE|[45.859577, 7.812]|   DISNEY|[45.34023, 7.3849]|15.205771|
|    LEONE|[45.859577, 7.812]|     HULK|[44.38471, 7.4833]|15.366243|
|     HULK|[44.38471, 7.4833]|   DISNEY|[45.34023, 7.3849]|14.898872|
+---------+------------------+---------+------------------+---------+
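
Note that the distance above is only a placeholder metric, not a real great-circle distance. If you need the actual haversine distance, a minimal UDF could look like the sketch below (coordinates in degrees, Earth radius taken as 6371 km; the name haversine_km is just for illustration, and the values shown in the tables above would of course change):

from math import radians, sin, cos, asin, sqrt

from pyspark.sql import functions as F, types as T

@F.udf(T.DoubleType())
def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometers between two points given in degrees."""
    lat1, lon1, lat2, lon2 = map(radians, (lat1, lon1, lat2, lon2))
    a = sin((lat2 - lat1) / 2) ** 2 + cos(lat1) * cos(lat2) * sin((lon2 - lon1) / 2) ** 2
    return 2 * 6371.0 * asin(sqrt(a))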


Method 2: collect_list

Given what you said, you want to compare one set of coordinates against the whole set of coordinates, so your distance_haversine function needs to be slightly different.

from pyspark.sql import functions as F, Window as W, types as T


@F.udf(T.FloatType())
def distance_haversine(lat1, lon1, list_lat_long):
    """I created a simple distance function but you can replace it with whatever you need"""
    return min([my_distance(lat1, lon1, x[0], x[1]) for x in list_lat_long])


def my_distance(lat1, lon1, lat2, lon2):
    from math import sqrt

    distance = sqrt((lat1 - lat2) ** 2 + (lon1 + lon2) ** 2)
    return distance


df = df.withColumn(
    "all_coordinates",
    F.collect_list(F.col("lat_lng")).over(
        W.rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
    ),
)

df.withColumn(
    "distance",
    distance_haversine(
        F.col("lat_lng").getItem(0),
        F.col("lat_lng").getItem(1),
        F.col("all_coordinates"),
    ),
).show()

+---------+------------------+--------------------+---------+
|indirizzo|           lat_lng|     all_coordinates| distance|
+---------+------------------+--------------------+---------+
|   DISNEY|[45.34023, 7.3849]|[[45.34023, 7.384...|14.440877|
|    LEONE|[45.859577, 7.812]|[[45.34023, 7.384...|14.892506|
|     HULK|[44.38471, 7.4833]|[[45.34023, 7.384...| 14.54113|
|    PLUTO|[45.736298, 8.507]|[[45.34023, 7.384...|15.578929|
|    PIPPO| [44.387363, 8.03]|[[45.34023, 7.384...|15.087421|
|    GATTO|[44.891384, 7.049]|[[45.34023, 7.384...|   14.098|
+---------+------------------+--------------------+---------+
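
One caveat: collect_list also picks up the current row's own coordinates. That is why GATTO's minimum above is 14.098, its distance to itself under this toy metric; with a real haversine the self-distance would always be 0 and would always win. A sketch of one way to skip the current row, assuming indirizzo is unique, is to collect the identifier together with the coordinates and filter it out inside the UDF (min_distance_to_others is just an illustrative name, and my_distance is the helper defined above):

@F.udf(T.FloatType())
def min_distance_to_others(indirizzo, lat1, lon1, rows):
    """Minimum distance from (lat1, lon1) to every row whose indirizzo differs."""
    return min(
        my_distance(lat1, lon1, r["lat_lng"][0], r["lat_lng"][1])
        for r in rows
        if r["indirizzo"] != indirizzo
    )

df = df.withColumn(
    "all_coordinates",
    F.collect_list(F.struct("indirizzo", "lat_lng")).over(
        W.rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
    ),
)

df.withColumn(
    "distance",
    min_distance_to_others(
        F.col("indirizzo"),
        F.col("lat_lng").getItem(0),
        F.col("lat_lng").getItem(1),
        F.col("all_coordinates"),
    ),
).show()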

Steven, Aug 25, 2020 at 14:30

Here is my attempt with the DataFrame.

from pyspark.sql.functions import *

def distance_haversine(lat1, lng1, lat2, lng2):
    distance = (lat2 - lat1) + (lng2 - lng1) # just for test
    return distance

spark.udf.register('distance_haversine', distance_haversine)

df2.alias('a').crossJoin(df2.alias('b')) \
   .select('a.*', 'b.indirizzo', 'b.lat_lng') \
   .filter('a.indirizzo != b.indirizzo') \
   .withColumn('distance', expr('distance_haversine(a.lat_lng[0], a.lat_lng[1], b.lat_lng[0], b.lat_lng[1])')) \
   .show()

+---------+------+--------+----------+-----------+------------+------------------+---------+------------------+--------------------+
|indirizzo|radius|traffico|utmeasting|utmnorthing|cum_traffico|           lat_lng|indirizzo|           lat_lng|            distance|
+---------+------+--------+----------+-----------+------------+------------------+---------+------------------+--------------------+
|    PLUTO|  5616|      22|    461680|    5064867|          99|[45.736298, 8.507]|    PIPPO| [44.387363, 8.03]| -1.8259349999999976|
|    PLUTO|  5616|      22|    461680|    5064867|          99|[45.736298, 8.507]|    GATTO|[44.891384, 7.049]|  -2.302913999999995|
|    PLUTO|  5616|      22|    461680|    5064867|          99|[45.736298, 8.507]|   DISNEY|[45.34023, 7.3849]| -1.5181679999999993|
|    PLUTO|  5616|      22|    461680|    5064867|          99|[45.736298, 8.507]|    LEONE|[45.859577, 7.812]| -0.5717209999999957|
|    PLUTO|  5616|      22|    461680|    5064867|          99|[45.736298, 8.507]|     HULK|[44.38471, 7.4833]| -2.3752879999999994|
|    PIPPO|  1014|      61|    422787|    4915355|          96| [44.387363, 8.03]|    PLUTO|[45.736298, 8.507]|  1.8259349999999976|
|    PIPPO|  1014|      61|    422787|    4915355|          96| [44.387363, 8.03]|    GATTO|[44.891384, 7.049]| -0.4769789999999974|
|    PIPPO|  1014|      61|    422787|    4915355|          96| [44.387363, 8.03]|   DISNEY|[45.34023, 7.3849]| 0.30776699999999835|
|    PIPPO|  1014|      61|    422787|    4915355|          96| [44.387363, 8.03]|    LEONE|[45.859577, 7.812]|   1.254214000000002|
|    PIPPO|  1014|      61|    422787|    4915355|          96| [44.387363, 8.03]|     HULK|[44.38471, 7.4833]| -0.5493530000000018|
|    GATTO|  1014|      23|    346001|    4972736|          99|[44.891384, 7.049]|    PLUTO|[45.736298, 8.507]|   2.302913999999995|
|    GATTO|  1014|      23|    346001|    4972736|          99|[44.891384, 7.049]|    PIPPO| [44.387363, 8.03]|  0.4769789999999974|
|    GATTO|  1014|      23|    346001|    4972736|          99|[44.891384, 7.049]|   DISNEY|[45.34023, 7.3849]|  0.7847459999999957|
|    GATTO|  1014|      23|    346001|    4972736|          99|[44.891384, 7.049]|    LEONE|[45.859577, 7.812]|  1.7311929999999993|
|    GATTO|  1014|      23|    346001|    4972736|          99|[44.891384, 7.049]|     HULK|[44.38471, 7.4833]|-0.07237400000000438|
|   DISNEY|  1014|      72|    373467|    5022016|          84|[45.34023, 7.3849]|    PLUTO|[45.736298, 8.507]|  1.5181679999999993|
|   DISNEY|  1014|      72|    373467|    5022016|          84|[45.34023, 7.3849]|    PIPPO| [44.387363, 8.03]|-0.30776699999999835|
|   DISNEY|  1014|      72|    373467|    5022016|          84|[45.34023, 7.3849]|    GATTO|[44.891384, 7.049]| -0.7847459999999957|
|   DISNEY|  1014|      72|    373467|    5022016|          84|[45.34023, 7.3849]|    LEONE|[45.859577, 7.812]|  0.9464470000000036|
|   DISNEY|  1014|      72|    373467|    5022016|          84|[45.34023, 7.3849]|     HULK|[44.38471, 7.4833]| -0.8571200000000001|
+---------+------+--------+----------+-----------+------------+------------------+---------+------------------+--------------------+
only showing top 20 rows
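
A small note: registered without an explicit return type, a Python UDF defaults to returning strings, so the distance column above is actually a string column. If you want it numeric, you can pass the type when registering, for example:

from pyspark.sql.types import DoubleType

spark.udf.register('distance_haversine', distance_haversine, DoubleType())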
Lamanus, Aug 25, 2020 at 14:14