Skip to content

abs_user

The User class is the parent class of all child classes defined in the user module (gps_synth/user).

User class has a combination of concrete and abstract methods:

  • concrete methods are meant to be called in child classes with super() function.
  • abstract methods declare which methods a child class must provide; their implementation is left to that child class.

User

Bases: ABC

Source code in gps_synth/common/abs_user.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
class User(ABC):
    def __init__(self, user_id: int, profile_user_config):
        self.user_id = user_id
        self.date_range = pd.date_range(
            profile_user_config["DATE_BEGGINING"],
            profile_user_config["DATE_END"],
            freq="d",
        )
        self.radius_buffer_h_w = profile_user_config["RADIUS_BUFFER_H_W"]
        self.radius_buffer_h_r = profile_user_config["RADIUS_BUFFER_H_R"]
        self.mean_move_speed_ms = profile_user_config["MEAN_MOVE_SPEED_MS"]
        self.proximity_to_road = profile_user_config["PROXIMITY_TO_ROAD"]

        self.home_id = None
        self.work_id = None
        self.regular_loc_array = None

        self.data_array = []

    def get_random_id_within_buffer(
        self,
        center_point: Point,
        radius_buffer: int,
        gdf_locations: GeoDataFrame,
        min_locations: int = 20,
    ) -> Union[int, None]:
        """
        Pick a random index label among the locations lying within a buffer around a center point.
        The buffer must contain at least ``min_locations`` locations, otherwise None is returned.

        Args:
            center_point (Point): A point around which to create a buffer
            radius_buffer (int): A radius of the buffer
            gdf_locations (GeoDataFrame): Locations to filter with the condition "within the buffer"
            min_locations (int): Minimum number of locations the buffer has to contain
                for a pick to be made. Defaults to 20, the previously hard-coded threshold

        Returns:
            Union[int, None]: Index label (taken from ``gdf_locations.index``) of a randomly chosen
            location, or None if fewer than ``min_locations`` locations are within the buffer
        """
        buffer = shapely.buffer(center_point, distance=radius_buffer)
        # Evaluate the spatial predicate once and reuse the result for both the count and the draw
        candidate_ids = gdf_locations[gdf_locations.within(buffer)].index

        # max(..., 1) keeps random.choice away from an empty index when min_locations <= 0
        if len(candidate_ids) < max(min_locations, 1):
            return None

        return random.choice(candidate_ids)

    def get_meaningful_locations(
        self,
        gdf_hw: GeoDataFrame,
        gdf_event: GeoDataFrame,
    ) -> None:
        """
        Create meaningful locations for a user: one home, one work, several regular events,
        the radius between home and work and home and regular locations are defined in the config
        The distribution of meaningful locations should follow some distance conditions.
        Store computed anchors in correponding instance attributes

        Args:
            gdf_hw (GeoDataFrame): Set of locations of a network to choose from for home and work anchors
            gdf_event (GeoDataFrame): Set of locations of a network to choose from for regular event anchors
        """
        # radius_buffer_h_w (int): Radius to create a buffer around home anchor to search for work anchor
        radius_buffer_h_w = self.radius_buffer_h_w
        # radius_buffer_h_w (int): Radius to create a buffer around home anchor to search for regular event anchors
        radius_buffer_h_r = self.radius_buffer_h_r

        home_id = random.randint(0, len(gdf_hw) - 1)
        home_geometry = gdf_hw.iloc[home_id]["geometry"]
        # TODO: too conditionally nested think about a better approach
        while True:

            work_id = self.get_random_id_within_buffer(
                home_geometry, radius_buffer_h_w, gdf_hw
            )
            # if there are not many possible work anchor locations around
            if work_id is None:
                # change home id
                home_id = random.randint(0, len(gdf_hw) - 1)
                home_geometry = gdf_hw.iloc[home_id]["geometry"]
            # if the same just choose another work id, but don't change home anchor
            elif home_id == work_id:
                continue

            else:
                regular_locations_ids = []
                number_of_regular_locations = random.randint(3, 5)
                i = 0
                while i <= number_of_regular_locations:
                    regular_id = self.get_random_id_within_buffer(
                        home_geometry, radius_buffer_h_r, gdf_event
                    )

                    # if there are not many possible regular event anchors around increase search radius
                    if regular_id is None:
                        radius_buffer_h_r += 100
                    # if id is already used, chooses another one
                    elif (
                        regular_id in regular_locations_ids
                        or regular_id == home_id
                        or regular_id == work_id
                    ):
                        continue
                    else:
                        regular_locations_ids.append(regular_id)
                        i += 1

                self.home_id = home_id
                self.work_id = work_id
                self.regular_loc_array = regular_locations_ids
                break

    def get_regular_or_random_loc(
        self,
        gdf_event: GeoDataFrame,
        regular_location_ids: List[int],
        number_of_events: int,
    ) -> List[int]:
        """
        Randomly create a list with specified number of event ids, which could be either from regular event locations or completely accidental

        Args:
            gdf_event (GeoDataFrame): Set of locations of a network to choose from for regular event anchors
            regular_location_ids (List[int]): List of regular event locations' ids
            number_of_events (int): A number of event ids tom create

        Returns:
            List[int]: List of event ids for a userto visit within a day
        """
        event_id_list = []
        while len(event_id_list) < number_of_events:
            choose_reg_or_random = random.choices(
                ["reg", "random"], weights=[0.6, 0.4], k=1
            )[0]
            if choose_reg_or_random == "reg":
                event_id = random.choice(regular_location_ids)
            else:
                event_id = random.randint(0, len(gdf_event) - 1)

            if event_id not in event_id_list:
                event_id_list.append(event_id)
            else:
                continue

        return event_id_list

    def get_info_about_loc(
        self, df_loc: DataFrame, list_of_ids: List[int]
    ) -> List[List[Union[int, float]]]:
        """
        Based on id of a location find some information about it and store in a list

        Args:
            df_loc (DataFrame): Set of locations of a network and their features to search in based on id
            list_of_ids (List[int]): List of locations' ids to derive some information about

        Returns:
            List[List[Union[int, float]]]: List of lists, each element has three items: nearest node id and lat and lon coordinates of location's centroid
        """

        list_of_info = []
        for loc_id in list_of_ids:
            list_of_info.append(
                [
                    df_loc.iloc[loc_id]["nearest_node_id"],
                    df_loc.iloc[loc_id]["geometry"].x,
                    df_loc.iloc[loc_id]["geometry"].y,
                ]
            )

        return list_of_info

    def create_list_of_locations(
        self,
        gdf_hw: GeoDataFrame,
        gdf_event: GeoDataFrame,
        home_id: int,
        work_id: int,
        regular_location_ids: List[int],
        day_of_week: int,
    ) -> List[List[Union[int, float]]]:
        """
        Based on day of week define type and number of locations to visit for a user within a day
        and derive information about them

        Args:
            gdf_hw (GeoDataFrame): Set of locations (and their features) of a network to use from for home and work anchors
            gdf_event (GeoDataFrame): Set of locations (and their features) of a network to use for regular and random event anchors
            home_id (int): Id of home anchor
            work_id (int): Id of work anchor
            regular_location_ids (List[int]): List of regular event locations' ids
            day_of_week (int): _description_

        Returns:
            List[List[Union[int, float]]]: List of lists, each element has three items: nearest node id and lat and lon coordinates of location's centroid

        """
        if day_of_week < 6:
            number_of_events = random.choices(
                [0, 1, 2, 3], weights=[0.6, 0.25, 0.1, 0.05], k=1
            )[0]
            list_of_ids = [home_id, work_id]
        else:
            number_of_events = random.choices(
                [0, 1, 2, 3, 4], weights=[0.1, 0.2, 0.30, 0.25, 0.15], k=1
            )[0]
            list_of_ids = [home_id]

        event_id_list = self.get_regular_or_random_loc(
            gdf_event, regular_location_ids, number_of_events
        )

        list_of_locations_not_event = self.get_info_about_loc(gdf_hw, list_of_ids)
        list_of_locations_event = self.get_info_about_loc(gdf_event, event_id_list)

        list_of_locations = list_of_locations_not_event + list_of_locations_event

        return list_of_locations

    def get_static_points(
        self,
        user_id: int,
        data_array: List[List[Union[int, float, Timestamp]]],
        transformer_to_WGS: Transformer,
        startlon: float,
        startlat: float,
        time_start: Timestamp,
        time_end: Timestamp,
    ) -> Timestamp:
        """
        Generate nearby points around some coordinates (the centroid point of a user's location)
        for the whole time a user stays there

        Args:
            user_id (int): Id of a user
            data_array (List[List[Union[int, float, Timestamp]]]): List to store user's GPS data;
                records are appended in place as [user_id, timestamp, lon, lat]
            transformer_to_WGS (Transformer): Transformer applied to the start coordinates before points are generated
            startlon (float): First coordinate of the point around which to generate nearby points,
                given in the CRS that ``transformer_to_WGS`` converts from
            startlat (float): Second coordinate of that point, in the same CRS
            time_start (Timestamp): Timestamp from which to start generating static points
            time_end (Timestamp): Upper timestamp limit of generation

        Returns:
            Timestamp: Time (rounded to seconds) from which to start generating GPS data for another activity
        """
        # the geodesic calculator does not change between iterations, build it once
        geod = Geod(ellps="WGS84")

        time_start += timedelta(minutes=1)
        startlon, startlat = transformer_to_WGS.transform(startlon, startlat)
        while time_start < time_end:
            random_minutes = random.randint(1, 5)
            possible_forward_azimuth = random.randint(0, 360)
            possible_distance = random.randint(0, 5)  # metres
            # jitter: a point up to 5 m away from the centre in a random direction
            end_lon, end_lat, _ = geod.fwd(
                startlon, startlat, possible_forward_azimuth, possible_distance
            )
            data_array.append([user_id, time_start, end_lon, end_lat])
            time_start += timedelta(minutes=random_minutes)

        return time_start.round(freq="s")

    def get_points_on_path(
        self, path: LineString, number_of_points: int
    ) -> List[Point]:
        """
        Generate mostly equally distanced points along path between its start and end point

        Args:
            path (LineString): A line along which to generate points
            number_of_points (int): Number of points to generate along the path (it includes the start and end point)

        Returns:
            List[Point]: List of points placed on the path
        """

        distances = np.linspace(0, path.length, number_of_points)
        points = [path.interpolate(distance) for distance in distances]

        return points

    def get_chaotic_point(
        self,
        point_start: Point,
        point_end: Point,
        radius_of_buffer: int,
        proximity_to_road: int,
    ) -> Point:
        """
        Produce one chaotic point between two points
        meaning that with very high likelihood it will not be located on the path but near to it.
        Applied to make a movement look more humanlike

        The point is sampled uniformly from the region that is within ``radius_of_buffer`` of both
        points and within ``proximity_to_road`` of the straight segment joining them. If that region
        is empty or has no area (e.g. ``proximity_to_road`` is 0 or the two buffers barely touch),
        sampling could never succeed, so the midpoint of the segment is returned instead.

        Args:
            point_start (Point): First end of the segment near which the chaotic point is generated
            point_end (Point): Second end of that segment
            radius_of_buffer (int): A radius to define a potential space for a chaotic point
            proximity_to_road (int): Maximum distance of a chaotic point from the segment

        Returns:
            Point: A chaotic point
        """
        path_between_points = LineString([point_start, point_end])
        points_intersection = point_start.buffer(radius_of_buffer).intersection(
            point_end.buffer(radius_of_buffer)
        )
        final_intersection = points_intersection.intersection(
            path_between_points.buffer(proximity_to_road)
        )

        # Rejection sampling below needs a region with positive area, otherwise it loops forever
        if final_intersection.is_empty or final_intersection.area == 0:
            return path_between_points.interpolate(0.5, normalized=True)

        min_x, min_y, max_x, max_y = final_intersection.bounds
        # draw from the bounding box until the candidate falls inside the region
        while True:
            chaotic_point = Point(
                [random.uniform(min_x, max_x), random.uniform(min_y, max_y)]
            )
            if chaotic_point.within(final_intersection):
                return chaotic_point

    def get_moving_points(
        self,
        user_id: int,
        data_array: List[List[Union[int, float, Timestamp]]],
        graph_proj: MultiDiGraph,
        nodes: GeoDataFrame,
        transformer_to_WGS: Transformer,
        start_node: int,
        end_node: int,
        start_coords: Tuple[float, float],
        end_coords: Tuple[float, float],
        mean_move_speed_ms: Union[int, float],
        proximity_to_road: int,
        time_start: Timestamp,
    ) -> Timestamp:
        """
        First create route between origin and destination locations, interpolate this path with points,
        and create GPS data while moving from point to point (with one chaotic point between each pair)

        Args:
            user_id (int): Id of a user
            data_array (List[List[Union[int, float, Timestamp]]]): List to store user's GPS data;
                records are appended in place as [user_id, timestamp, lon, lat]
            graph_proj (MultiDiGraph): Projected graph of a network
            nodes (GeoDataFrame): Nodes of network's projected graph
            transformer_to_WGS (Transformer): Transformer applied to every generated point before it is stored
            start_node (int): Id of the nearest node to a start location
            end_node (int): Id of the nearest node to an end location
            start_coords (Tuple[float, float]): Coordinates of start location; they are joined with
                the node geometries into one line, so presumably in the same CRS as ``nodes`` - TODO confirm
            end_coords (Tuple[float, float]): Coordinates of end location, same remark as for start_coords
            mean_move_speed_ms (Union[int, float]): Speed used to convert distances into travel times
                (name suggests metres per second)
            proximity_to_road (int): Maximum distance of a chaotic point from the path
            time_start (Timestamp): Timestamp from which to start generating moving points

        Returns:
            Timestamp: Time (rounded to seconds) from which to start generating GPS data for another activity
        """

        def travel_seconds(distance):
            # Very short hops are counted as 2 seconds so that records are not
            # produced at a needlessly fine time resolution (keeps the data small)
            if distance < mean_move_speed_ms * 2:
                return 2
            return distance / mean_move_speed_ms

        # get the shortest route from start to end node
        route = ox.distance.shortest_path(
            graph_proj, start_node, end_node, weight="length"
        )
        route_list = list(nodes.loc[route].geometry.values)
        # locations are not always right next to the network, so the exact
        # start and end coordinates are added around the node sequence
        route_list.insert(0, Point(start_coords[0], start_coords[1]))
        route_list.append(Point(end_coords[0], end_coords[1]))
        path = LineString(route_list)

        # Distance covered in 10 seconds: used to space the points so the records
        # mimic a GPS tracking frequency without producing an enormous amount of data
        min_dist_between_conseq_points = mean_move_speed_ms * 10

        if path.length <= min_dist_between_conseq_points:
            number_of_points = 2
        else:
            number_of_points = math.ceil(path.length / min_dist_between_conseq_points)

        points = self.get_points_on_path(path, number_of_points)

        # walk the route segment by segment: point -> chaotic point -> next point
        for current_point, next_point in zip(points, points[1:]):
            chaotic_point = self.get_chaotic_point(
                current_point,
                next_point,
                min_dist_between_conseq_points,
                proximity_to_road,
            )

            # the path is in a projected CRS, stored coordinates go through the transformer
            lon, lat = transformer_to_WGS.transform(current_point.x, current_point.y)
            data_array.append([user_id, time_start.round(freq="s"), lon, lat])

            # move on to the chaotic point
            time_start += timedelta(
                seconds=travel_seconds(current_point.distance(chaotic_point))
            )
            lon, lat = transformer_to_WGS.transform(chaotic_point.x, chaotic_point.y)
            data_array.append([user_id, time_start.round(freq="s"), lon, lat])

            # move on to the next point; this becomes the start time of the next segment
            time_start += timedelta(
                seconds=travel_seconds(chaotic_point.distance(next_point))
            )

        # the last point of the route closes the trip
        last_point = points[-1]
        lon, lat = transformer_to_WGS.transform(last_point.x, last_point.y)
        data_array.append([user_id, time_start.round(freq="s"), lon, lat])

        return time_start.round(freq="s")

    @abstractmethod
    def random_plot_of_day(
        self,
        time_start: Timestamp,
        beggining_of_day: Timestamp,
        day_of_week: int,
        list_of_locations: List[List[Union[int, float]]],
        network_graph_proj: MultiDiGraph,
        network_nodes: GeoDataFrame,
        transformer_to_WGS: Transformer,
    ) -> Timestamp:
        """
        Abstract method: every child class has to provide its own implementation.

        The base class only fixes the signature and the Timestamp return type.
        NOTE(review): presumably builds the sequence of activities of one day and returns the time
        at which generation continues - confirm against the child classes.
        """
        # pylint: disable=missing-function-docstring
        pass

    @abstractmethod
    def generate_gps(
        self,
        network_gdf_hw: GeoDataFrame,
        network_gdf_event: GeoDataFrame,
        network_graph_proj: MultiDiGraph,
        network_nodes: GeoDataFrame,
        transformer_to_WGS: Transformer,
    ):
        """
        Abstract method: every child class has to provide its own implementation.

        The base class only fixes the signature.
        NOTE(review): presumably the entry point that produces a user's GPS data
        from the network inputs - confirm against the child classes.
        """
        # pylint: disable=missing-function-docstring
        pass

create_list_of_locations(gdf_hw, gdf_event, home_id, work_id, regular_location_ids, day_of_week)

Based on day of week define type and number of locations to visit for a user within a day and derive information about them

Parameters:

Name Type Description Default
gdf_hw GeoDataFrame

Set of locations (and their features) of a network to use from for home and work anchors

required
gdf_event GeoDataFrame

Set of locations (and their features) of a network to use for regular and random event anchors

required
home_id int

Id of home anchor

required
work_id int

Id of work anchor

required
regular_location_ids List[int]

List of regular event locations' ids

required
day_of_week int

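Day number: values below 6 produce a home-and-work day with 0–3 events; any other value produces a home-only day with 0–4 events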

required

Returns:

Type Description
List[List[Union[int, float]]]

List[List[Union[int, float]]]: List of lists, each element has three items: the nearest node id, then the x and y coordinates of the location's point geometry

Source code in gps_synth/common/abs_user.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
def create_list_of_locations(
    self,
    gdf_hw: GeoDataFrame,
    gdf_event: GeoDataFrame,
    home_id: int,
    work_id: int,
    regular_location_ids: List[int],
    day_of_week: int,
) -> List[List[Union[int, float]]]:
    """
    Decide, from the day of week, which anchors and how many events a user visits within a day
    and gather the information about all of these locations

    Args:
        gdf_hw (GeoDataFrame): Set of locations (and their features) of a network to use for home and work anchors
        gdf_event (GeoDataFrame): Set of locations (and their features) of a network to use for regular and random event anchors
        home_id (int): Id of home anchor
        work_id (int): Id of work anchor
        regular_location_ids (List[int]): List of regular event locations' ids
        day_of_week (int): Day number; values below 6 give a home + work day with 0-3 events,
            any other value gives a home-only day with 0-4 events.
            NOTE(review): whether Saturday falls below 6 depends on the caller's numbering - confirm

    Returns:
        List[List[Union[int, float]]]: Information about the anchors first (home, then work if present),
        followed by the events; each item is what get_info_about_loc returns for one location
    """
    working_day = day_of_week < 6
    if working_day:
        event_counts, count_weights = [0, 1, 2, 3], [0.6, 0.25, 0.1, 0.05]
        anchor_ids = [home_id, work_id]
    else:
        event_counts, count_weights = [0, 1, 2, 3, 4], [0.1, 0.2, 0.30, 0.25, 0.15]
        anchor_ids = [home_id]

    # single weighted draw of how many events happen this day
    (number_of_events,) = random.choices(event_counts, weights=count_weights, k=1)

    event_ids = self.get_regular_or_random_loc(
        gdf_event, regular_location_ids, number_of_events
    )

    anchors_info = self.get_info_about_loc(gdf_hw, anchor_ids)
    events_info = self.get_info_about_loc(gdf_event, event_ids)

    return anchors_info + events_info

get_chaotic_point(point_start, point_end, radius_of_buffer, proximity_to_road)

Produce one chaotic point between two points meaning that with very high likelihood it will not be located on the path but near to it. Applied to make a movement look more humanlike

Parameters:

Name Type Description Default
point_start Point

First end of the segment near which the chaotic point is generated

required
point_end Point

Second end of the segment near which the chaotic point is generated

required
radius_of_buffer int

A radius to define a potential space for a chaotic point

required
proximity_to_road int

A distance to define how a chaotic point should be from a path

required

Returns:

Name Type Description
Point Point

A chaotic point

Source code in gps_synth/common/abs_user.py
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
def get_chaotic_point(
    self,
    point_start: Point,
    point_end: Point,
    radius_of_buffer: int,
    proximity_to_road: int,
) -> Point:
    """
    Produce one chaotic point between two points
    meaning that with very high likelihood it will not be located on the path but near to it.
    Applied to make a movement look more humanlike

    The point is sampled uniformly from the region that is within ``radius_of_buffer`` of both
    points and within ``proximity_to_road`` of the straight segment joining them. If that region
    is empty or has no area (e.g. ``proximity_to_road`` is 0 or the two buffers barely touch),
    sampling could never succeed, so the midpoint of the segment is returned instead.

    Args:
        point_start (Point): First end of the segment near which the chaotic point is generated
        point_end (Point): Second end of that segment
        radius_of_buffer (int): A radius to define a potential space for a chaotic point
        proximity_to_road (int): Maximum distance of a chaotic point from the segment

    Returns:
        Point: A chaotic point
    """
    path_between_points = LineString([point_start, point_end])
    points_intersection = point_start.buffer(radius_of_buffer).intersection(
        point_end.buffer(radius_of_buffer)
    )
    final_intersection = points_intersection.intersection(
        path_between_points.buffer(proximity_to_road)
    )

    # Rejection sampling below needs a region with positive area, otherwise it loops forever
    if final_intersection.is_empty or final_intersection.area == 0:
        return path_between_points.interpolate(0.5, normalized=True)

    min_x, min_y, max_x, max_y = final_intersection.bounds
    # draw from the bounding box until the candidate falls inside the region
    while True:
        chaotic_point = Point(
            [random.uniform(min_x, max_x), random.uniform(min_y, max_y)]
        )
        if chaotic_point.within(final_intersection):
            return chaotic_point

get_info_about_loc(df_loc, list_of_ids)

Based on id of a location find some information about it and store in a list

Parameters:

Name Type Description Default
df_loc DataFrame

Set of locations of a network and their features to search in based on id

required
list_of_ids List[int]

List of locations' ids to derive some information about

required

Returns:

Type Description
List[List[Union[int, float]]]

List[List[Union[int, float]]]: List of lists, each element has three items: the nearest node id, then the x and y coordinates of the location's point geometry

Source code in gps_synth/common/abs_user.py
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def get_info_about_loc(
    self, df_loc: DataFrame, list_of_ids: List[int]
) -> List[List[Union[int, float]]]:
    """
    Look up each location by its position in ``df_loc`` and collect basic information about it

    Args:
        df_loc (DataFrame): Set of locations of a network and their features; must have
            the columns "nearest_node_id" and "geometry" (a geometry exposing ``x`` and ``y``)
        list_of_ids (List[int]): Positional ids (used with ``iloc``) of the locations to describe

    Returns:
        List[List[Union[int, float]]]: One three-item list per id, in input order:
        nearest node id, then the x and the y coordinate of the location's geometry
    """
    list_of_info = []
    for loc_id in list_of_ids:
        # fetch the row once instead of once per extracted field
        row = df_loc.iloc[loc_id]
        geometry = row["geometry"]
        list_of_info.append([row["nearest_node_id"], geometry.x, geometry.y])

    return list_of_info

get_meaningful_locations(gdf_hw, gdf_event)

Create meaningful locations for a user: one home, one work, and several regular events. The radii between home and work and between home and regular locations are defined in the config. The distribution of meaningful locations should follow some distance conditions. The computed anchors are stored in the corresponding instance attributes.

Parameters:

Name Type Description Default
gdf_hw GeoDataFrame

Set of locations of a network to choose from for home and work anchors

required
gdf_event GeoDataFrame

Set of locations of a network to choose from for regular event anchors

required
Source code in gps_synth/common/abs_user.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
def get_meaningful_locations(
    self,
    gdf_hw: GeoDataFrame,
    gdf_event: GeoDataFrame,
) -> None:
    """
    Create meaningful locations for a user: one home, one work and 3 to 5 regular events.
    The search radii around the home anchor are read from the instance attributes
    ``radius_buffer_h_w`` (home -> work) and ``radius_buffer_h_r`` (home -> regular events).
    The computed anchors are stored in ``self.home_id``, ``self.work_id`` and
    ``self.regular_loc_array``

    Args:
        gdf_hw (GeoDataFrame): Set of locations of a network to choose from for home and work anchors
        gdf_event (GeoDataFrame): Set of locations of a network to choose from for regular event anchors
    """
    # radius_buffer_h_w (int): Radius to create a buffer around home anchor to search for work anchor
    radius_buffer_h_w = self.radius_buffer_h_w
    # radius_buffer_h_r (int): Radius to create a buffer around home anchor to search for regular event anchors
    # kept in a local variable because it may be widened below without touching the instance attribute
    radius_buffer_h_r = self.radius_buffer_h_r

    # Step 1: pick a home anchor and a distinct work anchor around it
    home_id = random.randint(0, len(gdf_hw) - 1)
    home_geometry = gdf_hw.iloc[home_id]["geometry"]
    while True:
        work_id = self.get_random_id_within_buffer(
            home_geometry, radius_buffer_h_w, gdf_hw
        )
        if work_id is None:
            # not enough possible work anchors around this home: choose another home
            home_id = random.randint(0, len(gdf_hw) - 1)
            home_geometry = gdf_hw.iloc[home_id]["geometry"]
            continue
        if work_id != home_id:
            break
        # work coincides with home: draw another work id, keep the home anchor

    # Step 2: pick the regular event anchors around the home anchor
    number_of_regular_locations = random.randint(3, 5)
    regular_locations_ids = []
    # loop on the collected amount itself, so exactly number_of_regular_locations
    # anchors are produced (a "<=" counter comparison yields one anchor too many)
    while len(regular_locations_ids) < number_of_regular_locations:
        regular_id = self.get_random_id_within_buffer(
            home_geometry, radius_buffer_h_r, gdf_event
        )
        if regular_id is None:
            # not enough possible regular event anchors around: widen the search radius
            radius_buffer_h_r += 100
            continue
        if regular_id in regular_locations_ids or regular_id in (home_id, work_id):
            # id is already used, choose another one
            continue
        regular_locations_ids.append(regular_id)

    self.home_id = home_id
    self.work_id = work_id
    self.regular_loc_array = regular_locations_ids

get_moving_points(user_id, data_array, graph_proj, nodes, transformer_to_WGS, start_node, end_node, start_coords, end_coords, mean_move_speed_ms, proximity_to_road, time_start)

First create route between origin and destination locations, interpolate this path with points, and create GPS data while moving from point to point

Parameters:

Name Type Description Default
user_id int

Id of a user

required
data_array List[List[Union[int, float, Timestamp]]]

List to store user's GPS data; each record is appended as (user_id, timestamp, lon, lat)

required
graph_proj MultiDiGraph

Projected graph of a network

required
nodes GeoDataFrame

Nodes of the network's projected graph

required
start_node int

Id of the nearest node to a start location

required
end_node int

Id of the nearest node to an end location

required
start_coords Tuple[float, float]

Lon and lat of start location

required
end_coords Tuple[float, float]

Lon and lat of end location

required
mean_move_speed_ms Union[int, float]

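Mean speed of movement along the route, in path units per second (metres per second for a metric projected CRS)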

required
proximity_to_road int

A distance to define how a chaotic point should be from a path

required
time_start Timestamp

Timestamp from which to start generating moving points

required

Returns:

Name Type Description
Timestamp Timestamp

Time from which to start generating GPS data for another activity

Source code in gps_synth/common/abs_user.py
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
def get_moving_points(
    self,
    user_id: int,
    data_array: List[List[Union[int, float, Timestamp]]],
    graph_proj: MultiDiGraph,
    nodes: GeoDataFrame,
    transformer_to_WGS: Transformer,
    start_node: int,
    end_node: int,
    start_coords: Tuple[float, float],
    end_coords: Tuple[float, float],
    mean_move_speed_ms: Union[int, float],
    proximity_to_road: int,
    time_start: Timestamp,
) -> Timestamp:
    """
    First create route between origin and destination locations, interpolate this path with points,
    and create GPS data while moving from point to point.
    Between every two consecutive route points one extra "chaotic" point
    (see ``self.get_chaotic_point``) is recorded as well.

    Args:
        user_id (int): Id of a user
        data_array (List[List[Union[int, float, Timestamp]]]): List to store user's GPS data,
            extended in place with records ``[user_id, timestamp, lon, lat]``
        graph_proj (MultiDiGraph): Projected graph of a network
        nodes (GeoDataFrame): Nodes of network's projected graph
        transformer_to_WGS (Transformer): Transformer applied to the x, y of every generated point
            to obtain the lon, lat stored in the records
        start_node (int): Id of the nearest node to a start location
        end_node (int): Id of the nearest node to an end location
        start_coords (Tuple[float, float]): Coordinates of start location; they are joined with the
            node geometries into one path, so presumably in the same projected CRS - TODO confirm
        end_coords (Tuple[float, float]): Coordinates of end location (same remark as for start_coords)
        mean_move_speed_ms (Union[int, float]): Mean speed of movement; distances along the path are
            divided by it to obtain seconds (presumably metres per second - TODO confirm)
        proximity_to_road (int): A distance to define how a chaotic point should be from a path
        time_start (Timestamp): Timestamp from which to start generating moving points

    Returns:
        Timestamp: Time from which to start generating GPS data for another activity
    """
    # get the shortest route from start to end node
    # NOTE(review): OSMnx documents that shortest_path may return None when no route exists;
    # nodes.loc[None] would then raise - confirm that callers only pass connected nodes
    route = ox.distance.shortest_path(
        graph_proj, start_node, end_node, weight="length"
    )
    route_nodes = nodes.loc[route]
    route_list = list(route_nodes.geometry.values)
    # add start location's coordinates to the beginning
    # add end location's coordinates to the end
    # because locations are not always near to a network
    route_list.insert(0, Point(start_coords[0], start_coords[1]))
    route_list.append(Point(end_coords[0], end_coords[1]))
    path = LineString(route_list)

    # to make sure that the time difference between
    # two consecutive points is not higher than 10 seconds
    # To mimic GPS tracking frequency (it could be even 1 second, but then the amount of data could be enormous)
    min_dist_between_conseq_points = mean_move_speed_ms * 10

    if path.length <= min_dist_between_conseq_points:
        number_of_points = 2
    else:
        number_of_points = math.ceil(path.length / min_dist_between_conseq_points)

    points = self.get_points_on_path(path, number_of_points)

    def leg_duration(origin, destination):
        """Seconds needed to move between two points, never less than 2 seconds."""
        distance = LineString([origin, destination]).length
        # discard situations when two points are too close and thus the time difference
        # would be too small: we build the model and don't want to use a lot of memory
        if distance < mean_move_speed_ms * 2:
            return 2
        return distance / mean_move_speed_ms

    last_index = number_of_points - 1
    # iterate through each point of created route
    for i in range(number_of_points):
        # even though the actual path and points are in projected CRS
        # the final coordinates are produced by transformer_to_WGS
        lon, lat = transformer_to_WGS.transform(points[i].x, points[i].y)

        # last point in the route - add it and its time to data array and stop
        if i == last_index:
            data_array.append([user_id, time_start.round(freq="s"), lon, lat])
            break

        chaotic_point = self.get_chaotic_point(
            points[i],
            points[i + 1],
            min_dist_between_conseq_points,
            proximity_to_road,
        )

        # Add current point's coordinates and the time it was registered in data array
        data_array.append([user_id, time_start.round(freq="s"), lon, lat])
        # Add time taken to reach a chaotic point
        time_start += timedelta(seconds=leg_duration(points[i], chaotic_point))

        # Add chaotic point's coordinates and the time it was registered in data array
        lon, lat = transformer_to_WGS.transform(chaotic_point.x, chaotic_point.y)
        data_array.append([user_id, time_start.round(freq="s"), lon, lat])
        # Add time taken to reach the next point
        # It will become a start time of the next iteration of a loop
        time_start += timedelta(seconds=leg_duration(chaotic_point, points[i + 1]))

    return time_start.round(freq="s")

get_points_on_path(path, number_of_points)

Generate mostly equally distanced points along path between its start and end point

Parameters:

Name Type Description Default
path LineString

A line along which to generate points

required
number_of_points int

Number of points to generate along the path (it includes the start and end point)

required

Returns:

Type Description
List[Point]

List[Point]: List of points placed on the path

Source code in gps_synth/common/abs_user.py
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
def get_points_on_path(
    self, path: LineString, number_of_points: int
) -> List[Point]:
    """
    Place points along a path at evenly spaced distances from its start

    Args:
        path (LineString): A line along which to generate points
        number_of_points (int): Number of points to generate along the path (it includes the start and end point)

    Returns:
        List[Point]: Points interpolated on the path, ordered from its start to its end
    """
    # distances from the start of the path: 0, ..., path.length in equal steps
    offsets = np.linspace(0, path.length, number_of_points)

    return list(map(path.interpolate, offsets))

get_random_id_within_buffer(center_point, radius_buffer, gdf_locations)

Find a random id of a location that lies within a buffer created around the center point with the specified radius. The center point should be surrounded by a sufficient number of candidate locations; otherwise None is returned.

Parameters:

Name Type Description Default
center_point Point

A point around which create a buffer

required
radius_buffer int

A radius of a buffer

required
gdf_locations GeoDataFrame

Locations to filter with condition "within a buffer"

required

Returns:

Name Type Description
int Union[int, None]

Random id among the filtered locations, or None if too few locations lie within the buffer (e.g. the home anchor is too near to the border of a place)

Source code in gps_synth/common/abs_user.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def get_random_id_within_buffer(
    self,
    center_point: Point,
    radius_buffer: int,
    gdf_locations: GeoDataFrame,
    min_locations: int = 20,
) -> Union[int, None]:
    """
    Find a random id of a location which is within a buffer, created around center point with specified distance.
    A center point should be surrounded with some amount of needed locations, otherwise None

    Args:
        center_point (Point): A point around which create a buffer
        radius_buffer (int): A radius of a buffer
        gdf_locations (GeoDataFrame): Locations to filter with condition "within a buffer"
        min_locations (int): Minimum number of locations that must lie within the buffer for an id
            to be drawn. Defaults to 20, which is an arbitrary threshold: a place may simply not
            have many locations of some type, so callers may lower it

    Returns:
        Union[int, None]: Random index label among filtered locations or None if fewer than
        ``min_locations`` locations lie within the buffer (e.g. the center is too near to border of a place)
    """

    buffer = shapely.buffer(center_point, distance=radius_buffer)
    # run the spatial filter once and reuse its result for both the check and the draw
    candidate_ids = gdf_locations[gdf_locations.within(buffer)].index

    # max(..., 1) keeps random.choice away from an empty index when min_locations <= 0
    if len(candidate_ids) < max(min_locations, 1):
        return None

    return random.choice(candidate_ids)

get_regular_or_random_loc(gdf_event, regular_location_ids, number_of_events)

Randomly create a list with specified number of event ids, which could be either from regular event locations or completely accidental

Parameters:

Name Type Description Default
gdf_event GeoDataFrame

Set of locations of a network to choose from for regular event anchors

required
regular_location_ids List[int]

List of regular event locations' ids

required
number_of_events int

The number of event ids to create

required

Returns:

Type Description
List[int]

List[int]: List of event ids for a user to visit within a day

Source code in gps_synth/common/abs_user.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
def get_regular_or_random_loc(
    self,
    gdf_event: GeoDataFrame,
    regular_location_ids: List[int],
    number_of_events: int,
) -> List[int]:
    """
    Build a list of distinct event ids for a day; every id is drawn either from the user's
    regular event locations (weight 0.6) or from all event locations (weight 0.4)

    Args:
        gdf_event (GeoDataFrame): Set of locations of a network; only its length is used, to draw
            a random row number for an accidental event
        regular_location_ids (List[int]): List of regular event locations' ids
        number_of_events (int): A number of event ids to create

    Returns:
        List[int]: List of unique event ids for a user to visit within a day, in the order drawn
    """
    chosen_ids = []
    while len(chosen_ids) < number_of_events:
        # first decide the source of the event, then draw an id from that source
        source = random.choices(["reg", "random"], weights=[0.6, 0.4], k=1)[0]
        candidate = (
            random.choice(regular_location_ids)
            if source == "reg"
            else random.randint(0, len(gdf_event) - 1)
        )

        # duplicates are dropped and the draw is simply repeated
        if candidate not in chosen_ids:
            chosen_ids.append(candidate)

    return chosen_ids

get_static_points(user_id, data_array, transformer_to_WGS, startlon, startlat, time_start, time_end)

Generate the nearby points around some coordinates (the centroid point of a user’s location)

Parameters:

Name Type Description Default
user_id int

Id of a user

required
data_array List[List[Union[int, float, Timestamp]]]

List to store user's GPS data; each record is appended as (user_id, timestamp, lon, lat)

required
startlon float

Longitude of a point where to start generating nearby points (more precisely their coordinates)

required
startlat float

Latitude of a point where to start generating nearby points (more precisely their coordinates)

required
time_start Timestamp

Timestamp from which to start generating static points

required
time_end Timestamp

Upper timestamp limit of generation

required

Returns:

Name Type Description
Timestamp Timestamp

Time from which to start generating GPS data for another activity

Source code in gps_synth/common/abs_user.py
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
def get_static_points(
    self,
    user_id: int,
    data_array: List[List[Union[int, float, Timestamp]]],
    transformer_to_WGS: Transformer,
    startlon: float,
    startlat: float,
    time_start: Timestamp,
    time_end: Timestamp,
) -> Timestamp:
    """
    Generate the nearby points around some coordinates (the centroid point of a user's location)

    Args:
        user_id (int): Id of a user
        data_array (List[List[Union[int, float, Timestamp]]]): List to store user's GPS data,
            extended in place with records ``[user_id, timestamp, lon, lat]``
        transformer_to_WGS (Transformer): Transformer applied to (startlon, startlat) before any
            point is generated; its output is used as lon, lat on the WGS84 ellipsoid
        startlon (float): First coordinate of the point around which to generate nearby points,
            as expected by ``transformer_to_WGS`` (it is transformed before use)
        startlat (float): Second coordinate of the point around which to generate nearby points,
            as expected by ``transformer_to_WGS`` (it is transformed before use)
        time_start (Timestamp): Timestamp from which to start generating static points
            (the first record is placed one minute later)
        time_end (Timestamp): Upper timestamp limit of generation

    Returns:
        Timestamp: Time from which to start generating GPS data for another activity,
        i.e. the first generated time that is not earlier than ``time_end``, rounded to seconds
    """
    time_start += timedelta(minutes=1)
    startlon, startlat = transformer_to_WGS.transform(startlon, startlat)
    # the ellipsoid definition does not change between iterations: build it once
    geod = Geod(ellps="WGS84")
    while time_start < time_end:
        random_minutes = random.randint(1, 5)
        possible_forward_azimuth = random.randint(0, 360)
        possible_distance = random.randint(0, 5)  # metres
        # move from the centre by the drawn distance in the drawn direction;
        # the third returned value (back azimuth) is not needed
        endLon, endLat, _ = geod.fwd(
            startlon, startlat, possible_forward_azimuth, possible_distance
        )
        data_array.append([user_id, time_start, endLon, endLat])
        time_start += timedelta(minutes=random_minutes)

    return time_start.round(freq="s")