You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

207 lines
7.6 KiB

  1. from multiprocessing import Pool as ThreadPool
  2. from skyfield.api import Loader, Topos
  3. from skyfield import almanac
  4. class Ephemeris:
  5. MONTH = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC']
  6. PLANETS = ['MERCURY', 'VENUS', 'MARS', 'JUPITER BARYCENTER', 'SATURN BARYCENTER', 'URANUS BARYCENTER',
  7. 'NEPTUNE BARYCENTER', 'PLUTO BARYCENTER']
  8. position = None
  9. latitude = None
  10. longitude = None
  11. timescale = None
  12. planets = None
  13. def __init__(self, position):
  14. load = Loader('./cache')
  15. self.timescale = load.timescale()
  16. self.planets = load('de421.bsp')
  17. self.latitude = position['lat']
  18. self.longitude = position['lon']
  19. self.position = Topos(latitude_degrees=position['lat'], longitude_degrees=position['lon'])
  20. def compute_ephemeris_for_day(self, year: int, month: int, day: int) -> dict:
  21. ephemeris = {}
  22. time1 = self.timescale.utc(year, month, day, 2)
  23. time2 = self.timescale.utc(year, month, day + 1, 2)
  24. # Compute sunrise and sunset
  25. ephemeris['sun'] = self.get_sun(time1, time2)
  26. ephemeris['moon'] = self.get_moon(year, month, day)
  27. ephemeris['planets'] = self.get_planets(year, month, day)
  28. return ephemeris
  29. def get_sun(self, time1, time2) -> dict:
  30. t, y = almanac.find_discrete(time1, time2, almanac.sunrise_sunset(self.planets, self.position))
  31. sunrise = t[0] if y[0] else t[1]
  32. sunset = t[1] if not y[1] else t[0]
  33. return {'rise': sunrise, 'set': sunset}
  34. def get_moon(self, year, month, day) -> dict:
  35. time1 = self.timescale.utc(year, month, day - 10)
  36. time2 = self.timescale.utc(year, month, day)
  37. _, y = almanac.find_discrete(time1, time2, almanac.moon_phases(self.planets))
  38. return {'phase': y[-1]}
  39. def get_planets(self, year: int, month: int, day: int) -> dict:
  40. args = []
  41. for planet_name in self.PLANETS:
  42. args.append({'planet': planet_name,
  43. 'observer': {'latitude': self.latitude, 'longitude': self.longitude},
  44. 'year': year, 'month': month, 'day': day})
  45. with ThreadPool(4) as pool:
  46. planets = pool.map(Ephemeris.get_planet, args)
  47. obj = {}
  48. for planet in planets:
  49. obj[planet['name'].split(' ')[0]] = {'rise': planet['rise'], 'maximum': planet['maximum'],
  50. 'set': planet['set']}
  51. return obj
  52. @staticmethod
  53. def get_planet(o) -> dict:
  54. load = Loader('./cache')
  55. planets = load('de421.bsp')
  56. timescale = load.timescale()
  57. position = Topos(latitude_degrees=o['observer']['latitude'], longitude_degrees=o['observer']['longitude'])
  58. observer = planets['earth'] + position
  59. planet = planets[o['planet']]
  60. rise_time = maximum_time = set_time = None
  61. max_elevation = None
  62. is_risen = None
  63. is_elevating = None
  64. last_is_elevating = None
  65. last_position = None
  66. for hours in range(0, 24):
  67. time = timescale.utc(o['year'], o['month'], o['day'], hours)
  68. position = observer.at(time).observe(planet).apparent().altaz()[0].degrees
  69. if is_risen is None:
  70. is_risen = position > 0
  71. if last_position is not None:
  72. is_elevating = last_position < position
  73. if rise_time is None and not is_risen and is_elevating and position > 0:
  74. # Planet has risen in the last hour, let's look for a more precise time!
  75. for minutes in range(0, 60):
  76. time = timescale.utc(o['year'], o['month'], o['day'], hours - 1, minutes)
  77. position = observer.at(time).observe(planet).apparent().altaz()[0].degrees
  78. if position > 0:
  79. # Planet has just risen!
  80. rise_time = time
  81. is_risen = True
  82. break
  83. if set_time is None and is_risen and not is_elevating and position < 0:
  84. # Planet has set in the last hour, let's look for a more precise time!
  85. for minutes in range(0, 60):
  86. time = timescale.utc(o['year'], o['month'], o['day'], hours - 1, minutes)
  87. position = observer.at(time).observe(planet).apparent().altaz()[0].degrees
  88. if position < 0:
  89. # Planet has just set!
  90. set_time = time
  91. is_risen = False
  92. break
  93. if not is_elevating and last_is_elevating:
  94. # Planet has reached its azimuth in the last hour, let's look for a more precise time!
  95. for minutes in range(0, 60):
  96. time = timescale.utc(o['year'], o['month'], o['day'], hours - 1, minutes)
  97. position = observer.at(time).observe(planet).apparent().altaz()[0].degrees
  98. max_elevation = position
  99. maximum_time = time
  100. if last_position > position:
  101. # Planet has reached its azimuth!
  102. is_elevating = False
  103. break
  104. last_position = position
  105. last_position = position
  106. last_is_elevating = is_elevating
  107. if rise_time is not None and set_time is not None and maximum_time is not None:
  108. return {
  109. 'name': o['planet'],
  110. 'rise': rise_time,
  111. 'maximum': maximum_time,
  112. 'set': set_time
  113. }
  114. return {
  115. 'name': o['planet'],
  116. 'rise': rise_time if rise_time is not None else None,
  117. 'maximum': maximum_time if maximum_time is not None else None,
  118. 'set': set_time if set_time is not None else None
  119. }
  120. def compute_ephemeris_for_month(self, year: int, month: int) -> list:
  121. if month == 2:
  122. is_leap_year = (year % 4 == 0 and year % 100 > 0) or (year % 400 == 0)
  123. max_day = 29 if is_leap_year else 28
  124. elif month < 8:
  125. max_day = 30 if month % 2 == 0 else 31
  126. else:
  127. max_day = 31 if month % 2 == 0 else 30
  128. e = []
  129. for day in range(1, max_day + 1):
  130. e.append(self.compute_ephemeris_for_day(year, month, day))
  131. return e
  132. def compute_ephemeris_for_year(self, year: int) -> dict:
  133. e = {}
  134. for month in range(0, 12):
  135. e[self.MONTH[month]] = self.compute_ephemeris_for_month(year, month + 1)
  136. e['seasons'] = self.get_seasons(year)
  137. return e
  138. def get_seasons(self, year: int) -> dict:
  139. t1 = self.timescale.utc(year, 1, 1)
  140. t2 = self.timescale.utc(year, 12, 31)
  141. t, y = almanac.find_discrete(t1, t2, almanac.seasons(self.planets))
  142. seasons = {}
  143. for ti, yi in zip(t, y):
  144. if yi == 0:
  145. season = 'MARCH'
  146. elif yi == 1:
  147. season = 'JUNE'
  148. elif yi == 2:
  149. season = 'SEPTEMBER'
  150. elif yi == 3:
  151. season = 'DECEMBER'
  152. else:
  153. raise AssertionError
  154. seasons[season] = ti.utc_iso()
  155. return seasons
  156. def compute_ephemeris(self, year: int, month: int, day: int):
  157. if day is not None:
  158. return self.compute_ephemeris_for_day(year, month, day)
  159. elif month is not None:
  160. return self.compute_ephemeris_for_month(year, month)
  161. else:
  162. return self.compute_ephemeris_for_year(year)