from typing import Optional import click from prettytable import PrettyTable from piket_client.model import ( AardbeiActivity, ServerStatus, NetworkError, Consumption, AardbeiPeopleDiff, Person, Settlement, ConsumptionType, ) @click.group() def cli(): """Poke coco from the command line.""" pass @cli.command() def status(): """Show the current status of the server.""" status = ServerStatus.is_server_running() if isinstance(status, NetworkError): print_error(f"Failed to get data from server, error {status.value}") return print_ok("Server is available.") open_consumptions = ServerStatus.unsettled_consumptions() if isinstance(open_consumptions, NetworkError): print_error( f"Failed to get unsettled consumptions, error {open_consumptions.value}" ) return click.echo(f"There are {open_consumptions.amount} unsettled consumptions.") if open_consumptions.amount > 0: click.echo(f"First at: {open_consumptions.first_timestamp.strftime('%c')}") click.echo(f"Most recent at: {open_consumptions.last_timestamp.strftime('%c')}") @cli.group() def people(): pass @people.command("list") @click.option("--active/--inactive", default=None) def list_people(active: bool) -> None: people = Person.get_all(active=active) if isinstance(people, NetworkError): print_error(f"Could not get people: {people.value}") return table = PrettyTable() table.field_names = ["ID", "Full name", "Display name", "Active"] table.align["ID"] = "r" table.align["Full name"] = "l" table.align["Display name"] = "l" table.sortby = "Full name" for p in people: table.add_row([p.person_id, p.full_name, p.display_name, p.active]) print(table) @people.command("create") @click.option("--display-name", type=click.STRING) @click.argument("name", type=click.STRING) def create_person(name: str, display_name: str) -> None: """Create a person.""" person = Person(full_name=name, display_name=display_name).create() if isinstance(person, NetworkError): print_error(f"Could not create Person: {person.value}") return print_ok(f'Created person "{name}" with ID {person.person_id}.') @people.command("rename") @click.argument("person-id", type=click.INT) @click.option("--new-full-name", type=click.STRING) @click.option("--new-display-name", type=click.STRING) def rename_person( person_id: int, new_full_name: Optional[str], new_display_name: Optional[str], ) -> None: person = Person.get(person_id) if person is None: raise click.UsageError(f"Cannot find Person {person_id}!") if new_full_name is None and new_display_name is None: raise click.UsageError("No new full name or display name specified!") new_person = person.rename( new_full_name=new_full_name, new_display_name=new_display_name ) @cli.group() def settlements(): pass @settlements.command("show") @click.argument("settlement_id", type=click.INT) def show_settlement(settlement_id: int) -> None: """Get and view the contents of a Settlement.""" s = Settlement.get(settlement_id) if isinstance(s, NetworkError): print_error(f"Could not get Settlement: {s.value}") return output_settlement_info(s) @settlements.command("create") @click.argument("name") def create_settlement(name: str) -> None: """Create a new Settlement.""" s = Settlement.create(name) if isinstance(s, NetworkError): print_error(f"Could not create Settlement: {s.value}") return output_settlement_info(s) def output_settlement_info(s: Settlement) -> None: click.echo(f'Settlement {s.settlement_id}, "{s.name}"') click.echo(f"Summary:") for key, value in s.consumption_summary.items(): click.echo(f" - {value['count']} {value['name']} ({key})") ct_name_by_id = {key: value["name"] for key, value in s.consumption_summary.items()} table = PrettyTable() table.field_names = ["Name", *ct_name_by_id.values()] table.sortby = "Name" table.align = "r" table.align["Name"] = "l" # type: ignore zero_fields = {k: "" for k in ct_name_by_id.values()} for item in s.per_person_counts.values(): r = {"Name": item["full_name"], **zero_fields} for key, value in item["counts"].items(): r[ct_name_by_id[key]] = value table.add_row(r.values()) print(table) @cli.group() def consumption_types(): pass @consumption_types.command("list") def list_consumption_types() -> None: active = ConsumptionType.get_all(active=True) inactive = ConsumptionType.get_all(active=False) if isinstance(active, NetworkError) or isinstance(inactive, NetworkError): print_error("Could not get consumption types!") return table = PrettyTable() table.field_names = ["ID", "Name", "Active"] table.sortby = "ID" for ct in active + inactive: table.add_row([ct.consumption_type_id, ct.name, ct.active]) print(table) @consumption_types.command("create") @click.argument("name") def create_consumption_type(name: str) -> None: ct = ConsumptionType(name=name).create() if not isinstance(ct, NetworkError): print_ok(f'Created consumption type "{name}" with ID {ct.consumption_type_id}.') @consumption_types.command("activate") @click.argument("consumption_type_id", type=click.INT) def activate_consumption_type(consumption_type_id: int) -> None: ct = ConsumptionType.get(consumption_type_id) if isinstance(ct, NetworkError): print_error(f"Could not get ConsumptionType: {ct.value}") return result = ct.set_active(True) if not isinstance(result, NetworkError): print_ok( f"Consumption type {ct.consumption_type_id} ({ct.name}) is now active." ) @consumption_types.command("deactivate") @click.argument("consumption_type_id", type=click.INT) def deactivate_consumption_type(consumption_type_id: int) -> None: ct = ConsumptionType.get(consumption_type_id) if isinstance(ct, NetworkError): print_error(f"Could not get ConsumptionType: {ct.value}") return result = ct.set_active(False) if not isinstance(result, NetworkError): print_ok( f"Consumption type {ct.consumption_type_id} ({ct.name}) is now inactive." ) def print_ok(msg: str) -> None: click.echo(click.style(msg, fg="green")) def print_error(msg: str) -> None: click.echo(click.style(msg, fg="red", bold=True), err=True) @cli.group() @click.option("--token", required=True, envvar="AARDBEI_TOKEN") @click.option("--endpoint", default="http://localhost:3000", envvar="AARDBEI_ENDPOINT") @click.pass_context def aardbei(ctx, token: str, endpoint: str) -> None: ctx.ensure_object(dict) ctx.obj["AardbeiToken"] = token ctx.obj["AardbeiEndpoint"] = endpoint @aardbei.group("activities") def aardbei_activities() -> None: pass @aardbei_activities.command("list") @click.pass_context def aardbei_list_activities(ctx) -> None: acts = AardbeiActivity.get_available( token=ctx.obj["AardbeiToken"], endpoint=ctx.obj["AardbeiEndpoint"] ) if isinstance(acts, NetworkError): print_error(f"Could not get activities: {acts.value}") return table = PrettyTable() table.field_names = ["ID", "Name"] table.align = "l" for a in acts: table.add_row([a.aardbei_id, a.name]) print(table) @aardbei_activities.command("apply") @click.argument("activity_id", type=click.INT) @click.pass_context def aardbei_apply_activity(ctx, activity_id: int) -> None: result = AardbeiActivity.apply_activity( token=ctx.obj["AardbeiToken"], endpoint=ctx.obj["AardbeiEndpoint"], activity_id=activity_id, ) if isinstance(result, NetworkError): print_error("Failed to apply activity: {result.value}") return print_ok(f"Activity applied. There are now {result} active people.") @aardbei.group("people") def aardbei_people() -> None: pass @aardbei_people.command("diff") @click.pass_context def aardbei_diff_people(ctx) -> None: diff = AardbeiPeopleDiff.get_diff( token=ctx.obj["AardbeiToken"], endpoint=ctx.obj["AardbeiEndpoint"] ) if isinstance(diff, NetworkError): print_error(f"Could not get differences: {diff.value}") return if diff.num_changes == 0: print_ok("There are no changes to apply.") return click.echo(f"There are {diff.num_changes} pending changes:") show_diff(diff) @aardbei_people.command("sync") @click.pass_context def aardbei_sync_people(ctx) -> None: diff = AardbeiPeopleDiff.sync( token=ctx.obj["AardbeiToken"], endpoint=ctx.obj["AardbeiEndpoint"] ) if isinstance(diff, NetworkError): print_error(f"Could not apply differences: {diff.value}") return if diff.num_changes == 0: print_ok("There were no changes to apply.") return print_ok(f"Applied {diff.num_changes} pending changes:") show_diff(diff) def show_diff(diff: AardbeiPeopleDiff) -> None: for name in diff.new_people: click.echo(f" - Create local Person for {name}") for name in diff.link_existing: click.echo(f" - Link local and remote people for {name}") for name in diff.altered_name: click.echo(f" - Process name change for {name}") if __name__ == "__main__": cli()