-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathStreamlineIntegrator.py
142 lines (108 loc) · 4.49 KB
/
StreamlineIntegrator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import math
from typing import List,Union
import numpy as np
import typing
from LookupGrid import LookupGrid
from Vector import Vector
from rk4 import rk4
def isSame(a: float, b: float) -> bool:
# to avoid floating point error
return abs(a - b) < 1e-4
class StreamlineIntegrator:
"""
for "integrating" (building) a single streamline
03/2022 created
"""
STEP_SIZE_FACTOR_TERMINATION_CONDITION = 0.9 # this constant could be changed to a member variable, but currently not really needed
# ---- status enums
FORWARD = 1
BACKWARD = 2
DONE = 3
def __init__(self,
vectorFieldNormalized: typing.Callable,
grid: LookupGrid,
stepSize: float, # from config TODO: default to 1.0 and add method setStepSize()
# dSep: float,
dTest: float # needed to decide when to end streamline "integration"
):
#self.vectorField: typing.Callable = vectorField
self.vectorFieldNormalized: typing.Callable = vectorFieldNormalized
self.grid: LookupGrid = grid
self.stepSize: float = stepSize # from config
self.streamlinePoints: List[Vector] = None
# streamlinePoints: List[Vector] = None # aka theResult aka streamline aka points
self.pos: Vector = None # TODO: remove. pass pos to the functions which need it
# self.start: Vector = None
#self.dSep = dSep # from config
self.dTest = dTest # from config # needed to decide when to end streamline "integration"
# self.state: int = None # FORWARD / BACKWARD / DONE
def buildStreamline(self, start: Vector) -> List[Vector]:
"""
the MAIN function
"""
# ---- add first point
self.streamlinePoints = [start] # aka theResult aka streamline aka points
self.pos = start
state = self.FORWARD # FORWARD / BACKWARD / DONE (TODO: rename?)
# ---- add points to the streamline
while state != self.DONE:
# print(f".{state}", end='')
# -- forward
if state == self.FORWARD:
point = self._grow(self.stepSize)
if point is not None:
self.streamlinePoints.append(point)
self.pos = point
else:
# Reset self.position to start, and grow backwards:
self.pos = start
state = self.BACKWARD
# -- backward
if state == self.BACKWARD:
point = self._grow(-self.stepSize)
if point is not None:
self.streamlinePoints.insert(0, point)
self.pos = point
else:
state = self.DONE
return self.streamlinePoints
def _grow(self, stepSize: float) -> Union[Vector, None]:
"""
private helper
"""
#print("_growForward")
velocity = rk4(self.pos, stepSize, self.vectorFieldNormalized)
#print(f"gf:{velocity}", end="")
if velocity is None or velocity.hasZeroLength(1e-8):
# print(f"singulariy 1: {velocity}")
return None # Hit the singularity.
# print("-")
candidate: Vector = self.pos + velocity
# if self.grid.isOutside(candidate.x, candidate.y):
# # TODO: add clipped point
# return None
# ---- did we hit our current streamlinePoints (hack to avoid infinite ping-pong)?
for existingPt in self.streamlinePoints:
if existingPt.distanceTo(candidate) <= abs(stepSize) * self.STEP_SIZE_FACTOR_TERMINATION_CONDITION:
return None
# ---- is point not too near to some of the previous existing streamlines, then it is valid point
if self.grid.isPointValid(candidate, self.dTest):
return candidate
return None
# def stepSizeCheck(self, distanceToCandidate: float) -> bool:
# # this is a callback function
# return distanceToCandidate < self.stepSize * 0.9
#
#
# def checkDTest(self, distanceToCandidate: float) -> bool:
# # this is a callback function
# if isSame(distanceToCandidate, self.dTest):
# return False
# return distanceToCandidate < self.dTest
#
#
# def checkDSep(self, distanceToCandidate: float) -> bool:
# # this is a callback function
# if isSame(distanceToCandidate, self.dSep):
# return False
# return distanceToCandidate < self.dSep