Simplify InputStreamSerializer, make NODE_USER role explicit

This commit is contained in:
Andras Slemmer 2016-12-06 14:02:28 +00:00
parent a601f0abf5
commit 9117ec9860
4 changed files with 21 additions and 44 deletions

View File

@ -202,12 +202,12 @@ class ImmutableClassSerializer<T : Any>(val klass: KClass<T>) : Serializer<T>()
// TODO This is a temporary inefficient serialiser for sending InputStreams through RPC. This may be done much more
// efficiently using Artemis's large message feature.
class InputStreamSerializer : Serializer<InputStream>() {
object InputStreamSerializer : Serializer<InputStream>() {
override fun write(kryo: Kryo, output: Output, stream: InputStream) {
val buffer = ByteArray(4096)
while (true) {
val numberOfBytesRead = stream.read(buffer)
if (numberOfBytesRead > 0) {
if (numberOfBytesRead != -1) {
output.writeInt(numberOfBytesRead, true)
output.writeBytes(buffer, 0, numberOfBytesRead)
} else {
@ -227,24 +227,13 @@ class InputStreamSerializer : Serializer<InputStream>() {
chunks.add(chunk)
}
}
return object : InputStream() {
var offset = 0
override fun read(): Int {
while (!chunks.isEmpty()) {
val chunk = chunks[0]
if (offset >= chunk.size) {
offset = 0
chunks.removeAt(0)
} else {
val byte = chunk[offset]
offset++
return byte.toInt() and 0xFF
}
}
return -1
}
val flattened = ByteArray(chunks.sumBy { it.size })
var offset = 0
for (chunk in chunks) {
System.arraycopy(chunk, 0, flattened, offset, chunk.size)
offset += chunk.size
}
return ByteArrayInputStream(flattened)
}
}
@ -452,7 +441,7 @@ fun createKryo(k: Kryo = Kryo()): Kryo {
/** This ensures any kotlin objects that implement [DeserializeAsKotlinObjectDef] are read back in as singletons. */
addDefaultSerializer(DeserializeAsKotlinObjectDef::class.java, KotlinObjectSerializer)
addDefaultSerializer(InputStream::class.java, InputStreamSerializer())
addDefaultSerializer(InputStream::class.java, InputStreamSerializer)
ImmutableListSerializer.registerSerializers(k)
ImmutableSetSerializer.registerSerializers(k)

View File

@ -5,6 +5,7 @@ import net.corda.core.contracts.StateAndRef
import net.corda.core.crypto.CompositeKey
import net.corda.core.crypto.SecureHash
import net.corda.core.flows.FlowLogic
import net.corda.core.flows.StateMachineRunId
import net.corda.core.messaging.CordaRPCOps
import net.corda.core.messaging.FlowHandle
import net.corda.core.messaging.StateMachineInfo
@ -64,8 +65,8 @@ class CordaRPCOpsImpl(
override fun stateMachinesAndUpdates(): Pair<List<StateMachineInfo>, Observable<StateMachineUpdate>> {
val (allStateMachines, changes) = smm.track()
return Pair(
allStateMachines.map { stateMachineInfoFromFlowStateMachineImpl(it) },
changes.map { stateMachineUpdateFromStateMachineChange(it) }
allStateMachines.map { stateMachineInfoFromFlowLogic(it.id, it.logic) },
changes.map { stateMachineUpdateFromStateMachineChange(it) }
)
}
@ -110,27 +111,14 @@ class CordaRPCOpsImpl(
override fun partyFromName(name: String) = services.identityService.partyFromName(name)
companion object {
fun stateMachineInfoFromFlowStateMachineImpl(stateMachine: FlowStateMachineImpl<*>): StateMachineInfo {
return StateMachineInfo(
id = stateMachine.id,
flowLogicClassName = stateMachine.logic.javaClass.name,
progressTrackerStepAndUpdates = stateMachine.logic.track()
)
private fun stateMachineInfoFromFlowLogic(id: StateMachineRunId, flowLogic: FlowLogic<*>): StateMachineInfo {
return StateMachineInfo(id, flowLogic.javaClass.name, flowLogic.track())
}
fun stateMachineUpdateFromStateMachineChange(change: StateMachineManager.Change): StateMachineUpdate {
private fun stateMachineUpdateFromStateMachineChange(change: StateMachineManager.Change): StateMachineUpdate {
return when (change.addOrRemove) {
AddOrRemove.ADD -> {
val stateMachineInfo = StateMachineInfo(
id = change.id,
flowLogicClassName = change.logic.javaClass.name,
progressTrackerStepAndUpdates = change.logic.track()
)
StateMachineUpdate.Added(stateMachineInfo)
}
AddOrRemove.REMOVE -> {
StateMachineUpdate.Removed(change.id)
}
AddOrRemove.ADD -> StateMachineUpdate.Added(stateMachineInfoFromFlowLogic(change.id, change.logic))
AddOrRemove.REMOVE -> StateMachineUpdate.Removed(change.id)
}
}
}

View File

@ -184,9 +184,9 @@ class ArtemisMessagingServer(override val config: NodeConfiguration,
securityRoles["$INTERNAL_PREFIX#"] = setOf(nodeInternalRole) // Do not add any other roles here as it's only for the node
securityRoles[P2P_QUEUE] = setOf(nodeInternalRole, restrictedRole(PEER_ROLE, send = true))
securityRoles[RPC_REQUESTS_QUEUE] = setOf(nodeInternalRole, restrictedRole(RPC_ROLE, send = true))
// TODO remove NODE_USER once webserver doesn't need it
val possibleClientUserNames = userService.users.map { it.username } + listOf(NODE_USER)
for (username in possibleClientUserNames) {
// TODO remove the NODE_USER role once the webserver doesn't need it
securityRoles["$CLIENTS_PREFIX$NODE_USER.rpc.*"] = setOf(nodeInternalRole)
for ((username) in userService.users) {
securityRoles["$CLIENTS_PREFIX$username.rpc.*"] = setOf(
nodeInternalRole,
restrictedRole("$CLIENTS_PREFIX$username", consume = true, createNonDurableQueue = true, deleteNonDurableQueue = true))

View File

@ -161,7 +161,7 @@ abstract class RPCDispatcher(val ops: RPCOps, val userService: RPCUserService, v
}
// TODO remove this User once webserver doesn't need it
val nodeUser = User(NODE_USER, NODE_USER, setOf())
private val nodeUser = User(NODE_USER, NODE_USER, setOf())
@VisibleForTesting
protected open fun getUser(message: ClientMessage): User {
val validatedUser = message.requiredString(Message.HDR_VALIDATED_USER.toString())