Commit 88c93839 authored by Thomas Graichen's avatar Thomas Graichen

Add PyOsmium dependency and implement OSM writing

- Currently, there is no determination of stop sign kind. So it's unclear whether it is for busses, trams, ferries, etc...
parent f8a0bd35
import os
import sys
from io import StringIO
from datetime import date
from enum import Enum
from typing import Sequence
import osmium
from lxml import etree
from osmium.osm import create_mutable_relation, create_mutable_node
from osmium.osm import Location
from osmium.osm._osm import RelationMember
from osmium.osm.mutable import Relation, Node
from zhv_parser import ZHV_Classes
from zhv_parser.RegionCode import RegionCode
from zhv_parser.ZHV_Classes import StopPlace, Area, Quay, Name
# ZHV related XML tags, elements
namespace: str = "{http://zhv.xml.wvi/}"
tag_stop_place: str = namespace + "StopPlace"
tag_area: str = namespace + "Area"
......@@ -15,67 +24,183 @@ tag_quay: str = namespace + "Quay"
event_start: str = "start"
event_end: str = "end"
def parse(xml_file):
context = etree.iterparse(xml_file, events=(event_start, event_end),
tag=(tag_stop_place, tag_area, tag_quay))
# TODO: Übergebe context (iterator) an nächste Stufe zur Extraktion der Daten
for action, elem in context:
if elem.tag == tag_stop_place and action == event_start:
stop_place_class = ZHV_Classes.StopPlace
stop_place = stop_place_class.factory()
stop_place.build(elem)
if is_stop_place_in_region(RegionCode.BRANDENBURG, stop_place):
print(stop_place)
stop_place_to_osm(stop_place, context)
else:
print("Not in Region!")
def is_stop_place_in_region(region_code: RegionCode, stop_place: StopPlace):
return region_code.isStopPlaceInRegion(stop_place)
def stop_place_to_osm(stop_place: StopPlace, context):
"""Converts stop place to an OSM object """
# extract stop place parameters
# TODO: Ask ZHV: Why are names double encapsulated?
stop_place_name: str = stop_place.get_Name().get_Name()[0].get_valueOf_()
for action, elem in context:
if elem.tag == tag_area and action == event_start:
area_class = ZHV_Classes.Area
area = area_class.factory()
area.build(elem)
area_to_osm(area, context)
elif elem.tag == tag_stop_place and action == event_end:
return
def area_to_osm(area: Area, context):
# extract area parameters
for action, elem in context:
if elem.tag == tag_quay and action == event_start:
quay_class = ZHV_Classes.Quay
quay = quay_class.factory()
quay.build(elem)
quay_to_osm(quay, context)
elif elem.tag == tag_area and action == event_end:
return
def quay_to_osm(quay: Quay, context):
# extract quay parameters
# TODO: Ask ZHV: Why are names double encapsulated?
quay_name: str = quay.get_Name().get_Name()[0].get_valueOf_()
quay_dhid: str = quay.get_DHID()
quay_lat: str = quay.get_Location().get_Latitude()
quay_lon: str = quay.get_Location().get_Longitude()
# return to previous structure hierarchy
for action, elem in context:
if elem.tag == tag_quay and action == event_end:
return
# OSM related PT tags: https://wiki.openstreetmap.org/wiki/DE:Public_transport
# General tags:
osm_tag_name = ('name', '') # example: name=TU Campus -> used in relation
# -> for search machines -> used at the stop position
osm_tag_ref = ('ref', '') # example: ref=TU Campus, Bstg. 3A -> used at the stop platform or stop sign
osm_tag_ref_ifopt = ('ref:IFOPT', '')
# OSM Tags for stop place (relation): also use OSM tag "name=*"
osm_tag_type_pt = ('type', 'public_transport')
osm_tag_stop_place = ('public_transport', 'stop_area')
osm_tag_ref_name = ('ref_name', '') # example: ref_name=TU Campus, Chemnitz
osm_relation_role_platform = 'platform'
osm_relation_type_node = 'node'
# <member type='node' ref='node_id' role='platform' />
# example:
# <relation id='7795887' timestamp='2019-08-25T21:04:46Z' uid='559480' user='SvenQ' visible='true' version='4' changeset='73726675'>
# <member type='node' ref='5274416450' role='stop' />
# <member type='way' ref='545750504' role='platform' />
# <member type='node' ref='5272770752' role='stop' />
# <member type='way' ref='545750513' role='platform' />
# <member type='node' ref='5272770751' role='stop' />
# <member type='way' ref='545750516' role='platform' />
# <member type='node' ref='5272770754' role='stop' />
# <member type='way' ref='545750519' role='platform' />
# <member type='node' ref='5272770753' role='stop' />
# <member type='way' ref='545750520' role='platform' />
# <member type='node' ref='5239435218' role='stop' />
# <member type='way' ref='545750524' role='platform' />
# <tag k='fare_zone' v='13' />
# <tag k='name' v='TU Campus' />
# <tag k='network' v='Verkehrsverbund Mittelsachsen' />
# <tag k='operator' v='Chemnitzer Verkehrs-AG' />
# <tag k='public_transport' v='stop_area' />
# <tag k='ref_name' v='TU Campus, Chemnitz' />
# <tag k='type' v='public_transport' />
# </relation>
# OSM tags for stop positions of buses and trams
osm_tag_stop_position = ('public_transport', 'stop_position')
osm_tag_bus = ('bus', 'yes')
osm_tag_tram = ('tram', 'yes')
osm_tag_tram_stop = ('railway', 'tram_stop')
# for platforms or stop signs (https://wiki.openstreetmap.org/wiki/Tag:public_transport%3Dplatform)
osm_tag_platform_pt = ('public_transport', 'platform') # for nodes and ways
osm_tag_platform_railway = ('railway', 'platform') # for nodes and ways that define tram/rail platforms
osm_tag_highway_bus_stop_sign = ('highway', 'bus_stop') # for nodes that define stop sign positions
#osm_tag_highway_bus_platform = ('highway', 'platform') # for ways that define bus platform areas or lines
# -> ZHV contains only nodes/points, therefore not necessary
osm_tag_fixme_type_unknown = ('fixme', 'Ist diese Haltestelle für Straßenbahnen, Busse oder etwas ganz anderes?')
# For trams: https://wiki.openstreetmap.org/wiki/Tag:railway%3Dtram_stop
class StopSignType(Enum):
BUS = osm_tag_highway_bus_stop_sign
TRAM = osm_tag_platform_railway
UNKNOWN = osm_tag_fixme_type_unknown
class ZhvConverter:
def __init__(self):
self.unused_osm_id = 0
def parse(self, xml_file):
today = date.today()
# dd/mm/YY
d1 = today.strftime("%Y-%m-%d-")
file_name = d1 + 'stop_places.osm'
if os.path.exists(file_name):
os.remove(file_name)
writer = osmium.SimpleWriter(file_name)
context = etree.iterparse(xml_file, events=(event_start, event_end),
tag=(tag_stop_place, tag_area, tag_quay))
for action, elem in context:
if elem.tag == tag_stop_place and action == event_start:
stop_place_class = ZHV_Classes.StopPlace
stop_place = stop_place_class.factory()
stop_place.build(elem)
if not self.is_stop_place_in_region(RegionCode.BRANDENBURG, stop_place):
continue
(stop_place_relation, nodes) = self.stop_place_to_osm(stop_place, context)
if stop_place_relation is not None:
for node in nodes:
writer.add_node(node)
writer.add_relation(stop_place_relation)
writer.close()
def is_stop_place_in_region(self, region_code: RegionCode, stop_place: StopPlace):
return region_code.isStopPlaceInRegion(stop_place)
def stop_place_to_osm(self, stop_place: StopPlace, context) -> (Relation, Sequence[Node]):
"""Converts stop place to an OSM object"""
# extract stop place parameters
# TODO: Ask ZHV: Why are names double encapsulated?
stop_place_name: str = stop_place.get_Name().get_Name()[0].get_valueOf_()
# create relation:
stop_place_relation: Relation = Relation()
stop_place_relation.members = list()
stop_place_relation.tags = list(tuple())
stop_place_relation.tags.append(tuple([osm_tag_ref_name[0], stop_place_name]))
stop_place_relation.tags.append(osm_tag_type_pt)
stop_place_relation.tags.append(osm_tag_stop_place)
stop_place_relation.id = self.get_unused_osm_id()
list_stop_signs = list()
for action, elem in context:
if elem.tag == tag_area and action == event_start:
area_class = ZHV_Classes.Area
area = area_class.factory()
area.build(elem)
list_stop_signs += self.area_to_osm(area, context)
elif elem.tag == tag_stop_place and action == event_end:
if len(list_stop_signs) == 0:
return None
else:
# todo assign IDs of stop signs as members to stop place relation
for stop_sign in list_stop_signs:
stop_place_relation.members.append(tuple([osm_relation_type_node, stop_sign.id, osm_relation_role_platform]))
return stop_place_relation, list_stop_signs
def area_to_osm(self, area: Area, context) -> Sequence[Node]:
list_nodes = list()
# extract type of stop sign (Bus, Tram, etc.?) -> unfortunately, there is no standard attribute for this
# One possible criteria for bus/tram distinguishing: (used by HannIT)
# <Description>
# <Description>Bus</Description>
# </Description>
# 2nd possible criteria:
# <Name>
# <Name>Tram</Name> or <Name>Bus</Name>
# </Name>
# name = area.get_Name().get_Name()[0].get_valueOf_()
# description = area.get_
stop_sign_type = StopSignType.UNKNOWN
for action, elem in context:
if elem.tag == tag_quay and action == event_start:
quay_class = ZHV_Classes.Quay
quay = quay_class.factory()
quay.build(elem)
list_nodes.append(self.quay_to_osm(quay, stop_sign_type, context))
elif elem.tag == tag_area and action == event_end:
return list_nodes
def quay_to_osm(self, quay: Quay, stop_sign_type: StopSignType , context) -> Node:
""" Extract an OSM node from the given ZHV stop sign structure """
# extract quay parameters
# TODO: Ask ZHV: Why are names double encapsulated?
quay_name: str = quay.get_Name().get_Name()[0].get_valueOf_()
quay_dhid: str = quay.get_DHID()
quay_lat: str = quay.get_Location().get_Latitude()
quay_lon: str = quay.get_Location().get_Longitude()
node_stop_sign: Node = Node()
node_stop_sign.id = self.get_unused_osm_id()
node_stop_sign.location = Location(quay_lon, quay_lat)
node_stop_sign.tags = list()
node_stop_sign.tags.append((osm_tag_ref_ifopt[0], quay_dhid))
node_stop_sign.tags.append(osm_tag_platform_pt)
node_stop_sign.tags.append(stop_sign_type.value)
# return to previous structure hierarchy
for action, elem in context:
if elem.tag == tag_quay and action == event_end:
return node_stop_sign
def get_unused_osm_id(self):
self.unused_osm_id -= 1
return self.unused_osm_id
USAGE_TEXT = """
......@@ -93,7 +218,8 @@ def main():
if len(args) != 1:
usage()
infile_name = args[0]
parse(infile_name)
ZhvConverter().parse(infile_name)
if __name__ == '__main__':
main()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment