| @@ -95,3 +95,16 @@ def skyfield_to_moon_phase(times: [Time], vals: [int], now: Time) -> Union[MoonP | |||
| break | |||
| return MoonPhase(current_phase, current_phase_time, next_phase_time) | |||
| def flatten_list(the_list: list): | |||
| new_list = [] | |||
| for item in the_list: | |||
| if isinstance(item, list): | |||
| for item2 in flatten_list(item): | |||
| new_list.append(item2) | |||
| continue | |||
| new_list.append(item) | |||
| return new_list | |||
| @@ -34,7 +34,8 @@ MOON_PHASES = { | |||
| } | |||
| EVENTS = { | |||
| 'OPPOSITION': {'message': '%s is in opposition'} | |||
| 'OPPOSITION': {'message': '%s is in opposition'}, | |||
| 'CONJUNCTION': {'message': '%s and %s are in conjunction'} | |||
| } | |||
| @@ -138,7 +139,7 @@ class Satellite(Object): | |||
| class Event: | |||
| def __init__(self, event_type: str, aster: [Object], start_time: Time, end_time: Union[Time, None] = None): | |||
| def __init__(self, event_type: str, objects: [Object], start_time: Time, end_time: Union[Time, None] = None): | |||
| if event_type not in EVENTS.keys(): | |||
| raise ValueError('event_type parameter must be one of the following: %s (got %s)' % ( | |||
| ', '.join(EVENTS.keys()), | |||
| @@ -146,9 +147,15 @@ class Event: | |||
| ) | |||
| self.event_type = event_type | |||
| self.object = aster | |||
| self.objects = objects | |||
| self.start_time = start_time | |||
| self.end_time = end_time | |||
| def get_description(self) -> str: | |||
| return EVENTS[self.event_type]['message'] % self.object.name | |||
| return EVENTS[self.event_type]['message'] % self._get_objects_name() | |||
| def _get_objects_name(self): | |||
| if len(self.objects) == 1: | |||
| return self.objects[0].name | |||
| return tuple(object.name for object in self.objects) | |||
| @@ -67,7 +67,7 @@ class JsonDumper(Dumper): | |||
| return moon_phase | |||
| if isinstance(obj, Event): | |||
| event = obj.__dict__ | |||
| event['object'] = event['object'].name | |||
| event['objects'] = [object.name for object in event['objects']] | |||
| return event | |||
| raise TypeError('Object of type "%s" could not be integrated in the JSON' % str(type(obj))) | |||
| @@ -22,7 +22,46 @@ from skyfield.timelib import Time | |||
| from skyfield.almanac import find_discrete | |||
| from .data import Event, Planet | |||
| from .core import get_timescale, get_skf_objects, ASTERS | |||
| from .core import get_timescale, get_skf_objects, ASTERS, flatten_list | |||
| def _search_conjunction(start_time: Time, end_time: Time) -> [Event]: | |||
| earth = get_skf_objects()['earth'] | |||
| aster1 = None | |||
| aster2 = None | |||
| def is_in_conjunction(time: Time): | |||
| earth_pos = earth.at(time) | |||
| aster1_pos = earth_pos.observe(get_skf_objects()[aster1.skyfield_name]).apparent() | |||
| aster2_pos = earth_pos.observe(get_skf_objects()[aster2.skyfield_name]).apparent() | |||
| aster_1_right_ascension, _, _ = aster1_pos.radec() | |||
| aster_2_right_ascension, _, _ = aster2_pos.radec() | |||
| return aster_1_right_ascension.hours - aster_2_right_ascension.hours < 0 | |||
| is_in_conjunction.rough_period = 1.0 | |||
| computed = [] | |||
| conjunctions = [] | |||
| for aster1 in ASTERS: | |||
| # Ignore the Sun | |||
| if not isinstance(aster1, Planet): | |||
| continue | |||
| for aster2 in ASTERS: | |||
| if not isinstance(aster2, Planet) or aster2 == aster1 or aster2 in computed: | |||
| continue | |||
| times, _ = find_discrete(start_time, end_time, is_in_conjunction) | |||
| for time in times: | |||
| conjunctions.append(Event('CONJUNCTION', [aster1, aster2], time)) | |||
| computed.append(aster1) | |||
| return conjunctions | |||
| def _search_oppositions(start_time: Time, end_time: Time) -> [Event]: | |||
| @@ -47,7 +86,7 @@ def _search_oppositions(start_time: Time, end_time: Time) -> [Event]: | |||
| times, _ = find_discrete(start_time, end_time, is_oppositing) | |||
| for time in times: | |||
| events.append(Event('OPPOSITION', aster, time)) | |||
| events.append(Event('OPPOSITION', [aster], time)) | |||
| return events | |||
| @@ -56,6 +95,7 @@ def search_events(date: date_type) -> [Event]: | |||
| start_time = get_timescale().utc(date.year, date.month, date.day) | |||
| end_time = get_timescale().utc(date.year, date.month, date.day + 1) | |||
| return [ | |||
| opposition for opposition in _search_oppositions(start_time, end_time) | |||
| ] | |||
| return sorted(flatten_list([ | |||
| _search_oppositions(start_time, end_time), | |||
| _search_conjunction(start_time, end_time) | |||
| ]), key=lambda event: event.start_time.utc_datetime()) | |||
| @@ -1,3 +1,5 @@ | |||
| from .dumper import * | |||
| from .ephemerides import * | |||
| from .events import * | |||
| from .core import * | |||
| @@ -0,0 +1,12 @@ | |||
| import unittest | |||
| import kosmorrolib.core as core | |||
| class CoreTestCase(unittest.TestCase): | |||
| def test_flatten_list(self): | |||
| self.assertEqual([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], core.flatten_list([0, 1, 2, [3, 4, [5, 6], 7], 8, [9]])) | |||
| if __name__ == '__main__': | |||
| unittest.main() | |||
| @@ -21,7 +21,9 @@ class DumperTestCase(unittest.TestCase): | |||
| ' "events": [\n' | |||
| ' {\n' | |||
| ' "event_type": "OPPOSITION",\n' | |||
| ' "object": "Mars",\n' | |||
| ' "objects": [\n' | |||
| ' "Mars"\n' | |||
| ' ],\n' | |||
| ' "start_time": "2018-07-27T05:12:00Z",\n' | |||
| ' "end_time": null\n' | |||
| ' }\n' | |||
| @@ -37,7 +39,7 @@ class DumperTestCase(unittest.TestCase): | |||
| ' }\n' | |||
| ' ]\n' | |||
| '}', JsonDumper(data, | |||
| [Event('OPPOSITION', Planet('Mars', 'MARS'), | |||
| [Event('OPPOSITION', [Planet('Mars', 'MARS')], | |||
| get_timescale().utc(2018, 7, 27, 5, 12))] | |||
| ).to_string()) | |||
| @@ -64,7 +66,7 @@ class DumperTestCase(unittest.TestCase): | |||
| '05:12 Mars is in opposition\n\n' | |||
| 'Note: All the hours are given in UTC.', | |||
| TextDumper(ephemerides, [Event('OPPOSITION', | |||
| Planet('Mars', 'MARS'), | |||
| [Planet('Mars', 'MARS')], | |||
| get_timescale().utc(2018, 7, 27, 5, 12)) | |||
| ], date=date(2019, 10, 14)).to_string()) | |||
| @@ -77,7 +79,7 @@ class DumperTestCase(unittest.TestCase): | |||
| '05:12 Mars is in opposition\n\n' | |||
| 'Note: All the hours are given in UTC.', | |||
| TextDumper(ephemerides, [Event('OPPOSITION', | |||
| Planet('Mars', 'MARS'), | |||
| [Planet('Mars', 'MARS')], | |||
| get_timescale().utc(2018, 7, 27, 5, 12)) | |||
| ], date=date(2019, 10, 14)).to_string()) | |||
| @@ -23,11 +23,38 @@ class MyTestCase(unittest.TestCase): | |||
| for (o, expected_date) in [o1, o2, o3, o4]: | |||
| self.assertEqual(1, len(o), 'Expected 1 event for %s, got %d' % (expected_date, len(o))) | |||
| self.assertEqual('OPPOSITION', o[0].event_type) | |||
| self.assertEqual('MARS', o[0].object.skyfield_name) | |||
| self.assertEqual('MARS', o[0].objects[0].skyfield_name) | |||
| self.assertRegex(o[0].start_time.utc_iso(), expected_date) | |||
| self.assertIsNone(o[0].end_time) | |||
| self.assertEqual('Mars is in opposition', o[0].get_description()) | |||
| def test_find_conjunctions(self): | |||
| # Test case: Mars opposition | |||
| # Source of the information: https://promenade.imcce.fr/en/pages6/887.html#mar | |||
| c1 = (events.search_events(date(2020, 1, 2)), [(['MERCURY', 'JUPITER BARYCENTER'], '^2020-01-02T15:20')]) | |||
| c2 = (events.search_events(date(2020, 1, 12)), [(['MERCURY', 'SATURN BARYCENTER'], '^2020-01-12T04:34'), | |||
| (['MERCURY', 'PLUTO BARYCENTER'], '^2020-01-12T06:56')]) | |||
| for (c, expected_dates) in [c1, c2]: | |||
| self.assertEqual(len(expected_dates), len(c), | |||
| 'Expected %d event(s) for %s, got %d' % (len(expected_dates), expected_dates, len(c))) | |||
| i = 0 | |||
| for conjunction in c: | |||
| self.assertEqual('CONJUNCTION', conjunction.event_type) | |||
| objects, expected_date = expected_dates[i] | |||
| j = 0 | |||
| self.assertRegex(conjunction.start_time.utc_iso(), expected_date) | |||
| for object in objects: | |||
| self.assertEqual(object, conjunction.objects[j].skyfield_name) | |||
| j += 1 | |||
| self.assertIsNone(conjunction.end_time) | |||
| self.assertRegex(conjunction.get_description(), ' are in conjunction$') | |||
| i += 1 | |||
| if __name__ == '__main__': | |||
| unittest.main() | |||