Skip to content

Utils

getSV

Retrieve the closest street view image(s) near a coordinate using the Mapillary API.

Parameters:

Name Type Description Default
centroid

The point to search around (for example, a geometry.centroid value taken from a GeoDataFrame).

required
epsg int

EPSG code for projecting the coordinates.

required
key str

Mapillary API access token.

required
multi bool

Whether to return multiple SVIs (default is False).

False
fov int

Field of view in degrees for the perspective image. Defaults to 80.

80
heading int

Camera heading in degrees. If None, it will be computed based on the house orientation.

None
pitch int

Camera pitch angle. Defaults to 10.

10
height int

Height in pixels of the returned image. Defaults to 300.

300
width int

Width in pixels of the returned image. Defaults to 400.

400

Returns:

Type Description
list[str]

list[str]: A list of images in base64 format; the list is empty when no image is found or an error occurs.

Source code in urbanworm/utils.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def getSV(centroid, epsg: int, key: str, multi: bool = False,
          fov: int = 80, heading: int = None, pitch: int = 10,
          height: int = 300, width: int = 400,
          year: list | tuple = None, season: str = None, time_of_day: str = None) -> list[str]:
    """
    getSV

    Fetch the street view image(s) nearest to a point through the Mapillary API
    and reframe each panorama into a perspective view.

    Args:
        centroid: The coordinates (geometry.centroid of GeoDataFrame); its ``x`` and ``y`` are read.
        epsg (int): EPSG code for projecting the coordinates.
        key (str): Mapillary API access token.
        multi (bool, optional): Whether to return multiple SVIs (default is False).
        fov (int, optional): Field of view in degrees for the perspective image. Defaults to 80.
        heading (int, optional): Camera heading in degrees. If None, it is derived from the
            bearing between the image location and ``centroid``, relative to the image's compass angle.
        pitch (int, optional): Camera pitch angle. Defaults to 10.
        height (int, optional): Height in pixels of the returned image. Defaults to 300.
        width (int, optional): Width in pixels of the returned image. Defaults to 400.
        year (list or tuple, optional): Forwarded unchanged to ``closest`` when selecting images. Defaults to None.
        season (str, optional): Forwarded unchanged to ``closest`` when selecting images. Defaults to None.
        time_of_day (str, optional): Forwarded unchanged to ``closest`` when selecting images. Defaults to None.

    Returns:
        list[str]: A list of images in base64 format. An empty list is returned when the
        request yields no response, no image is selected, or any exception is raised.
    """
    bbox = projection(centroid, epsg)

    url = f"https://graph.mapillary.com/images?access_token={key}&fields=id,compass_angle,thumb_2048_url,captured_at,geometry&bbox={bbox}&is_pano=true"
    try:
        reply = retry_request(url)
        if reply is None:
            return []

        # Narrow the API result down to the closest image(s)
        candidates = closest(centroid, reply.json(), multi, year, season, time_of_day)
        if candidates is None:
            return []

        views = []
        for row in range(len(candidates)):
            # Positional columns of the table produced by `closest`:
            # 1 -> compass angle, 2 -> image url, 6 -> (lon, lat) of the image
            pano_heading = float(candidates.iloc[row, 1])
            pano_url = candidates.iloc[row, 2]
            pano_lon, pano_lat = candidates.iloc[row, 6]

            if heading is not None:
                view_heading = heading
            else:
                # Point the camera at the target: bearing from the image to the
                # centroid, expressed relative to the panorama's own heading
                target_bearing = calculate_bearing(pano_lat, pano_lon, centroid.y, centroid.x)
                view_heading = (target_bearing - pano_heading) % 360

            # Cut a perspective view out of the equirectangular panorama
            pano = Equirectangular(img_url=pano_url)
            views.append(pano.GetPerspective(fov, view_heading, pitch, height, width, 128))
        return views
    except Exception as e:
        print(f"Error in getSV: {e}")
        return []

getOSMbuildings

Get building footprints within a bounding box from OpenStreetMap using the Overpass API.

Parameters:

Name Type Description Default
bbox tuple or list

A bounding box in the form (min_lon, min_lat, max_lon, max_lat).

required
min_area float or int

Minimum footprint area in square meters. Defaults to 0.

0
max_area float or int

Maximum footprint area in square meters. If None, no upper limit is applied.

None

Returns:

Type Description
GeoDataFrame | None

gpd.GeoDataFrame or None: A GeoDataFrame of building footprints if any are found; otherwise, None.

Source code in urbanworm/utils.py
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
def getOSMbuildings(bbox: tuple | list, min_area: float | int = 0,
                    max_area: float | int = None) -> gpd.GeoDataFrame | None:
    """
    getOSMbuildings

    Get building footprints within a bounding box from OpenStreetMap using the Overpass API.

    Footprint areas are approximated directly from the lon/lat coordinates: the
    area in square degrees is scaled by 111320 m per degree in both directions,
    with the east-west direction additionally scaled by the cosine of the
    footprint's mean latitude (a degree of longitude shrinks away from the equator).

    Args:
        bbox (tuple or list): A bounding box in the form (min_lon, min_lat, max_lon, max_lat).
        min_area (float or int): Minimum footprint area in square meters. Defaults to 0.
        max_area (float or int, optional): Maximum footprint area in square meters. If None, no upper limit is applied.

    Returns:
        gpd.GeoDataFrame or None: A GeoDataFrame (EPSG:4326) of building footprints if any are found; otherwise, None.
    """
    import math

    meters_per_degree = 111320

    # Extract bounding box coordinates
    min_lon, min_lat, max_lon, max_lat = bbox

    url = "https://overpass-api.de/api/interpreter"
    # Overpass bounding boxes are ordered (south, west, north, east), both in
    # the global [bbox:...] setting and in the per-statement filters.
    query = f"""
    [bbox:{min_lat},{min_lon},{max_lat},{max_lon}]
    [out:json]
    [timeout:900];
    (
        way["building"]({min_lat},{min_lon},{max_lat},{max_lon});
        relation["building"]({min_lat},{min_lon},{max_lat},{max_lon});
    );
    out geom;
    """

    payload = "data=" + requests.utils.quote(query)
    response = requests.post(url, data=payload)
    data = response.json()

    buildings = []
    for element in data.get("elements", []):
        # NOTE(review): only elements carrying a top-level "geometry" list are
        # used; relation elements may expose geometry per member instead and
        # would then be skipped here -- confirm against the Overpass output.
        if "geometry" not in element:
            continue
        coords = [(node["lon"], node["lat"]) for node in element["geometry"]]
        if len(coords) <= 2:
            continue
        polygon = Polygon(coords)
        # Approx. conversion to square meters, corrected for the shrinking
        # length of a degree of longitude at the footprint's latitude
        mean_lat = sum(lat for _, lat in coords) / len(coords)
        area_m2 = polygon.area * (meters_per_degree ** 2) * math.cos(math.radians(mean_lat))
        # Filter buildings by area
        if area_m2 >= min_area and (max_area is None or area_m2 <= max_area):
            buildings.append(polygon)

    if not buildings:
        return None
    # Convert to GeoDataFrame
    return gpd.GeoDataFrame(geometry=buildings, crs="EPSG:4326")

getGlobalMLBuilding

Fetch building footprints from the Global ML Building dataset within a given bounding box.

Parameters:

Name Type Description Default
bbox tuple or list

Bounding box defined as (min_lon, min_lat, max_lon, max_lat).

required
epsg int

EPSG code for coordinate transformation. Required if min_area > 0 or max_area is specified.

None
min_area float or int

Minimum building footprint area in square meters. Defaults to 0.0.

0.0
max_area float or int

Maximum building footprint area in square meters. Defaults to None (no upper limit).

None

Returns:

Type Description
GeoDataFrame

gpd.GeoDataFrame: Filtered building footprints within the bounding box.

Source code in urbanworm/utils.py
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
def getGlobalMLBuilding(bbox: tuple | list, epsg: int = None, min_area: float | int = 0.0,
                        max_area: float | int = None) -> gpd.GeoDataFrame:
    """
    getGlobalMLBuilding

    Fetch building footprints from the Global ML Building dataset within a given bounding box.

    The zoom-9 tiles overlapping the box are looked up in the dataset link index,
    each matching tile is stored as GeoJSON in a temporary directory, and the
    features lying within the box are merged and given consecutive ``id`` values.
    The merged table is then passed to ``filterBF`` along with ``epsg``,
    ``min_area`` and ``max_area``, and the result is reprojected to EPSG:4326.

    Args:
        bbox (tuple or list): Bounding box defined as (min_lon, min_lat, max_lon, max_lat).
        epsg (int, optional): EPSG code for coordinate transformation. Required if min_area > 0 or max_area is specified.
        min_area (float or int): Minimum building footprint area in square meters. Defaults to 0.0.
        max_area (float or int, optional): Maximum building footprint area in square meters. Defaults to None (no upper limit).

    Returns:
        gpd.GeoDataFrame: Filtered building footprints within the bounding box.

    Raises:
        ValueError: If a tile overlapping the bounding box has no entry in the dataset link index.
    """
    import mercantile
    from tqdm import tqdm
    import tempfile
    from shapely import geometry

    def _store_tile(source_url, target_path):
        # Load one line-delimited JSON tile and write it out as GeoJSON,
        # unless a file already sits at the target path.
        records = pd.read_json(source_url, lines=True)
        records["geometry"] = records["geometry"].apply(geometry.shape)
        tile_gdf = gpd.GeoDataFrame(records, crs=4326)
        if not os.path.exists(target_path):
            tile_gdf.to_file(target_path, driver="GeoJSON")

    # Area of interest as a closed rectangular ring
    west, south, east, north = bbox
    aoi_shape = geometry.shape({
        "type": "Polygon",
        "coordinates": [[
            [west, south],
            [west, north],
            [east, north],
            [east, south],
            [west, south],
        ]],
    })
    minx, miny, maxx, maxy = aoi_shape.bounds

    # Unique quadkeys of the zoom-9 tiles intersecting the area of interest
    quad_keys = list({
        mercantile.quadkey(tile)
        for tile in mercantile.tiles(minx, miny, maxx, maxy, zooms=9)
    })

    # Index that maps each quadkey to the download link(s) of its footprints
    links = pd.read_csv(
        "https://minedbuildings.z5.web.core.windows.net/global-buildings/dataset-links.csv", dtype=str
    )

    with tempfile.TemporaryDirectory() as tmpdir:
        # Step 1: download every tile that intersects the area of interest
        stored_paths = []
        for quad_key in tqdm(quad_keys):
            matches = links[links["QuadKey"] == quad_key]
            n_matches = matches.shape[0]
            if n_matches == 0:
                raise ValueError(f"QuadKey not found in dataset: {quad_key}")
            if n_matches == 1:
                jobs = [(matches.iloc[0]["Url"], f"{quad_key}.geojson")]
            else:
                print(f"Warning: Multiple rows found for QuadKey: {quad_key}. Processing all entries.")
                # One file per index row, suffixed with the row's index label
                jobs = [
                    (row["Url"], f"{quad_key}_{row_label}.geojson")
                    for row_label, row in matches.iterrows()
                ]
            for source_url, file_name in jobs:
                target_path = os.path.join(tmpdir, file_name)
                _store_tile(source_url, target_path)
                stored_paths.append(target_path)

        # Step 2: keep the features within the area of interest and merge them,
        # numbering the kept features consecutively across tiles
        next_id = 0
        merged = gpd.GeoDataFrame()
        for path in stored_paths:
            tile_gdf = gpd.read_file(path)
            tile_gdf = tile_gdf[tile_gdf.geometry.within(aoi_shape)]
            kept = len(tile_gdf)
            tile_gdf['id'] = range(next_id, next_id + kept)
            next_id += kept
            merged = pd.concat([merged, tile_gdf], ignore_index=True)

    merged = filterBF(merged, epsg, min_area, max_area)
    # Reproject back to WGS84
    return merged.to_crs('EPSG:4326')