A PoC of Kosmorro, before Kosmorro became cool.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
This repo is archived. You can view files and clone it, but cannot push or open issues/pull-requests.

209 lines
7.7 KiB

  1. from multiprocessing import Pool as ThreadPool
  2. from skyfield.api import Loader, Topos
  3. from skyfield import almanac
  4. class Ephemeris:
  5. MONTH = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
  6. PLANETS = ['MERCURY', 'VENUS', 'MARS', 'JUPITER BARYCENTER', 'SATURN BARYCENTER', 'URANUS BARYCENTER',
  7. 'NEPTUNE BARYCENTER', 'PLUTO BARYCENTER']
  8. position = None
  9. latitude = None
  10. longitude = None
  11. timescale = None
  12. planets = None
  13. def __init__(self, position):
  14. load = Loader('./cache')
  15. self.timescale = load.timescale()
  16. self.planets = load('de421.bsp')
  17. self.latitude = position['lat']
  18. self.longitude = position['lon']
  19. self.position = Topos(latitude_degrees=position['lat'], longitude_degrees=position['lon'])
  20. def compute_ephemeris_for_day(self, year: int, month: int, day: int) -> dict:
  21. ephemeris = {}
  22. time1 = self.timescale.utc(year, month, day, 2)
  23. time2 = self.timescale.utc(year, month, day + 1, 2)
  24. # Compute sunrise and sunset
  25. ephemeris['sun'] = self.get_sun(time1, time2)
  26. ephemeris['moon'] = self.get_moon(year, month, day)
  27. ephemeris['planets'] = self.get_planets(year, month, day)
  28. return ephemeris
  29. def get_sun(self, time1, time2) -> dict:
  30. t, y = almanac.find_discrete(time1, time2, almanac.sunrise_sunset(self.planets, self.position))
  31. sunrise = t[0] if y[0] else t[1]
  32. sunset = t[1] if not y[1] else t[0]
  33. return {'rise': sunrise.utc_iso(), 'set': sunset.utc_iso()}
  34. def get_moon(self, year, month, day) -> dict:
  35. time1 = self.timescale.utc(year, month, day - 10)
  36. time2 = self.timescale.utc(year, month, day)
  37. _, y = almanac.find_discrete(time1, time2, almanac.moon_phases(self.planets))
  38. return {'phase': y[-1]}
  39. def get_planets(self, year: int, month: int, day: int) -> dict:
  40. args = []
  41. for planet_name in self.PLANETS:
  42. args.append({'planet': planet_name,
  43. 'observer': {'latitude': self.latitude, 'longitude': self.longitude},
  44. 'year': year, 'month': month, 'day': day})
  45. with ThreadPool(4) as pool:
  46. planets = pool.map(Ephemeris.get_planet, args)
  47. obj = {}
  48. for planet in planets:
  49. obj[planet['name'].split(' ')[0]] = {'rise': planet['rise'], 'maximum': planet['maximum'],
  50. 'set': planet['set']}
  51. return obj
  52. @staticmethod
  53. def get_planet(o) -> dict:
  54. load = Loader('./cache')
  55. planets = load('de421.bsp')
  56. timescale = load.timescale()
  57. position = Topos(latitude_degrees=o['observer']['latitude'], longitude_degrees=o['observer']['longitude'])
  58. observer = planets['earth'] + position
  59. planet = planets[o['planet']]
  60. rise_time = maximum_time = set_time = None
  61. max_elevation = None
  62. is_risen = None
  63. is_elevating = None
  64. last_is_elevating = None
  65. last_position = None
  66. for hours in range(0, 24):
  67. time = timescale.utc(o['year'], o['month'], o['day'], hours)
  68. position = observer.at(time).observe(planet).apparent().altaz()[0].degrees
  69. if is_risen is None:
  70. is_risen = position > 0
  71. if last_position is not None:
  72. is_elevating = last_position < position
  73. if rise_time is None and not is_risen and is_elevating and position > 0:
  74. # Planet has risen in the last hour, let's look for a more precise time!
  75. for minutes in range(0, 60):
  76. time = timescale.utc(o['year'], o['month'], o['day'], hours - 1, minutes)
  77. position = observer.at(time).observe(planet).apparent().altaz()[0].degrees
  78. if position > 0:
  79. # Planet has just risen!
  80. rise_time = time
  81. is_risen = True
  82. break
  83. if set_time is None and is_risen and not is_elevating and position < 0:
  84. # Planet has set in the last hour, let's look for a more precise time!
  85. for minutes in range(0, 60):
  86. time = timescale.utc(o['year'], o['month'], o['day'], hours - 1, minutes)
  87. position = observer.at(time).observe(planet).apparent().altaz()[0].degrees
  88. if position < 0:
  89. # Planet has just set!
  90. set_time = time
  91. is_risen = False
  92. break
  93. print(is_elevating, last_is_elevating, max_elevation, last_position, position)
  94. if not is_elevating and last_is_elevating:
  95. # Planet has reached its azimuth in the last hour, let's look for a more precise time!
  96. for minutes in range(0, 60):
  97. time = timescale.utc(o['year'], o['month'], o['day'], hours - 1, minutes)
  98. position = observer.at(time).observe(planet).apparent().altaz()[0].degrees
  99. max_elevation = position
  100. maximum_time = time
  101. if last_position > position:
  102. # Planet has reached its azimuth!
  103. is_elevating = False
  104. break
  105. last_position = position
  106. last_position = position
  107. last_is_elevating = is_elevating
  108. if rise_time is not None and set_time is not None and maximum_time is not None:
  109. return {
  110. 'name': o['planet'],
  111. 'rise': rise_time.utc_iso(),
  112. 'maximum': maximum_time.utc_iso(),
  113. 'set': set_time.utc_iso()
  114. }
  115. return {
  116. 'name': o['planet'],
  117. 'rise': rise_time.utc_iso() if rise_time is not None else None,
  118. 'maximum': maximum_time.utc_iso() if maximum_time is not None else None,
  119. 'set': set_time.utc_iso() if set_time is not None else None
  120. }
  121. def compute_ephemeris_for_month(self, year: int, month: int) -> list:
  122. if month == 2:
  123. is_leap_year = (year % 4 == 0 and year % 100 > 0) or (year % 400 == 0)
  124. max_day = 29 if is_leap_year else 28
  125. elif month < 8:
  126. max_day = 30 if month % 2 == 0 else 31
  127. else:
  128. max_day = 31 if month % 2 == 0 else 30
  129. e = []
  130. for day in range(1, max_day + 1):
  131. e.append(self.compute_ephemeris_for_day(year, month, day))
  132. return e
  133. def compute_ephemeris_for_year(self, year: int) -> dict:
  134. e = {}
  135. for month in range(0, 12):
  136. e[self.MONTH[month]] = self.compute_ephemeris_for_month(year, month + 1)
  137. e['seasons'] = self.get_seasons(year)
  138. return e
  139. def get_seasons(self, year: int) -> dict:
  140. t1 = self.timescale.utc(year, 1, 1)
  141. t2 = self.timescale.utc(year, 12, 31)
  142. t, y = almanac.find_discrete(t1, t2, almanac.seasons(self.planets))
  143. seasons = {}
  144. for ti, yi in zip(t, y):
  145. if yi == 0:
  146. season = 'MARCH'
  147. elif yi == 1:
  148. season = 'JUNE'
  149. elif yi == 2:
  150. season = 'SEPTEMBER'
  151. elif yi == 3:
  152. season = 'DECEMBER'
  153. else:
  154. raise AssertionError
  155. seasons[season] = ti.utc_iso()
  156. return seasons
  157. def compute_ephemeris(self, year: int, month: int, day: int):
  158. if day is not None:
  159. return self.compute_ephemeris_for_day(year, month, day)
  160. elif month is not None:
  161. return self.compute_ephemeris_for_month(year, month)
  162. else:
  163. return self.compute_ephemeris_for_year(year)