Coverage: 96.0% overall (326 relevant lines, 19937 hits, 13 missed)

lib/avy.ex: 100.0% coverage (0 relevant lines, 0 hits, 0 missed)
Line Hits Source
0 defmodule Avy do
1 @moduledoc """
2 Avy keeps the contexts that define your domain
3 and business logic.
4
5 Contexts are also responsible for managing your data, regardless
6 if it comes from the database, an external API or others.
7 """
8 end

lib/avy/application.ex: 83.3% coverage (6 relevant lines, 5 hits, 1 missed)
Line Hits Source
0 defmodule Avy.Application do
1 # See https://hexdocs.pm/elixir/Application.html
2 # for more information on OTP Applications
3 @moduledoc false
4
5 use Application
6
7 @impl true
8 def start(_type, _args) do
9 1 :logger.add_handler(:my_sentry_handler, Sentry.LoggerHandler, %{
10 config: %{metadata: [:file, :line]}
11 })
12
13 1 children = [
14 AvyWeb.Telemetry,
15 Avy.Repo,
16 1 {DNSCluster, query: Application.get_env(:avy, :dns_cluster_query) || :ignore},
17 {Phoenix.PubSub, name: Avy.PubSub},
18 # Start the Finch HTTP client for sending emails
19 {Finch, name: Avy.Finch},
20 # Start a worker by calling: Avy.Worker.start_link(arg)
21 # {Avy.Worker, arg},
22 # Start to serve requests, typically the last entry
23 AvyWeb.Endpoint
24 ]
25
26 # See https://hexdocs.pm/elixir/Supervisor.html
27 # for other strategies and supported options
28 1 opts = [strategy: :one_for_one, name: Avy.Supervisor]
29 1 Supervisor.start_link(children, opts)
30 end
31
32 # Tell Phoenix to update the endpoint configuration
33 # whenever the application is updated.
34 @impl true
35 def config_change(changed, _new, removed) do
36 0 AvyWeb.Endpoint.config_change(changed, removed)
37 :ok
38 end
39 end

lib/avy/data/content.ex: 100.0% coverage (1 relevant line, 2476 hits, 0 missed)
Line Hits Source
0 defmodule Avy.Data.Content do
1 use Ecto.Schema
2
3 2476 schema "inventory_content" do
4 field :start, :utc_datetime, source: :start
5 field :end, :utc_datetime, source: :end
6 field :quality, :integer
7 field :time_ordered, :boolean, default: false
8 field :time_spans, {:array, :map}, source: :time_spans
9 field :modified, :utc_datetime, source: :modified
10 belongs_to :epoch, Avy.Metadata.Epoch, source: :epoch_id
11 end
12 end

lib/avy/filter.ex: 100.0% coverage (9 relevant lines, 288 hits, 0 missed)
Line Hits Source
0 defmodule Avy.Filter do
1 alias FdsnPlugs.SourceIdentifier
2
3 @moduledoc """
4 A filter is a structure containing all filter parameters given in an FDSN webservice request.
5
6 """
7 defstruct net: "*",
8 sta: "*",
9 loc: "*",
10 cha: "*",
11 start: DateTime.from_unix!(1),
12 end: DateTime.from_unix!(5_682_956_400),
13 quality: [:R, :D, :M, :Q],
14 includerestricted: false,
15 epochids: []
16
17 @type t() :: %Avy.Filter{
18 net: String.t(),
19 sta: String.t(),
20 loc: String.t(),
21 cha: String.t(),
22 start: DateTime.t(),
23 end: DateTime.t(),
24 quality: list(atom()),
25 includerestricted: boolean,
26 epochids: list
27 }
28
29 @spec split_channel(t()) :: {:ok, nonempty_list()} | {:error, String.t()}
30 def split_channel(f) do
31 32 %SourceIdentifier{cha: f.cha}
32 32 |> SourceIdentifier.split_channel()
33 end
34
35 @spec from_source_identifier(SourceIdentifier.t(), boolean, list) :: t()
36 def from_source_identifier(sid, includerestricted \\ false, quality \\ [:R, :D, :M, :Q]) do
37 32 %Avy.Filter{
38 32 net: sid.net,
39 32 sta: sid.sta,
40 32 loc: sid.loc,
41 32 cha: sid.cha,
42 32 start: sid.start,
43 32 end: sid.end,
44 includerestricted: includerestricted,
45 quality: quality
46 }
47 end
48 end

lib/avy/mailer.ex: 100.0% coverage (0 relevant lines, 0 hits, 0 missed)
Line Hits Source
0 defmodule Avy.Mailer do
1 use Swoosh.Mailer, otp_app: :avy
2 end

lib/avy/metadata/epoch.ex: 100.0% coverage (3 relevant lines, 4263 hits, 0 missed)
Line Hits Source
0 defmodule Avy.Metadata.Epoch do
1 use Ecto.Schema
2
3 4261 schema "inventory_epoch" do
4 field :band_code, :string
5 field :instrument_code, :string
6 field :orientation_code, :string
7 field :location_code, :string, source: :location_code
8 field :start_date, :utc_datetime, source: :start_date
9 field :end_date, :utc_datetime, source: :end_date
10 field :policy, :string, default: "O"
11 field :sample_rate, :decimal, source: :sample_rate
12 field :modified, :utc_datetime, source: :modified
13 belongs_to :station, Avy.Metadata.Station
14 has_many :contents, Avy.Data.Content, preload_order: [asc: :start]
15 end
16
17 def source_identifier(c) do
18 1 c = Avy.Repo.preload(c, station: [:network])
19
20 1 "#{c.station.network.code}_#{c.station.code}_#{c.location_code}_#{c.band_code}_#{c.instrument_code}_#{c.orientation_code}"
21 end
22 end
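
The string interpolation in source_identifier/1 above yields an FDSN-style source identifier with underscore-separated codes. A standalone sketch with invented codes:

[net, sta, loc, band, inst, orient] = ["FR", "CIEL", "00", "H", "H", "Z"]
"#{net}_#{sta}_#{loc}_#{band}_#{inst}_#{orient}"
# => "FR_CIEL_00_H_H_Z"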

lib/avy/metadata/network.ex: 100.0% coverage (1 relevant line, 2832 hits, 0 missed)
Line Hits Source
0 defmodule Avy.Metadata.Network do
1 use Ecto.Schema
2
3 2832 schema "inventory_network" do
4 field :code, :string
5 field :start_date, :utc_datetime, source: :start_date
6 field :end_date, :utc_datetime, source: :end_date
7 has_many :stations, Avy.Metadata.Station
8 end
9 end

lib/avy/metadata/station.ex: 100.0% coverage (1 relevant line, 2820 hits, 0 missed)
Line Hits Source
0 defmodule Avy.Metadata.Station do
1 use Ecto.Schema
2
3 2820 schema "inventory_station" do
4 field :code, :string
5 field :start_date, :utc_datetime, source: :start_date
6 field :end_date, :utc_datetime, source: :end_date
7 belongs_to :network, Avy.Metadata.Network
8
9 has_many :epochs, Avy.Metadata.Epoch,
10 preload_order: [:band_code, :instrument_code, :orientation_code, :start_date]
11 end
12 end

lib/avy/repo.ex: 97.0% coverage (34 relevant lines, 1240 hits, 1 missed)
Line Hits Source
0 defmodule Avy.Repo do
1 use Ecto.Repo,
2 otp_app: :avy,
3 adapter: Ecto.Adapters.Postgres
4
5 import Ecto.Query
6
7 require Logger
8
9 alias FdsnPlugs.PublicationVersion
10 alias Avy.Metadata.{Epoch, Station, Network}
11 alias Avy.Data.Content
12 alias Avy.Filter
13
14 @spec get_epochs(Avy.Filter.t()) :: list(Avy.Filter.t()) | :error
15 def get_epochs(filter) do
16 # The filter is valid: it passed all previous checks.
17 # The error clause of the case below should therefore never match.
18 32 case Filter.split_channel(filter) do
19 {:ok, [band_code, instrument_code, orientation_code]} ->
20 32 query =
21 32 from n in Network,
22 as: :net,
23 join: s in Station,
24 as: :sta,
25 on: s.network_id == n.id,
26 join: c in Epoch,
27 as: :cha,
28 on: c.station_id == s.id
29
30 32 query =
31 32 if filter.includerestricted do
32 2 query
33 else
34 30 from [net: n, sta: s, cha: c] in query,
35 where: c.policy == "O"
36 end
37
38 # Apply all possible filters to the request
39 32 query =
40 32 from [net: n, sta: s, cha: c] in query,
41 32 where: like(n.code, ^fdsn_wildcard_to_sql(filter.net)),
42 32 where: like(s.code, ^fdsn_wildcard_to_sql(filter.sta)),
43 32 where: like(c.location_code, ^fdsn_wildcard_to_sql(filter.loc)),
44 where: like(c.band_code, ^fdsn_wildcard_to_sql(band_code)),
45 where: like(c.instrument_code, ^fdsn_wildcard_to_sql(instrument_code)),
46 where: like(c.orientation_code, ^fdsn_wildcard_to_sql(orientation_code)),
47 32 where: c.start_date <= ^filter.end,
48 32 where: c.end_date > ^filter.start or is_nil(c.end_date),
49 group_by: [
50 n.code,
51 s.code,
52 c.location_code,
53 c.band_code,
54 c.instrument_code,
55 c.orientation_code
56 ],
57 select: %Avy.Filter{
58 net: n.code,
59 sta: s.code,
60 loc: c.location_code,
61 cha: fragment("?||?||?", c.band_code, c.instrument_code, c.orientation_code),
62 epochids: fragment("ARRAY_AGG(?)", c.id)
63 }
64
65 32 Logger.debug("Expanding filter: #{inspect(filter)}")
66
67 32 result =
68 Avy.Repo.all(query)
69 |> Enum.map(
70 &%{
71 &1
72 36 | start: filter.start,
73 36 end: filter.end,
74 36 includerestricted: filter.includerestricted,
75 36 quality: filter.quality
76 }
77 )
78
79 32 Logger.debug("Result: #{inspect(result, pretty: true)}")
80
81 32 result
82
83 0 _ ->
84 :error
85 end
86 end
87
88 @spec get_contents_from_epochid(Avy.Filter.t(), atom) :: list(map)
89 def get_contents_from_epochid(filter, method) do
90 36 Logger.debug("Fetching for #{method} from filter #{inspect(filter, pretty: true)}")
91
92 36 query =
93 36 from n in Network,
94 as: :net,
95 join: s in Station,
96 as: :sta,
97 on: s.network_id == n.id,
98 join: c in Epoch,
99 as: :cha,
100 on: c.station_id == s.id,
101 join: co in Content,
102 as: :content,
103 on: co.epoch_id == c.id,
104 36 where: co.epoch_id in ^filter.epochids,
105 36 where: co.quality in ^PublicationVersion.qualities_to_pubversions(filter.quality),
106 36 where: co.start <= ^filter.end,
107 36 where: co.end > ^filter.start,
108 order_by: [
109 asc: c.band_code,
110 asc: c.instrument_code,
111 asc: c.orientation_code,
112 asc: c.start_date,
113 asc: co.start
114 ],
115 select: %{
116 network: n.code,
117 station: s.code,
118 location: c.location_code,
119 channel: fragment("?||?||?", c.band_code, c.instrument_code, c.orientation_code),
120 quality: co.quality,
121 samplerate: c.sample_rate,
122 # TODO When sigma fixes #156, we can remove the AT TIME ZONE utc part
123 36 earliest: fragment("GREATEST(?, ?)", co.start, ^filter.start),
124 36 latest: fragment("LEAST(?,?)", co.end, ^filter.end),
125 timespans: co.time_spans,
126 restriction: c.policy,
127 updated: co.modified
128 }
129
130 # NOTE for my future self: this looks clever, but it won't account for all extents of each quality code.
131 # NOTE: no choice but to retrieve everything.
132 # case method do
133 # :extent ->
134 # [query |> first |> Avy.Repo.one(), query |> last |> Avy.Repo.one()]
135 #
136 # :query ->
137 # Avy.Repo.all(query)
138 # end
139 Avy.Repo.all(query)
140 36 |> Enum.reject(fn t -> is_nil(t) end)
141 end
142
143 #
144 # From FDSN wildcard spec to SQL wildcards
145 @spec fdsn_wildcard_to_sql(String.t()) :: String.t()
146 defp fdsn_wildcard_to_sql(s) do
147 String.replace(s, "*", "%")
148 192 |> String.replace("?", "_")
149 end
150 end
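
The private fdsn_wildcard_to_sql/1 helper above maps FDSN wildcards onto SQL LIKE wildcards: "*" (any run of characters) becomes "%", and "?" (exactly one character) becomes "_". An equivalent standalone sketch:

wildcard_to_sql = fn s ->
  s
  |> String.replace("*", "%")
  |> String.replace("?", "_")
end

wildcard_to_sql.("*")    # => "%"
wildcard_to_sql.("HH?")  # => "HH_"
wildcard_to_sql.("B*")   # => "B%"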

lib/avy_web.ex: 100.0% coverage (2 relevant lines, 10 hits, 0 missed)
Line Hits Source
0 defmodule AvyWeb do
1 @moduledoc """
2 The entrypoint for defining your web interface, such
3 as controllers, components, channels, and so on.
4
5 This can be used in your application as:
6
7 use AvyWeb, :controller
8 use AvyWeb, :html
9
10 The definitions below will be executed for every controller,
11 component, etc, so keep them short and clean, focused
12 on imports, uses and aliases.
13
14 Do NOT define functions inside the quoted expressions
15 below. Instead, define additional modules and import
16 those modules here.
17 """
18
19 5 def static_paths,
20 do: ~w(assets fonts images favicon.ico robots.txt application.wadl documentation.html)
21
22 def router do
23 quote do
24 use Phoenix.Router, helpers: false
25
26 # Import common connection and controller functions to use in pipelines
27 import Plug.Conn
28 import Phoenix.Controller
29 end
30 end
31
32 def channel do
33 quote do
34 use Phoenix.Channel
35 end
36 end
37
38 def controller do
39 quote do
40 use Phoenix.Controller,
41 formats: [:text, :json, :geocsv, :request, :html],
42 layouts: [html: AvyWeb.Layouts]
43
44 import Plug.Conn
45
46 unquote(verified_routes())
47 end
48 end
49
50 def verified_routes do
51 quote do
52 use Phoenix.VerifiedRoutes,
53 endpoint: AvyWeb.Endpoint,
54 router: AvyWeb.Router,
55 statics: AvyWeb.static_paths()
56 end
57 end
58
59 @doc """
60 When used, dispatch to the appropriate controller/live_view/etc.
61 """
62 defmacro __using__(which) when is_atom(which) do
63 5 apply(__MODULE__, which, [])
64 end
65 end

lib/avy_web/avy_geoscv.ex: 100.0% coverage (41 relevant lines, 514 hits, 0 missed)
Line Hits Source
0 defmodule AvyWeb.AvyGEOCSV do
1 require Plug.Conn
2 require Logger
3
4 @extent_header "#dataset: GeoCSV 2.0
5 #delimiter: |
6 #field_unit: unitless|unitless|unitless|unitless|unitless|hertz|ISO_8601|ISO_8601|ISO_8601|unitless|unitless
7 #field_type: string|string|string|string|string|float|datetime|datetime|datetime|integer|string
8 Network|Station|Location|Channel|Quality|SampleRate|Earliest|Latest|Updated|TimeSpans|Restriction"
9
10 def extent(assigns) do
11 [
12 format_header(@extent_header, assigns)
13 2 | Enum.map(assigns.availabilities, fn d ->
14 4 format_datasource(d, assigns.fdsn_parameters, :extent)
15 end)
16 ]
17 2 |> Enum.join("\n")
18 end
19
20 @query_header "#dataset: GeoCSV 2.0
21 #delimiter: |
22 #field_unit: unitless|unitless|unitless|unitless|unitless|hertz|ISO_8601|ISO_8601|ISO_8601
23 #field_type: string|string|string|string|string|float|datetime|datetime|datetime
24 Network|Station|Location|Channel|Quality|SampleRate|Earliest|Latest|Updated"
25 def query(assigns) do
26 [
27 format_header(@query_header, assigns)
28 4 | Enum.map(assigns.availabilities, fn d ->
29 6 Enum.map(d.timespans, fn ts ->
30 22 %{d | timespans: [ts], earliest: List.first(ts), latest: List.last(ts)}
31 end)
32 6 |> Enum.map(fn splitd ->
33 22 format_datasource(
34 splitd,
35 22 assigns.fdsn_parameters,
36 :query
37 )
38 end)
39 end)
40 ]
41 |> List.flatten()
42 4 |> Enum.join("\n")
43 end
44
45 defp format_header(headers, %{fdsn_parameters: fdsn_params}) do
46 6 headers =
47 6 if :samplerate in fdsn_params.merge do
48 String.replace(headers, "hertz|", "")
49 |> String.replace("float|", "")
50 3 |> String.replace("SampleRate|", "")
51 else
52 3 headers
53 end
54
55 6 headers =
56 6 if :quality in fdsn_params.merge do
57 String.replace(headers, "unitless|", "", global: false)
58 |> String.replace("string|", "", global: false)
59 2 |> String.replace("Quality|", "")
60 else
61 4 headers
62 end
63
64 6 if :latestupdate in fdsn_params.show do
65 2 headers
66 else
67 String.replace(headers, "|ISO_8601", "", global: false)
68 |> String.replace("|datetime", "", global: false)
69 4 |> String.replace("|Updated", "")
70 end
71 end
72
73 # Network|Station|Location|Channel|Quality|SampleRate|Earliest|Latest|Updated|TimeSpans|Restriction
74 defp format_datasource(d, fdsn_params, method) do
75 26 attr_list =
76 26 if method == :extent do
77 4 [d.timespancount, d.restriction]
78 else
79 []
80 end
81
82 26 attr_list =
83 26 if :latestupdate in fdsn_params.show do
84 [
85 4 DateTime.to_iso8601(d.earliest),
86 4 DateTime.to_iso8601(d.latest),
87 4 DateTime.to_iso8601(d.updated) | attr_list
88 ]
89 else
90 [
91 22 DateTime.to_iso8601(d.earliest),
92 22 DateTime.to_iso8601(d.latest)
93 | attr_list
94 ]
95 end
96
97 26 attr_list =
98 26 if :samplerate in fdsn_params.merge do
99 14 attr_list
100 else
101 12 [d.samplerate | attr_list]
102 end
103
104 26 attr_list =
105 26 if :quality in fdsn_params.merge do
106 12 attr_list
107 else
108 14 [d.quality | attr_list]
109 end
110
111 26 [d.network, d.station, d.location, d.channel | attr_list]
112 26 |> Enum.join("|")
113 end
114 end
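
format_datasource/3 above builds each GeoCSV row by prepending the optional columns onto a tail list and pipe-joining the result, which preserves the column order of the headers. A standalone sketch of that pattern, with invented values and all optional columns enabled:

# :extent tail -> TimeSpans and Restriction
row = ["3", "OPEN"]
# show: [:latestupdate] -> Earliest, Latest, Updated
row = ["2024-01-01T00:00:00Z", "2024-01-02T00:00:00Z", "2024-01-03T00:00:00Z" | row]
# samplerate not merged -> SampleRate
row = ["100.0" | row]
# quality not merged -> Quality
row = ["D" | row]

Enum.join(["FR", "CIEL", "00", "HHZ" | row], "|")
# => "FR|CIEL|00|HHZ|D|100.0|2024-01-01T00:00:00Z|2024-01-02T00:00:00Z|2024-01-03T00:00:00Z|3|OPEN"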

lib/avy_web/avy_json.ex: 80.0% coverage (5 relevant lines, 18 hits, 1 missed)
Line Hits Source
0 defmodule AvyWeb.AvyJSON do
1 require Plug.Conn
2 require Logger
3
4 def version(_) do
5 0 %{
6 application: Application.spec(:avy)[:app],
7 version: Application.spec(:avy)[:vsn],
8 commit: System.get_env("SENTRY_RELEASE", "unspecified")
9 }
10 end
11
12 def extent(assigns) do
13 5 %{
14 created: DateTime.utc_now(),
15 version: 1.0,
16 5 datasources: assigns.availabilities
17 }
18 end
19
20 def query(assigns) do
21 4 %{
22 created: DateTime.utc_now(),
23 version: 1.0,
24 4 datasources: assigns.availabilities
25 }
26 end
27 end

lib/avy_web/avy_request.ex: 100.0% coverage (13 relevant lines, 53 hits, 0 missed)
Line Hits Source
0 defmodule AvyWeb.AvyREQUEST do
1 require Plug.Conn
2 require Logger
3
4 @doc """
5 Request output format
6 """
7
8 def extent(assigns) do
9 1 Enum.map_join(assigns.availabilities, "\n", fn d ->
10 2 format_datasource(d)
11 end)
12 end
13
14 def query(assigns) do
15 1 Enum.map(assigns.availabilities, fn d ->
16 2 Enum.map(d.timespans, fn ts ->
17 4 %{d | earliest: List.first(ts), latest: List.last(ts)}
18 end)
19 end)
20 |> List.flatten()
21 1 |> Enum.map_join("\n", fn d -> format_datasource(d) end)
22 end
23
24 defp format_datasource(d) do
25 [
26 6 d.network,
27 6 d.station,
28 6 d.location,
29 6 d.channel,
30 6 DateTime.to_iso8601(d.earliest),
31 6 DateTime.to_iso8601(d.latest)
32 ]
33 6 |> Enum.join(" ")
34 end
35 end
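
In query/1 above, every timespan of a datasource becomes its own request line. A hedged sketch with an invented datasource map, limited to the fields that format_datasource/1 actually reads:

d = %{
  network: "FR", station: "CIEL", location: "00", channel: "HHZ",
  earliest: nil, latest: nil,
  timespans: [
    [~U[2024-01-01 00:00:00Z], ~U[2024-01-01 06:00:00Z]],
    [~U[2024-01-02 00:00:00Z], ~U[2024-01-02 06:00:00Z]]
  ]
}

Enum.map(d.timespans, fn ts -> %{d | earliest: List.first(ts), latest: List.last(ts)} end)
|> Enum.map_join("\n", fn row ->
  Enum.join(
    [row.network, row.station, row.location, row.channel,
     DateTime.to_iso8601(row.earliest), DateTime.to_iso8601(row.latest)],
    " "
  )
end)
# => "FR CIEL 00 HHZ 2024-01-01T00:00:00Z 2024-01-01T06:00:00Z\n" <>
#    "FR CIEL 00 HHZ 2024-01-02T00:00:00Z 2024-01-02T06:00:00Z"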

lib/avy_web/avy_text.ex: 100.0% coverage (49 relevant lines, 710 hits, 0 missed)
Line Hits Source
0 defmodule AvyWeb.AvyTEXT do
1 @moduledoc """
2 Text format renderer
3 """
4 require Plug.Conn
5 require Logger
6
7 @extent_header [
8 "Network",
9 "Station",
10 "Location",
11 "Channel",
12 "Quality",
13 "SampleRate",
14 String.pad_trailing("Earliest", 27),
15 String.pad_trailing("Latest", 27),
16 String.pad_trailing("Updated", 20),
17 "TimeSpans",
18 "Restriction"
19 ]
20
21 @doc """
22 Text output
23 """
24
25 def version(_) do
26 4 "avy #{Application.spec(:avy)[:vsn]}, commit #{System.get_env("SENTRY_RELEASE", "unspecified")}"
27 end
28
29 def extent(assigns) do
30 8 headers = format_header(@extent_header, assigns)
31
32 [
33 headers
34 8 | Enum.map(assigns.availabilities, fn d ->
35 13 format_datasource(d, assigns.fdsn_parameters, :extent)
36 end)
37 ]
38 8 |> Enum.join("\n")
39 end
40
41 @query_header [
42 "Network",
43 "Station",
44 "Location",
45 "Channel",
46 "Quality",
47 "SampleRate",
48 String.pad_trailing("Earliest", 27),
49 String.pad_trailing("Latest", 27),
50 "Updated"
51 ]
52 def query(assigns) do
53 5 headers = format_header(@query_header, assigns)
54
55 [
56 headers
57 5 | Enum.map(assigns.availabilities, fn d ->
58 9 Enum.map(d.timespans, fn ts ->
59 13 %{d | timespans: [ts], earliest: List.first(ts), latest: List.last(ts)}
60 end)
61 9 |> Enum.map(fn splitd ->
62 13 format_datasource(
63 splitd,
64 13 assigns.fdsn_parameters,
65 :query
66 )
67 end)
68 end)
69 ]
70 |> List.flatten()
71 5 |> Enum.join("\n")
72 end
73
74 defp format_header(headers, %{fdsn_parameters: fdsn_params} = _) do
75 13 headers =
76 13 if :samplerate in fdsn_params.merge do
77 2 Enum.reject(headers, &String.starts_with?(&1, "SampleRate"))
78 else
79 11 headers
80 end
81
82 13 headers =
83 13 if :quality in fdsn_params.merge do
84 2 Enum.reject(headers, &String.starts_with?(&1, "Quality"))
85 else
86 11 headers
87 end
88
89 13 headers =
90 13 if :latestupdate in fdsn_params.show do
91 12 headers
92 else
93 1 Enum.reject(headers, &String.starts_with?(&1, "Updated"))
94 end
95
96 13 "#" <> Enum.join(headers, " ")
97 end
98
99 defp format_datasource(d, fdsn_params, method) do
100 26 attr_list =
101 13 if method == :extent do
102 13 [d.timespancount |> Integer.to_string() |> String.pad_trailing(9), d.restriction]
103 else
104 []
105 end
106
107 26 attr_list =
108 26 if :latestupdate in fdsn_params.show do
109 [
110 25 DateTime.to_iso8601(d.earliest),
111 25 DateTime.to_iso8601(d.latest),
112 25 DateTime.to_iso8601(d.updated) | attr_list
113 ]
114 else
115 [
116 1 DateTime.to_iso8601(d.earliest),
117 1 DateTime.to_iso8601(d.latest)
118 | attr_list
119 ]
120 end
121
122 26 attr_list =
123 26 if :samplerate in fdsn_params.merge do
124 3 attr_list
125 else
126 [
127 23 d.samplerate
128 |> Decimal.to_string(:xsd)
129 |> String.pad_trailing(10)
130 | attr_list
131 ]
132 end
133
134 26 attr_list =
135 26 if :quality in fdsn_params.merge do
136 2 attr_list
137 else
138 24 [String.pad_trailing(d.quality, 7) | attr_list]
139 end
140
141 [
142 26 String.pad_trailing(d.network, 8),
143 26 String.pad_trailing(d.station, 7),
144 26 String.pad_trailing(d.location, 8),
145 26 String.pad_trailing(d.channel, 7) | attr_list
146 ]
147 26 |> Enum.join(" ")
148 end
149 end

lib/avy_web/controllers/avy_controller.ex: 93.2% coverage (134 relevant lines, 4035 hits, 9 missed)
Line Hits Source
0 defmodule AvyWeb.AvyController do
1 @moduledoc """
2 Manages a user request.
3 """
4 alias Avy.Repo
5 require Logger
6
7 use AvyWeb, :controller
8 alias FdsnPlugs.PublicationVersion
9
10 plug :forbidden_params_extent, ["mergegaps"] when action in [:extent]
11 plug :forbidden_params_extent, ["show"] when action in [:extent]
12 plug :format
13 plug AvyWeb.Plugs.Filters
14
15 def version(conn, _params) do
16 4 render(conn, :version)
17 end
18
19 def extent(conn, _params) do
20 17 case manage_request(conn, :extent) do
21 {:ok, []} ->
22 conn
23 1 |> send_resp(conn.assigns.fdsn_parameters.nodata, "")
24 1 |> halt
25
26 {:ok, ds} ->
27 16 datasources = Enum.map(ds, &Map.drop(&1, [:timespans]))
28
29 conn
30 |> assign(:availabilities, datasources)
31 16 |> render()
32
33 {:too_much_data, msg} ->
34 0 conn |> send_resp(413, msg) |> halt()
35 end
36 end
37
38 def query(conn, _params) do
39 15 case manage_request(conn, :query) do
40 {:ok, []} ->
41 conn
42 1 |> send_resp(conn.assigns.fdsn_parameters.nodata, "")
43 1 |> halt
44
45 {:ok, ds} ->
46 14 datasources = Enum.map(ds, &Map.drop(&1, [:timespancount]))
47
48 14 Logger.info("Fetched #{length(datasources)}")
49
50 conn
51 |> assign(:availabilities, datasources)
52 14 |> render()
53
54 {:too_much_data, msg} ->
55 0 conn |> send_resp(413, msg) |> halt()
56 end
57 end
58
59 @doc """
60 This function is called on an /extent or /query request.
61 It fetches all the contents matching the filters that were set up by
62 AvyWeb.Plugs.Filters in the connection assigns (`conn.assigns.filters`).
63 For each filter, the database request runs asynchronously, in parallel.
64 The task timeout is disabled, as the DB pool already has its own and some requests can take quite a while.
65
66 For each entry, the post_process function builds a list of datasources and merges the contents sharing the same properties.
67
68 Finally, the datasources are put into the conn assigns under the `:availabilities` key so that the renderer can access them.
69 """
70 @spec manage_request(Plug.Conn.t(), atom) :: {:ok, list} | {:too_much_data, String.t()}
71 def manage_request(conn, method) do
72 32 deterministic_filters =
73 32 Task.async_stream(conn.assigns.filters, Repo, :get_epochs, [], timeout: :infinity)
74 32 |> Enum.flat_map(fn {:ok, f} -> f end)
75
76 # if score > Application.get_env(:avy, :max_sids) do
77 32 if length(deterministic_filters) > Application.get_env(:avy, :max_sids) do
78 {:too_much_data,
79 0 "Request has #{length(deterministic_filters)} epochs. Try to restrict the source identifiers selection and/or select smaller timespans."}
80 else
81 32 datasources =
82 Task.async_stream(
83 deterministic_filters,
84 Repo,
85 :get_contents_from_epochid,
86 [method],
87 timeout: :infinity
88 )
89 |> Enum.flat_map(fn {:ok, av} ->
90 36 post_process(
91 av,
92 method,
93 36 conn.assigns.fdsn_parameters.merge,
94 36 conn.assigns.fdsn_parameters.mergegaps * 10 ** 9
95 )
96 end)
97 50 |> Enum.sort_by(&{&1.network, &1.station, &1.location, &1.channel, &1.earliest})
98 32 |> orderby(conn.assigns.fdsn_parameters.orderby, method)
99
100 {:ok, datasources}
101 end
102 end
103
104 #### Private functions for query requests
105
106 # post_process/4 contains all the logic to handle the contents retrieved in the Repo.
107 # It converts the structure returned by the Repo into a usable list of datasources, fit for the views.
108 # The Repo result is post-processed in order to:
109 # - Group all elements by their discriminating properties (including restriction status)
110 # - For each group,
111 # - clean the unwanted timespans in contents
112 # - merge consecutive timespans either by using the mergegap provided in the request, or by the samplerate
113 # With the result, prepare a structure suitable for the views:
114 # - compute the number of timespans
115 # - convert publication version to quality code
116 #
117 @spec post_process(list, atom, list, integer) :: list
118 defp post_process(lst, method, merge_properties, mergegap \\ 0) do
119 lst
120 # Group all the contents by a key, depending on the user request
121 |> Enum.group_by(group_by_keys(merge_properties, method))
122 # With the grouped contents, do the post processing.
123 |> Task.async_stream(fn {k, contents} ->
124 50 Logger.info("Processing #{length(contents)} contents for #{inspect(k)}")
125 50 contents = filter_outofbound_timespans(contents, List.first(contents).earliest)
126
127 50 if length(contents) > 0 do
128 50 ref_content = List.first(contents)
129
130 50 ref_samplerate =
131 83 Enum.min_by(contents, & &1.samplerate) |> Map.fetch!(:samplerate) |> Decimal.to_float()
132
133 50 Logger.debug("Min samplerate: #{ref_samplerate}")
134
135 # When we merge contiguous timespans based on the samplerate
136 50 mergegap =
137 if mergegap <= 0 do
138 # When no mergegaps are indicated
139 36 10 ** 9 / ref_samplerate
140 else
141 14 mergegap
142 end
143
144 50 Logger.debug("Merging on gaps lower than #{mergegap}")
145
146 50 merged_timespans =
147 83 Enum.flat_map(contents, & &1.timespans)
148 |> merge_contiguous_timespans(mergegap)
149 106 |> Enum.map(fn x ->
150 [
151 Map.fetch!(x, "start") |> DateTime.from_unix!(:nanosecond),
152 Map.fetch!(x, "end") |> DateTime.from_unix!(:nanosecond)
153 ]
154 end)
155 50 |> snip_timespans(ref_content.earliest, List.last(contents).latest)
156
157 50 %{
158 50 network: ref_content.network,
159 50 station: ref_content.station,
160 50 location: ref_content.location,
161 50 channel: ref_content.channel,
162 50 quality: PublicationVersion.pubversion_to_quality!(ref_content.quality),
163 50 samplerate: ref_content.samplerate,
164 earliest: contents |> List.first() |> Map.fetch!(:earliest),
165 latest: contents |> List.last() |> Map.fetch!(:latest),
166 83 updated: Enum.max_by(contents, & &1.updated, Date) |> Map.fetch!(:updated),
167 restriction:
168 50 case ref_content.restriction do
169 48 "O" -> "OPEN"
170 2 _ -> "RESTRICTED"
171 end,
172 83 timespancount: Enum.reduce(contents, 0, fn c, acc -> acc + length(c.timespans) end),
173 timespans: merged_timespans
174 }
175 else
176 nil
177 end
178 end)
179 50 |> Enum.into([], fn {:ok, res} -> res end)
180 36 |> Enum.reject(&is_nil(&1))
181 end
182
183 #
184 # After grouping all contents by epoch, some timespans can fall outside
185 # of the request boundaries.
186 # This function removes the timespans that are completely out of bounds,
187 # and if a content is left with no timespans, it is removed from the list.
188 defp filter_outofbound_timespans(contents, lowerbound) do
189 contents
190 |> Enum.map(fn content ->
191 83 lower_bound = DateTime.to_unix(lowerbound, :nanosecond)
192
193 %{
194 content
195 83 | timespans:
196 83 Enum.reject(content.timespans, fn ts ->
197 118 if ts["end"] <= lower_bound do
198 0 Logger.debug(
199 0 "Rejecting #{DateTime.from_unix!(ts["end"], :nanosecond)} < #{content.earliest}"
200 )
201
202 true
203 end
204 end)
205 }
206 end)
207 50 |> Enum.reject(fn content ->
208 83 if content.timespans == [] do
209 0 Logger.debug("Rejecting empty timespans in #{inspect(content)}")
210 true
211 end
212 end)
213 end
214
215 defp snip_timespans(timespans, earliest, latest) do
216 50 Logger.debug("Snip #{inspect(timespans, pretty: true)}")
217 50 Logger.debug("Over #{earliest} and #{latest}")
218
219 50 [[ts_start, ts_end] | tail] = timespans
220
221 50 snipped_timespans_at_start =
222 1 if DateTime.before?(ts_start, earliest) do
223 # TODO Write a test that reaches this branch
224 [[earliest, ts_end] | tail]
225 else
226 49 timespans
227 end
228
229 50 [[ts_start, ts_end] | tail] =
230 Enum.reverse(snipped_timespans_at_start)
231 51 |> Enum.drop_while(fn [ts, _] -> DateTime.after?(ts, latest) end)
232
233 50 if DateTime.after?(ts_end, latest) do
234 1 [[ts_start, latest] | tail] |> Enum.reverse()
235 else
236 49 snipped_timespans_at_start
237 end
238 end
239
240 defp merge_contiguous_timespans(timespans, mergegap) do
241 Enum.reduce(
242 timespans,
243 Enum.take(timespans, 1),
244 fn timespan, acc ->
245 118 [previous | tail] = acc
246
247 118 if timespan["start"] - previous["end"] <= mergegap do
248 62 Logger.debug(
249 0 "Merging timespan #{previous["start"] |> DateTime.from_unix!(:nanosecond)} -> #{previous["end"] |> DateTime.from_unix!(:nanosecond)}"
250 )
251
252 62 Logger.debug(
253 0 "and #{timespan["start"] |> DateTime.from_unix!(:nanosecond)} -> #{timespan["end"] |> DateTime.from_unix!(:nanosecond)}"
254 )
255
256 62 Logger.debug("with gap #{timespan["start"] - previous["end"]} <= #{mergegap}")
257
258 [
259 %{
260 "start" => previous["start"],
261 "end" => timespan["end"],
262 "sample_rate" => previous["sample_rate"]
263 }
264 | tail
265 ]
266 else
267 [timespan | acc]
268 end
269 end
270 )
271 50 |> Enum.reverse()
272 end
273
274 # Plug function: in the /extent action, the mergegaps and show parameters are not allowed
275 defp forbidden_params_extent(conn, [opt]) do
276 37 Logger.debug(
277 0 "Check for forbidden params for query whith #{inspect(conn.params, pretty: true)}"
278 )
279
280 37 case Map.fetch(conn.params, opt) do
281 {:ok, _} ->
282 2 send_resp(conn, 400, "Option #{opt} is only supported in the /query method.")
283 2 |> halt
284
285 :error ->
286 35 conn
287 end
288 end
289
290 @spec group_by_keys(list(atom), :query | :extent) :: function
291 defp group_by_keys(merge_properties, :extent) do
292 20 cond do
293 20 :quality in merge_properties and :samplerate in merge_properties ->
294 2 fn a ->
295 6 {a.network, a.station, a.location, a.channel, a.restriction}
296 end
297
298 18 :samplerate in merge_properties ->
299 2 fn a ->
300 6 {a.network, a.station, a.location, a.channel, a.quality, a.restriction}
301 end
302
303 16 :quality in merge_properties ->
304 4 fn a ->
305 15 {a.network, a.station, a.location, a.channel, a.samplerate, a.restriction}
306 end
307
308 # Default behaviour
309 12 true ->
310 12 fn a ->
311 19 {a.network, a.station, a.location, a.channel, a.quality, a.samplerate, a.restriction}
312 end
313 end
314 end
315
316 defp group_by_keys(merge_properties, :query) do
317 16 cond do
318 16 :quality in merge_properties and :samplerate in merge_properties ->
319 1 fn a ->
320 3 {a.network, a.station, a.location, a.channel}
321 end
322
323 15 :samplerate in merge_properties ->
324 1 fn a ->
325 3 {a.network, a.station, a.location, a.channel, a.quality}
326 end
327
328 14 :quality in merge_properties ->
329 1 fn a ->
330 3 {a.network, a.station, a.location, a.channel, a.samplerate}
331 end
332
333 # Default behaviour
334 13 true ->
335 13 fn a ->
336 28 {a.network, a.station, a.location, a.channel, a.quality, a.samplerate}
337 end
338 end
339 end
340
341 # The API uses the "format" parameter in order to set the output format.
342 # Translate this to the _format param for Phoenix magic to take place.
343 # Also set the response content-type header accordingly.
344 defp format(conn, _) do
345 36 with {:ok, _} <- Map.fetch(conn.assigns, :fdsn_parameters),
346 32 {:ok, format} <- Map.fetch(conn.assigns.fdsn_parameters, :format) do
347 32 case format do
348 :json ->
349 conn
350 |> Phoenix.Controller.put_format("json")
351 9 |> Plug.Conn.put_resp_content_type("application/json")
352
353 :geocsv ->
354 conn
355 |> Phoenix.Controller.put_format("geocsv")
356 6 |> Plug.Conn.put_resp_content_type("text/csv")
357
358 :request ->
359 conn
360 |> Phoenix.Controller.put_format("request")
361 2 |> Plug.Conn.put_resp_content_type("text/plain")
362
363 _ ->
364 conn
365 |> Phoenix.Controller.put_format("text")
366 15 |> Plug.Conn.put_resp_content_type("text/plain")
367 end
368 else
369 _ ->
370 conn
371 |> Phoenix.Controller.put_format("text")
372 4 |> Plug.Conn.put_resp_content_type("text/plain")
373 end
374 end
375
376 defp orderby(ds, orderby, :extent) do
377 17 Logger.debug("Reorder datasources by #{orderby}")
378
379 17 case orderby do
380 1 :latestupdate -> Enum.sort_by(ds, & &1.updated, {:asc, Date})
381 1 :latestupdate_desc -> Enum.sort_by(ds, & &1.updated, {:desc, Date})
382 1 :timespancount -> Enum.sort_by(ds, & &1.timespancount, :asc)
383 1 :timespancount_desc -> Enum.sort_by(ds, & &1.timespancount, :desc)
384 13 _ -> ds
385 end
386 end
387
388 defp orderby(ds, orderby, :query) do
389 15 Logger.debug("Reorder datasources by #{orderby}")
390
391 15 case orderby do
392 1 :latestupdate -> Enum.sort_by(ds, & &1.updated, {:asc, Date})
393 1 :latestupdate_desc -> Enum.sort_by(ds, & &1.updated, {:desc, Date})
394 13 _ -> ds
395 end
396 end
397 end
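
To make the gap rule in merge_contiguous_timespans/2 concrete, here is a standalone sketch with invented nanosecond timestamps (the sample_rate bookkeeping of the original is omitted): spans whose gap to the previous span is at most mergegap are folded together, the others are kept apart.

mergegap = 10 ** 9   # 1 second, in nanoseconds

spans = [
  %{"start" => 0, "end" => 5_000_000_000},
  # 0.5 s gap -> merged with the previous span
  %{"start" => 5_500_000_000, "end" => 9_000_000_000},
  # 11 s gap -> kept as a separate span
  %{"start" => 20_000_000_000, "end" => 30_000_000_000}
]

merged =
  Enum.reduce(spans, Enum.take(spans, 1), fn ts, [previous | tail] = acc ->
    if ts["start"] - previous["end"] <= mergegap do
      [%{"start" => previous["start"], "end" => ts["end"]} | tail]
    else
      [ts | acc]
    end
  end)
  |> Enum.reverse()

# merged == [
#   %{"start" => 0, "end" => 9_000_000_000},
#   %{"start" => 20_000_000_000, "end" => 30_000_000_000}
# ]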

lib/avy_web/controllers/error_json.ex: 100.0% coverage (1 relevant line, 2 hits, 0 missed)
Line Hits Source
0 defmodule AvyWeb.ErrorJSON do
1 @moduledoc """
2 This module is invoked by your endpoint in case of errors on JSON requests.
3
4 See config/config.exs.
5 """
6
7 # If you want to customize a particular status code,
8 # you may add your own clauses, such as:
9 #
10 # def render("500.json", _assigns) do
11 # %{errors: %{detail: "Internal Server Error"}}
12 # end
13
14 # By default, Phoenix returns the status message from
15 # the template name. For example, "404.json" becomes
16 # "Not Found".
17 def render(template, _assigns) do
18 2 %{errors: %{detail: Phoenix.Controller.status_message_from_template(template)}}
19 end
20 end

lib/avy_web/endpoint.ex: 100.0% coverage (0 relevant lines, 0 hits, 0 missed)
Line Hits Source
0 defmodule AvyWeb.Endpoint do
1 use Sentry.PlugCapture
2 use Phoenix.Endpoint, otp_app: :avy
3
4 # The session will be stored in the cookie and signed,
5 # this means its contents can be read but not tampered with.
6 # Set :encryption_salt if you would also like to encrypt it.
7 @session_options [
8 store: :cookie,
9 key: "_avy_key",
10 signing_salt: "xyXumY7h",
11 same_site: "Lax"
12 ]
13
14 socket "/live", Phoenix.LiveView.Socket,
15 websocket: [connect_info: [session: @session_options]],
16 longpoll: [connect_info: [session: @session_options]]
17
18 # Serve at "/" the static files from "priv/static" directory.
19 #
20 # You should set gzip to true if you are running phx.digest
21 # when deploying your static files in production.
22 plug Plug.Static,
23 at: "/",
24 from: :avy,
25 gzip: true,
26 content_types: %{"application.wadl" => "text/xml"},
27 only: AvyWeb.static_paths()
28
29 # Answer to /__health requests
30 plug AvyWeb.Plug.HealthCheck
31
32 # Code reloading can be explicitly enabled under the
33 # :code_reloader configuration of your endpoint.
34 if code_reloading? do
35 plug Phoenix.CodeReloader
36 plug Phoenix.Ecto.CheckRepoStatus, otp_app: :avy
37 end
38
39 plug Phoenix.LiveDashboard.RequestLogger,
40 param_key: "request_logger",
41 cookie_key: "request_logger"
42
43 plug Plug.RequestId
44 plug Plug.Telemetry, event_prefix: [:phoenix, :endpoint]
45
46 plug Plug.Parsers,
47 parsers: [:urlencoded, :multipart, :json],
48 pass: ["*/*"],
49 json_decoder: Phoenix.json_library()
50
51 plug Sentry.PlugContext
52 plug Plug.MethodOverride
53 plug Plug.Head
54 plug Plug.Session, @session_options
55 plug AvyWeb.Plugs.RedirectDoc
56
57 plug AvyWeb.Plug.TrafficDrain
58 plug AvyWeb.Router
59 end

lib/avy_web/plugs/filters.ex: 100.0% coverage (7 relevant lines, 236 hits, 0 missed)
Line Hits Source
0 defmodule AvyWeb.Plugs.Filters do
1 @moduledoc """
2 A plug to add a Filter struct to the connection assigns.
3 The filter is made of the source identifier plus the includerestricted and quality parameters.
4 NOTE: this may be redundant with FdsnPlugs.RequestParams, but it is better suited to filtering by Avy.Repo.
5 A refactoring may be worthwhile.
6 """
7 import Plug.Conn
8
9 36 def init(opts), do: opts
10
11 def call(conn, _opts) do
12 conn
13 36 |> assign(
14 :filters,
15 36 if Map.has_key?(conn.assigns, :fdsn_parameters) do
16 32 Enum.map(conn.assigns.fdsn_parameters.source_identifiers, fn s ->
17 32 Avy.Filter.from_source_identifier(
18 s,
19 32 conn.assigns.fdsn_parameters.includerestricted,
20 32 conn.assigns.fdsn_parameters.quality
21 )
22 end)
23 else
24 []
25 end
26 )
27 end
28 end

lib/avy_web/plugs/health_check.ex: 100.0% coverage (3 relevant lines, 100 hits, 0 missed)
Line Hits Source
0 defmodule AvyWeb.Plug.HealthCheck do
1 import Plug.Conn
2
3 50 def init(opts), do: opts
4
5 # If the request path matches "/__health", we return a 200 response.
6 def call(%Plug.Conn{request_path: "/__health"} = conn, _opts) do
7 conn
8 |> send_resp(
9 200,
10 ""
11 )
12 1 |> halt()
13 end
14
15 # If the request path is anything else, we pass the connection along.
16 49 def call(conn, _opts), do: conn
17 end
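
A hedged, test-style sketch of the plug above, using Plug.Test to build the connection; the paths and expected results follow the two clauses in the listing:

conn = Plug.Test.conn(:get, "/__health")
conn = AvyWeb.Plug.HealthCheck.call(conn, AvyWeb.Plug.HealthCheck.init([]))
conn.status   # => 200
conn.halted   # => true

# Any other path is passed through untouched.
other = Plug.Test.conn(:get, "/extent")
^other = AvyWeb.Plug.HealthCheck.call(other, [])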

lib/avy_web/plugs/redirect_doc.ex: 100.0% coverage (5 relevant lines, 148 hits, 0 missed)
Line Hits Source
0 defmodule AvyWeb.Plugs.RedirectDoc do
1 @moduledoc """
2 Redirects / to /documentation.html
3 """
4
5 import Plug.Conn
6 import Phoenix.Controller
7
8 49 def init(default), do: default
9
10 def call(%Plug.Conn{request_path: path} = conn, _) do
11 49 if path == "/" do
12 conn
13 1 |> redirect(to: "#{Application.get_env(:avy, :url_prefix)}/documentation.html")
14 1 |> halt
15 else
16 48 conn
17 end
18 end
19 end

lib/avy_web/plugs/traffic_drain.ex: 80.0% coverage (5 relevant lines, 97 hits, 1 missed)
Line Hits Source
0 defmodule AvyWeb.Plug.TrafficDrain do
1 @moduledoc """
2 Plug for handling Kubernetes readinessProbe.
3
4 The plug responds with 503 - Service Unavailable on `/__traffic` when traffic is being drained.
5 Otherwise it responds with 200 - OK.
6 """
7
8 import Plug.Conn
9
10 @behaviour Plug
11
12 @impl true
13 48 def init(opts), do: opts
14
15 @impl true
16 def call(%Plug.Conn{path_info: ["__traffic"]} = conn, _opts) do
17 1 case GracefulStop.get_status() do
18 :stopping ->
19 conn
20 |> put_resp_content_type("text/plain")
21 |> send_resp(:service_unavailable, "Draining")
22 0 |> halt()
23
24 :running ->
25 conn
26 |> put_resp_content_type("text/plain")
27 |> send_resp(:ok, "Serving")
28 1 |> halt()
29 end
30 end
31
32 @impl true
33 def call(conn, _opts) do
34 47 conn
35 end
36 end

lib/avy_web/router.ex: 100.0% coverage (6 relevant lines, 90 hits, 0 missed)
Line Hits Source
0 defmodule AvyWeb.Router do
1 use AvyWeb, :router
2
3 43 pipeline :fdsn do
4 plug :accepts, ["json", "text"]
5
6 plug FdsnAvailabilityPlugs
7 end
8
9 4 get "/version", AvyWeb.AvyController, :version
10
11 scope "/", AvyWeb do
12 pipe_through :fdsn
13 25 get "/extent", AvyController, :extent
14 3 post "/extent", AvyController, :extent
15 13 get "/query", AvyController, :query
16 2 post "/query", AvyController, :query
17 end
18
19 # Enable LiveDashboard and Swoosh mailbox preview in development
20 if Application.compile_env(:avy, :dev_routes) do
21 # If you want to use the LiveDashboard in production, you should put
22 # it behind authentication and allow only admins to access it.
23 # If your application does not have an admins-only section yet,
24 # you can use Plug.BasicAuth to set up some basic authentication
25 # as long as you are also using SSL (which you should anyway).
26 import Phoenix.LiveDashboard.Router
27
28 scope "/dev" do
29 pipe_through [:fetch_session, :protect_from_forgery]
30
31 live_dashboard "/dashboard",
32 metrics: AvyWeb.Telemetry,
33 additional_pages: [
34 flame_on: FlameOn.DashboardPage
35 ]
36
37 forward "/mailbox", Plug.Swoosh.MailboxPreview
38 end
39 end
40 end